In [None]:
import pandas as pd
from datetime import datetime
from pathlib import Path 
import psycopg2 as sql
from sqlalchemy import create_engine

# Coleta de dados do site do IBGE

In [None]:
#Coleta dos dados
url = 'https://www.ibge.gov.br/explica/codigos-dos-municipios.php#RJ'

dadosIBGE = pd.DataFrame(pd.read_html(url, match= "Municípios do Rio de Janeiro")[0])
dadosIBGE

In [None]:
# Renomeando colunas
dadosIBGE = dadosIBGE.rename(columns= {'Municípios do Rio de Janeiro': 'nmMunic', 'Códigos':'codMunic'})

#Alterando nome do index
dadosIBGE.index.name = 'idMunic'

#Aletrando para iniciar do 1
dadosIBGE.index = dadosIBGE.index +1

#Adicionando uma coluna de data e hora de carga

dtCarga = datetime.today().strftime('%d/%m/%y %H:%M')
dadosIBGE['dtCarga'] = dtCarga
dadosIBGE

In [None]:
dadosIBGE.info()

# Conectando no banco de dados ODS

In [None]:
host = "localhost"  # ou o endereço do servidor PostgreSQL
port = "5432"       # a porta padrão é 5432
database = "pascoaODS"  # substitua pelo nome do banco de dados
user = "postgres"  # substitua pelo nome do usuário do PostgreSQL
password = "bloqeiodofgh"  # substitua pela senha do usuário do PostgreSQL

# Conectando ao PostgreSQL
try:
    connection = sql.connect(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password
    )
    print("Conexão ao PostgreSQL bem-sucedida!")
    # Faça suas operações com o banco de dados aqui...
    #connection.close()  # Não esqueça de fechar a conexão quando terminar!
except sql.Error as e:
    print("Erro ao conectar ao PostgreSQL:", e)


# MANIPULANDO OS BANCOS DE DADOS CRIADOS

In [None]:
# Carregar os dados do DataFrame dadosIBGE na tabela tbLogMunic
# Certifique-se de que o DataFrame corresponde à estrutura da tabela
engine = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}")
dadosIBGE.to_sql('tbLogMunic', engine, if_exists="append", index=False)

# Importando dados dos  dados de cadastro das delegacias de polícia

In [None]:
tbLogDP = pd.DataFrame(pd.read_csv('DP.CSV'))

#Criando coluna de data de carga
tbLogDP['dtCarga'] = datetime.today().strftime('%d/%m/%y %H:%M')
tbLogDP

# Importando dados dados de cadastro dos responsáveis pelas delegacias de polícia

In [None]:
tbLogRespDP = pd.DataFrame(pd.read_csv('ResponsavelDP.CSV'))

#Criando coluna de data de carga
tbLogRespDP['dtCarga'] = datetime.today().strftime('%d/%m/%y %H:%M')
tbLogRespDP

# Conectando no ODS e informando ao python que usarei linguagem SQL

In [None]:
#Definição de manipulação de dados em SQL
SQL_ODS = connection.cursor()

# Inserindo registros na tabela tbLogDP

In [None]:
#Inserindo registros na tabela tbLogDP
print("Iniciando a inserção de dados na tabela tbLogDP.")
print("-------------------------------------------------")

SQL_ODS.executemany('''INSERT INTO tbLogDP (cod_DP,nm_DP,endereco,dtCarga) VALUES (%s, %s, %s, %s)''',tbLogDP.values.tolist())

#confirmar a transação
connection.commit()

#Exibir uma mensagem de conclusão de carga
print("Carga finalizada!",len(tbLogDP), "registros inseridos na tbLogDP!")

# Inserindo registros na tabela tbLogRespDP

In [None]:
#Inserindo registros na tabela tbLogRespDP
print("Iniciando a inserção de dados na tabela tbLogRespDP.")
print("-------------------------------------------------")

SQL_ODS.executemany('''INSERT INTO tbLogRespDP (cod_DP, Responsavel, dtCarga) VALUES (%s, %s, %s)''',tbLogRespDP.values.tolist())

#confirmar a transação
connection.commit()

