# Case Técnico iFood Caio Santos — Pipeline de Dados (ETL)

Este notebook realiza a ingestão, limpeza e construção da base analítica para o case.  


In [1]:
from google.colab import drive
import os

drive.mount("/content/drive")

BASE_DIR = "/content/drive/MyDrive/case_ifood"
os.makedirs(BASE_DIR, exist_ok=True)
os.chdir(BASE_DIR)

print("cwd:", os.getcwd())


Mounted at /content/drive
cwd: /content/drive/MyDrive/case_ifood


In [2]:
import os

DIRS = {
    "raw": "data/raw",
    "processed": "data/processed",

    "raw_consumer": "data/raw/consumer",
    "raw_restaurant": "data/raw/restaurant",
    "raw_ab_test_ref": "data/raw/ab_test_ref",
    "raw_order": "data/raw/order",

    "clean_consumer": "data/processed/clean/consumer",
    "clean_restaurant": "data/processed/clean/restaurant",
    "clean_ab_test_ref": "data/processed/clean/ab_test_ref",
    "clean_order": "data/processed/clean/order",

    "order_parts_raw": "data/processed/clean/order/parts_raw",
    "order_parts_clean": "data/processed/clean/order/parts_clean",
    "order_dedup": "data/processed/clean/order/dedup",

    "analytics_orders_enriched": "data/processed/analytics/orders_enriched",

    "reports": "reports",
}

for p in DIRS.values():
    os.makedirs(p, exist_ok=True)

FILES = {
    "consumer_gz": os.path.join(DIRS["raw_consumer"], "consumer.csv.gz"),
    "restaurant_gz": os.path.join(DIRS["raw_restaurant"], "restaurant.csv.gz"),
    "ab_test_ref_tgz": os.path.join(DIRS["raw_ab_test_ref"], "ab_test_ref.tar.gz"),
    "order_json_gz": os.path.join(DIRS["raw_order"], "order.json.gz"),
}

FILES


{'consumer_gz': 'data/raw/consumer/consumer.csv.gz',
 'restaurant_gz': 'data/raw/restaurant/restaurant.csv.gz',
 'ab_test_ref_tgz': 'data/raw/ab_test_ref/ab_test_ref.tar.gz',
 'order_json_gz': 'data/raw/order/order.json.gz'}

## 1. Ingestão de Dados (raw)



In [3]:
from urllib.request import urlretrieve
import os

URLS = {
    "consumer": "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/consumer.csv.gz",
    "restaurant": "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/restaurant.csv.gz",
    "ab_test_ref": "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/ab_test_ref.tar.gz",
    "order": "https://data-architect-test-source.s3-sa-east-1.amazonaws.com/order.json.gz",
}

def download_if_missing(url: str, path: str) -> None:
    if os.path.exists(path) and os.path.getsize(path) > 0:
        return
    urlretrieve(url, path)

download_if_missing(URLS["consumer"], FILES["consumer_gz"])
download_if_missing(URLS["restaurant"], FILES["restaurant_gz"])
download_if_missing(URLS["ab_test_ref"], FILES["ab_test_ref_tgz"])
download_if_missing(URLS["order"], FILES["order_json_gz"])

{f: os.path.exists(p) for f, p in FILES.items()}


{'consumer_gz': True,
 'restaurant_gz': True,
 'ab_test_ref_tgz': True,
 'order_json_gz': True}

## 2. Tratamento das dimensões

In [4]:
import gzip
import shutil
import pandas as pd
import os

tmp_consumer = os.path.join(DIRS["clean_consumer"], "consumer_tmp.csv")
path_consumer_clean = os.path.join(DIRS["clean_consumer"], "consumers.parquet")

with gzip.open(FILES["consumer_gz"], "rb") as f_in, open(tmp_consumer, "wb") as f_out:
    shutil.copyfileobj(f_in, f_out)

df_consumers = pd.read_csv(tmp_consumer)
df_consumers["customer_id"] = df_consumers["customer_id"].astype(str).str.strip()

