In [1]:
from pathlib import Path
import os
import stat

files = {
"prefect_olist_pipeline.py": r'''#!/usr/bin/env python3
"""
Minimal Prefect ETL for final project (meant to be run locally).
- Reads a CSV (local path or s3://)
- Enriches with exchangerate.host
- Writes results locally under pipeline_outputs/ (S3 optional)
"""
from pathlib import Path
import io
import json
from typing import Optional, Dict, Any
import pandas as pd
import requests
import boto3
from prefect import flow, task, get_run_logger

# Every artifact the pipeline writes (CSV outputs and the exchange-rate cache)
# goes under this directory; it is created at import time so later writes can
# assume it exists.
OUTPUT_DIR = Path("pipeline_outputs")
OUTPUT_DIR.mkdir(exist_ok=True)
# JSON mapping of purchase-date string -> BRL->USD rate (null when unknown).
RATES_CACHE_FILE = OUTPUT_DIR.joinpath("rates_cache.json")
# NOTE(review): machine-specific absolute path; pass csv_orders explicitly
# (e.g. an s3:// URL) when running anywhere else.
DEFAULT_ORDERS_CSV = r"C:\Users\analy\iCloudDrive\Desktop\DACSS Materials and Job Hunt\DACSS 690A Data Engineering\archive (4)\olist_orders_dataset.csv"

def load_rates_cache() -> Dict[str, Optional[float]]:
    """Return the cached date -> BRL->USD rate mapping stored in RATES_CACHE_FILE.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object. A bad cache therefore only costs
    re-fetching rates instead of failing the run.
    """
    if not RATES_CACHE_FILE.exists():
        return {}
    try:
        data = json.loads(RATES_CACHE_FILE.read_text())
    except (OSError, ValueError):  # ValueError covers JSONDecodeError and text decode errors
        return {}
    # Callers index into and assign to the result, so a non-object payload
    # (e.g. a JSON list) is treated as an empty cache rather than returned.
    return data if isinstance(data, dict) else {}

def save_rates_cache(cache: Dict[str, Optional[float]]) -> None:
    """Persist *cache* to RATES_CACHE_FILE as JSON, best-effort.

    A cache that cannot be serialized or written is skipped on purpose. The
    rates are simply re-fetched on a later run, so this must never fail the
    flow. Unexpected programming errors are no longer hidden, though.
    """
    try:
        RATES_CACHE_FILE.write_text(json.dumps(cache))
    except (OSError, TypeError, ValueError):
        # OSError: unwritable path/disk; TypeError/ValueError: value not JSON-serializable.
        pass

@task(retries=1, retry_delay_seconds=5)
def extract_csv(path: str) -> pd.DataFrame:
    """Read the CSV at *path* into a DataFrame, logging its dimensions.

    *path* is handed straight to ``pd.read_csv``. The task is retried once,
    5 seconds later, if reading raises.
    """
    log = get_run_logger()
    log.info(f"Reading CSV: {path}")
    frame = pd.read_csv(path)
    n_rows, n_cols = frame.shape
    log.info(f"Loaded rows={n_rows} cols={n_cols}")
    return frame

@task(retries=2, retry_delay_seconds=5)
def fetch_rate_for_date(date_str: str) -> Optional[float]:
    """Return the BRL->USD rate exchangerate.host reports for *date_str*, or None.

    *date_str* is used verbatim in the URL path (callers pass e.g. "2020-01-01").
    Request, HTTP and parsing failures are logged and turned into None rather
    than raised. As a result the task-level ``retries`` do not fire for them.
    """
    logger = get_run_logger()
    try:
        resp = requests.get(
            f"https://api.exchangerate.host/{date_str}",
            params={"base": "BRL", "symbols": "USD"},
            timeout=10,
        )
        resp.raise_for_status()
        rate = resp.json().get("rates", {}).get("USD")
        logger.info(f"Fetched rate for {date_str}: {rate}")
        if rate is None:
            # A successful HTTP response without rates.USD (e.g. an API error
            # payload) would otherwise silently become a null rate.
            logger.warning(f"No USD rate in exchangerate.host response for {date_str}")
            return None
        return float(rate)
    except Exception:
        # Include the date so the failing request can be identified in the logs.
        logger.exception(f"Failed to fetch rate for {date_str}")
        return None

@task
def enrich_orders(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with BRL->USD conversion columns added.

    Adds ``brl_to_usd_rate`` and ``payment_usd``:

    - ``brl_to_usd_rate`` is the rate for each row's purchase date. It comes
      from the on-disk rates cache or is fetched with ``fetch_rate_for_date``.
    - ``payment_usd`` is ``price`` (else ``payment_value``) times that rate,
      or None when neither column exists.

    ``order_purchase_timestamp`` is parsed to datetime in the result.
    Unparseable timestamps get a missing rate. The caller's DataFrame is
    never modified.
    """
    logger = get_run_logger()
    # Copy up front so neither branch mutates the caller's frame.
    df = df.copy()
    if "order_purchase_timestamp" not in df.columns:
        logger.warning("No order_purchase_timestamp column; skipping enrichment")
        df["brl_to_usd_rate"] = None
        df["payment_usd"] = None
        return df

    df["order_purchase_timestamp"] = pd.to_datetime(df["order_purchase_timestamp"], errors="coerce")
    # strftime yields NaN for NaT, so unparseable timestamps are dropped by
    # dropna() below. .dt.date.astype(str) instead produced the string "NaT",
    # which survived dropna() and was requested from the API as a "date".
    date_keys = df["order_purchase_timestamp"].dt.strftime("%Y-%m-%d")

    cache = load_rates_cache()
    for day in date_keys.dropna().unique().tolist():
        if day not in cache:
            # Prefect 2 raises when a task is called from inside another task;
            # .fn runs the undecorated function directly.
            cache[day] = fetch_rate_for_date.fn(day)
    # Values loaded from the JSON cache may not be floats; normalize to float/None.
    for day, value in list(cache.items()):
        if value is None or isinstance(value, float):
            continue
        try:
            cache[day] = float(value)
        except (TypeError, ValueError, OverflowError):
            cache[day] = None
    save_rates_cache(cache)

    # astype keeps the column numeric even when every rate is missing (None).
    df["brl_to_usd_rate"] = date_keys.map(cache).astype("float64")
    amt_col = next((col for col in ("price", "payment_value") if col in df.columns), None)
    df["payment_usd"] = df[amt_col] * df["brl_to_usd_rate"] if amt_col else None

    missing = int(df["brl_to_usd_rate"].isna().sum())
    if missing > 0:
        logger.warning(f"{missing} rows missing conversion rate")
    return df

@task(retries=2, retry_delay_seconds=5)
def upload_df_to_s3(df: pd.DataFrame, bucket: str, key: str) -> str:
    """Serialize *df* as UTF-8 CSV (no index) and store it at s3://<bucket>/<key>.

    Returns the s3:// URL of the written object. The task is retried up to
    twice, 5 seconds apart, if the upload raises.
    """
    log = get_run_logger()
    log.info(f"Uploading to s3://{bucket}/{key}")
    buffer = io.StringIO()
    df.to_csv(buffer, index=False)
    client = boto3.client("s3")
    payload = buffer.getvalue().encode("utf-8")
    client.put_object(Bucket=bucket, Key=key, Body=payload)
    destination = f"s3://{bucket}/{key}"
    log.info(f"Uploaded to {destination}")
    return destination

@flow(name="olist_csv_api_pipeline")
def data_processing_flow(csv_orders: str = DEFAULT_ORDERS_CSV, s3_bucket: str = "") -> dict:
    """Run the ETL: extract orders, enrich with BRL->USD rates, total USD sales by month.

    Args:
        csv_orders: Orders CSV location, passed to ``extract_csv``.
        s3_bucket: When non-empty, both outputs are uploaded to this bucket
            under ``processed/``; otherwise they are written to OUTPUT_DIR.

    Returns:
        ``{"monthly": ..., "enriched": ...}`` mapping each output to its
        s3:// URL or local file path. The enriched output also carries the
        derived ``month`` column when aggregation succeeds.
    """
    logger = get_run_logger()
    logger.info("Starting minimal ETL")
    orders = extract_csv(csv_orders)
    enriched = enrich_orders(orders)
    try:
        enriched["order_purchase_timestamp"] = pd.to_datetime(enriched["order_purchase_timestamp"], errors="coerce")
        enriched["month"] = enriched["order_purchase_timestamp"].dt.to_period("M")
        monthly = (
            enriched.groupby("month")["payment_usd"]
            .sum()
            .reset_index()
            .rename(columns={"payment_usd": "monthly_sales_usd"})
        )
    except Exception:
        # Stay best-effort (the enriched data is still written), but record why
        # the monthly table is empty instead of swallowing the error silently.
        logger.warning("Monthly aggregation failed; writing an empty monthly table", exc_info=True)
        monthly = pd.DataFrame()
    result = {"monthly": None, "enriched": None}
    if s3_bucket:
        result["monthly"] = upload_df_to_s3(monthly, s3_bucket, "processed/monthly_sales_usd.csv")
        result["enriched"] = upload_df_to_s3(enriched, s3_bucket, "processed/enriched_orders.csv")
    else:
        monthly_path = OUTPUT_DIR / "monthly_sales_usd.csv"
        enriched_path = OUTPUT_DIR / "enriched_orders.csv"
        monthly.to_csv(monthly_path, index=False)
        enriched.to_csv(enriched_path, index=False)
        result["monthly"] = str(monthly_path)
        result["enriched"] = str(enriched_path)
    logger.info("ETL done")
    return result

if __name__ == "__main__":
    data_processing_flow()
''',

"generate_data.py": r'''"""
generate_data.py
Creates data/orders_large.csv by repeating a tiny sample until the file size >= 100MB.
Run: python generate_data.py
"""
import pandas as pd
from pathlib import Path

OUT = Path("data")
OUT.mkdir(exist_ok=True)
sample_path = OUT / "orders_sample.csv"
large_path = OUT / "orders_large.csv"

if not sample_path.exists():
    df = pd.DataFrame({
        "order_id": [f"o{i}" for i in range(1, 11)],
        "order_purchase_timestamp": ["2020-01-01 10:00:00"]*10,
        "order_approved_at": ["2020-01-01 10:05:00"]*10,
        "order_delivered_carrier_date": ["2020-01-05 10:00:00"]*10,
        "price": [10.0, 20.0, 30.0, 5.0, 7.5, 12.0, 6.0, 8.0, 15.0, 3.0]
    })
    df.to_csv(sample_path, index=False)
else:
    df = pd.read_csv(sample_path)

if df.empty:
    # Repeating zero rows can never reach the target size.
    raise SystemExit(f"{sample_path} has no rows; cannot build {large_path}")

target_bytes = 100 * 1024 * 1024
# Repetitions of the sample serialized per write. Streaming fixed-size batches
# keeps this linear and memory-flat. Re-serializing every accumulated chunk on
# each pass was quadratic, and counting a header per chunk overstated the size
# so the file ended up short of the target.
reps_per_batch = 1000
n_rows = len(df)
rep = 0
written = 0
# newline="" keeps pandas' own line terminators, so the byte count below
# matches what lands on disk.
with large_path.open("w", encoding="utf-8", newline="") as fh:
    while written < target_bytes:
        batch = pd.concat([df] * reps_per_batch, ignore_index=True)
        # Row i of the batch belongs to repetition rep + i // n_rows.
        batch["rep"] = rep + batch.index.to_numpy() // n_rows
        text = batch.to_csv(index=False, header=(rep == 0))  # single header row
        fh.write(text)
        written += len(text.encode("utf-8"))
        rep += reps_per_batch

print(f"Wrote {large_path} size: {large_path.stat().st_size / (1024*1024):.2f} MB")
''',

"upload_to_s3.sh": r'''#!/usr/bin/env bash
# usage: ./upload_to_s3.sh <bucket-name>
# Copies data/orders_large.csv to s3://<bucket-name>/raw/orders_large.csv.
# Exit on any failure so a failed copy is not reported as a success.
set -euo pipefail

BUCKET="${1:-}"
if [ -z "$BUCKET" ]; then
  echo "Usage: $0 <bucket-name>" >&2
  exit 1
fi
DEST="s3://${BUCKET}/raw/orders_large.csv"
aws s3 cp "data/orders_large.csv" "$DEST"
echo "Uploaded to $DEST"
''',

"deployment_example.yaml": r'''apiVersion: prefect.io/v2
kind: Deployment
# NOTE(review): illustrative layout only. Prefect's `prefect deploy` reads
# deployments from a prefect.yaml `deployments:` list, not apiVersion/kind
# manifests, so adapt this before use.
metadata:
  name: olist-test
  flow_name: olist_csv_api_pipeline
spec:
  flow_location: "./prefect_olist_pipeline.py"
  # Prefect entrypoints take the form "<path/to/flow_file.py>:<flow_function_name>".
  entrypoint: "./prefect_olist_pipeline.py:data_processing_flow"
  schedule:
    cron: "*/5 * * * *"   # test schedule every 5 minutes; change to "0 2 * * *" for daily
  parameters:
    csv_orders: "s3://<your-bucket>/raw/orders_large.csv"
    s3_bucket: "<your-bucket>"
  infra:
    type: "process"
''',

"README.md": r'''# E-commerce ETL — Local Prefect Run

This repo contains a minimal Prefect ETL pipeline and supporting files for the course final project.

What I ran (local, reproducible)
- I created the pipeline files (prefect_olist_pipeline.py and helpers) using the project setup cell.
- I installed the required packages in the notebook environment and restarted the kernel.
- I executed a quick sample run and then a full run of the Prefect flow (data_processing_flow) locally.
- The pipeline read the Olist orders CSV (local path), attempted to enrich rows with historical BRL→USD exchange rates, and saved outputs to `pipeline_outputs/`.

Important reproducibility note
- The exchange-rate API returned nulls for the enrichment during my run (see `pipeline_outputs/rates_cache.json`), so for reproducibility I produced monthly totals using a fixed conversion rate (1 BRL = 0.25 USD). Files to inspect:
  - `pipeline_outputs/enriched_orders.csv` — enriched dataset from the flow (brl_to_usd_rate and payment_usd columns may be null in this run)
  - `pipeline_outputs/monthly_sales_brl_minimal.csv` — monthly totals in BRL
  - `pipeline_outputs/monthly_sales_usd_fixedrate_minimal.csv` — monthly USD totals computed using the fixed rate (1 BRL = 0.25 USD)

Notes and how to reproduce locally
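1. Place the Olist CSV in a local path and confirm its location. Pass that path as the flow's `csv_orders` parameter, or edit `DEFAULT_ORDERS_CSV` in `prefect_olist_pipeline.py`, whose default points to a file on the author's machine.
2. Install requirements:
   `pip install -r requirements.txt`
3. Run the pipeline from a notebook (sample first, then full run) or:
   `python prefect_olist_pipeline.py`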
4. Check outputs in `pipeline_outputs/`.

Data policy
- The large CSV is intentionally excluded from the repository (see `.gitignore`) to avoid committing big files.
''',

"requirements.txt": r'''prefect>=2.10.0
pandas
requests
boto3
s3fs
''',

".gitignore": r'''__pycache__/
pipeline_outputs/
data/orders_large.csv
venv/
.env
.ipynb_checkpoints/
'''
}

# Write every generated file as UTF-8 with LF line endings. Path.write_text
# would use the platform defaults instead. On Windows that means the locale
# code page, which cannot encode characters such as "→" used in README.md.
# It also means CRLF newlines, which break upload_to_s3.sh under bash.
for fname, content in files.items():
    Path(fname).write_bytes(content.encode("utf-8"))
    print("Wrote", fname)

# Make upload_to_s3.sh executable if present. On Windows, chmod can only
# toggle the read-only flag, so the execute bits have no effect there.
sh = Path("upload_to_s3.sh")
if sh.exists():
    mode = sh.stat().st_mode
    sh.chmod(mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
    print("Made upload_to_s3.sh executable")

print("\nAll files created in", Path.cwd())

Wrote prefect_olist_pipeline.py
Wrote generate_data.py
Wrote upload_to_s3.sh
Wrote deployment_example.yaml
Wrote README.md
Wrote requirements.txt
Wrote .gitignore
Made upload_to_s3.sh executable

All files created in C:\Users\analy\iCloudDrive\Desktop\DACSS Materials and Job Hunt\DACSS 690A Data Engineering
