In [2]:
# If on GCP Workbench install Java
# I just opened a terminal and sudo apt-get install default-jdk

In [3]:
import os
import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

In [4]:
"""
Loading Pandas to validate some checks in sample data, to optimize time.
"""

from divvy_bike_share_data_analysis.data_loader import load_dataset_to_local_fs
from divvy_bike_share_data_analysis.utils_pyspark import load_data


import glob

# Collecting data to local directory
DATA_COLLECTION_DIR: str = '../data_collection/'

# This is commented out, as I only collect the csvs in the very first run.
if not glob.glob(os.path.join(DATA_COLLECTION_DIR, '*.csv')):
    print('collecting data to local fs')
    load_dataset_to_local_fs(DATA_COLLECTION_DIR, [2020, 2021, 2022, 2023])

# creating pyspark session and pre-processing data
divvy_df = load_data(DATA_COLLECTION_DIR)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/02 18:56:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
divvy_df.show(5)

                                                                                

+----------------+-------------+-------------------+-------------------+------------------+----------------+----------------+--------------+---------+---------+-------+-------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|start_station_name|start_station_id|end_station_name|end_station_id|start_lat|start_lng|end_lat|end_lng|member_casual|
+----------------+-------------+-------------------+-------------------+------------------+----------------+----------------+--------------+---------+---------+-------+-------+-------------+
|         ride_id|rideable_type|               NULL|               NULL|start_station_name|start_station_id|end_station_name|end_station_id|     NULL|     NULL|   NULL|   NULL|member_casual|
|550CF7EFEAE0C618|electric_bike|2022-08-07 21:34:15|2022-08-07 21:41:46|              NULL|            NULL|            NULL|          NULL|    41.93|   -87.69|  41.94| -87.72|       casual|
|DAD198F405F9C5F5|electric_bike|2022-08-08 14

In [6]:
"""
Once I split the loaded data into a sample size, I can use pandas to validate some checks, and save time as Spark is slow locally. 
"""

sampled_df = divvy_df.sample(withReplacement=False, fraction=0.01)
# sampled_df.columns
divvy_df_pandas = sampled_df.toPandas()

                                                                                

In [7]:
from divvy_bike_share_data_analysis import bike_stations
bike_stations = bike_stations.get_unique_bike_stations_ids(sampled_df)



+----------------+--------------------+--------------+--------------------+
|start_station_id|  start_station_name|end_station_id|    end_station_name|
+----------------+--------------------+--------------+--------------------+
|           13389|Clarendon Ave & J...|         13389|Clarendon Ave & J...|
|           13154|Sheffield Ave & K...|         13154|Sheffield Ave & K...|
|             291|Wells St & Evergr...|           291|Wells St & Evergr...|
|             344|Ravenswood Ave & ...|           344|Ravenswood Ave & ...|
|           20102| Loomis St & 89th St|         20102| Loomis St & 89th St|
|             536|Kostner Ave & Lak...|           536|Kostner Ave & Lak...|
|           20214| Avenue O & 134th St|         20214| Avenue O & 134th St|
|             464|Western Ave & Ard...|           464|Western Ave & Ard...|
|    KA1503000055|Halsted & 63rd - ...|  KA1503000055|Halsted & 63rd - ...|
|           13277|Broadway & Belmon...|         13277|Broadway & Belmon...|
|    KA15030

                                                                                

In [9]:
# Pandas approach
grouped_stations_by_matching_id_pd = divvy_df_pandas[
    ['start_station_id', 'start_station_name', 'end_station_id', 'end_station_name']].drop_duplicates().dropna()
conflicting_stations = grouped_stations_by_matching_id_pd[
    grouped_stations_by_matching_id_pd['start_station_name'] != grouped_stations_by_matching_id_pd['end_station_name']]

bike_stations_pd = grouped_stations_by_matching_id_pd[["start_station_id", "start_station_name"]].rename(
    columns={"start_station_id": "station_id", "start_station_name": "station_name"}).drop_duplicates()

# At this point we know that trips without Ids, are not relevant for our analysis. So we can drop them
sampled_df = sampled_df.dropna(subset=['start_station_id', 'end_station_id'])

In [13]:
# from divvy_bike_share_data_analysis.bike_stations import categorize_time_of_day
from pyspark.sql.functions import udf, hour
from pyspark.sql.types import StringType

# Adding a categorical field to determine if the station is a start or end station
# Convert to a UDF Function by passing in the function and return type

# TODO check how udf will load function form local module. For Now Leaving this function mirrored in notebook.
def categorize_time_of_day(hour):
    """
    return label for hour of day
    :param hour:
    :return: string
    """
    if 5 <= hour < 12:
        return 'Morning'
    if 12 <= hour < 17:
        return 'Afternoon'
    if 17 <= hour < 21:
        return 'Evening'
    return 'Night'


# Before we start, check for nulls
# sampled_df.filter(sampled_df.started_at.isNull()).count() # there's 0

time_of_day_udf = udf(categorize_time_of_day, StringType())
sampled_df_with_day_period = sampled_df.withColumn('day_period', time_of_day_udf(hour('started_at')))
# sampled_df_with_day_period.show(5)

In [14]:
#%%
from pyspark.sql.functions import col

sampled_df_with_day_period.filter(col('day_period').isNull()).count() 

                                                                                

0