df_consumers = df_consumers.drop_duplicates(subset=["customer_id"]).reset_index(drop=True)
df_consumers.to_parquet(path_consumer_clean, index=False)

df_consumers.shape, path_consumer_clean


((806156, 7), 'data/processed/clean/consumer/consumers.parquet')

In [5]:
tmp_restaurant = os.path.join(DIRS["clean_restaurant"], "restaurant_tmp.csv")
path_restaurant_clean = os.path.join(DIRS["clean_restaurant"], "restaurants.parquet")

with gzip.open(FILES["restaurant_gz"], "rb") as f_in, open(tmp_restaurant, "wb") as f_out:
    shutil.copyfileobj(f_in, f_out)

df_restaurants = pd.read_csv(tmp_restaurant)

if "id" in df_restaurants.columns:
    df_restaurants = df_restaurants.rename(columns={"id": "merchant_id"})

df_restaurants["merchant_id"] = df_restaurants["merchant_id"].astype(str).str.strip()
df_restaurants = df_restaurants.drop_duplicates(subset=["merchant_id"]).reset_index(drop=True)

df_restaurants.to_parquet(path_restaurant_clean, index=False)

df_restaurants.shape, path_restaurant_clean


((7292, 12), 'data/processed/clean/restaurant/restaurants.parquet')

In [6]:
import tarfile
import os
import pandas as pd

ab_extract_dir = os.path.join(DIRS["clean_ab_test_ref"], "tmp_extract")
os.makedirs(ab_extract_dir, exist_ok=True)

with tarfile.open(FILES["ab_test_ref_tgz"], "r:gz") as tar:
    tar.extractall(ab_extract_dir)

path_ab_csv = os.path.join(ab_extract_dir, "ab_test_ref.csv")
path_ab_clean = os.path.join(DIRS["clean_ab_test_ref"], "ab_test_ref.parquet")

df_ab = pd.read_csv(path_ab_csv)

df_ab["customer_id"] = df_ab["customer_id"].astype(str).str.strip()
df_ab["is_target"] = df_ab["is_target"].astype(str).str.strip()

df_ab = df_ab[df_ab["customer_id"].notna() & (df_ab["customer_id"] != "")]
df_ab = df_ab[df_ab["is_target"].isin(["target", "control"])]

df_ab = df_ab.drop_duplicates(subset=["customer_id"]).reset_index(drop=True)

df_ab.to_parquet(path_ab_clean, index=False)

df_ab["is_target"].value_counts(), df_ab.shape, path_ab_clean


  tar.extractall(ab_extract_dir)


(is_target
 target     445925
 control    360542
 Name: count, dtype: int64,
 (806467, 2),
 'data/processed/clean/ab_test_ref/ab_test_ref.parquet')

## 3. Orders: particionamento, clean e dedup

Como a duplicidade relevante de `order_id`; foi feita uma deduplicação (1 linha por `order_id`) para evitar dupla contagem de métricas.



In [7]:
import pandas as pd
import os

cols_orders = [
    "order_id",
    "customer_id",
    "order_created_at",
    "order_total_amount",
    "merchant_id",
    "order_scheduled",
    "origin_platform",
    "delivery_address_city",
    "delivery_address_state",
    "delivery_address_country",
]

i = 0
for chunk in pd.read_json(FILES["order_json_gz"], lines=True, chunksize=100_000):
    chunk = chunk[cols_orders]
    chunk.to_parquet(os.path.join(DIRS["order_parts_raw"], f"part_{i:04d}.parquet"), index=False)
    i += 1

{"parts_geradas": i, "output_dir": DIRS["order_parts_raw"]}


{'parts_geradas': 37, 'output_dir': 'data/processed/clean/order/parts_raw'}

In [8]:
import glob
import pandas as pd
import os

raw_parts = sorted(glob.glob(os.path.join(DIRS["order_parts_raw"], "part_*.parquet")))

clean_count = 0
linhas_raw = 0
linhas_clean = 0

