In [None]:
import pandas as pd
import folium
from folium.plugins import MarkerCluster
from ipywidgets import interact, IntSlider, HTML
import os
from geopy.geocoders import Nominatim
import kagglehub
import re
import numpy as np
import requests
import logging

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, udf, mean, when, lit, max as spark_max, min as spark_min
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.types import DoubleType, ArrayType
from pyspark.sql.functions import regexp_extract


# Setup basic logging to console with timestamps
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')

logging.warning("Starting script execution...")

# Initialize Spark session
spark = SparkSession.builder.appName("US_Cost_of_living").getOrCreate()
logging.warning("Spark session initialized.")

# Load dataset
logging.warning("Downloading and loading dataset with kagglehub...")
path = kagglehub.dataset_download("asaniczka/us-cost-of-living-dataset-3171-counties")
csv_file = os.path.join(path, "cost_of_living_us.csv")

# Load CSV into pandas first
logging.warning("Loading CSV into pandas dataframe...")
df = pd.read_csv(csv_file, encoding="latin1")
logging.warning(f"Loaded {len(df)} rows.")

# Create a geolocator to convert county names to coordinates
geolocator = Nominatim(user_agent="col_map_app", timeout=10)

# Define geocode cache file
geocodes_cache_file = "geocodes_cache_col.csv"

# Load existing cached geocodes, if any
try:
    geocodes_cache = pd.read_csv(geocodes_cache_file)
    logging.warning(f"Loaded geocodes cache with {len(geocodes_cache)} entries.")
except FileNotFoundError:
    geocodes_cache = pd.DataFrame(columns=["state", "county", "latitude", "longitude"])
    logging.warning("No existing geocodes cache found, starting fresh.")

# Function to get coordinates for a given county and state with logging
def get_coordinates(state, county):
    global geocodes_cache

    logging.info(f"Geocoding: {county}, {state}")

    # Check if the coordinates are already in the cache
    cached_location = geocodes_cache[(geocodes_cache['state'] == state) &
                                     (geocodes_cache['county'] == county)]

    if not cached_location.empty:
        lat = cached_location['latitude'].iloc[0]
        lon = cached_location['longitude'].iloc[0]

        # If coordinates are NaN or None, skip geocoding (known missing location)
        if pd.isna(lat) or pd.isna(lon):
            logging.info(f"Previously marked as not found: {county}, {state}, skipping geocoding.")
            return None
        else:
            logging.info(f"Cache hit for {county}, {state}")
            return (lat, lon)

    try:
        county_name = county.replace(" County", "")
        location = geolocator.geocode({"city": county_name, "state": state, "country": "USA"})

        if location:
            logging.info(f"Geocoded {county}, {state} to ({location.latitude}, {location.longitude})")
            new_location = pd.DataFrame({"state": [state], "county": [county],
                                         "latitude": [location.latitude], "longitude": [location.longitude]})
        else:
            logging.warning(f"Geocode not found for {county}, {state}. Marking as missing.")
            # Mark missing in cache with NaN coordinates
            new_location = pd.DataFrame({"state": [state], "county": [county],
                                         "latitude": [np.nan], "longitude": [np.nan]})

        geocodes_cache = pd.concat([geocodes_cache, new_location], ignore_index=True)
        geocodes_cache.to_csv(geocodes_cache_file, index=False)

        if location:
            return (location.latitude, location.longitude)
        else:
            return None

    except Exception as e:
        logging.error(f"Geocoding location error for {county}, {state}: {e}")
        return None

logging.warning("Starting geocoding for all rows...")
# Apply your get_coordinates function using pandas apply with caching
df['coordinates'] = df.apply(lambda row: get_coordinates(row['state'], row['county']), axis=1)
logging.warning("Geocoding completed.")

# Extract latitude and longitude into separate columns before creating Spark DataFrame
df['latitude'] = df['coordinates'].apply(lambda x: x[0] if x else None)
df['longitude'] = df['coordinates'].apply(lambda x: x[1] if x else None)

# Drop the original 'coordinates' column as it's not needed in Spark DataFrame
df = df.drop(columns=['coordinates'])


# Then create spark dataframe from pandas df
df_spark = spark.createDataFrame(df)
logging.warning("Converted pandas dataframe to Spark dataframe.")

# Extract numbers using regex directly in Spark
df_spark = df_spark.withColumn("parents", regexp_extract(col("family_member_count"), r"(\d+)\s*p", 1).cast("int"))
df_spark = df_spark.withColumn("children", regexp_extract(col("family_member_count"), r"(\d+)\s*c", 1).cast("int"))

# Fill missing values with 0 (for rows where regex didn't match)
df_spark = df_spark.fillna({'parents': 0, 'children': 0})

