# Big data tool: PySpark(Apache Spark) to Analyze the Dataset

In [3]:
pip install pyspark

Defaulting to user installation because normal site-packages is not writeableNote: you may need to restart the kernel to use updated packages.




[notice] A new release of pip available: 22.2.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, hour, dayofmonth, month, dayofweek, radians, sin, cos, sqrt, atan2

In [2]:
# Set Spark to Bind to Specific IP/Port to reduce firewall prompts
import os
os.environ['SPARK_LOCAL_IP'] = '127.0.0.1'

In [3]:

# Setting Python interpreter for both PySpark driver and worker to match (3.11)
os.environ["PYSPARK_PYTHON"] = "C:/Program Files/ArcGIS/Pro/bin/Python/envs/arcgispro-py3/python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = "C:/Program Files/ArcGIS/Pro/bin/Python/envs/arcgispro-py3/python.exe"

In [4]:
# 1. Starting the session 
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("UberFareAnalysis") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .getOrCreate()

In [5]:
# 2. Load CSV
df = spark.read.csv("uber.csv", header=True, inferSchema=True)

In [6]:
# 3. Data Cleaning
df = df.dropna()
df = df.dropDuplicates()
df = df.filter(col("fare_amount") > 0)
df = df.filter((col("passenger_count") >= 1) & (col("passenger_count") <= 6))

In [7]:
# NYC boundary filtering
df = df.filter(
    (col("pickup_longitude").between(-75, -72)) &
    (col("pickup_latitude").between(40, 42)) &
    (col("dropoff_longitude").between(-75, -72)) &
    (col("dropoff_latitude").between(40, 42))
)

In [8]:
# 4. Convert to datetime and extract parts
df = df.withColumn("pickup_datetime", to_timestamp("pickup_datetime"))
df = df.dropna(subset=["pickup_datetime"])
df = df.withColumn("hour", hour("pickup_datetime")) \
       .withColumn("day", dayofmonth("pickup_datetime")) \
       .withColumn("month", month("pickup_datetime")) \
       .withColumn("weekday", dayofweek("pickup_datetime"))

In [9]:
# 5. Haversine Distance
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math

def haversine(lat1, lon1, lat2, lon2):
    R = 6371
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)
    a = math.sin(delta_phi/2)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda/2)**2
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

haversine_udf = udf(haversine, DoubleType())

df = df.withColumn("distance_km", haversine_udf(
    col("pickup_latitude"), col("pickup_longitude"),
    col("dropoff_latitude"), col("dropoff_longitude")
))

df = df.filter(col("distance_km") > 0)

In [10]:
# 6. Features and Label
features = ["passenger_count", "distance_km", "hour", "day", "month"]
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=features, outputCol="features")
df = assembler.transform(df)

In [21]:
import sys
print(sys.executable)

C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\python.exe


In [11]:
df.show(5)
df.printSchema()
print("Row count:", df.count())

+--------+-------------------+-----------+-------------------+----------------+---------------+------------------+------------------+---------------+----+---+-----+-------+------------------+--------------------+
|     _c0|                key|fare_amount|    pickup_datetime|pickup_longitude|pickup_latitude| dropoff_longitude|  dropoff_latitude|passenger_count|hour|day|month|weekday|       distance_km|            features|
+--------+-------------------+-----------+-------------------+----------------+---------------+------------------+------------------+---------------+----+---+-----+-------+------------------+--------------------+
|42367206|2014-09-23 00:15:52|       15.0|2014-09-23 01:15:52|      -73.994126|      40.750917|        -73.949428|40.781335999999996|              1|   1| 23|    9|      3| 5.060737421252564|[1.0,5.0607374212...|
|51756123|2014-04-19 02:10:00|        9.0|2014-04-19 03:10:00|      -73.997222|      40.763667|-74.01061700000001|         40.730022|              3

## Training Models

In [12]:
# 7. Train-test split
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

In [14]:
# 8. Train Models
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

models = {
    "Linear Regression": LinearRegression(featuresCol="features", labelCol="fare_amount"),
    "Decision Tree": DecisionTreeRegressor(featuresCol="features", labelCol="fare_amount", maxDepth=10),
    "Random Forest": RandomForestRegressor(featuresCol="features", labelCol="fare_amount", numTrees=100, maxDepth=10)
}

results = {}
evaluator = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="rmse")

for name, model in models.items():
    fitted_model = model.fit(train_df)
    predictions = fitted_model.transform(test_df)
    mae = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="mae").evaluate(predictions)
    rmse = evaluator.evaluate(predictions)
    r2 = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="r2").evaluate(predictions)
    results[name] = (mae, rmse, r2)
# 9. Display Results
print("Model Performance (MAE, RMSE, R2):\n")
for name, (mae, rmse, r2) in results.items():
    print(f"{name}: MAE={mae:.2f}, RMSE={rmse:.2f}, R2={r2:.3f}")



Model Performance (MAE, RMSE, R2):

Linear Regression: MAE=2.42, RMSE=5.51, R2=0.678
Decision Tree: MAE=2.45, RMSE=5.14, R2=0.721
Random Forest: MAE=2.42, RMSE=4.93, R2=0.743
