In [1]:
import requests
from bs4 import BeautifulSoup
from lxml import etree 
from decouple import config
import datetime
from zipfile import ZipFile
import pandas as pd
import sqlalchemy
import psycopg2

In [2]:
url_bolsa_familia=config('URL_BOLSA_FAMILIA')
url_bpc=config('URL_BPC')
url_origem_dados=config('URL_ORIGEM_DADOS')

In [3]:
db_host=config('DB_HOST')
db_port=config('DB_PORT')
db_user=config('DB_USER')
db_password=config('DB_PASSWORD')
db_name=config('DB_NAME')

In [4]:
#A CONEXÃO DIRETA VIA PSYCOPG2 SERÁ UTILIZADA PARA CARREGAR OS CSVS BRUTOS VIA COMANDO "COPY" DO POSTGRES
conn = psycopg2.connect(f'host={db_host} dbname={db_name} port={db_port} user={db_user} password={db_password}')

#A CONEXÃO VIA SQLALCHEMY/PANDAS SERÁ UTILIZADA PARA AS QUERIES
engine = sqlalchemy.create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')

In [5]:
def get_data_atualiza_bases(url_origem_dados):
    
    headers = {
    'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0'
    }
    
    origem_dados = requests.get(url_origem_dados, headers=headers)
    
    soup = BeautifulSoup(origem_dados.content, "html.parser") 
    dom = etree.HTML(str(soup))

    data_atualiza_novo_bolsa_familia_str = dom.xpath('//td[contains(text(), "Novo Bolsa Família")]/following-sibling::*')[2].text.strip()

    data_atualiza_bpc_str = dom.xpath('//td[contains(text(), "Instituto Nacional do Seguro Social - BPC")]/following-sibling::*')[2].text.strip()

    data_atualiza_seguro_defeso_str = dom.xpath('//td[contains(text(), "Seguro Defeso")]/following-sibling::*')[2].text.strip()

    data_atualiza_novo_bolsa_familia = datetime.datetime.strptime(data_atualiza_novo_bolsa_familia_str, '%m/%Y').date()

    data_atualiza_bpc = datetime.datetime.strptime(data_atualiza_bpc_str, '%m/%Y').date()

    data_atualiza_seguro_defeso = datetime.datetime.strptime(data_atualiza_seguro_defeso_str, '%m/%Y').date()

    return data_atualiza_novo_bolsa_familia, data_atualiza_bpc, data_atualiza_seguro_defeso

In [6]:

def verifica_tabela_novo_bolsa_familia_atualizada(engine, data_atualiza):

    df = pd.read_sql("SELECT TO_DATE(mes_competencia, 'YYYYMM') AS mes_competencia FROM benef_federais.novo_bolsa_familia LIMIT 1", engine)

    df['mes_competencia'] = pd.to_datetime(df['mes_competencia']).dt.date

    if(df.empty):
        return False
    
    if(df['mes_competencia'].iloc[0] < data_atualiza):
        return False
    
    return True

def verifica_tabela_bpc_atualizada(engine, data_atualiza):

    df = pd.read_sql("SELECT TO_DATE(mes_competencia, 'YYYYMM') AS mes_competencia FROM benef_federais.bpc LIMIT 1", engine)

    df['mes_competencia'] = pd.to_datetime(df['mes_competencia']).dt.date

    if(df.empty):
        return False
    
    if(df['mes_competencia'].iloc[0] < data_atualiza):
        return False
    
    return True

In [7]:
def download_csv_novo_bolsa_familia(url_bolsa_familia, data_atualiza):
    url_bolsa_familia_download = url_bolsa_familia + f'/{data_atualiza.strftime("%Y%m")}'

    resp = requests.get(url_bolsa_familia_download)

    arq_salvar=f'download/{data_atualiza.strftime("%Y%m")}_NovoBolsaFamilia.zip'

    with open(arq_salvar, "wb") as f:
        f.write(resp.content)

    arq_csv=f'{data_atualiza.strftime("%Y%m")}_NovoBolsaFamilia.csv'

    with ZipFile(arq_salvar) as zf:
        zf.extract(arq_csv, path='download')

