In [1]:
import os
# spark session
from pyspark.sql import SparkSession

# config - when tuining spark jobs, (memory, shufflepartitions,..)
spark = SparkSession.builder.appName(" Python Spark SQL demo").getOrCreate()



Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/08 15:46:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# read files 
device_list_df = spark.read.csv("../data/raw_data/devices_list.csv",header=True,inferSchema=True)

In [3]:
# print first n rows of dataframe
device_list_df.show()

+-----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+
|Device name|Active|           Road (nl)|           Road (fr)|           Road (en)|    Description (nl)|    Description (fr)|    Description (en)|         Lane schema|        Basic schema|     Detailed schema|           Picture 1|           Picture 2|      Lon (WGS 84)|      Lat (WGS 84)|          X (Lb72)|          Y (Lb72)|
+-----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+
|      CAT17|  t

In [4]:
# read the bike counts
bike_count_df = spark.read.csv("../data/raw_data/raw_data.csv",header=True, inferSchema=True)


                                                                                

In [5]:
bike_count_df.show(10)

+----------+--------+-----+-------------+-----------+
|      Date|Time gap|Count|Average speed|device_name|
+----------+--------+-----+-------------+-----------+
|2020-11-23|      93|    0|           -1|      CEK31|
|2020-11-23|      94|    0|           -1|      CEK31|
|2020-11-23|      95|    0|           -1|      CEK31|
|2020-11-23|      96|    0|           -1|      CEK31|
|2020-11-24|       1|    0|           -1|      CEK31|
|2020-11-24|       2|    0|           -1|      CEK31|
|2020-11-24|       3|    0|           -1|      CEK31|
|2020-11-24|       4|    0|           -1|      CEK31|
|2020-11-24|       5|    0|           -1|      CEK31|
|2020-11-24|       6|    0|           -1|      CEK31|
+----------+--------+-----+-------------+-----------+
only showing top 10 rows


In [6]:
bike_count_df.head(5)

[Row(Date=datetime.date(2020, 11, 23), Time gap=93, Count=0, Average speed=-1, device_name='CEK31'),
 Row(Date=datetime.date(2020, 11, 23), Time gap=94, Count=0, Average speed=-1, device_name='CEK31'),
 Row(Date=datetime.date(2020, 11, 23), Time gap=95, Count=0, Average speed=-1, device_name='CEK31'),
 Row(Date=datetime.date(2020, 11, 23), Time gap=96, Count=0, Average speed=-1, device_name='CEK31'),
 Row(Date=datetime.date(2020, 11, 24), Time gap=1, Count=0, Average speed=-1, device_name='CEK31')]

In [7]:
# tasks for pyspark
# join the device_list and the bike_count tables
# clean (missing count, standardize station names), enrich ith weathre data, parse and cast types, transform and 
# aggregate raw bike counts using DataFrames/SQL : total rides, busiest stations, hourly/daily/weekly trends, peak usage windows
# 
## Business Value
# Scalable insights: millions of bike count records across years, stations
# Real-time analytics: spot sudden spikes/operational outagaes instantly
# Data quality: automate correction and completeness checks
# Advanced Reporting: maooing, visualisation and predictive analytics for urban, operations, marketing teams


In [8]:
# rename columns
# bike count data
bike_count_df = bike_count_df.toDF(*(c.lower().replace(" ","_") for c in bike_count_df.columns))

bike_count_df.show(10)

+----------+--------+-----+-------------+-----------+
|      date|time_gap|count|average_speed|device_name|
+----------+--------+-----+-------------+-----------+
|2020-11-23|      93|    0|           -1|      CEK31|
|2020-11-23|      94|    0|           -1|      CEK31|
|2020-11-23|      95|    0|           -1|      CEK31|
|2020-11-23|      96|    0|           -1|      CEK31|
|2020-11-24|       1|    0|           -1|      CEK31|
|2020-11-24|       2|    0|           -1|      CEK31|
|2020-11-24|       3|    0|           -1|      CEK31|
|2020-11-24|       4|    0|           -1|      CEK31|
|2020-11-24|       5|    0|           -1|      CEK31|
|2020-11-24|       6|    0|           -1|      CEK31|
+----------+--------+-----+-------------+-----------+
only showing top 10 rows


