__Conhecendo o repositório + Case de manipulação__

Implementaremos algumas das funções que estão nos scripts do repositório com objetivo de absorver melhor sua lógica.
À seguir, chamaremos as principais classes do repositório configurando as credenciais e preparando as interfaces com bases de dados, e em seguida extrairemos a lista de datas já carregadas de um determinado contexto.

IMPORTANTE: É necessário seguir o contexto indicado para que o controle de valores de um ETL não afete o outro.

In [None]:
pwd

__Como fazer consultas à API do data lake sem usar as classes do repositório__ (implementaremos apenas o que é mais trivial).

In [None]:
import requests

username = 'atelles'
password = '' # pegar nos slides de setup
endpoint = 'http://prestoapides.olxdev.io:5000/api/query'
query = 'select * from ods.dm_category'
content_type ="text/plain"
header = {'Content-Type':content_type}

In [None]:
session = requests.Session()
r = session.post(self.endpoint,auth = (self.username,self.password), headers = self.header, data = self.query)

In [None]:
json_data = r.json()
df = pd.DataFrame(json_data)
print(df.head())

__Replicaremos agora o de forma simples a função de controle de valores__

In [None]:
import sys, os, traceback
#change to the utils folder
sys.path.append(os.path.abspath(os.path.join('..', 'utils')))

from query_executor import query
import config
import pandas as pd
import re
import math
import time
import emailutil
#import math
import numpy as np
from datetime import timedelta
from get_values_to_load import get_days_to_load
import string_manipulators

# get configuration info from config.ini through config.py
pg_config_dict = config.get_all_as_dict('postgres')
presto_config_dict = config.get_all_as_dict('presto')

# instantiate classes and configure connections
postgres_executor = query(pg_config_dict)
presto_executor = query(presto_config_dict)

date_log_table = 'etl_class.date_log'
context = 'assignment1'

log_query = f"""select date from {date_log_table} where context = '{context}' order by date desc"""

pg_result = postgres_executor.query_jdbc(log_query)

already_loaded_dates = [pd.to_datetime(date).date() for date in pg_result['date'].values]

Como podem observar, todos estão com as datas 26/08 e 28/08 salvas.
Agora geraremos o range de datas a serem carregadas.

In [4]:
start_date = pd.to_datetime('2018-08-01')
end_date = pd.to_datetime('2018-09-15')
desired_load_list = pd.date_range(start=start_date, end=end_date).date.tolist()

Nessa última etapa, checamos quais datas foram carregadas daquelas que estão na lista das desejadas pelo ETL.

In [None]:
dates_to_load = [date for date in desired_load_list if date not in already_loaded_dates]

In [None]:
dates_to_load

__Abordaremos agora a criação de partições a partir de um range de datas, tarefa comum no nosso ambiente de dados com tabelas particionadas__

In [None]:
start_d = pd.to_datetime('2018-12-30')
end_d = pd.to_datetime('2019-02-10')
desired_date_list = pd.date_range(start=start_d, end=end_d).date.tolist()
        
dates = pd.DataFrame(desired_date_list,columns=['dates']) 

Usaremos o índice do dataframe como referência para agrupar os dias que estão na coluna day por mês e ano, concatenando todos os dias com vírgula

In [None]:
dates['day'] = dates['dates'].apply(lambda x: str(x.day))
dates['month'] = dates['dates'].apply(lambda x: str(x.month))
dates['year'] = dates['dates'].apply(lambda x: str(x.year))
grouped = dates.groupby(['year','month'])['day'].apply(lambda x: "%s" % ', '.join(x))

Iteraremos agora sobre o dataframe construindo a string de acordo com o agrupamento feito anteriormente, usando o índice para construir ano e mes e os valores com os dias agregados para a lista de dias.

In [None]:
month_count = 0
for year,month in list(grouped.index):
    if month_count == 0:
        date_string = "(year = " + str(year) + " and month = " + str(month) + " and day in (" \
        + str(grouped[year,month]) + "))"
    else:
        date_string += " or (year = " + str(year) + " and month = " + str(month) + " and day in (" \
        + str(grouped[year,month]) + "))"
    month_count += 1
date_string = "(" + date_string + ")"

In [None]:
date_string

### __Case de manipulação de dados com pandas__

Importando as bibliotecas necessárias e armazenando as credenciais e queries em memória.

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Dec  6 16:53:57 2018

