In [1]:
import random

import pandas as pd
from ptrail.core.Datasets import Datasets
from ptrail.features.kinematic_features import KinematicFeatures
from ptrail.preprocessing.statistics import Statistics
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier, ExtraTreesClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.metrics import f1_score, accuracy_score

from src.augmentation.augment import Augmentation
from src.selection.select import Selection
from src.utils.general_utils import Utilities

In [2]:
# String identifiers shared by the cells below.
# BASE names the un-augmented baseline strategy. The remaining names are the keys of
# the dictionary returned by get_iterable_map; the *_SELECTED and BALANCED_* names are
# additionally used as strategy names in the experiment loop.
BASE = 'base'
TRAINING = 'training'
TEST_X = 'test_x'
TEST_Y = 'test_y'
RANDOM_SELECTED = 'random_selected'
PROPORTIONAL_SELECTED = 'proportional_selected'
FEWEST_SELECTED = 'fewest_selected'
REPRESENTATIVE_SELECTED = 'representative_selected'
BALANCED_ON = 'balanced_on'
BALANCED_IN = 'balanced_in'

In [3]:
def augment_trajectories(dataset, ids_to_augment, circle, class_col):
    """
        Run three successive augmentation passes over the selected
        trajectories and return the pivoted kinematic statistics of the
        augmented data, split into features and labels.

        Parameters
        ----------
            dataset:
                The trajectory data to augment.
            ids_to_augment:
                The trajectory ids handed to the augmentation routine.
            circle:
                The circle strategy forwarded unchanged to the augmentation
                routine (the callers in this notebook pass 'on' or 'in').
            class_col:
                Name of the column holding the class label.

        Returns
        -------
            tuple:
                The pivoted statistics without the class column, and the
                class column on its own.
    """
    # Each pass is applied to the output of the previous one.
    augmented = dataset
    for _ in range(3):
        augmented = Augmentation.augment_trajectories_with_randomly_generated_points(
            augmented, ids_to_augment=ids_to_augment, circle=circle)

    # Summarise the augmented points and discard rows containing missing values.
    kinematic_stats = Statistics.generate_kinematic_stats(augmented, class_col)
    stats_table = Statistics.pivot_stats_df(dataframe=kinematic_stats, target_col_name=class_col).dropna()

    features = stats_table.drop(columns=[class_col])
    labels = stats_table[class_col]
    return features, labels


In [4]:
# Load the traffic dataset shipped with PTRAIL (running this cell prints the
# "Dataset Facts" summary shown below).
traffic_data = Datasets.load_traffic_data()
# create_distance_column presumably adds a per-point distance feature -- confirm in the PTRAIL docs.
# reset_index() moves the index levels into regular columns; later cells read 'traj_id' as a column.
ready_dataset = KinematicFeatures.create_distance_column(traffic_data).reset_index()

------------------------ Dataset Facts ------------------------------

Number of unique Trajectories in the data: 125
Number of points in the data: 44905
Dataset time range: 0 days 00:00:59.900000
Datatype of the DataFrame: <class 'ptrail.core.TrajectoryDF.PTRAILDataFrame'>
Dataset Bounding Box: (34.7107417, 135.4640652, 34.7156517, 135.4702002)

---------------------------------------------------------------------


