In [1]:
!pip install findspark
!pip install pyspark
!pip install numpy
!pip install pandas

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 46.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=28d29ca7c15b8d55a201fde2348e17544d6294a1f3a964a342c6cde021f273e0
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
import numpy as np
import pandas as pd

In [3]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster("local").setAppName("BigDataCW")
sc = SparkContext(conf = conf)
spark = SparkSession(sc)

In [5]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [6]:
# Define schema
Schema=StructType([
  StructField('Datetime',TimestampType(),nullable=True),
  StructField('Street_ID',StringType(),nullable=True),
  StructField('Traffic_Count',IntegerType(),nullable=True),
  StructField('Traffic_Speed',FloatType(),nullable=True)
])

In [8]:
# Import all Belgium 30 min data
Bel_30min_0101_0103_2019 = spark.read.option("mode",'DROPMALFORMED').schema(Schema).csv('/content/drive/My Drive/BD10/Bel_30min_0101_0103_2019.csv')
Bel_30min_1303_0606_2021 = spark.read.option("mode",'DROPMALFORMED').schema(Schema).csv('/content/drive/My Drive/BD10/Bel_30min_1303_0606_2021.csv')
Bel_30min_0506_1610_2021 = spark.read.option("mode",'DROPMALFORMED').schema(Schema).csv('/content/drive/My Drive/BD10/Bel_30min_0506_1610_2021.csv')

In [9]:
# Check no of rows in the dataset
Bel_30min_0101_0103_2019.count()

11245888

In [10]:
# Check no of rows in the dataset
Bel_30min_1303_0606_2021.count()

9108614

In [11]:
# Check no of rows in the dataset
Bel_30min_0506_1610_2021.count()

17406313

## Belgium 2019 (4 features from Kaggle)

#### The pre processing for this dataset uses Pandas DataFrame to replicate the implementation done by the previous paper
#### by G. Buroni, G. Bontempi, and K. Determe (2021)

In [12]:
# Read csv as pandas dataframes
Bel_30_min_2019_4feat = pd.read_csv('/content/drive/My Drive/BD10/Bel_30min_0101_0103_2019.csv')
Bel_30_min_2019_4feat.columns = ['Datetime', 'Street_ID', 'Sum_Traffic_Count','Traffic_Speed']
Bel_30_min_2019_4feat.head()

Unnamed: 0,Datetime,Street_ID,Sum_Traffic_Count,Traffic_Speed
0,2019-01-10 12:00:00,578.0,165,85.420202
1,2019-01-10 12:00:00,4504.0,184,82.847826
2,2019-01-10 12:00:00,1402.0,207,84.136689
3,2019-01-10 12:00:00,173.0,190,84.202632
4,2019-01-10 12:00:00,2773.0,112,85.430761


In [13]:
# Fill missing values with 0
Bel_30_min_2019_4feat = Bel_30_min_2019_4feat.fillna(0)

In [14]:
# Group by data based on datetime (sum all vehicles in any street ID)
Bel_30_min_2019_4feat = Bel_30_min_2019_4feat.sort_values(by=['Datetime']).groupby(['Datetime']).agg({'Sum_Traffic_Count':'sum'}).reset_index()
Bel_30_min_2019_4feat.head()

Unnamed: 0,Datetime,Sum_Traffic_Count
0,2019-01-01 00:00:00,2391
1,2019-01-01 00:30:00,2386
2,2019-01-01 01:00:00,2428
3,2019-01-01 01:30:00,2617
4,2019-01-01 02:00:00,2068


In [15]:
# Convert datatype to datetime
Bel_30_min_2019_4feat['Datetime_chgType'] = pd.to_datetime(Bel_30_min_2019_4feat['Datetime'])

In [16]:
Bel_30_min_2019_4feat['Hour'] = Bel_30_min_2019_4feat['Datetime_chgType'].dt.hour
Bel_30_min_2019_4feat.head()

Unnamed: 0,Datetime,Sum_Traffic_Count,Datetime_chgType,Hour
0,2019-01-01 00:00:00,2391,2019-01-01 00:00:00,0
1,2019-01-01 00:30:00,2386,2019-01-01 00:30:00,0
2,2019-01-01 01:00:00,2428,2019-01-01 01:00:00,1
3,2019-01-01 01:30:00,2617,2019-01-01 01:30:00,1
4,2019-01-01 02:00:00,2068,2019-01-01 02:00:00,2


