In [14]:
from hyperopt import hp, fmin, tpe, Trials
from functools import partial
from sklearn import metrics
import pandas as pd
import numpy as np
import math
import warnings
warnings.filterwarnings("ignore")

In [15]:
dataset_path = './groundTruthGenerator/groundTruth'

In [16]:
move_stop_train = pd.read_csv(f'{dataset_path}/stop_train.csv')
move_stop_train['index'] = move_stop_train.index

In [17]:

from haversine import Haversine
from geopy import distance
from random import uniform
from math import radians
class MSN:
    def __init__(self, es=0, et=0, ev=0, o=0, p=0.5):
        self.St_max = 0
        self.Tt_max = 0
        self.Vt_max = 0
        self.At_max = 0
        self.St = pd.Series() # distances time series
        self.Tt = pd.Series() # duration time series
        self.Vt = pd.Series() # speed time series
        self.At = pd.Series() # turning angles time series
        self.es = es # distance threshold
        self.et = et # duration threshold
        self.ev = ev # speed threshold
        self.o = o # minimum turning
        self.p = p # e lower and upper boundaries of an interval from where some value will be randomly selected and added to the duration value of each point

    def calculate_turn_angle(self, x1, y1, x2, y2, x3, y3):
        # Calculate vectors from (x1, y1) to (x2, y2) and from (x2, y2) to (x3, y3)
        vector1 = (x2 - x1, y2 - y1)
        vector2 = (x3 - x2, y3 - y2)
        # Calculate the magnitudes of the vectors
        magnitude1 = math.sqrt(vector1[0] ** 2 + vector1[1] ** 2) + 0.0000001
        magnitude2 = math.sqrt(vector2[0] ** 2 + vector2[1] ** 2) + 0.0000001
        # Check if either vector has zero magnitude
        if magnitude1 == 0 or magnitude2 == 0:
            raise ValueError("Vectors have zero magnitude, cannot calculate angle.")
        # Calculate the dot product of the two vectors
        dot_product = vector1[0] * vector2[0] + vector1[1] * vector2[1]
        # Ensure that dot_product is within the valid range [-1, 1]
        dot_product = max(-1, min(1, dot_product))
        # Calculate the cosine of the angle between the vectors using the dot product
        cosine_theta = dot_product
        # Calculate the angle in radians using the arccosine function
        theta_radians = math.acos(cosine_theta)
        # Convert the angle to degrees
        theta_degrees = math.degrees(theta_radians)
        return theta_degrees
    
    def distances_durations_speeds_angles(self, points:[[float, float, float]]):
        distances = pd.Series()
        durations = pd.Series()
        speeds = pd.Series()
        turn_angles = pd.Series()
        for i in range(len(points) - 1):
            # compute distance
            p1 = points[i]
            p2 = points[i+1]
            distances[i] = distance(p1[:2], p2[:2]).meters
            # compute duration
            durations[i] = p2[2] - p1[2]
            # compute speed
            time_difference_hours = durations[i] / 3600.0
            speeds[i] = distances[i] / time_difference_hours
            if i < len(points) - 2:
                #compute turn angle
                p3 = points[i+2]
                turn_angles[i] = self.calculate_turn_angle(p1[1], p1[0], p2[1], p2[0], p3[1], p3[0])
            
        return distances, durations, speeds, turn_angles


    def huber_mad(self, data:np.array, k=1.4826) -> float:
        return np.median(abs(data - data.median())) * k


    def modifided_z_score(self, xi, x_median, MAD):
        return (0.6745 * (xi - x_median))/MAD


    def _msn(self, St, Tt, Vt, At, es, et, ev, o, p, quiet=True):
        t = [i for i in range(len(St)+1)]
        distance_outliers = []
        Ms = self.modifided_z_score(St,  np.median(St), self.huber_mad(St))
        for i, ms in enumerate(Ms):
            # Identifying long distances
            if ms > es:
                distance_outliers.append(i)
        direction_outliers = []
        i = 0
        while i < len(At)-1:
            at1 = At[i]
            at2 = At[i+1]
            # Identifying sharp turning angles
            if at1 < o and at2 < o:
                direction_outliers.append(i)
                direction_outliers.append(i+1)
                i += 2
                continue
            i += 1
        noise_indexes = list(set(distance_outliers) | set(direction_outliers))
        clean_indexes = list(set(t) - set(noise_indexes))
        # Removing noisy_points
        Tt = Tt.drop(index=noise_indexes)
        Vt = Vt.drop(index=noise_indexes)
        # Adding a random uniform jitter
        Tt = Tt + p
        duration_outliers = []
        Mt = self.modifided_z_score(Tt, np.median(Tt), self.huber_mad(Tt))
        for i, mt in enumerate(Mt):
            # Identifying long durations
            if mt > et:
                duration_outliers.append(i)
        # Natural log of speed
        Vt = np.log(Vt)
        speed_outliers = []
        Mv = self.modifided_z_score(Vt, np.median(Vt), self.huber_mad(Vt))
        for i, mv in enumerate(Mv):
            # Identifying slow speeds
            if mv < -ev:
                speed_outliers.append(i)
        stop_indexes = list(set(duration_outliers).union(set(speed_outliers)))
        move_indexes = list(set(clean_indexes) - set(stop_indexes))
        return move_indexes, stop_indexes, noise_indexes

    def predict(self, X, lng='y', lat='x', t='timestep'):
        self.St, self.Tt, self.Vt, self.At = self.distances_durations_speeds_angles(X[[lng, lat, t]].values)
        p = uniform(-self.p, self.p)
        move_indexes, stop_indexes, noise_indexes = self._msn(self.St, self.Tt, self.Vt, self.At,
                                                              self.es, self.et, self.ev, self.o, p)
        y_pred = []
        for i in range(len(X)):
            if i in move_indexes:
                y_pred.append(False)
            elif i in stop_indexes:
                y_pred.append(True)
            elif i in noise_indexes:
                y_pred.append(False)
            else:
                y_pred.append('unkown')
        return y_pred
    
    def fit(self, X, lng='y', lat='x', t='timestep', stop='stop'):
        St, Tt, Vt, At = self.distances_durations_speeds_angles(X[[lng, lat, t]].values)
        def z_score(t):
            return self.modifided_z_score(t, np.median(t), self.huber_mad(t))
        X['St'] = z_score(St)
        X['Tt'] = z_score(Tt)
        X['Vt'] = z_score(Vt)
        X['At'] = At
        Stops = X[X[stop] == True].dropna()
        Moves = X[X[stop] == False].dropna()
        NaN = float('nan')
        crr_es = Moves['St'].dropna().min()
        self.es = min(self.es, crr_es)
        crr_et = Stops['Tt'].dropna().min()
        self.et = min(self.et, crr_et)
        crr_ev = Stops['Vt'].dropna().min()
        self.ev = min(self.ev, crr_ev)
        crr_o = Moves['At'].dropna().max()
        self.o = max(self.o, crr_o)

    def get_params(self):
        params = {
            'es':self.es,
            'et':self.et,
            'ev':self.ev,
            'o':self.o,
            'p':self.p
        }
        return params

