## Configurações iniciais

In [None]:
from google.cloud import bigquery
from google.oauth2 import service_account
import polars as pl
import pandas as pd
import html # biblioteca para decodificar HTML
import json # biblioteca para manipulação de JSON
import gc # biblioteca para gerenciamento de memória
import re # biblioteca para expressões regulares
import glob # biblioteca para manipulação de arquivos globais
import os # biblioteca para manipulação de sistema operacional

pasta_projeto = "D:\\__case_ifood"

# Caminho do arquivo JSON da conta de serviço
credencial_gcp = os.path.join(pasta_projeto,"case-ifood-fsg-6f1d7cf34e08.json")

# Autenticando
credencial = service_account.Credentials.from_service_account_file(credencial_gcp)

# Cliente BigQuery
client = bigquery.Client(credentials=credencial, project="case-ifood-fsg")



### Função para criar o dataset e inserir uma tabela no BQ

In [None]:
def split_dataframe(df, chunk_size):
    """Divide um DataFrame Pandas em pedaços menores."""
    return [df[i:i+chunk_size] for i in range(0, len(df), chunk_size)]

def criar_dataset_e_tabela(df, dataset_nome, tabela_nome, client, project_id, location="southamerica-east1", chunk_size=1_000_000):
    """
    Cria um dataset (se necessário) e insere os dados em uma tabela BigQuery usando chunks.
    Suporta DataFrames do Pandas ou Polars.
    """

    from google.cloud.exceptions import Conflict

    dataset_id = f"{project_id}.{dataset_nome}"
    table_id = f"{dataset_id}.{tabela_nome}"

    # 1. Criar dataset se não existir
    try:
        dataset = bigquery.Dataset(dataset_id)
        dataset.location = location
        client.create_dataset(dataset, exists_ok=True)
        print(f"Dataset '{dataset_id}' pronto.")
    except Exception as e:
        print(f"Erro ao criar dataset: {e}")
        return

    # 2. Converter de Polars para Pandas, se necessário
    if isinstance(df, pl.DataFrame):
        df = df.to_pandas()
        print("Convertido de Polars para Pandas.")

    if not isinstance(df, pd.DataFrame):
        print("O objeto fornecido não é um DataFrame válido.")
        return

    # 3. Inserir em chunks
    try:
        chunks = split_dataframe(df, chunk_size)
        total_rows = 0

        for i, chunk in enumerate(chunks):
            print(f"Enviando chunk {i+1}/{len(chunks)} com {len(chunk)} linhas...")
            write_disposition = (
                bigquery.WriteDisposition.WRITE_TRUNCATE if i == 0
                else bigquery.WriteDisposition.WRITE_APPEND
            )
            job_config = bigquery.LoadJobConfig(write_disposition=write_disposition)
            job = client.load_table_from_dataframe(chunk, table_id, job_config=job_config)
            job.result()
            total_rows += len(chunk)

        print(f"Tabela '{table_id}' carregada com {total_rows} linhas.")
    except Exception as e:
        print(f"Erro ao carregar tabela por chunks: {e}")

## Transformando e inserindo os dados da Tabela merchants

In [None]:
# Query
qry = "SELECT * FROM bronze.merchants"

result = client.query(qry)
arrow_table = result.to_arrow()
df_merchants = pl.from_arrow(arrow_table)

In [None]:
df_merchants.head(5)  # Exibir as primeiras 5 linhas do DataFrame Polars

In [None]:
# Valida valores únicos, totais e nulos de merchants
df_merchants.select(
    pl.col("id").n_unique().alias("total_unique_merchants"),   # Merchants únicos
    pl.col("id").count().alias("total_merchants"),                     # Total de linhas (merchants)
    pl.col("id").is_null().sum().alias("total_null_merchants")# Merchants nulos
)

In [None]:
# Validando quantidade de merchants por status (enabled)
(
    df_merchants
    .group_by("enabled")
    .agg([
        pl.col("id").n_unique().alias("total_unique_merchants"),
        pl.col("id").count().alias("total_merchants"),
        pl.col("id").is_null().sum().alias("total_null_merchants")
    ])
)