#Exibir uma mensagem de conclusão de carga
print("Carga finalizada!",len(tbLogRespDP), "registros inseridos na tbLogRespDP!")

# Criando o dataframe com o select de criação da dimensão dp (ddp), no dw

In [None]:
qry_dDP = '''
SELECT cod_dp, nm_dp, endereco, responsavel
from
( SELECT
            a.cod_DP,
            a.nm_DP,
            a.endereco,
            b.responsavel,
            max(a.dtCarga)
        FROM tbLogDP a
        JOIN tbLogRespDP b
        on a.cod_DP = b.cod_DP
        GROUP BY
            a.cod_DP,
            a.nm_DP,
            a.endereco,
            b.Responsavel
	
	)a;
'''
dfdDP = pd.read_sql(qry_dDP, connection)

# Coletando dados dos batalhões de polícia militar (bpm)

In [None]:
#Criar Data Frame com os dados dos BPMs
tbLogBPM = pd.DataFrame(pd.read_csv("BPM.csv"))

#Criando coluna com a data da carga
tbLogBPM['dtCarga'] = dtCarga

# Construindo a tabela de log de registros do bpm (tbLogBPM), no ODS

In [None]:
#Definir a query de criação da tabela
qry_tbLogBPM = '''
    
    CREATE TABLE IF NOT EXISTS tbLogBPM (
    
    codBPM INTEGER,
    nmBPM VARCHAR (7),
    enderecoBPM VARCHAR (200),
    dtCarga timestamp without time zone

    )
'''

#executar a criação da tbLogBPM
SQL_ODS.execute(qry_tbLogBPM)

#definição da qry de criação de index
qry_idx_tbLogBPM_codBPM = "CREATE INDEX IF NOT EXISTS idx_tbLogBPM_codBPM ON tbLogBPM (codBPM)"

#executar a criação do index
SQL_ODS.execute(qry_idx_tbLogBPM_codBPM)

# Inserindo dados na tbLogBPM

In [None]:
#Inserindo registros na tabela tbLogBPM
print("Iniciando a inserção de dados na tabela tbLogBPM.")
print("-------------------------------------------------")

SQL_ODS.executemany('''INSERT INTO tbLogBPM (codBPM,nmBPM,enderecoBPM,dtCarga) VALUES (%s,%s,%s,%s)''',tbLogBPM.values.tolist())

#confirmar a transação
connection.commit()

#Exibir uma mensagem de conclusão de carga
print("Carga finalizada!",len(tbLogBPM), "registros inseridos na tbLogBPM!")

# Coletar dados da área dos batalhões da polícia militar

In [None]:
#Criar Data Frame com os dados dos BPMs
tbLogAreaBPM = pd.DataFrame(pd.read_csv("areaBPMv1.csv"))

#Criando coluna com a data da carga
tbLogAreaBPM['dtCarga'] = dtCarga

# Construindo a tabela de log de registros da área dos bpms (tblogareabpm), no ods

In [None]:
#Definir a query de criação da tabela
qry_tbLogAreaBPM = '''
    
    CREATE TABLE IF NOT EXISTS tbLogAreaBPM (
    
    codBPM INTEGER,
    areaBPM NUMERIC (7,2),
    dtCarga timestamp without time zone

    )
'''

#executar a criação da tbLogAreaBPM
SQL_ODS.execute(qry_tbLogAreaBPM)

#definição da qry de criação de index
qry_idx_tbLogAreaBPM_codBPM = "CREATE INDEX IF NOT EXISTS idx_tbLogAreaBPM_codBPM ON tbLogAreaBPM (codBPM)"

#executar a criação do index
SQL_ODS.execute(qry_idx_tbLogAreaBPM_codBPM)
connection.commit()

# Inserindo dados da área dos bpms na tabela tblogareabpm

In [None]:
#Inserindo registros na tabela tbLogAreaBPM
print("Iniciando a inserção de dados na tabela tbLogAreaBPM.")
print("-------------------------------------------------")

SQL_ODS.executemany('''INSERT INTO tbLogAreaBPM (codBPM,areaBPM,dtCarga) VALUES (%s,%s,%s)''',tbLogAreaBPM.values.tolist())

#confirmar a transação
connection.commit()