In [9]:
# device list 
import re
def clean_col_name(col):
    # lowercase
    col = col.lower()

    # replace the space and dot with underscore
    col = re.sub(r"[ .]+","_",col)

    #remove parantheses,commas, or any other special characters
    col = re.sub(r"[(),]","",col)

    # remove extra underscores from multiple replacements

    col = re.sub(r"__+","_",col)

    return col


device_list_df = device_list_df.toDF(*(clean_col_name(c) for c in device_list_df.columns))


In [10]:
# visualise the device_list_df

device_list_df.show()

+-----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+
|device_name|active|             road_nl|             road_fr|             road_en|      description_nl|      description_fr|      description_en|         lane_schema|        basic_schema|     detailed_schema|           picture_1|           picture_2|        lon_wgs_84|        lat_wgs_84|            x_lb72|            y_lb72|
+-----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+
|      CAT17|  t

In [11]:
device_list_df.schema

StructType([StructField('device_name', StringType(), True), StructField('active', BooleanType(), True), StructField('road_nl', StringType(), True), StructField('road_fr', StringType(), True), StructField('road_en', StringType(), True), StructField('description_nl', StringType(), True), StructField('description_fr', StringType(), True), StructField('description_en', StringType(), True), StructField('lane_schema', StringType(), True), StructField('basic_schema', StringType(), True), StructField('detailed_schema', StringType(), True), StructField('picture_1', StringType(), True), StructField('picture_2', StringType(), True), StructField('lon_wgs_84', DoubleType(), True), StructField('lat_wgs_84', DoubleType(), True), StructField('x_lb72', DoubleType(), True), StructField('y_lb72', DoubleType(), True)])

In [12]:
# perform a join on bike_count
bike_count_df.count()
# > 3 million records

3059094

In [13]:
# perform a join on bike_count
bike_join = bike_count_df.join(device_list_df,on='device_name',how='left')

In [37]:
bike_join.schema

StructType([StructField('device_name', StringType(), True), StructField('date', DateType(), True), StructField('time_gap', IntegerType(), True), StructField('count', IntegerType(), True), StructField('average_speed', IntegerType(), True), StructField('active', BooleanType(), True), StructField('road_nl', StringType(), True), StructField('road_fr', StringType(), True), StructField('road_en', StringType(), True), StructField('description_nl', StringType(), True), StructField('description_fr', StringType(), True), StructField('description_en', StringType(), True), StructField('lane_schema', StringType(), True), StructField('basic_schema', StringType(), True), StructField('detailed_schema', StringType(), True), StructField('picture_1', StringType(), True), StructField('picture_2', StringType(), True), StructField('lon_wgs_84', DoubleType(), True), StructField('lat_wgs_84', DoubleType(), True), StructField('x_lb72', DoubleType(), True), StructField('y_lb72', DoubleType(), True)])

25/09/08 17:40:00 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1956153 ms exceeds timeout 120000 ms
25/09/08 17:40:00 WARN SparkContext: Killing executors is not supported by current scheduler.
25/09/08 17:40:02 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:81)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:669)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1296)
	at 

In [14]:
bike_join.show()

+-----------+----------+--------+-----+-------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+-----------------+------------------+------------------+
|device_name|      date|time_gap|count|average_speed|active|             road_nl|             road_fr|             road_en|      description_nl|      description_fr|      description_en|         lane_schema|        basic_schema|     detailed_schema|           picture_1|           picture_2|       lon_wgs_84|       lat_wgs_84|            x_lb72|            y_lb72|
+-----------+----------+--------+-----+-------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------

In [15]:
# bbc
# save the data as csv 
bike_join.write.mode("overwrite").csv("../data/processed/bike_counts.csv")


                                                                                

In [16]:
# save the data as parquet

