In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Trips_Analysis").getOrCreate()

# You can read all CSVs at once using a wildcard

df = spark.read.csv("/content/drive/MyDrive/csv_files/Taxi Datset.csv", header=True, inferSchema=True)

raw_df = spark.read.csv("/content/drive/MyDrive/csv_files/Taxi Datset.csv", header=True, inferSchema=True)

df.show(5)
df.printSchema()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1|01/01/2020 12:28:...| 01/01/2020 12:33:...|              1|          1.2|         1|                 N|         238|         239|           1|        6.0|  3.0|    0.5|      1.47|         0.0|                  0.3

In [5]:
''' file = "TaxiDataset.csv"
data_nodes = ["DataNode_1", "DataNode_2", "DataNode_3",
              "DataNode_4", "DataNode_5", "DataNode_6"]

replication_factor = 3

# pick 3 unique nodes:
import random
replicas = random.sample(data_nodes, replication_factor)

print("HDFS placement:")
print(f"{file} -> {replicas}") '''

' file = "TaxiDataset.csv"\ndata_nodes = ["DataNode_1", "DataNode_2", "DataNode_3",\n              "DataNode_4", "DataNode_5", "DataNode_6"]\n\nreplication_factor = 3\n\n# pick 3 unique nodes:\nimport random\nreplicas = random.sample(data_nodes, replication_factor)\n\nprint("HDFS placement:")\nprint(f"{file} -> {replicas}") '

In [6]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import sum as spark_sum

In [7]:
df = df.withColumn("pickup_time", to_timestamp(col("tpep_pickup_datetime"), "MM/dd/yyyy hh:mm:ss a"))

df = df.withColumn("dropoff_time", to_timestamp(col("tpep_dropoff_datetime"), "MM/dd/yyyy hh:mm:ss a"))

In [8]:
df.select("tpep_pickup_datetime", "pickup_time").show(10, truncate=False) #shows the full content of each column.

+----------------------+-------------------+
|tpep_pickup_datetime  |pickup_time        |
+----------------------+-------------------+
|01/01/2020 12:28:15 AM|2020-01-01 00:28:15|
|01/01/2020 12:35:39 AM|2020-01-01 00:35:39|
|01/01/2020 12:47:41 AM|2020-01-01 00:47:41|
|01/01/2020 12:55:23 AM|2020-01-01 00:55:23|
|01/01/2020 12:01:58 AM|2020-01-01 00:01:58|
|01/01/2020 12:09:44 AM|2020-01-01 00:09:44|
|01/01/2020 12:39:25 AM|2020-01-01 00:39:25|
|12/18/2019 03:27:49 PM|2019-12-18 15:27:49|
|12/18/2019 03:30:35 PM|2019-12-18 15:30:35|
|01/01/2020 12:29:01 AM|2020-01-01 00:29:01|
+----------------------+-------------------+
only showing top 10 rows



In [9]:
df = df.withColumn("year", year("pickup_time"))
df = df.filter(col("year").isin(2016, 2017, 2018, 2019, 2020))

df = df.withColumn("trip_duration_minutes", (col("dropoff_time").cast("long") - col("pickup_time").cast("long")) / 60)

df = df.withColumn("total_fare", col("fare_amount") + col("tip_amount"))

df = df.withColumn(
    "extra_charges",
    col("tolls_amount") +
    col("congestion_surcharge") +
    col("mta_tax") +
    col("extra") +
    col("improvement_surcharge")
)

df = df.withColumn("month", month("pickup_time"))
df = df.withColumn("day_of_week", dayofweek("pickup_time"))
df = df.withColumn("hour", hour("pickup_time"))



In [10]:
df.show(1)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-------------------+-------------------+----+---------------------+----------+-------------+-----+-----------+----+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|        pickup_time|       dropoff_time|year|trip_duration_minutes|total_fare|extra_charges|month|day_of_week|hour|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+---------------

In [11]:
total_rows = df.count()

null_percent = df.select([
    (spark_sum(col(c).isNull().cast("int")) / total_rows * 100).alias(c)
    for c in df.columns
])
null_percent.show()