for idx, p in enumerate(raw_parts):
    dfp = pd.read_parquet(p)
    linhas_raw += len(dfp)

    dfp["customer_id"] = dfp["customer_id"].astype(str).str.strip()
    dfp["order_id"] = dfp["order_id"].astype(str).str.strip()
    dfp["merchant_id"] = dfp["merchant_id"].astype(str).str.strip()

    dfp = dfp[dfp["customer_id"].notna() & (dfp["customer_id"] != "")]
    dfp = dfp[dfp["order_id"].notna() & (dfp["order_id"] != "")]
    dfp = dfp[dfp["merchant_id"].notna() & (dfp["merchant_id"] != "")]

    dfp["order_created_at"] = pd.to_datetime(dfp["order_created_at"], errors="coerce")
    dfp["order_total_amount"] = pd.to_numeric(dfp["order_total_amount"], errors="coerce")

    linhas_clean += len(dfp)

    dfp.to_parquet(os.path.join(DIRS["order_parts_clean"], f"part_{idx:04d}.parquet"), index=False)
    clean_count += 1

{
    "parts_clean": clean_count,
    "linhas_total_raw": linhas_raw,
    "linhas_total_clean": linhas_clean,
    "output_dir": DIRS["order_parts_clean"],
}


{'parts_clean': 37,
 'linhas_total_raw': 3670826,
 'linhas_total_clean': 3670826,
 'output_dir': 'data/processed/clean/order/parts_clean'}

In [9]:
import duckdb
import os

con = duckdb.connect(database=":memory:")

parts_clean_path = os.path.join(DIRS["order_parts_clean"], "part_*.parquet")
path_orders_dedup = os.path.join(DIRS["order_dedup"], "orders_dedup.parquet")

con.execute(f"""
CREATE OR REPLACE TABLE orders_clean AS
SELECT *
FROM read_parquet('{parts_clean_path}')
""")

con.execute("""
CREATE OR REPLACE TABLE orders_dedup AS
SELECT *
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY order_created_at ASC) AS rn
  FROM orders_clean
)
WHERE rn = 1
""")

con.execute(f"COPY orders_dedup TO '{path_orders_dedup}' (FORMAT PARQUET)")

counts = con.execute("""
SELECT
  (SELECT COUNT(*) FROM orders_clean) AS orders_clean_linhas,
  (SELECT COUNT(DISTINCT order_id) FROM orders_clean) AS orders_clean_order_id_distintos,
  (SELECT COUNT(*) FROM orders_dedup) AS orders_dedup_linhas,
  (SELECT COUNT(DISTINCT order_id) FROM orders_dedup) AS orders_dedup_order_id_distintos
""").fetchdf()

counts, path_orders_dedup


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

(   orders_clean_linhas  orders_clean_order_id_distintos  orders_dedup_linhas  \
 0              3670826                          2432974              2432974   
 
    orders_dedup_order_id_distintos  
 0                          2432974  ,
 'data/processed/clean/order/dedup/orders_dedup.parquet')

## 4. Construção da Base Analítica.

base de análise no nível de pedido com a marcação do experimento (target/control).


In [10]:
import duckdb
import os

con = duckdb.connect(database=":memory:")

path_ab_clean = os.path.join(DIRS["clean_ab_test_ref"], "ab_test_ref.parquet")
path_orders_enriched = os.path.join(DIRS["analytics_orders_enriched"], "orders_enriched.parquet")

con.execute(f"""
CREATE OR REPLACE TABLE ab AS
SELECT customer_id, is_target
FROM read_parquet('{path_ab_clean}')
""")

con.execute(f"""
CREATE OR REPLACE TABLE orders_dedup AS
SELECT *
FROM read_parquet('{path_orders_dedup}')
""")

con.execute("""
CREATE OR REPLACE TABLE orders_enriched AS
SELECT
  o.order_id,
  o.customer_id,
  o.order_created_at,
  o.order_total_amount,
  o.merchant_id,
  o.order_scheduled,
  o.origin_platform,
  o.delivery_address_city,
  o.delivery_address_state,
  o.delivery_address_country,
  a.is_target
FROM orders_dedup o
LEFT JOIN ab a
  ON o.customer_id = a.customer_id
""")

