In [5]:
# set Java environment
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"



In [6]:
# Import statemants
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, min, max, count, round, lit, when
from pyspark.sql.types import IntegerType


In [7]:
try:
    spark.stop()
except:
    pass # Ignore if 'spark' is not defined
spark = SparkSession.builder.appName("LightRailData").getOrCreate()

sc = spark.sparkContext

In [8]:
# Upload data
from google.colab import files
uploaded = files.upload()

Saving geonames-postal-code.csv to geonames-postal-code.csv
Saving ValleyMetroRailStations_-7532295976615093627.csv to ValleyMetroRailStations_-7532295976615093627.csv


In [9]:
df = spark.read.csv("ValleyMetroRailStations_-7532295976615093627.csv", header=True, inferSchema=True)

In [10]:
df.printSchema()
df.show(5)

root
 |-- OBJECTID: integer (nullable = true)
 |-- StationId: integer (nullable = true)
 |-- StationName: string (nullable = true)
 |-- Jurisdiction: string (nullable = true)
 |-- StationStatus: string (nullable = true)
 |-- POINT_X: double (nullable = true)
 |-- POINT_Y: double (nullable = true)
 |-- Image: string (nullable = true)
 |-- Image2: string (nullable = true)
 |-- TVM: integer (nullable = true)
 |-- Bench: integer (nullable = true)
 |-- TrashCan: integer (nullable = true)
 |-- BikeRacks: string (nullable = true)
 |-- WaterFountain: string (nullable = true)
 |-- Restroom: string (nullable = true)
 |-- InfoDisplay: string (nullable = true)
 |-- Em_Call: string (nullable = true)
 |-- NextRide: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- ServiceType: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)

+--------+---------+--------------------+------------+-------------+------------+-----------+--------------------+

In [11]:
df_clean = df.select(
    col("OBJECTID"),
    col("StationName"),
    col("POINT_X"),
    col("POINT_Y"),
    col("Address"),
    col("x"),
    col("y"))

In [12]:
df_clean.printSchema()
df_clean.show(5)

root
 |-- OBJECTID: integer (nullable = true)
 |-- StationName: string (nullable = true)
 |-- POINT_X: double (nullable = true)
 |-- POINT_Y: double (nullable = true)
 |-- Address: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)

+--------+--------------------+------------+-----------+--------------------+-----------+-----------+
|OBJECTID|         StationName|     POINT_X|    POINT_Y|             Address|          x|          y|
+--------+--------------------+------------+-----------+--------------------+-----------+-----------+
|       1|   19th Ave / Dunlap|-112.0993888|33.56708951|1935 West Dunlap ...|643925.8366|934023.0151|
|       2|    Center / Main St|-111.8306604|33.41509754| 26 East Main Street|726247.5682|878572.8615|
|       3| Northern / 19th Ave|-112.0993595|33.55318964|7832 North 19th A...|644260.3402|928445.8602|
|       4| Glendale / 19th Ave|-112.0993289|33.53864324|6813 North 19th A...|644228.9987|923101.9688|
|       5|44t

In [13]:
# Save the updated DataFrame as a CSV file
output_csv_path_updated = "updated_lightRail.csv"
df_clean.write.mode("overwrite").csv(output_csv_path_updated, header=True)
print(f"\nUpdated records saved to '{output_csv_path_updated}'.")


Updated records saved to 'updated_lightRail.csv'.


In [14]:
df_2 = spark.read.csv("geonames-postal-code.csv", header=True, inferSchema=True)

In [15]:
df_2.printSchema()
df_2.show(5)

root
 |-- country code: string (nullable = true)
 |-- postal code: integer (nullable = true)
 |-- place name: string (nullable = true)
 |-- admin name1: string (nullable = true)
 |-- admin code1: string (nullable = true)
 |-- admin name2: string (nullable = true)
 |-- admin code2: integer (nullable = true)
 |-- admin name3: string (nullable = true)
 |-- admin code3: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- accuracy: integer (nullable = true)
 |-- coordinates: string (nullable = true)

+------------+-----------+----------+-----------+-----------+-----------+-----------+-----------+-----------+--------+---------+--------+------------------+
|country code|postal code|place name|admin name1|admin code1|admin name2|admin code2|admin name3|admin code3|latitude|longitude|accuracy|       coordinates|
+------------+-----------+----------+-----------+-----------+-----------+-----------+-----------+-----------+--------+--------

In [16]:
df_clean2 = df_2.select(
    col("postal code"),
    col("latitude"),
    col("longitude"),
    col("accuracy"))

In [17]:
df_clean2.show()

