# In [1]:
from io import StringIO
import json
from typing import Any, Optional, Iterable, List

import boto3
import pandas as pd
from botocore.exceptions import ClientError


# =========================
# Configuration (your paths)
# =========================
# Single bucket that holds every object this pipeline reads or writes.
BUCKET = "cubixchicagodata"

# Inbox: RAW JSON dropped by the Extract lambda, waiting to be transformed.
RAW_TAXI_PREFIX, RAW_WEATHER_PREFIX = (
    "rawdata/to_processed/taxi_data/",
    "rawdata/to_processed/weather_data/",
)

# Archive: RAW JSON is moved here once handled, so the inbox stays clean.
PROCESSED_TAXI_RAW_PREFIX, PROCESSED_WEATHER_RAW_PREFIX = (
    "rawdata/processede/taxi_data/",
    "rawdata/processede/weather_data/",
)

# Output locations of the final, transformed CSVs.
TX_TRANSFORMED_PREFIX, WX_TRANSFORMED_PREFIX = (
    "transformed_data/taxi_trips/",
    "transformed_data/weather/",
)

# Master tables (slowly growing dimensions): folder and file name for each.
PAYMENT_MASTER_PREFIX, PAYMENT_MASTER_FILE = (
    "transformed_data/payment_type/",
    "payment_type_master.csv",
)
COMPANY_MASTER_PREFIX, COMPANY_MASTER_FILE = (
    "transformed_data/company/",
    "company_master.csv",
)


# =========================
# S3 helpers
# =========================

def read_json_from_s3(bucket: str, key: str) -> Any:
    """
    Fetch the S3 object at 'bucket'/'key' and parse its body as JSON.

    Returns whatever json.loads produces for the body (typically a dict or a list).
    """
    client = boto3.client("s3")
    raw_body = client.get_object(Bucket=bucket, Key=key)["Body"].read()
    return json.loads(raw_body)


def read_csv_from_s3(bucket: str, prefix: str, filename: str) -> pd.DataFrame:
    """
    Load the CSV stored at key 'prefix' + 'filename' into a DataFrame.

    The object body is decoded as UTF-8 before parsing.
    A ClientError from the S3 call (e.g. missing object) is propagated to the caller.
    """
    client = boto3.client("s3")
    response = client.get_object(Bucket=bucket, Key=f"{prefix}{filename}")
    csv_text = response["Body"].read().decode("utf-8")
    return pd.read_csv(StringIO(csv_text))


def upload_dataframe_to_s3(bucket: str, df: pd.DataFrame, key: str) -> None:
    """
    Serialize 'df' as CSV (without the index column) and store it at exactly 'key'.
    """
    client = boto3.client("s3")
    # to_csv() with no target returns the CSV text directly.
    csv_body = df.to_csv(index=False)
    client.put_object(Bucket=bucket, Key=key, Body=csv_body)


def upload_master_data_to_s3(bucket: str, folder: str, file_type: str, df: pd.DataFrame) -> None:
    """
    Replace the master CSV at '{folder}{file_type}_master.csv' with 'df'.

    Before overwriting, the current master (if any) is copied to
    transformed_data/master_table_previous_version/{file_type}_master_previous_version.csv
    so that one previous version is kept.
    """
    client = boto3.client("s3")
    current_key = f"{folder}{file_type}_master.csv"
    archive_key = f"transformed_data/master_table_previous_version/{file_type}_master_previous_version.csv"

    # Archive step: best effort only for the "no master yet" case.
    try:
        client.copy_object(
            Bucket=bucket,
            Key=archive_key,
            CopySource={"Bucket": bucket, "Key": current_key},
        )
    except ClientError as err:
        error_code = err.response.get("Error", {}).get("Code")
        master_is_missing = error_code in ("NoSuchKey", "404")
        if not master_is_missing:
            # Anything other than a missing source object is a real failure.
            raise

    # Overwrite the master with the new content.
    upload_dataframe_to_s3(bucket=bucket, df=df, key=current_key)


def list_json_keys(bucket: str, prefix: str) -> Iterable[str]:
    """
    Lazily yield every object key below 'prefix' whose name ends with .json
    (case-insensitive), walking all result pages of list_objects_v2.
    """
    pages = (
        boto3.client("s3")
        .get_paginator("list_objects_v2")
        .paginate(Bucket=bucket, Prefix=prefix)
    )
    for page in pages:
        # A page without matching objects has no "Contents" entry.
        for entry in page.get("Contents", []):
            name = entry["Key"]
            if name.endswith("/"):
                continue
            if name.lower().endswith(".json"):
                yield name


