# Online Retail – Data Engineering Pipeline

This notebook focuses on transforming raw transactional data into
clean, structured, and analytics-ready tables.

The goal is to simulate a simple ETL pipeline by:
- Ingesting raw data
- Cleaning and validating records
- Creating derived features
- Producing aggregated output tables for downstream usage


In [None]:
from pathlib import Path

src = Path("online_retail_eda.xlsx")
dst = Path("data") / "online_retail_eda.xlsx"

dst.parent.mkdir(exist_ok=True)

if src.exists() and not dst.exists():
    src.replace(dst)  # dosyayı taşır
    print("✅ Dosya data/ klasörüne taşındı:", dst)
else:
    print("ℹ️ src var mı?", src.exists(), "| dst var mı?", dst.exists())



In [None]:
from pathlib import Path
list(Path("data").iterdir())


In [None]:
from pathlib import Path
import pandas as pd
import numpy as np

PROJECT_DIR = Path(".")
DATA_DIR = PROJECT_DIR / "data"
OUT_DIR = PROJECT_DIR / "output"

DATA_DIR.mkdir(exist_ok=True)
OUT_DIR.mkdir(exist_ok=True)

DATA_PATH = DATA_DIR / "online_retail_eda.xlsx"

if not DATA_PATH.exists():
    raise FileNotFoundError(
        f"{DATA_PATH} bulunamadı.\n"
        "→ Veriyi data/ klasörüne koy."
    )

df = pd.read_excel(DATA_PATH)
df.head()



In [4]:
import pandas as pd
import time

t0 = time.time()
print("starting read xlsx...")
df_tmp = pd.read_excel("data/online_retail_eda.xlsx", engine="openpyxl")

print("read done. saving csv...")
df_tmp.to_csv("data/online_retail_eda.csv", index=False)

print("all done. seconds:", round(time.time() - t0, 2))
print("csv saved -> data/online_retail_eda.csv | shape:", df_tmp.shape)


starting read xlsx...
read done. saving csv...
all done. seconds: 68.87
csv saved -> data/online_retail_eda.csv | shape: (525461, 8)


In [6]:
df.to_csv("data/online_retail_raw.csv", index=False)
print("✅ raw csv saved")


✅ raw csv saved


In [7]:
df = pd.read_csv("data/online_retail_raw.csv")


In [8]:
import re

def clean_columns(cols):
    out = []
    for c in cols:
        c = c.strip().lower()
        c = re.sub(r"[^a-z0-9]+", "_", c)
        c = re.sub(r"_+", "_", c).strip("_")
        out.append(c)
    return out

df.columns = clean_columns(df.columns)


df["invoicedate"] = pd.to_datetime(df["invoicedate"], errors="coerce")
df["customer_id"] = pd.to_numeric(df["customer_id"], errors="coerce")

print(df.dtypes)
df.head(3)


invoice                object
stockcode              object
description            object
quantity                int64
invoicedate    datetime64[ns]
price                 float64
customer_id           float64
country                object
dtype: object


Unnamed: 0,invoice,stockcode,description,quantity,invoicedate,price,customer_id,country
0,489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01 07:45:00,6.95,13085.0,United Kingdom
1,489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom
2,489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom


In [9]:
rows_before = len(df)


df_clean = df.dropna(subset=["customer_id"]).copy()


df_clean["is_return"] = df_clean["quantity"] < 0


df_clean["invalid_price"] = df_clean["price"] <= 0

rows_after = len(df_clean)

print("Rows before cleaning:", rows_before)
print("Rows after cleaning :", rows_after)
print("Dropped rows        :", rows_before - rows_after)

print("\nReturn rows:", df_clean["is_return"].sum())
print("Invalid price rows:", df_clean["invalid_price"].sum())

df_clean.head(3)


Rows before cleaning: 525461
Rows after cleaning : 417534
Dropped rows        : 107927

Return rows: 9839
Invalid price rows: 31


Unnamed: 0,invoice,stockcode,description,quantity,invoicedate,price,customer_id,country,is_return,invalid_price
0,489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01 07:45:00,6.95,13085.0,United Kingdom,False,False
1,489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom,False,False
2,489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom,False,False


In [43]:
dim_customer = (
    df_clean[["customer_id", "country"]]
    .drop_duplicates()
    .sort_values("customer_id")
    .reset_index(drop=True)
)

dim_customer["customer_key"] = dim_customer.index + 1

print("dim_customer shape:", dim_customer.shape)



dim_product = (
    df_clean[["stockcode", "description"]]
    .drop_duplicates()
    .sort_values("stockcode")
    .reset_index(drop=True)
)

dim_product["product_key"] = dim_product.index + 1

print("dim_product shape:", dim_product.shape)



df_clean["invoice_day"] = df_clean["invoicedate"].dt.date

