In [2]:
# Deduplicação
# Particionamento
# Ajustes de campos

In [1]:
from dotenv import load_dotenv
import os
import sys
sys.path.append('/home/r42/caseWill')

from tqdm import tqdm
from utils.connections import duckdb_postgres_query, postgres_query

In [4]:
load_dotenv()
PATH_RAW = os.getenv('MINIO_RAW')
PATH_BRONZE = os.getenv('MINIO_BRONZE')

In [None]:
create_bronze_customer = """
    CREATE TABLE IF NOT EXISTS bronze.customer (
        cotumer_id           INTEGER     PRIMARY KEY,
        entry_date           DATE        NULL,
        full_name            VARCHAR     NULL,
        birth_date           DATE        NULL,
        uf_name              VARCHAR     NULL,
        uf                   VARCHAR     NULL,
        street_name          VARCHAR     NULL
    )
"""

create_bronze_core_account = """
    CREATE TABLE IF NOT EXISTS bronze.core_account (
        transaction_id                 VARCHAR     PRIMARY KEY,
        transaction_date               DATE        NULL,
        transaction_month              DATE        NULL,
        cotumer_id                     INTEGER     NULL,
        cd_seqlan                      INTEGER     NULL,
        transaction_type               VARCHAR     NULL,
        transaction_value              DOUBLE      NULL,
        partition_year                 INTEGER     NOT NULL,
        partition_month                INTEGER     NOT NULL,
        partition_day                  INTEGER     NOT NULL
    )
"""

create_bronze_core_pix = """
    CREATE TABLE IF NOT EXISTS bronze.core_pix (
        transaction_id                 VARCHAR     PRIMARY KEY,
        transaction_date               DATE        NULL,
        transaction_month              DATE        NULL,
        cd_seqlan                      INTEGER     NULL,
        transaction_type               VARCHAR     NULL,
        transaction_value              DOUBLE      NULL,
        partition_year                 INTEGER     NOT NULL,
        partition_month                INTEGER     NOT NULL,
        partition_day                  INTEGER     NOT NULL
    )
"""

In [None]:
insert_bronze_core_account = """
    WITH raw AS (
        SELECT 
            id_transaction,
            dt_transaction,
            dt_month,
            surrogate_key,
            cd_seqlan,
            ds_transaction_type,
            vl_transaction
        FROM 
            raw.core_account
    )
    INSERT INTO bronze.core_account (
        transaction_id,
        transaction_date,
        transaction_month,
        cotumer_id,
        cd_seqlan,
        transaction_type,
        transaction_value,
        EXTRACT(YEAR FROM dt_transaction) AS partition_year,
        EXTRACT(MONTH FROM dt_transaction) AS partition_month,
        EXTRACT(DAY FROM dt_transaction) AS partition_day
    )

    COMMENT ON COLUMN bronze.core_account.id_transaction IS 'Identificador único da transação';
    COMMENT ON COLUMN bronze.core_account.dt_transaction IS 'Data de movimentação';
    COMMENT ON COLUMN bronze.core_account.dt_month IS 'Mês-ano da movimentação';
    COMMENT ON COLUMN bronze.core_account.cd_account_customer IS 'Número da conta do cliente';
    COMMENT ON COLUMN bronze.core_account.cd_seqlan IS 'Código sequencial de transações';
    COMMENT ON COLUMN bronze.core_account.ds_transaction_type IS 'Tipo da transação';
    COMMENT ON COLUMN bronze.core_account.vl_transaction IS 'Valor da transação';
    COMMENT ON COLUMN bronze.core_account.year IS 'Ano extraído da data de movimentação';
    COMMENT ON COLUMN bronze.core_account.month IS 'Mês extraído da data de movimentação';
    COMMENT ON COLUMN bronze.core_account.day IS 'Dia extraído da data de movimentação';
"""


In [None]:

insert_bronze_core_pix = """
    INSERT INTO bronze.core_pix (
        id_end_to_end,
        dt_transaction,
        dt_month,
        cd_seqlan,
        ds_transaction_type,
        vl_transaction
    )
    SELECT
        id_transaction AS id_end_to_end,
        dt_transaction,
        dt_transaction AS dt_month, -- Aqui assumimos que você quer a data completa na coluna dt_month
        CAST(cd_seqlan AS INTEGER), -- Conversão necessária para INTEGER
        ds_transaction_type,
        vl_transaction
    FROM raw.stg_core_pix;

    COMMENT ON COLUMN bronze.core_pix.id_end_to_end IS 'Identificador único da transação';
    COMMENT ON COLUMN bronze.core_pix.dt_transaction IS 'Data de movimentação';
    COMMENT ON COLUMN bronze.core_pix.dt_month IS 'Mês da movimentação';
    COMMENT ON COLUMN bronze.core_pix.cd_seqlan IS 'Código sequencial de transações';
    COMMENT ON COLUMN bronze.core_pix.ds_transaction_type IS 'Tipo da transação';
    COMMENT ON COLUMN bronze.core_pix.vl_transaction IS 'Valor da transação';
    COMMENT ON COLUMN bronze.core_pix.year IS 'Ano extraído da data de movimentação';
    COMMENT ON COLUMN bronze.core_pix.month IS 'Mês extraído da data de movimentação';
    COMMENT ON COLUMN bronze.core_pix.day IS 'Dia extraído da data de movimentação';
"""

insert_bronze_customer = """
    INSERT INTO bronze.customer (
        surrogate_key,
        entry_date,
        full_name,
        birth_date,
        uf_name,
        uf,
        street_name
    )
    SELECT
        surrogate_key,
        entry_date,
        full_name,
        birth_date,
        uf_name,
        uf,
        street_name
    FROM raw.customer;

    COMMENT ON COLUMN bronze.customer.surrogate_key IS 'Identificação única do cliente';
    COMMENT ON COLUMN bronze.customer.entry_date IS 'Data de abertura da conta';
    COMMENT ON COLUMN bronze.customer.full_name IS 'Nome completo';
    COMMENT ON COLUMN bronze.customer.birth_date IS 'Data de nascimento';
    COMMENT ON COLUMN bronze.customer.uf_name IS 'Estado de residência';
    COMMENT ON COLUMN bronze.customer.uf IS 'Sigla do estado de residência';
    COMMENT ON COLUMN bronze.customer.street_name IS 'Logradouro';
"""

In [None]:
from datalake.bronze.create_bronze_customer 

In [5]:
create_and_export_bronze_core_account = f"""
    CREATE TABLE IF NOT EXISTS bronze.core_account (
        transaction_id          VARCHAR     PRIMARY KEY,
        transaction_date        DATE        NULL,
        transaction_month       INTEGER     NULL,
        cotumer_id              INTEGER     NULL,
        cd_seqlan               INTEGER     NULL,
        transaction_type        VARCHAR     NULL,
        transaction_value       DOUBLE      NULL,
        year                    INTEGER     NOT NULL,
        month                   INTEGER     NOT NULL,
        day                     INTEGER     NOT NULL
    );
    
    CREATE OR REPLACE TEMP VIEW temp_view_bronze_core_account AS (
        SELECT
            id_transaction as transaction_id,
            dt_transaction as transaction_date,
            dt_month as transaction_month,
            surrogate_key as cotumer_id,
            cd_seqlan as cd_seqlan,
            ds_transaction_type as transaction_type,
            vl_transaction as transaction_value,
            year,
            month,
            day
        FROM (
            SELECT
                distinct *,
                EXTRACT(YEAR FROM dt_transaction) AS year,
                EXTRACT(MONTH FROM dt_transaction) AS month,
                EXTRACT(DAY FROM dt_transaction) AS day,
                row_number() OVER (PARTITION BY id_transaction ORDER BY dt_transaction ASC) AS rn
            FROM
                raw.core_account
        )
        WHERE
            rn = 1
    );
    
    INSERT INTO bronze.core_account (
        transaction_id,
        transaction_date,
        transaction_month,
        cotumer_id,
        cd_seqlan,
        transaction_type,
        transaction_value,
        year,
        month,
        day
    )
    SELECT * FROM temp_view_bronze_core_account;
    
    COPY (SELECT * FROM bronze.core_account)
    TO '{PATH_BRONZE}/core_account/' (
        FORMAT PARQUET
    );
"""

