In [None]:
import pandas as pd
from google.cloud import bigquery
import numpy as np

In [None]:
# 1. PARÂMETROS E CONFIGURAÇÕES
project_id = "site-da-laica"
bucket_raw_path = "gs://sympla/raw/Base de Vendas para Teste Técnico Dados__20250904 (1).csv"

# Nomenclatura de destino na camada Trusted
dataset_id = "trusted"
table_id = "vendas_historico"
destination_table = f"{project_id}.{dataset_id}.{table_id}"
bucket_trusted_path = "gs://sympla/trusted/vendas_historico.parquet"
path_refined_base = "gs://sympla/refined/"

In [None]:
# 2. EXTRAÇÃO (EXTRACT - RAW)
print(f"Lendo dados da camada Raw: {bucket_raw_path}")

# Lendo o CSV. O delimitador da base é o ponto e vírgula (;)
df = pd.read_csv(bucket_raw_path, sep=';')

Lendo dados da camada Raw: gs://sympla/raw/Base de Vendas para Teste Técnico Dados__20250904 (1).csv


In [None]:
from decimal import Decimal, ROUND_HALF_UP
# 3. TRANSFORMAÇÃO E NORMALIZAÇÃO (TRANSFORM - TRUSTED)
print("Iniciando normalização de tipagem de dados...")

# a) Normalização de Data (Mantendo o tipo datetime mas zerando as horas)
# O normalize() garante que 2034-02-01 14:30:00 vire 2034-02-01 00:00:00
df['dt_venda'] = pd.to_datetime(df['dt_venda'], errors='coerce').dt.normalize()

# Remove as linhas nulas
df = df.dropna(subset=['dt_venda'])

# b) Normalização de Valores Financeiros (Decimal exato com 2 casas)
cols_to_decimal = [
    'vr_venda',
    'vr_medio_ingresso'
]

for col in cols_to_decimal:
    df[col] = (df[col].astype(str)
                      .str.replace('.', '', regex=False) # Remove o separador de milhar
                      .str.replace(',', '.', regex=False) # Troca a vírgula decimal por ponto
                      # Aplica a tipagem Decimal arredondando para 2 casas com precisão matemática
                      .apply(lambda x: Decimal(x).quantize(Decimal('0.00'), rounding=ROUND_HALF_UP)))

# c) Normalização de Quantidades (Arredondamento para Inteiro)
# Algumas colunas possuem vírgula (ex: "2,5"). Primeiro passamos para float, arredondamos e viramos int.
cols_to_int = [
    'qt_ingresso',
    'qt_evento',
    'qt_produtor',
    'qt_ingresso_por_evento',
    'qt_evento_por_produtor'
]

for col in cols_to_int:
    df[col] = (df[col].astype(str)
                      .str.replace('.', '', regex=False)
                      .str.replace(',', '.', regex=False)
                      .astype(float) # Força ser número primeiro
                      .round(0)      # Arredonda (ex: 2.5 vira 3.0, 2.4 vira 2.0)
                      .fillna(0)     # Trata os nulos
                      .astype(int))  # Converte definitivamente para Inteiro

# d) Normalização de Strings/Dimensões (Caixa Alta e sem espaços inúteis)
cols_to_string = [
    'nm_localidade_estado',
    'tp_produtor_canal_aquisicao',
    'tp_tamanho_produtor',
    'nm_evento_classificacao_negocio'
]

for col in cols_to_string:
    df[col] = df[col].astype(str).str.strip().str.upper()

print("Transformações concluídas. Schema atualizado:")
print(df.dtypes)

Iniciando normalização de tipagem de dados...
Transformações concluídas. Schema atualizado:
dt_venda                           datetime64[ns]
nm_localidade_estado                       object
tp_produtor_canal_aquisicao                object
tp_tamanho_produtor                        object
nm_evento_classificacao_negocio            object
vr_venda                                   object
qt_ingresso                                 int64
qt_evento                                   int64
qt_produtor                                 int64
qt_ingresso_por_evento                      int64
qt_evento_por_produtor                      int64
vr_medio_ingresso                          object
dtype: object


In [None]:
# 4. CARGA NO STORAGE (LOAD - TRUSTED)
print(f"Salvando arquivo Parquet normalizado na camada Trusted: {bucket_trusted_path}...")

# Salva diretamente no GCS. O parâmetro index=False evita criar uma coluna extra inútil
# A engine 'pyarrow' é o padrão da indústria para lidar com Parquet no Python
df.to_parquet(bucket_trusted_path, index=False, engine='pyarrow')

print("Sucesso! Arquivo Parquet gerado e salvo na camada Trusted do Storage.")

