In [1]:
import os
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, String, exists
from sqlalchemy.orm import sessionmaker, declarative_base
import configparser

In [2]:
# Leitura das configurações do banco de dados do arquivo .ini
config = configparser.ConfigParser()
config.read('C:/Users/User/Documents/GitHub/processoseletivo-teste/airflow-docker/dags/etl_scripts/config.ini')

['C:/Users/User/Documents/GitHub/processoseletivo-teste/airflow-docker/dags/etl_scripts/config.ini']

In [3]:
# Configuração do banco de dados do data warehouse
user = config.get('Database', 'user')
password = config.get('Database', 'password')
host = config.get('Database', 'host')
port = config.get('Database', 'port')
database_dw = config.get('Database', 'database_dw')
database_dl = config.get('Database', 'database_dl')

In [4]:
# String de conexão para o banco de dados do data warehouse e data lake
string_conexao_dw = f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database_dw}'
string_conexao_dl = f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database_dl}'

In [5]:
engine_dw = create_engine(string_conexao_dw)

# Criação da engine do SQLAlchemy para o data lake
engine_dl = create_engine(string_conexao_dl)

# Criação da tabela da dimensão Departamento
Base = declarative_base()

In [41]:
# Pegando o último registro
query_ultimo_resgistro = '''SELECT data_criacao, hora_criacao
FROM public.ft_atendimentos
ORDER BY data_criacao DESC, hora_criacao DESC
LIMIT 1;
;
'''
df_ultimo_registro = pd.read_sql_query(query_ultimo_resgistro, engine_dw)
df_ultimo_registro

Unnamed: 0,data_criacao,hora_criacao
0,2023-01-31,21:59:00


In [42]:
data_ultimo_registro = df_ultimo_registro['data_criacao'][0].strftime('%Y-%m-%d')
data_ultimo_registro

'2023-01-31'

In [43]:
hora_ultimo_registro = df_ultimo_registro['hora_criacao'][0].strftime('%H:%M')
hora_ultimo_registro

'21:59'

In [50]:
# Consultar dados de atendimentos da tabela no Data Lake
query_atendimentos = f'''
SELECT * FROM public.atendimentos WHERE TO_DATE(data_criacao, 'DD/MM/YYYY') >= CAST('{data_ultimo_registro}' AS date)
    and  cast(hora_criacao as time) > cast('{hora_ultimo_registro}' as time)
 '''
query_atendimentos
#df_atendimentos = pd.read_sql_query(query_atendimentos, engine_dl)
#df_atendimentos

"\nSELECT * FROM public.atendimentos WHERE TO_DATE(data_criacao, 'DD/MM/YYYY') >= CAST('2023-01-31' AS date)\n    and  cast(hora_criacao as time) > cast('21:59' as time)\n "

In [51]:
# Consultar dados de atendimentos da tabela no Data Lake
df_atendimentos = pd.read_sql_query(query_atendimentos, engine_dl)
df_atendimentos

Unnamed: 0,agente,associacao,data_criacao,data_transferencia,data_finalizacao,protocolo,telefone,tempo_de_atendimento_horas,tempo_de_atendimento_minutos,tempo_de_atendimento_segundos,...,tempo_em_fila_minutos,tempo_em_fila_segundos,data_ultima_msg_agente,data_ultima_msg_cliente,Departamento,hora_criacao,hora_finalizacao,hora_transferencia,hora_ultima_msg_agente,hora_ultima_msg_cliente


In [52]:
# Adicionar coluna de DDD à dimensão cidade
df_atendimentos['ddd_cidade'] = df_atendimentos['telefone'].str[:2]
df_atendimentos['ddd_cidade']

Series([], Name: ddd_cidade, dtype: object)

In [None]:
# Mapear DDD para cidade (usando a dimensão cidade)
df_cidades = pd.read_sql_query('SELECT * FROM dim_cidade', engine_dw)
df_cidades

