# 🧼 Data Preprocessing Pipeline

This notebook runs the preprocessing pipeline for the 2022 transport dataset. The goal is to clean the data and prepare it for further analysis, so that it is consistent and of reliable quality.

The preprocessing pipeline includes the following steps:

1. Loading data from the given files
2. Translating column names
3. Processing product IDs
4. Processing transport types
5. Downloading and loading the BAV list (Bundesamt für Verkehr, the Swiss Federal Office of Transport)
6. Processing stop data
7. Extracting stop data
8. Processing arrival and departure times
9. Handling inconsistent rows
10. Extracting operator data
11. Deleting unnecessary columns
12. Adding delay columns
13. Saving preprocessed data
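
As an illustration of step 12, here is a minimal sketch of how a delay column could be derived from a scheduled and an actual timestamp. The helper `add_delay_column` and the column names `scheduled_arrival`, `actual_arrival`, and `arrival_delay` are hypothetical, and the choice of seconds as the unit is an assumption. The actual implementation lives in `preprocessing.preprocessing_pipeline` and may differ.

```python
import pandas as pd


def add_delay_column(df: pd.DataFrame, scheduled: str, actual: str, out: str) -> pd.DataFrame:
    """Return a copy of df with a delay column in seconds (actual - scheduled)."""
    result = df.copy()
    scheduled_ts = pd.to_datetime(result[scheduled], errors="coerce")
    actual_ts = pd.to_datetime(result[actual], errors="coerce")
    # Missing timestamps become NaT, so the delay is NaN for those rows.
    result[out] = (actual_ts - scheduled_ts).dt.total_seconds()
    return result


# Hypothetical example rows; the second row has no recorded actual arrival.
example = pd.DataFrame(
    {
        "scheduled_arrival": ["2022-01-01 14:50:00", "2022-01-01 15:53:00"],
        "actual_arrival": ["2022-01-01 14:52:00", None],
    }
)
with_delay = add_delay_column(example, "scheduled_arrival", "actual_arrival", "arrival_delay")
```

Rows without an actual timestamp keep a missing delay instead of a misleading zero, so later statistics can skip them.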

To limit memory usage, the data is processed and saved one day at a time. A second step runs on Spark; it is not part of this notebook and can be found in the [spark](../../spark) folder.
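
The day-by-day approach can be sketched as follows. This is not the code behind `preprocess_files`; the functions `process_day` and `process_files_one_by_one`, the CSV input format, and the folder names are assumptions made for illustration.

```python
from pathlib import Path
import tempfile

import pandas as pd


def process_day(df: pd.DataFrame) -> pd.DataFrame:
    """Placeholder cleaning step (hypothetical): drop rows that are entirely empty."""
    return df.dropna(how="all")


def process_files_one_by_one(input_dir: Path, output_dir: Path) -> int:
    """Process each daily file separately so only one day is in memory at a time."""
    output_dir.mkdir(parents=True, exist_ok=True)
    n_files = 0
    for csv_path in sorted(input_dir.glob("*.csv")):
        daily = pd.read_csv(csv_path)
        # Write the result immediately; the frame can be freed before the next file.
        process_day(daily).to_csv(output_dir / csv_path.name, index=False)
        n_files += 1
    return n_files


# Small demonstration with two made-up daily files in a temporary folder.
with tempfile.TemporaryDirectory() as tmp:
    raw_dir = Path(tmp) / "raw"
    raw_dir.mkdir()
    for day in ("2022-01-01", "2022-01-02"):
        pd.DataFrame({"stop_id": [8500090], "delay": [120.0]}).to_csv(
            raw_dir / f"{day}.csv", index=False
        )
    n_processed = process_files_one_by_one(raw_dir, Path(tmp) / "out")
```

Peak memory then depends on the largest single day, not on the whole year.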

Furthermore, data about operators and stops is provided in separate files. These files are also loaded and processed in this notebook.

The processed data is saved in the `data/preprocessed` folder, as three separate files:
- `data/preprocessed/operators.csv`
- `data/preprocessed/stops.csv`
- `data/preprocessed/transports.csv`

In [15]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [16]:
import sys
import os

path_to_preprocessing = os.path.join('..', '..', 'src')
sys.path.insert(0, path_to_preprocessing)

from preprocessing.preprocessing_pipeline import preprocess_files
from preprocessing.merge_data import merge_and_save_operator_data, merge_and_save_stop_data, merge_and_save_transport_data

In [95]:
preprocess_files(
    overwrite_existing_file=True,
    print_progress=False,
)

📁 Found 253 valid file(s) in the data folder.


In [130]:
merge_and_save_operator_data()

In [167]:
merge_and_save_stop_data()

In [1]:
import pandas as pd

data = pd.read_parquet("../../data/processed/transports.parquet")
data.head()

| | trip_id | product_id | line_text | transport_type | stop_id | arrival_time | departure_time | mean_arrival_delay | mean_departure_delay | median_arrival_delay | median_departure_delay | std_arrival_delay | std_departure_delay | n_arrival_delay | n_departure_delay | n_cancelled | n_through_trip | n_additional_trip | n_entries |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 80:06____:17171:000 | Train | RB | RB | 8500090 | 14:50:00 | | 293.939394 | | 120.0 | | 388.229414 | | 68 | 0 | 4 | 0 | 0 | 104 |
| 1 | 80:06____:17261:000 | Train | RB | RB | 8500090 | | 15:53:00 | | 61.621622 | | 0.0 | | 129.218022 | 0 | 9 | 1 | 0 | 0 | 104 |
| 2 | 80:800693:3053:000 | Train | IRE3 | IRE | 8503424 | 11:58:00 | 12:00:00 | 151.539474 | 127.605263 | 41.0 | 19.0 | 627.797068 | 622.499501 | 60 | 73 | 2 | 0 | 0 | 78 |
| 3 | 80:80____:2887:000 | Train | ICE | ICE | 8500090 | 22:46:00 | | 1080.0 | | 1080.0 | | | | 1 | 0 | 0 | 0 | 1 | 1 |
| 4 | 80:sbg034:14004 | Bus | Bus7349 | B | 8573327 | 09:07:00 | | 2.4 | | 0.0 | | 29.44332 | | 5 | 0 | 0 | 0 | 0 | 100 |
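
The column names above suggest one row of summary statistics per trip and stop. The sketch below shows how such columns could be produced with a pandas `groupby`; it is not the project's aggregation code, which presumably belongs to the Spark step. The input frame `daily` and its `arrival_delay` column are hypothetical.

```python
import pandas as pd

# Hypothetical per-day observations; one delay for trip "A" is missing.
daily = pd.DataFrame(
    {
        "trip_id": ["A", "A", "A", "B"],
        "stop_id": [1, 1, 1, 2],
        "arrival_delay": [60.0, 120.0, None, 1080.0],
    }
)

stats = (
    daily.groupby(["trip_id", "stop_id"])
    .agg(
        mean_arrival_delay=("arrival_delay", "mean"),
        median_arrival_delay=("arrival_delay", "median"),
        std_arrival_delay=("arrival_delay", "std"),  # sample std (ddof=1)
        n_arrival_delay=("arrival_delay", "count"),  # non-missing delays only
        n_entries=("arrival_delay", "size"),  # all rows, including missing delays
    )
    .reset_index()
)
```

With a single observation the sample standard deviation is undefined, so pandas returns a missing value. That would be consistent with the empty `std_arrival_delay` in the row where `n_arrival_delay` is 1, although the source does not state how the statistic was computed.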


In [15]:
data.shape[0]

65804937

In [24]:
data_sub = data[data["n_entries"] > 10]

In [25]:
data_sub.shape[0]

9580419

In [26]:
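# Count how many rows share the same (trip_id, stop_id) pair.
res = (
    data_sub.groupby(['trip_id', 'stop_id'])
    .agg({'product_id': 'count'})
    .rename(columns={'product_id': 'count'})
    .reset_index()
)

# Show the ten pairs that occur most often.
res.sort_values('count', ascending=False).head(10)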

| | trip_id | stop_id | count |
|---|---|---|---|
| 3964126 | 85:834:55232 | 8504723 | 12 |
| 3996553 | 85:834:65010 | 8588341 | 10 |
| 3999954 | 85:834:65142 | 8588341 | 10 |
| 3964052 | 85:834:55226 | 8504723 | 10 |
| 3999645 | 85:834:65130 | 8588341 | 10 |
| 3997165 | 85:834:65034 | 8588341 | 10 |
| 4000263 | 85:834:65154 | 8588341 | 10 |
| 2197198 | 85:801:205-1657 | 8572502 | 9 |
| 2212774 | 85:801:213-1657 | 8572502 | 9 |
| 2423327 | 85:801:315-1658 | 8572502 | 9 |
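
A count above 1 means the same `(trip_id, stop_id)` pair appears in several rows. One way to inspect those rows is sketched below on a small stand-in frame. The frame `sample` and its `arrival_time` values are made up for illustration, and the sketch does not establish why the pairs repeat in the real data.

```python
import pandas as pd

# Stand-in for data_sub; the arrival times are hypothetical.
sample = pd.DataFrame(
    {
        "trip_id": ["85:834:55232", "85:834:55232", "85:834:65010"],
        "stop_id": [8504723, 8504723, 8588341],
        "arrival_time": ["08:00:00", "09:00:00", "10:00:00"],
    }
)

# keep=False marks every row of a repeated pair, not only the later occurrences.
dup_mask = sample.duplicated(subset=["trip_id", "stop_id"], keep=False)
duplicates = sample[dup_mask].sort_values(["trip_id", "stop_id", "arrival_time"])
```

Sorting by the pair and then by time places the repeated rows next to each other, which makes it easier to see which columns differ between them.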