Salvando arquivo Parquet normalizado na camada Trusted: gs://sympla/trusted/vendas_historico.parquet...
Sucesso! Arquivo Parquet gerado e salvo na camada Trusted do Storage.


In [None]:
# 5. CRIAÇÃO DAS TABELAS DIMENSÃO
print(f"Lendo dados da camada Trusted: {bucket_trusted_path}...")
df_trusted = pd.read_parquet(bucket_trusted_path)
print("Modelando as Tabelas Dimensão...")

# --- DIM TEMPO (Calendário Completo até Dez/2034) ---
# Em vez de pegar apenas as datas existentes, geramos um intervalo contínuo
data_inicio = df_trusted['dt_venda'].min()
data_fim = pd.Timestamp('2034-12-31')

# Criamos a sequência diária de datas
calendario = pd.date_range(start=data_inicio, end=data_fim, freq='D')
df_tempo = pd.DataFrame({'dt_venda': calendario})

# Cria as SKs e colunas auxiliares (mantendo sua lógica original)
df_tempo['sk_tempo'] = df_tempo['dt_venda'].dt.strftime('%Y%m%d').astype(int)
df_tempo['ano'] = df_tempo['dt_venda'].dt.year
df_tempo['mes'] = df_tempo['dt_venda'].dt.month
df_tempo['trimestre'] = df_tempo['dt_venda'].dt.quarter
df_tempo['ano_mes'] = df_tempo['dt_venda'].dt.strftime('%Y-%m')

# Converte a coluna original para 'date' (sem horas e sem fuso) logo antes de exportar
df_tempo['dt_venda'] = df_tempo['dt_venda'].dt.date

# --- DIM LOCALIDADE ---
# Extrai estados únicos e gera um ID sequencial
df_localidade = df_trusted[['nm_localidade_estado']].drop_duplicates().reset_index(drop=True)
df_localidade['sk_localidade'] = df_localidade.index + 1

# --- DIM PRODUTOR ---
# Combinação única de tamanho e canal de aquisição
df_produtor = df_trusted[['tp_tamanho_produtor', 'tp_produtor_canal_aquisicao']].drop_duplicates().reset_index(drop=True)
df_produtor['sk_produtor'] = df_produtor.index + 1

# --- DIM EVENTO ---
# Classificação do negócio (ex: Corporativo, Esportivo)
df_evento = df_trusted[['nm_evento_classificacao_negocio']].drop_duplicates().reset_index(drop=True)
df_evento['sk_evento'] = df_evento.index + 1

print(f"Dimensões modeladas com sucesso! Calendário expandido até {data_fim.date()}.")

Lendo dados da camada Trusted: gs://sympla/trusted/vendas_historico.parquet...
Modelando as Tabelas Dimensão...
Dimensões modeladas com sucesso! Calendário expandido até 2034-12-31.


In [None]:
# 6. CRIAÇÃO DA TABELA FATO
print("Construindo a Tabela Fato (Cruzamento de Chaves)...")

# Fazemos o MERGE (JOIN) do dataframe original com as dimensões para capturar as SKs
df_fato = df_trusted.copy()

# Join com Tempo
df_fato['sk_tempo'] = df_fato['dt_venda'].dt.strftime('%Y%m%d').astype(int)

# Join com Localidade
df_fato = df_fato.merge(df_localidade, on='nm_localidade_estado', how='left')

# Join com Produtor
df_fato = df_fato.merge(df_produtor, on=['tp_tamanho_produtor', 'tp_produtor_canal_aquisicao'], how='left')

# Join com Evento
df_fato = df_fato.merge(df_evento, on='nm_evento_classificacao_negocio', how='left')

# Selecionar APENAS as chaves (SKs) e as métricas para a tabela Fato
colunas_fato = [
    'sk_tempo', 'sk_localidade', 'sk_produtor', 'sk_evento',
    'vr_venda', 'qt_ingresso', 'qt_evento', 'qt_produtor',
    'qt_ingresso_por_evento', 'qt_evento_por_produtor', 'vr_medio_ingresso'
]
df_fato = df_fato[colunas_fato]


Construindo a Tabela Fato (Cruzamento de Chaves)...


In [None]:
# 7. CARGA NO STORAGE (SALVANDO EM PARQUET)
print("Salvando o Star Schema na camada Refined (Storage)...")

tabelas = {
    "dim_tempo": df_tempo,
    "dim_localidade": df_localidade,
    "dim_produtor": df_produtor,
    "dim_evento": df_evento,
    "fato_vendas_case": df_fato
}