In [5]:
def get_test_train_data(seed_val, class_col, k=0.8):
    """
        Given the seed value and a proportion, split the module-level
        ``ready_dataset`` into a training and a testing set. The split is
        made on trajectory ids, so all points of one trajectory end up on
        the same side.

        Note that this function reseeds Python's global ``random`` module
        with ``seed_val`` as a side effect.

        Parameters
        ----------
            seed_val: int
                The seed value to use to control the randomness
                while selecting the train-test split.
            class_col: str
                Name of the column holding the class label. It is passed
                to the statistics generation and is split off from the
                testing features as the testing labels.
            k: float
                The fraction of trajectory ids to be used as training
                data (the count is truncated to an integer).

        Returns
        -------
            tuple:
                training (point-level rows with NaN rows dropped),
                testing_x (pivoted statistics without the class column)
                and testing_y (the class column of the pivoted statistics).
    """
    # Get all the Trajectory Ids and set the random state.
    traj_ids = list(ready_dataset['traj_id'].unique())
    random.seed(seed_val)

    # Select the ids to be used as training set; every remaining id is a testing id.
    train_size = int(len(traj_ids) * k)
    train_traj_ids = random.sample(traj_ids, train_size)
    # Set lookup keeps the complement computation linear instead of quadratic.
    train_id_lookup = set(train_traj_ids)
    test_traj_ids = [id_ for id_ in traj_ids if id_ not in train_id_lookup]

    # Split the data into training and testing sets.
    training = ready_dataset.loc[ready_dataset.traj_id.isin(train_traj_ids)]
    testing = ready_dataset.loc[ready_dataset.traj_id.isin(test_traj_ids)]

    # NOTE(review): the meaning of the third positional argument (False) is not visible here -- confirm in PTRAIL.
    test_stats = Statistics.generate_kinematic_stats(testing, class_col, False)
    pivoted_test = Statistics.pivot_stats_df(test_stats, class_col).dropna()

    return training.dropna(), pivoted_test.drop(columns=[class_col]), pivoted_test[class_col]

In [6]:
def get_iterable_map(seed_val: int, class_col):
    """
        Build everything a single experiment run needs for one seed and
        return it as a dictionary with the following entries:
            | 1. Training dataset
            | 2. Testing X data.
            | 3. Testing Y data.
            | 4. Randomly selected trajectories to augment.
            | 5. Proportionally selected trajectories to augment.
            | 6. Fewest selected trajectories to augment.
            | 7. Representative selected trajectories to augment.
            | 8. Balanced using ON strategy Dataset.
            | 9. Balanced using IN strategy Dataset.

        Parameters
        ----------
            seed_val: int
                The seed value to use to control the randomness
                while selecting the train-test split. It is also
                forwarded to the seeded selection strategies.
            class_col:
                Name of the column holding the class label.

        Returns
        -------
            dict:
                Dictionary with the aforementioned values.
    """
    train_set, x_test, y_test = get_test_train_data(seed_val, class_col)

    # Fraction handed to the random, proportional and fewest-points selections.
    selection_fraction = 0.2

    # The entries are evaluated top to bottom: first the four selection
    # strategies, then the two balanced datasets.
    return {
        TRAINING: train_set,
        TEST_X: x_test,
        TEST_Y: y_test,
        RANDOM_SELECTED: Selection.select_randomly(train_set, seed=seed_val, k=selection_fraction),
        PROPORTIONAL_SELECTED: Selection.select_trajectories_proportionally(
            train_set, classification_col=class_col, seed=seed_val, k=selection_fraction),
        FEWEST_SELECTED: Selection.select_with_fewest_points(train_set, k=selection_fraction),
        REPRESENTATIVE_SELECTED: Selection.select_representative_trajectories(
            train_set, class_col, closeness_cutoff=0.7, tolerance=10),
        BALANCED_ON: Augmentation.balance_dataset_with_augmentation(train_set, class_col, 'on'),
        BALANCED_IN: Augmentation.balance_dataset_with_augmentation(train_set, class_col, 'in'),
    }


In [7]:
def select_correct_test_train_split(iter_map, strat, circle_strat, class_col):
    """
        Pick the training features and labels that belong to a given
        selection strategy and circle strategy.

        Parameters
        ----------
            iter_map: dict
                The dictionary produced by get_iterable_map.
            strat: str
                The selection strategy. BASE uses the plain training set,
                any strategy whose name contains "balanced" uses one of the
                pre-balanced datasets, and every other strategy augments the
                trajectories stored under ``iter_map[strat]``.
            circle_strat: str
                The circle strategy, 'on' or 'in'.
            class_col: str
                Name of the column holding the class label.

        Returns
        -------
            tuple:
                (x_train, y_train). Both are None when a balanced strategy
                is combined with a circle strategy other than 'on' or 'in'.
    """
    def _pivot_and_split(dataset):
        # Pivot the kinematic statistics, drop incomplete rows, separate the label column.
        pivoted = Statistics.pivot_stats_df(
            Statistics.generate_kinematic_stats(dataset, class_col), class_col).dropna()
        return pivoted.drop(columns=[class_col]), pivoted[class_col]

    # Decide on the `strat` argument only; the function must not depend on a
    # global variable that merely happens to carry the same value in the caller.
    if strat == BASE:
        return _pivot_and_split(iter_map[TRAINING])

    if "balanced" not in strat:
        return augment_trajectories(dataset=iter_map[TRAINING], ids_to_augment=iter_map[strat],
                                    circle=circle_strat, class_col=class_col)

    # NOTE(review): for balanced strategies the dataset is chosen by circle_strat, not by
    # whether strat is BALANCED_ON or BALANCED_IN, so both names yield the same data for a
    # given circle strategy. Kept as in the original -- confirm this is intended.
    if circle_strat == "on":
        return _pivot_and_split(iter_map[BALANCED_ON])
    if circle_strat == "in":
        return _pivot_and_split(iter_map[BALANCED_IN])

    return None, None

