Let's start with some basic import.

In [1]:
import findspark
import pyspark
import random

from contextlib import contextmanager
import logging
import pyarrow.parquet as pq
import time

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

## Let's configure our Spark environment.
For simplicity, we will work in Spark in local settings.

You just need to run the following lines of code in a pyspark ready environment (local[*] stands for "use all the cores of your CPU").

That means you won't be able to use multiple machines, but you will be able to use the multi-processor capabilities of your CPU.

If you have the possibility to use multiple machines connected to the same local network, you may want to configure a Spark standalone (distributed) cluster with one master node and a certain number of worker nodes.


In [2]:
#remember to stop the context if you re-execute this cell multiple times:
#sc.stop()

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('Clustering Lyon Traffic Data') \
    .config("spark.network.timeout", "10000000") \
    .config("spark.driver.memory", "30g") \
    .getOrCreate() 

sc = spark.sparkContext
sc

## Let's load the Lyon trips data in parquet format

We are going to work on a Parquet file, containing the GPS observations for the city of Lyon, France on several months in 2017-2019.

Let's load the parquet files of Lyon.
Adapt to your configuration the filepaths to the folders containing the parquet data.

In [3]:
DATA_FOLDER_LYON_TRIPS = '/Volumes/LaCie/datasets/fcd_mediamobile_2017_2018/Parquet' #adapt the path to your case

### Let's read the trips data

We are going to measure the time it takes to perform the following operations in Spark, by using the time_usage function below.

In [4]:
logging.getLogger().setLevel(logging.INFO)

@contextmanager
def time_usage(name=""):
    """log the time usage in a code block
    name: the prefix text to show"""
    start = time.time()
    yield
    end = time.time()
    elapsed_seconds = float("%.4f" % (end - start))
    logging.info('%s: elapsed seconds: %s', name, elapsed_seconds)

In [5]:
#read the content of the parquet:
with time_usage("Reading parquet"):
    dfTrips = spark.read.parquet("file:///" + DATA_FOLDER_LYON_TRIPS)
dfTrips = dfTrips.withColumn("hour", hour(col("parsedTimestamp")))
dfTrips = dfTrips.withColumn("minute", minute(col("parsedTimestamp")))

#show the content of the first 10 lines of the dfTrips
#with time_usage("Showing the first 10 lines"):
#    dfTrips.show(10)

INFO:root:Reading parquet: elapsed seconds: 25.9784


The <code>printSchema()</code> method provides information on the dataframe, including the type associated to each column.

In [6]:
with time_usage("Printing schema"):
    dfTrips.printSchema()

INFO:root:Printing schema: elapsed seconds: 0.0057


