In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import matplotlib.pyplot as plt
%matplotlib inline

spark = SparkSession.builder.appName('unsupervised').getOrCreate()

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '8g'), ('spark.app.name', 'Spark Updated Conf'),\
                                        ('spark.executor.cores', '8'), ('spark.cores.max', '8'), ('spark.driver.memory','8g')])

spark.sparkContext.getConf().getAll()

[('spark.stage.maxConsecutiveAttempts', '10'),
 ('spark.dynamicAllocation.minExecutors', '1'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.submit.pyFiles',
  '/root/.ivy2/jars/com.johnsnowlabs.nlp_spark-nlp_2.12-4.4.0.jar,/root/.ivy2/jars/graphframes_graphframes-0.8.2-spark3.1-s_2.12.jar,/root/.ivy2/jars/com.typesafe_config-1.4.2.jar,/root/.ivy2/jars/org.rocksdb_rocksdbjni-6.29.5.jar,/root/.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.11.828.jar,/root/.ivy2/jars/com.github.universal-automata_liblevenshtein-3.0.0.jar,/root/.ivy2/jars/com.google.cloud_google-cloud-storage-2.16.0.jar,/root/.ivy2/jars/com.navigamez_greex-1.0.jar,/root/.ivy2/jars/com.johnsnowlabs.nlp_tensorflow-cpu_2.12-0.4.4.jar,/root/.ivy2/jars/it.unimi.dsi_fastutil-7.0.12.jar,/root/.ivy2/jars/org.projectlombok_lombok-1.16.8.jar,/root/.ivy2/jars/com.google.guava_guava-31.1-jre.jar,/root/.ivy2/jars/com.google.guava_failureaccess-1.0.1.jar,/root/.ivy2/jars/com.google.guava_listenablefuture-9999.0-empty-to-avoid-conflict-

In [None]:
spark.stop()

In [3]:
# read in rideshare data for all years, concatenate, create appropriate partitioning
# we are dropping 2020 because covid will affect the performance of our model

df_2018 = spark.read.csv("gs://msca-bdp-student-gcs/bdp-rideshare-project/rideshare/processed_data/rides_2018.csv", inferSchema=True, header=True)
df_2019 = spark.read.csv("gs://msca-bdp-student-gcs/bdp-rideshare-project/rideshare/processed_data/rides_2019.csv", inferSchema=True, header=True)
df_2021 = spark.read.csv("gs://msca-bdp-student-gcs/bdp-rideshare-project/rideshare/processed_data/rides_2021.csv", inferSchema=True, header=True)
df_2022 = spark.read.csv("gs://msca-bdp-student-gcs/bdp-rideshare-project/rideshare/processed_data/rides_2022.csv", inferSchema=True, header=True)
df_2023 = spark.read.csv("gs://msca-bdp-student-gcs/bdp-rideshare-project/rideshare/processed_data/rides_2023.csv", inferSchema=True, header=True)

# dropping new columns in 2023
df_2023 = df_2023.drop('Shared Trip Match','Percent Time Chicago','Percent Distance Chicago')

df_all = df_2018.union(df_2019).union(df_2021).union(df_2022).union(df_2023)
df_all.show(5)

                                                                                

+--------------------+-------------------+-------------------+-------+-----+------------+-------------+-----------+------------+----+---+-----+-------------+--------------+-------------+--------------+-----+------------+----+---+
|                  ID|    start_timestamp|      end_timestamp|seconds|miles|pickup_tract|dropoff_tract|pickup_area|dropoff_area|Fare|Tip|total|   pickup_lat|    pickup_lon|  dropoff_lat|   dropoff_lon|month|day_of_month|hour|day|
+--------------------+-------------------+-------------------+-------+-----+------------+-------------+-----------+------------+----+---+-----+-------------+--------------+-------------+--------------+-----+------------+----+---+
|625e77ae6e0ff7191...|2018-11-06 19:00:00|2018-11-06 19:15:00|   1142|  5.8| 17031063400|  17031010400|          6|           1|12.5|  0| 15.0|41.9346591566|-87.6467297286| 42.004764559| -87.659122427|   11|           6|  19|  3|
|62945fdb2e70957f0...|2018-11-06 19:00:00|2018-11-06 19:00:00|    341|  1.2| 170

In [5]:
#display number of records by partition
def displaypartitions(df):
    #number of records by partition
    num = df.rdd.getNumPartitions()
    print("Partitions:", num)
    df.withColumn("partitionId", F.spark_partition_id())\
        .groupBy("partitionId")\
        .count()\
        .orderBy(F.asc("count"))\
        .show(num)

df_all.rdd.getNumPartitions()
displaypartitions(df_all)

Partitions: 534




+-----------+------+
|partitionId| count|
+-----------+------+
|         33|152646|
|        233|328837|
|        232|328975|
|        231|329131|
|        230|329163|
|        229|329209|
|        227|329245|
|        225|329263|
|        228|329263|
|        224|329311|
|        226|329315|
|        222|329332|
|        223|329344|
|        221|329373|
|        218|329389|
|        219|329390|
|        217|329399|
|        215|329410|
|        216|329410|
|        214|329418|
|        220|329427|
|        213|329428|
|        210|329461|
|        212|329481|
|        211|329505|
|        207|329507|
|        208|329513|
|        209|329519|
|        206|329523|
|        204|329533|
|        203|329555|
|        205|329574|
|        201|329587|
|        202|329591|
|        198|329607|
|        200|329623|
|        196|329624|
|        199|329630|
|        197|329633|
|        195|329646|
|        192|329654|
|        194|329673|
|        193|329678|
|        184|329704|
|        191|

                                                                                

In [4]:
# repartitioning to 600 partitions
df_all = df_all.repartition(600).cache()
#displaypartitions(df_all)

**Feature Engineering**


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp

df_all = df_all.withColumn("start_timestamp", unix_timestamp("start_timestamp"))
df_all = df_all.withColumn("end_timestamp", unix_timestamp("end_timestamp"))

# Calculate trip duration in seconds
df_all = df_all.withColumn("trip_duration", col("end_timestamp") - col("start_timestamp"))

# Convert seconds to minutes
df_all = df_all.withColumn("trip_duration", col("trip_duration") / 60)

df_all.select('trip_duration').show()


[Stage 17:>                                                         (0 + 1) / 1]

+-------------+
|trip_duration|
+-------------+
|         30.0|
|         30.0|
|         30.0|
|          0.0|
|         30.0|
|         15.0|
|         15.0|
|         15.0|
|         15.0|
|         15.0|
|         15.0|
|         15.0|
|         60.0|
|         15.0|
|         45.0|
|         15.0|
|          0.0|
|         30.0|
|         15.0|
|         45.0|
+-------------+
only showing top 20 rows



                                                                                

In [None]:
df_2018 = df_2018.withColumn("start_timestamp", unix_timestamp("start_timestamp"))
df_2018 = df_2018.withColumn("end_timestamp", unix_timestamp("end_timestamp"))

# Calculate trip duration in seconds
df_2018 = df_2018.withColumn("trip_duration", col("end_timestamp") - col("start_timestamp"))

# Convert seconds to minutes
df_2018 = df_2018.withColumn("trip_duration", col("trip_duration") / 60)

df_2018.select('trip_duration').show()

23/11/24 21:05:28 WARN org.apache.spark.storage.BlockManager: Failed to fetch block after 1 fetch failures. Most recent failure cause:
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
	at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1069)
	at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1013)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1013)
	at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:1151)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(TaskResultGetter.scala:88)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.sc

