# Data Generation

This notebook contains the code to generate some of the files used by our project website

In [1]:
import os
import json
# Initial Spark
import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.functions import explode
import pyspark.sql.types as T
from pyspark.sql.functions import lit, col, to_date

from pyspark.sql.functions import avg
from pyspark.sql.functions import stddev
from pyspark.sql.functions import count, countDistinct, concat, sum
from pyspark.sql.functions import percentile_approx
import pyspark.sql.functions as F

In [2]:
DATA_FOLDER = "/Users/giacomoorsi/MEGAsync Downloads/Trenitalia-GenMar2023"
file = "all.parquet"

SAVE_COMPUTATIONS = False

In [3]:

spark = SparkSession.builder \
    .master("local[4]") \
    .appName("Trenitalia") \
    .getOrCreate()

# set driver memory to 4GB
spark.sparkContext._conf.setAll([('spark.driver.memory', '4g')])

# get sc 
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/21 11:42:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
df = spark.read.parquet(os.path.join(DATA_FOLDER, "parquet", file))
print("Number of rows: {}".format(df.count()))



Number of rows: 8316723


                                                                                

In [5]:
# load the dataset of the stops
stops = spark.read.csv(os.path.join(DATA_FOLDER, "stops.csv"), header=True, inferSchema=True)

stops_column_renamer = {
    "name": "stop_name",
    "lat": "stop_lat",
    "lon": "stop_lon",
    "station_id": "stop_id", 
    "name_short": "stop_name_short",
    "id_region": "stop_id_region",
}

for k, v in stops_column_renamer.items():
    stops = stops.withColumnRenamed(k, v)

In [6]:
# number of days
print("Number of days: ", df.select("date").distinct().count())

# first date
print("First date: ", df.select("date").distinct().orderBy("date").first().asDict()["date"])

# last date
print("Last date: ", df.select("date").distinct().orderBy("date", ascending=False).first().asDict()["date"])

# number of trains
print("Number of trains: ", df.select("train_number").distinct().count())

# number of stops
print("Number of stops: ", df.select("stop_name").distinct().count())

# number of train classes
print("Number of train classes: ", df.select("train_class").distinct().count())


                                                                                

Number of days:  90


                                                                                

First date:  2023-01-01


                                                                                

Last date:  2023-03-31


                                                                                

Number of trains:  11496


                                                                                

Number of stops:  2291
Number of train classes:  11


In [7]:
print(df.columns)

['train_arrival_stop_name', 'train_class', 'train_cn', 'train_dl', 'train_number', 'train_arrival_time', 'oae', 'train_oaz', 'train_od', 'train_oo', 'train_departure_time', 'train_ope', 'train_opz', 'train_departure_stop_name', 'train_pr', 'train_arrival_delay', 'train_departure_delay', 'sea', 'train_sep', 'train_sub', 'day', 'month', 'year', 'date', 'stop_name', 'stop_arrival_time', 'stop_departure_time', 'stop_arrival_delay', 'stop_departure_delay']


# 0. Preprocessing
As step of preprocessing, we remove all delays that are anomalous, i.e. they are not in the range [-100, 300] minutes. 

In [8]:
# remove all values of delays that are not in the range [-100, 300] if they are numerical
MIN_DELAY = -100
MAX_DELAY = 300
df = df.filter((col("stop_arrival_delay").cast("double").isNull()) | (col("stop_arrival_delay").cast("double") >= MIN_DELAY) & (col("stop_arrival_delay").cast("double") <= MAX_DELAY))

# remove all the trains that are not in the following classes
KEEP_TRAIN_CLASSES = ["IC", "ICN", "REG", "", "EC"]

df = df.filter(col("train_class").isin(KEEP_TRAIN_CLASSES))

# replace empty train class with "AV"
df = df.withColumn("train_class", F.when(col("train_class") == "", "AV").otherwise(col("train_class")))


# 1. Statistics for each station

For each distinct station, we want to obtain: 
1. Station name
2. Latitude, longitude
3. Average arrival delay
4. Median arrival delay
5. % of trains with delay > 3
6. % of trains with delay > 5
7. % of trains with delay > 10
8. Number of distinct train numbers that stopped 

In [9]:
# add column True if train had > 3 minutes of delay
data_stop = df.join(stops, on="stop_name", how="inner")


data_stop = data_stop \
    .filter(col("stop_arrival_delay").cast("double").isNotNull()) \
    .withColumn("stop_arrival_delay_double", col("stop_arrival_delay").cast("double")) \
    .drop("stop_arrival_delay") \
    .withColumnRenamed("stop_arrival_delay_double", "stop_arrival_delay") \
    .withColumn("3m_delay", col("stop_arrival_delay") > 3)\
    .withColumn("5m_delay", col("stop_arrival_delay") > 5)\
    .withColumn("10m_delay", col("stop_arrival_delay") > 10)\
    .withColumn("day_of_week", F.date_format(F.col("date"), "E"))\
    .withColumn("train_id", concat(col("train_class"), col("train_number")))

In [10]:
data_stop_stat = data_stop.groupBy("stop_name") \
    .agg(
        F.avg("stop_arrival_delay").alias("avg_arrival_delay"),
        F.percentile_approx("stop_arrival_delay", 0.5).alias("median_arrival_delay"),
        F.countDistinct("train_id").alias("count_trains"),
        F.count("train_id").alias("count_stops"),
        F.sum(F.col("3m_delay").cast("long")).alias("count_3m_delay"),
        F.sum(F.col("5m_delay").cast("long")).alias("count_5m_delay"),
        F.sum(F.col("10m_delay").cast("long")).alias("count_10m_delay"),
        F.first("stop_lat").alias("stop_lat"),
        F.first("stop_lon").alias("stop_lon")
    )