In [None]:
# Valida as top 10 cidades com mais parceiros únicos
df_merchants.group_by("merchant_city").agg([
    pl.col("id").n_unique().alias("total_unique_merchants")
]).sort("total_unique_merchants", descending=True).head(10)  # Exibir os 10 maiores por cidade

In [None]:
# Convertendo coluna para data e capitalizando nomes de cidades
df_merchants = df_merchants.with_columns(
    pl.col("created_at").str.to_datetime(time_unit="ms"),
    pl.col("merchant_city").str.to_titlecase()
)

In [None]:
# Inserindo dados no BigQuery
criar_dataset_e_tabela(
    df=df_merchants,  
    dataset_nome="silver",
    tabela_nome="merchants",
    client=client,
    project_id="case-ifood-fsg",
    location="southamerica-east1" 
)

In [None]:
del df_merchants  # Liberar memória
del result  # Liberar memória
del arrow_table  # Liberar memória
gc.collect()  # Coletar lixo para liberar memória

## Transformando e inserindo os dados da tabela ab_test

In [None]:
qry_ab_test = "SELECT * FROM bronze.ab_test"

result_ab_test = client.query(qry_ab_test)
arrow_table_ab_test = result_ab_test.to_arrow()
df_ab_test = pl.from_arrow(arrow_table_ab_test)

In [None]:
df_ab_test.head(3)  # Exibir as primeiras 3 linhas do DataFrame Polars

In [None]:
# Validando se há somente cliente únicos no teste A/B
df_ab_test.group_by("customer_id").agg([
    pl.col("customer_id").count().alias("total_customers"),
    pl.col("customer_id").n_unique().alias("total_unique_merchants")
]).sort("total_customers", descending=True)  # Exibir os maiores por total de clientes

In [None]:
# Verificando a porcentagem de clientes únicos no teste A/B
df_ab_test.group_by("is_target").agg([
    pl.col("customer_id").n_unique().alias("total_unique_customers")
]).with_columns([
        pl.col("total_unique_customers").sum().alias("total_customers"),
        (pl.col("total_unique_customers") / pl.col("total_unique_customers").sum()).round(3).alias("percentage_customers")
    ])

In [None]:
criar_dataset_e_tabela(
    df=df_ab_test,  
    dataset_nome="silver",
    tabela_nome="ab_test",
    client=client,
    project_id="case-ifood-fsg",
    location="southamerica-east1" 
)

In [None]:
del df_ab_test # Liberar memória
del result_ab_test  # Liberar memória
del arrow_table_ab_test
gc.collect()  # Coletar lixo para liberar memória

## Transformando e inserindo os dados da tabela consumer

In [None]:
qry_consumer = "SELECT * FROM bronze.consumer"

result_consumer = client.query(qry_consumer)
arrow_table_consumer = result_consumer.to_arrow()
df_consumer_polars = pl.from_arrow(arrow_table_consumer)

In [None]:
df_consumer_polars.head(3)  # Exibir as primeiras 3 linhas do DataFrame Polars

In [None]:
df_consumer_polars.group_by("customer_id").agg([
    pl.col("customer_id").count().alias("total_customers"),
    pl.col("customer_id").n_unique().alias("total_unique_merchants")
]).sort("total_unique_merchants", descending=True)  # Exibir os maiores por total de clientes

In [None]:
# Validando a porcentagem de clientes únicos por status e idioma
df_consumer_polars.group_by(["active","language"]).agg([
    pl.col("customer_id").n_unique().alias("total_unique_customers"),
    pl.col("customer_id").count().alias("total_customers")
]).with_columns([
    pl.col("total_customers").sum().alias("total_customers_sum"),
    (pl.col("total_unique_customers") / pl.col("total_customers").sum()).round(3).alias("percentage_customers")
])

In [None]:
# Laço utilizado para remover aspas e desfazer entidades HTML

df_consumer = df_consumer_polars.to_pandas()  # Convertendo de Polars para Pandas

for col in df_consumer.select_dtypes(include='object'):
    df_consumer[col] = (
        df_consumer[col]
        .str.replace('"', '', regex=False)  # remove aspas duplas
        .str.replace("'", "", regex=False)  # remove aspas simples
        .apply(lambda x: html.unescape(x) if isinstance(x, str) else x)
    )