bike_join.write.mode("overwrite").parquet("../data/processed/bike_counts.parquet")


25/09/08 15:47:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [17]:
# create a datetime comumn

from pyspark.sql.functions import to_timestamp, col, expr, add_months, concat,lpad

df_with_time = bike_join.withColumn(
    "time",
    concat(
        lpad(expr("cast(((time_gap - 1) * 15) / 60 as int)"), 2, '0'),
        expr("':'"),
        lpad(expr("cast(((time_gap - 1) * 15) % 60 as int)"), 2, '0'),
        expr("':00'")
    )
)

In [18]:
df_with_time.show()

+-----------+----------+--------+-----+-------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+-----------------+------------------+------------------+--------+
|device_name|      date|time_gap|count|average_speed|active|             road_nl|             road_fr|             road_en|      description_nl|      description_fr|      description_en|         lane_schema|        basic_schema|     detailed_schema|           picture_1|           picture_2|       lon_wgs_84|       lat_wgs_84|            x_lb72|            y_lb72|    time|
+-----------+----------+--------+-----+-------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-

In [19]:
# combine date a
df_with_time.select("time").show(5)

+--------+
|    time|
+--------+
|23:00:00|
|23:15:00|
|23:30:00|
|23:45:00|
|00:00:00|
+--------+
only showing top 5 rows


In [20]:
from pyspark.sql.functions import concat_ws

df_with_datetime = df_with_time.withColumn(
    "datetime",
    to_timestamp(concat_ws(" ",col("date").cast("string"),col("time")))
).drop("time")




In [21]:
df_with_datetime.select("datetime").show()

+-------------------+
|           datetime|
+-------------------+
|2020-11-23 23:00:00|
|2020-11-23 23:15:00|
|2020-11-23 23:30:00|
|2020-11-23 23:45:00|
|2020-11-24 00:00:00|
|2020-11-24 00:15:00|
|2020-11-24 00:30:00|
|2020-11-24 00:45:00|
|2020-11-24 01:00:00|
|2020-11-24 01:15:00|
|2020-11-24 01:30:00|
|2020-11-24 01:45:00|
|2020-11-24 02:00:00|
|2020-11-24 02:15:00|
|2020-11-24 02:30:00|
|2020-11-24 02:45:00|
|2020-11-24 03:00:00|
|2020-11-24 03:15:00|
|2020-11-24 03:30:00|
|2020-11-24 03:45:00|
+-------------------+
only showing top 20 rows


In [22]:
# use plotly to visualise
# total number of bike_counts aggregated per year
from pyspark.sql.functions import year, sum

# get year

df_with_year = df_with_datetime.withColumn("year",year("datetime"))


In [23]:
df_with_year.show()

+-----------+----------+--------+-----+-------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+-----------------+------------------+------------------+-------------------+----+
|device_name|      date|time_gap|count|average_speed|active|             road_nl|             road_fr|             road_en|      description_nl|      description_fr|      description_en|         lane_schema|        basic_schema|     detailed_schema|           picture_1|           picture_2|       lon_wgs_84|       lat_wgs_84|            x_lb72|            y_lb72|           datetime|year|
+-----------+----------+--------+-----+-------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------

In [24]:

# group by station and year, aggregate by sum of counts

df_yearly = df_with_year.groupBy("device_name","year").agg(sum("count").alias("total_count")).orderBy("device_name","year")

In [25]:
df_yearly.show()



+-----------+----+-----------+
|device_name|year|total_count|
+-----------+----+-----------+
|      CAT17|2020|      46033|
|      CAT17|2021|     423492|
|      CAT17|2022|     533284|
|      CAT17|2023|     545711|
|      CAT17|2024|     569622|
|    CB02411|2019|     436553|
|    CB02411|2020|     664437|
|    CB02411|2021|     604896|
|    CB02411|2022|     824891|
|    CB02411|2023|    1052527|
|    CB02411|2024|    1105636|
|     CB1101|2022|      14195|
|     CB1101|2023|    1456137|
|     CB1101|2024|    1427037|
|     CB1142|2019|     351174|
|     CB1142|2020|     585926|
|     CB1142|2021|     660553|
|     CB1142|2022|    1023809|
|     CB1142|2023|    1049197|
|     CB1142|2024|     700479|
+-----------+----+-----------+
only showing top 20 rows


                                                                                