In [11]:
print("Number of stops: ", data_stop_stat.count())



Number of stops:  2025


                                                                                

In [12]:
print("Number of stops in (lat,long) dataset: ", stops.count())

Number of stops in (lat,long) dataset:  2961


In [13]:
print("Number of stops in final dataset: ", data_stop_stat.count())



Number of stops in final dataset:  2025


                                                                                

In [14]:
data_stop_pandas = data_stop_stat.toPandas()
data_stop_pandas.head(100)

                                                                                

Unnamed: 0,stop_name,avg_arrival_delay,median_arrival_delay,count_trains,count_stops,count_3m_delay,count_5m_delay,count_10m_delay,stop_lat,stop_lon
0,ABBASANTA,1.251249,0.0,29,1401,205,131,79,40.128801,8.817733
1,ABBIATEGRASSO,2.978809,2.0,57,3728,1105,596,209,45.400631,8.921305
2,ACQUAVIVA,3.013732,2.0,30,2039,721,327,61,37.570258,13.674927
3,ACQUAVIVA DELLE FONTI,1.041216,1.0,36,2402,158,74,35,40.892806,16.839826
4,ACQUEDOLCI-S.FRATELLO,0.272120,0.0,18,1198,93,57,22,38.058459,14.587597
...,...,...,...,...,...,...,...,...,...,...
95,CANDIOLO,4.380804,4.0,46,3532,1852,1119,123,44.961155,7.598973
96,CANICATTI`,2.282486,1.0,15,885,203,115,41,37.359879,13.855395
97,CANISTRO,4.800000,0.0,8,10,2,2,2,,
98,CANNETO SULL`OGLIO,6.860747,5.0,24,1982,1365,939,288,45.150211,10.371470


In [15]:
# save as csv
if SAVE_COMPUTATIONS:
    data_stop_pandas.to_csv(("dataset_generated/data_stop/data_stop.csv"), index=False)

### b. Statistics for each day of week

In [16]:
data_stop_stat = data_stop.groupBy("stop_name", "day_of_week") \
    .agg(
        F.avg("stop_arrival_delay").alias("avg_arrival_delay"),
        F.percentile_approx("stop_arrival_delay", 0.5).alias("median_arrival_delay"),
        F.countDistinct("train_id").alias("count_trains"),
        F.count("train_id").alias("count_stops"),
        F.sum(F.col("3m_delay").cast("long")).alias("count_3m_delay"),
        F.sum(F.col("5m_delay").cast("long")).alias("count_5m_delay"),
        F.sum(F.col("10m_delay").cast("long")).alias("count_10m_delay"),
        F.first("stop_lat").alias("stop_lat"),
        F.first("stop_lon").alias("stop_lon")
    )

In [17]:
data_stop_stat.show()

data_stop_stat_pandas = data_stop_stat.toPandas()

                                                                                

+--------------------+-----------+-------------------+--------------------+------------+-----------+--------------+--------------+---------------+---------+---------+
|           stop_name|day_of_week|  avg_arrival_delay|median_arrival_delay|count_trains|count_stops|count_3m_delay|count_5m_delay|count_10m_delay| stop_lat| stop_lon|
+--------------------+-----------+-------------------+--------------------+------------+-----------+--------------+--------------+---------------+---------+---------+
|         ABANO TERME|        Mon| 2.1349862258953167|                 2.0|          28|        363|            76|            51|             18|45.355199|11.811533|
|         ABANO TERME|        Tue| 1.3961218836565097|                 2.0|          28|        361|            75|            44|              6|45.355199|11.811533|
|         ABANO TERME|        Wed| 1.6565096952908587|                 2.0|          28|        361|            82|            52|              8|45.355199|11.811533

                                                                                

In [18]:
# create a file for each day of week with the statistics
if SAVE_COMPUTATIONS:
    for day in data_stop_stat_pandas["day_of_week"].unique():
        data_stop_stat_pandas[data_stop_stat_pandas["day_of_week"] == day].to_csv(("dataset_generated/data_stop/data_stop_{}.csv".format(day)), index=False)
    

### 1c. Statistics for each train type

In [19]:
data_stop_stat = data_stop.groupBy("stop_name", "train_class") \
    .agg(
        F.avg("stop_arrival_delay").alias("avg_arrival_delay"),
        F.percentile_approx("stop_arrival_delay", 0.5).alias("median_arrival_delay"),
        F.countDistinct("train_id").alias("count_trains"),
        F.count("train_id").alias("count_stops"),
        F.sum(F.col("3m_delay").cast("long")).alias("count_3m_delay"),
        F.sum(F.col("5m_delay").cast("long")).alias("count_5m_delay"),
        F.sum(F.col("10m_delay").cast("long")).alias("count_10m_delay"),
        F.first("stop_lat").alias("stop_lat"),
        F.first("stop_lon").alias("stop_lon")
    )

In [20]:
data_stop_stat.show()

data_stop_stat_pandas = data_stop_stat.toPandas()

                                                                                

+--------------------+-----------+------------------+--------------------+------------+-----------+--------------+--------------+---------------+---------+---------+
|           stop_name|train_class| avg_arrival_delay|median_arrival_delay|count_trains|count_stops|count_3m_delay|count_5m_delay|count_10m_delay| stop_lat| stop_lon|
+--------------------+-----------+------------------+--------------------+------------+-----------+--------------+--------------+---------------+---------+---------+
|           ABBASANTA|        REG| 1.251249107780157|                 0.0|          29|       1401|           205|           131|             79|40.128801| 8.817733|
|ACQUAVIVA DELLE F...|        REG|1.0412156536219817|                 1.0|          36|       2402|           158|            74|             35|40.892806|16.839826|
|         ACQUI TERME|        REG| 3.531089978054133|                 3.0|          41|       2734|           994|           453|            103| 44.67407| 8.474482|
|AGR

                                                                                