In [8]:
# Draw the 20 seed values that drive every experiment run.
seed_generator = Utilities.generate_pi_seed(20)
seed_vals = [next(seed_generator) for _ in range(20)]

# Result rows are collected as ", "-joined strings; the first entry is the header row.
final_results = ["seed, strategy, model, accuracy, f1_score"]

# Every selection strategy to evaluate, and the classifiers to evaluate each one with.
select_strategies = [
    BASE,
    RANDOM_SELECTED,
    PROPORTIONAL_SELECTED,
    FEWEST_SELECTED,
    REPRESENTATIVE_SELECTED,
    BALANCED_ON,
    BALANCED_IN,
]
models = [GradientBoostingClassifier(), DecisionTreeClassifier(), SVC()]

for seed in seed_vals:
    # Everything that depends only on the seed is prepared once per seed.
    iter_map = get_iterable_map(seed, 'vehicle_type')
    for strategy in select_strategies:
        for model in models:
            for circle_strategy in ['on', 'in']:
                train_x, train_y = select_correct_test_train_split(iter_map, strategy, circle_strategy, 'vehicle_type')
                # Combinations without training data are skipped.
                if train_x is None or train_y is None:
                    continue

                # The model instances are reused across runs, so pin the seed before every fit.
                model.random_state = seed
                model.fit(X=train_x, y=train_y)
                pred_vals = model.predict(X=iter_map[TEST_X])

                # Score the predictions on the held-out test set.
                acc = accuracy_score(y_true=iter_map[TEST_Y], y_pred=pred_vals)
                score = f1_score(y_true=iter_map[TEST_Y], y_pred=pred_vals, average='weighted')

                # Echo the row for progress and keep it for the export.
                result_row = f"{seed}, {strategy}_{circle_strategy}, {model.__class__.__name__}, {acc}, {score}"
                print(result_row)
                final_results.append(result_row)


1415, base_on, GradientBoostingClassifier, 0.875, 0.8944444444444444
1415, base_in, GradientBoostingClassifier, 0.875, 0.8944444444444444
1415, base_on, DecisionTreeClassifier, 0.875, 0.8944444444444444
1415, base_in, DecisionTreeClassifier, 0.875, 0.8944444444444444
1415, base_on, SVC, 0.9583333333333334, 0.9379432624113475
1415, base_in, SVC, 0.9583333333333334, 0.9379432624113475
1415, random_selected_on, GradientBoostingClassifier, 0.8333333333333334, 0.8712121212121211
1415, random_selected_in, GradientBoostingClassifier, 0.7916666666666666, 0.8468992248062017
1415, random_selected_on, DecisionTreeClassifier, 0.7083333333333334, 0.7955840455840456
1415, random_selected_in, DecisionTreeClassifier, 0.7916666666666666, 0.8468992248062017
1415, random_selected_on, SVC, 0.9583333333333334, 0.9379432624113475
1415, random_selected_in, SVC, 0.9583333333333334, 0.9379432624113475
1415, proportional_selected_on, GradientBoostingClassifier, 0.7916666666666666, 0.8468992248062017
1415, propo

In [9]:
import csv

# Destination of the CSV export.
file_path = "traffic_results.csv"

# Every entry of final_results is one ", "-separated record: split it back
# into its fields and write one CSV row per entry.
with open(file_path, mode="w", newline="") as file:
    writer = csv.writer(file)
    writer.writerows(record.split(", ") for record in final_results)