In [None]:
df_consumer["created_at"] = pd.to_datetime(df_consumer["created_at"])

In [None]:
criar_dataset_e_tabela(
    df=df_consumer,  
    dataset_nome="silver",
    tabela_nome="consumer",
    client=client,
    project_id="case-ifood-fsg",
    location="southamerica-east1" 
)

In [None]:
del df_consumer_polars  # Liberar memória
#del df_consumer # Liberar memória
del result_consumer  # Liberar memória
del arrow_table_consumer  # Liberar memória
gc.collect()  # Coletar lixo para liberar memória

## Transformando e inserindo os dados da tabela orders

#### Código abaixo utilizado para verificar a quantidade de registros por semana, pois tive dificuldades de ler toda a tabela diretamente

```sql
SELECT 
       CAST(
          DATE(
              DATE_TRUNC(o.order_created_at,WEEK)
              ) 
         AS STRING) AS semana,
       COUNT(*) qtd
FROM `bronze.orders` o
GROUP BY ALL
```

In [None]:
# Filtro criado para buscar dados mais rapidamente do BQ
semanas = [
    "2018-12-02", "2018-12-09", "2018-12-16", "2018-12-23",
    "2018-12-30", "2019-01-06", "2019-01-13", "2019-01-20", "2019-01-27"
]

In [None]:
df_orders_chunk = []

for semana in semanas:
    print(f"Lendo semana: {semana}")

    qry = f"""
        WITH base AS (
            SELECT
                CAST(DATE(DATE_TRUNC(order_created_at, WEEK)) AS STRING) AS semana,
                * EXCEPT(items)
            FROM bronze.orders
        )
        SELECT * EXCEPT(semana)
        FROM base
        WHERE semana = '{semana}'
    """

    result = client.query(qry)
    arrow_table = result.to_arrow()
    df = pl.from_arrow(arrow_table)
    df_orders_chunk.append(df)

df_orders = pl.concat(df_orders_chunk, how="vertical")

print("Dados carregados com sucesso!")
print(df_orders.shape)

In [None]:
del df # Liberar memória
del df_orders_chunk  # Liberar memória
del arrow_table  # Liberar memória
gc.collect()  # Coletar lixo para liberar memória

In [None]:
df_orders.head(3)  # Exibir as primeiras 3 linhas do DataFrame Polars

In [None]:
# Validando quantidade de pedidos únicos por cidade de entrega
# Verificado que a coluna 'order_id' não é única por pedido, pois há pedidos com o mesmo ID em diferentes semanas
df_orders.group_by("delivery_address_city").agg([
    pl.col("order_id").n_unique().alias("total_unique_orders"),
    pl.col("order_id").count().alias("total_orders"),
    pl.col("order_total_amount").sum().alias("total_order_amount")
]).sort("total_unique_orders", descending=True)  # Exibir os maiores por total de pedidos únicos

In [None]:
# Para um mesmo pedido e mesmo cliente, tenho 2 ou mais registros,
# Verificado que há mais de um cpf para o mesmo pedido
df_orders.group_by(["order_id","customer_id"]).agg([
    pl.col("order_id").count().alias("total_orders"),
    pl.col("cpf").n_unique().alias("total_unique_cpf") # Verifica se o CPF é único por pedido
]).sort("total_orders", descending=True).head(5)  # Exibir os maiores por total de pedidos

In [None]:
# Criando a coluna 'rn' para identificar a ordem dos pedidos por cliente
df_orders = (
    df_orders.sort(["order_id", "order_created_at"])
    .with_columns([
        pl.arange(0, pl.len()).over("order_id").alias("rn") + 1
    ])
)

In [None]:
df_orders.filter(pl.col("rn") == 2 ).head(3)  # Exibir as primeiras 3 linhas do DataFrame Polars filtrado

In [None]:
# Removendo duplicatas de pedidos por cliente
df_orders = df_orders.filter(pl.col("rn") == 1)

#### Criando, transformando e inserindo o df_order no BQ, que armazenará o cabeçalho do pedido.