In [21]:
# create a file for each day of week with the statistics
if SAVE_COMPUTATIONS:
    for train_class in data_stop_stat_pandas["train_class"].unique():
        data_stop_stat_pandas[data_stop_stat_pandas["train_class"] == train_class].to_csv(("dataset_generated/data_stop/data_stop_class_{}.csv".format(train_class)), index=False)
    

### 1d. Statistics for each week day and train type


In [22]:
data_stop_stat = data_stop.groupBy("stop_name", "train_class", "day_of_week") \
    .agg(
        F.avg("stop_arrival_delay").alias("avg_arrival_delay"),
        F.percentile_approx("stop_arrival_delay", 0.5).alias("median_arrival_delay"),
        F.countDistinct("train_id").alias("count_trains"),
        F.count("train_id").alias("count_stops"),
        F.sum(F.col("3m_delay").cast("long")).alias("count_3m_delay"),
        F.sum(F.col("5m_delay").cast("long")).alias("count_5m_delay"),
        F.sum(F.col("10m_delay").cast("long")).alias("count_10m_delay"),
        F.first("stop_lat").alias("stop_lat"),
        F.first("stop_lon").alias("stop_lon")
    )

In [23]:
data_stop_stat_pandas = data_stop_stat.toPandas()

                                                                                

In [24]:
# for each combination of weekday and train_class, create a file with the statistics
if SAVE_COMPUTATIONS:
    for day in data_stop_stat_pandas["day_of_week"].unique():
        for train_class in data_stop_stat_pandas["train_class"].unique():
            data_stop_stat_pandas[(data_stop_stat_pandas["day_of_week"] == day) & (data_stop_stat_pandas["train_class"] == train_class)].to_csv(("dataset_generated/data_stop/data_stop_mix_{}_{}.csv".format(day, train_class)), index=False)


---

# 2. Train statistics
For each train, the goal is to store the following information:
1. Ordered list of the stations it stopped at
2. For each destination: 
    1. Average arrival delay
    2. Median arrival delay
    3. % of trains with delay > 3
    4. % of trains with delay > 5
    5. % of trains with delay > 10
    6. Number of days it stopped at the destination

To avoid keeping statistics for temporary stops and trains, we remove: 
- Trains that appeared in the dataset only <10  distinct times
- Stops for a train, that appeared less than 20% of the dates that the train appeared in the dataset

In [25]:
# first we add coordinates to each stop, so that we can drop stops for which we don't have the coordinates
df_trains = df.join(stops, on="stop_name", how="inner")


##### 1. For each train, we obtain the statistics of each of its stops, removing the stops that appear less than 20% of the dates that the train appeared in the dataset

In [26]:
print(df.columns)

['train_arrival_stop_name', 'train_class', 'train_cn', 'train_dl', 'train_number', 'train_arrival_time', 'oae', 'train_oaz', 'train_od', 'train_oo', 'train_departure_time', 'train_ope', 'train_opz', 'train_departure_stop_name', 'train_pr', 'train_arrival_delay', 'train_departure_delay', 'sea', 'train_sep', 'train_sub', 'day', 'month', 'year', 'date', 'stop_name', 'stop_arrival_time', 'stop_departure_time', 'stop_arrival_delay', 'stop_departure_delay']


Attention, we have to keep in mind that the same train number can refer to multiple trains, so we also have to group by the destination stop. 

In [27]:
# this shows that there's multiple trains with the same number
df \
    .filter((F.col("train_class") == "REG") & (F.col("train_number") == "4113"))\
    .select("train_arrival_stop_name")\
    .distinct()\
    .show()



+-----------------------+
|train_arrival_stop_name|
+-----------------------+
|           ROMA TERMINI|
|         COMO NORD LAGO|
+-----------------------+



                                                                                

In [28]:
# first we create a new column with the delay in minutes. For all stops this has to be the arrival_delay apart for the first stop of each train, for which it has to be the departure_delay
df_trains1 = df_trains.withColumn("delay", F.when(F.col("stop_arrival_delay") == "N", F.col("stop_departure_delay")).otherwise(F.col("stop_arrival_delay")))

# second, add a column "stop_arrival_time", which is the arrival time at the stop, in the format "HH:MM"
# if the stop is the first stop of the train, then the stop_arrival_time is the departure time of the train

df_trains1 = df_trains1\
    .withColumn("stop_time", F.when(F.col("stop_arrival_time") == 0, F.col("stop_departure_time")).otherwise(F.col("stop_arrival_time"))) \
    .withColumn("stop_time", F.date_format(F.col("stop_time").cast("timestamp"), "HH:mm"))

    

df_trains1.orderBy("stop_time").show(10)

23/05/21 10:56:32 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+-----------------------+-----------+--------+--------+------------+------------------+----+----------+--------+--------+--------------------+---------+----------+-------------------------+--------+-------------------+---------------------+----+---------+---------+---+-----+----+----------+-----------------+-------------------+------------------+--------------------+-------+----------------+---------+---------+--------------+-----+---------+
|           stop_name|train_arrival_stop_name|train_class|train_cn|train_dl|train_number|train_arrival_time| oae| train_oaz|train_od|train_oo|train_departure_time|train_ope| train_opz|train_departure_stop_name|train_pr|train_arrival_delay|train_departure_delay| sea|train_sep|train_sub|day|month|year|      date|stop_arrival_time|stop_departure_time|stop_arrival_delay|stop_departure_delay|stop_id| stop_name_short| stop_lat| stop_lon|stop_id_region|delay|stop_time|
+--------------------+-----------------------+-----------+--------

