In [0]:
# --- Secrets -----------------------------------------------------------------
# Every secret used by this notebook lives in one Databricks secret scope.
my_scope = "DLCSCI622"
dbutils.secrets.list(my_scope)
my_key = "CSCI-key"
noaa_token = dbutils.secrets.get(scope=my_scope, key="NOAA-key")

# --- Storage account ---------------------------------------------------------
storage_end_point = "firstassignmentstore.dfs.core.windows.net"
container_name = "assign1"

# Register the storage account key with Spark so abfss:// paths can be read.
spark.conf.set(
    f"fs.azure.account.key.{storage_end_point}",
    dbutils.secrets.get(scope=my_scope, key=my_key),
)

# Root URI of the container; the table paths below are appended to it.
uri = f"abfss://{container_name}@{storage_end_point}/"
print(uri)

# --- Source Delta tables -----------------------------------------------------
putting_df = spark.read.format("delta").load(f"{uri}PDGA_project/Putting")

display(putting_df)

weather_df = spark.read.format("delta").load(f"{uri}PDGA_project/Weather_ingestion")

display(weather_df)

In [0]:
from pyspark.sql import functions as F

# Lookup of GHCND datatype code -> (scale factor, unit label).
# Codes that share a factor and unit are listed together and then expanded
# into one flat dictionary, in the order the groups appear below.
# This table is not referenced by the other cells visible in this notebook.
# NOTE(review): the GHCN-Daily documentation describes AWND as tenths of m/s;
# the factor 1.0 here assumes the ingested values are already m/s - confirm.
GHCND_CONVERSIONS = {
    code: (factor, unit)
    for codes, factor, unit in (
        # temperatures
        (("TMAX", "TMIN", "TAVG", "TOBS", "ADPT", "AWBT"), 0.1, "degC"),
        # precipitation and water equivalents
        (("PRCP", "MDPR", "WESD", "WESF"), 0.1, "mm"),
        # snowfall / snow depth
        (("SNOW", "SNWD"), 1.0, "mm"),
        # wind speed
        (("AWND",), 1.0, "m/s"),
        (("WSF2", "WSF5", "WSFG"), 0.1, "m/s"),
        # wind direction
        (("WDF2", "WDF5", "WDFG"), 1.0, "deg"),
        # pressure
        (("ASLP", "ASTP"), 0.1, "hPa"),
        # sunshine and radiation
        (("TSUN",), 1.0, "minutes"),
        (("PSUN",), 1.0, "percent"),
        (("SRAD",), 1.0, "W/m^2"),
    )
    for code in codes
}





In [0]:
# Print 20 untruncated TMAX rows to inspect how the values are stored.
weather_df.where(weather_df["datatype"] == "TMAX").show(n=20, truncate=False)

In [0]:
from pyspark.sql import functions as F

