In [2]:
from google.oauth2.service_account import Credentials
from airflow.operators.python import PythonOperator
from google.cloud.storage.client import Client as StorageClient
from google.cloud.bigquery import Client as BqClient
from airflow.decorators import task, dag
from datetime import timedelta
from io import BytesIO

import pandas as pd
import datetime
import pendulum
import airflow
import logging
import os

In [3]:
LOG_FORMAT = "%(asctime)s [%(levelname)s]: %(threadName)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT)
logger = logging.getLogger(__name__)
logger.setLevel("INFO")

# Constantes

CREDENTIALS_PATH = "/home/costa/Documentos/KarHub/karhub-challenge/airflow/credentials/karhub-key.json"
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = CREDENTIALS_PATH

DAG_ID = "processing_dag"
DAG_OWNER_NAME = "Data Engineering"
SCHEDULE_INTERVAL = None  # Dag com agendamento MANUAL
BUCKET = "etl-karhub-dados-sp"
START_DATE = pendulum.today('UTC').add(days=-1)
PROJECT = "karhub-434807"

In [4]:

def csv_to_storage(
    local_file_path: str, gcs_file_path: str, bucket_name: str = "etl-karhub-dados-sp"
):
    client = StorageClient(project=PROJECT)
    try:
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(gcs_file_path)
        blob.upload_from_filename(local_file_path)
        logger.info("Upload do arquivo %s foi realizado", local_file_path)
    except Exception as e:
        logger.error("Erro encontrado: %s", e)

In [5]:
def storage_to_raw(
    gcs_file_path: str,
    dataset_name: str,
    table_name: str,
    bucket_name: str = "etl-karhub-dados-sp",
):
    client = StorageClient(project=PROJECT)
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(gcs_file_path)

In [6]:
client = StorageClient(project=PROJECT)
bucket = client.bucket("etl-karhub-dados-sp")
blob_receitas = bucket.blob("receitas/gdvReceitas.csv")
blob_despesas = bucket.blob("despesas/gdvDespesas.csv")

In [12]:
content = blob_despesas.download_as_bytes()

In [13]:
pd.set_option('display.max_colwidth', 255)
pd.set_option('display.max_rows', 9999)

data = BytesIO(content)
df = pd.read_csv(data, encoding="latin_1")

In [14]:
df.head()

Unnamed: 0,Fonte de Recursos,Despesa,Liquidado,Unnamed: 3
0,001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR,31900119 - ATRASADOS-OUTROS PODERES/MINIST.PUBLICO,"79.760.504,67",
1,001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR,31900124 - COMPLEMENTACAO DE APOSENTADORIA,"1.850.834.374,26",
2,001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR,31900125 - COMPLEMENTACAO DE APOSENTADORIA - 13ºSALARIO,"124.536.202,87",
3,001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR,31900325 - COMPLEMENTACAO DE PENSAO,"10.948.297,00",
4,001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR,31900326 - COMPLEMENTACAO DE PENSAO - 13º SALARIO,"521.632,30",


## Problemas

Ao ler esse dataframe, foi criada uma coluna chamada "Unnamed: 3", proveniente da coluna vazia lá no arquivo `.csv`. E também, a última linha é a linha de total, o que pode ser ruim para o autoschema do BigQuery, que é o que utilizaremos para esse projeto.

Para evitar problemas futuros, vamos remover essa coluna e também essa linha.

Aproveitando que já estamos alterando o arquivo base, vamos adicionar a coluna `dt_insert`.

In [15]:
df = df.drop(columns=["Unnamed: 3"]).drop(df.index[-1])
df['dt_insert'] = pendulum.today('utc').to_iso8601_string()

In [16]:
df.head()

Unnamed: 0,Fonte de Recursos,Despesa,Liquidado,dt_insert
0,001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR,31900119 - ATRASADOS-OUTROS PODERES/MINIST.PUBLICO,"79.760.504,67",2024-09-07T00:00:00Z
1,001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR,31900124 - COMPLEMENTACAO DE APOSENTADORIA,"1.850.834.374,26",2024-09-07T00:00:00Z
2,001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR,31900125 - COMPLEMENTACAO DE APOSENTADORIA - 13ºSALARIO,"124.536.202,87",2024-09-07T00:00:00Z
3,001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR,31900325 - COMPLEMENTACAO DE PENSAO,"10.948.297,00",2024-09-07T00:00:00Z
4,001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR,31900326 - COMPLEMENTACAO DE PENSAO - 13º SALARIO,"521.632,30",2024-09-07T00:00:00Z


## E agora?

Aqui já temos o dado pronto para ir para a camada Raw.

Podemos passar para os dados da tabela de Receitas.



## Próximo passo: Tratar os dados Raw

In [17]:
dataset_id = "gdv_raw"
table_id = "despesas_raw"
bq_client = BqClient(project=PROJECT)

query = f"""
        SELECT *
        FROM `{PROJECT}.{dataset_id}.{table_id}`
        """
        
rows = bq_client.query(query)

In [25]:
data_despesas = rows.to_dataframe()


In [31]:
import polars as pl
pl.Config(tbl_rows=100, tbl_cols=10, fmt_str_lengths=150, fmt_float="full")

data_despesas = pl.DataFrame(data_despesas)

In [40]:
despesas = data_despesas.with_columns(
    pl.col("Liquidado").map_elements(lambda x: float(x.strip().replace(".", "").replace(",",".")), return_dtype=pl.Float64).alias("liquidado"),
    pl.col("Fonte de Recursos").str.splitn(" - ", 2).struct.field('field_0').alias("id_fonte_recurso"),
    pl.col("Fonte de Recursos").str.splitn(" - ", 2).struct.field('field_1').alias("nome_fonte_recurso"),
    
).filter(
 ~pl.col("Despesa").str.contains("TOTAL") 
).select(
    pl.col("liquidado"),
    pl.col("id_fonte_recurso"),
    pl.col("nome_fonte_recurso")
)

In [41]:
despesas.head()

liquidado,id_fonte_recurso,nome_fonte_recurso
f64,str,str
79760504.67,"""001""","""TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR"""
1850834374.26,"""001""","""TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR"""
124536202.87,"""001""","""TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR"""
10948297.0,"""001""","""TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR"""
521632.3,"""001""","""TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR"""


Developed by

```shell
             ___         __  __              
            /   |  _____/ /_/ /_  __  _______
           / /| | / ___/ __/ __ \/ / / / ___/
          / ___ |/ /  / /_/ / / / /_/ / /    
         /_/  |_/_/   \__/_/ /_/\__,_/_/     
```