#Exibir uma mensagem de conclusão de carga
print("Carga finalizada!",len(tbLogAreaBPM), "registros inseridos na tbLogAreaBPM")

# Criando o dataframe com o select de criação da dimensão dp (dbpm), no dw

In [None]:
#Definir a query de criação da dimensão dBPM
qry_dBPM = '''
    SELECT
        codBPM,
        nmBPM,
        enderecoBPM,
        areaBPM
    FROM 
    (
        SELECT 
            a.codBPM,
            a.nmBPM,
            a.enderecoBPM,
            b.areaBPM,
            max(a.dtCarga)
        FROM tbLogBPM a
        JOIN tbLogAreaBPM b
        ON a.codBPM = b.codBPM
        WHERE a.dtCarga = (
                            SELECT MAX(x.dtCarga)
                            FROM tbLogBPM x
                       )
        GROUP BY
            a.codBPM,
            a.nmBPM,
            a.enderecoBPM,
            b.areaBPM
    ) a
'''

dBPM = pd.read_sql(qry_dBPM,connection)

# Coleta de dados dos registros de ocorrencias

In [None]:
#Dataframe com os dados dos registros das ocorrencias
tbLogOcorrencias = pd.DataFrame(pd.read_csv('OcorrenciaV2.csv'))

#Criando coluna com dtCarga
dtCarga = datetime.today().strftime('%d/%m/%y %H:%M')
tbLogOcorrencias['dtCarga'] = dtCarga

# Criando tabela tbOcorrencias em SQL

In [None]:
#Criando query
qry_create_tbOcorrencias = '''
CREATE TABLE IF NOT EXISTS tbLogOcorrencias (
    
    codDP INTEGER,
    codBPM INTEGER,
    ano INTEGER,
    mes INTEGER,
    mes_ano CHAR(7),
    regiao INTEGER,
    codMunicIBGE INTEGER,
    ocorrencias VARCHAR(100),
    qtde INTEGER,
    dtCarga timestamp without time zone
    )
'''
#executar a criação da Ocorrencias
SQL_ODS.execute(qry_create_tbOcorrencias)

#definição da qry de criação de index
qry_idx_tbOcorrencias = "CREATE INDEX IF NOT EXISTS idx_tbLogOcorrencias ON tbLogOcorrencias (codDP,codBPM,codMunicIBGE)"

#executar a criação do index
SQL_ODS.execute(qry_idx_tbOcorrencias)
connection.commit()

In [None]:
#inserindo dados na tbLogOcorrencias
print("Iniciando a carga de dados na tbLogOcorrencias")
print("-----------------------------------------------")
SQL_ODS.executemany('''

    INSERT INTO tbLogOcorrencias (codDP,codBPM,ano,mes,mes_ano,regiao,codMunicIBGE,ocorrencias,qtde,dtCarga)
    VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)    
    ''', 
    tbLogOcorrencias.values.tolist()

)

#confirmar a transação
connection.commit()

print("Fim da carga de dados da tbLogOcorrencias!",len(tbLogOcorrencias),"Registros inseridos!")

# Criando DataFrame com o resultado da query de ocorrencias

In [None]:
qry_ocorrencias = '''
SELECT 
	a.coddp,
	a.codbpm,
	a.mes,
	a.ano,
	a.codmunicibge,
	a.ocorrencias,
	a.qtde
FROM tblogocorrencias a 
WHERE a.dtcarga = (SELECT max(dtcarga)
				  FROM tblogocorrencias)
'''
dfocorrencias = pd.read_sql(qry_ocorrencias, connection)

# Fechando conexão com o ODS

In [None]:
connection.commit()
connection.close()

# Conetando no DW e informando ao python que usarei linguagem SQL

In [None]:
dadosIBGE = dadosIBGE[['nmMunic','codMunic']]

host = "localhost"  # ou o endereço do servidor PostgreSQL
port = "5432"       # a porta padrão é 5432
database = "pascoaDW"  # substitua pelo nome do banco de dados
user = "postgres"  # substitua pelo nome do usuário do PostgreSQL
password = "bloqeiodofgh"  # substitua pela senha do usuário do PostgreSQL