con.execute(f"COPY orders_enriched TO '{path_orders_enriched}' (FORMAT PARQUET)")

df_counts = con.execute("""
SELECT
  COUNT(*) AS pedidos_total,
  SUM(CASE WHEN is_target IS NOT NULL THEN 1 ELSE 0 END) AS pedidos_com_grupo,
  SUM(CASE WHEN is_target IS NULL THEN 1 ELSE 0 END) AS pedidos_sem_grupo,
  COUNT(DISTINCT customer_id) AS customers_total,
  COUNT(DISTINCT CASE WHEN is_target IS NOT NULL THEN customer_id END) AS customers_com_grupo
FROM orders_enriched
""").fetchdf()

df_counts, path_orders_enriched


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

(   pedidos_total  pedidos_com_grupo  pedidos_sem_grupo  customers_total  \
 0        2432974          2427415.0             5559.0           806467   
 
    customers_com_grupo  
 0               806466  ,
 'data/processed/analytics/orders_enriched/orders_enriched.parquet')

##Relatórios de Qualidade e Smoke Test (reports)
Realização de validação e integridade

In [11]:
import pandas as pd
import os

os.makedirs(DIRS["reports"], exist_ok=True)

coverage = df_counts.copy()
coverage.to_csv(os.path.join(DIRS["reports"], "coverage_orders_enriched.csv"), index=False)

quality = pd.DataFrame([{
    "tabela": "ab_test_ref",
    "linhas": int(df_ab.shape[0]),
    "customers_distintos": int(df_ab["customer_id"].nunique()),
    "is_target_valores": int(df_ab["is_target"].nunique()),
}, {
    "tabela": "consumers",
    "linhas": int(df_consumers.shape[0]),
    "customers_distintos": int(df_consumers["customer_id"].nunique()),
}, {
    "tabela": "restaurants",
    "linhas": int(df_restaurants.shape[0]),
    "merchants_distintos": int(df_restaurants["merchant_id"].nunique()),
}, {
    "tabela": "orders_clean",
    "linhas_total_clean": int(counts["orders_clean_linhas"].iloc[0]),
    "order_id_distintos_clean": int(counts["orders_clean_order_id_distintos"].iloc[0]),
}, {
    "tabela": "orders_dedup",
    "linhas_total_dedup": int(counts["orders_dedup_linhas"].iloc[0]),
    "order_id_distintos_dedup": int(counts["orders_dedup_order_id_distintos"].iloc[0]),
}])

quality.to_csv(os.path.join(DIRS["reports"], "quality_report.csv"), index=False)

{
    "coverage_csv": os.path.join(DIRS["reports"], "coverage_orders_enriched.csv"),
    "quality_csv": os.path.join(DIRS["reports"], "quality_report.csv"),
}


{'coverage_csv': 'reports/coverage_orders_enriched.csv',
 'quality_csv': 'reports/quality_report.csv'}

In [12]:
import os
import pandas as pd

checks = {
    "orders_enriched": path_orders_enriched,
    "coverage_orders_enriched": os.path.join(DIRS["reports"], "coverage_orders_enriched.csv"),
    "quality_report": os.path.join(DIRS["reports"], "quality_report.csv"),
}

print({k: os.path.exists(v) for k, v in checks.items()})

df_smoke = pd.read_parquet(path_orders_enriched, columns=["customer_id", "order_id", "order_created_at", "order_total_amount", "is_target"])

print("linhas:", len(df_smoke))
print("customers:", df_smoke["customer_id"].nunique())
print("order_ids:", df_smoke["order_id"].nunique())
print("periodo:", df_smoke["order_created_at"].min(), "->", df_smoke["order_created_at"].max())
print(df_smoke["is_target"].value_counts(dropna=False).head())


{'orders_enriched': True, 'coverage_orders_enriched': True, 'quality_report': True}
linhas: 2432974
customers: 806467
order_ids: 2432974
periodo: 2018-12-03 00:00:00+00:00 -> 2019-01-31 23:59:59+00:00
is_target
target     1416677
control    1010738
None          5559
Name: count, dtype: int64
