In [None]:
# Cell 1 — Imports & SparkSession Setup
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# --- AWS credentials ----------------------------------------------------------
# Option A (active): read both keys from the process environment. A missing
# variable raises KeyError right here, before any Spark work starts.
ACCESS_KEY, SECRET_KEY = (
    os.environ["AWS_ACCESS_KEY_ID"],
    os.environ["AWS_SECRET_ACCESS_KEY"],
)

# Option B: pull the keys from an Airflow connection instead. To switch,
# replace the assignment above with:
#   from airflow.providers.amazon.aws.hooks.s3 import S3Hook
#   s3 = S3Hook(aws_conn_id="aws_default")
#   ACCESS_KEY = s3.aws_access_key_id
#   SECRET_KEY = s3.aws_secret_access_key
# NOTE(review): confirm these attribute names against the installed Airflow
# provider version before relying on this path.

# Root of the S3A bucket that holds the raw CSV folders.
S3_BUCKET = "s3a://s3-mycollege"

# --- Postgres connection settings ---------------------------------------------
# Every field except the password falls back to a local-development default;
# PG_PASSWORD is mandatory and raises KeyError when unset.
DB_CONFIG = dict(
    host=os.environ.get("PG_HOST", "localhost"),
    port=os.environ.get("PG_PORT", "5432"),
    user=os.environ.get("PG_USER", "postgres"),
    password=os.environ["PG_PASSWORD"],
    db=os.environ.get("PG_DATABASE", "tourism"),
    driver="org.postgresql.Driver",
)

# S3A filesystem settings, applied twice below: once to the session builder
# (with the "spark.hadoop." prefix) and once straight onto the Hadoop config.
_S3A_CONF = {
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "fs.s3a.access.key": ACCESS_KEY,
    "fs.s3a.secret.key": SECRET_KEY,
}

_builder = SparkSession.builder.appName("01_ingest")
for _key, _value in _S3A_CONF.items():
    _builder = _builder.config("spark.hadoop." + _key, _value)

# Maven coordinates fetched at startup: the S3A connector, the AWS SDK bundle
# it is paired with, and the Postgres JDBC driver used by the staging writes.
_builder = _builder.config(
    "spark.jars.packages",
    "org.apache.hadoop:hadoop-aws:3.3.2,"
    "com.amazonaws:aws-java-sdk-bundle:1.11.1026,"
    "org.postgresql:postgresql:42.6.0",
)

spark = _builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Also write the S3A settings directly into the live Hadoop configuration.
# getOrCreate() can return a session that already existed (e.g. when this cell
# is re-run), and presumably its Hadoop config would then not pick up the
# builder's spark.hadoop.* values.
hadoop_conf = spark._jsc.hadoopConfiguration()
for _key, _value in _S3A_CONF.items():
    hadoop_conf.set(_key, _value)


# Cell 2 — Utility Functions
def list_s3_csv(folder: str):
    """List entries under ``<S3_BUCKET>/<folder>/`` whose path ends in ``.csv``.

    The listing goes through the Hadoop FileSystem API over the Spark JVM
    gateway, using the module-level ``hadoop_conf``. It is one level deep (not
    recursive). Each match is returned as its fully qualified path string.
    """
    jvm = spark._jvm
    hadoop_fs = jvm.org.apache.hadoop.fs
    bucket_fs = hadoop_fs.FileSystem.get(jvm.java.net.URI(S3_BUCKET), hadoop_conf)
    statuses = bucket_fs.listStatus(hadoop_fs.Path(f"{S3_BUCKET}/{folder}/"))

    csv_paths = []
    for status in statuses:
        full_path = status.getPath().toString()
        if full_path.endswith(".csv"):
            csv_paths.append(full_path)
    return csv_paths