In [None]:
# Converter a coluna 'ddd_cidade' para o tipo int64
df_atendimentos['ddd_cidade'] = df_atendimentos['ddd_cidade'].astype('int64')
df_atendimentos_fato = df_atendimentos.merge(df_cidades, left_on='ddd_cidade', right_on='ddd', how='left')
df_atendimentos_fato

In [None]:
# Mapear agente
df_agentes = pd.read_sql_query('SELECT * FROM dim_agente', engine_dw)
df_agentes

In [None]:
df_atendimentos_fato['agente'] = df_atendimentos_fato['agente'].fillna('<chatbot>')
df_atendimentos_fato = df_atendimentos_fato.merge(df_agentes, left_on='agente', right_on='nm_agente', how='left')
df_atendimentos_fato

In [None]:
# Mapear data
df_datas = pd.read_sql_query('SELECT sk_data, data FROM public.dim_calendario;', engine_dw)
df_datas

In [None]:
df_data_criacao = df_datas.rename(columns={'sk_data':'sk_data_criacao','data':'data_criacao'})
df_data_criacao

In [None]:
df_data_finalizacao = df_datas.rename(columns={'sk_data':'sk_data_finalizacao','data':'data_finalizacao'})
df_data_finalizacao

In [None]:
df_data_transferencia = df_datas.rename(columns={'sk_data':'sk_data_transferencia','data':'data_transferencia'})
df_data_transferencia

In [None]:
df_data_ultima_msg_agente = df_datas.rename(columns={'sk_data':'sk_data_ultima_msg_agente','data':'data_ultima_msg_agente'})
df_data_ultima_msg_agente

In [None]:
df_data_ultima_msg_cliente = df_datas.rename(columns={'sk_data':'sk_data_ultima_msg_cliente','data':'data_ultima_msg_cliente'})
df_data_ultima_msg_cliente

In [None]:
df_atendimentos_fato.data_transferencia = pd.to_datetime(df_atendimentos_fato.data_transferencia, format="%d/%m/%Y %H:%M:%S").dt.date
df_atendimentos_fato.data_finalizacao = pd.to_datetime(df_atendimentos_fato.data_finalizacao, format="%d/%m/%Y %H:%M:%S").dt.date
df_atendimentos_fato.data_criacao = pd.to_datetime(df_atendimentos_fato.data_criacao, format="%d/%m/%Y").dt.date
df_atendimentos_fato.data_ultima_msg_agente = pd.to_datetime(df_atendimentos_fato.data_ultima_msg_agente, format="%d/%m/%Y %H:%M:%S").dt.date
df_atendimentos_fato.data_ultima_msg_cliente = pd.to_datetime(df_atendimentos_fato.data_ultima_msg_cliente, format="%d/%m/%Y %H:%M:%S").dt.date
df_atendimentos_fato.hora_ultima_msg_agente  = pd.to_datetime(df_atendimentos_fato.hora_ultima_msg_agente, format="%H:%M").dt.time              
df_atendimentos_fato.hora_ultima_msg_cliente  = pd.to_datetime(df_atendimentos_fato.hora_ultima_msg_cliente, format="%H:%M").dt.time               
df_atendimentos_fato.hora_criacao = pd.to_datetime(df_atendimentos_fato.hora_criacao, format="%H:%M").dt.time
df_atendimentos_fato.hora_finalizacao = pd.to_datetime(df_atendimentos_fato.hora_finalizacao, format="%H:%M").dt.time
df_atendimentos_fato.hora_transferencia = pd.to_datetime(df_atendimentos_fato.hora_transferencia, format="%H:%M").dt.time
df_atendimentos_fato

data_ultima_msg_cliente = df_atendimentos_fato.merge(df_data_criacao, left_on='data_criacao', right_on='data_criacao', how='left')
data_ultima_msg_cliente

