In [203]:
import pandas as pd
from datetime import datetime
from pathlib import Path
import sqlite3 as sql

# CRIANDO O BANCO DE DADOS 

In [204]:
endereco = Path('C:\\Users\\elias\\Downloads\\Engenharia de Dados com Python\\Bancos de Dados\\')

BDODS = endereco / "pascoaODS.db"
BDDW = endereco / "pascoaDW.db"

if endereco.exists():
    print("Endereço existe!")
    if (BDODS.exists() and BDDW.exists()):
        print("Bancos de dados já existem!")
    else:
        BDODS.touch()
        BDDW.touch()
        print("Bancos de dados acabaram de ser criados!")
else:
    print("Endereço não existe. Favor, verificar!")


Endereço existe!
Bancos de dados já existem!


# CARGA DOS DADOS DAS OCORRÊNCIAS

In [205]:
# coleta horário da carga
dtCarga = datetime.today().strftime('%d/%m/%Y %H:%M')
dtCarga

'13/09/2022 23:07'

# CRIAÇÃO DO DATAFRAME tbLogOcorrencias

In [206]:
# pasta com csv
localCSV = 'C:\\Users\\elias\\Downloads\\Engenharia de Dados com Python\\Arquivos CSV\\'

# carga dos dados de cadastro dos batalhões de polícia militar
tbLogOcorrencias = pd.DataFrame(pd.read_csv(localCSV + 'OcorrenciaV2.csv'))

# adicionar data e hora
tbLogOcorrencias['dtCarga'] = dtCarga

# selecionar coluna COD_DP como índice
# tbLogDP = tbLogDP.set_index('COD_DP')

In [207]:
tbLogOcorrencias

Unnamed: 0,COD_DP,COD_BPM,ano,mes,mes_ano,Regiao,COD_Munic_IBGE,Ocorrencia,Soma de Qtde,dtCarga
0,1,5,2018,1,2018m01,1,3304557,ameaca,7,13/09/2022 23:07
1,1,5,2018,1,2018m01,1,3304557,apreensao_drogas,3,13/09/2022 23:07
2,1,5,2018,1,2018m01,1,3304557,estelionato,81,13/09/2022 23:07
3,1,5,2018,1,2018m01,1,3304557,estupro,1,13/09/2022 23:07
4,1,5,2018,1,2018m01,1,3304557,extorsao,3,13/09/2022 23:07
...,...,...,...,...,...,...,...,...,...,...
124000,168,33,2018,12,2018m12,3,3304409,posse_drogas,8,13/09/2022 23:07
124001,168,33,2018,12,2018m12,3,3304409,recuperacao_veiculos,1,13/09/2022 23:07
124002,168,33,2018,12,2018m12,3,3304409,roubo_rua,1,13/09/2022 23:07
124003,168,33,2018,12,2018m12,3,3304409,roubo_transeunte,1,13/09/2022 23:07


# CONEXÃO AO BANCO DE DADOS 

In [208]:
# estabelece conexão ao banco de dados BDODS
conexaoBDODS = sql.connect(BDODS)

# definição da manipulação de dados em SQL
SQL_ODS = conexaoBDODS.cursor()

# conexão com o banco de dados
conexaoBDDW = sql.connect(BDDW)

# estabelece configuração do cursor
SQL_DW = conexaoBDDW.cursor()

# CRIANDO A TABELA E ÍNDICE EM tbLogBPM

In [209]:
# criar a tabela tbLogOcorrencias no BDODS
query_tbLogOcorrencias = '''

    CREATE TABLE IF NOT EXISTS tbLogOcorrencias
    (    
    codDP INTEGER,
    codBPM INTEGER,
    ano INTEGER,
    mes INTEGER,
    mes_ano CHAR (7),
    regiao INTEGER,
    codMunicIBGE INTEGER,
    ocorrencia VARCHAR (100),
    qtde INTEGER,
    dtCarga DATETIME
    )
'''

# executa a query de criação da tabela
SQL_ODS.execute(query_tbLogOcorrencias)

query_idx_tbLogOcorrencias = "CREATE INDEX IF NOT EXISTS idx_tbLogOcorrencias ON tbLogOcorrencias (codDP,codBPM,codMunicIBGE)"

# executa a query de criação do index
SQL_ODS.execute(query_idx_tbLogOcorrencias)

# não há necessidade de confirmar a transação

<sqlite3.Cursor at 0x2d42c539ab0>

# INSERINDO DADOS EM tbLogOcorrencias

In [210]:
print('Iniciando a inserção de dados na tabela tbLogOcorrencias')

