## Logging Setup
This cell initializes the logging system with INFO level and dual handlers (console + file).  
It also generates a timestamped log file under /logs/ to ensure reproducibility.


In [0]:
import logging
from datetime import datetime

# Crear nombre de archivo con timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
log_filename = f"logs/run_{timestamp}.log"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    handlers=[
        logging.StreamHandler(),           # Salida en consola
        logging.FileHandler(log_filename)  # Archivo en logs/
    ]
)

logging.info("=== ETL Run Started ===")
logging.info(f"Log file created: {log_filename}")



### Step C1 — Fix Random Seeds

To ensure reproducibility, we set deterministic values for Python, NumPy, and OS-level hashing.  
This guarantees that any randomness in the ETL pipeline produces the same results each time.


In [0]:
import os, random, numpy as np

os.environ["PYTHONHASHSEED"] = "0"
random.seed(0)
np.random.seed(0)

logging.info("Random seeds fixed for reproducibility.")


### Step C2 — Capture Environment

We record the exact package versions used during this run to support reproducibility.  
The output is saved into `requirements.txt` for future reruns.


In [0]:
# Export installed packages to requirements.txt
!pip freeze > requirements.txt

logging.info("requirements.txt generated.")


### Step C3 — Compute SHA-256 Hashes

To guarantee data integrity, we hash each input CSV file and store the results  
in `data_hashes.json`. This allows us to detect any changes to the raw data over time.


In [0]:
import hashlib
import json
from pathlib import Path

data_folder = Path("data")
csv_files = ["menu_items.csv", "order_details.csv"]
hashes = {}

for file_name in csv_files:
    file_path = data_folder / file_name
    with open(file_path, "rb") as f:
        file_hash = hashlib.sha256(f.read()).hexdigest()
        hashes[file_name] = file_hash
        logging.info(f"Hashed {file_name}: {file_hash}")

# Save to JSON
with open("data_hashes.json", "w") as out:
    json.dump(hashes, out, indent=4)

logging.info("Data hashes saved to data_hashes.json")


### Step D1 — Load Input CSV Files

We load both input datasets (`menu_items.csv` and `order_details.csv`) from the `/data` folder  
into Pandas DataFrames. Logging is included to confirm successful loading.


In [0]:
import pandas as pd

menu_df = pd.read_csv("data/menu_items.csv")
orders_df = pd.read_csv("data/order_details.csv")

logging.info(f"Menu dataframe loaded with {len(menu_df)} rows")
logging.info(f"Order details dataframe loaded with {len(orders_df)} rows")

menu_df.head(), orders_df.head()


### Step D2 — Clean and Standardize Data

We fix common data quality issues such as:

- Converting date strings into proper datetime objects  
- Stripping unnecessary whitespace  
- Ensuring numeric columns are correctly typed  
- Removing duplicates if present  


In [0]:
orders_df.columns

In [0]:
# Strip whitespace from text columns (modern approach)
menu_df = menu_df.apply(lambda col: col.str.strip() if col.dtype == "object" else col)
orders_df = orders_df.apply(lambda col: col.str.strip() if col.dtype == "object" else col)

# Convert date and time columns to datetime
orders_df["order_date"] = pd.to_datetime(orders_df["order_date"], errors="coerce")
orders_df["order_time"] = pd.to_datetime(orders_df["order_time"], errors="coerce")

# Convert numeric types
# Si tienes una columna de quantity (si no existe, omítelo)
if "quantity" in orders_df.columns:
    orders_df["quantity"] = pd.to_numeric(orders_df["quantity"], errors="coerce")

logging.info("Data cleaning completed.")


### Step D3 — Join Datasets

We join the two DataFrames on:
`menu_items.menu_item_id = order_details.item_id`

This creates a consolidated table containing order-level details with item names, categories, and prices.


In [0]:
merged_df = orders_df.merge(
    menu_df,
    left_on="item_id",
    right_on="menu_item_id",
    how="left"
)

logging.info(f"Merged dataframe created with {len(merged_df)} rows")
merged_df.head()


### Step D4 — Create Final Tidy Table

We select only relevant columns for analysis:

- order_id  
- order_timestamp  
- item_name  
- category  
- price  
- quantity  


In [0]:
final_df = merged_df[[
    "order_id",
    "order_date",
    "order_time",
    "item_name",
    "category",
    "price"
]]

logging.info("Final tidy table created.")
final_df.head()


### Metric 1 — Top 5 Items by Quantity Ordered


In [0]:
# Add quantity column with 1 for each row
final_df['quantity'] = 1

# Top 5 most ordered items
top_items = final_df.groupby("item_name")["quantity"].sum().sort_values(ascending=False).head(5)
top_items



### Metric 2 — Revenue by Category


In [0]:
final_df["revenue"] = final_df["price"] * final_df["quantity"]
revenue_by_category = final_df.groupby("category")["revenue"].sum()
revenue_by_category


### Metric 3 — Busiest Hour of Day


In [0]:
# Create 'hour' column from order_time
final_df['hour'] = final_df['order_time'].dt.hour
busiest_hour = final_df['hour'].value_counts().idxmax()
busiest_hour_formatted = f"{busiest_hour}:00 - {busiest_hour}:59"
print(f"The busiest hour of the day is: {busiest_hour_formatted}")


### Step D6 — Save Metrics Output

The computed analytics are saved to a timestamped CSV file to allow versioning  
and reproducibility of results.


In [0]:
from datetime import datetime
import os

timestamp = datetime.now().strftime("%Y%m%d_%H%M")
output_path = f"metrics_{timestamp}.csv"

final_df.to_csv(output_path, index=False)

logging.info(f"Metrics saved to {output_path}")
output_path


### Step D7 — Quality Tests

We include simple assert-based tests to ensure:

- Expected columns exist  
- Dataframes are not empty  
- Metrics return valid results  


In [0]:
# Test: final_df should not be empty
assert len(final_df) > 0, "ERROR: final_df is empty!"

# Test: required columns exist (usando order_date y order_time en lugar de order_timestamp)
required_cols = ["order_id", "order_date", "order_time", "item_name", "category", "price", "quantity"]
for col in required_cols:
    assert col in final_df.columns, f"Missing column: {col}"

# Test: top_items should return 5 items
assert len(top_items) == 5, "Top 5 items metric is incorrect."

logging.info("All tests passed successfully.")

