# Avisos

Este notebook contem os scrips que irão:

- Coletar os dados na tabela de Stage;
- Criar tabela temporária com os dados para a atualização da fato;
- Carregar os dados na tabela;
- Realizar atualização incremental;
- Realizar carga retroativa na tabela.

# Imports

In [81]:
import pandas as pd
from datetime import datetime
from pathlib import Path
import sqlite3 as sql

# Variáveis

In [82]:
# Variável que determina a data da carga de uma tabela.
dtCarga = datetime.today().strftime('%d/%m/%Y %H:%M')

# Variável com o diretório local onde serão salvos os bancos de dados SQLite.
endereco = Path('G:\\Meu Drive\\Projetos - Data Engineer\\Databases\\')

# Definindo o local de criação dos bancos.
BDODS = endereco / "ibgeSTG.db"                  #ODS = Operational Data Store
BDDW = endereco / "ibgeDW.db"                    #DW = Data Warehouse

# Conectando nos bancos de dados.
connODS = sql.connect(BDODS)
connDW = sql.connect(BDDW)

# Definindo a manipulação de dados via SQL.
SQL_ODS = connODS.cursor()
SQL_DW = connDW.cursor()

# Criando o dataframe com a querie que irá carregar os dados

In [83]:
# Definindo a query de carga na fato_boletim_ocorrencias.

qry_fato_ocorrencias = """
SELECT
    a.cod_dp,
    a.cod_bpm,
    a.cod_municipio,
    a.ano,
    a.mes,
    a.regiao,
    a.ocorrencia,
    a.quantidade
FROM st_ocorrencias a
WHERE a.dtCarga = (
                    SELECT
                    MAX (dtCarga)
                    FROM st_ocorrencias
                        )

"""

# Criando o dataframe.
df_fato_ocorrencias = pd.read_sql(qry_fato_ocorrencias, connODS)

# Fazendo um tratamento no campo "mes", onde quando ele tiver apenas um dígito, retornará o valor com um 0 à esquerda.
df_fato_ocorrencias['mes'] = df_fato_ocorrencias['mes'].apply(lambda x: str(x).zfill(2))

# Adicionando uma nova coluna, com o concatenado do "ano" e "mes", que será nossa chave com a dim_tempo.
df_fato_ocorrencias['ano_mes'] = df_fato_ocorrencias['ano'].astype(str) + df_fato_ocorrencias['mes'].astype(str)

# Convertendo a coluna "ano_mes" para o tipo inteiro
df_fato_ocorrencias['ano_mes'] = pd.to_numeric(df_fato_ocorrencias['ano_mes'])

df_fato_ocorrencias

Unnamed: 0,cod_dp,cod_bpm,cod_municipio,ano,mes,regiao,ocorrencia,quantidade,ano_mes
0,1,5,3304557,2018,01,1,ameaca,7777777,201801
1,1,5,3304557,2018,01,1,apreensao_drogas,3,201801
2,1,5,3304557,2018,01,1,estelionato,81,201801
3,1,5,3304557,2018,01,1,estupro,1,201801
4,1,5,3304557,2018,01,1,extorsao,3,201801
...,...,...,...,...,...,...,...,...,...
124003,168,33,3304409,2018,12,3,roubo_transeunte,1,201812
124004,168,33,3304409,2018,12,3,tentat_hom,2,201812
124005,168,31,3304409,2018,12,3,TESTE_INCREMENTAL,999999,201812
124006,168,31,3304409,2018,11,3,TESTE_INCREMENTAL02,999999,201811


# Criando tabela temporária

In [84]:
# Criando tabela temporária.

df_fato_ocorrencias.to_sql('temp_fato_ocorrencias', connDW, if_exists='replace')

df_fato_ocorrencias

Unnamed: 0,cod_dp,cod_bpm,cod_municipio,ano,mes,regiao,ocorrencia,quantidade,ano_mes
0,1,5,3304557,2018,01,1,ameaca,7777777,201801
1,1,5,3304557,2018,01,1,apreensao_drogas,3,201801
2,1,5,3304557,2018,01,1,estelionato,81,201801
3,1,5,3304557,2018,01,1,estupro,1,201801
4,1,5,3304557,2018,01,1,extorsao,3,201801
...,...,...,...,...,...,...,...,...,...
124003,168,33,3304409,2018,12,3,roubo_transeunte,1,201812
124004,168,33,3304409,2018,12,3,tentat_hom,2,201812
124005,168,31,3304409,2018,12,3,TESTE_INCREMENTAL,999999,201812
124006,168,31,3304409,2018,11,3,TESTE_INCREMENTAL02,999999,201811


In [91]:
# Definindo a query da carga incremental.