df_atendimentos_fato = df_atendimentos_fato.merge(df_data_finalizacao, left_on='data_finalizacao', right_on='data_finalizacao', how='left')
df_atendimentos_fato

df_atendimentos_fato = df_atendimentos_fato.merge(df_data_transferencia, left_on='data_transferencia', right_on='data_transferencia', how='left')
df_atendimentos_fato

df_atendimentos_fato = df_atendimentos_fato.merge(df_data_ultima_msg_agente, left_on='data_ultima_msg_agente', right_on='data_ultima_msg_agente', how='left')
df_atendimentos_fato

df_atendimentos_fato = df_atendimentos_fato.merge(df_data_data_ultima_msg_cliente, left_on='df_atendimentos_fato', right_on='df_atendimentos_fato', how='left')
df_atendimentos_fato

# Mapear tempo (horas)
df_tempos = pd.read_sql_query('SELECT sk_hora_dia, hora_do_dia	FROM dim_horas;', engine_dw)
df_tempos

df_tempo_criacao = df_tempos.rename(columns={'sk_hora_dia':'sk_hora_criacao','hora_do_dia':'hora_criacao'})
df_tempo_criacao

df_tempo_finalizacao = df_tempos.rename(columns={'sk_hora_dia':'sk_hora_finalizacao','hora_do_dia':'hora_finalizacao'})
df_tempo_finalizacao

df_tempo_transferencia = df_tempos.rename(columns={'sk_hora_dia':'sk_hora_transferencia','hora_do_dia':'hora_transferencia'})
df_tempo_transferencia

df_tempo_ultima_msg_agente = df_tempos.rename(columns={'sk_hora_dia':'sk_hora_ultima_msg_agente','hora_do_dia':'hora_ultima_msg_agente'})
df_tempo_ultima_msg_agente

df_tempo_ultima_msg_cliente = df_tempos.rename(columns={'sk_hora_dia':'sk_hora_ultima_msg_cliente','hora_do_dia':'hora_ultima_msg_cliente'})
df_tempo_ultima_msg_cliente

df_atendimentos_fato = df_atendimentos_fato.merge(df_tempo_criacao, left_on='hora_criacao', right_on='hora_criacao', how='left')
df_atendimentos_fato

df_atendimentos_fato = df_atendimentos_fato.merge(df_tempo_finalizacao, left_on='hora_criacao', right_on='hora_finalizacao', how='left')
df_atendimentos_fato

df_atendimentos_fato = df_atendimentos_fato.merge(df_tempo_transferencia, left_on='hora_criacao', right_on='hora_transferencia', how='left')
df_atendimentos_fato

df_atendimentos_fato = df_atendimentos_fato.merge(df_tempo_ultima_msg_agente, left_on='hora_criacao', right_on='hora_ultima_msg_agente', how='left')
df_atendimentos_fato

df_atendimentos_fato = df_atendimentos_fato.merge(df_tempo_ultima_msg_cliente, left_on='hora_criacao', right_on='hora_ultima_msg_cliente', how='left')
df_atendimentos_fato

In [None]:
# Mapear departamento
df_departamentos = pd.read_sql_query('SELECT * FROM dim_departamento', engine_dw)
df_atendimentos_fato = df_atendimentos_fato.merge(df_departamentos, left_on='Departamento', right_on='nm_departamento', how='left')


In [None]:
df_atendimentos_fato.info()

In [None]:
df_atendimentos_fato.info()

In [None]:
tempo_de_atendimentos_horas = pd.to_timedelta(df_atendimentos_fato.tempo_de_atendimento_horas,'h')
tempo_de_atendimento_minutos = pd.to_timedelta(df_atendimentos_fato.tempo_de_atendimento_minutos,'m')
tempo_de_atendimento_segundos = pd.to_timedelta(df_atendimentos_fato.tempo_de_atendimento_segundos,'s')

total_tempo_de_atendimentos = tempo_de_atendimentos_horas + tempo_de_atendimento_minutos + tempo_de_atendimento_segundos