root
 |-- vehicleId: string (nullable = true)
 |-- trackId: long (nullable = true)
 |-- obsId: long (nullable = true)
 |-- linkId: long (nullable = true)
 |-- coverage: float (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- speedInKph: integer (nullable = true)
 |-- parsedTimestamp: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



In [7]:
#temporary filtering for testing
with time_usage("Filtering trips"):
    #Apply temporal filtering to reduce the size of your dataset. 
    
    #dfTrips = dfTrips.filter((col("year") == 2017) & (col("month") == 10) & (col("day") >= 15))
    dfTrips = dfTrips.filter((col("year") == 2017) | (col("year") == 2018))
    '''dfTrips = dfTrips.filter((col("year") == 2019) | (col("year") == 2018)\
                             | ((col("year") == 2017) & (col("month") >= 11))\
                             | ((col("year") == 2017) & (col("month") == 10) & (col("day") >= 15)))'''
    
    #dfTrips = dfTrips.filter((col("year") == 2019) | ((col("year") == 2018) & (col("month") >= 5)))


INFO:root:Filtering trips: elapsed seconds: 0.2602


In [8]:
#dfTrips.show(10)

### Let's read the links_with_iris file now!

We are going now to use additional information related to the road links to which our FCD observations are related, such as the commune the link belongs to, as retrieved from the file links_with_iris.csv.

The file is available at this url:

https://www.dropbox.com/s/mx0kom8wa754zn8/links_with_iris.csv?dl=0

In [9]:
LINKS_FILEPATH = '/Volumes/LaCie/datasets/fcd_mediamobile_2017_2018/Arc/graph_with_iris/links_with_iris.csv'
dfLinks = spark.read.load(LINKS_FILEPATH, format="csv", inferSchema="true", header="true")

#dfLinks.show(10)

In [10]:
dfLinks.printSchema()

root
 |-- id: long (nullable = true)
 |-- length: double (nullable = true)
 |-- iris_count: integer (nullable = true)
 |-- iris_length: double (nullable = true)
 |-- ffs: integer (nullable = true)
 |-- speedlimit: integer (nullable = true)
 |-- frc: integer (nullable = true)
 |-- netclass: integer (nullable = true)
 |-- fow: integer (nullable = true)
 |-- routenumber: string (nullable = true)
 |-- areaname: string (nullable = true)
 |-- name: string (nullable = true)
 |-- FROM: integer (nullable = true)
 |-- TO: integer (nullable = true)
 |-- INSEE_COM: integer (nullable = true)
 |-- NOM_COM: string (nullable = true)
 |-- IRIS: integer (nullable = true)
 |-- CODE_IRIS: integer (nullable = true)
 |-- NOM_IRIS: string (nullable = true)
 |-- TYP_IRIS: string (nullable = true)
 |-- geom: string (nullable = true)



Let's explore one sample of the data, to better understand the nature of the data:

In [11]:
#dfLinks.filter(col("INSEE_COM") == 69389).show(3)

In [12]:
#dfLinks.filter(col("INSEE_COM") == 69383).show(3)

In [13]:
#dfLinks.filter(col("NOM_COM") == "Villeurbanne").show(3)

In [14]:
'''print(dfLinks.filter(col("iris_count") > 5).count())
dfLinks.filter(col("iris_count") > 5).show(30)'''

'print(dfLinks.filter(col("iris_count") > 5).count())\ndfLinks.filter(col("iris_count") > 5).show(30)'

### Preparing the data

We want to prepare our trip dataset for clustering analysis.<br>

We would like to identify all observations that belong to a subset of arrondissments around the city of Lyon and neighbor communes. You can decide to focus on other/more/less communes by selecting the areas of interest based on the <code>INSEE_COM</code> or <code>NOM_COM</code> columns of the links data frame.

- First, we can decide to filter abnormal speed values (>= 200 kmh).
- Then, we will need to retrieve for each start observation the IRIS code of the associated link.
- Finally, we will compute a congestion indicator, i.e., the ratio between the measured speed and the link free flow speed.

We define all these filters/join operations in the following prepareTripsData.

In [15]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Communes to retain
#communesToRetain = [69381, 69382, 69383, 69384, 69385, 69386, 69387, 69388, 69389, 69266]
communesToRetain = [69381, 69382, 69383, 69384, 69385, 69386, 69387, 69388, 69389, 69266,\
                    69034, 69202, 69259, 69152, 69199, 69142, 69029, 69149, 69271, 69290,\
                    69256, 69081, 69244, 69089,69040]

def prepareTripsData(dfTrips, dfLinks, sampling_time_minutes=15):
    dfFilteredTrips = dfTrips.select("linkId", "year", "month", "day", "hour", "minute", "speedInKph")
        
    with time_usage("Filtering abnormal speed values"):
        dfFilteredTrips = dfFilteredTrips.filter((col('speedInKph') < 200) & (col("minute").isNotNull()))
        #binning_to_sampling_time_udf = udf(lambda minutes: sampling_time_minutes*(minutes // sampling_time_minutes))
        dfFilteredTrips = dfFilteredTrips.withColumn("15_minutes_slot", (sampling_time_minutes * (col("minute") / sampling_time_minutes).cast('integer')))
    #dfFilteredTrips.show(100)
    
    # Some links are splitted over several iris communes
    # For simplicity, we will filter the Links file by only retaining the link with 
    # the largest length within a same zone iris
    '''window = Window.partitionBy("id").orderBy(dfLinks["iris_length"].desc())
    dfLinks = dfLinks.withColumn("rank", F.row_number().over(window))
    dfLinks = dfLinks.filter(dfLinks["rank"] == 1)'''
    #dfLinks.filter(col("iris_count") > 5).show(100)
    
    #let's join now our trips with the insee information associated to the links
    dfFilteredTrips = dfFilteredTrips.join(dfLinks, dfFilteredTrips.linkId == dfLinks["id"])
    #dfOnlyOriginTripsFiltered.show(5)
    dfFilteredTrips.printSchema()
    
    # let's keep only the trips whose origin falls in one of the selected communes
    '''with time_usage("Counting the size of the data frame before INSEE filtering"):
        print("Original df has size: " + str(dfOnlyOriginTripsFiltered.count()))'''
    with time_usage("Filtering by INSEE_COM"):
        dfFilteredTrips = dfFilteredTrips.filter(col('INSEE_COM').isin(communesToRetain))
    with time_usage("Computing speed ratio"):
        speed_ratio_col = when((col("speedInKph").isNull()) | (col("ffs").isNull()) | (col("ffs") == 0), None) \
                                                 .otherwise(col("speedInKph") / col("ffs"))
        dfFilteredTrips = dfFilteredTrips.withColumn("speed_ratio", speed_ratio_col)
        
    '''with time_usage("Counting the size of the data frame after INSEE filtering"):
        print("Original df has size: " + str(dfOnlyOriginTripsFiltered.count()))'''
    
    dfFilteredTrips.printSchema()
    return dfFilteredTrips.select("year", "month", "day", "hour", "15_minutes_slot", "speed_ratio", "INSEE_COM")

We now apply the <code>prepareTripsData</code> to the our selected days of observation. You can decide to further reduce the days, depending on the computing power of your machine.

In [16]:
#filteredDfTrips = dfTrips.filter((col("month") == 10) & (col("day") <= 7))
dfTripsPrepared = prepareTripsData(dfTrips, dfLinks)
dfTripsPrepared.printSchema()

INFO:root:Filtering anomalous speed values: elapsed seconds: 0.245
INFO:root:Filtering by INSEE_COM: elapsed seconds: 0.1572


root
 |-- linkId: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- speedInKph: integer (nullable = true)
 |-- 15_minutes_slot: integer (nullable = true)
 |-- id: long (nullable = true)
 |-- length: double (nullable = true)
 |-- iris_count: integer (nullable = true)
 |-- iris_length: double (nullable = true)
 |-- ffs: integer (nullable = true)
 |-- speedlimit: integer (nullable = true)
 |-- frc: integer (nullable = true)
 |-- netclass: integer (nullable = true)
 |-- fow: integer (nullable = true)
 |-- routenumber: string (nullable = true)
 |-- areaname: string (nullable = true)
 |-- name: string (nullable = true)
 |-- FROM: integer (nullable = true)
 |-- TO: integer (nullable = true)
 |-- INSEE_COM: integer (nullable = true)
 |-- NOM_COM: string (nullable = true)
 |-- IRIS: integer (nullable = true)
 |-- CODE_IRIS: integer 

INFO:root:Computing speed ratio: elapsed seconds: 0.0493


root
 |-- linkId: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- speedInKph: integer (nullable = true)
 |-- 15_minutes_slot: integer (nullable = true)
 |-- id: long (nullable = true)
 |-- length: double (nullable = true)
 |-- iris_count: integer (nullable = true)
 |-- iris_length: double (nullable = true)
 |-- ffs: integer (nullable = true)
 |-- speedlimit: integer (nullable = true)
 |-- frc: integer (nullable = true)
 |-- netclass: integer (nullable = true)
 |-- fow: integer (nullable = true)
 |-- routenumber: string (nullable = true)
 |-- areaname: string (nullable = true)
 |-- name: string (nullable = true)
 |-- FROM: integer (nullable = true)
 |-- TO: integer (nullable = true)
 |-- INSEE_COM: integer (nullable = true)
 |-- NOM_COM: string (nullable = true)
 |-- IRIS: integer (nullable = true)
 |-- CODE_IRIS: integer 

We have produced a dataframe containing the year, month, day, hour of the day, commune and 15-minutes time slot of the observed speed (ratio) sample.

### Clustering 
We will cluster now the couples (year, month, day, hour, time-slot) based on the average speed ratio observed over the different communes.
The idea is to find moments in time that have similar features of congestion in the selected communes. 

We hope to find some patterns, such as moments of the day (e.g. morning hours of working days) which are pretty similar in the way traffic is distributed as well as moments of the day (e.g. morning hours of weekend days) that are very peculiar and different from the other ones.

Based on the previous prepared dataframe, we will now compute the mean speed ratio in each of the retained areas at a given time slots, by a simple groupby operation (on the fields "year", "month", "day", "hour", "15_minutes_slot") followed by an average operation per each INSEE_COM. Pivoting is necessary to transform the different means (each related to an INSEE_COM) into columns.

We also compute count and standard deviation for analysis purposes.

In [17]:
#we first sort the dataframe based on the INSEE_COM and the group by day and hour
dfTripsGrouped = dfTripsPrepared.groupby("year", "month", "day", "hour", "15_minutes_slot").pivot("INSEE_COM")

In [18]:
import pyspark.sql.functions as func
dfTripsGrouped = dfTripsGrouped.agg(func.mean('speed_ratio').alias('sr_mean'), func.count('speed_ratio').alias('sr_count'), \
                                       func.stddev('speed_ratio').alias('sr_std'))
dfTripsGrouped.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- 15_minutes_slot: integer (nullable = true)
 |-- 69029_sr_mean: double (nullable = true)
 |-- 69029_sr_count: long (nullable = true)
 |-- 69029_sr_std: double (nullable = true)
 |-- 69034_sr_mean: double (nullable = true)
 |-- 69034_sr_count: long (nullable = true)
 |-- 69034_sr_std: double (nullable = true)
 |-- 69040_sr_mean: double (nullable = true)
 |-- 69040_sr_count: long (nullable = true)
 |-- 69040_sr_std: double (nullable = true)
 |-- 69081_sr_mean: double (nullable = true)
 |-- 69081_sr_count: long (nullable = true)
 |-- 69081_sr_std: double (nullable = true)
 |-- 69089_sr_mean: double (nullable = true)
 |-- 69089_sr_count: long (nullable = true)
 |-- 69089_sr_std: double (nullable = true)
 |-- 69142_sr_mean: double (nullable = true)
 |-- 69142_sr_count: long (nullable = true)
 |-- 69142_sr_std: double (nullable = true)
 

In this example, we won't perform feature scaling, as speed ratios are already relative measures (ratio of the speed and the free flow over the links). 

Measures larger than one are however possibles.

### Assembling features

In order to run Spark’s k-means clustering algorithm, a single input column with “features” column of Vector type must be given. Now, the features are several separate columns (the average speed ratio per commune per time-slot). 

To merge them into a single Vector column, Spark’s VectorAssembler transformation is used.

In [19]:
from pyspark.ml.feature import *

assembler = VectorAssembler(inputCols=[str(x) + "_sr_mean" for x in communesToRetain], outputCol="features", handleInvalid = "skip")
dfTripsFeatures = assembler.transform(dfTripsGrouped)

Once the assembly operation has been performed, we are going to apply the KMeans algorithm on the assembled features, using NUM_CLUSTERS clusters and a seed = 1 for reproducibility reasons.

The ml API to apply machine learning algorithms in Pyspark is very similar to the sckit learn one:
- first you fit the model to train your algorithm (in this case to apply the K-Means algorithm to the selected features of your trainning set.
- then you apply transform on the trained model to retrieve the cluster labels in the prediction field

In [20]:
#dfTripsGrouped.show(10)

In [21]:
#dfTripsFeatures.show(10)

In [None]:
from pyspark.ml.clustering import KMeans, BisectingKMeans

NUM_CLUSTERS = 16
INITIAL_SEED = 1

# Fitting the model to apply K-means
with time_usage("Performing K-means clustering"):
    #kmeans = KMeans(k=NUM_CLUSTERS, seed=INITIAL_SEED)
    kmeans = BisectingKMeans(k=NUM_CLUSTERS, seed=INITIAL_SEED).setFeaturesCol("features")
    model = kmeans.fit(dfTripsFeatures.select('features'))

The <code>clusterCenters</code> function from the KMeans trained model retrieves the centers of the NUM_CLUSTERS cluster.

In [None]:
#print(model.clusterCenters())

Let's retrieve the clestering labels associated to the time slots.

In [None]:
# Transforming the model to retrieve the labels
with time_usage("Retrieving K-means labels"):
    dfClustered = model.transform(dfTripsFeatures)
#dfClustered.show()

Let's save the results to a parquet dataframe, partitioned by cluster. <br>
In the following code, choose the proper filename.

In [None]:
OUTPUT_FILENAME = "/Users/furno/Desktop/clusteredTrips" 
'''dfClustered.write \
    .format("parquet") \
    .partitionBy("prediction") \
    .save('file:///' + OUTPUT_FILENAME)'''

As the final dataframe has a much reduced size than the original one thanks to our processing, we can think of converting it to a pandas dataframe and save it in a simple csv file, as follows:

In [None]:
dfClusteredTripsPandas = dfClustered.toPandas()
#dfClusteredTripsPandas.to_csv(OUTPUT_FILENAME + '/../clustered_dataframe.csv', index=False)
dfClusteredTripsPandas.to_csv('clustered_dataframe.csv', index=False)

In [None]:
dfClusteredTripsPandas = pd.read_csv('clustered_dataframe.csv')
dfClusteredTripsPandas.head()

### Analyzing and plotting the clusters 

In the previos steps, we have clustered points that correspond to moments in time, i.e., (hour, day) pairs, based on the distributions of the emitted demand from our selected communes.

The best way to analyze the cluster results appears to be the following:
1. plotting with a different color each time slot,
2. plotting with an histogram the associated traffic distribution over the selected communes.

In [None]:
dfClusteredTripsPandas["day"] = dfClusteredTripsPandas["day"].apply(lambda x: '{0:0>2}'.format(x))
dfClusteredTripsPandas["month"] = dfClusteredTripsPandas["month"].apply(lambda x: '{0:0>2}'.format(x))
dfClusteredTripsPandas["hour"] = dfClusteredTripsPandas["hour"].apply(lambda x: '{0:0>2}'.format(x))
dfClusteredTripsPandas["15_minutes_slot"] = dfClusteredTripsPandas["15_minutes_slot"].apply(lambda x: '{0:0>2}'.format(x))
dfClusteredTripsPandas["the_date"] = dfClusteredTripsPandas[["year", "month", "day"]].apply(lambda row: '-'.join(row.values.astype(str)), axis=1)
dfClusteredTripsPandas["the_hour"] = dfClusteredTripsPandas[["hour", "15_minutes_slot"]].apply(lambda row: ':'.join(row.values.astype(str)), axis=1)

dfClusteredTripsPandas.head()

In [None]:
# 1. plotting the clusters over time

import matplotlib.pyplot as plt
import matplotlib
%matplotlib inline


font = {'size' : 50}

matplotlib.rc('font', **font)
temp_df = dfClusteredTripsPandas.pivot(index='the_date', columns='the_hour', values='prediction')
temp_df.sort_index(inplace=True) 
print(temp_df.head() )

fig = plt.figure(figsize = (300,100))
ax = fig.add_subplot(111)

cax = ax.matshow(temp_df.values, cmap="Spectral")
ax.xaxis.set_ticks_position('bottom') 

col_labels = temp_df.columns.values
x_labels = [(col_labels[i] if (i%2 == 1) else "") for i in range(len(col_labels))]
ax.set_xticks(range(len(col_labels)))
ax.set_xticklabels(x_labels, rotation=90) 

row_labels = temp_df.index.values
y_labels = [(row_labels[i]  if (i % 2 == 0 or i == len(row_labels) - 1) else "") for i in range(len(row_labels))] 
ax.set_yticks(range(len(row_labels)))
ax.set_yticklabels(y_labels)

fig.colorbar(cax)
plt.tight_layout()
plt.show()

In [None]:
# 2. plotting clusters'centroid
import pandas as pd

dfLinks = pd.read_csv(LINKS_FILEPATH)
communes = pd.Series(dfLinks["NOM_COM"].values,index=dfLinks["INSEE_COM"]).to_dict()
print(communes)

selected_columns = [c for c in dfClusteredTripsPandas.columns if "sr_mean" in c]
selected_columns.append("prediction")
print(selected_columns)

dfClusteredTripsPandas = dfClusteredTripsPandas[selected_columns]
dfClusteredTripsPandasJoined = dfClusteredTripsPandas.rename(columns={str(key) + "_sr_mean":value for (key,value) in communes.items()})

dfClusteredTripsPandasJoined.head()

In [None]:
font = {'size': 10}

matplotlib.rc('font', **font)

dfClusteredTripsPandasJoined = dfClusteredTripsPandasJoined.groupby('prediction').mean()
dfClusteredTripsPandasJoined.head(20)
axes = dfClusteredTripsPandasJoined.plot.barh(rot=0, figsize = (24,24))
plt.title("Average speed per commune per each cluster", bbox={'facecolor':'0.8', 'pad':5})
plt.show()

Multiple Clustering

In [None]:
'''dfClusteredList = [dfClusteredTripsPandasJoined]
numClusters = [16]
for n in range(6, 35, 4):
    # Fitting the model to apply K-means
    with time_usage("Performing K-means clustering with " + str(n) + " clusters:"):
        kmeans_n = KMeans(k=n, seed=INITIAL_SEED)
        model_n = kmeans_n.fit(dfTripsFeatures.select('features'))
        dfClustered_n = model.transform(dfTripsFeatures)
        dfClusteredTripsPandas_n = dfClustered_n.toPandas()
        dfClusteredTripsPandas_n.to_csv(OUTPUT_FILENAME + '/../n_clusters/clustered_dataframe_' + str(n) + '.csv', index=False)
        dfClusteredList.append(dfClusteredTripsPandas_n)
        numClusters.append(n)'''

In [None]:
#sc.stop()