In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, lit, month, avg, count

# --- 1. Connect ---
# Connection settings are taken from SNOWFLAKE_* environment variables so that
# real credentials never have to be typed into (or saved with) the notebook.
# Any variable that is not set falls back to the original placeholder value.
import os

session = Session.builder.configs({
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", "<your_account>"),
    "user": os.environ.get("SNOWFLAKE_USER", "<your_username>"),
    "password": os.environ.get("SNOWFLAKE_PASSWORD", "<your_password>"),
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "<your_warehouse>"),
    "database": "SVI",
    "schema": "PUBLIC"
}).create()

print("‚úÖ Connected to Snowflake")


# --- 2. Load table ---
df = session.table("ARIZONA_2022_MAXTEMP")
all_columns = df.columns

print("üìã Columns in table:")
print(all_columns)


# --- 3. Identify county columns ---
# Every column except the date column (named "DATA") is treated as a county.
counties = list(filter(lambda name: name.upper() != "DATA", all_columns))

print(f"‚úÖ Found {len(counties)} counties:")
print(counties)


# --- 4. Reshape (wide -> long) ---
# Each county column becomes its own (DATE, MAX_TEMP, COUNTY) frame and the
# frames are stacked with UNION ALL.
#
# Names that need quoting are referenced with their double quotes (this table
# is addressed elsewhere in the notebook as '"Santa Cruz"' and '"La Paz"').
# The quotes are required to select the column but are not part of the county
# name, so they are stripped from the value stored in COUNTY.
county_frames = [
    df.select(col("DATA").alias("DATE"), col(county).alias("MAX_TEMP"))
      .with_column("COUNTY", lit(county.strip('"')))
    for county in counties
]

# Fail with a clear message instead of leaving `melted` as None.
if not county_frames:
    raise ValueError("No county columns found in ARIZONA_2022_MAXTEMP; nothing to reshape")

melted = county_frames[0]
for county_df in county_frames[1:]:
    melted = melted.union_all(county_df)

print("‚úÖ Reshaped to long format")


# --- 5. Missing value summary ---
# count("*") counts every row while count(MAX_TEMP) skips NULLs, so their
# difference is the number of rows without a reading for each county.
total_days = count("*").alias("TOTAL_DAYS")
non_null_days = count(col("MAX_TEMP")).alias("NON_NULL_DAYS")

per_county_counts = melted.group_by("COUNTY").agg(total_days, non_null_days)
missing_summary = per_county_counts.with_column(
    "MISSING_DAYS",
    col("TOTAL_DAYS") - col("NON_NULL_DAYS"),
)

print("üìâ Missing data summary:")
missing_summary.show()


# --- 6. Monthly mean max temps ---
# Tag each reading with its calendar month, then average per county and month.
with_month = melted.with_column("MONTH", month(col("DATE")))
mean_max_temp = avg(col("MAX_TEMP")).alias("MEAN_MAX_TEMP")
monthly_means = with_month.group_by("COUNTY", "MONTH").agg(mean_max_temp)

print("‚úÖ Monthly means calculated")
monthly_means.show(5)


# --- 7. Pivot months into columns ---
# One output column per month number 1..12, one row per county.
month_numbers = list(range(1, 13))
by_month = monthly_means.pivot("MONTH", month_numbers)
pivoted = by_month.agg({"MEAN_MAX_TEMP": "avg"}).sort(col("COUNTY"))

print("‚úÖ Pivot complete")
pivoted.show()


# --- 8. (Optional) Save back to Snowflake ---
# Replaces the table if it already exists.
pivoted.write.save_as_table("ARIZONA_MEAN_BY_MONTH", mode="overwrite")

print("‚úÖ Saved as SVI.PUBLIC.ARIZONA_MEAN_BY_MONTH")


In [None]:
# --- 9. Rename month columns to full month names ---
# The pivot labels each month column with a generated name built around the
# month number, e.g. "CAST(1 AS NUMBER(38,0))".  That text is produced by the
# library, not by this notebook, so the rename keys on the leading month
# number rather than on the exact label.  Matching the exact label would
# silently rename nothing if the generated format were different.
import re

