In [None]:
!pip install google-cloud-bigquery
!pip install pyhive
!pip install --user Flask
!pip install firebirdsql
!pip install fdb
!pip install unidecode

# bibliotecas manipulação de dados 
from pyhive import presto
from google.cloud.bigquery.client import Client
import pandas as pd
import numpy as np
import firebirdsql
import fdb
import os
import unidecode
from sqlalchemy import *
import sqlalchemy.dialects.sqlite
import time
from multiprocessing import Pool

In [15]:
## BLOCO CRIAÇÃO DATAFRAME ##


#Função que cria tabela espelhada da camada raw, porém ordenada.
def CREATE_TABLE_ORDENED():
    conn = presto.connect(host = 'datalake-1', port = 18080, username = 'root')
    cursor = conn.cursor()
    cursor.execute("drop table if exists hive.trusted.table_t_cliente_representante_ordened")
    delete = cursor.fetchall()
    print(delete)
    cursor.execute("create table hive.trusted.table_t_cliente_representante_ordened as select * from hive.raw.table_t_cliente_representante order by cast(id_pessoa_cliente as BIGINT)")
    create = cursor.fetchall()
    cursor.execute("select distinct * from hive.trusted.table_t_cliente_representante_ordened limit 1")
    rows = cursor.fetchall()
    cursor.execute("describe hive.trusted.table_t_cliente_representante_ordened")
    colunas_db = cursor.fetchall()            
    colunas_trt = []
    for coluna in colunas_db:
        colunas_trt.append(coluna[0])   
    df_ordened = pd.DataFrame(data=rows, columns=colunas_trt)
    return df_ordened
    
    
# Função que cria o df_trt a partir da tabela na camada raw.
def CREATE_DATAFRAME_RAW(offset):
    conn = presto.connect(host = 'datalake-1', port = 18080, username = 'root')
    cursor = conn.cursor()
    cursor.execute("select distinct * from hive.trusted.table_t_cliente_representante_ordened offset {} limit 2000".format(offset))
    rows = cursor.fetchall()
    cursor.execute("describe hive.trusted.table_t_cliente_representante_ordened")
    colunas_db = cursor.fetchall()            
    colunas_trt = []
    for coluna in colunas_db:
        colunas_trt.append(coluna[0])   
    df_trt = pd.DataFrame(data=rows, columns=colunas_trt)
    return df_trt

# Função que cria o df_trusted a partir do df_trt.
def CREATE_DATAFRAME_TRUSTED(df_trt):
    colunas_trt = df_trt.columns.values
    colunas_trt_upper = []
    colunas_trusted = {}
    for coluna in colunas_trt:
        colunas_trt_upper.append(coluna.upper())
    i = 0
    while i != len(colunas_trt):
        colunas_trusted[colunas_trt[i]] = colunas_trt_upper[i]
        i = i + 1
    df_trusted = df_trt.rename(columns=colunas_trusted)
    return df_trusted  


In [16]:
## BLOCO HIGIENIZAÇÃO ##

# Função que transforma todas as colunas em caixa alta.
def TRANSFORM_COLUMN_TO_UPPERCASE(df_trusted):
    columns = df_trusted.columns.values
    for coluna in columns:
        df_trusted[coluna] =  df_trusted[coluna].str.upper()
    return df_trusted

# Função que remove todas as acentuações.
def NORMALIZE(df_trusted):
    columns = df_trusted.columns.values
    for coluna in columns:
        df_trusted[coluna] = 
        df_trusted[coluna].str.normalize('NFKD').str.encode('ascii', errors='ignore').str.decode('utf-8')
    return df_trusted


In [17]:
## BLOCO TRUSTED ##

# Função que cria a tabela na camada Trusted a partir do df_trusted.
def CREATE_TABLE_TRUSTED(df_ordened):
    colunas = df_ordened.columns.values
    coluna_new = "CREATE TABLE IF NOT EXISTS hive.trusted.table_t_cliente_representante_hig("
    for coluna in colunas:
        coluna_new = coluna_new + coluna + ' varchar, '    
    formato = ')'
    create_table = coluna_new[:-2] + formato
    conn = presto.connect(host = 'datalake-1', port = 18080, username = 'root')
    cursor = conn.cursor()
    cursor.execute(create_table)
    resp = cursor.fetchall()  
    print(resp)
    return 'Criado com sucesso'