dim_date = (
    df_clean[["invoice_day"]]
    .drop_duplicates()
    .sort_values("invoice_day")
    .reset_index(drop=True)
)

dim_date["date_key"] = (
    pd.to_datetime(dim_date["invoice_day"])
    .dt.strftime("%Y%m%d")
    .astype(int)
)

dim_date["year"] = pd.to_datetime(dim_date["invoice_day"]).dt.year
dim_date["month"] = pd.to_datetime(dim_date["invoice_day"]).dt.month
dim_date["day"] = pd.to_datetime(dim_date["invoice_day"]).dt.day
dim_date["day_of_week"] = pd.to_datetime(dim_date["invoice_day"]).dt.dayofweek
dim_date["quarter"] = pd.to_datetime(dim_date["invoice_day"]).dt.quarter

print("dim_date shape:", dim_date.shape)


dim_customer shape: (4388, 3)
dim_product shape: (4461, 3)
dim_date shape: (307, 7)


In [18]:



fact_sales = df_clean.merge(
    dim_customer[["customer_id", "customer_key"]],
    on="customer_id",
    how="left"
)


fact_sales = fact_sales.merge(
    dim_product[["stockcode", "product_key"]],
    on="stockcode",
    how="left"
)


fact_sales = fact_sales.merge(
    dim_date[["invoice_day", "date_key"]],
    on="invoice_day",
    how="left"
)


fact_sales = fact_sales[
    [
        "invoice",
        "customer_key",
        "product_key",
        "date_key",
        "quantity",
        "price",
        "is_return",
        "invalid_price"
    ]
].copy()


fact_sales["line_id"] = range(1, len(fact_sales) + 1)

print("fact_sales shape:", fact_sales.shape)
fact_sales.head(3)


fact_sales shape: (519501, 9)


Unnamed: 0,invoice,customer_key,product_key,date_key,quantity,price,is_return,invalid_price,line_id
0,489434,513,3870,20091201,12,6.95,False,False,1
1,489434,513,3148,20091201,12,6.75,False,False,2
2,489434,513,3151,20091201,12,6.75,False,False,3


In [19]:
print("Null customer_key:", fact_sales["customer_key"].isna().sum())
print("Null product_key :", fact_sales["product_key"].isna().sum())
print("Null date_key    :", fact_sales["date_key"].isna().sum())


Null customer_key: 0
Null product_key : 0
Null date_key    : 0


In [20]:
print("df_clean rows:", len(df_clean))
print("fact_sales rows:", len(fact_sales))


df_clean rows: 417534
fact_sales rows: 519501


In [22]:
print("df_clean rows:", len(df_clean))

print("dim_customer duplicated customer_id:", dim_customer["customer_id"].duplicated().sum())
print("dim_product duplicated stockcode:", dim_product["stockcode"].duplicated().sum())
print("dim_date duplicated invoice_day:", dim_date["invoice_day"].duplicated().sum())


df_clean rows: 417534
dim_customer duplicated customer_id: 5
dim_product duplicated stockcode: 456
dim_date duplicated invoice_day: 0


In [23]:

dim_customer = (
    df_clean.groupby("customer_id", as_index=False)["country"]
    .agg(lambda s: s.value_counts().index[0])
    .sort_values("customer_id")
    .reset_index(drop=True)
)

dim_customer["customer_key"] = range(1, len(dim_customer) + 1)

print("dim_customer shape:", dim_customer.shape)
print("duplicated customer_id:", dim_customer["customer_id"].duplicated().sum())
dim_customer.head(3)


dim_customer shape: (4383, 3)
duplicated customer_id: 0


Unnamed: 0,customer_id,country,customer_key
0,12346.0,United Kingdom,1
1,12347.0,Iceland,2
2,12348.0,Finland,3


In [24]:

df_clean["description"] = df_clean["description"].astype(str).str.strip()

dim_product = (
    df_clean.groupby("stockcode", as_index=False)["description"]
    .agg(lambda s: s.value_counts().index[0])
    .sort_values("stockcode")
    .reset_index(drop=True)
)

dim_product["product_key"] = range(1, len(dim_product) + 1)

print("dim_product shape:", dim_product.shape)
print("duplicated stockcode:", dim_product["stockcode"].duplicated().sum())
dim_product.head(3)


dim_product shape: (4031, 3)
duplicated stockcode: 0


Unnamed: 0,stockcode,description,product_key
0,10002,INFLATABLE POLITICAL GLOBE,1
1,10080,GROOVY CACTUS INFLATABLE,2
2,10109,BENDY COLOUR PENCILS,3


In [25]:
fact_sales = df_clean.merge(
    dim_customer[["customer_id", "customer_key"]],
    on="customer_id",
    how="left",
    validate="many_to_one"
)

fact_sales = fact_sales.merge(
    dim_product[["stockcode", "product_key"]],
    on="stockcode",
    how="left",
    validate="many_to_one"
)