In [None]:
# Aplicando algumas transformações
df_orders = df_orders.with_columns(
    pl.col("cpf").cast(pl.Utf8).str.zfill(11), # preenchendo CPF com zeros à esquerda
    pl.col("delivery_address_city").str.to_titlecase(),
    pl.col("customer_name").str.to_titlecase().str.replace("'", ""),
    pl.col("delivery_address_district").str.to_titlecase(),
).select(
    pl.col("order_id"),
    pl.col("order_created_at"),
    pl.col("cpf"),
    pl.col("customer_id"),
    pl.col("customer_name"),
    pl.col("delivery_address_district"),
    pl.col("delivery_address_city"),
    pl.col("delivery_address_state"),
    pl.col("delivery_address_country"),
    pl.col("delivery_address_zip_code"),
    pl.col("delivery_address_latitude"),
    pl.col("delivery_address_longitude"),
    pl.col("delivery_address_external_id"),
    pl.col("merchant_id"),
    pl.col("merchant_latitude"),
    pl.col("merchant_longitude"),
    pl.col("merchant_timezone"),
    pl.col("order_total_amount"),
    pl.col("order_scheduled"),
    pl.col("order_scheduled_date"),
    pl.col("origin_platform")
)

In [None]:
criar_dataset_e_tabela(
    df=df_orders,  
    dataset_nome="silver",
    tabela_nome="order",
    client=client,
    project_id="case-ifood-fsg",
    location="southamerica-east1" 
)

In [None]:
del df_orders  # Liberar memória
gc.collect()  # Coletar lixo para liberar memória

#### Criando, transformando e inserindo o df_order_details no BQ, que armazenará os itens do pedido.

In [None]:
dfs_order_details = []

for semana in semanas:
    print(f"Lendo semana: {semana}")
# Como identificado na célula anterior, a coluna 'order_id' não é única por pedido,
# então é necessário filtrar os pedidos para pegar os registros sem duplicatas.
    qry = f"""
        WITH base AS (
                SELECT
                    CAST(DATE(DATE_TRUNC(order_created_at, WEEK)) AS STRING) AS semana,
                    order_id,
                    cpf,
                    items
                FROM bronze.orders
                QUALIFY ROW_NUMBER() OVER(PARTITION BY order_id ORDER BY order_created_at ASC) = 1
            )
                SELECT * EXCEPT(semana)
                FROM base
                WHERE semana = '{semana}'
    """

    result = client.query(qry)
    arrow_table = result.to_arrow()
    df = pl.from_arrow(arrow_table)
    dfs_order_details.append(df)

df_order_details = pl.concat(dfs_order_details, how="vertical")

print("Dados carregados com sucesso!")
print(df_order_details.shape)

In [None]:
del df # Liberar memória
del dfs_order_details  # Liberar memória
del arrow_table  # Liberar memória
gc.collect()  # Coletar lixo para liberar memória

In [None]:
# Célula criada para buscar dados locais sem precisar do BQ
#df_order_details.write_parquet(
 #   os.path.join(pasta_projeto,"order_details.parquet")
#)

#df_order_details = pl.read_parquet(os.path.join(pasta_projeto,"order_details.parquet"))


In [None]:
# função para limpar e analisar o JSON
def safe_json_parse(text):
    try:

         # Garante que o texto é string
        if not isinstance(text, str):
            return None
        
        # Etapa 1: limpeza básica
        text = text.strip()

        # Remove aspas externas: "...." → ....
        if text.startswith('"') and text.endswith('"'):
            text = text[1:-1]

        # Etapa 2: transforma \\\" → " (escape duplo)
        text = text.replace('\\\"', '"')

        # Etapa 3: transforma \" → " (escape simples)
        text = text.replace('\"', '"')

        # Etapa 4: substitui aspas duplas duplicadas no início e fim: ""abc"" → "abc"
        text = re.sub(r'""([^"]*?)""', r'"\1"', text)

        # Etapa 5: reduz excesso de aspas seguidas internas: """ → "
        text = re.sub(r'"+', r'"', text)

        # Etapa 6: remove barras soltas antes de aspas
        text = re.sub(r'\\"', r'"', text)

        # Etapa 7: parse final
        return json.loads(text)

    except Exception:
        return None