# Função que insere os dados contidos no df_trusted para a tabela produtos_hig na camada Trusted.
def INSERT_DF_TO_TRUSTED(df_trusted):
    try:
        engine = create_engine('presto://root@datalake-1:18080/hive', echo=False)
        df_trusted.to_sql('table_t_cliente_representante_hig', schema='trusted', con=engine, if_exists='append', index=False, method='multi')
        return df_trusted
    except:
        return df_trusted

In [18]:
## BLOCO CRIA FUNÇÃO TRANSFORMATION E PARALLEIZE ##

#Função que junta todos os scripts de transformação
def TRANSFORMATIONS(df_trusted):
    df_trusted = TRANSFORM_COLUMN_TO_UPPERCASE(df_trusted=df_trusted)
    df_trusted = NORMALIZE(df_trusted=df_trusted)
    df_trusted = INSERT_DF_TO_TRUSTED(df_trusted=df_trusted)
    return df_trusted

#Função que paraleliza as transformações.
def PARALLEIZE_DATAFRAME(df, func, n_cores=2):
    try:
        df_split = np.array_split(df, n_cores)
        pool = Pool(n_cores)
        df = pd.concat(pool.map(func, df_split))
        pool.close()
        pool.join()
        return df
    except:
        print('erro PARALLEIZE_DATAFRAME')
        return df

In [19]:
## BLOCO EXECUTÁVEL ##

def EXECUTE():
    df_ordened = CREATE_TABLE_ORDENED()
    CREATE_TABLE_TRUSTED(df_ordened=df_ordened)
    conn = presto.connect(host = 'datalake-1', port = 18080, username = 'root')
    cursor = conn.cursor()
    cursor.execute("select count(*) from hive.trusted.table_t_cliente_representante_hig")    
    offset = cursor.fetchall()[0][0]
    cursor.execute("select count(*) from hive.raw.table_t_cliente_representante")  
    tamanho_raw = cursor.fetchall()[0][0]
    while offset < tamanho_raw:
        i = 0
        while i == 0:
            try:
                df_trt = CREATE_DATAFRAME_RAW(offset)
                df_trusted = CREATE_DATAFRAME_TRUSTED(df_trt = df_trt)
                df_trusted = PARALLEIZE_DATAFRAME(df=df_trusted, func=TRANSFORMATIONS)
                i = 1
            except:
                print("Algo deu errado, vamos tentar novamente em 5 minutos :)")
                time.sleep(300)

        tamanho_df = len(df_trusted)    
        tamanho_trusted = offset + tamanho_df
        offset = offset + 2000
        print('{} linhas inseridas :), tamanho df: {}'.format(tamanho_trusted, tamanho_df))
    return 'TRATAMENTOS/INSERÇÕES FINALIZADAS'

In [20]:
EXECUTE()

336000 linhas inseridas :), tamanho df: 2000
338000 linhas inseridas :), tamanho df: 2000
340000 linhas inseridas :), tamanho df: 2000
342000 linhas inseridas :), tamanho df: 2000
344000 linhas inseridas :), tamanho df: 2000
346000 linhas inseridas :), tamanho df: 2000
348000 linhas inseridas :), tamanho df: 2000
350000 linhas inseridas :), tamanho df: 2000
352000 linhas inseridas :), tamanho df: 2000
354000 linhas inseridas :), tamanho df: 2000
356000 linhas inseridas :), tamanho df: 2000
358000 linhas inseridas :), tamanho df: 2000
360000 linhas inseridas :), tamanho df: 2000
362000 linhas inseridas :), tamanho df: 2000
364000 linhas inseridas :), tamanho df: 2000
366000 linhas inseridas :), tamanho df: 2000
368000 linhas inseridas :), tamanho df: 2000
370000 linhas inseridas :), tamanho df: 2000
372000 linhas inseridas :), tamanho df: 2000
374000 linhas inseridas :), tamanho df: 2000
376000 linhas inseridas :), tamanho df: 2000
378000 linhas inseridas :), tamanho df: 2000
380000 lin

'TRATAMENTOS/INSERÇÕES FINALIZADAS'