def convert_ghcnd_units_spark(df):
    """
    Normalise NOAA GHCND observations held in a Spark DataFrame.

    Required input columns:
      - datatype : GHCND element code such as 'TMAX', 'PRCP', 'AWND', 'ASLP'
      - value    : the reading, as a string or a number
    Every other column is carried through unchanged.

    Columns appended, in this order:
      - value_numeric   : ``value`` cast to double
      - value_clean     : ``value_numeric`` with the -9999 sentinel turned into null
      - unit            : unit label for the datatype (null for unlisted datatypes)
      - value_converted : ``value_clean`` scaled according to the datatype;
                          unlisted datatypes keep ``value_clean`` as-is

    NOTE(review): temperatures are divided by 10 only when |value| > 70, so a
    reading stored in tenths with magnitude <= 70 is left unscaled - confirm
    how the ingested temperatures are actually stored.
    NOTE(review): AWND is passed through unscaled; the GHCN-Daily documentation
    describes it as tenths of m/s - confirm against the ingestion settings.
    """
    datatype = F.col("datatype")
    reading = F.col("value_clean")

    temperature_codes = ["TMAX", "TMIN", "TAVG", "TOBS", "ADPT", "AWBT"]

    # Non-temperature families: (datatype codes, unit label, divide by ten?)
    families = [
        (["PRCP", "MDPR", "WESD", "WESF"], "mm", True),
        (["SNOW", "SNWD"], "mm", False),
        (["WSF2", "WSF5", "WSFG"], "m/s", True),
        (["AWND"], "m/s", False),
        (["WDF2", "WDF5", "WDFG"], "deg", False),
        (["ASLP", "ASTP"], "hPa", True),
        (["TSUN"], "minutes", False),
        (["PSUN"], "percent", False),
        (["SRAD"], "W/m^2", False),
    ]

    is_temperature = datatype.isin(*temperature_codes)

    # Temperatures open both CASE expressions; a null reading stays null.
    unit_expr = F.when(is_temperature, "degC")
    converted_expr = (
        F.when(reading.isNull(), None)
        # Magnitude above 70 is taken to mean tenths of a degree (261 -> 26.1).
        .when(is_temperature & (F.abs(reading) > 70), reading / 10.0)
        # Anything else is taken to be whole degrees already.
        .when(is_temperature, reading)
    )

    # Append one branch per family to each CASE expression.
    for codes, unit_label, in_tenths in families:
        matches = datatype.isin(*codes)
        unit_expr = unit_expr.when(matches, unit_label)
        converted_expr = converted_expr.when(
            matches, reading / 10.0 if in_tenths else reading
        )

    return (
        df
        .withColumn("value_numeric", F.col("value").cast("double"))
        .withColumn(
            "value_clean",
            # -9999 is the missing-value sentinel
            F.when(F.col("value_numeric") == -9999, None)
            .otherwise(F.col("value_numeric")),
        )
        .withColumn("unit", unit_expr.otherwise(None))
        # Unlisted datatypes fall back to the cleaned value without scaling.
        .withColumn("value_converted", converted_expr.otherwise(reading))
    )


In [0]:
# Convert the raw weather readings and add unit labels.
df_conv = convert_ghcnd_units_spark(weather_df)

# Spot-check five converted TMAX rows, then render the whole result.
df_conv.where(df_conv["datatype"] == "TMAX").show(n=5, truncate=False)
display(df_conv)

In [0]:
from pyspark.sql import functions as F

# Re-run the unit conversion so this cell does not depend on the preview cell.
df_conv = convert_ghcnd_units_spark(weather_df)

# Parse the ISO timestamp string and keep only the calendar date.
df_conv = df_conv.withColumn(
    "date",
    F.to_timestamp("date", "yyyy-MM-dd'T'HH:mm:ss").cast("date")
)

# Long -> wide: one row per (date, station, Location), one column per datatype.
# ignorenulls=True matters because the conversion turns the -9999 sentinel
# into null: without it, first() can return that null for a pivot cell even
# when another row in the same cell holds a valid reading.
df_wide = (
    df_conv
    .groupBy("date", "station", "Location")
    .pivot("datatype")
    .agg(F.first("value_converted", ignorenulls=True))
)

# Key columns first, then the datatype columns in alphabetical order.
fixed_cols = ["date", "station", "Location"]
other_cols = [c for c in df_wide.columns if c not in fixed_cols]
df_wide = df_wide.select(*fixed_cols, *sorted(other_cols))

display(df_wide)


In [0]:
# Working names for the two source tables.
# df_weather is reassigned from df_wide in a later cell.
df_weather = weather_df

df_disc = putting_df

In [0]:
from pyspark.sql import functions as F

# Derive a date-typed event_date from end_date (YYYY-MM-DD) for joining.
df_disc = df_disc.withColumn("event_date", F.to_date(F.col("end_date")))

In [0]:
# Split Location on ", " and keep the first two parts as city and state.
df_weather = (
    df_wide
    .withColumn("city", F.split(F.col("Location"), ", ").getItem(0))
    .withColumn("state", F.split(F.col("Location"), ", ").getItem(1))
)