# Conectando ao PostgreSQL
try:
    connection = sql.connect(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password
    )
    print("Conexão ao PostgreSQL bem-sucedida!")
    # Faça suas operações com o banco de dados aqui...
    #connection.close()  # Não esqueça de fechar a conexão quando terminar!
except sql.Error as e:
    print("Erro ao conectar ao PostgreSQL:", e)


In [None]:
#Definição de manipulação de dados em SQL
SQL_DW = connection.cursor()

In [None]:
# Carregar os dados do DataFrame dadosIBGE na tabela dMunicipio
# Certifique-se de que o DataFrame corresponde à estrutura da tabela
engine = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}")

#Deletando a tabela caso ela exista
SQL_DW.execute('DROP TABLE IF EXISTS public."dMunicipio"')
dadosIBGE.to_sql('dMunicipio', engine, if_exists="append", index=False)

In [None]:
dfdDP

# Deletando dados antigos e reiniciando ao autoincremental da chave

In [None]:
#Deletar os dados atuais
SQL_DW.execute("DELETE FROM dDP")

#Reiniciando o autoincramental da chave
SQL_DW.execute("ALTER SEQUENCE dDP_idDP_seq RESTART WITH 1;")

#Confirmar a transação
connection.commit()

# Inserindo dados na dimensão dp (ddp), no dw

In [None]:
#Inserindo registros na tabela tbLogRespDP
print("Iniciando a inserção de dados na tabela dDP.")
print("-------------------------------------------------")

SQL_DW.executemany('''INSERT INTO dDP (cod_DP, nm_dp, endereco, responsavel) VALUES (%s, %s, %s, %s)''',dfdDP.values.tolist())

#confirmar a transação
connection.commit()

#Exibir uma mensagem de conclusão de carga
print("Carga finalizada!",len(dfdDP), "registros inseridos na dDP!")

# Validando carga dos dados da dimensão dp (ddp)

In [None]:
pd.read_sql("SELECT * FROM dDP",connection)

# Construindo tabela de dimensão período (dperiodo) no DW

In [None]:
#Definir a query de criação da tabela
qry_dperiodo = '''
    
    CREATE TABLE IF NOT EXISTS dperiodo (
    
    idperiodo SERIAL PRIMARY KEY,
    data timestamp without time zone,
    mes INTEGER,
    ano INTEGER,
    trimestre INTEGER,
    semestre INTEGER
    )
'''

#executar a criação da dperiodo
SQL_DW.execute(qry_dperiodo)

#executar a criação do index idperiodo
SQL_DW.execute("CREATE INDEX IF NOT EXISTS idx_dperiodo_idperiodo ON dperiodo (idperiodo)")

#executar a criação do index codBPM
SQL_DW.execute("CREATE INDEX IF NOT EXISTS idx_dBPM_codBPM ON dperiodo (mes, ano, trimestre, semestre)")
connection.commit()

# Construindo a tabela de dimensão bpm (dbpm), no dw

In [None]:
#Definir a query de criação da tabela
qry_dBPM = '''
    
    CREATE TABLE IF NOT EXISTS dBPM (
    
    idBPM SERIAL PRIMARY KEY,
    codBPM INTEGER,
    nmBPM VARCHAR (7),
    enderecoBPM VARCHAR (200),
    areaBPM NUMERIC (7,2)

    )
'''

#executar a criação da tbLogBPM
SQL_DW.execute(qry_dBPM)

#executar a criação do index idBPM
SQL_DW.execute("CREATE INDEX IF NOT EXISTS idx_dBPM_idBPM ON dBPM (idBPM)")

#executar a criação do index codBPM
SQL_DW.execute("CREATE INDEX IF NOT EXISTS idx_dBPM_codBPM ON dBPM (codBPM)")
connection.commit()

# Deletando dados antigos e reiniciando ao autoincremental da chave

In [None]:
#Deletar os dados atuais
SQL_DW.execute("DELETE FROM dBPM")

#Reiniciando o autoincramental da chave
SQL_DW.execute("ALTER SEQUENCE dBPM_idBPM_seq RESTART WITH 1;")

#Confirmar a transação
connection.commit()

# Inserindo dados na dimensão dp (ddp), no dw

