# Data wrangling 
We explore the data to get a better understanding of it. The data is in `json` format. We want to convert the data to a table in which every row is a stop of a train (e.g. the train RE 123 from Milano to Rome stopped at the station Firenze with a delay of 5 minutes). This should make queries easier and more efficient. The size of the data will grow, but we will store it in a columnar format to enable compression. We rename all the columns to have meaningful yet simple names.

In [1]:
import os
import json

### Data
First, you need to download the data. In order to do so, go to [this](https://mega.nz/#F!vIAyDaTJ!PcLTFDbKaJaa0FZIEh5E-w) folder and download all (or only some) of the JSON files. Then, put them in the `DATA_FOLDER`. 

In [14]:
# Folder that contains the downloaded daily JSON dumps.
DATA_FOLDER = "..." # replace with the folder where you downloaded the data
# Daily dump used as the worked example; section 1 loads a file with this name.
file = "dati_02_09_2022.json"

In [23]:
# Initial Spark
import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.functions import explode
import pyspark.sql.types as T
from pyspark.sql.functions import lit, col, to_date



In [15]:

# Driver memory has to be requested while the session is being built: once
# getOrCreate() has started the driver JVM, changing spark.driver.memory on the
# live context's conf has no effect.
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("Trenitalia") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# keep a handle on the underlying SparkContext
sc = spark.sparkContext

The schema is as follows:
```
{'type': 'struct',
 'fields': [{'name': 'avvisiRFI',
   'type': {'type': 'array',
    'elementType': {'type': 'struct',
     'fields': [{'name': 'corpo',
       'type': 'string',
       'nullable': True,
       'metadata': {}},
      {'name': 'data', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'link', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'titolo', 'type': 'string', 'nullable': True, 'metadata': {}}]},
    'containsNull': True},
   'nullable': True,
   'metadata': {}},
  {'name': 'avvisiTI',
   'type': {'type': 'array',
    'elementType': {'type': 'struct',
     'fields': [{'name': 'corpo',
       'type': 'string',
       'nullable': True,
       'metadata': {}},
      {'name': 'data', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'titolo', 'type': 'string', 'nullable': True, 'metadata': {}}]},
    'containsNull': True},
   'nullable': True,
   'metadata': {}},
  {'name': 'giorno', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'riassunto',
   'type': {'type': 'struct',
    'fields': [{'name': 'anticipoAccumulato',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'arrivoAnticipo',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'arrivoInOrario',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'arrivoInRitardo',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'circolazione',
      'type': 'string',
      'nullable': True,
      'metadata': {}},
     {'name': 'dataAggiornamento',
      'type': 'string',
      'nullable': True,
      'metadata': {}},
     {'name': 'numEC', 'type': 'long', 'nullable': True, 'metadata': {}},
     {'name': 'numEN', 'type': 'long', 'nullable': True, 'metadata': {}},
     {'name': 'numES', 'type': 'long', 'nullable': True, 'metadata': {}},
     {'name': 'numIC', 'type': 'long', 'nullable': True, 'metadata': {}},
     {'name': 'numNC', 'type': 'long', 'nullable': True, 'metadata': {}},
     {'name': 'numREG', 'type': 'long', 'nullable': True, 'metadata': {}},
     {'name': 'partenzaInOrario',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'partenzaRitardo',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'ritardoAccumulato',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'ritardoArrivoTrenoPeggiore',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'ritardoPartenzaTrenoPeggiore',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'treniCancellati',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'treniCircolati',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'treniMonitorati',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'treniRegolari',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'treniRiprogrammati',
      'type': 'long',
      'nullable': True,
      'metadata': {}},
     {'name': 'trenoPeggiore',
      'type': 'string',
      'nullable': True,
      'metadata': {}}]},
   'nullable': True,
   'metadata': {}},
  {'name': 'timeZone', 'type': 'long', 'nullable': True, 'metadata': {}},
  {'name': 'treni',
   'type': {'type': 'array',
    'elementType': {'type': 'struct',
     'fields': [{'name': 'a',
       'type': 'string',
       'nullable': True,
       'metadata': {}},
      {'name': 'c', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'cn', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'dl', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'fr',
       'type': {'type': 'array',
        'elementType': {'type': 'struct',
         'fields': [{'name': 'n',
           'type': 'string',
           'nullable': True,
           'metadata': {}},
          {'name': 'oa', 'type': 'long', 'nullable': True, 'metadata': {}},
          {'name': 'op', 'type': 'long', 'nullable': True, 'metadata': {}},
          {'name': 'ra', 'type': 'string', 'nullable': True, 'metadata': {}},
          {'name': 'rp', 'type': 'string', 'nullable': True, 'metadata': {}}]},
        'containsNull': True},
       'nullable': True,
       'metadata': {}},
      {'name': 'n', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'oa', 'type': 'long', 'nullable': True, 'metadata': {}},
      {'name': 'oaz', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'od', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'oo', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'op', 'type': 'long', 'nullable': True, 'metadata': {}},
      {'name': 'ope', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'opz', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'p', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'pr', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'ra', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'rp', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'sep', 'type': 'string', 'nullable': True, 'metadata': {}},
      {'name': 'sub', 'type': 'string', 'nullable': True, 'metadata': {}}]},
    'containsNull': True},
   'nullable': True,
   'metadata': {}}]}


```

# 1. Extract the `treni` data
A JSON dump contains several kinds of information. For now we consider only the information about the trains (`treni`), excluding the summary (`riassunto`) and the notices (`avvisiRFI`, `avvisiTI`).

In [None]:
# Load the example daily dump named by `file`. multiLine=True matches the reader
# used for the full dataset in section 2 and also copes with pretty-printed JSON.
df = spark.read.json(os.path.join(DATA_FOLDER, file), multiLine=True)

# `giorno` holds the date of the dump as a dd/MM/yyyy string
date = df.select("giorno").first()["giorno"]
print(date)

day, month, year = date.split("/")

In [None]:
# keep only the `treni` column, i.e. the array with one struct per train
trains = df.select(col("treni"))

In [None]:
# `treni` is an array of structs: explode it so that every train gets its own row
trains1 = trains.select(explode(col("treni")).alias("treni"))

# flatten the struct so that each of its fields becomes a top-level column
trains2 = trains1.select(col("treni.*"))

In [None]:
# preview the first five trains
trains2.show(n=5)

In [None]:
# print the column names (`columns` is already a Python list)
print(trains2.columns)

In [None]:
# Maps the terse train-level field names of the JSON dump to descriptive column
# names. Fields whose meaning is not known only receive the `train_` prefix.
columns_mapper = {
    "a": "train_arrival_stop_name",
    "c": "train_class",
    "cn": "train_cn",  # meaning unknown
    "dl": "train_dl",  # meaning unknown
    "fr": "stops",
    "n": "train_number",
    "oa": "train_arrival_time",
    "oaz": "train_oaz",  # meaning unknown
    "od": "train_od",  # meaning unknown
    "oo": "train_oo",  # meaning unknown
    "op": "train_departure_time",
    "ope": "train_ope",  # meaning unknown
    "opz": "train_opz",  # meaning unknown
    "p": "train_departure_stop_name",
    "pr": "train_pr",  # meaning unknown
    "ra": "train_arrival_delay",
    "rp": "train_departure_delay",
    "sep": "train_sep",  # meaning unknown
    "sub": "train_sub",  # meaning unknown
}

In [None]:
# rename every train-level column in a single pass; columns that are not in the
# mapper keep their current name
trains3 = trains2.toDF(*[columns_mapper.get(name, name) for name in trains2.columns])

trains3.show(n=5)

In [None]:
# Maps the field names of a single stop (one element of the `stops` array) to
# descriptive column names.
stops_columns_mapper = {
    "n": "stop_name",
    "oa": "stop_arrival_time",
    "op": "stop_departure_time",
    "ra": "stop_arrival_delay",
    "rp": "stop_departure_delay",
}

In [None]:
# `stops` is an array of structs ([{...}, {...}, ...]), one struct per stop.
# Step 1: explode the array so that every stop becomes its own row, then drop
#         the original array column.
stops_as_rows = trains3.withColumn("stops_extracted", explode("stops")).drop("stops")

# Step 2: lift the fields of the stop struct to top-level columns and drop the
#         struct itself. The train columns were renamed beforehand, so the stop
#         fields (n, oa, op, ra, rp) do not clash with them.
trains4 = stops_as_rows.select("*", "stops_extracted.*").drop("stops_extracted")

# Step 3: give the stop columns descriptive names; other columns are untouched.
trains4 = trains4.toDF(*[stops_columns_mapper.get(name, name) for name in trains4.columns])

trains4.show(n=5)

In [None]:


# attach the date of the dump to every row: its three components as strings,
# plus the raw dd/MM/yyyy string
trains5 = (
    trains4
    .withColumn("day", lit(day))
    .withColumn("month", lit(month))
    .withColumn("year", lit(year))
    .withColumn("date", lit(date))
)

# parse the `date` string column into a proper date column
from pyspark.sql.functions import to_date

trains6 = trains5.withColumn("date", to_date(col("date"), "dd/MM/yyyy"))
trains6.select("day", "month", "year", "date").show(1)

In [None]:
# all done! now this should be done for all the files in the folder and then we can save the data in parquet format
# we can then use the parquet files to do the analysis

In [None]:
# Build a file-system friendly name from the full date (dd/MM/yyyy -> dd-MM-yyyy).
# `day` alone holds only the day of the month: it contains no "/" to replace and
# would produce the same file name every month.
new_day = date.replace("/", "-")

In [None]:
# persist the per-stop table of this day as a Parquet dataset
trains6.write.format("parquet").save(os.path.join(DATA_FOLDER, "parquet", new_day + ".parquet"))

In [None]:
# load the Parquet dataset that was just written
trains7 = spark.read.format("parquet").load(os.path.join(DATA_FOLDER, "parquet", new_day + ".parquet"))

# 2. Create a big dataframe containing all the daily datasets

By using the code of point 1, we create a big dataframe combining all the daily datasets. 

Approach: read all the datasets, extract the `treni` data, add a column with the date for each one of them, and then apply the other transformations. 

In [16]:
from tqdm import tqdm


In [24]:
def read_preprocess_daily_dataset(file) -> pyspark.sql.DataFrame:
    """Load one daily JSON dump and return one row per train.

    The `treni` array is exploded and its struct flattened into top-level
    columns. The date of the dump (field `giorno`) is attached to every row.

    Args:
        file: name of the JSON file, relative to `DATA_FOLDER`.

    Returns:
        A dataframe with the train fields plus `day`, `month`, `year`
        (strings) and `date` (parsed from `giorno` with format dd/MM/yyyy).
    """
    raw = spark.read.json(os.path.join(DATA_FOLDER, file), multiLine=True)

    # the dump carries its own date as a "/"-separated string
    giorno = raw.select("giorno").first()["giorno"]
    day, month, year = giorno.split("/")

    # one row per train, struct fields lifted to columns
    per_train = raw.select(explode("treni").alias("treni")).select("treni.*")

    return (
        per_train
        .withColumn("day", lit(day))
        .withColumn("month", lit(month))
        .withColumn("year", lit(year))
        .withColumn("date", to_date(lit(giorno), "dd/MM/yyyy"))
    )

In [25]:
def read_all_daily_datasets() -> pyspark.sql.DataFrame:
    """Load every daily JSON dump in `DATA_FOLDER` and union them by column name.

    Files are processed in lexicographic order of their names. Columns that
    are missing from some of the daily dataframes are allowed in the union.

    Returns:
        The union of the per-day dataframes produced by
        `read_preprocess_daily_dataset`.

    Raises:
        IndexError: if `DATA_FOLDER` contains no `.json` file.
    """
    json_files = sorted(name for name in os.listdir(DATA_FOLDER) if name.endswith(".json"))

    # the first file seeds the result, the remaining ones are appended one by one
    combined = read_preprocess_daily_dataset(json_files[0])
    for name in tqdm(json_files[1:]):
        daily = read_preprocess_daily_dataset(name)
        combined = combined.unionByName(daily, allowMissingColumns=True)
    return combined

In [26]:
def get_date_from_file_name(file_name: str) -> str:
    """Return the date encoded in a daily dump file name as ``yyyy-mm-dd``.

    The name is expected to look like ``<prefix>_<dd>_<mm>_<yyyy>.<ext>``;
    for example ``dati_02_09_2022.json`` gives ``2022-09-02``.

    Raises:
        ValueError: if the part between the first underscore and the next dot
            does not consist of exactly three "_"-separated fields.
    """
    # drop the prefix up to the first underscore, then everything from the first dot on
    _, _, remainder = file_name.partition("_")
    stem, _, _ = remainder.partition(".")

    # the name stores day first; ISO order makes the result sortable as text
    day, month, year = stem.split("_")
    return f"{year}-{month}-{day}"

In [27]:
from typing import List

def get_available_files() -> List[str]:
    """Return the names of the `.json` files in `DATA_FOLDER`, oldest first.

    The order is chronological: files are sorted by the date encoded in their
    name, as extracted by `get_date_from_file_name`.
    """
    json_files = [name for name in os.listdir(DATA_FOLDER) if name.endswith(".json")]
    return sorted(json_files, key=get_date_from_file_name)

In [28]:
# list and sort the folder once instead of once per printed line
available_files = get_available_files()
print(f"There are {len(available_files)} files in the data folder")

# an empty folder has no first/last file to report
if available_files:
    print(f"The first file is {available_files[0]}")
    print(f"The last file is {available_files[-1]}")

There are 90 files in the data folder
The first file is dati_01_01_2023.json
The last file is dati_31_03_2023.json


In [29]:
# build the combined per-train dataframe for all daily dumps
# (this rebinds `df`, which held the single-day dump in section 1)
df = read_all_daily_datasets()

100%|██████████| 89/89 [01:16<00:00,  1.17it/s]


In [30]:
def preprocess_dataset(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    """Rename the train columns and expand every train into one row per stop.

    Args:
        df: dataframe with one row per train, using the original short field
            names of the JSON dump (`a`, `c`, ..., `fr`, ...).

    Returns:
        A dataframe with one row per stop: the renamed train columns, followed
        by the fields of the stop under descriptive `stop_*` names. Columns
        that are not listed in the mappers below keep their name.
    """
    train_names = {
        "a": "train_arrival_stop_name",
        "c": "train_class",
        "cn": "train_cn",  # meaning unknown
        "dl": "train_dl",  # meaning unknown
        "fr": "stops",
        "n": "train_number",
        "oa": "train_arrival_time",
        "oaz": "train_oaz",  # meaning unknown
        "od": "train_od",  # meaning unknown
        "oo": "train_oo",  # meaning unknown
        "op": "train_departure_time",
        "ope": "train_ope",  # meaning unknown
        "opz": "train_opz",  # meaning unknown
        "p": "train_departure_stop_name",
        "pr": "train_pr",  # meaning unknown
        "ra": "train_arrival_delay",
        "rp": "train_departure_delay",
        "sep": "train_sep",  # meaning unknown
        "sub": "train_sub",  # meaning unknown
    }
    stop_names = {
        "n": "stop_name",
        "oa": "stop_arrival_time",
        "op": "stop_departure_time",
        "ra": "stop_arrival_delay",
        "rp": "stop_departure_delay",
    }

    # Rename the train columns first: the stop struct reuses the names n, oa,
    # op, ra and rp, which would otherwise clash once the struct is flattened.
    per_train = df.toDF(*[train_names.get(name, name) for name in df.columns])

    # one row per stop: explode the array, then lift the struct fields to columns
    exploded = per_train.select("*", explode("stops").alias("stops_extracted")).drop("stops")
    per_stop = exploded.select("*", "stops_extracted.*").drop("stops_extracted")

    return per_stop.toDF(*[stop_names.get(name, name) for name in per_stop.columns])

In [31]:
# rename the columns and expand the combined dataframe to one row per stop
df2 = preprocess_dataset(df)

### Store the data
Here we store all data in a joined Parquet dataset, which can be used for downstream analysis. 

In [32]:
# store the combined per-stop table in columnar (Parquet) format
df2.write.format("parquet").save(os.path.join(DATA_FOLDER, "parquet", "all.parquet"))

23/04/22 20:24:22 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/04/22 20:24:44 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB


                                                                                