MONTH_NAMES = (
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
)

# Optional opening quote, optional "CAST(", then a 1-2 digit month number.
month_label = re.compile(r'^"?(?:CAST\()?\s*(\d{1,2})\b', re.IGNORECASE)

# Map every pivoted month column to its month name; other columns
# (such as COUNTY) do not match the pattern and are left untouched.
rename_map = {}
for column_name in pivoted.columns:
    label_match = month_label.match(column_name)
    if label_match is None:
        continue
    month_number = int(label_match.group(1))
    if 1 <= month_number <= 12:
        rename_map[column_name] = MONTH_NAMES[month_number - 1]

# Apply renaming
renamed = pivoted
for old, new in rename_map.items():
    renamed = renamed.with_column_renamed(old, new)

# Save to a new clean table
renamed.write.save_as_table("ARIZONA_MEAN_BY_MONTH_CLEAN", mode="overwrite")

print("‚úÖ Saved as SVI.PUBLIC.ARIZONA_MEAN_BY_MONTH_CLEAN with month names")


In [None]:
import snowflake.snowpark.functions as F
# get_active_session is not a builtin; without this import the call below
# raises NameError.
from snowflake.snowpark.context import get_active_session

# Create a Snowpark session object automatically in a notebook:
session = get_active_session()

# Load the SVI dataset
df = session.table("SVI.PUBLIC.ARIZONA_SVI_CLEAN")


In [None]:
import snowflake.snowpark.functions as F
# get_active_session must be imported explicitly before it can be called.
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# The double quotes are part of each identifier: they reference the columns by
# their exact lower-case names.
cols = ['"county"', '"mp_noveh"', '"mp_uninsur"', '"mp_pov150"', '"e_heat_risk"', '"e_daypop"']

# `df` is the SVI table loaded in an earlier cell.
df_selected = df.select(cols)

# One row per county holding the mean of every selected metric.
df_grouped = (
    df_selected.group_by('"county"')
    .agg(
        F.mean('"mp_noveh"').alias("mp_noveh_mean"),
        F.mean('"mp_uninsur"').alias("mp_uninsur_mean"),
        F.mean('"mp_pov150"').alias("mp_pov150_mean"),
        F.mean('"e_heat_risk"').alias("e_heat_risk_mean"),
        F.mean('"e_daypop"').alias("e_daypop_mean")
    )
)

# Replaces the table if it already exists.
df_grouped.write.save_as_table(
    "SVICleaned",
    mode="overwrite"
)


In [None]:
# Inspect the column names of the ARIZONA_SVI_CLEAN table.
df = session.table("ARIZONA_SVI_CLEAN")
svi_column_names = df.columns
print(svi_column_names)

In [None]:
# Inspect the column names of the hospital summary table.
# Note: this rebinds `df`, which previously pointed at the SVI table.
df = session.table("CLEANED_COUNTY_HOSPITAL_SUMMARY")
hospital_column_names = df.columns
print(hospital_column_names)

In [None]:
import snowflake.snowpark.functions as F

# Builds SVI_HOSPITAL_MERGED: the per-county SVI means from SVICleaned, the
# summed "e_daypop" population, the hospital count/capacity, per-10k access
# metrics and a medical-desert flag.

# 1. Load tables
svi_large = session.table("ARIZONA_SVI_CLEAN")
svi_cleaned = session.table("SVICleaned")
hosp = session.table("CLEANED_COUNTY_HOSPITAL_SUMMARY")

# 2. Aggregate population from large SVI table (per county)
pop_per_county = (
    svi_large.group_by('"county"')
    .agg(F.sum('"e_daypop"').alias("population_total"))
    .rename({'"county"': '"county_pop"'})  # rename to avoid ambiguity
)

# 3. Merge population with SVICleaned metrics
svi_with_pop = svi_cleaned.join(
    pop_per_county,
    svi_cleaned['"county"'] == pop_per_county['"county_pop"'],
    how="left"
).drop(pop_per_county['"county_pop"'])  # drop renamed duplicate