In [6]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from sklearn.cluster import DBSCAN
from sklearn import metrics
from pyspark.sql.functions import col, radians, acos, sin, cos, lit
import time
from pyspark.ml.feature import VectorAssembler


In [7]:
df_all.printSchema()

root
 |-- ID: string (nullable = true)
 |-- start_timestamp: long (nullable = true)
 |-- end_timestamp: long (nullable = true)
 |-- seconds: integer (nullable = true)
 |-- miles: double (nullable = true)
 |-- pickup_tract: long (nullable = true)
 |-- dropoff_tract: long (nullable = true)
 |-- pickup_area: integer (nullable = true)
 |-- dropoff_area: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Tip: integer (nullable = true)
 |-- total: double (nullable = true)
 |-- pickup_lat: double (nullable = true)
 |-- pickup_lon: double (nullable = true)
 |-- dropoff_lat: double (nullable = true)
 |-- dropoff_lon: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- trip_duration: double (nullable = true)



In [8]:
pdf_all = df_2018.select('ID', 'pickup_lat','pickup_lon', 'dropoff_lat', 'dropoff_lon', 'month','day_of_month', 'day', 'trip_duration').toPandas()


23/11/24 21:00:17 WARN org.apache.spark.storage.BlockManager: Failed to fetch block after 1 fetch failures. Most recent failure cause:
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
	at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1069)
	at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1013)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1013)
	at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:1151)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(TaskResultGetter.scala:88)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.sc