In [15]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="day_period", outputCol="day_period_index")
sampled_df_with_day_period_indexed = indexer.fit(sampled_df_with_day_period).transform(sampled_df_with_day_period)

                                                                                

In [None]:
from pyspark.sql.functions import when

sampled_df_with_day_period_indexed.filter(col('day_period_index').isNull()).count()
sampled_df_with_day_period_indexed = sampled_df_with_day_period_indexed.withColumn("day_period_index", when(col("day_period_index").isNull(), 0.0).otherwise(col("day_period_index"))) # TODO review this as potential problem. 


In [16]:
from pyspark.ml.feature import OneHotEncoder
# OneHotEnconding

encoder = OneHotEncoder(inputCols=["day_period_index"], outputCols=["day_period_ohe"])
sampled_df_with_day_period_ohed = encoder.fit(sampled_df_with_day_period_indexed).transform(sampled_df_with_day_period_indexed)



In [19]:
sanitized_bike_stations = sampled_df_with_day_period_ohed.join(
    bike_stations.withColumnRenamed("station_name", "start_station_name"),
    sampled_df_with_day_period_ohed.start_station_id == bike_stations.station_id,
    "left").drop("station_id")

# Join to map end_station_id to end_station_name
sanitized_bike_stations = sanitized_bike_stations.join(
    bike_stations.withColumnRenamed("station_name", "end_station_name"),
    sanitized_bike_stations.end_station_id == bike_stations.station_id,
    "left").drop("station_id")

In [18]:

"""
Pandas Approach: Used to test the approach and repeat on Spark
divvy_df_pandas.dropna(subset=['start_station_id', 'end_station_id'], inplace=True)
divvy_df_pandas['start_station_name', 'end_station_name'] = divvy_df_pandas['start_station_id'].map(lambda x: bike_stations_pd[bike_stations_pd['station_id'] == x]['station_name'].values[0])
divvy_df_pandas[['start_station_id', 'start_station_name', 'end_station_id', 'end_station_name']]
"""

"\nPandas Approach: Used to test the approach and repeat on Spark\ndivvy_df_pandas.dropna(subset=['start_station_id', 'end_station_id'], inplace=True)\ndivvy_df_pandas['start_station_name', 'end_station_name'] = divvy_df_pandas['start_station_id'].map(lambda x: bike_stations_pd[bike_stations_pd['station_id'] == x]['station_name'].values[0])\ndivvy_df_pandas[['start_station_id', 'start_station_name', 'end_station_id', 'end_station_name']]\n"

In [20]:
from pyspark.ml.feature import StringIndexer

indexer_start_station_id = StringIndexer(inputCol="start_station_id", outputCol="start_station_id_index")
indexer_end_station_id = StringIndexer(inputCol="end_station_id", outputCol="end_station_id_index")
sanitized_bike_stations_id_indexed = indexer_start_station_id.fit(sanitized_bike_stations).transform(
    sanitized_bike_stations)
sanitized_bike_stations_id_indexed = indexer_end_station_id.fit(sanitized_bike_stations).transform(
    sanitized_bike_stations_id_indexed)
# sanitized_bike_stations_id_indexed.show()

                                                                                

In [21]:
from pyspark.ml.feature import VectorAssembler

# most popular stations by period of day
feature_cols = ['start_station_id_index', 'day_period_ohe']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the data to have the features column
adf_kmeans = assembler.transform(sanitized_bike_stations_id_indexed)

In [22]:
### Custom spark Session to support synapsesML:
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:1.0.2") \
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
    .getOrCreate()

from pyspark.ml.clustering import KMeans

# Number of clusters
k = 5  # Adjust based on your data and needs

kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
model = kmeans.fit(adf_kmeans)

24/02/02 19:18:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/02/02 19:21:38 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1225, in main
    eval_type = read_int(infile)
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 596, in read_int
    raise EOFError
EOFError

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	

Py4JJavaError: An error occurred while calling o356.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 43.0 failed 1 times, most recent failure: Lost task 0.0 in stage 43.0 (TID 446) (bigdatacourse.europe-west6-a.c.x-oxygen-413115.internal executor driver): java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:110)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1293)
	at org.apache.spark.rdd.RDD.$anonfun$takeSample$1(RDD.scala:626)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.takeSample(RDD.scala:615)
	at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:400)
	at org.apache.spark.mllib.clustering.KMeans.runAlgorithmWithWeight(KMeans.scala:273)
	at org.apache.spark.mllib.clustering.KMeans.runWithWeight(KMeans.scala:231)
	at org.apache.spark.ml.clustering.KMeans.trainWithRow(KMeans.scala:446)
	at org.apache.spark.ml.clustering.KMeans.$anonfun$fit$1(KMeans.scala:382)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:371)
	at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:295)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:110)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Make predictions
# predictions = model.transform(adf_kmeans)
# 
# # Group by cluster label and start_station_id, and count occurrences
# popular_start_stations = predictions.groupBy("prediction", "start_station_id_index")
# 
# # Find the most popular station in each cluster
# most_popular_start_stations = popular_start_stations.groupBy("prediction").max("count")
# 
# popular_destinations = predictions.groupBy("prediction", "end_station_id_index").count()
# most_popular_destination = popular_start_stations.groupBy("prediction").max("count")

# TODO apply K-means clustering and to find the most popular source stations - Plot
# TODO - Can I apply K-means for most popular stations by time of day? - Plot
# TODO - most popular routes - Plot 