#
SQL_ODS.executemany('''

INSERT INTO tbLogOcorrencias (codDP, codBPM, ano, mes, mes_ano, regiao, codMunicIBGE, ocorrencia, qtde, dtCarga) 
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', tbLogOcorrencias.values.tolist())

# confirmar a transação
conexaoBDODS.commit()

print('Carga de', len(tbLogOcorrencias), 'registros em tbLogOcorrencias finalizada ')

Iniciando a inserção de dados na tabela tbLogOcorrencias
Carga de 124005 registros em tbLogOcorrencias finalizada 


In [211]:
'''teste '''+", ".join(tbLogOcorrencias.columns.tolist())

'teste COD_DP, COD_BPM, ano, mes, mes_ano, Regiao, COD_Munic_IBGE, Ocorrencia, Soma de Qtde, dtCarga'

# CRIANDO O DATAFRAME COM O RESULTADO DA QUERY DE CARGA DE DADOS DAS OCORRÊNCIAS

In [212]:
# definição da query de seleção da carga de dados
query_fOcorrencias = '''
    SELECT
        a.codDP,
        a.codBPM,
        a.ano,
        a.mes,
        a.regiao,
        a.codMunicIBGE,
        a.ocorrencia,
        a.qtde
    FROM tbLogOcorrencias a
    WHERE a.dtCarga = (SELECT MAX(dtCarga) FROM tbLogOcorrencias)
'''

# popular o dataframe
fOcorrencias = pd.read_sql(query_fOcorrencias,conexaoBDODS)

# confirmar transação
conexaoBDODS.commit()

fOcorrencias

Unnamed: 0,codDP,codBPM,ano,mes,regiao,codMunicIBGE,ocorrencia,qtde
0,1,5,2018,1,1,3304557,ameaca,7
1,1,5,2018,1,1,3304557,apreensao_drogas,3
2,1,5,2018,1,1,3304557,estelionato,81
3,1,5,2018,1,1,3304557,estupro,1
4,1,5,2018,1,1,3304557,extorsao,3
...,...,...,...,...,...,...,...,...
124000,168,33,2018,12,3,3304409,posse_drogas,8
124001,168,33,2018,12,3,3304409,recuperacao_veiculos,1
124002,168,33,2018,12,3,3304409,roubo_rua,1
124003,168,33,2018,12,3,3304409,roubo_transeunte,1


In [213]:
# definição da query de criação fOcorrencias em DW
query_create_fOcorrencias = '''
    CREATE TABLE IF NOT EXISTS fOcorrencias
    (
        idDP INTEGER REFERENCES dDP(idDP) ON UPDATE NO ACTION ON DELETE NO ACTION,
        idBPM INTEGER REFERENCES dBPM(idBPM) ON UPDATE NO ACTION ON DELETE NO ACTION,
        idPeriodo INTEGER REFERENCES dPeriodo(idPeriodo) ON UPDATE NO ACTION ON DELETE NO ACTION,
        regiao INTEGER,
        idMunic INTEGER REFERENCES dMunic(idMunic) ON UPDATE NO ACTION ON DELETE NO ACTION,
        ocorrencia VARCHAR(100),
        qtde INTEGER
    )
'''

# cria fOcorrencias em BW
SQL_DW.execute(query_create_fOcorrencias)

# define os índices de fOcorrencias
query_idx_fOcorrencias = "CREATE INDEX IF NOT EXISTS idx_fOcorrencias ON fOcorrencias (idDP, idBPM, idMunic)"
SQL_DW.execute(query_idx_fOcorrencias)

<sqlite3.Cursor at 0x2d435e869d0>

# INSERINDO DADOS NA FATO OCORRÊNCIAS (fOcorrencias) - CARGA INCREMENTAL

In [214]:
# cria tabela temporária
fOcorrencias.to_sql('tempOcorrencias',conexaoBDDW,if_exists='replace')

In [215]:
# definindo a query de carga incremental
query_carga_incremental_fOcorrencias = '''
    SELECT
        b.idDP,
        c.idBPM,
        d.idPeriodo,
        a.regiao,
        e.idMunic,
        a.ocorrencia,
        a.qtde
    FROM tempOcorrencias a
    JOIN dDP b
    ON a.codDP = b.codDP
    JOIN dBPM c
    ON a.codBPM = c.codBPM
    JOIN dPeriodo d 
    ON (a.ano = d.ano) AND (a.mes = d.mes)
    JOIN dMunicipio e
    ON a.codMunicIBGE = e.codMunic
    LEFT JOIN fOcorrencias g
    ON b.idDP = g.idDP
    AND c.idBPM = g.idBPM
    AND d.idPeriodo = g.idPeriodo
    AND e.idMunic = g.idMunic
    WHERE g.idDP is null
    AND g.idBPM is null
    AND g.idPeriodo is null
    AND g.idMunic is null
'''
# criando um DF incremental
carga_incremental_fOcorrencias = pd.read_sql(query_carga_incremental_fOcorrencias,conexaoBDDW)
carga_incremental_fOcorrencias

Unnamed: 0,idDP,idBPM,idPeriodo,regiao,idMunic,ocorrencia,qtde


In [216]:
print('Iniciando a carga incremental de dados na tabela fOcorrencias')

# definir query automaticamente de acordo com as colunas de carga_incremental_fOcorrencias
cols = "(" + ", ".join(carga_incremental_fOcorrencias.columns.tolist())+ ")"
vals = "(" + ", ".join(['?' for x in range(len(carga_incremental_fOcorrencias.columns.tolist()))]) + ")"
query_insert_carga_incremental_fOcorrencias = '''INSERT INTO fOcorrencias ''' + cols + ''' VALUES ''' + vals
# Executar a carga de dados
SQL_DW.executemany(query_insert_carga_incremental_fOcorrencias, carga_incremental_fOcorrencias.values.tolist())

# confirmar a transação
conexaoBDDW.commit()

print('Carga de', len(carga_incremental_fOcorrencias), 'registros em fOcorrencias finalizada ')

Iniciando a inserção de dados na tabela fOcorrencias
Carga de 0 registros em fOcorrencias finalizada 


# ATUALIZANDO DADOS NA FATO OCORRÊNCIAS (fOcorrencias) - ATUALIZAÇÃO RETROATIVA

In [217]:
# query que verifica a existência de atualizações retroativas
query_atualizacao_fOcorrencias = '''
    SELECT
        a.qtde,
        b.idDP,
        c.idBPM,
        d.idPeriodo,
        a.regiao,
        e.idMunic,
        a.ocorrencia        
    FROM tempOcorrencias a
    JOIN dDP b
    ON a.codDP = b.codDP
    JOIN dBPM c
    ON a.codBPM = c.codBPM
    JOIN dPeriodo d 
    ON (a.ano = d.ano) AND (a.mes = d.mes)
    JOIN dMunicipio e
    ON a.codMunicIBGE = e.codMunic
    LEFT JOIN fOcorrencias g
    ON b.idDP = g.idDP
    AND c.idBPM = g.idBPM
    AND d.idPeriodo = g.idPeriodo
    AND e.idMunic = g.idMunic
    AND a.regiao = g.regiao
    AND a.ocorrencia = g.ocorrencia
    WHERE a.qtde <> g.qtde
'''
# cria DF com linhas de atualizações
atualiza_fOcorrencias = pd.read_sql(query_atualizacao_fOcorrencias,conexaoBDDW)
atualiza_fOcorrencias

Unnamed: 0,qtde,idDP,idBPM,idPeriodo,regiao,idMunic,ocorrencia


In [218]:
print('Iniciando a atualização de dados na tabela fOcorrencias')

# definir query automaticamente de acordo com as colunas de carga_incremental_fOcorrencias
atualiza_fOcorrencias_cols_list = atualiza_fOcorrencias.columns.tolist()

cols = []
for col in atualiza_fOcorrencias_cols_list:
    if(col != atualiza_fOcorrencias_cols_list[0]):
        if(col != atualiza_fOcorrencias_cols_list[-1]):
            cols.append(col + ' = ? \nAND ')            
        else:
            cols.append(col + ' = ?')
cols = "".join(cols)

query_update_fOcorrencias = '''
UPDATE fOcorrencias
SET qtde = ?
WHERE ''' + cols
# print(query_update_fOcorrencias)

# Executar a atualização de dados
SQL_DW.executemany(query_update_fOcorrencias,atualiza_fOcorrencias.values.tolist())

# confirmar a transação
conexaoBDDW.commit()

print('Carga de', len(atualiza_fOcorrencias), 'registros em fOcorrencias finalizada ')

Iniciando a atualização de dados na tabela fOcorrencias
Carga de 0 registros em fOcorrencias finalizada 


# DELETAR TABELA TEMPORÁRIA

In [219]:
# deletar table temporária tempOcorrencias
SQL_DW.execute('DROP TABLE tempOcorrencias')

# confirmar a transação
conexaoBDDW.commit()

# FECHANDO A CONEXÃO COM ODS E DW

In [220]:
conexaoBDDW.close()
conexaoBDODS.close()