def download_csv_bpc(url_bpc, data_atualiza):
    url_bpc_download = url_bpc + f'/{data_atualiza.strftime("%Y%m")}'

    resp = requests.get(url_bpc_download)

    arq_salvar=f'download/{data_atualiza.strftime("%Y%m")}_BPC.zip'

    with open(arq_salvar, "wb") as f:
        f.write(resp.content)

    arq_csv=f'{data_atualiza.strftime("%Y%m")}_BPC.csv'

    with ZipFile(arq_salvar) as zf:
        zf.extract(arq_csv, path='download')

In [8]:
def carrega_bd_novo_bolsa_familia(conn, data_atualiza):
    arq_csv=f'download/{data_atualiza.strftime("%Y%m")}_NovoBolsaFamilia.csv'

    copy_sql = f"""
            COPY benef_federais.novo_bolsa_familia FROM stdin
            CSV
            HEADER
            DELIMITER as ';'
            """

    with conn.cursor() as cur:

        cur.execute('DROP INDEX IF EXISTS benef_federais.novo_bolsa_familia_nis_favorecido_idx;')
        conn.commit()

        cur.execute('TRUNCATE TABLE benef_federais.novo_bolsa_familia;')
        conn.commit()

        with open(arq_csv, 'r', encoding='ISO 8859-1') as f:
            cur.copy_expert(sql=copy_sql, file=f)
        conn.commit()

        cur.execute('CREATE INDEX novo_bolsa_familia_nis_favorecido_idx ON benef_federais.novo_bolsa_familia (nis_favorecido);')
        conn.commit()
    
    conn.close()

def carrega_bd_bpc(conn, data_atualiza):
    arq_csv=f'download/{data_atualiza.strftime("%Y%m")}_BPC.csv'

    copy_sql = f"""
            COPY benef_federais.bpc FROM stdin
            CSV
            HEADER
            DELIMITER as ';'
            """

    with conn.cursor() as cur:

        cur.execute('DROP INDEX IF EXISTS benef_federais.bpc_nis_beneficiario_idx;')
        conn.commit()

        cur.execute('DROP INDEX IF EXISTS benef_federais.bpc_nis_representante_legal_idx;')
        conn.commit()

        cur.execute('TRUNCATE TABLE benef_federais.bpc;')
        conn.commit()

        with open(arq_csv, 'r', encoding='ISO 8859-1') as f:
            cur.copy_expert(sql=copy_sql, file=f)
        conn.commit()

        cur.execute('CREATE INDEX bpc_nis_beneficiario_idx ON benef_federais.bpc (nis_beneficiario);')
        conn.commit()

        cur.execute('CREATE INDEX bpc_nis_representante_legal_idx ON benef_federais.bpc (nis_representante_legal);')
        conn.commit()
    
    conn.close()


In [10]:
data_atualiza_novo_bolsa_familia, data_atualiza_bpc, data_atualiza_seguro_defeso = get_data_atualiza_bases(url_origem_dados=url_origem_dados)

print(data_atualiza_novo_bolsa_familia, data_atualiza_bpc, data_atualiza_seguro_defeso)

bd_novo_bolsa_familia_atualizado = verifica_tabela_novo_bolsa_familia_atualizada(engine, data_atualiza_novo_bolsa_familia)
print(bd_novo_bolsa_familia_atualizado)

bd_bpc_atualizado = verifica_tabela_bpc_atualizada(engine, data_atualiza_bpc)
print(bd_bpc_atualizado)

if(bd_novo_bolsa_familia_atualizado is False):
    #download_csv_novo_bolsa_familia(url_bolsa_familia=url_bolsa_familia, data_atualiza=data_atualiza_novo_bolsa_familia)
    carrega_bd_novo_bolsa_familia(conn=conn, data_atualiza=data_atualiza_novo_bolsa_familia)

if(bd_bpc_atualizado is False):
    #download_csv_bpc(url_bpc=url_bpc, data_atualiza=data_atualiza_bpc)
    carrega_bd_bpc(conn=conn, data_atualiza=data_atualiza_bpc)
    
    

2024-03-01 2024-04-01 2024-03-01
True
True