### Train

In [18]:
move_stop_train = pd.read_csv(f'{dataset_path}/stop_train.csv')
move_stop_train['index'] = move_stop_train.index

In [19]:
from tqdm import tqdm
model_trained = MSN(p=0)
data = move_stop_train
veh_id_unique = data['id'].unique()
for veh_id in tqdm(veh_id_unique):
    trajectory = data[data['id'] == veh_id]
    trajectory = trajectory.sort_values(by='timestep')
    model_trained.fit(trajectory)
    if type(model_trained.es) == float('nan'):
        break
    if type(model_trained.et) == float('nan'):
        break
    if type(model_trained.ev) == float('nan'):
        break
    if type(model_trained.o) == float('nan'):
        break
print(model_trained.get_params())

100%|██████████| 1191/1191 [08:25<00:00,  2.36it/s]

{'es': 0, 'et': 0, 'ev': 0, 'o': 0, 'p': 0}





### Test / Validation

In [20]:
move_stop_test = pd.read_csv(f'{dataset_path}/stop_test.csv')
move_stop_test['index'] = move_stop_test.index
veh_id_unique = move_stop_test['id'].unique()

In [21]:
ac_list = []
pr_list = []
re_list = []
f1_list = []
model = model_trained
model.p = 0.5
data = move_stop_test
for veh_id in veh_id_unique:
    trajectory = data[data['id'] == veh_id]
    trajectory = trajectory.sort_values(by='timestep')
    y_true = trajectory['stop']
    y_pred = model.predict(trajectory.drop(columns=['stop']))

    ac_list.append(metrics.accuracy_score(y_true, y_pred))
    pr_list.append(metrics.precision_score(y_true, y_pred))
    re_list.append(metrics.recall_score(y_true, y_pred))
    f1_list.append(metrics.f1_score(y_true, y_pred))

ac_mean = np.mean(ac_list)
pr_mean = np.mean(pr_list)
re_mean = np.mean(re_list)
f1_mean = np.mean(f1_list)

print('Accuracy mean:', ac_mean)
print('Precision mean:', pr_mean)
print('Recall mean:', re_mean)
print('F1 mean:', f1_mean)

Accuracy mean: 0.743042938181204
Precision mean: 0.05189316015618539
Recall mean: 0.38655102162814126
F1 mean: 0.07944864778024081