In [0]:
# Render the putting data, which now carries the derived event_date column.
display(df_disc)

In [0]:
# Number of distinct pdga_event_id values in the putting data.
df_disc.select("pdga_event_id").distinct().count()

In [0]:
# Number of distinct Location values in the weather data.
df_weather.select("Location").distinct().count()

In [0]:
# Render the wide weather table with the city/state columns split from Location.
display(df_weather)

In [0]:
# Render the pivoted weather table (one column per datatype).
display(df_wide)

In [0]:
# Date-typed join key on the disc side (same derivation as the earlier cell).
df_disc = df_disc.withColumn("event_date", F.to_date(F.col("end_date")))

# Date-typed join key on the weather side. df_weather is rebuilt from df_wide
# here, so columns added to the previous df_weather are not carried over.
df_weather = df_wide.withColumn("weather_date", F.to_date(F.col("date")))

In [0]:
# Attach same-day weather to every disc row.
# Matching on date alone pairs each disc row with the weather of every
# location reporting that day, which multiplies disc rows. The location is
# therefore matched too: Location is split into city/state on ", " (as the
# other cells do) and compared case-insensitively with the event's city and
# state, mirroring the later 3-day join.
df_joined = df_disc.alias("d").join(
    df_weather.alias("w"),
    (F.col("d.event_date") == F.col("w.weather_date"))
    & (F.lower(F.col("d.city")) == F.lower(F.split(F.col("w.Location"), ", ").getItem(0)))
    & (F.lower(F.col("d.state")) == F.lower(F.split(F.col("w.Location"), ", ").getItem(1))),
    "left"   # keep all disc rows; drop weather-only rows
)


In [0]:
# Render the result of joining the disc data to the weather data.
display(df_joined)

In [0]:
# Sanity checks

# 1. The left join must not add or drop disc rows.
disc_rows = df_disc.count()
joined_rows = df_joined.count()
assert disc_rows == joined_rows, (
    f"join changed the row count: {disc_rows} disc rows vs {joined_rows} joined rows"
)

# 2. No event may be matched to more than one weather date.
check = df_joined.groupBy("pdga_event_id").agg(F.countDistinct("weather_date").alias("n")).filter("n > 1")
events_with_many_dates = check.count()
assert events_with_many_dates == 0, (
    f"{events_with_many_dates} events matched more than one weather_date"
)

# 3. Share of rows per event with no wind reading.
# AWND_3day_avg only exists once the rolling averages have been joined in, so
# fall back to the raw AWND column when this cell runs before that join.
# The result is passed to display() so the aggregation is actually executed.
awnd_column = next(
    (c for c in ("AWND_3day_avg", "AWND") if c in df_joined.columns), None
)
if awnd_column is None:
    print("No AWND column in df_joined; skipping the missing-wind summary.")
else:
    display(
        df_joined.groupBy("event_name").agg(
            F.avg(F.col(awnd_column).isNull().cast("int")).alias("pct_missing_awnd")
        )
    )


In [0]:
# Number of distinct pdga_event_id values present after the join.
df_joined.select("pdga_event_id").distinct().count()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Start from df_wide: date, station, Location, TMAX, TMIN, PRCP, AWND, ...
# Location is split on ", " into city and state.
df_weather = (
    df_wide
    .withColumn("weather_date", F.to_date("date"))
    .withColumn("city", F.split("Location", ", ").getItem(0))
    .withColumn("state", F.split("Location", ", ").getItem(1))
)

# Choose which weather metrics you care about
weather_metrics = ["TMAX", "TMIN", "PRCP", "AWND"]

# 3-day rolling window: current day plus the two previous calendar days.
#  - Partitioned by city AND state so that same-named cities in different
#    states are not averaged together.
#  - Ordered by a day number with a RANGE frame, so the window always spans
#    calendar days. A ROWS frame would count rows instead, which is wrong when
#    a day is missing or when several stations report for the same city.
w_city_3day = (
    Window
    .partitionBy("city", "state")
    .orderBy(F.datediff(F.col("weather_date"), F.lit("1970-01-01").cast("date")))
    .rangeBetween(-2, 0)
)