In [None]:
#Inserindo registros na tabela dBPM
print("Iniciando a inserção de dados na tabela dBPM.")
print("-------------------------------------------------")

SQL_DW.executemany('''INSERT INTO dBPM (codBPM, nmBPM, enderecoBPM, areaBPM) VALUES (%s, %s, %s, %s)''',dBPM.values.tolist())

#confirmar a transação
connection.commit()

#Exibir uma mensagem de conclusão de carga
print("Carga finalizada!",len(dBPM), "registros inseridos na dBPM!")

# Deletando dados antigos e reiniciando ao autoincremental da chave

In [None]:
#Deletar os dados atuais
SQL_DW.execute("DELETE FROM dperiodo")

#Reiniciando o autoincramental da chave
SQL_DW.execute("ALTER SEQUENCE dperiodo_idperiodo_seq RESTART WITH 1;")

#Confirmar a transação
connection.commit()

# Inserindo dados na dimensão dp (ddp), no dw

In [None]:
#Inserindo registros na tabela data
print("Iniciando a inserção de dados na tabela dperiodo.")
print("-------------------------------------------------")

#Criando query 
query_dperiodo = '''
WITH RECURSIVE data(d) AS (
    VALUES ('2018-01-01'::date)
    UNION ALL
    SELECT (d + INTERVAL '1 day')::date
    FROM data
    WHERE d < ('2021-12-01'::date)
)
SELECT 
    TO_CHAR(d, 'dd-mm-YYYY') AS Data,
    CAST(TO_CHAR(d, 'MM') AS INTEGER) AS Mes,
	CAST(TO_CHAR(d, 'yyyy') AS INTEGER) AS Ano,
	CASE WHEN CAST(TO_CHAR(d, 'MM') AS INTEGER) in (1, 2, 3) THEN 1
	WHEN CAST(TO_CHAR(d, 'MM') AS INTEGER) in (4, 5, 6) THEN 2
	WHEN CAST(TO_CHAR(d, 'MM') AS INTEGER) in (7, 8, 9) THEN 3
	else 3
	end as Trimestre,
	CASE WHEN CAST(TO_CHAR(d, 'MM')AS INTEGER) IN (1, 2, 3, 4, 5, 6) THEN 1
	else 2
	end as Semestre
	
FROM data;
'''

#Executando a query e carregando na tabela dperiodo
dperiodo = pd.read_sql(query_dperiodo, connection)

dperiodo['data'] = pd.to_datetime(dperiodo['data'], format='%d-%m-%Y').dt.strftime('%Y-%m-%d')
SQL_DW.executemany('''INSERT INTO dperiodo (data, mes, ano, trimestre, semestre) VALUES (%s, %s, %s, %s, %s)''',dperiodo.values.tolist())

#confirmar a transação
connection.commit()

#Exibir uma mensagem de conclusão de carga
print("Carga finalizada!",len(dperiodo), "registros inseridos na dBPM!")

# Criando a tabela fato ocorrencias no DW

In [None]:
#definindo a qry da criação da fOcorrencias

qry_tbfocorrencias = '''
create table if not exists fOcorrencias(
	idDP INTEGER REFERENCES DDP(idDP) ON UPDATE NO ACTION ON DELETE NO ACTION,
    idbpm INTEGER REFERENCES dbpm(idbpm) ON UPDATE NO ACTION ON DELETE NO ACTION,
    idperiodo INTEGER REFERENCES dperiodo(idperiodo) ON UPDATE NO ACTION ON DELETE NO ACTION,
    codMunic INTEGER REFERENCES public."dMunicipio"(codMunic) ON UPDATE NO ACTION ON DELETE NO ACTION,
    regiao INTEGER,
    ocorrencias VARCHAR(100),
    qtde INTEGER
)
'''
SQL_DW.execute(qry_tbfocorrencias)

#Criando index 
SQL_DW.execute("CREATE INDEX IF NOT EXISTS idx_fOcorrencias ON fOcorrencias(idDP,idBPM,idPeriodo,idMunic)")
               
#confirmar a transação
conexaoDW.commit()

# Fechando a conexão com o DW

In [None]:
connection.commit()
connection.close()