# Imports

In [1]:
import ast
import gc
import json
import os.path
import zipfile
from datetime import datetime

import geopandas as gpd
import movingpandas as mpd
import pandas as pd
from shapely.geometry import LineString
from shapely.geometry import Point
from sklearn.model_selection import train_test_split
from srai.regionalizers import geocode_to_region_gdf
from tqdm.autonotebook import tqdm
from tqdm.contrib.concurrent import process_map

  from tqdm.autonotebook import tqdm


In [2]:
# Use pyogrio as geopandas' default engine for vector file I/O.
gpd.options.io_engine = "pyogrio"

# Parameters

In [3]:
# Trajectory length bounds, counted in GPS points (one point every 15 s).
# A trip is kept when lower_limit < points <= upper_limit, i.e. a travel time
# from 300 s (5 min) up to 3585 s (just under 1 hour).
lower_limit, upper_limit = 20, 240

# The cleaned dataset (~1.7 mln trips before length filtering) is too large to
# process in full, so only a random subset of this many trips is used.
subset_size = 500000

# Settings for the tqdm process_map helpers.
num_workers = 20
chunk_size = 1000

# Points moving faster than this (km/h) are treated as outliers.
outlier_max_speed = 120

# Douglas-Peucker tolerance; presumably in CRS units (degrees for EPSG:4326) — TODO confirm.
generalization_tolerance = 1e-4

# Data Loading

In [4]:
# Source archive (Kaggle competition data, downloaded to data/):
# https://www.kaggle.com/competitions/pkdd-15-taxi-trip-time-prediction-ii/data?select=train.csv.zip
zipfile_porto_taxi = zipfile.ZipFile("data/train.csv.zip")

In [5]:
# Read train.csv straight out of the archive. The `with` block closes the
# member file handle as soon as pandas has finished reading it, instead of
# leaving it open until garbage collection.
with zipfile_porto_taxi.open("train.csv") as train_csv:
    df_porto_taxi = pd.read_csv(train_csv)

In [6]:
# Switch the Kaggle column names to snake_case. POLYLINE holds the GPS trace
# and is renamed to "geometry" because it is turned into shapely geometries later.
df_porto_taxi = df_porto_taxi.rename(
    columns={
        "TRIP_ID": "trip_id",
        "CALL_TYPE": "call_type",
        "ORIGIN_CALL": "origin_call",
        "ORIGIN_STAND": "origin_stand",
        "TAXI_ID": "taxi_id",
        "TIMESTAMP": "timestamp",
        "DAY_TYPE": "day_type",
        "MISSING_DATA": "missing_data",
        "POLYLINE": "geometry",
    }
)

In [7]:
# Preview the first rows; "geometry" still holds the raw POLYLINE string here.
df_porto_taxi.head()

Unnamed: 0,trip_id,call_type,origin_call,origin_stand,taxi_id,timestamp,day_type,missing_data,geometry
0,1372636858620000589,C,,,20000589,1372636858,A,False,"[[-8.618643,41.141412],[-8.618499,41.141376],[..."
1,1372637303620000596,B,,7.0,20000596,1372637303,A,False,"[[-8.639847,41.159826],[-8.640351,41.159871],[..."
2,1372636951620000320,C,,,20000320,1372636951,A,False,"[[-8.612964,41.140359],[-8.613378,41.14035],[-..."
3,1372636854620000520,C,,,20000520,1372636854,A,False,"[[-8.574678,41.151951],[-8.574705,41.151942],[..."
4,1372637091620000337,C,,,20000337,1372637091,A,False,"[[-8.645994,41.18049],[-8.645949,41.180517],[-..."


# Remove missing data

In [8]:
# Keep only trips whose missing_data flag is False.
df_porto_taxi = df_porto_taxi.loc[df_porto_taxi["missing_data"].eq(False)]

In [9]:
# After the filter above the flag is always False, so the column carries no information.
df_porto_taxi = df_porto_taxi.drop(columns="missing_data")

# Drop duplicates

In [10]:
# keep=False discards *every* row of a duplicated trip_id (not only the extra
# copies), so ambiguous trips are removed entirely.
df_porto_taxi = df_porto_taxi.drop_duplicates(subset=["trip_id"], keep=False)

# Convert string to list

In [11]:
# POLYLINE is a JSON-encoded list of [longitude, latitude] pairs (see the
# preview above). json.loads yields the same nested lists as ast.literal_eval,
# but it uses a dedicated JSON parser that is much faster on ~1.7 mln strings.
df_porto_taxi["geometry"] = process_map(
    json.loads,
    df_porto_taxi["geometry"],
    chunksize=chunk_size,
    max_workers=num_workers,
)

  0%|          | 0/1710499 [00:00<?, ?it/s]

# Calculate trajectory length