df_atendimentos_fato['tempo_de_atendimentos'] = total_tempo_de_atendimentos

In [None]:
tempo_de_atendimento_humano_horas = pd.to_timedelta(df_atendimentos_fato.tempo_de_atendimento_humano_horas,'h')
tempo_de_atendimento_humano_minutos = pd.to_timedelta(df_atendimentos_fato.tempo_de_atendimento_humano_minutos,'m')
tempo_de_atendimento_humano_segundos = pd.to_timedelta(df_atendimentos_fato.tempo_de_atendimento_humano_segundos,'s')

tempo_de_atendimento_humano = tempo_de_atendimento_humano_horas + tempo_de_atendimento_humano_minutos + tempo_de_atendimento_humano_segundos

df_atendimentos_fato['tempo_de_atendimento_humano'] = tempo_de_atendimento_humano

In [None]:
df_atendimentos_fato['tempo_atendimento_automatizado'] = total_tempo_de_atendimentos - tempo_de_atendimento_humano

In [None]:
tempo_em_fila_horas = pd.to_timedelta(df_atendimentos_fato.tempo_em_fila_horas,'h')
tempo_em_fila_minutos = pd.to_timedelta(df_atendimentos_fato.tempo_em_fila_minutos,'m')
tempo_em_fila_segundos = pd.to_timedelta(df_atendimentos_fato.tempo_em_fila_segundos,'s')

tempo_em_fila = tempo_em_fila_horas + tempo_em_fila_minutos + tempo_em_fila_segundos

df_atendimentos_fato['tempo_em_fila'] = tempo_em_fila

In [None]:
tempo_em_espera_horas = pd.to_timedelta(df_atendimentos_fato.tempo_em_espera_horas,'h')
tempo_em_espera_minutos = pd.to_timedelta(df_atendimentos_fato.tempo_em_espera_minutos,'m')
tempo_em_espera_segundos = pd.to_timedelta(df_atendimentos_fato.tempo_em_espera_segundos,'s')

tempo_em_espera = tempo_em_espera_horas + tempo_em_espera_minutos + tempo_em_espera_segundos

df_atendimentos_fato['tempo_de_espera'] = tempo_em_espera

In [None]:
df_atendimentos_fato = df_atendimentos_fato.drop(columns=['nm_agente','agente','Departamento','nm_departamento',
                                                          'nk_cidade','nm_cidade','uf','ddd','data_inicio','data_fim',
                                                          'ddd_cidade','tempo_de_atendimento_horas','tempo_de_atendimento_minutos',
                                                         'tempo_de_atendimento_segundos','tempo_de_atendimento_humano_horas',
                                                         'tempo_de_atendimento_humano_minutos','tempo_de_atendimento_humano_segundos',
                                                         'tempo_em_fila_horas','tempo_em_fila_minutos','tempo_em_fila_segundos',
                                                          'tempo_em_espera_horas','tempo_em_espera_minutos','tempo_em_espera_segundos'])

In [None]:
orderm_columns = ['sk_agente','sk_departamento','sk_cidade','associacao',
                                                            'protocolo','telefone','data_criacao','hora_criacao',
                                                            'data_finalizacao','hora_finalizacao','data_transferencia',
                                                            'hora_transferencia','data_ultima_msg_agente',
                                                            'hora_ultima_msg_agente','data_ultima_msg_cliente',
                                                            'hora_ultima_msg_cliente','tempo_de_atendimentos',
                                                            'tempo_de_atendimento_humano','tempo_atendimento_automatizado',
                                                            'tempo_em_fila','tempo_de_espera']
df_atendimentos_fato = df_atendimentos_fato.reindex(columns=orderm_columns)
                                                            
df_atendimentos_fato.info()

In [None]:
# Inserir dados na fato atendimentos no banco de dados
df_atendimentos_fato.to_sql('ft_atendimentos', engine_dw, if_exists='append', index=False)