In [None]:
# Dividir o DataFrame em chunks para processamento
chunk_size = 350_000
n = df_order_details.height

# Processar cada chunk
for start in range(0, n, chunk_size):
    end = min(start + chunk_size, n)
    print(f"Processando linhas {start} até {end}...")

    chunk = df_order_details.slice(start, end - start)
    chunk = chunk.with_columns(
        pl.col("items").cast(pl.Utf8).str.strip_chars().alias("items_clean")
    )

    chunk = chunk.with_columns(
        pl.col("items_clean").map_elements(safe_json_parse, return_dtype=pl.Object).alias("parsed_items")
    )

# Filtrar os chunks válidos e inválidos
    chunk_valid = chunk.filter(pl.col("parsed_items").is_not_null())
    chunk_invalid = chunk.filter(pl.col("parsed_items").is_null())

    # Salva as inválidas direto
    if chunk_invalid.height > 0:
        chunk_invalid = chunk_invalid.with_columns([
        pl.col(c).cast(pl.Utf8) for c in chunk_invalid.columns
    ])
        chunk_invalid.write_csv(os.path.join(pasta_projeto,f"invalid_{start}.csv"))

    # Explode manualmente
    exploded_rows = []
    for row in chunk_valid.iter_rows(named=True):
        order_id = row["order_id"]
        cpf = row["cpf"]
        parsed_items = row["parsed_items"]

        if not isinstance(parsed_items, list):
            continue

        for item in parsed_items:
            exploded_rows.append({
                "order_id": order_id,
                "cpf": cpf,
                "name": str(item.get("name", "")),
                "quantity": str(item.get("quantity", "")),
                "sequence": str(item.get("sequence", "")),
                "unitPrice": str(item.get("unitPrice", {}).get("value", "")),
                "addition": str(item.get("addition", {}).get("value", "")),
                "discount": str(item.get("discount", {}).get("value", "")),
                "type": "principal"
            })

            for g in item.get("garnishItems", []):
                exploded_rows.append({
                    "order_id": order_id,
                    "cpf": cpf,
                    "name": str(g.get("name", "")),
                    "quantity": str(g.get("quantity", "")),
                    "sequence": str(g.get("sequence", "")),
                    "unitPrice": str(g.get("unitPrice", {}).get("value", "")),
                    "addition": str(g.get("addition", {}).get("value", "")),
                    "discount": str(g.get("discount", {}).get("value", "")),
                    "type": "garnish"
                })

    # Salva o resultado do chunk direto
    if exploded_rows:
        df_chunk = pl.DataFrame(exploded_rows)
        df_chunk.write_parquet(os.path.join(pasta_projeto,f"chunk_{start}.parquet"))
        del df_chunk

    # Limpa memória
    del chunk, chunk_valid, chunk_invalid, exploded_rows
    gc.collect()

print("Todos os chunks processados e salvos em disco com sucesso.")


In [None]:
del df_order_details # Liberar memória
gc.collect()  # Coletar lixo para liberar memória

In [None]:

arquivos = glob.glob(os.path.join(pasta_projeto,"chunk_*.parquet"))
dfs_parquet = [pl.read_parquet(arq) for arq in arquivos]
df_order_details_explodido = pl.concat(dfs_parquet).sort(by=["order_id", "sequence"])

In [None]:
del dfs_parquet # Liberar memória
gc.collect() # Coletar lixo para liberar memória

In [None]:
df_order_details_explodido = df_order_details_explodido.with_columns(
    pl.col("cpf").cast(pl.Utf8).str.zfill(11),
    pl.col("name").str.to_titlecase(),
    pl.col("quantity").cast(pl.Float64),
    pl.col("sequence").cast(pl.Int32),
    pl.col("unitPrice").cast(pl.Float64) / 100,
    pl.col("addition").cast(pl.Float64)  / 100,
    pl.col("discount").cast(pl.Float64)  / 100,
    pl.col("type").cast(pl.Utf8)
).unique()

In [None]:
criar_dataset_e_tabela(
    df=df_order_details_explodido,  
    dataset_nome="silver",
    tabela_nome="order_details",
    client=client,
    project_id="case-ifood-fsg",
    location="southamerica-east1" 
)