def prep_df(path: str, log_transform=False, winsor=(0.01,0.99)):
    """
    Read a CSV and clean its ``OBS_VALUE`` column.

    Steps:
      1. Read ``path`` with a header row and an inferred schema.
      2. Drop rows whose OBS_VALUE is negative. Rows with a null OBS_VALUE
         are kept so that step 3 can fill them.
      3. Replace null OBS_VALUE with the approximate median (relative error
         0.001) of the remaining values.
      4. If ``log_transform`` is true, add ``log_OBS_VALUE = log1p(OBS_VALUE)``
         and clamp it to the approximate ``winsor`` (lower, upper) quantiles.

    Args:
        path: Path/URI accepted by ``spark.read.csv``.
        log_transform: Whether to add the winsorized ``log_OBS_VALUE`` column.
        winsor: (lower, upper) quantile probabilities used for clamping.

    Returns:
        The cleaned Spark DataFrame.
    """
    from pyspark.sql.functions import log1p, when

    obs = col("OBS_VALUE")
    df = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(path)
        # `obs >= 0` alone is NULL for null rows, and filter() drops NULL
        # predicates. That would silently discard exactly the rows the median
        # imputation below is supposed to fill.
        .filter(obs.isNull() | (obs >= 0))
    )

    # Median impute. approxQuantile can return an empty list (e.g. no rows);
    # in that case there is nothing to compute a median from, so skip it
    # instead of failing with IndexError.
    med = df.approxQuantile("OBS_VALUE", [0.5], 0.001)
    if med:
        df = df.fillna({"OBS_VALUE": med[0]})

    if log_transform:
        df = df.withColumn("log_OBS_VALUE", log1p(obs))
        bounds = df.approxQuantile("log_OBS_VALUE", winsor, 0.001)
        # Same guard as above: with no values there are no bounds to clamp to.
        if bounds:
            lb, ub = bounds
            log_col = col("log_OBS_VALUE")
            df = df.withColumn(
                "log_OBS_VALUE",
                when(log_col < lb, lb)
                .when(log_col > ub, ub)
                .otherwise(log_col),
            )

    return df


# Cell 3 — Ingest & Clean All Raw CSVs
# JDBC target shared by every staging write below. It depends only on
# DB_CONFIG, so it is built once rather than once per file.
jdbc_url = (
    f"jdbc:postgresql://{DB_CONFIG['host']}:"
    f"{DB_CONFIG['port']}/{DB_CONFIG['db']}"
)
props = {
    "user": DB_CONFIG["user"],
    "password": DB_CONFIG["password"],
    "driver": DB_CONFIG["driver"],
}


def _staging_table(prefix: str, file_name: str) -> str:
    """Return the Postgres staging-table name for a CSV file name.

    The trailing ``.csv`` is stripped and the stem is lower-cased, matching how
    Postgres folds unquoted identifiers. Any character not allowed in an
    unquoted identifier becomes ``_``. Spark's JDBC writer passes the table
    name through unquoted, so a raw ``-``, space or ``.`` would otherwise give
    invalid SQL, or be read as a schema separator.
    """
    import re

    stem = file_name[: -len(".csv")] if file_name.endswith(".csv") else file_name
    return prefix + re.sub(r"[^0-9a-z_$]", "_", stem.lower())


def _write_staging(df, table: str, label: str) -> None:
    """Overwrite Postgres table ``table`` with ``df`` and report it."""
    df.write.mode("overwrite").jdbc(jdbc_url, table, properties=props)
    print(f"✅ Wrote cleaned {label} → {table}")


# Occupancy files. A file gets the log + winsorize treatment when its name
# contains "arm" or "nim" (case-insensitive substring match).
# NOTE(review): substring matching also hits unrelated words (e.g. "farm");
# confirm the intended file-naming convention.
occ_files = list_s3_csv("occupancy")
for fp in occ_files:
    name = fp.split("/")[-1]
    do_log = any(kw in name.lower() for kw in ("arm", "nim"))
    df = prep_df(fp, log_transform=do_log)
    table = _staging_table("staging_occ_", name)
    _write_staging(df, table, "occupancy")

# Capacity files: always log-transformed + winsorized.
cap_files = list_s3_csv("capacity")
for fp in cap_files:
    name = fp.split("/")[-1]
    df = prep_df(fp, log_transform=True)
    table = _staging_table("staging_cap_", name)
    _write_staging(df, table, "capacity")