In [29]:
# convert the delay to double and add a counter if the train was 3m, 5m, 10m late
# add a column with the number of distinct dates for each train
# remove the column if the delay cannot be casted to double
df_trains2 = df_trains1 \
    .withColumn("delay", F.col("delay").cast("double")) \
    .filter(F.col("delay").isNotNull()) \
    .withColumn("3m_delay", F.when(F.col("delay") >= 3, 1).otherwise(0)) \
    .withColumn("5m_delay", F.when(F.col("delay") >= 5, 1).otherwise(0)) \
    .withColumn("10m_delay", F.when(F.col("delay") >= 10, 1).otherwise(0))\
    .select("train_departure_stop_name", "train_arrival_stop_name", "train_class", "train_number", "date", "stop_id", "stop_time", "stop_name", "stop_name_short", "stop_lat", "stop_lon", "stop_id_region", "3m_delay", "5m_delay", "10m_delay", "delay")

df_trains2.show(1)

+-------------------------+-----------------------+-----------+------------+----------+-------+---------+--------------+---------------+---------+---------+--------------+--------+--------+---------+-----+
|train_departure_stop_name|train_arrival_stop_name|train_class|train_number|      date|stop_id|stop_time|     stop_name|stop_name_short| stop_lat| stop_lon|stop_id_region|3m_delay|5m_delay|10m_delay|delay|
+-------------------------+-----------------------+-----------+------------+----------+-------+---------+--------------+---------------+---------+---------+--------------+--------+--------+---------+-----+
|           FIRENZE S.M.N.|           ROMA TERMINI|        REG|        4113|2023-02-06| S06421|    21:14|FIRENZE S.M.N.| Firenze S.M.N.|43.776893|11.247373|          13.0|       1|       1|        1| 19.0|
+-------------------------+-----------------------+-----------+------------+----------+-------+---------+--------------+---------------+---------+---------+--------------+-----

In [30]:

# For each train, we obtain the statistics of each of its stops, removing the stops that appear less than 20% of the dates that the train appeared in the dataset
# add the stop incremental number
df_trains_stat1 = df_trains2.groupBy("train_class", "train_number", "train_departure_stop_name", "train_arrival_stop_name", "stop_name") \
    .agg(
        F.avg("delay").alias("avg_arrival_delay"),
        F.percentile_approx("delay", 0.5).alias("median_arrival_delay"),
        F.countDistinct("date").alias("count_dates_stop"),
        F.min("date").alias("first_date"),
        F.max("date").alias("last_date"),
        F.sum(F.col("3m_delay").cast("long")).alias("count_3m_delay"),
        F.sum(F.col("5m_delay").cast("long")).alias("count_5m_delay"),
        F.sum(F.col("10m_delay").cast("long")).alias("count_10m_delay"),
        F.first("stop_lat").alias("stop_lat"),
        F.first("stop_lon").alias("stop_lon"),
        F.mode("stop_time").alias("stop_time")
    )


In [31]:
# counts how many distinct dates a train apparead in the dataset
# and filter out trains that didn't appear at least 4 times a month (12 times in total)
MIN_THRESHOLD_TRAIN = 12

df_trains_counts = df_trains2.groupBy("train_class", "train_number", "train_departure_stop_name", "train_arrival_stop_name") \
    .agg(
        F.countDistinct("date").alias("count_dates_train")
    ) \
    .filter(F.col("count_dates_train") >= MIN_THRESHOLD_TRAIN)

In order to store the data we create a mapper from a (train_class, train_number, train_arrival_stop_name) to an id, and we store the information of that train on a file with the name of the id. To do that, we add a column which is a monotonic increasing id, and we use that as the id of that train. 

In [32]:
df_trains_counts = df_trains_counts \
    .withColumn("train_id", F.monotonically_increasing_id()) \
    .cache()

In [33]:
# merge the two datasets and filter out stations that appear in less than 20% of the dates
df_trains_stat2 = df_trains_stat1.join(df_trains_counts, on=["train_class", "train_number", "train_arrival_stop_name", "train_departure_stop_name"], how="inner") \
    .filter(F.col("count_dates_stop") >= F.col("count_dates_train") * 0.2) \
    .cache()

In [34]:
df_trains_stat2.show(10)

                                                                                

+-----------+------------+-----------------------+-------------------------+---------------+------------------+--------------------+----------------+----------+----------+--------------+--------------+---------------+---------+---------+---------+-----------------+--------+
|train_class|train_number|train_arrival_stop_name|train_departure_stop_name|      stop_name| avg_arrival_delay|median_arrival_delay|count_dates_stop|first_date| last_date|count_3m_delay|count_5m_delay|count_10m_delay| stop_lat| stop_lon|stop_time|count_dates_train|train_id|
+-----------+------------+-----------------------+-------------------------+---------------+------------------+--------------------+----------------+----------+----------+--------------+--------------+---------------+---------+---------+---------+-----------------+--------+
|         IC|         700|           ROMA TERMINI|              BATTIPAGLIA|         LATINA| 9.083333333333334|                 4.0|              12|2023-01-21|2023-02-01|    

In [35]:
from pyspark.sql.window import Window

# add the stop incremental number
df_trains_stat3 = df_trains_stat2 \
    .withColumn("stop_number", F.row_number().over(Window.partitionBy("train_id").orderBy("stop_time"))) \
    .sort("train_id", "stop_number") \
    .cache()


df_trains_stat3.show(50)



+-----------+------------+-----------------------+-------------------------+--------------------+--------------------+--------------------+----------------+----------+----------+--------------+--------------+---------------+---------+---------+---------+-----------------+--------+-----------+
|train_class|train_number|train_arrival_stop_name|train_departure_stop_name|           stop_name|   avg_arrival_delay|median_arrival_delay|count_dates_stop|first_date| last_date|count_3m_delay|count_5m_delay|count_10m_delay| stop_lat| stop_lon|stop_time|count_dates_train|train_id|stop_number|
+-----------+------------+-----------------------+-------------------------+--------------------+--------------------+--------------------+----------------+----------+----------+--------------+--------------+---------------+---------+---------+---------+-----------------+--------+-----------+
|        REG|       22821|                  RECCO|                   SAVONA|              SAVONA|  2.5238095238095237|

                                                                                