In [17]:
# Get sin cos values from hours
Bel_30_min_2019_4feat['Hour_x']=np.sin(Bel_30_min_2019_4feat.Hour*(2.*np.pi/23))
Bel_30_min_2019_4feat['Hour_y']=np.cos(Bel_30_min_2019_4feat.Hour*(2.*np.pi/23))

In [18]:
# Extract day of the week
Bel_30_min_2019_4feat['DayOfWeek'] = Bel_30_min_2019_4feat['Datetime_chgType'].dt.dayofweek

# Encode day of the week 
# 2 for weekdays, 1 for Saturday, and 0 for Sunday
Bel_30_min_2019_4feat['WorkingDays'] = Bel_30_min_2019_4feat['DayOfWeek'].apply(lambda y: 2 if y < 5 else y)
Bel_30_min_2019_4feat['WorkingDays'] = Bel_30_min_2019_4feat['WorkingDays'].apply(lambda y: 1 if y == 5 else y)
Bel_30_min_2019_4feat['WorkingDays'] = Bel_30_min_2019_4feat['WorkingDays'].apply(lambda y: 0 if y == 6 else y)

In [19]:
# Drop columns that are no longer used
Bel_30_min_2019_4feat = Bel_30_min_2019_4feat.drop(['Datetime_chgType','Hour'], axis=1)

In [20]:
# Recheck data before exporting
Bel_30_min_2019_4feat.head()

Unnamed: 0,Datetime,Sum_Traffic_Count,Hour_x,Hour_y,DayOfWeek,WorkingDays
0,2019-01-01 00:00:00,2391,0.0,1.0,1,2
1,2019-01-01 00:30:00,2386,0.0,1.0,1,2
2,2019-01-01 01:00:00,2428,0.269797,0.962917,1,2
3,2019-01-01 01:30:00,2617,0.269797,0.962917,1,2
4,2019-01-01 02:00:00,2068,0.519584,0.854419,1,2


In [21]:
# Check dataframe shape
Bel_30_min_2019_4feat.shape

(2832, 6)

In [22]:
# Export csv file
file_name = '/content/drive/My Drive/BD10/Bel_30_min_2019_4features'
Bel_30_min_2019_4feat.to_csv(file_name + '.csv', index=False)

## Belgium 2019 (our novel implementation using PySpark)

In [23]:
# Drop missing values
Bel_30_min_2019_df = Bel_30min_0101_0103_2019.na.drop()

In [24]:
# Group by data based on datetime (sum all vehicles in any street ID)
Bel_30_min_2019_df = Bel_30_min_2019_df.groupBy("Datetime").sum("Traffic_Count")

In [25]:
# Get day of the week from date
Bel_30_min_2019_df = Bel_30_min_2019_df.withColumn('Day_Of_Week',dayofweek(Bel_30_min_2019_df.Datetime))
# 1- Sunday , 2- Monday …… 7- Saturday

In [26]:
# Extract date, month, and year from date
Bel_30_min_2019_df = Bel_30_min_2019_df.withColumn('Date', dayofmonth(Bel_30_min_2019_df.Datetime))
Bel_30_min_2019_df = Bel_30_min_2019_df.withColumn('Month', month(Bel_30_min_2019_df.Datetime))
Bel_30_min_2019_df = Bel_30_min_2019_df.withColumn('Year', year(Bel_30_min_2019_df.Datetime))

In [27]:
# Extract hour, minute, and second from date
Bel_30_min_2019_df = Bel_30_min_2019_df.withColumn('Hour', hour(Bel_30_min_2019_df.Datetime))
Bel_30_min_2019_df = Bel_30_min_2019_df.withColumn('Minute', minute(Bel_30_min_2019_df.Datetime))
Bel_30_min_2019_df = Bel_30_min_2019_df.withColumn('Second', second(Bel_30_min_2019_df.Datetime))

In [28]:
# Encode Hour column to classify day or night
# Day = 6 am to before 6 pm -> encoded as 0
# Night = 6 pm to before 6 am -> encoded as 1

Bel_30_min_2019_df = Bel_30_min_2019_df.withColumn("Day_Or_Night", 
        when((Bel_30_min_2019_df['Hour'] >= 6) & (Bel_30_min_2019_df['Hour'] < 18), 0).otherwise(1))

In [29]:
# Encode Day_Of_Week column to classify whether the date is weekdays or weekend
# Weekdays: day_of_week 2 to 6 -> encoded as 0
# Weekend: day_of_week 1 (Sunday) or 7 (Saturday) -> encoded as 1