In [12]:
# Number of GPS points per trip. An in-process element-wise len() avoids
# pickling every coordinate list to worker processes just to count it.
df_porto_taxi["length"] = df_porto_taxi["geometry"].map(len)

  0%|          | 0/1710499 [00:00<?, ?it/s]

In [13]:
"""
The total travel time of the trip is defined as the (number of points-1) x 15 seconds
"""

# Keep trips with lower_limit < points <= upper_limit (a travel-time window,
# since travel time = (points - 1) x 15 s).
df_porto_taxi = df_porto_taxi.loc[
    df_porto_taxi["length"].gt(lower_limit) & df_porto_taxi["length"].le(upper_limit)
]

# Create subset
Due to the size of the original dataset, a stratified random subset of `subset_size` trips is used to avoid a `MemoryError`.

In [14]:
# Draw a subset of subset_size trips, stratified on the point count so that
# the subset's length distribution approximately matches the full data.
# A fixed random_state makes the sample (and every downstream result)
# reproducible across runs.
df_leftovers, df_porto_taxi_subset = train_test_split(
    df_porto_taxi,
    test_size=subset_size,
    stratify=df_porto_taxi["length"],
    random_state=42,
)

In [15]:
# "length" was only needed for filtering and stratification.
df_porto_taxi_subset = df_porto_taxi_subset.drop(columns="length")

# Activate garbage collection
Delete variables that are no longer needed and run the garbage collector to free memory.

In [16]:
# Free the full dataset and the unused split. The archive is closed
# explicitly rather than relying on garbage collection to release its handle.
zipfile_porto_taxi.close()
del zipfile_porto_taxi, df_porto_taxi, df_leftovers

gc.collect()

38

# Convert list to LineString

In [17]:
# Turn each [[lon, lat], ...] list into a shapely LineString, spread across
# worker processes.
df_porto_taxi_subset["geometry"] = process_map(
    LineString,
    df_porto_taxi_subset["geometry"],
    max_workers=num_workers,
    chunksize=chunk_size,
)

  0%|          | 0/500000 [00:00<?, ?it/s]

# Convert LineString to Point

In [18]:
# Explode every trip into one record per GPS point. Consecutive points are
# 15 s apart, starting at the trip's own timestamp.
exploded_rows = []

for _trip_index, row in tqdm(
    df_porto_taxi_subset.iterrows(), total=df_porto_taxi_subset.shape[0]
):
    # Convert the row once per trip instead of once per point; only geometry
    # and timestamp differ between the per-point records.
    trip_attributes = row.to_dict()
    for offset, xy in enumerate(row.geometry.coords):
        exploded_rows.append(
            {
                **trip_attributes,
                "geometry": Point(xy),
                "timestamp": row.timestamp + 15 * offset,
            }
        )

  0%|          | 0/500000 [00:00<?, ?it/s]

# Create GeoDataFrame

In [19]:
# One row per GPS point, in WGS 84 longitude/latitude.
gdf_porto_taxi_points = gpd.GeoDataFrame(
    data=exploded_rows,
    crs="EPSG:4326",
    geometry="geometry",
)

In [20]:
# The per-point records now live in the GeoDataFrame; drop the list and
# reclaim its memory.
del exploded_rows

gc.collect()

18

In [21]:
# Unix epoch seconds -> datetime objects, for use as the movingpandas time column.
# NOTE(review): datetime.fromtimestamp without tz= produces naive *local* times
# of the machine running the notebook, and local wall-clock times can repeat
# around DST changes. Moving to UTC must be done together with the epoch
# conversion before saving — TODO confirm the intended time zone.
gdf_porto_taxi_points["timestamp"] = gdf_porto_taxi_points["timestamp"].apply(
    datetime.fromtimestamp
)

# Restricting to the Porto area
Trajectories with at least one point outside Porto District are removed entirely.

In [22]:
# Geocode Porto District into a region GeoDataFrame (srai); points are tested against it below.
porto_area = geocode_to_region_gdf("Porto District, Portugal")

In [23]:
# Inner spatial join: only points intersecting the Porto District region remain.
gdf_porto_taxi_points_inside_porto = gpd.sjoin(
    gdf_porto_taxi_points, porto_area, how="inner", predicate="intersects"
)

In [24]:
# sjoin preserves the left frame's index, and gdf_porto_taxi_points has a
# unique RangeIndex (built from a list of records). A point therefore lies
# outside Porto exactly when its index label is absent from the join result.
# This replaces a left merge on every shared column (including shapely
# geometries), which is far more expensive and selects the same points.
df_porto_taxi_points_outside_porto = gdf_porto_taxi_points[
    ~gdf_porto_taxi_points.index.isin(gdf_porto_taxi_points_inside_porto.index)
]

In [25]:
# Trip ids with at least one point outside Porto; these trips are discarded whole.
trajectories_outside_porto = list(
    pd.unique(df_porto_taxi_points_outside_porto["trip_id"])
)