for nome_tabela, df in tabelas.items():
    caminho_destino = f"{path_refined_base}{nome_tabela}.parquet"
    df.to_parquet(caminho_destino, index=False, engine='pyarrow')
    print(f" -> Salvo: {caminho_destino} | Linhas: {len(df)}")

print("\nModelagem Dimensional (Star Schema) concluída com sucesso!")

Salvando o Star Schema na camada Refined (Storage)...
 -> Salvo: gs://sympla/refined/dim_tempo.parquet | Linhas: 699
 -> Salvo: gs://sympla/refined/dim_localidade.parquet | Linhas: 29
 -> Salvo: gs://sympla/refined/dim_produtor.parquet | Linhas: 10
 -> Salvo: gs://sympla/refined/dim_evento.parquet | Linhas: 6
 -> Salvo: gs://sympla/refined/fato_vendas_case.parquet | Linhas: 15004

Modelagem Dimensional (Star Schema) concluída com sucesso!


In [None]:
from google.cloud import bigquery

# 8. PARÂMETROS E CONFIGURAÇÕES
# O seu Dataset no BigQuery
dataset_id = "site-da-laica.sympla"

# O caminho base Refined no Storage
gcs_base_uri = "gs://sympla/refined/"

# A lista das tabelas do Star Schema
tabelas_star_schema = [
    "dim_tempo",
    "dim_localidade",
    "dim_produtor",
    "dim_evento",
    "fato_vendas_case"
]

# Inicializa o Client do BigQuery
client = bigquery.Client(project="site-da-laica")

print(f"Iniciando a carga de dados para Tabelas NATIVAS no dataset: {dataset_id}...\n")

# LOOP DE CRIAÇÃO E CARGA DAS TABELAS
for nome_tabela in tabelas_star_schema:

    # Monta a URI exata do arquivo Parquet no Storage
    source_uri = f"{gcs_base_uri}{nome_tabela}.parquet"

    # Define o ID completo da tabela no BigQuery
    table_id = f"{dataset_id}.{nome_tabela}"

    # Configura o Job de Carga (Load Job)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        # WRITE_TRUNCATE substitui os dados antigos pelos novos.
        # Ideal para recargas completas de dimensões e fatos.
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    print(f"Carregando {source_uri} na tabela {table_id}...")

    # EXECUÇÃO NO BIGQUERY
    try:
        # Inicia o job de ingestão (copiando do Storage para o disco nativo do BQ)
        load_job = client.load_table_from_uri(
            source_uri, table_id, job_config=job_config
        )

        # Aguarda o processamento do job finalizar
        load_job.result()

        # Verifica a tabela final para confirmar o sucesso
        destination_table = client.get_table(table_id)
        print(f"Sucesso! Tabela Nativa criada com {destination_table.num_rows} linhas.\n")

    except Exception as e:
        print(f"Erro ao carregar a tabela {nome_tabela}: {e}")

print("Todas as Tabelas Nativas foram carregadas com sucesso!")

Iniciando a carga de dados para Tabelas NATIVAS no dataset: site-da-laica.sympla...

Carregando gs://sympla/refined/dim_tempo.parquet na tabela site-da-laica.sympla.dim_tempo...
Sucesso! Tabela Nativa criada com 699 linhas.

Todas as Tabelas Nativas foram carregadas com sucesso!


In [None]:
# 9. Modelo de Forecasting (Séries Temporais) para projetar o restante de 2034.
query_treino = f"""
SELECT
    t.ano,
    t.mes,
    t.trimestre,
    l.sk_localidade,
    p.sk_produtor,
    e.sk_evento,
    SUM(f.vr_venda) as vr_venda,
    SUM(f.qt_ingresso) as qt_ingresso,
    SUM(f.qt_evento) as qt_evento,
    SUM(f.qt_produtor) as qt_produtor
FROM `{dataset_id}.fato_vendas` f
JOIN `{dataset_id}.dim_tempo` t ON f.sk_tempo = t.sk_tempo
JOIN `{dataset_id}.dim_localidade` l ON f.sk_localidade = l.sk_localidade
JOIN `{dataset_id}.dim_produtor` p ON f.sk_produtor = p.sk_produtor
JOIN `{dataset_id}.dim_evento` e ON f.sk_evento = e.sk_evento
WHERE t.dt_venda < '2034-03-01'
GROUP BY 1, 2, 3, 4, 5, 6
"""

df_train = client.query(query_treino).to_dataframe()

from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
import itertools
from decimal import Decimal, ROUND_HALF_UP