# 4. Create clean join key for hospital merge.
# The suffix pattern accepts both " County" and " county": the replace runs
# on the original-case text, so a lower-case-only pattern would leave
# " County" in place.
county_key = F.lower(F.regexp_replace(F.col('"county"'), " [Cc]ounty", ""))

svi_with_pop = svi_with_pop.with_column("CLEAN_COUNTY", county_key)

# The hospital columns are referenced by their quoted lower-case names (as the
# later merge cells in this notebook do) and aliased so the rest of this cell
# can use them unquoted.
hosp_clean = hosp.with_column("CLEAN_COUNTY", county_key).select(
    F.col('"num_hospitals"').alias("num_hospitals"),
    F.col('"total_capacity"').alias("total_capacity"),
    "CLEAN_COUNTY"
)

# 5. Merge hospitals
merged = svi_with_pop.join(
    hosp_clean,
    on="CLEAN_COUNTY",
    how="left"
)

# 6. Fill nulls in hospital data (counties without a matching hospital row)
merged = merged.with_column("num_hospitals", F.coalesce(F.col("num_hospitals"), F.lit(0)))
merged = merged.with_column("total_capacity", F.coalesce(F.col("total_capacity"), F.lit(0)))

# 7. Compute per-population metrics.
# Guarded so a zero (or missing) population yields 0 instead of a
# division-by-zero failure; same rule as the later merge cells.
population = F.col("population_total")
has_population = population > 0

merged = merged.with_column(
    "hospitals_per_10k",
    F.when(has_population, (F.col("num_hospitals") / population) * 10000)
    .otherwise(F.lit(0))
)
merged = merged.with_column(
    "beds_per_10k",
    F.when(has_population, (F.col("total_capacity") / population) * 10000)
    .otherwise(F.lit(0))
)

# 8. Optional: medical desert flag (1 when either access metric is below its threshold)
merged = merged.with_column(
    "is_medical_desert",
    F.when(
        (F.col("beds_per_10k") < 5) | (F.col("hospitals_per_10k") < 0.5),
        F.lit(1)
    ).otherwise(F.lit(0))
)

# 9. Drop temporary join key
merged = merged.drop("CLEAN_COUNTY")

# 10. Save final merged table
merged.write.save_as_table("SVI_HOSPITAL_MERGED", mode="overwrite")

merged.show()


In [None]:
import snowflake.snowpark.functions as F
# get_active_session must be imported before use; it is not a builtin.
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# County-level SVI means and the per-county hospital summary.
svi = session.table("SVI.PUBLIC.SVICleaned")
hosp = session.table("SVI.PUBLIC.CLEANED_COUNTY_HOSPITAL_SUMMARY")


In [None]:
# Normalised join key for the hospital table.
# The name is lower-cased FIRST so the "county" suffix is removed however it
# is capitalised ("County", "COUNTY", ...); afterwards all whitespace is
# stripped.  Replacing before lower-casing would leave "County" in the key.
hosp_clean = hosp.with_column(
    "clean_county",
    F.regexp_replace(
        F.regexp_replace(F.lower(F.col('"county"')), "county", ""),
        r"\s+", ""
    )
)


In [None]:
def _county_join_key(county_name):
    """Return a column expression for a normalised county join key.

    The name is lower-cased first, then the text "county" and all whitespace
    are removed, so "Santa Cruz County" and "santa cruz" give the same key.
    """
    lowered = F.lower(county_name)
    return F.regexp_replace(F.regexp_replace(lowered, "county", ""), r"\s+", "")


hosp_clean = hosp.with_column("clean_county", _county_join_key(F.col('"county"')))

# The join in the following cell reads `svi_clean`, which no earlier cell
# defines; build it here with the same key so both sides match.
# NOTE(review): assumes the SVI table exposes the county name as "county",
# like the hospital table - confirm.
svi_clean = svi.with_column("clean_county", _county_join_key(F.col('"county"')))


In [None]:
# Left join: keep every SVI row and attach hospital columns wherever the
# normalised county keys are equal.
county_keys_match = svi_clean["clean_county"] == hosp_clean["clean_county"]
merged = svi_clean.join(hosp_clean, county_keys_match, how="left")