# Add 3-day rolling averages for each metric
for col in weather_metrics:
    df_weather = df_weather.withColumn(
        f"{col}_3day_avg",
        F.avg(F.col(col)).over(w_city_3day)
    )

# One row per (weather_date, city, state). With a RANGE frame every station
# row of the same city and day sees the same window, so the selected columns
# are identical across those rows and distinct() collapses them. This keeps a
# later join on these keys from duplicating rows.
df_weather_3day = df_weather.select(
    "weather_date",
    "city",
    "state",
    *[f"{c}_3day_avg" for c in weather_metrics]
).distinct()


In [0]:
# Render the per-date, per-location 3-day weather averages.
display(df_weather_3day)

In [0]:
# Project only the weather_date column. No Spark action (show/display/count)
# is called on the result, so this cell does not materialise any rows.
df_weather_3day.select("weather_date")

In [0]:
# Copy city/state under event_* names so the join condition can tell the
# disc-side location apart from the weather-side columns of the same name.
df_disc = (
    df_disc
    .withColumn("event_city", F.col("city"))
    .withColumn("event_state", F.col("state"))
)


In [0]:
# Attach the 3-day weather averages to every disc row, matching on the event
# date and on city/state (case-insensitive). Left join keeps all disc rows.
#
# Both inputs have columns named city and state. Keeping both copies gives
# df_joined duplicate column names, which makes later unqualified references
# such as groupBy("city", "state") ambiguous and breaks the Delta write.
# Only the disc-side city/state are kept; the weather side contributes its
# remaining columns (weather_date and the *_3day_avg metrics).
df_joined = (
    df_disc.alias("d")
    .join(
        df_weather_3day.alias("w"),
        (F.col("d.event_date") == F.col("w.weather_date")) &
        (F.lower(F.col("d.event_city")) == F.lower(F.col("w.city"))) &
        (F.lower(F.col("d.event_state")) == F.lower(F.col("w.state"))),
        "left"
    )
    .select(
        "d.*",
        *[
            F.col(f"w.`{c}`")
            for c in df_weather_3day.columns
            if c not in ("city", "state")
        ]
    )
)


In [0]:
# Render the disc rows with the 3-day weather averages attached.
display(df_joined)

In [0]:
# Persist the joined table as Delta under PDGA_project/Silver, replacing any
# data already stored at that path.
(
    df_joined.write
    .format("delta")
    .mode("overwrite")
    .save(f"{uri}PDGA_project/Silver")
)

In [0]:
# Keep only the rows that have a 3-day precipitation average.
has_precip = F.col("PRCP_3day_avg").isNotNull()
df_clean_precip = df_joined.where(has_precip)
display(df_clean_precip)

In [0]:
# Number of distinct events that still have rows after the precipitation filter.
df_clean_precip.select("pdga_event_id").distinct().count()

In [0]:
from pyspark.sql import functions as F

# Player-level performance columns; each is averaged per event.
performance_metrics = [
    "SG_Putting",
    "C1X_Putt_Pct",
    "C2_Putt_Pct",
    "OB_per_18",
    "Avg_Putt_Distance",
]

# 3-day weather average columns; one value is taken per event.
# Note: this reuses the name weather_metrics from the rolling-average cell,
# where it holds the raw metric codes.
weather_metrics = [
    "TMAX_3day_avg",
    "TMIN_3day_avg",
    "PRCP_3day_avg",
    "AWND_3day_avg",
]

# Output column order: performance means, field size, weather values.
agg_exprs = (
    [F.avg(F.col(name)).alias(f"avg_{name}") for name in performance_metrics]
    + [F.countDistinct("player_name").alias("num_players")]
    + [F.first(F.col(name)).alias(name) for name in weather_metrics]
)