KeyboardInterrupt: 

In [25]:
# Fit DBSCAN model for pickup location using scikit-learn
epsilon_pickup = 1.5 / 6371  # Earth's radius in kilometers
dbscan_pickup = DBSCAN(eps=epsilon_pickup, min_samples=5)
pickup_features = np.array(pdf_all[["pickup_lon", "pickup_lat"]])
pdf_all["pickup_prediction"] = dbscan_pickup.fit_predict(pickup_features)

# Filter out noise points for pickup location (prediction == -1)
pdf_all = pdf_all[pdf_all["pickup_prediction"] != -1]


TypeError: __init__() got an unexpected keyword argument 'minPoints'

In [None]:
assembler = VectorAssembler(inputCols=["pickup_lon", "pickup_lat"], outputCol="pickup_features")
df_all = assembler.transform(df_all)

# Fit DBSCAN model for pickup location
epsilon = 1.5 / kms_per_rad
dbscan = DBSCAN(eps=epsilon, minPoints=1, inputCol="pickup_features", outputCol="pickup_prediction")
model_pickup = dbscan.fit(df_all)
df_all = model_pickup.transform(df_all)

# Filter out noise points for pickup location (prediction == -1)
df_all = df_all.filter(col("pickup_prediction") != -1)

# Assemble features into a vector for dropoff location
assembler = VectorAssembler(inputCols=["dropoff_lon", "dropoff_lat"], outputCol="dropoff_features")
df_all = assembler.transform(df_all)

# Fit DBSCAN model for dropoff location
model_dropoff = dbscan.fit(df_all)
df_all = model_dropoff.transform(df_all)

# Filter out noise points for dropoff location (prediction == -1)
df_all = df_all.filter(col("dropoff_prediction") != -1)

# Collect cluster labels and centroids for pickup location
cluster_labels_pickup = df_all.select("pickup_prediction").distinct().rdd.flatMap(lambda x: x).collect()
clusters_pickup = [df_all.filter(col("pickup_prediction") == label).select("pickup_lon", "pickup_lat").collect() for label in cluster_labels_pickup]

# Collect cluster labels and centroids for dropoff location
cluster_labels_dropoff = df_all.select("dropoff_prediction").distinct().rdd.flatMap(lambda x: x).collect()
clusters_dropoff = [df_all.filter(col("dropoff_prediction") == label).select("dropoff_lon", "dropoff_lat").collect() for label in cluster_labels_dropoff]

# Find the point in each cluster that is closest to its centroid for pickup location
centermost_points_pickup = [get_centermost_point(cluster) for cluster in clusters_pickup]

# Find the point in each cluster that is closest to its centroid for dropoff location
centermost_points_dropoff = [get_centermost_point(cluster) for cluster in clusters_dropoff]

# Unzip the list of centermost points (lat, lon) tuples into separate lat and lon lists for pickup location
lats_pickup, lons_pickup = zip(*centermost_points_pickup)
rep_points_pickup = spark.createDataFrame(list(zip(lons_pickup, lats_pickup)), ["pickup_lon", "pickup_lat"])

# Unzip the list of centermost points (lat, lon) tuples into separate lat and lon lists for dropoff location
lats_dropoff, lons_dropoff = zip(*centermost_points_dropoff)
rep_points_dropoff = spark.createDataFrame(list(zip(lons_dropoff, lats_dropoff)), ["dropoff_lon", "dropoff_lat"])

# Pull rows from the original data set where lat/lon match the lat/lon of each row of representative points for pickup location
rs_pickup = rep_points_pickup.join(df_all, ["pickup_lon", "pickup_lat"])

# Pull rows from the original data set where lat/lon match the lat/lon of each row of representative points for dropoff location
rs_dropoff = rep_points_dropoff.join(df_all, ["dropoff_lon", "dropoff_lat"])

# All done, print outcome
message_pickup = 'Clustered {:,} pickup points down to {:,} points, for {:.2f}% compression in {:,.2f} seconds.'
message_dropoff = 'Clustered {:,} dropoff points down to {:,} points, for {:.2f}% compression in {:,.2f} seconds.'

print(message_pickup.format(df_all.count(), rs_pickup.count(), 100*(1 - float(rs_pickup.count()) / df_all.count()), time.time()-start_time))
print(message_dropoff.format(df_all.count(), rs_dropoff.count(), 100*(1 - float(rs_dropoff.count()) / df_all.count()), time.time()-start_time))