In [None]:
# Hospital capacity per resident.
# Guarded the same way as the per-capita metrics in the later merge cells:
# a zero (or missing) population yields 0 instead of a division-by-zero error.
# NOTE(review): assumes `merged` carries an "e_totpop" column - confirm.
population = F.col('"e_totpop"')
merged = merged.with_column(
    "capacity_per_person",
    F.when(population > 0, F.col('"total_capacity"') / population)
    .otherwise(F.lit(0))
)


In [None]:
# Rebuild SVICLEANED with the mean day population included.
#
# Every source column is referenced with the same quoted lower-case identifier
# used by the other SVI cells.  Mixing quoted and unquoted spellings
# ('"county"' vs. county, mp_noveh, E_DAYPOP) points at different identifiers
# and makes the select / group_by disagree.
#
# The table is loaded explicitly because `df` is rebound to the hospital
# summary table by an earlier preview cell.
# NOTE(review): assumes ARIZONA_SVI_CLEAN is the intended input - confirm.
svi_source = session.table("ARIZONA_SVI_CLEAN")

cols = ['"county"', '"mp_noveh"', '"mp_uninsur"', '"mp_pov150"', '"e_heat_risk"', '"e_daypop"']

df_selected = svi_source.select(cols)

# One row per county with the mean of each metric.
df_grouped = (
    df_selected.group_by('"county"')
    .agg(
        F.mean('"mp_noveh"').alias("MP_NOVEH_MEAN"),
        F.mean('"mp_uninsur"').alias("MP_UNINSUR_MEAN"),
        F.mean('"mp_pov150"').alias("MP_POV150_MEAN"),
        F.mean('"e_heat_risk"').alias("E_HEAT_RISK_MEAN"),
        F.mean('"e_daypop"').alias("E_DAYPOP")
    )
)

# Replaces the table if it already exists.
df_grouped.write.save_as_table("SVICLEANED", mode="overwrite")


In [None]:
import snowflake.snowpark.functions as F

# Rebuilds SVI_HOSPITAL_MERGED straight from the large SVI table, with
# division guarded against a non-positive population.

# 1. Source tables
svi_large = session.table("ARIZONA_SVI_CLEAN")
hosp = session.table("CLEANED_COUNTY_HOSPITAL_SUMMARY")

# 2. County-level SVI aggregates: summed population (the denominator below)
#    followed by the mean of each original metric.
mean_metrics = [
    ('"mp_noveh"', "mp_noveh_mean"),
    ('"mp_uninsur"', "mp_uninsur_mean"),
    ('"mp_pov150"', "mp_pov150_mean"),
    ('"e_heat_risk"', "e_heat_risk_mean"),
]
svi_aggregations = [F.sum('"e_daypop"').alias("population_total")]
svi_aggregations += [F.mean(source).alias(target) for source, target in mean_metrics]

svi_aggregated = svi_large.group_by('"county"').agg(*svi_aggregations)

# 3. Join key: county name without its " County"/" county" suffix, lower-cased.
county_key = F.lower(F.regexp_replace(F.col('"county"'), " [Cc]ounty", ""))

svi_with_key = svi_aggregated.with_column("clean_county", county_key)

hosp_with_key = hosp.with_column("clean_county", county_key)
hosp_clean = hosp_with_key.select(
    F.col('"num_hospitals"').alias("num_hospitals"),
    F.col('"total_capacity"').alias("total_capacity"),
    "clean_county",
)

# 4. Attach hospital data to every SVI county
merged = svi_with_key.join(hosp_clean, on="clean_county", how="left")

# 5. Counties without a hospital row get 0 instead of NULL
#    (unquoted names work here because the columns were aliased above).
for hospital_column in ("num_hospitals", "total_capacity"):
    merged = merged.with_column(
        hospital_column,
        F.coalesce(F.col(hospital_column), F.lit(0)),
    )

