## Gerar dados com PySpark no Hadoop e consultá-los com PySpark

In [1]:
import uuid
import random
import datetime

def _make_record(code):
    """Build one synthetic record for the sequential identifier ``code``.

    The timestamp is drawn uniformly, in microseconds since the epoch, between
    2020-01-01T03:00:00Z and 2022-01-01T02:59:59.999999Z, and converted with
    ``datetime.fromtimestamp``, which uses the machine's local timezone.
    ``created_at`` and ``updated_at`` share that same instant, and ``year`` /
    ``month`` are its zero-padded string components.
    """
    micros = random.randint(1577847600000000, 1641005999999999)
    moment = datetime.datetime.fromtimestamp(micros / 1000000)
    return {
        "id": str(uuid.uuid4()),
        "code": code,
        "option": "option {0}".format(random.randint(1, 5)),
        "description": "description {0}".format(code),
        "value": random.gauss(400, 50),
        "rate": random.random(),
        "created_at": moment,
        "updated_at": moment,
        # The comparison already yields a bool.
        "status": random.randint(0, 1) == 1,
        "year": moment.strftime('%Y'),
        "month": moment.strftime('%m'),
    }


# One million records, with codes 1000000 .. 1999999.
data = [_make_record(code) for code in range(1000000, 2000000)]

In [2]:
from pyspark.sql import SparkSession

# Local Spark session ('local[2]' master) that requests the spark-avro package
# and sets Snappy as the Avro compression codec.
spark = (
    SparkSession.builder
    .master('local[2]')
    .config('spark.executor.memory', '2g')
    .config('spark.jars.packages', 'org.apache.spark:spark-avro_2.12:3.2.0')
    .config('spark.sql.avro.compression.codec', 'snappy')
    .getOrCreate()
)

21/12/22 22:53:37 WARN Utils: Your hostname, ubuntu20-vm-01 resolves to a loopback address: 127.0.1.1; using 192.168.15.185 instead (on interface enp0s3)
21/12/22 22:53:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/danilo/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/danilo/.ivy2/cache
The jars for the packages stored in: /home/danilo/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2ea5882e-2e7c-4ce1-892d-8b27c904ae52;1.0
	confs: [default]
	found org.apache.spark#spark-avro_2.12;3.2.0 in central
	found org.tukaani#xz;1.8 in central
	found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 285ms :: artifacts dl 10ms
	:: modules in use:
	org.apache.spark#spark-avro_2.12;3.2.0 from central in [default]
	org.spark-project.spark#unused;1.0.0 from central in [default]
	org.tukaani#xz;1.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     

In [3]:
from pyspark.sql.types import *

# Explicit schema for the generated records; every column is nullable.
schema = StructType(
    [
        StructField(column, dtype, True)
        for column, dtype in (
            ('id', StringType()),
            ('code', LongType()),
            ('option', StringType()),
            ('description', StringType()),
            ('value', DoubleType()),
            ('rate', DoubleType()),
            ('created_at', TimestampType()),
            ('updated_at', TimestampType()),
            ('status', BooleanType()),
            ('year', StringType()),
            ('month', StringType()),
        )
    ]
)

In [4]:
# Turn the in-memory records into a DataFrame and write them to HDFS as
# Snappy-compressed Avro, partitioned by the "year" and "month" columns.
# NOTE: mode="append" adds to whatever is already stored, so re-running this
# cell writes the same number of rows again.
df = spark.createDataFrame(data, schema)
(
    df.write
    .format('avro')
    .option('compression', 'snappy')
    .partitionBy("year", "month")
    # .option("maxRecordsPerFile", 1000)  # disabled: would cap the rows per output file
    .save('hdfs://dataserver:9000/warehouse', mode="append")
)