Bel_30_min_2019_df = Bel_30_min_2019_df.withColumn("Weekdays_Or_Weekend", 
        when((Bel_30_min_2019_df['Day_Of_Week'] == 1) | (Bel_30_min_2019_df['Day_Of_Week'] == 7), 1) 
      .otherwise(0))

In [30]:
# Sort df according to datetime
Bel_30_min_2019_df = Bel_30_min_2019_df.sort("Datetime")

In [31]:
# Rename columns
Bel_30_min_2019_df = Bel_30_min_2019_df.withColumnRenamed("sum(Traffic_Count)","Sum_Traffic_Count")

In [32]:
Bel_30_min_2019_df.count()

2832

In [33]:
# Convert to pandas df and export csv file
pandasDF = Bel_30_min_2019_df.toPandas()
pandasDF.to_csv('/content/drive/My Drive/BD10/Bel_30_min_2019_processed.csv', index=False)

## Belgium 2021 (our novel implementation using PySpark)

In [34]:
# Union 2021 data and only take distinct rows to avoid duplication
Bel_30_min_2021_df = Bel_30min_1303_0606_2021.union(Bel_30min_0506_1610_2021).distinct()

In [35]:
# Drop missing values
Bel_30_min_2021_df = Bel_30_min_2021_df.na.drop()

In [36]:
# Recheck number of missing values for each column after dropping
#Bel_30_min_2021_df.select([count(when(col(c).isNull(), c)).alias(c) for c in Bel_30_min_2021_df.columns]).show()

In [37]:
# Group by data based on datetime (sum all vehicles in any street ID)
Bel_30_min_2021_df = Bel_30_min_2021_df.groupBy("Datetime").sum("Traffic_Count")

In [38]:
# Get day of the week from date
Bel_30_min_2021_df = Bel_30_min_2021_df.withColumn('Day_Of_Week',dayofweek(Bel_30_min_2021_df.Datetime))
# 1- Sunday , 2- Monday …… 7- Saturday

In [39]:
# Extract date, month, and year from date
Bel_30_min_2021_df = Bel_30_min_2021_df.withColumn('Date', dayofmonth(Bel_30_min_2021_df.Datetime))
Bel_30_min_2021_df = Bel_30_min_2021_df.withColumn('Month', month(Bel_30_min_2021_df.Datetime))
Bel_30_min_2021_df = Bel_30_min_2021_df.withColumn('Year', year(Bel_30_min_2021_df.Datetime))

In [40]:
# Extract hour, minute, and second from date
Bel_30_min_2021_df = Bel_30_min_2021_df.withColumn('Hour', hour(Bel_30_min_2021_df.Datetime))
Bel_30_min_2021_df = Bel_30_min_2021_df.withColumn('Minute', minute(Bel_30_min_2021_df.Datetime))
Bel_30_min_2021_df = Bel_30_min_2021_df.withColumn('Second', second(Bel_30_min_2021_df.Datetime))

In [41]:
# Encode Hour column to classify day or night
# Day = 6 am to before 6 pm
# Night = 6 pm to before 6 am

Bel_30_min_2021_df = Bel_30_min_2021_df.withColumn("Day_Or_Night", 
        when((Bel_30_min_2021_df['Hour'] >= 6) & (Bel_30_min_2021_df['Hour'] < 18), 0).otherwise(1))

In [42]:
# Encode Day_Of_Week column to classify whether the date is weekdays or weekend
# Weekdays: day_of_week 2 to 6 -> encoded as 0
# Weekend: day_of_week 1 (Sunday) or 7 (Saturday) -> encoded as 1

Bel_30_min_2021_df = Bel_30_min_2021_df.withColumn("Weekdays_Or_Weekend", 
        when((Bel_30_min_2021_df['Day_Of_Week'] == 1) | (Bel_30_min_2021_df['Day_Of_Week'] == 7), 1) 
      .otherwise(0))

In [43]:
# Sort df according to datetime
Bel_30_min_2021_df = Bel_30_min_2021_df.sort("Datetime")

In [44]:
# Rename columns
Bel_30_min_2021_df = Bel_30_min_2021_df.withColumnRenamed("sum(Traffic_Count)","Sum_Traffic_Count")

In [45]:
# Check no of rows in dataset after data pre processing and cleaning
Bel_30_min_2021_df.count()

10411

In [47]:
# Convert to pandas df and export csv file
pandasDF = Bel_30_min_2021_df.toPandas()
pandasDF.to_csv('/content/drive/My Drive/BD10/Bel_30_min_2021_processed.csv', index=False)

In [48]:
sc.stop()