# 6. Per-capita metrics; 0 whenever the population is not positive.
#    Each entry: (output column, numerator column, multiplier or None).
population = F.col("population_total")
has_population = population > 0
per_capita_metrics = (
    ("hospitals_per_10k", "num_hospitals", 10000),
    ("beds_per_10k", "total_capacity", 10000),
    ("carrying_capacity_ratio", "total_capacity", None),
)
for metric_name, numerator, multiplier in per_capita_metrics:
    ratio = F.col(numerator) / population
    if multiplier is not None:
        ratio = ratio * multiplier
    merged = merged.with_column(
        metric_name,
        F.when(has_population, ratio).otherwise(F.lit(0)),
    )

# 7. Medical desert flag: 1 when either access metric is below its threshold
too_few_beds = F.col("beds_per_10k") < 5
too_few_hospitals = F.col("hospitals_per_10k") < 0.5
merged = merged.with_column(
    "is_medical_desert",
    F.when(too_few_beds | too_few_hospitals, F.lit(1)).otherwise(F.lit(0)),
)

# 8. The join key is no longer needed
merged = merged.drop("clean_county")

# 9. Persist (replaces any existing table)
merged.write.save_as_table("SVI_HOSPITAL_MERGED", mode="overwrite")

# 10. Display results
merged.show()

In [None]:
# Quick look at the temperature table: column names, then the first five rows.
temp_data = session.table("ARIZONA_2022_MAXTEMP")
temp_column_names = temp_data.columns
print("Columns:", temp_column_names)
temp_data.show(5)

In [None]:
import snowflake.snowpark.functions as F

# Adds each county's highest 2022 temperature to SVI_HOSPITAL_MERGED.

# 1. Load the temperature data
temp_data = session.table("ARIZONA_2022_MAXTEMP")

# 2. Find max temperature for each county column.
# Names that need quoting carry their double quotes here; the quotes are
# stripped wherever the plain county name is needed.
county_columns = ['YAVAPAI', 'COCHISE', 'COCONINO', 'MARICOPA', 'GILA', 'PIMA',
                  'PINAL', '"Santa Cruz"', 'GRAHAM', 'GREENLEE', 'APACHE',
                  'NAVAJO', 'MOHAVE', '"La Paz"', 'YUMA']

# Single-row frame: one column per county holding that county's maximum.
max_temp_agg = temp_data.agg(
    *[F.max(county_col).alias(county_col.strip('"')) for county_col in county_columns]
)

# 3. Unpivot to get county-temperature pairs (one single-row frame per county)
unpivot_exprs = []
for county_col in county_columns:
    clean_name = county_col.strip('"')
    unpivot_exprs.append(
        max_temp_agg.select(
            F.lit(clean_name).alias("county_name"),
            F.col(clean_name).alias("max_temp_2022")
        )
    )

# Stack all counties.  Every row has a distinct county_name, so UNION ALL
# gives the same rows as UNION without the de-duplication step.
max_temps = unpivot_exprs[0]
for expr in unpivot_exprs[1:]:
    max_temps = max_temps.union_all(expr)

# 4. Standardize county names for joining (lowercase, no spaces)
max_temps = max_temps.with_column(
    "clean_county",
    F.lower(F.regexp_replace(F.col("county_name"), " ", ""))
)

# 5. Load your merged table and create join key
merged = session.table("SVI_HOSPITAL_MERGED")

# This cell overwrites the table it reads.  On a re-run the table already
# holds max_temp_2022, so drop it first; otherwise the join would add a
# second column with the same name.
if "MAX_TEMP_2022" in merged.columns:
    merged = merged.drop("max_temp_2022")

# The suffix pattern accepts " County" and " county", matching the key
# construction used by the merge cells.
merged_with_key = merged.with_column(
    "clean_county",
    F.lower(F.regexp_replace(F.regexp_replace(F.col('"county"'), " [Cc]ounty", ""), " ", ""))
)

# 6. Join temperature data
final_merged = merged_with_key.join(
    max_temps.select("clean_county", "max_temp_2022"),
    on="clean_county",
    how="left"
)

# 7. Drop temporary join key
final_merged = final_merged.drop("clean_county")

# 8. Save the updated table
final_merged.write.save_as_table("SVI_HOSPITAL_MERGED", mode="overwrite")