def _extract_date_from_filename(filename: str) -> Optional[str]:
    """
    Extract YYYY-MM-DD from a filename shaped like '..._YYYY-MM-DD.json'.

    The directory part (anything up to the last '/') and everything after the
    first '.' of the basename are ignored; the date is expected to be the last
    '_'-separated token of what remains.

    Returns the date token as a string, or None when that token is not a valid
    calendar date in YYYY-MM-DD form (so callers can apply their own fallback).
    """
    # Function-scope imports keep this block self-contained.
    import re
    from datetime import date

    basename = filename.rsplit("/", 1)[-1]
    stem = basename.split(".")[0]
    token = stem.split("_")[-1]

    # Shape check first: exactly 4-2-2 ASCII digits separated by dashes.
    if not re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", token):
        return None

    # Then reject impossible dates such as 2024-13-45.
    try:
        date.fromisoformat(token)
    except ValueError:
        return None
    return token


def move_raw_and_write_transformed(
    df: pd.DataFrame,
    datetime_col: str,
    bucket: str,
    file_type: str,               # "taxi" or "weather"
    raw_filename: str,            # just the filename, not full key
    source_prefix: str,           # e.g. RAW_TAXI_PREFIX
    target_raw_prefix: str,       # e.g. PROCESSED_TAXI_RAW_PREFIX
    transformed_prefix: str       # e.g. TX_TRANSFORMED_PREFIX
) -> Optional[str]:
    """
    1) If 'df' has rows, write it to S3 as
       '{transformed_prefix}{file_type}_{date}.csv'. The date is the first
       parseable value in 'datetime_col'; if there is none, the token returned
       by _extract_date_from_filename(raw_filename) is used, and finally the
       literal 'unknown_date'.
       An empty (or non-DataFrame) 'df' is NOT written: uploading a blank CSV
       could overwrite a valid transformed file that has the same date.
    2) Always move the RAW JSON from source_prefix to target_raw_prefix
       (copy + delete), so the inbox does not hold the file anymore.

    Returns the S3 key of the transformed CSV if one was written, otherwise None.
    """
    s3 = boto3.client("s3")
    transformed_key: Optional[str] = None

    if isinstance(df, pd.DataFrame) and not df.empty:
        # Determine the date token for the output filename.
        date_str = None
        if datetime_col in df.columns:
            parsed = pd.to_datetime(df[datetime_col], errors="coerce").dropna()
            if not parsed.empty:
                date_str = parsed.iloc[0].strftime("%Y-%m-%d")
        if not date_str:
            date_str = _extract_date_from_filename(raw_filename) or "unknown_date"

        transformed_key = f"{transformed_prefix}{file_type}_{date_str}.csv"
        upload_dataframe_to_s3(bucket=bucket, df=df, key=transformed_key)
        print(f"OK -> {transformed_key}")
    else:
        print(f"SKIP WRITE -> no rows to write for {raw_filename}")

    # Move RAW JSON out of the to_processed inbox (S3 has no rename: copy, then delete).
    old_key = f"{source_prefix}{raw_filename}"
    new_key = f"{target_raw_prefix}{raw_filename}"
    s3.copy_object(Bucket=bucket, CopySource={"Bucket": bucket, "Key": old_key}, Key=new_key)
    s3.delete_object(Bucket=bucket, Key=old_key)
    print(f"MOVED RAW -> {new_key}")

    return transformed_key


# =========================
# Taxi transforms
# =========================

def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """
    Minimal cleanup for taxi trips:
    - Drop large/unused geometry columns if present
    - Drop fully empty rows
    - Normalize column names (community area IDs)
    - Add hourly-rounded 'datetime_for_weather' from 'trip_start_timestamp'

    The input frame is left untouched; a new DataFrame is returned.

    Raises:
        TypeError: if 'taxi_trips' is not a pandas DataFrame.
    """
    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("taxi_trips must be a pandas DataFrame")

    # Heavy or unused columns; only those actually present are dropped.
    drop_cols = [
        "pickup_census_tract", "dropoff_census_tract",
        "pickup_centroid_location", "dropoff_centroid_location"
    ]
    # rename() ignores mapping keys that are not columns, so no presence check is needed.
    rename_map = {
        "pickup_community_area": "pickup_community_area_id",
        "dropoff_community_area": "dropoff_community_area_id",
    }

    # Every step returns a new frame, so the caller's DataFrame is never mutated.
    cleaned = (
        taxi_trips
        .drop(columns=[c for c in drop_cols if c in taxi_trips.columns])
        .dropna(how="all")
        .rename(columns=rename_map)
    )

    # Create hourly-rounded timestamp for joining to weather later.
    # Unparseable timestamps become NaT instead of raising.
    if "trip_start_timestamp" in cleaned.columns:
        cleaned["datetime_for_weather"] = (
            pd.to_datetime(cleaned["trip_start_timestamp"], errors="coerce")
              .dt.floor("h")
        )

    return cleaned