@author: arthur.telles
"""

import sys, os, traceback
#change to the utils folder
sys.path.append(os.path.abspath(os.path.join('..', 'utils')))

from query_executor import query
import config
import pandas as pd
import re
import math
import time
import emailutil
import numpy as np
from datetime import timedelta
from get_values_to_load import get_days_to_load
import string_manipulators

initial_time = time.time()

# get configuration info from config.ini through config.py
pg_config_dict = config.get_all_as_dict('postgres')
presto_config_dict = config.get_all_as_dict('presto')

# instantiate classes and configure connections
postgres_executor = query(pg_config_dict)
presto_executor = query(presto_config_dict)

query_to_execute = """select approval_date,
b.list_id_nk,
reason_removed_detail_name,
state_name,
case when b.category_id_fk = 46 then 'Autos'
     when b.category_id_fk in (79,40,86,44,3,2,80) then 'RE'
     when b.category_id_fk is null then 'no cat'
     else 'Other' end as category,
account_id_fk,
price
from ods.ad b
join ods.dm_area c on b.area_id_fk=c.area_id_pk
join ods.dm_reason_removed_detail d on b.reason_removed_detail_id_fk=d.reason_removed_detail_id_pk
where {DATE_CLAUSE}"""

# ---- Define these variables ---- #
etl_context = 'assignment12'
schema_to_load = 'etl_class'
table_in_schema_to_load = 'assignment'
# -------------------------------- #


print("\n Running ETL # -------------- # \n ")


Chamaremos a função que vai carregar os dias indicados pelo contexto escolhido anteriormente, em seguida será feita a exploração de dados.

In [None]:
date = pd.to_datetime('2018-08-25')
dates_to_load, last_day_loaded = get_days_to_load(etl_context,'etl_class.date_log',\
                                                    'etl_class.config_params',\
                                                    last_day=pd.to_datetime('2018-09-01'))

# Loading at D-1
date_clause_string, today_date_string = string_manipulators.generate_date_string(date - timedelta(days=1),False)

edited_query = string_manipulators.substitute_params_in_string(['{DATE_CLAUSE}'],\
                                            [date_clause_string],query_to_execute)

presto_result = presto_executor.query_request(edited_query)

pg_result = postgres_executor.query_jdbc("select * from etl_class.accounts_table")

In [None]:
pg_result.describe()

In [None]:
presto_result.describe(include='all')

In [None]:
pg_result.describe(include='all')

Próximo passo é executar o join e avaliar quais ids não estão presentes no segundo dataframe.

In [None]:
result = pd.merge(presto_result,pg_result,how='left',on='account_id_fk')

É necessário restringir os dados para ads aprovados em 2018-08-25 pois o year month day não limita todos os approval dates a essa data.

In [None]:
result2 = result[(~pd.isnull(result['gender'])) & (result['approval_date'] == '2018-08-25')]

Exemplo de uso da função apply: Conversão de todos os valores 't' e 'f' para True e False, com objetivo de otimizar o armazenamento desses dados. Operações feitas com dados booleanos se tornam exponencialmente mais rápidas que quando armazenados como string.

In [None]:
result2['is_company'] = result2['is_company'].apply(lambda x: True if x == 't' else False)

Agruparemos então os dados com a função groupby, semelhante ao SQL, para trazer um pouco do processamento para memória e então extrair a métrica que desejávamos no começo.

In [None]:
result3 = result2.groupby(['approval_date','state_name','gender','is_company'])['account_id_fk'].\
nunique().reset_index()

Renomearemos e checaremos o formato das colunas para bater com o nome e o formato no banco. Em seguida, agruparemos também pelo list_id para retirar a quantidade de listings únicos em cada grupo.

In [None]:
result3.columns = ['approval_date', 'state_name', 'gender', 'is_company', 'accounts']
result3['listings'] = result2.groupby(['approval_date','state_name','gender','is_company'])['list_id_nk'].\
nunique().reset_index()['list_id_nk']

Vamos fazer um teste de inserção para ver se os dados estão ok e se o banco está apto a recebê-los sem erro.

In [None]:
result = result3

if result.shape[0] > 100:
    postgres_executor.insert_chunks_with_progress(result, table_in_schema_to_load, schema_to_load,\
                                                if_exists='append')
else:
    postgres_executor.insert_jdbc(result,table_in_schema_to_load,schema_to_load, if_exists='append')

date_log_data = pd.DataFrame(data={"date": date,
                    "load_date": str(pd.to_datetime('now').tz_localize('UTC').tz_convert('America/Sao_Paulo')\
                                .tz_localize(None)),
                    "context": etl_context}, index=[0])
postgres_executor.insert_jdbc(date_log_data,'date_log','etl_class', if_exists='append')

print('No more days to load!')

# gravar no log o tempo para execução
print("Total Elapsed Time: {} minutes and {} seconds.".format(str(math.floor((time.time() - initial_time)/60)),\
  str(time.time() - initial_time - math.floor((time.time() - initial_time)/60)*60))) 