+------------------+--------------------+---------------------+------------------+-------------+------------------+------------------+------------+------------+------------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------+----+---------------------+----------+-------------+-----+-----------+----+
|          VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|   passenger_count|trip_distance|        RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|      payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|pickup_time|dropoff_time|year|trip_duration_minutes|total_fare|extra_charges|month|day_of_week|hour|
+------------------+--------------------+---------------------+------------------+-------------+------------------+------------------+------------+------------+------------------+-----------+-----+-------+----------+------------

In [12]:
dropped_columns = [
    "RatecodeID",             # not needed
    "store_and_fwd_flag",     # not needed
    "tpep_pickup_datetime",   # original string datetime, replaced by trip_duration_minutes
    "tpep_dropoff_datetime",  # original string datetime, replaced by trip_duration_minutes
    "pickup_time",            # replaced by trip_duration_minutes, year, month, day_of_week
    "dropoff_time",           # replaced by trip_duration_minutes, year, month, day_of_week
    "fare_amount",            # merged into total_fare
    "tip_amount",             # merged into total_fare
    "tolls_amount",           # merged into extra_charges
    "congestion_surcharge",   # merged into extra_charges
    "mta_tax",                # merged into extra_charges
    "extra",                  # merged into extra_charges
    "improvement_surcharge"   # merged into extra_charges
]
df = df.drop(*dropped_columns)
df.show(1)

+--------+---------------+-------------+------------+------------+------------+------------+----+---------------------+----------+-------------+-----+-----------+----+
|VendorID|passenger_count|trip_distance|PULocationID|DOLocationID|payment_type|total_amount|year|trip_duration_minutes|total_fare|extra_charges|month|day_of_week|hour|
+--------+---------------+-------------+------------+------------+------------+------------+----+---------------------+----------+-------------+-----+-----------+----+
|       1|              1|          1.2|         238|         239|           1|       11.27|2020|                  4.8|      7.47|          6.3|    1|          4|   0|
+--------+---------------+-------------+------------+------------+------------+------------+----+---------------------+----------+-------------+-----+-----------+----+
only showing top 1 row



In [13]:
numeric_cols = ["trip_distance", "total_fare", "extra_charges","trip_duration_minutes", "passenger_count"] #[4] new

#removing outliers
for c in numeric_cols:
    q1, q3 = df.approxQuantile(c, [0.25, 0.75], 0.01)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    df = df.filter((col(c) >= lower_bound) & (col(c) <= upper_bound))

In [14]:
df = df.filter(col("trip_duration_minutes").isNotNull() & (col("trip_duration_minutes") > 0))

In [15]:
#handling nulls in the numerical columns.
for c in numeric_cols:
    median_value = df.approxQuantile(c, [0.5], 0.01)[0]
    df = df.fillna({c: median_value})

In [16]:
#handling nulls in the categorical columns.
categorical_cols = ["PULocationID", "DOLocationID", "payment_type", "VendorID"] # [2], [3] new

for c in categorical_cols:
    mode_value = df.groupBy(c).count().orderBy(col("count").desc()).first()[0]
    df = df.fillna({c: mode_value})

In [17]:
total_rows_after = df.count() #new
null_percent_after = df.select([
    (spark_sum(col(c).isNull().cast("int")) / total_rows * 100).alias(c)
    for c in df.columns
])
null_percent_after.show()

+--------+---------------+-------------+------------+------------+------------+------------+----+---------------------+----------+-------------+-----+-----------+----+
|VendorID|passenger_count|trip_distance|PULocationID|DOLocationID|payment_type|total_amount|year|trip_duration_minutes|total_fare|extra_charges|month|day_of_week|hour|
+--------+---------------+-------------+------------+------------+------------+------------+----+---------------------+----------+-------------+-----+-----------+----+
|     0.0|            0.0|          0.0|         0.0|         0.0|         0.0|         0.0| 0.0|                  0.0|       0.0|          0.0|  0.0|        0.0| 0.0|
+--------+---------------+-------------+------------+------------+------------+------------+----+---------------------+----------+-------------+-----+-----------+----+



In [19]:
#Spark SQL Aggregations
df.createOrReplaceTempView("trips")

spark.sql("SELECT PULocationID, COUNT(*) AS total_trips FROM trips GROUP BY PULocationID ORDER BY total_trips DESC").show(10)