In [37]:
# now we store df_trains_stat2 in a csv file, partitioned by the column "train_id". 
# Each "train_id" should have a single file, and each file should contain the statistics of all the stops of the train

# remove folder if it already exists

if SAVE_COMPUTATIONS : 
    import shutil
    shutil.rmtree("dataset_generated/data_train_stat")


    df_trains_stat3 \
        .repartition("train_id") \
        .write \
        .partitionBy("train_id") \
        .option("header", "true") \
        .csv("dataset_generated/data_train_stat/data_train_stat.csv")


                                                                                

In [26]:
# read from file
df_trains_stat3 = spark.read.option("header", "true").csv("dataset_generated/data_train_stat/data_train_stat.csv")

                                                                                

Now, we want to elaborate some general stastistics about a train, and store a dataset which is an index of the train ids, and the statistics of that train.

The aggregated statistics that we want to store are: 
1. Average arrival delay at each destination
2. Median arrival delay at each destination
3. % of trains with delay > 3 at each destination
4. % of trains with delay > 5 at each destination
5. % of trains with delay > 10 at each destination
6. Number of days the train ran
7. Number of stops of the train


In [None]:
df_trains_stat3.show(1)

In [38]:
# Now, we want to elaborate some general stastistics about a train, and store a dataset which is an index of the train ids, and the statistics of that train.

# The aggregated statistics that we want to store are: 
# 1. Average arrival delay at each destination
# 2. Median arrival delay at each destination
# 3. % of trains with delay > 3 at each destination
# 4. % of trains with delay > 5 at each destination
# 5. % of trains with delay > 10 at each destination
# 6. Number of days the train ran
# 7. Number of stops of the train


df_trains_stat4 = df_trains_stat3 \
    .groupBy("train_id", "train_class", "train_number", "train_departure_stop_name", "train_arrival_stop_name") \
    .agg(
        F.avg("avg_arrival_delay").alias("avg_arrival_delay"),
        F.avg("median_arrival_delay").alias("median_arrival_delay"),
        (F.avg("count_3m_delay") / F.first("count_dates_train")).alias("perc_3m_delay"),
        (F.avg("count_5m_delay") / F.first("count_dates_train")).alias("perc_5m_delay"),
        (F.avg("count_10m_delay") / F.first("count_dates_train")).alias("perc_10m_delay"),
        F.first("count_dates_train").alias("count_dates_train"),
        F.countDistinct("stop_name").alias("count_stops_train"),
        F.first("first_date").alias("first_date"),
        F.first("last_date").alias("last_date"),
        F.min("stop_time").alias("departure_time"),
        F.max("stop_time").alias("arrival_time"),
    ) \
    .cache()

In [39]:
df_trains_stat4_pandas = df_trains_stat4.toPandas()

                                                                                

In [40]:
df_trains_stat4_pandas.head(10)

Unnamed: 0,train_id,train_class,train_number,train_departure_stop_name,train_arrival_stop_name,avg_arrival_delay,median_arrival_delay,perc_3m_delay,perc_5m_delay,perc_10m_delay,count_dates_train,count_stops_train,first_date,last_date,departure_time,arrival_time
0,94489280513,REG,4096,ROMA TERMINI,FIRENZE S.M.N.,4.621205,1.071429,0.441558,0.281656,0.13474,88,14,2023-01-02,2023-03-31,07:02,10:48
1,128849018888,REG,4565,FOLIGNO,ROMA TERMINI,0.47865,-0.285714,0.12406,0.041353,0.013158,76,7,2023-01-02,2023-03-31,05:55,08:00
2,137438953510,REG,2116,GENOVA P.PRINCIPE,TORINO P.NUOVA,1.689368,-0.875,0.166667,0.104167,0.058333,30,8,2023-01-01,2023-03-26,06:27,08:30
3,146028888076,REG,12984,CATANIA CENTRALE,GIARRE RIPOSTO,0.499278,-0.333333,0.072797,0.019157,0.015326,87,3,2023-01-01,2023-03-31,14:40,15:15
4,188978561024,REG,827,M N CADORNA,SARONNO,0.0,0.0,0.0,0.0,0.0,88,5,2023-01-01,2023-03-31,09:47,10:07
5,223338299438,REG,23209,MONTEBELLUNA,PADOVA,1.715833,1.25,0.39,0.13,0.0,25,4,2023-02-27,2023-03-31,09:46,10:33
6,309237645350,REG,10469,MILANO GRECO PIRELLI,STRADELLA,2.718519,1.666667,0.348148,0.17037,0.04,75,9,2023-01-02,2023-03-31,13:47,15:05
7,386547056691,REG,20214,COLLEFERRO-SEGNI-PALIANO,ROMA TERMINI,0.820789,0.555556,0.11828,0.008961,0.0,62,9,2023-01-02,2023-03-31,19:15,20:13
8,412316860444,REG,21700,PUNTA RAISI,PALERMO CENTRALE,0.814815,0.333333,0.079861,0.038194,0.009259,72,12,2023-01-07,2023-03-31,05:20,06:20
9,489626271804,REG,17631,FERRARA,RAVENNA,1.0,0.888889,0.194444,0.009259,0.0,12,9,2023-01-01,2023-03-26,16:37,17:48


In [41]:
if SAVE_COMPUTATIONS :    
    # store the file in a csv file  
    df_trains_stat4_pandas.to_csv("dataset_generated/data_train_index.csv", index=False)

    # zip the file
    !zip dataset_generated/data_train_index.zip dataset_generated/data_train_index.csv