def update_master(source_df: pd.DataFrame, master_df: pd.DataFrame, id_col: str, value_col: str) -> pd.DataFrame:
    """
    Grow a small master table (e.g. company, payment_type) with any new values found in source_df[value_col].

    - Null values (None/NaN) in the source are ignored; they never become master rows.
    - Values are compared as strings against the existing master values.
    - New rows get consecutive IDs starting after the highest numeric ID in the master
      (or at 1 for an empty master).
    - A missing/empty master is treated as an empty table with columns [id_col, value_col].

    If no new values are found (or source_df has no 'value_col'), returns master_df unchanged.
    """
    if master_df is None or master_df.empty:
        master_df = pd.DataFrame(columns=[id_col, value_col])

    if value_col not in source_df.columns:
        return master_df

    # Tolerate blank/non-numeric IDs in the master instead of crashing on int(NaN).
    known_ids = pd.to_numeric(master_df[id_col], errors="coerce").dropna()
    max_id = int(known_ids.max()) if not known_ids.empty else 0
    existing = set(master_df[value_col].dropna().astype(str))

    # Drop nulls BEFORE converting to str: astype(str) would turn NaN/None into
    # the literal strings "nan"/"None", which dropna() can no longer remove.
    candidates = [
        v for v in source_df[value_col].dropna().astype(str).unique().tolist()
        if v not in existing
    ]
    if not candidates:
        return master_df

    new_rows = pd.DataFrame({
        id_col: range(max_id + 1, max_id + 1 + len(candidates)),
        value_col: candidates
    })
    if master_df.empty:
        # Nothing to append to; returning the new rows keeps the ID column integer-typed.
        return new_rows
    return pd.concat([master_df, new_rows], ignore_index=True)