comments_bronze_core_account = """
    COMMENT ON COLUMN bronze.core_account.transaction_id IS 'Identificador único da transação';
    COMMENT ON COLUMN bronze.core_account.transaction_date IS 'Data de movimentação';
    COMMENT ON COLUMN bronze.core_account.transaction_month IS 'Mês-ano da movimentação';
    COMMENT ON COLUMN bronze.core_account.cotumer_id IS 'Número da conta do cliente';
    COMMENT ON COLUMN bronze.core_account.cd_seqlan IS 'Código sequencial de transações';
    COMMENT ON COLUMN bronze.core_account.transaction_type IS 'Tipo da transação';
    COMMENT ON COLUMN bronze.core_account.transaction_value IS 'Valor da transação';
    COMMENT ON COLUMN bronze.core_account.year IS 'Ano extraído da data de movimentação';
    COMMENT ON COLUMN bronze.core_account.month IS 'Mês extraído da data de movimentação';
    COMMENT ON COLUMN bronze.core_account.day IS 'Dia extraído da data de movimentação';
"""

In [6]:
postgres_query(comments_bronze_core_account)

Query executed successfully.


In [8]:
from dotenv import load_dotenv
load_dotenv()
import os
import sys
sys.path.append('/home/r42/caseWill')
from datetime import datetime

from utils.connections import duckdb_postgres_query, postgres_query


PATH_BRONZE = os.getenv('MINIO_BRONZE')
FILE_NAME = datetime.now().strftime('%Y-%m-%d')


query = f"""
    CREATE TABLE IF NOT EXISTS bronze.customer (
        cotumer_id           INTEGER     PRIMARY KEY,
        entry_date           DATE        NULL,
        full_name            VARCHAR     NULL,
        birth_date           DATE        NULL,
        uf_name              VARCHAR     NULL,
        uf                   VARCHAR     NULL,
        street_name          VARCHAR     NULL
    );
    
    CREATE OR REPLACE TEMP VIEW temp_view_bronze_customer AS (
        SELECT
            surrogate_key,
            entry_date,
            full_name,
            birth_date,
            uf_name,
            uf,
            street_name
        FROM (
            SELECT
                *,   
                ROW_NUMBER() OVER (PARTITION BY surrogate_key ORDER BY entry_date DESC) AS rn
            FROM
                raw.customer
        ) subquery
        WHERE
            rn = 1
    );
    
    INSERT INTO bronze.customer (
        cotumer_id,
        entry_date,
        full_name,
        birth_date,
        uf_name,
        uf,
        street_name
    )
    SELECT * FROM temp_view_bronze_customer;
    
    COPY (SELECT * FROM bronze.customer)
    TO '{PATH_BRONZE}/customer/{FILE_NAME}.parquet' (
        FORMAT PARQUET
    );
"""
comments = """
    COMMENT ON COLUMN bronze.customer.cotumer_id IS 'Identificação única do cliente';
    COMMENT ON COLUMN bronze.customer.entry_date IS 'Data de abertura da conta';
    COMMENT ON COLUMN bronze.customer.full_name IS 'Nome completo';
    COMMENT ON COLUMN bronze.customer.birth_date IS 'Data de nascimento';
    COMMENT ON COLUMN bronze.customer.uf_name IS 'Estado de residência';
    COMMENT ON COLUMN bronze.customer.uf IS 'Sigla do estado de residência';
    COMMENT ON COLUMN bronze.customer.street_name IS 'Logradouro';
"""

duckdb_postgres_query(query)
postgres_query(comments)

Query executed successfully.


In [11]:
duckdb_postgres_query("drop table bronze.core_pix")

In [3]:
PATH_BRONZE

In [3]:
duckdb_postgres_query("select * from raw.customer").fetchall

AttributeError: 'NoneType' object has no attribute 'fetchall'