updating: dataset_generated/data_train_index.csv (deflated 69%)


Now we store the data in a better format, which is a folder in which each file is a train, and the filename is the train id. 

In [42]:
import os
import shutil
# if the folder already exists, delete it
if os.path.exists("renamed_csv_files"):
    shutil.rmtree("renamed_csv_files")

# create a new directory to store the renamed CSV files
if not os.path.exists("renamed_csv_files"):
    os.mkdir("renamed_csv_files")

# loop through all directories in the "data_train_stat/data_train_stat.csv" directory that start with "train_id="
for dirpath, dirnames, filenames in os.walk("dataset_generated/data_train_stat/data_train_stat.csv"):
    for dirname in dirnames:
        if dirname.startswith("train_id="):
            # extract the ID from the directory name
            id = dirname.split("=")[1]
            # loop through all CSV files in the directory
            for filename in os.listdir(os.path.join(dirpath, dirname)):
                if filename.endswith(".csv"):
                    # rename the file to "[ID].csv" and move it to the "renamed_csv_files" directory
                    src_path = os.path.join(dirpath, dirname, filename)
                    dst_path = os.path.join("renamed_csv_files", f"{id}.csv")
                    shutil.copy(src_path, dst_path)


# create a zip file containing the "renamed_csv_files" directory, the files have to be in a directory when unzipped
shutil.make_archive("renamed_csv_files", "zip", "renamed_csv_files")

# delete the "renamed_csv_files" directory
shutil.rmtree("renamed_csv_files")

# 3. Path of trains over a map
Now that we have computed a dataset with the statistics for each train, in which we managed to extract the timetable, we can use it to plot the path of each train on a map. 