# 9. Display results
print("Updated table with max temperatures:")
final_merged.select(
    '"county"',
    "population_total",
    "num_hospitals",
    "beds_per_10k",
    "max_temp_2022"
).show()

In [None]:
import os

from snowflake.snowpark import Session

# Reconnect using SNOWFLAKE_* environment variables so real credentials are
# never stored in the notebook.  Unset variables fall back to the original
# placeholder values.
session = Session.builder.configs({
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", "<your_account>"),
    "user": os.environ.get("SNOWFLAKE_USER", "<your_username>"),
    "password": os.environ.get("SNOWFLAKE_PASSWORD", "<your_password>"),
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "<your_warehouse>"),
    "database": "SVI",
    "schema": "PUBLIC"
}).create()

print("‚úÖ Reconnected to Snowflake")


In [None]:
import snowflake.snowpark.functions as F

# Final build of SVI_HOSPITAL_MERGED, based on the estimate / percentage
# columns ("e_totpop", "ep_*") rather than the MOE columns.

# 1. Source tables
svi_large = session.table("ARIZONA_SVI_CLEAN")
hosp = session.table("CLEANED_COUNTY_HOSPITAL_SUMMARY")

# 2. County-level aggregates: summed total population (the denominator
#    below), then the mean of each vulnerability percentage.
vulnerability_means = {
    '"ep_noveh"': "pct_no_vehicle",
    '"ep_uninsur"': "pct_uninsured",
    '"ep_pov150"': "pct_poverty",
    '"ep_heat_risk"': "pct_heat_risk",
}
svi_aggregated = svi_large.group_by('"county"').agg(
    F.sum('"e_totpop"').alias("population_total"),
    *[F.mean(source).alias(target) for source, target in vulnerability_means.items()],
)

# 3. Join key: county name minus " County"/" county", lower-cased
clean_county_expr = F.lower(F.regexp_replace(F.col('"county"'), " [Cc]ounty", ""))

svi_with_key = svi_aggregated.with_column("clean_county", clean_county_expr)

hosp_keyed = hosp.with_column("clean_county", clean_county_expr)
hosp_clean = hosp_keyed.select(
    F.col('"num_hospitals"').alias("num_hospitals"),
    F.col('"total_capacity"').alias("total_capacity"),
    "clean_county",
)

# 4. Attach hospital data to every SVI county
merged = svi_with_key.join(hosp_clean, on="clean_county", how="left")

# 5. Counties with no hospital row get 0 instead of NULL
for column_name in ("num_hospitals", "total_capacity"):
    filled = F.coalesce(F.col(column_name), F.lit(0))
    merged = merged.with_column(column_name, filled)

# 6. Per-capita hospital access; 0 whenever the population is not positive
population_total = F.col("population_total")
population_is_positive = population_total > 0
zero = F.lit(0)

hospitals_ratio = (F.col("num_hospitals") / population_total) * 10000
beds_ratio = (F.col("total_capacity") / population_total) * 10000
capacity_ratio = F.col("total_capacity") / population_total

merged = merged.with_column(
    "hospitals_per_10k",
    F.when(population_is_positive, hospitals_ratio).otherwise(zero),
)
merged = merged.with_column(
    "beds_per_10k",
    F.when(population_is_positive, beds_ratio).otherwise(zero),
)
merged = merged.with_column(
    "carrying_capacity_ratio",
    F.when(population_is_positive, capacity_ratio).otherwise(zero),
)

# 7. Medical desert flag (same logic): 1 when either metric is below its threshold
below_bed_threshold = F.col("beds_per_10k") < 5
below_hospital_threshold = F.col("hospitals_per_10k") < 0.5
desert_flag = F.when(below_bed_threshold | below_hospital_threshold, F.lit(1)).otherwise(F.lit(0))
merged = merged.with_column("is_medical_desert", desert_flag)

# 8. Drop temp key
merged = merged.drop("clean_county")

# 9. Save final table (replaces any existing table)
merged.write.save_as_table("SVI_HOSPITAL_MERGED", mode="overwrite")

# 10. Display results
merged.show()