spark.sql("SELECT year, ROUND(AVG(total_fare),2) AS avg_total_fare FROM trips GROUP BY year ORDER BY year").show()


+------------+-----------+
|PULocationID|total_trips|
+------------+-----------+
|         237|     464327|
|         161|     432344|
|         236|     428958|
|         162|     361393|
|         186|     349178|
|         230|     334989|
|         142|     306790|
|         234|     300971|
|         170|     300843|
|          48|     294838|
+------------+-----------+
only showing top 10 rows

+----+--------------+
|year|avg_total_fare|
+----+--------------+
|2019|          9.72|
|2020|         10.54|
+----+--------------+



In [20]:
# Predictive Analytics
# create the df containing the features & actual data (lable) for model training
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["PULocationID","DOLocationID","trip_distance","trip_duration_minutes","total_fare","extra_charges"],
    outputCol="features"
)
ML_df = assembler.transform(df)\
    .select("features", col("total_amount").alias("label"))

train, test = ML_df.randomSplit([0.8, 0.2], seed=42)

train.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.0,1.0,0.0,0.03...|  3.3|
|[1.0,1.0,0.0,0.05...| 21.3|
|[1.0,1.0,0.0,0.21...| 20.3|
|[1.0,1.0,0.0,0.28...| 12.3|
|[1.0,1.0,0.0,0.31...| 10.3|
+--------------------+-----+
only showing top 5 rows



In [None]:
# create a regression model , training it and predecting using the df created above
from pyspark.ml.regression import LinearRegression
reg = LinearRegression(featuresCol="features", labelCol="label")

Reg_model = reg.fit(train)
prediction = Reg_model.transform(test)

prediction.show(10)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="rmse"
)

rmse = evaluator.evaluate(prediction)
print("RMSE =", rmse)


In [None]:
# evaluation of Regression model
evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="r2"
)

r2 = evaluator.evaluate(prediction)
print(f"R2 score: {r2}")

In [None]:
df.groupBy("passenger_count") \
  .agg(
      avg("total_fare").alias("avg_fare"),
      avg("total_amount").alias("avg_total")
  ).orderBy("passenger_count") \
  .show()

In [None]:
df.groupBy("VendorID") \
  .agg(
      avg("total_fare").alias("avg_fare"),
      avg("total_amount").alias("avg_total")
  ) \
  .orderBy("VendorID") \
  .show()


In [None]:
df.groupBy("payment_type") \
  .agg(
      avg("total_fare").alias("avg_fare"),
      avg("total_amount").alias("avg_total")
  ) \
  .orderBy("payment_type") \
  .show()

In [None]:
df.groupBy("hour").count().orderBy("hour").show()

In [None]:
df.groupBy("day_of_week").count().orderBy("day_of_week").show()


In [None]:
df.groupBy("month").count().orderBy("month").show()

In [None]:
df_bins = df.withColumn(
    "duration_bin",
    when(col("trip_duration_minutes") < 5, "<5 min")
    .when(col("trip_duration_minutes") < 10, "5-10 min")
    .when(col("trip_duration_minutes") < 20, "10-20 min")
    .otherwise(">20 min")
)
df_bins.groupBy("duration_bin") \
    .agg(
        avg(col("trip_distance") / col("trip_duration_minutes")).alias("avg_distance_per_minute")
    ) \
    .orderBy("duration_bin") \
    .show()

In [None]:
from pyspark.sql.functions import max, min

df.groupBy("payment_type") \
  .agg(
      max("total_fare").alias("max_fare"),
      min("total_fare").alias("min_fare")
  ) \
  .orderBy("payment_type") \
  .show()

In [None]:
df.select(
    max("trip_distance").alias("max_distance"),
    min("trip_distance").alias("min_distance")
).show()

In [None]:
df_filtered = df.filter(col("trip_distance") >= 0)

df_filtered.select(
    max("trip_distance").alias("max_distance"),
    min("trip_distance").alias("min_distance")
).show()

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

df_pd = df.toPandas("trips").copy()
plt.figure(df_pd).sort_values("avg_fare").plot(kind="bar", x="VendorID", y="avg_fare", edgecolor="black")
plt.title("Average Fare by Vendor (from JSON docs)")
plt.xlabel("Vendor"); plt.ylabel("Average Fare")
plt.show()