The task of matching information of a train, with its exact journey on the railway is a complex task. The algorithm developed by [Bast and Brosi (2019)](https://ad-publications.cs.uni-freiburg.de/SIGSPATIAL_Sparse%20map%20matching%202018.pdf) matches the GTFS schedule of a train with the OpenStreetMap railway network. Thankfully, they published the code of this algorithm on GitHub. Their library takes a GTFS schedule of a train and a railway network in OpenStreetMap format and returns the most likely journey of the train on the railway network in `shapefile` format. GTFS is a standard format for public transport schedules, promoted by Google.

First, we need to generate a GTFS timetable for our trains. Then, we can run the algorithm a produce a shapefile for each train.

In [50]:
trains_timetable = df_trains_stat3 \
    .select("train_id", "train_class", "train_number", "train_departure_stop_name", "train_arrival_stop_name", "stop_number", "stop_name", "stop_time", "stop_lat", "stop_lon")

# need to join with the stops to get stop_id that we have lost on the way
trains_timetable = trains_timetable \
    .join(stops, on=["stop_name", "stop_lat", "stop_lon"], how="inner")

trains_timetable_pandas = trains_timetable.toPandas()

                                                                                

In [51]:
trains_timetable_pandas.head(10)

Unnamed: 0,stop_name,stop_lat,stop_lon,train_id,train_class,train_number,train_departure_stop_name,train_arrival_stop_name,stop_number,stop_time,stop_id,stop_name_short,stop_id_region
0,SAVONA,44.306892,8.470259,0,REG,22821,SAVONA,RECCO,1,06:28,S04801,Savona,2.0
1,GENOVA VOLTRI,44.428467,8.758163,0,REG,22821,SAVONA,RECCO,2,06:48,S04534,Genova Voltri,2.0
2,GENOVA SESTRI PONENTE,44.422374,8.847707,0,REG,22821,SAVONA,RECCO,3,07:02,S04537,Genova Sestri P.,2.0
3,GENOVA SAMPIERDARENA,44.413102,8.887271,0,REG,22821,SAVONA,RECCO,4,07:13,S04220,GE Sampierdarena,2.0
4,GENOVA P.PRINCIPE,44.417784,8.9207,0,REG,22821,SAVONA,RECCO,5,07:20,S04700,Genova P.Princ.,2.0
5,GENOVA BRIGNOLE,44.407213,8.947553,0,REG,22821,SAVONA,RECCO,6,07:31,S04702,Genova Brignole,2.0
6,GENOVA QUARTO DEI MILLE,44.388607,8.99597,0,REG,22821,SAVONA,RECCO,7,07:41,S04704,Genova Quarto,2.0
7,GENOVA NERVI,44.381271,9.039895,0,REG,22821,SAVONA,RECCO,8,07:48,S04707,Genova Nervi,2.0
8,RECCO,44.361215,9.146788,0,REG,22821,SAVONA,RECCO,9,08:08,S04714,Recco,2.0
9,SARONNO,45.625309,9.030839,1,REG,876,SARONNO,M N CADORNA,1,19:53,S01933,SARONNO,1.0


In [45]:
import pandas as pd

# delete folder if it exists
import shutil
if os.path.exists("gtfs"):
    shutil.rmtree("gtfs")

# create folder gtfs 
if not os.path.exists("gtfs"):
    os.mkdir("gtfs")

# 1. Create the agency.txt file
agency = {
    "agency_id": "1",
    "agency_name": "Trenitalia",
    "agency_url": "https://www.trenitalia.com",
    "agency_timezone": "Europe/Rome",
    "agency_lang": "it",
    "agency_phone": ""
}
agency = pd.DataFrame(agency, index=[0])
agency.to_csv("gtfs/agency.txt", index=False)

In [62]:
# 2. create routes.txt
# docs: https://gtfs.org/schedule/reference/#routestxt


routes = pd.DataFrame()

# get unique tuples (train_id, train_class, train_number)
train_ids = trains_timetable_pandas[["train_id", "train_class", "train_number"]].drop_duplicates()

routes["route_id"] = train_ids["train_id"]
routes["route_short_name"] = train_ids["train_class"] + " " + train_ids["train_number"]
routes["route_long_name"] = train_ids["train_class"] + " " + train_ids["train_number"]

color_mapper = {
    "IC": "E0A434",
    "REG": "036864",
    "ICN": "E0A434",
    "AV": "DC263B",
    "EC": "DC263B",
}

routes["agency_id"] = "1"
routes["route_type"] = "2" # 2 is train

routes["route_color"] = routes["route_short_name"].apply(lambda x: color_mapper[x.split(" ")[0]])

routes.to_csv("gtfs/routes.txt", index=False)

In [63]:
# 3. create stops.txt
# docs: https://gtfs.org/schedule/reference/#stopstxt
stops_gtfs = trains_timetable_pandas[["stop_id", "stop_name", "stop_lat", "stop_lon"]].drop_duplicates()
stops_gtfs.to_csv("gtfs/stops.txt", index=False)

In [64]:
# 4. Tough one, create calendar.txt and trips.txt
# calendar describes the span of a service
# trips describes the service for a particular route

# 4a: create calendar.txt
# docs: https://gtfs.org/schedule/reference/#calendartxt
calendar = {
    # we just create a service that runs every day for the whole year
    "service_id": "1",
    "monday": "1",
    "tuesday": "1",
    "wednesday": "1",
    "thursday": "1",
    "friday": "1",
    "saturday": "1",
    "sunday": "1",
    "start_date": "19000101",
    "end_date": "21000101",
}
calendar = pd.DataFrame(calendar, index=[0])
calendar.to_csv("gtfs/calendar.txt", index=False)

In [65]:
# 4b: create trips.txt
# docs: https://gtfs.org/schedule/reference/#tripstxt
# trips = {
#     "route_id": "1",
#     "service_id": "1",
#     "trip_id": "1",
#     "trip_headsign": query_7["train_class"] + " " + query_7["train_number"],
#     "trip_short_name": query_7["train_class"] + " " + query_7["train_number"],
#     "direction_id": "",
#     "block_id": "",
#     "shape_id": "",
#     "wheelchair_accessible": "",
#     "bikes_allowed": "",
# }

# trips = pd.DataFrame(trips, index=[0])

trips = pd.DataFrame()

trips["route_id"] = train_ids["train_id"]
trips["service_id"] = "1"
trips["trip_id"] = train_ids["train_id"]
trips["trip_headsign"] = train_ids["train_class"] + " " + train_ids["train_number"]
trips["trip_short_name"] = train_ids["train_class"] + " " + train_ids["train_number"]
trips["direction_id"] = ""
trips["block_id"] = ""
trips["shape_id"] = ""
trips["wheelchair_accessible"] = ""
trips["bikes_allowed"] = ""


trips.to_csv("gtfs/trips.txt", index=False)

In [66]:
trains_timetable_pandas.head(10)

Unnamed: 0,stop_name,stop_lat,stop_lon,train_id,train_class,train_number,train_departure_stop_name,train_arrival_stop_name,stop_number,stop_time,stop_id,stop_name_short,stop_id_region
0,SAVONA,44.306892,8.470259,0,REG,22821,SAVONA,RECCO,1,06:28,S04801,Savona,2.0
1,GENOVA VOLTRI,44.428467,8.758163,0,REG,22821,SAVONA,RECCO,2,06:48,S04534,Genova Voltri,2.0
2,GENOVA SESTRI PONENTE,44.422374,8.847707,0,REG,22821,SAVONA,RECCO,3,07:02,S04537,Genova Sestri P.,2.0
3,GENOVA SAMPIERDARENA,44.413102,8.887271,0,REG,22821,SAVONA,RECCO,4,07:13,S04220,GE Sampierdarena,2.0
4,GENOVA P.PRINCIPE,44.417784,8.9207,0,REG,22821,SAVONA,RECCO,5,07:20,S04700,Genova P.Princ.,2.0
5,GENOVA BRIGNOLE,44.407213,8.947553,0,REG,22821,SAVONA,RECCO,6,07:31,S04702,Genova Brignole,2.0
6,GENOVA QUARTO DEI MILLE,44.388607,8.99597,0,REG,22821,SAVONA,RECCO,7,07:41,S04704,Genova Quarto,2.0
7,GENOVA NERVI,44.381271,9.039895,0,REG,22821,SAVONA,RECCO,8,07:48,S04707,Genova Nervi,2.0
8,RECCO,44.361215,9.146788,0,REG,22821,SAVONA,RECCO,9,08:08,S04714,Recco,2.0
9,SARONNO,45.625309,9.030839,1,REG,876,SARONNO,M N CADORNA,1,19:53,S01933,SARONNO,1.0


In [67]:
# 5. create stop_times.txt
# docs: https://gtfs.org/schedule/reference/#stop_timestxt
# convert stop_departure_time to HH:MM:SS using pyspark function

# stop_times: pd.DataFrame = train_data_day_df[["stop_id", "stop_departure_time", "stop_arrival_time"]] \
#     .rename(columns={"stop_id": "stop_id", "stop_departure_time": "departure_time", "stop_arrival_time": "arrival_time"})


# def convert_to_hh_mm_ss(time):
    
                                                                                                            
# # add column with stop_sequence
# stop_times["stop_sequence"] = stop_times.index + 1
# # add column with trip_id
# stop_times["trip_id"] = "1"
# # put departure time of the last stop equal to the arrival time
# stop_times.loc[stop_times.index[-1], "departure_time"] = stop_times.loc[stop_times.index[-1], "arrival_time"]
# store it

stop_times_gtfs = pd.DataFrame()

stop_times_gtfs["trip_id"] = trains_timetable_pandas["train_id"]
stop_times_gtfs["arrival_time"] = trains_timetable_pandas["stop_time"].apply(lambda x: x + ":00")
stop_times_gtfs["departure_time"] = trains_timetable_pandas["stop_time"].apply(lambda x: x + ":00")
stop_times_gtfs["stop_id"] = trains_timetable_pandas["stop_id"]
stop_times_gtfs["stop_sequence"] = trains_timetable_pandas["stop_number"]

stop_times_gtfs.to_csv("gtfs/stop_times.txt", index=False)

In [70]:
# generate a zip using libzip
import zipfile
zf = zipfile.ZipFile('gtfs.zip', mode='w')
try:
    zf.write("gtfs/agency.txt")
    zf.write("gtfs/calendar.txt")
    zf.write("gtfs/routes.txt")
    zf.write("gtfs/stops.txt")
    zf.write("gtfs/stop_times.txt")
    zf.write("gtfs/trips.txt")
finally:
    zf.close()

##### Run the algorithm
We downloaded and installed pfaedle as well as an OSM dump for Italy. 

In [74]:
os.getcwd()

'/Users/giacomoorsi/Documents/EPFL/MA4/Data Visualization/project-2023-rail-runners/data'

In [None]:
# get current folder name
# import os
# os.path.basename(os.getcwd())


In [72]:
"""pfaedle -x ~/Downloads/italy-latest.osm {}""".format(os.path.join(os.getcwd(), "gtfs"))

[2023-05-21 11:28:59.404] INFO : Reading GTFS feed gtfs ...
[2023-05-21 11:28:59.469] INFO : Matching shapes for mots <bus>
[2023-05-21 11:28:59.765] INFO : Matched 0 trips in 0.131 ms.
[2023-05-21 11:28:59.766] INFO : Matching shapes for mots <ferry>
[2023-05-21 11:29:00.057] INFO : Matched 0 trips in 0.1 ms.
[2023-05-21 11:29:00.058] INFO : Matching shapes for mots <tram>, <subway>
[2023-05-21 11:29:00.363] INFO : Matched 0 trips in 0.143 ms.
[2023-05-21 11:29:00.363] INFO : Matching shapes for mots <coach>
[2023-05-21 11:29:00.658] INFO : Matched 0 trips in 0.106 ms.
[2023-05-21 11:29:00.658] INFO : Matching shapes for mots <funicular>
[2023-05-21 11:29:00.949] INFO : Matched 0 trips in 0.089 ms.
[2023-05-21 11:29:00.949] INFO : Matching shapes for mots <rail>
[2023-05-21 11:29:00.956] INFO : Reading OSM file /Users/giacomoorsi/Downloads/italy-latest.osm ... 
[2023-05-21 11:29:01.048] ERROR: Could not parse OSM data, reason was:
/Users/giacomoorsi/Downloads/italy-latest.osm at posit

Now that we have all the shapes in the "shapes.txt" file, we want to separate all those files into one file for each train.

In [36]:
import pandas as pd

shapes_gtfs = pd.read_csv("gtfs-out/shapes.txt")
trips_gtfs = pd.read_csv("gtfs-out/trips.txt")

shapes_gtfs.head(10)

Unnamed: 0,shape_id,shape_pt_lat,shape_pt_lon,shape_pt_sequence,shape_dist_traveled
0,shp_2_1,45.060719,12.054278,1,0.0
1,shp_2_1,45.060688,12.055992,2,134.766
2,shp_2_1,45.060673,12.056358,3,163.568
3,shp_2_1,45.060551,12.057537,4,257.302
4,shp_2_1,45.060535,12.058019,5,295.222
5,shp_2_1,45.060532,12.058478,6,331.315
6,shp_2_1,45.060558,12.05903,7,374.774
7,shp_2_1,45.060612,12.059502,8,412.392
8,shp_2_1,45.060711,12.060016,9,454.289
9,shp_2_1,45.060802,12.060353,10,482.701


In [37]:
trips_gtfs = trips_gtfs[["trip_id","shape_id"]]
# merge columns on shape_id
shapes_gtfs_merged = shapes_gtfs.merge(trips_gtfs, on="shape_id", how="inner")

In [38]:
# drop shape_id
print("The number of distinct trains is ", shapes_gtfs_merged["trip_id"].nunique(), "while the number of distinct shapes is ", shapes_gtfs_merged["shape_id"].nunique())

The number of distinct trains is  9983 while the number of distinct shapes is  2707


We could save some space by storing the shapes independently from the trains, since there's multiple trains that share the same shape. However, to keep the frontend of the website simple, we decided to store the shape of each train in a separated file. 

In [40]:
pd.unique(shapes_gtfs_merged["trip_id"])

array([1236950581288, 1322849927205, 1357209665555, ...,  893353197609,
        996432412709, 1142461300786])

In [44]:
from tqdm import tqdm

# delete folder if exists
import shutil
if os.path.exists("dataset_generated/trains_shapes"):
    shutil.rmtree("dataset_generated/trains_shapes")

# create folder
os.mkdir("dataset_generated/trains_shapes")

# for each trid_id, save all the corrisponding shapes in a file
for trip_id in tqdm(pd.unique(shapes_gtfs_merged["trip_id"])):
    shapes_gtfs_merged[shapes_gtfs_merged["trip_id"] == trip_id][["shape_pt_lat", "shape_pt_lon"]].to_csv("dataset_generated/trains_shapes/{}.csv".format(trip_id), index=False)


100%|██████████| 9983/9983 [00:46<00:00, 215.31it/s]


In [45]:
# zip the folder with shutil
shutil.make_archive("dataset_generated/trains_shapes", 'zip', "dataset_generated/trains_shapes")

'/Users/giacomoorsi/Documents/EPFL/MA4/DataVisualization/project-2023-rail-runners/data/dataset_generated/trains_shapes.zip'