fact_sales = fact_sales.merge(
    dim_date[["invoice_day", "date_key"]],
    on="invoice_day",
    how="left",
    validate="many_to_one"
)

fact_sales = fact_sales[
    ["invoice","customer_key","product_key","date_key","quantity","price","is_return","invalid_price"]
].copy()

fact_sales["line_id"] = range(1, len(fact_sales) + 1)

print("df_clean rows:", len(df_clean))
print("fact_sales rows:", len(fact_sales))
print("Null customer_key:", fact_sales["customer_key"].isna().sum())
print("Null product_key :", fact_sales["product_key"].isna().sum())
print("Null date_key    :", fact_sales["date_key"].isna().sum())


df_clean rows: 417534
fact_sales rows: 417534
Null customer_key: 0
Null product_key : 0
Null date_key    : 0


In [42]:
# Save outputs

dim_customer.to_parquet("output/dim_customer.parquet", index=False)
dim_product.to_parquet("output/dim_product.parquet", index=False)
dim_date.to_parquet("output/dim_date.parquet", index=False)
fact_sales.to_parquet("output/fact_sales.parquet", index=False)

print(" All tables exported as Parquet.")


 All tables exported as Parquet.


In [27]:
dq_report = {
    "rows_raw": len(df),
    "rows_clean": len(df_clean),
    "rows_fact": len(fact_sales),
    "unique_customers": dim_customer.shape[0],
    "unique_products": dim_product.shape[0],
    "unique_dates": dim_date.shape[0],
    "return_rows": df_clean["is_return"].sum(),
    "invalid_price_rows": df_clean["invalid_price"].sum(),
}

dq_report


{'rows_raw': 525461,
 'rows_clean': 417534,
 'rows_fact': 417534,
 'unique_customers': 4383,
 'unique_products': 4031,
 'unique_dates': 307,
 'return_rows': 9839,
 'invalid_price_rows': 31}

In [28]:
dq_report = {
    "rows_raw": len(df),
    "rows_clean": len(df_clean),
    "rows_fact": len(fact_sales),
    "dropped_missing_customer_id": len(df) - len(df_clean),
    "return_rows": int(df_clean["is_return"].sum()),
    "invalid_price_rows": int(df_clean["invalid_price"].sum()),
    "fk_null_customer_key": int(fact_sales["customer_key"].isna().sum()),
    "fk_null_product_key": int(fact_sales["product_key"].isna().sum()),
    "fk_null_date_key": int(fact_sales["date_key"].isna().sum()),
    "unique_customers": int(dim_customer.shape[0]),
    "unique_products": int(dim_product.shape[0]),
    "unique_dates": int(dim_date.shape[0]),
}

dq_report


{'rows_raw': 525461,
 'rows_clean': 417534,
 'rows_fact': 417534,
 'dropped_missing_customer_id': 107927,
 'return_rows': 9839,
 'invalid_price_rows': 31,
 'fk_null_customer_key': 0,
 'fk_null_product_key': 0,
 'fk_null_date_key': 0,
 'unique_customers': 4383,
 'unique_products': 4031,
 'unique_dates': 307}

In [41]:
from pathlib import Path

BASE = Path("warehouse")
BRONZE = BASE / "bronze"
SILVER = BASE / "silver"
GOLD = BASE / "gold"

for p in [BRONZE, SILVER, GOLD]:
    p.mkdir(parents=True, exist_ok=True)

print(" bronze / silver / gold folders created")


 bronze / silver / gold folders created


In [40]:
# Bronze
df.to_csv(BRONZE / "online_retail_raw.csv", index=False)

# Silver
df_clean.to_parquet(SILVER / "online_retail_clean.parquet", index=False)

# Gold
dim_customer.to_parquet(GOLD / "dim_customer.parquet", index=False)
dim_product.to_parquet(GOLD / "dim_product.parquet", index=False)
dim_date.to_parquet(GOLD / "dim_date.parquet", index=False)
fact_sales.to_parquet(GOLD / "fact_sales.parquet", index=False)

print(" Data exported to bronze / silver / gold layers")


 Data exported to bronze / silver / gold layers


In [39]:
from pathlib import Path
import pandas as pd

STATE = Path("warehouse/state_last_invoicedate.txt")

current_max = df_clean["invoicedate"].max()

if STATE.exists():
    last_processed = pd.to_datetime(STATE.read_text().strip())
else:
    last_processed = pd.Timestamp.min

incremental_rows = df_clean[df_clean["invoicedate"] > last_processed]

print("last_processed:", last_processed)
print("current_max   :", current_max)
print("new_rows      :", len(incremental_rows))

STATE.write_text(str(current_max))
print(" state updated")


last_processed: 2010-12-09 20:01:00
current_max   : 2010-12-09 20:01:00
new_rows      : 0
 state updated