In [26]:
# Drop every point belonging to a trip that leaves Porto District.
gdf_porto_taxi_points = gdf_porto_taxi_points.loc[
    ~gdf_porto_taxi_points["trip_id"].isin(trajectories_outside_porto)
]

# Trajectory Collection

In [27]:
# Group the points into one movingpandas Trajectory per trip_id, using
# "timestamp" as the time column.
trajectory_collection = mpd.TrajectoryCollection(
    data=gdf_porto_taxi_points, traj_id_col="trip_id", t="timestamp"
)

### Speed calculation

In [28]:
# Attach a speed column in km/h to every trajectory (overwrite=True replaces an
# existing one), using num_workers threads. The returned collection is what
# the cell displays.
trajectory_collection.add_speed(
    units=("km", "h"), n_threads=num_workers, overwrite=True
)

TrajectoryCollection with 498557 trajectories

### Outliers removal

In [29]:
# Remove outlier points whose speed exceeds outlier_max_speed km/h; the
# cleaned collection replaces trajectory_collection.
# NOTE(review): removing points presumably leaves the speed column added above
# stale; it is recomputed after generalization — confirm.
trajectory_collection = mpd.OutlierCleaner(trajectory_collection).clean(
    v_max=outlier_max_speed, units=("km", "h")
)

In [30]:
# A trajectory needs at least two points to form a segment; outlier removal
# may leave fewer, so those trajectories are dropped.
valid_trajectories = list(
    filter(lambda traj: traj.size() >= 2, trajectory_collection.trajectories)
)

In [31]:
# Rebuild a collection from the surviving Trajectory objects.
# NOTE(review): with a list of trajectories as input, traj_id_col/t are
# presumably not used — TODO confirm against the movingpandas docs.
valid_trajectory_collection = mpd.TrajectoryCollection(
    data=valid_trajectories, traj_id_col="trip_id", t="timestamp"
)

### Generalization

In [32]:
# Simplify every trajectory with the Douglas-Peucker algorithm; the result
# replaces trajectory_collection.
# NOTE(review): tolerance is presumably in CRS units, i.e. degrees for
# EPSG:4326 (0.0001 deg is on the order of 10 m) — TODO confirm.
trajectory_collection = mpd.DouglasPeuckerGeneralizer(
    valid_trajectory_collection
).generalize(tolerance=generalization_tolerance)

### Speed re-calculation
Speed is not recalculated automatically after outlier removal and generalization, so it is recomputed here on the remaining points.

In [33]:
# Outlier removal and generalization dropped points, so recompute speeds (km/h)
# on the remaining ones, overwriting the old column.
trajectory_collection.add_speed(
    units=("km", "h"), n_threads=num_workers, overwrite=True
)

TrajectoryCollection with 498556 trajectories

# Conversion to Point GeoDataFrame

In [34]:
# Flatten the collection back into one row per point, ordered by trip and time.
# NOTE(review): "timestamp" is presumably the name of the movingpandas time
# index here (sort_values accepts index level names) — confirm.
gdf_trajectory_point_collection = trajectory_collection.to_point_gdf().sort_values(
    by=["trip_id", "timestamp"]
)

In [35]:
# Move the index (presumably the movingpandas time index) into a regular
# column and renumber the rows.
gdf_trajectory_point_collection = gdf_trajectory_point_collection.reset_index()

# Adapt to Parquet format on HuggingFace

## Separate Point to x, y coordinates

In [36]:
# Store the point coordinates as plain numeric columns (x = longitude, y = latitude).
(
    gdf_trajectory_point_collection["longitude"],
    gdf_trajectory_point_collection["latitude"],
) = (
    gdf_trajectory_point_collection.geometry.x,
    gdf_trajectory_point_collection.geometry.y,
)

In [37]:
# Drop the shapely geometry now that coordinates are plain columns.
# Rebinding to a plain DataFrame (instead of dropping in place) avoids keeping
# a GeoDataFrame whose active geometry column no longer exists. The final
# to_parquet call then goes through pandas and writes an ordinary table,
# rather than GeoParquet metadata that may reference a missing geometry column.
gdf_trajectory_point_collection = pd.DataFrame(
    gdf_trajectory_point_collection.drop(columns="geometry")
)

## Convert datetime to timestamp

In [38]:
# datetime -> Unix epoch seconds (float) for the Parquet export.
# Keep the unbound stdlib datetime.timestamp: it interprets naive values in
# local time, matching the datetime.fromtimestamp conversion done earlier,
# whereas pandas' own Timestamp.timestamp() treats naive values as UTC.
gdf_trajectory_point_collection["timestamp"] = gdf_trajectory_point_collection[
    "timestamp"
].apply(datetime.timestamp)

# Save to parquet

In [39]:
# Write the final per-point table to data/porto_taxi.parquet.
gdf_trajectory_point_collection.to_parquet(os.path.join("data", "porto_taxi.parquet"))