21/12/22 22:54:13 WARN TaskSetManager: Stage 0 contains a task of very large size (62978 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [5]:
# Load the Avro files under the warehouse directory back into a DataFrame.
df = (
    spark.read
    .format("avro")
    .load("hdfs://dataserver:9000/warehouse")
)

In [6]:
# Count the rows of the DataFrame read back from the warehouse.
df.count()

                                                                                

1000000

In [7]:
# Stop the Spark session; the following cells do not use `spark`.
spark.stop()

## Consultar dados no Hadoop com Drill 

In [8]:
import io
import json
import logging
import re
import urllib
import urllib.request

import psycopg2

In [9]:
def read_file_system(file_system_query, parameter, drill_url='http://localhost:8047'):
    """Run a SQL query through the Apache Drill REST API and return its rows.

    The ``/status`` page is fetched first, and the query is submitted only when
    that page contains the text ``Running!``. The query text is produced with
    ``file_system_query.format(**parameter)`` and posted to ``/query.json``.

    Args:
        file_system_query: SQL text with ``str.format`` placeholders.
        parameter: Mapping whose keys fill the placeholders of
            ``file_system_query``.
        drill_url: Base URL of the Drill web server. Defaults to the local
            address this notebook has always used.

    Returns:
        The ``rows`` entry of the JSON document returned by ``/query.json``.

    Raises:
        Exception: If the status page does not report ``Running!``, if the
            response has ``queryState == 'FAILED'``, or if any step (HTTP
            request, decoding, placeholder substitution, missing ``rows``)
            fails. The message is the text of the underlying error, which is
            also attached as ``__cause__``.
    """
    try:
        base_url = drill_url.rstrip('/')

        status_request = urllib.request.Request(url=base_url + '/status', method='GET')
        with urllib.request.urlopen(status_request) as f:
            status_page = f.read().decode('utf-8')
        if 'Running!' not in status_page:
            raise Exception('Apache Drill not is running!')

        # NOTE: the parameter values are substituted directly into the SQL
        # text, so they must come from trusted code, never from user input.
        payload = json.dumps(
            {'queryType': 'SQL', 'query': file_system_query.format(**parameter)}
        ).encode('utf-8')
        query_request = urllib.request.Request(
            url=base_url + '/query.json',
            data=payload,
            headers={'Content-Type': 'application/json'},
            method='POST',
        )
        with urllib.request.urlopen(query_request) as f:
            # Decode the body once and reuse the parsed document below.
            result = json.loads(f.read().decode('utf-8'))

        # A response without 'queryState' is treated as not failed instead of
        # aborting with a bare KeyError.
        if result.get('queryState') == 'FAILED':
            detail = result.get('errorMessage')
            if detail:
                raise Exception('Query failed! {0}'.format(detail))
            raise Exception('Query failed!')

        return result['rows']

    except Exception as e:
        # Callers get a plain Exception with the same message; chain the
        # original so its type and traceback remain visible.
        raise Exception(e) from e

In [10]:
def json_to_csv(data):
    """Serialize a list of row dicts into ``;``-separated text lines.

    Each row becomes one line ending in a newline character. Its values are
    written in the dict's own iteration order and converted with ``str``.
    ``None`` becomes an empty field. Other falsy values such as ``0``,
    ``0.0`` or ``False`` are written out (``'0'``, ``'0.0'``, ``'False'``);
    the previous truthiness check silently turned them into empty fields.

    Values are not escaped: a value containing ``;`` or a newline will
    produce a malformed line.

    Args:
        data: Iterable of dicts, one per row.

    Returns:
        An ``io.StringIO`` holding the lines, rewound to position 0.
    """
    output = io.StringIO()
    for row in data:
        fields = ['' if value is None else str(value) for value in row.values()]
        output.write(';'.join(fields) + '\n')
    output.seek(0)
    return output

In [11]:
def load_pgsql(stage_table_creation, output, table_merge, connection_params=None):
    """Stage rows in PostgreSQL with COPY and merge them into the target table.

    Steps, each followed by a commit:
      1. execute ``stage_table_creation``;
      2. ``copy_from`` the contents of ``output`` into the staging table
         (``;`` separator, empty string read as NULL);
      3. execute ``table_merge``.

    The staging table name is the identifier following the word ``table`` in
    ``stage_table_creation``. The cursor and connection are closed even when
    a step fails.

    Args:
        stage_table_creation: SQL statement that creates the staging table.
        output: File-like object with the ``;``-separated rows to load.
        table_merge: SQL statement that moves staged rows into the target.
        connection_params: Optional dict of keyword arguments for
            ``psycopg2.connect``. Defaults to the settings this notebook has
            always used.

    Raises:
        Exception: If the staging table name cannot be found or any database
            step fails. The message is the text of the underlying error,
            which is also attached as ``__cause__``.
    """
    if connection_params is None:
        # NOTE(review): credentials are hard-coded here; prefer passing
        # `connection_params` built from environment variables or a secrets
        # store.
        connection_params = {
            'host': '192.168.43.3',
            'port': '5432',
            'dbname': 'dw_pags',
            'user': 'postgres',
            'password': '123456',
        }

    connection = None
    cursor = None
    try:
        # Resolve the staging table name up front so that a malformed
        # statement fails with a clear message before any connection is opened.
        match = re.search('table ([a-zA-Z0-9_-]+)', stage_table_creation)
        if match is None:
            raise ValueError('Could not find the staging table name in stage_table_creation')
        stage_table = match.group(1)

        connection = psycopg2.connect(**connection_params)
        cursor = connection.cursor()

        cursor.execute(stage_table_creation)
        connection.commit()

        cursor.copy_from(file=output, table=stage_table, sep=';', null='')
        connection.commit()

        cursor.execute(table_merge)
        connection.commit()
    except Exception as e:
        # Callers get a plain Exception with the same message; chain the
        # original so its type and traceback remain visible.
        raise Exception(e) from e
    finally:
        # Release database resources on both the success and failure paths.
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()

In [13]:
import datetime

if __name__ == '__main__':

    # NOTE: with the level set to CRITICAL, the logger.warning() call further
    # down is filtered out whenever this configuration takes effect.
    logging.basicConfig(level=logging.CRITICAL, format='[%(asctime)s %(levelname)s %(name)s] %(message)s')
    logger = logging.getLogger(__name__)

    ################################################### parameter ###################################################
    interval = 86400  # length of one query window, in seconds
    period = 2        # number of consecutive windows to load, most recent first
    step = datetime.timedelta(seconds=interval)

    # Start at today's midnight and advance in whole steps until the boundary
    # is no longer in the past; the most recent window ends there.
    end_interval = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    while end_interval < datetime.datetime.now():
        end_interval += step
    start_interval = end_interval - step

    # One {start_interval, end_interval} pair per window, each shifted one
    # step further back than the previous one.
    parameter = [
        {
            "start_interval": (start_interval - step * i).strftime('%Y-%m-%d %H:%M:%S'),
            "end_interval": (end_interval - step * i).strftime('%Y-%m-%d %H:%M:%S'),
        }
        for i in range(period)
    ]
    #################################################################################################################

    # Drill query: rows created inside the half-open window [start, end).
    file_system_query = """
        select code as nk
             , cast(round(value, 0) as integer) as metric
             , to_char(created_at, 'yyyy-MM-dd HH:mm:ss.SSS') as created_at
          from hdfs.`warehouse`
         where created_at >= '{start_interval}' and created_at < '{end_interval}'
    """

    # Temporary staging table that receives the COPY.
    stage_table_creation = """
        create temp table sgt_test(
            nk integer unique,
            metric bigint,
            created_at timestamp
        )
    """

    # Upsert from the staging table into dim_test, keyed on nk.
    table_merge = """
        insert into dim_test(nk, metric, created_at)
            select nk
                 , metric
                 , created_at
              from sgt_test
        on conflict(nk)
        do update set metric = excluded.metric, created_at = excluded.created_at    
    """

    try:
        for window in parameter:
            rows = read_file_system(file_system_query, window)
            if rows:
                output = json_to_csv(rows)
                load_pgsql(stage_table_creation, output, table_merge)
            else:
                logger.warning('Query returns empty!')
    except Exception as e:
        # Log the traceback as well; the message alone rarely shows which
        # step (Drill query, CSV conversion, PostgreSQL load) failed.
        logger.critical(e, exc_info=True)