qry_carga_incremental_fato_ocorrencias = """
SELECT
    b.sk_dp,
    c.sk_bpm,
    d.sk_municipio,
    e.sk_tempo,
    a.ocorrencia,
    a.quantidade
FROM temp_fato_ocorrencias a
JOIN dim_dp b
    ON a.cod_dp = b.cod_dp
JOIN dim_bpm c
    ON a.cod_bpm = c.cod_bpm
JOIN dim_municipio d
    ON a.cod_municipio = d.cod_municipio
JOIN dim_tempo e
    ON a.ano_mes = e.sk_tempo
LEFT JOIN fato_boletim_ocorrencias f
    ON b.sk_dp = f.sk_dp
    AND c.sk_bpm = f.sk_bpm
    AND d.sk_municipio = f.sk_municipio
    AND e.sk_tempo = f.sk_tempo
    AND a.ocorrencia = f.ocorrencia
WHERE f.sk_dp IS NULL
AND f.sk_bpm IS NULL
AND f.sk_municipio IS NULL
AND f.sk_tempo IS NULL
"""

# Criando um dataframe incremental.
carga_incremental_fato_ocorrencias = pd.read_sql(qry_carga_incremental_fato_ocorrencias, connDW)

carga_incremental_fato_ocorrencias

Unnamed: 0,sk_dp,sk_bpm,sk_municipio,sk_tempo,ocorrencia,quantidade
0,137,29,65,201811,TESTE_INCREMENTAL03,999999


# Carregando dados na tabela fato

In [86]:
# Inserindo registros na tabela fato_boletim_ocorrencias.
print('Iniciando a carga de dados na tabela fato_boletim_ocorrencias')
print('--------------------------------------------')


connDW.executemany('''
    INSERT INTO fato_boletim_ocorrencias(sk_dp, sk_bpm, sk_municipio, sk_tempo, ocorrencia, quantidade) 
    VALUES(?,?,?,?,?,?)''', carga_incremental_fato_ocorrencias.values.tolist())

connDW.commit()

# Exibindo mensagem de conclusão
print('Carga incremental finalizada!', len(carga_incremental_fato_ocorrencias), 'registros inseridos na fato_boletim_ocorrencias.')
print('--------------------------------------------')

Iniciando a carga de dados na tabela fato_boletim_ocorrencias
--------------------------------------------
Carga incremental finalizada! 0 registros inseridos na fato_boletim_ocorrencias.
--------------------------------------------


# Desenvolvendo a query para a atualização retroativa

In [89]:
# Nesta etapa vamos criar um processo para verificar se houve alteração de alguma métrica no histórico da fato.
# Quando o registro for igual, porém com alteração na coluna de quantidade, ocorrerá a alteração na tabela fato.

qry_att_fato_ocorrencias = """
SELECT
    a.quantidade,
    b.sk_dp,
    c.sk_bpm,
    d.sk_municipio,
    e.sk_tempo,
    a.ocorrencia
FROM temp_fato_ocorrencias a
JOIN dim_dp b
    ON a.cod_dp = b.cod_dp
JOIN dim_bpm c
    ON a.cod_bpm = c.cod_bpm
JOIN dim_municipio d
    ON a.cod_municipio = d.cod_municipio
JOIN dim_tempo e
    ON a.ano_mes = e.sk_tempo
LEFT JOIN fato_boletim_ocorrencias f
    ON b.sk_dp = f.sk_dp
    AND c.sk_bpm = f.sk_bpm
    AND d.sk_municipio = f.sk_municipio
    AND e.sk_tempo = f.sk_tempo
    AND a.ocorrencia = f.ocorrencia
WHERE a.quantidade <> f.quantidade
"""

# Criando o dataframe de atualização retroativa.
att_retro_fato_ocorrencias = pd.read_sql(qry_att_fato_ocorrencias, connDW)

att_retro_fato_ocorrencias

Unnamed: 0,quantidade,sk_dp,sk_bpm,sk_municipio,sk_tempo,ocorrencia
0,7777777,1,4,68,201801,ameaca


# Executando a atualização retroativa

In [90]:
# Atualizando os dados retroativos na fato_boletim_ocorrencias

print("Iniciando atualização retroativa na fato_boletim_ocorrencias!")
print('--------------------------------------------')
      
# Definindo a query de atualização.
qry_att_retro_fato = """

    UPDATE fato_ocorrencias
    SET quantidade = ?
    WHERE sk_dp = ?
    AND sk_bpm = ?
    AND sk_municipio = ?
    AND sk_tempo = ?
    AND ocorrencia = ?

"""
      
# Executando a atualização na fato Ocorrencias.
connDW.executemany( qry_att_retro_fato, att_retro_fato_ocorrencias.values.tolist() )

connDW.commit()
      
      
print("Fim da atualização retroativa na fato_boletim_ocorrencias!", len(atualização_fato_ocorrencias), "Registros atualizados!")
print('--------------------------------------------')

Iniciando atualização retroativa na fato_boletim_ocorrencias!
--------------------------------------------
Fim da atualização retroativa na fato_boletim_ocorrencias! 1 Registros atualizados!
--------------------------------------------


# Drop na tabela temporária

In [92]:
# Dropando a tabela temporária.
connDW.execute("DROP TABLE temp_fato_ocorrencias")

<sqlite3.Cursor at 0x1f3fbe41b20>

# Fechando as conexões

In [93]:
# Fechando as conexões com ODS e DW.

connODS.close()
connDW.close()