# Compute total family members
df_spark = df_spark.withColumn("family_members", col("parents") + col("children"))
logging.warning("Calculating total_cost per person...")
# Calculate total_cost per person
# Use null-safe division by checking for family_members > 0
df_spark = df_spark.withColumn(
    "total_cost",
    col("total_cost").cast(DoubleType()) /
    when(col("family_members") > 0, col("family_members")).otherwise(None)
)

logging.warning("Aggregating average total_cost per state and county...")
# Average all total_cost per person from the same unique combination of county and state to an averaged result per combination
df_spark = df_spark.groupBy("state", "county", "latitude", "longitude").agg(mean("total_cost").alias("total_cost"))

# Calculate min and max
logging.warning("Calculating min and max total_cost...")
# Collect the min and max values safely, handling potential None results
min_result = df_spark.agg(spark_min("total_cost")).collect()
min_val = min_result[0][0] if min_result and min_result[0][0] is not None else None

max_result = df_spark.agg(spark_max("total_cost")).collect()
max_val = max_result[0][0] if max_result and max_result[0][0] is not None else None

logging.warning(f"Min total_cost: {min_val}, Max total_cost: {max_val}")


logging.warning("Calculating ranking column...")
# Add ranking column
# Check if min_val and max_val are valid before calculating the ranking
if min_val is not None and max_val is not None and (max_val - min_val) != 0:
    df_spark = df_spark.withColumn("cost_of_living_ranking", 1 - ((col("total_cost") - lit(min_val)) / (lit(max_val - min_val))))
    logging.warning("Ranking column calculated.")
else:
    # If min/max are None or the range is zero, ranking cannot be calculated meaningfully
    logging.warning("Could not calculate ranking column due to invalid min/max values or zero range. Setting ranking to None.")
    df_spark = df_spark.withColumn("cost_of_living_ranking", lit(None).cast(DoubleType())) # Assign None to ranking column

logging.warning("Selecting and saving final results to CSV...")
# Select desired columns and convert to Pandas
# Ensure 'cost_of_living_ranking' column is included even if it's all None
ranking_df = df_spark.select("state", "county", "latitude", "longitude", "total_cost", "cost_of_living_ranking").toPandas()

# Save to CSV
ranking_df.to_csv("cost_of_living_ranking.csv", index=False)

logging.warning("Script finished successfully and CSV saved.")



In [None]:
import pandas as pd
import re

# Load the ranked crime dataset
col_df = pd.read_csv("cost_of_living_ranking.csv")

# Step 3: Remove "county" or "COUNTY" from county column values and strip whitespace
col_df["county"] = col_df["county"].str.replace(r"\bcounty\b", "", flags=re.IGNORECASE, regex=True).str.strip()

# Save the updated DataFrame to a new CSV file
col_df.to_csv("Cost_of_living_ranked.csv", index=False)

print("Processed data saved to Cost_of_living_ranked.csv")
print(col_df.head())


Processed data saved to Cost_of_living_ranked.csv
  state                     county   latitude   longitude    total_cost  \
0    AK  Hoonah-Angoon Census Area        NaN         NaN  24729.205274   
1    AR                    Johnson  36.132856  -94.165482  19146.670728   
2    CO                 Las Animas  38.066456 -103.222478  21664.262724   
3    CO                      Routt  38.134233  -85.466622  27986.883290   
4    GA                     Fannin        NaN         NaN  20829.709668   

   cost_of_living_ranking  
0                0.802815  
1                0.961861  
2                0.890135  
3                0.710005  
4                0.913911  


In [None]:
import pandas as pd
from shapely.geometry import Point

# Load cost of living ranked CSV (with latitude and longitude)
col_df = pd.read_csv("Cost_of_living_ranked.csv")

# Create geometry column from latitude and longitude
col_df["geometry"] = col_df.apply(
    lambda row: Point(row["longitude"], row["latitude"])
                if pd.notna(row["latitude"]) and pd.notna(row["longitude"]) else None,
    axis=1
)

# Drop rows where geometry is None (invalid or missing lat/lon)
col_df = col_df[col_df["geometry"].notnull()]

# Remove latitude and longitude columns now that geometry exists
col_df = col_df.drop(columns=["latitude", "longitude"])

print(col_df.head())

# Save updated dataframe
col_df.to_csv("Cost_of_living_ranked_2.csv", index=False)


  state      county    total_cost  cost_of_living_ranking  \
1    AR     Johnson  19146.670728                0.961861   
2    CO  Las Animas  21664.262724                0.890135   
3    CO       Routt  27986.883290                0.710005   
5    GA       Rabun  19667.994026                0.947008   
6    ID      Jerome  21749.163856                0.887716   

                          geometry  
1   POINT (-94.1654821 36.1328564)  
2  POINT (-103.2224779 38.0664563)  
3   POINT (-85.4666224 38.1342326)  
5   POINT (-83.3864536 34.9572143)  
6   POINT (-114.518808 42.7238458)  