# Event-level table built from the rows that have a precipitation average,
# one row per (event_date, event_name, city, state), sorted by date.
df_event_summary3 = (
    df_clean_precip
    .groupBy("event_date", "event_name", "city", "state")
    .agg(*agg_exprs)
    .orderBy("event_date")
)


In [0]:
# Render the event summary built from rows with a precipitation average.
display(df_event_summary3)

In [0]:
# Keep only the rows that have a 3-day maximum-temperature average.
has_tmax = F.col("TMAX_3day_avg").isNotNull()
df_clean = df_joined.where(has_tmax)
display(df_clean)

In [0]:
from pyspark.sql import functions as F

# Performance columns to average across the players of each event.
performance_metrics = [
    "SG_Putting",
    "C1X_Putt_Pct",
    "C2_Putt_Pct",
    "OB_per_18",
    "Avg_Putt_Distance",
]

# Weather columns (3-day averages) to carry onto each event row.
weather_metrics = [
    "TMAX_3day_avg",
    "TMIN_3day_avg",
    "PRCP_3day_avg",
    "AWND_3day_avg",
]

# Assemble the aggregations section by section, then flatten them into one
# list in output order: performance means, field size, weather values.
performance_aggs = [
    F.avg(F.col(metric)).alias(f"avg_{metric}") for metric in performance_metrics
]
field_size_agg = F.countDistinct("player_name").alias("num_players")
weather_aggs = [F.first(F.col(metric)).alias(metric) for metric in weather_metrics]
agg_exprs = [*performance_aggs, field_size_agg, *weather_aggs]

# Event-level table built from the rows that have a TMAX average, one row per
# (event_date, event_name, city, state), sorted by date.
df_event_summary2 = (
    df_clean
    .groupBy("event_date", "event_name", "city", "state")
    .agg(*agg_exprs)
    .orderBy("event_date")
)


In [0]:
# Render the event summary built from rows with a TMAX average.
display(df_event_summary2)

In [0]:
from pyspark.sql import functions as F

# Performance columns averaged per event (extend this list as needed).
performance_metrics = [
    "SG_Putting",
    "C1X_Putt_Pct",
    "C2_Putt_Pct",
    "OB_per_18",
    "Avg_Putt_Distance",
]

# 3-day weather average columns copied onto each event row.
weather_metrics = [
    "TMAX_3day_avg",
    "TMIN_3day_avg",
    "PRCP_3day_avg",
    "AWND_3day_avg",
]

# Aggregations in output order.
agg_exprs = []
# Mean of each performance column across the event's rows
agg_exprs.extend(
    F.avg(F.col(metric)).alias(f"avg_{metric}") for metric in performance_metrics
)
# Field size: number of distinct players
agg_exprs.append(F.countDistinct("player_name").alias("num_players"))
# Weather: take the value from any one row of the event
agg_exprs.extend(F.first(F.col(metric)).alias(metric) for metric in weather_metrics)

# Event-level table over all joined rows (no weather filter applied), one row
# per (event_date, event_name, city, state), sorted by date.
df_event_summary = (
    df_joined
    .groupBy("event_date", "event_name", "city", "state")
    .agg(*agg_exprs)
    .orderBy("event_date")
)


In [0]:
# Render the event summary built from all joined rows.
display(df_event_summary)

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Collect the event-level summary to the driver as a pandas DataFrame.
df_pandas = df_event_summary.toPandas()

# Weather and performance columns to correlate against each other.
heatmap_columns = [
    "TMAX_3day_avg",
    "PRCP_3day_avg",
    "AWND_3day_avg",
    "avg_SG_Putting",
    "avg_C1X_Putt_Pct",
    "avg_C2_Putt_Pct",
    "avg_OB_per_18",
]

# Pairwise correlation matrix, drawn as an annotated heatmap.
correlation_matrix = df_pandas[heatmap_columns].corr()
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm")
plt.show()
