"""
Topic: How to Structure transform() vs persist() in a Data Pipeline
===================================================================
This file demonstrates how to separate transformation logic
from persistence (storage) logic in a clean, modular pipeline.
Each section below matches the 10-slide training format.
"""

# -------------------------------------------------------------------
# Slide 1 - Overview
# -------------------------------------------------------------------
"""
Overview:
In ETL/ELT pipelines, always separate data transformation logic from
persistence (storage). This improves reusability, testing, and debugging.

- transform() → logic layer (clean, enrich, compute)
- persist()   → I/O layer (save to storage or DB)
"""
print("✅ transform() handles logic | persist() handles storage")

# -------------------------------------------------------------------
# Slide 2 - Why Separate Logic and Storage?
# -------------------------------------------------------------------
"""
When one function both transforms data and writes it, every test of the
logic needs real storage. Keeping the two apart makes each piece modular
and lets the same logic run unchanged in any environment.

Benefits:
- Test logic without S3 or DB access
- Switch between local/dev/prod easily
- Easier debugging and error tracing
"""

# -------------------------------------------------------------------
# Slide 3 - Role of transform()
# -------------------------------------------------------------------
import pandas as pd
from datetime import datetime

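def transform(df):
    """
    Cleans and enriches data with business rules.
    Input: raw DataFrame (left unmodified)
    Output: new, transformed DataFrame
    """
    # Copy after filtering so the assignments below write to an independent
    # frame; this avoids pandas' SettingWithCopyWarning and keeps the input intact.
    df = df.dropna(subset=["order_id"]).copy()
    df["price_usd"] = df["price"] * 0.012  # hard-coded conversion factor for this demo
    df["processed_ts"] = datetime.now()  # local wall-clock time of processing
    return df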

# Example usage of transform()
raw_df = pd.DataFrame([
    {"order_id": 101, "price": 1000},
    {"order_id": None, "price": 2000}
])
print("\n--- Running transform() ---")
print(transform(raw_df))

# -------------------------------------------------------------------
# Slide 4 - Role of persist()
# -------------------------------------------------------------------
def persist(df, output_path):
    """
    Saves the DataFrame to the specified storage.
    Handles I/O and logging; no transformation here.
    """
    try:
        df.to_parquet(output_path, index=False)
        print(f"✅ Successfully saved to {output_path}")
    except Exception as e:
        print(f"❌ Persist failed: {e}")
        raise  # re-raise so the caller sees the failure instead of a silent no-op

# -------------------------------------------------------------------
# Slide 5 - Pipeline Integration
# -------------------------------------------------------------------
def pipeline_run():
    df_raw = pd.DataFrame([
        {"order_id": 1, "price": 2500},
        {"order_id": 2, "price": 3800}
    ])
    df_trans = transform(df_raw)
    persist(df_trans, "output/orders_transformed.parquet")

# Uncomment to simulate full run
# pipeline_run()

# -------------------------------------------------------------------
# Slide 6 - Testing transform()
# -------------------------------------------------------------------
def test_transform_adds_column():
    df = pd.DataFrame([{"order_id": 1, "price": 100}])
    out = transform(df)
    assert "price_usd" in out.columns
    print("✅ test_transform_adds_column passed")

# Uncomment to test
# test_transform_adds_column()
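
# A few more tests that the logic/storage split makes cheap to write. This is
# a sketch, not the pipeline's own test suite: the transform() below is an
# assumed variant that takes the conversion rate and timestamp as parameters
# (usd_rate and now are hypothetical names) so that results are deterministic.

```python
from datetime import datetime, timezone

import pandas as pd


def transform(df, usd_rate=0.012, now=None):
    """Assumed variant of transform(): rate and timestamp are injectable."""
    out = df.dropna(subset=["order_id"]).copy()
    out["price_usd"] = out["price"] * usd_rate
    out["processed_ts"] = now if now is not None else datetime.now(timezone.utc)
    return out


def test_transform_drops_missing_order_ids():
    raw = pd.DataFrame([
        {"order_id": 1, "price": 100},
        {"order_id": None, "price": 200},
    ])
    out = transform(raw, now=datetime(2025, 1, 1))
    assert len(out) == 1


def test_transform_does_not_mutate_input():
    raw = pd.DataFrame([
        {"order_id": 1, "price": 100},
        {"order_id": None, "price": 200},
    ])
    before = raw.copy()
    transform(raw, now=datetime(2025, 1, 1))
    # The caller's frame must be unchanged: same rows, same columns.
    pd.testing.assert_frame_equal(raw, before)


def test_transform_converts_price():
    raw = pd.DataFrame([{"order_id": 1, "price": 100}])
    out = transform(raw, usd_rate=0.5, now=datetime(2025, 1, 1))
    assert out["price_usd"].iloc[0] == 50.0
```

# None of these tests touches a file, S3, or a database.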

# -------------------------------------------------------------------
# Slide 7 - Testing persist() with Mock
# -------------------------------------------------------------------
from unittest.mock import patch

@patch("pandas.DataFrame.to_parquet")
def test_persist(mock_save):
    persist(pd.DataFrame(), "fake/path")
    mock_save.assert_called_once()
    print("✅ test_persist passed (mocked)")

# Uncomment to run mock test
# test_persist()
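
# A slightly stricter mock test, as a sketch: it checks which arguments reach
# to_parquet() and that a write error is not swallowed. The persist() below is
# a simplified stand-in that lets exceptions propagate; that behaviour is an
# assumption of this example.

```python
from unittest.mock import patch

import pandas as pd


def persist(df, output_path):
    """Simplified stand-in: write the frame, let any error propagate."""
    df.to_parquet(output_path, index=False)


def test_persist_passes_path_and_index_flag():
    df = pd.DataFrame([{"order_id": 1}])
    with patch.object(pd.DataFrame, "to_parquet") as mock_save:
        persist(df, "fake/path.parquet")
    # The mock replaces the method on the class, so no file is written.
    mock_save.assert_called_once_with("fake/path.parquet", index=False)


def test_persist_surfaces_write_errors():
    failing = patch.object(
        pd.DataFrame, "to_parquet", side_effect=OSError("disk full")
    )
    with failing:
        try:
            persist(pd.DataFrame(), "fake/path.parquet")
        except OSError:
            return  # expected: the caller sees the failure
    raise AssertionError("expected OSError to propagate")
```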

# -------------------------------------------------------------------
# Slide 8 - Config & Environment Awareness
# -------------------------------------------------------------------
import os
ENV = os.getenv("ENV", "dev")

if ENV == "dev":
    OUTPUT_PATH = "./output/dev/orders.parquet"
else:
    OUTPUT_PATH = "s3://prod/orders.parquet"

print(f"Environment = {ENV}, Output Path = {OUTPUT_PATH}")
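
# One possible way to make the environment lookup testable, sketched under
# assumptions: OUTPUT_PATHS and resolve_output_path() are hypothetical names.
# Unlike the if/else above, which sends every non-dev value to the prod path,
# this version rejects unknown environments.

```python
import os

# Paths copied from the if/else above; the mapping itself is illustrative.
OUTPUT_PATHS = {
    "dev": "./output/dev/orders.parquet",
    "prod": "s3://prod/orders.parquet",
}


def resolve_output_path(env=None):
    """Return the output path for env, defaulting to the ENV variable."""
    env = env or os.getenv("ENV", "dev")
    try:
        return OUTPUT_PATHS[env]
    except KeyError:
        known = ", ".join(sorted(OUTPUT_PATHS))
        raise ValueError(f"Unknown ENV {env!r}; expected one of: {known}") from None


print(resolve_output_path("dev"))
```

# Failing on a typo such as ENV=prd is a design choice: it prevents an
# unrecognized environment from silently writing to the production bucket.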

# -------------------------------------------------------------------
# Slide 9 - Real-World Example
# -------------------------------------------------------------------
"""
Use Case: Supply Chain Data Flow
1. Extract shipment JSON via API
2. transform() adds SLA and holiday logic
3. persist() saves results to S3 for BI dashboards

Example Flow:
df = transform(shipments_df)
persist(df, 's3://logistics/processed/shipments.parquet')
"""

# -------------------------------------------------------------------
# Slide 10 - Best Practices & Summary
# -------------------------------------------------------------------
"""
Summary:
| Function    | Purpose     | Testable   | Env-Specific |
|-------------|-------------|------------|--------------|
| transform() | Data logic  | ✅ Yes     | ❌ No        |
| persist()   | Storage/I/O | ⚠️ Partial | ✅ Yes       |

Best Practices:
- Keep transform() pure, no file I/O
- Centralize all write logic in persist()
- Log each step
- Unit test transform(), mock persist()
"""

print("\n📘 Summary:")
print("transform() = logic only | persist() = write only")


✅ transform() handles logic | persist() handles storage

--- Running transform() ---
   order_id  price  price_usd               processed_ts
0     101.0   1000       12.0 2025-11-10 14:32:47.221971
Environment = dev, Output Path = ./output/dev/orders.parquet

📘 Summary:
transform() = logic only | persist() = write only