+-----------+--------+---------+--------+
|postal code|latitude|longitude|accuracy|
+-----------+--------+---------+--------+
|      85037| 33.4913|-112.2468|       4|
|      85038| 33.4484| -112.074|       4|
|      85258| 33.5647|-111.8931|       4|
|      85326| 33.3703|-112.5838|       4|
|      85015| 33.5082|-112.1011|       4|
|      85039| 33.4484| -112.074|       4|
|      85063| 33.4484| -112.074|       4|
|      85080| 33.4484| -112.074|       4|
|      85203|  33.437|-111.8057|       4|
|      85204| 33.3992|-111.7896|       4|
|      85327| 33.8333|-111.9508|       4|
|      85354| 33.4228|-112.9528|       4|
|      85028| 33.5851|-112.0087|       4|
|      85267| 33.5092| -111.899|       4|
|      85013| 33.5085|-112.0827|       4|
|      85050| 33.6863|-111.9963|       4|
|      85226| 33.3092|-111.9198|       4|
|      85251| 33.4936|-111.9167|       4|
|      85255| 33.6968|-111.8892|       4|
|      85285| 33.4148|-111.9093|       4|
+-----------+--------+---------+--

In [18]:
# Save the updated DataFrame as a CSV file
output_csv_path_updated = "updated_ZIp.csv"
df_clean2.write.mode("overwrite").csv(output_csv_path_updated, header=True)
print(f"\nUpdated records saved to '{output_csv_path_updated}'.")


Updated records saved to 'updated_ZIp.csv'.


In [24]:
uploaded = files.upload()

In [20]:
import numpy as np
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, udf, lit, row_number, when
from pyspark.sql.types import DoubleType

def haversine(lon1, lat1, lon2, lat2):
    """Calculates the distance (miles) between two Lon/Lat points."""
    R = 3958.8  # Radius of Earth in miles
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2
    c = 2 * np.arcsin(np.sqrt(a))
    distance = R * c
    return float(distance) # Ensure the return type is a native float

In [21]:
# Register the UDF with Spark
haversine_udf = udf(haversine, DoubleType())
spark.udf.register("haversine_udf", haversine, DoubleType())

In [22]:
zip_path = "updated_ZIp.csv"
zip_df = spark.read.csv(zip_path, header=True, inferSchema=True) \
    .select(
        col("postal code").alias("ZIP"),
        col("latitude").alias("ZIP_Lat"),
        col("longitude").alias("ZIP_Lon")
    )

In [25]:
# Load Light Rail Stations
rail_path = "updated_lightRail.csv"
rail_df = spark.read.csv(rail_path, header=True, inferSchema=True) \
    .select(
        col("OBJECTID").alias("StationID"),
        col("StationName"),
        col("POINT_X").alias("Station_Lon"),
        col("POINT_Y").alias("Station_Lat")
    )

In [26]:
# Cross Join combines every row from zip_df with every row from rail_df
cross_df = zip_df.crossJoin(rail_df)

# Calculate the distance in miles using the Haversine UDF
distance_df = cross_df.withColumn(
    "Distance_Miles",
    haversine_udf(
        col("ZIP_Lon"), col("ZIP_Lat"),
        col("Station_Lon"), col("Station_Lat")
    )
)

In [27]:
# Define a window partitioned by ZIP, ordered by distance
window_spec = Window.partitionBy("ZIP").orderBy(col("Distance_Miles").asc())

# Rank the stations by distance for each ZIP code
ranked_df = distance_df.withColumn("rank", row_number().over(window_spec))

# Filter to keep only the nearest station (rank = 1)
nearest_station_df = ranked_df.filter(col("rank") == 1)

In [28]:
ADJACENCY_THRESHOLD_MILES = 2.5

final_proximity_df = nearest_station_df.select(
    "ZIP",
    col("Distance_Miles").alias("Rail_Distance_Miles"),
    "StationName",
    # Create the binary Adjacency Indicator
    when(col("Distance_Miles") <= ADJACENCY_THRESHOLD_MILES, 1)
        .otherwise(0)
        .alias("Adjacency_Indicator")
)

In [29]:
# Show the resulting feature table
print("\n--- Final Proximity Features Schema ---")
final_proximity_df.printSchema()
print("\n--- Final Proximity Features Head (Nearest Station per ZIP) ---")
final_proximity_df.show(5, truncate=False)


--- Final Proximity Features Schema ---
root
 |-- ZIP: integer (nullable = true)
 |-- Rail_Distance_Miles: double (nullable = true)
 |-- StationName: string (nullable = true)
 |-- Adjacency_Indicator: integer (nullable = false)


--- Final Proximity Features Head (Nearest Station per ZIP) ---
+-----+--------------------+--------------------------------+-------------------+
|ZIP  |Rail_Distance_Miles |StationName                     |Adjacency_Indicator|
+-----+--------------------+--------------------------------+-------------------+
|85001|0.023546974696654167|Downtown Phx Hub / Washington St|1                  |
|85002|0.023546974696654167|Downtown Phx Hub / Washington St|1                  |
|85003|0.14479125177917848 |Van Buren / 1st Ave             |1                  |
|85004|0.4034337935478666  |Van Buren / Central Ave         |1                  |
|85005|0.023546974696654167|Downtown Phx Hub / Washington St|1                  |
+-----+--------------------+---------------------

In [30]:
final_proximity_df.write.csv("zip_rail_proximity_features.csv", header=True, mode="overwrite")