In [26]:
# to visualise on plotly, need to convert the spark dataframe to pandas dataframe
pdf = df_yearly.toPandas()

# pivot for wide format: years as rows, stations as columns

pivot_pdf =pdf.pivot(index='year',columns='device_name',values="total_count")

import plotly.express as px

fig = px.line(
    pivot_pdf,
    x=pivot_pdf.index,
    y=pivot_pdf.columns,
    labels={'x': 'Year', 'value':'Total Bike count', 'variable':'Station'},
    title='Total Bike Counts Per Year Per Station'
)

fig.show()

                                                                                

In [27]:
import nbformat

In [28]:
# filtering the pandas dataframe for year 2024

df_yearly_2024 = pdf[pdf['year']==2024]
df_yearly_2024.head()

Unnamed: 0,device_name,year,total_count
4,CAT17,2024,569622
10,CB02411,2024,1105636
13,CB1101,2024,1427037
19,CB1142,2024,700479
25,CB1143,2024,930700


In [29]:
# aggregate but maintain the latitude and longitude


# group by station and year, aggregate by sum of counts

df_2024 = df_with_year.filter(df_with_year.year == 2024).groupBy("device_name","lat_wgs_84","lon_wgs_84").agg(sum("count").alias("total_count_2024")).orderBy("device_name")

In [30]:
df_2024.show()



+-----------+------------------+------------------+----------------+
|device_name|        lat_wgs_84|        lon_wgs_84|total_count_2024|
+-----------+------------------+------------------+----------------+
|      CAT17|50.820621002525904| 4.301991995919691|          569622|
|    CB02411| 50.88185000251615| 4.373960995908418|         1105636|
|     CB1101| 50.84534100252618|  4.36858299590588|         1427037|
|     CB1142|50.843570002527876| 4.378737995903186|          700479|
|     CB1143| 50.84337000252792| 4.378813995903146|          930700|
|     CB1599| 50.81240800253699| 4.379253995899753|          325219|
|     CB1699| 50.81228000253699| 4.378799995899851|          297998|
|     CB2105| 50.84048000252439| 4.340009995912418|          611051|
|     CEE016| 50.82137600253522| 4.386718995898861|          512085|
|     CEK049| 50.82448000253513| 4.393892995897416|          713978|
|      CEK18| 50.83920000253155| 4.400258995897405|          435371|
|      CEK31| 50.83818200253104| 4

                                                                                

In [None]:
# plot on a map
# convert to pandas dataframe
pdf_2024 = df_2024.toPandas()
import plotly.express as px

fig = px.scatter(
    pdf_2024,
    x='lon_wgs_84',
    y='lat_wgs_84',
    size='total_count_2024',
    color = 'total_count_2024',
    hover_name='device_name',
    title='Bike Counts Per Station',
    labels={'lon_wgs_84':'Longitude','lat_wgs_84':'Latitude','total_count_2024':'Total Count'}
)

fig.show()

In [34]:
# map
fig = px.scatter_mapbox(
    pdf_2024,
    lat='lat_wgs_84',
    lon='lon_wgs_84',
    size='total_count_2024',
    color = 'total_count_2024',
    hover_name='device_name',
    zoom=11,
    mapbox_style='carto-positron',
    title='Bike Counts Per Station'
)

fig.show()


*scatter_mapbox* is deprecated! Use *scatter_map* instead. Learn more at: https://plotly.com/python/mapbox-to-maplibre/



In [36]:
# map
fig = px.scatter_map(
    pdf_2024,
    lat='lat_wgs_84',
    lon='lon_wgs_84',
    size='total_count_2024',
    color = 'total_count_2024',
    hover_name='device_name',
    zoom=11,
    title='Bike Counts Per Station'
)

fig.show()