def attach_master_ids(taxi_df: pd.DataFrame,
                      payment_master: pd.DataFrame,
                      company_master: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of the taxi fact table with master IDs left-joined onto it.

    'payment_type_id' is looked up via 'payment_type', then 'company_id' via 'company'.
    A lookup is skipped when its master is not a non-empty DataFrame, when the master
    lacks the id/label column pair, or when the fact table has no matching label column.
    """
    result = taxi_df.copy()

    # (master table, id column, label column shared with the fact table)
    lookups = (
        (payment_master, "payment_type_id", "payment_type"),
        (company_master, "company_id", "company"),
    )
    for master, id_name, label_name in lookups:
        if not isinstance(master, pd.DataFrame) or master.empty:
            continue
        if not {id_name, label_name}.issubset(master.columns):
            continue
        if label_name not in result.columns:
            continue
        id_lookup = master[[id_name, label_name]]
        result = result.merge(id_lookup, on=label_name, how="left")

    return result


# =========================
# Weather transform
# =========================

def transform_weather_data(weather_json: dict) -> pd.DataFrame:
    """
    Flatten Open-Meteo hourly arrays into a tabular DataFrame with:
    datetime, temperature, wind_speed, rain, precipitation

    The five columns are always present. A payload that is not a dict, or whose
    'hourly' entry is missing/null/not a dict, yields an empty frame. When an
    hourly variable is missing or shorter than the longest one, it is padded with
    nulls so a partially populated response does not raise.
    Unparseable 'time' values become NaT.
    """
    # Output column -> key inside the "hourly" object.
    column_sources = {
        "datetime":      "time",
        "temperature":   "temperature_2m",
        "wind_speed":    "wind_speed_10m",
        "rain":          "rain",
        "precipitation": "precipitation",
    }

    hourly = weather_json.get("hourly") if isinstance(weather_json, dict) else None
    if not isinstance(hourly, dict):
        hourly = {}

    raw_columns = {}
    for column, source_key in column_sources.items():
        values = hourly.get(source_key)
        # Anything that is not a list (missing, null, scalar) counts as "no data".
        raw_columns[column] = list(values) if isinstance(values, list) else []

    # pandas requires equally long columns; pad the short ones with None.
    n_rows = max(len(values) for values in raw_columns.values())
    table = {
        column: values + [None] * (n_rows - len(values))
        for column, values in raw_columns.items()
    }

    df = pd.DataFrame(table)
    df["datetime"] = pd.to_datetime(df["datetime"], errors="coerce")
    return df


# =========================
# JSON → DataFrame utility
# =========================

def json_to_dataframe(payload: Any) -> pd.DataFrame:
    """
    Turn a decoded JSON payload into a DataFrame:
    - dict whose 'data' entry is a list → one row per element of that list
    - any other dict → a single-row DataFrame built from the dict itself
    - list → DataFrame built directly from the list
    - anything else → empty DataFrame
    """
    if isinstance(payload, dict):
        nested = payload.get("data")
        rows = nested if isinstance(nested, list) else [payload]
        return pd.DataFrame(rows)

    if isinstance(payload, list):
        return pd.DataFrame(payload)

    return pd.DataFrame()


def looks_like_error_payload(payload: Any, df: pd.DataFrame) -> bool:
    """
    Heuristic for API error placeholders such as {'error': ..., 'message': ...}.

    Returns True when either
    - 'payload' is a non-empty dict whose keys are all in {'error', 'message'}, or
    - 'df' is a non-empty DataFrame with at most two columns, all named 'error'/'message'.
    Otherwise returns False.
    """
    error_fields = {"error", "message"}

    # Check the raw payload first.
    if isinstance(payload, dict):
        payload_keys = set(payload)
        if payload_keys and payload_keys <= error_fields:
            return True

    # Fall back to the shape of the frame built from the payload.
    if not isinstance(df, pd.DataFrame) or df.empty:
        return False
    return set(df.columns) <= error_fields and df.shape[1] <= 2


# =========================
# Lambda entrypoint
# =========================

def _load_master_or_empty(prefix: str, filename: str, columns: List[str]) -> pd.DataFrame:
    """
    Load a master CSV from BUCKET; return an empty frame with 'columns' only if it does not exist yet.

    Any other ClientError (throttling, permissions, ...) is re-raised: silently starting
    from an empty master would restart ID numbering and overwrite the real master.
    """
    try:
        return read_csv_from_s3(BUCKET, prefix, filename)
    except ClientError as err:
        if err.response.get("Error", {}).get("Code") not in ("NoSuchKey", "404"):
            raise
        return pd.DataFrame(columns=columns)


def lambda_handler(event, context):
    """
    Transform & Load pipeline:
      - Reads RAW JSON from S3 inbox (rawdata/to_processed/...)
      - Transforms into normalized CSVs (transformed_data/...)
      - Moves the RAW files out of the inbox (rawdata/processede/...)
      - Updates/archives master tables (payment_type/company) when they gained rows

    Recursion guard: if the event carries S3 records and none of their keys is under
    'rawdata/to_processed/', nothing is processed and a 'skipped' response is returned.
    An event without records (e.g. a manual test invocation) processes the whole inbox.

    Returns a dict with 'statusCode' 200 and a JSON 'body' listing the written CSV keys
    (or the skipped key).
    """
    # --- Recursion guard: only react when at least one key is under rawdata/to_processed/
    if event and "Records" in event:
        event_keys = [rec["s3"]["object"]["key"] for rec in event["Records"]]
        if event_keys and not any(k.startswith("rawdata/to_processed/") for k in event_keys):
            for key in event_keys:
                print(f"SKIP {key}: outside of rawdata/to_processed/.")
            return {"statusCode": 200, "body": json.dumps({"skipped": event_keys[0]})}

    # --- Load current masters (or start with empty frames if they do not exist yet)
    payment_master = _load_master_or_empty(
        PAYMENT_MASTER_PREFIX, PAYMENT_MASTER_FILE, ["payment_type_id", "payment_type"]
    )
    company_master = _load_master_or_empty(
        COMPANY_MASTER_PREFIX, COMPANY_MASTER_FILE, ["company_id", "company"]
    )
    # update_master only ever appends rows, so a changed row count means "grown".
    payment_rows_at_start = len(payment_master)
    company_rows_at_start = len(company_master)

    written_keys: List[str] = []

    # ============== TAXI ==============
    try:
        for key in list_json_keys(BUCKET, RAW_TAXI_PREFIX):
            filename = key.split("/")[-1]
            payload = read_json_from_s3(BUCKET, key)
            taxi_raw_df = json_to_dataframe(payload)

            # If this looks like an API error placeholder, move RAW and skip the transform.
            if taxi_raw_df.empty or looks_like_error_payload(payload, taxi_raw_df):
                print(f"SKIP transformed for {key}: empty/error payload.")
                # Still move RAW out of the inbox to avoid re-processing
                move_raw_and_write_transformed(
                    df=pd.DataFrame(),
                    datetime_col="datetime_for_weather",
                    bucket=BUCKET,
                    file_type="taxi",
                    raw_filename=filename,
                    source_prefix=RAW_TAXI_PREFIX,
                    target_raw_prefix=PROCESSED_TAXI_RAW_PREFIX,
                    transformed_prefix=TX_TRANSFORMED_PREFIX
                )
                continue

            # Clean & normalize taxi data
            taxi_transformed = taxi_trips_transformations(taxi_raw_df)
            print("TAXI columns:", list(taxi_transformed.columns))

            # Grow masters only if the columns exist
            if "company" in taxi_transformed.columns:
                company_master = update_master(taxi_transformed, company_master, "company_id", "company")
            else:
                print("SKIP master update: no 'company' column.")
            if "payment_type" in taxi_transformed.columns:
                payment_master = update_master(taxi_transformed, payment_master, "payment_type_id", "payment_type")
            else:
                print("SKIP master update: no 'payment_type' column.")

            # Attach IDs from masters back to fact
            taxi_with_ids = attach_master_ids(taxi_transformed, payment_master, company_master)

            # Write transformed CSV and move RAW JSON out of the inbox
            out_key = move_raw_and_write_transformed(
                df=taxi_with_ids,
                datetime_col="datetime_for_weather",
                bucket=BUCKET,
                file_type="taxi",
                raw_filename=filename,
                source_prefix=RAW_TAXI_PREFIX,
                target_raw_prefix=PROCESSED_TAXI_RAW_PREFIX,
                transformed_prefix=TX_TRANSFORMED_PREFIX
            )
            if out_key:
                written_keys.append(out_key)
    finally:
        # Persist in 'finally': CSVs already written above may reference IDs handed out
        # in this run, so those IDs must be saved even if a later file fails.
        # Persist only when a master grew: re-uploading an unchanged master would
        # overwrite the archived previous version with an identical copy.
        if len(payment_master) != payment_rows_at_start:
            upload_master_data_to_s3(BUCKET, PAYMENT_MASTER_PREFIX, "payment_type", payment_master)
            print("payment_type_master updated.")
        if len(company_master) != company_rows_at_start:
            upload_master_data_to_s3(BUCKET, COMPANY_MASTER_PREFIX, "company", company_master)
            print("company_master updated.")

    # ============= WEATHER =============
    for key in list_json_keys(BUCKET, RAW_WEATHER_PREFIX):
        filename = key.split("/")[-1]
        payload = read_json_from_s3(BUCKET, key)
        weather_df = transform_weather_data(payload)

        # If weather is empty, still move the RAW to avoid retries
        if weather_df.empty:
            print(f"SKIP transformed for {key}: empty weather frame.")
            move_raw_and_write_transformed(
                df=pd.DataFrame(),
                datetime_col="datetime",
                bucket=BUCKET,
                file_type="weather",
                raw_filename=filename,
                source_prefix=RAW_WEATHER_PREFIX,
                target_raw_prefix=PROCESSED_WEATHER_RAW_PREFIX,
                transformed_prefix=WX_TRANSFORMED_PREFIX
            )
            continue

        out_key = move_raw_and_write_transformed(
            df=weather_df,
            datetime_col="datetime",
            bucket=BUCKET,
            file_type="weather",
            raw_filename=filename,
            source_prefix=RAW_WEATHER_PREFIX,
            target_raw_prefix=PROCESSED_WEATHER_RAW_PREFIX,
            transformed_prefix=WX_TRANSFORMED_PREFIX
        )
        if out_key:
            written_keys.append(out_key)

    # Final response (handy in test console)
    return {"statusCode": 200, "body": json.dumps({"written": written_keys})}


# Notebook output: ModuleNotFoundError: No module named 'boto3'