# 10. TREINAMENTO E PREDIÇÃO COM GRANULARIDADE TOTAL
targets = ['vr_venda', 'qt_ingresso', 'qt_evento', 'qt_produtor']
X = df_train[['ano', 'mes', 'trimestre', 'sk_localidade', 'sk_produtor', 'sk_evento']]
y = df_train[targets]

# Treino
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
modelo_fato = RandomForestRegressor(n_estimators=100, random_state=42)
modelo_fato.fit(X_train, y_train)
print(f"Modelo treinado! Score R²: {modelo_fato.score(X_test, y_test):.2f}")

# --- A. GERAR COMBINAÇÕES PARA 2034 (Granularidade Real) ---
print("Gerando combinações de dimensões para o futuro...")
# Extraímos todas as combinações únicas de dimensões que existiram no histórico
dimensoes_unicas = df_train[['sk_localidade', 'sk_produtor', 'sk_evento']].drop_duplicates()

# Definimos os meses de Março a Dezembro
meses_futuros = pd.DataFrame({'mes': list(range(3, 13))})
meses_futuros['ano'] = 2034
meses_futuros['trimestre'] = meses_futuros['mes'].apply(lambda m: (m-1)//3 + 1)

# CROSS JOIN: Meses x (Localidade/Produtor/Evento) para ter milhares de linhas
meses_faltantes = meses_futuros.merge(dimensoes_unicas, how='cross')

# --- B. REALIZAR PREDIÇÃO ---
X_futuro = meses_faltantes[['ano', 'mes', 'trimestre', 'sk_localidade', 'sk_produtor', 'sk_evento']]
previsoes_array = modelo_fato.predict(X_futuro)

df_proj_valores = pd.DataFrame(previsoes_array, columns=targets)
fato_2034_proj = pd.concat([meses_faltantes.reset_index(drop=True), df_proj_valores], axis=1)
fato_2034_proj['flag_previsao'] = 1

# --- C. CONSOLIDAÇÃO E LIMPEZA ---
# Pegamos o Real (Até Fevereiro)
df_real = df_train[~((df_train['ano'] == 2034) & (df_train['mes'] >= 3))].copy()
df_real['flag_previsao'] = 0

fato_consolidada = pd.concat([df_real, fato_2034_proj], ignore_index=True)

# Reconstrução da sk_tempo (Chave diária para o BI)
fato_consolidada['sk_tempo'] = (fato_consolidada['ano'] * 10000 + fato_consolidada['mes'] * 100 + 1).astype(int)

# --- D. CÁLCULO DE MÉTRICAS E TIPAGEM ---
# Forçamos float para cálculo e depois Decimal/Int para o BQ
fato_consolidada['vr_venda'] = fato_consolidada['vr_venda'].astype(float)

fato_consolidada['vr_medio_ingresso'] = (fato_consolidada['vr_venda'] / fato_consolidada['qt_ingresso']).replace([np.inf, -np.inf], 0).fillna(0)
fato_consolidada['qt_ingresso_por_evento'] = (fato_consolidada['qt_ingresso'] / fato_consolidada['qt_evento']).replace([np.inf, -np.inf], 0).fillna(0)
fato_consolidada['qt_evento_por_produtor'] = (fato_consolidada['qt_evento'] / fato_consolidada['qt_produtor']).replace([np.inf, -np.inf], 0).fillna(0)

def para_decimal(x):
    return Decimal(str(x)).quantize(Decimal('0.00'), rounding=ROUND_HALF_UP)

# Aplicando Tipos Finais (NUMERIC e INTEGER)
for col in ['vr_venda', 'vr_medio_ingresso']:
    fato_consolidada[col] = fato_consolidada[col].apply(para_decimal)

cols_int = ['sk_tempo', 'sk_localidade', 'sk_produtor', 'sk_evento', 'qt_ingresso', 'qt_evento', 'qt_produtor', 'qt_ingresso_por_evento', 'qt_evento_por_produtor', 'flag_previsao']
for col in cols_int:
    fato_consolidada[col] = fato_consolidada[col].round(0).astype(int)

# --- E. CARGA NO BIGQUERY ---
tabela_final = f"{dataset_id}.fato_vendas"
import pandas_gbq
pandas_gbq.to_gbq(fato_consolidada, tabela_final, project_id=project_id, if_exists='replace')

print(f"Sucesso! Tabela {tabela_final} criada com {len(fato_consolidada)} linhas e granularidade total.")

Modelo treinado! Score R²: 0.85
Gerando combinações de dimensões para o futuro...


100%|██████████| 1/1 [00:00<00:00, 10894.30it/s]

Sucesso! Tabela site-da-laica.sympla.fato_vendas criada com 28552 linhas e granularidade total.



