In [1]:
from TrajGen import (
    TrajectoryGenerator,
    DataPoint,
    TrajectoryGeneratorCollection,
    WayPoint,
    color_hex,
    rand_24_bit,
    plot_trajectory,
    plot_datapoint,
    setup_map,
)
import gpxpy
import folium
import random
from typing import Tuple, List
from folium import plugins
import matplotlib.pyplot as plt
import time
import pandas as pd
from datetime import datetime
from datetime import timedelta
import numpy as np
import uuid
import utm 
import gpxpy
import json


# Point handed to setup_map() by the plotting cells
# (presumably [latitude, longitude] -- confirm against TrajGen.setup_map).
center = [55.39594, 10.38831]

# Default size and resolution for every matplotlib figure in this notebook.
plt.rcParams.update({
    'figure.figsize': [12, 8],
    'figure.dpi': 100,
})

In [2]:
# Conversion factor: multiply a speed in km/h by this to get m/s.
kmh_to_ms = 1000 / (60 * 60)

# Trajectory characteristics shared by every generator in this notebook.
mean_speed = 5.5 * kmh_to_ms  # Average human walking speed in m/s
std_speed = 0.5 * kmh_to_ms   # Deviation in walking speed in m/s
mean_time_delta = 10.0  # Forwarded to TrajectoryGenerator(mean_time_delta=...)
std_time_delta = 20.0   # Forwarded to TrajectoryGenerator(std_time_delta=...)

def ncrands(n: int, min_: int, max_: int):
    """Draw ``n`` random integers in ``[min_, max_)`` with no immediate repeats.

    Every returned value differs from the one before it; the first value
    differs from a hidden initial draw. Values are taken from NumPy's global
    random state, so seed it with ``np.random.seed`` for reproducible output.

    Args:
        n: Number of values to return.
        min_: Inclusive lower bound.
        max_: Exclusive upper bound.

    Returns:
        A list of ``n`` integers, each in ``[min_, max_)``.

    Raises:
        ValueError: If values are requested but the range holds fewer than
            two integers, so a non-repeating sequence is impossible.
    """
    span = max_ - min_
    if n > 0 and span < 2:
        raise ValueError(
            "ncrands needs at least two integers in [min_, max_) to avoid repeats"
        )
    # Hidden starting value; the first returned value is guaranteed to differ.
    previous = np.random.randint(min_, max_)
    values = []
    for _ in range(n):
        # A step of 1..span-1 (mod span) can never land on the previous value.
        step = np.random.randint(0, span - 1) + 1
        # Work relative to min_ so the modulo keeps results inside the range.
        previous = min_ + (previous - min_ + step) % span
        values.append(previous)
    return values

In [3]:
# Reference coordinates (latitude, longitude) used by the pattern generators.

# Route endpoints: every pattern starts here ...
start_latitude, start_longitude = 55.375106225495145, 10.396267369060276
# ... and the normal and deviating patterns travel to here.
end_latitude, end_longitude = 55.39472434817015, 10.393096102223307

# Intermediate points offered as options to the looping patterns.
start_middle_latitude, start_middle_longitude = 55.38169472786001, 10.395885727870679
end_middle_latitude, end_middle_longitude = 55.387728473654015, 10.396407680375694

# Point halfway between the two intermediate points; the deviating patterns
# place their detour waypoint at a random offset from it.
middle_latitude = start_middle_latitude + np.abs(start_middle_latitude - end_middle_latitude) / 2
middle_longitude = start_middle_longitude + np.abs(start_middle_longitude - end_middle_longitude) / 2

In [4]:
def generate_looping_patterns(seed: int, n_normal = 5, n_looping = 5, repeat_loop = 1):
    """Build waypoint specifications for normal and looping trajectories.

    The normal pattern goes start -> end -> start. Each looping pattern goes
    start -> ``2 * repeat_loop`` randomly ordered intermediate waypoints
    (never the same one twice in a row) -> start.

    Args:
        seed: Seed for NumPy's global random state, which drives the random
            ordering of the intermediate waypoints.
        n_normal: Number of trajectories to generate from the normal pattern.
        n_looping: Number of looping patterns to create; each one gets its
            own random ordering and is generated once.
        repeat_loop: Each looping pattern visits ``2 * repeat_loop``
            intermediate waypoints.

    Returns:
        A list of ``(is_anomaly, n_trajectories, waypoints)`` tuples: the
        normal pattern first (``is_anomaly`` False), followed by one entry
        per looping pattern (``is_anomaly`` True, ``n_trajectories`` 1).
    """
    np.random.seed(seed)

    def endpoint(latitude, longitude):
        # A stop at either end of the route.
        return WayPoint(
            latitude = latitude,
            longitude = longitude,
            duration = 240, # In seconds
            std = 14, # In meters
        )

    def pass_through(latitude, longitude):
        # An intermediate waypoint with no stop duration and no spread.
        return WayPoint(
            latitude = latitude,
            longitude = longitude,
            duration = 0, # In seconds
            std = 0, # In meters
        )

    # Normal pattern: start waypoint --> end waypoint --> start waypoint
    normal_pattern = [
        endpoint(start_latitude, start_longitude),
        endpoint(end_latitude, end_longitude),
        endpoint(start_latitude, start_longitude),
    ]
    list_waypoints = [(False, n_normal, normal_pattern)]

    # Coordinates a looping pattern moves between in a random order.
    options = [
        (start_middle_latitude, start_middle_longitude),
        (end_middle_latitude, end_middle_longitude),
        (end_latitude, end_longitude),
    ]

    # Generate several looping patterns, each with its own random ordering.
    for _ in range(n_looping):
        rands = ncrands(
            2 * repeat_loop,
            min_ = 0,
            max_ = len(options),
        )
        looping_pattern = [endpoint(start_latitude, start_longitude)]
        # Index through the random draws (not the loop counter) so the order
        # really is random and stays in range for any repeat_loop.
        looping_pattern.extend(pass_through(*options[choice]) for choice in rands)
        looping_pattern.append(endpoint(start_latitude, start_longitude))
        list_waypoints.append((True, 1, looping_pattern))
    return list_waypoints

# Waypoint specification for the looping experiment.
looping_pattern_waypoints_list = generate_looping_patterns(seed = 12, n_normal = 5, n_looping = 5)

# Base map with one coloured marker per reference waypoint.
map_ = setup_map(center)
for marker_position, marker_color in (
    ([start_latitude, start_longitude], "red"),
    ([start_middle_latitude, start_middle_longitude], "green"),
    ([end_middle_latitude, end_middle_longitude], "blue"),
    ([end_latitude, end_longitude], "cyan"),
):
    map_ = plot_datapoint(marker_position, color = marker_color, map_ = map_)

# Generate every requested trajectory, draw it in its own colour and keep the
# generator together with its anomaly flag.
trajectories0 = []
for anomaly, n, waypoints in looping_pattern_waypoints_list:
    for _ in range(n):
        # Re-seed Python's `random` from the clock before picking a colour
        # (rand_24_bit presumably draws from `random` -- TODO confirm).
        random.seed(time.time())
        color = "#" + str(color_hex(num = rand_24_bit()))
        trajectory_generator = TrajectoryGenerator(
            waypoints = waypoints,
            mean_speed = mean_speed,
            std_speed = std_speed,
            mean_time_delta = mean_time_delta,
            std_time_delta = std_time_delta,
            std_datapoint = 14,
        )
        map_ = plot_trajectory(trajectory_generator.to_latlon(), color = color, map_ = map_)
        trajectories0.append((anomaly, trajectory_generator))
# Last expression of the cell: renders the map.
map_

In [5]:
def generate_deviating_patterns(seed: int, n_normal = 5, n_deviating = 5):
    """Build waypoint specifications for deviating and normal trajectories.

    A deviating pattern goes start -> detour -> end, where the detour is the
    module-level middle point shifted by a random offset in UTM coordinates.
    The normal pattern goes straight from start to end.

    Args:
        seed: Seed for NumPy's global random state, which drives the offsets.
        n_normal: Number of trajectories to generate from the normal pattern.
        n_deviating: Number of deviating patterns to create (one trajectory each).

    Returns:
        A list of ``(is_anomaly, n_trajectories, waypoints)`` tuples: the
        deviating patterns first (``is_anomaly`` True, count 1), then the
        normal pattern (``is_anomaly`` False, count ``n_normal``).
    """
    np.random.seed(seed)

    # One start waypoint instance, shared by every pattern built below.
    start_stop = WayPoint(
        latitude = start_latitude,
        longitude = start_longitude,
        duration = 240, # In seconds
        std = 14, # In meters
    )

    def end_stop():
        # A fresh end waypoint for each pattern.
        return WayPoint(
            latitude = end_latitude,
            longitude = end_longitude,
            duration = 240, # In seconds
            std = 14, # In meters
        )

    easting, northing, zone, lat_band = utm.from_latlon(middle_latitude, middle_longitude)

    patterns = []
    for _ in range(n_deviating):
        # Redraw both offsets until the first (easting) one exceeds 250 in
        # magnitude; the second offset is not constrained.
        east_offset = north_offset = 0.0
        while not np.abs(east_offset) > 250:
            east_offset = np.random.normal(0, 500)
            north_offset = np.random.normal(0, 500)
        detour_lat, detour_lon = utm.to_latlon(
            east_offset + easting, north_offset + northing, zone, lat_band,
        )
        detour = WayPoint(
            latitude = detour_lat,
            longitude = detour_lon,
            duration = 0, # Seconds
            std = 0, # In meters
        )
        patterns.append((True, 1, [start_stop, detour, end_stop()]))

    # The normal pattern comes last: start --> end with no detour.
    patterns.append((False, n_normal, [start_stop, end_stop()]))
    return patterns
   
# Waypoint specification for the deviating experiment.
deviating_patterns_waypoint_list = generate_deviating_patterns(seed = 12, n_normal = 5, n_deviating = 5)

# Base map with one coloured marker per reference waypoint.
map_ = setup_map(center)
for marker_position, marker_color in (
    ([start_latitude, start_longitude], "red"),
    ([middle_latitude, middle_longitude], "green"),
    ([end_latitude, end_longitude], "green"),
):
    map_ = plot_datapoint(marker_position, color = marker_color, map_ = map_)

# Generate every requested trajectory, draw it in its own colour and keep the
# generator together with its anomaly flag.
trajectories1 = []
for anomaly, n, waypoints in deviating_patterns_waypoint_list:
    for _ in range(n):
        # Re-seed Python's `random` from the clock before picking a colour
        # (rand_24_bit presumably draws from `random` -- TODO confirm).
        random.seed(time.time())
        color = "#" + str(color_hex(num = rand_24_bit()))
        trajectory_generator = TrajectoryGenerator(
            waypoints = waypoints,
            mean_speed = mean_speed,
            std_speed = std_speed,
            mean_time_delta = mean_time_delta,
            std_time_delta = std_time_delta,
            std_datapoint = 14,
        )
        map_ = plot_trajectory(trajectory_generator.to_latlon(), color = color, map_ = map_)
        trajectories1.append((anomaly, trajectory_generator))
# Last expression of the cell: renders the map.
map_

In [6]:
def generate_trajectories(
    waypoints: List,
    start_datetime: datetime,
    gap: timedelta = timedelta(days = 1.),
    *args, **kwargs,
) -> TrajectoryGeneratorCollection:
    """Create trajectory generators for every pattern and bundle them.

    Args:
        waypoints: Iterable of ``(is_anomaly, n_trajectories, pattern)``
            tuples, as returned by the ``generate_*_patterns`` functions.
        start_datetime: Forwarded to ``TrajectoryGeneratorCollection``.
        gap: Forwarded to ``TrajectoryGeneratorCollection``.
        *args, **kwargs: Forwarded unchanged to every ``TrajectoryGenerator``.

    Returns:
        A ``TrajectoryGeneratorCollection`` built from the list of
        ``(is_anomaly, TrajectoryGenerator)`` pairs.
    """
    trajectories = []
    # Use a distinct loop name so the `waypoints` argument is not shadowed.
    for anomaly, n, pattern in waypoints:
        for _ in range(n):
            # Kept from the original: re-seed Python's `random` from the clock
            # for each generator. NOTE(review): only matters if
            # TrajectoryGenerator draws from `random` -- confirm.
            random.seed(time.time())
            trajectory_generator = TrajectoryGenerator(*args, waypoints = pattern, **kwargs)
            trajectories.append((anomaly, trajectory_generator))
    return TrajectoryGeneratorCollection(
        trajectory_generators = trajectories,
        start_datetime = start_datetime,
        gap = gap,
    )

In [7]:
def _generate_batches(pattern_factory, n_batches, first_seed, **pattern_counts):
    """Generate ``n_batches`` JSON-encoded trajectory batches.

    Batch ``k`` uses seed ``first_seed + k``. For each seed, the waypoint
    specification from ``pattern_factory(seed=..., **pattern_counts)`` is
    turned into a trajectory collection and serialised with
    ``DataFrame.to_json(orient="split")``.
    """
    generator_settings = {
        "mean_time_delta": mean_time_delta,
        "std_time_delta": std_time_delta,
        "mean_speed": mean_speed,
        "std_speed": std_speed,
        "std_datapoint": 14,
    }
    batches = []
    # Exactly n_batches seeds (the upper bound of range is exclusive).
    for seed in range(first_seed, first_seed + n_batches):
        waypoints = pattern_factory(seed = seed, **pattern_counts)
        tgc = generate_trajectories(
            waypoints = waypoints,
            start_datetime = datetime.now(),
            **generator_settings,
        )
        main_df = tgc.to_dataframe()
        batches.append(main_df.to_json(orient = "split"))
    return batches


def generate_batches_deviating(n_batches, first_seed = 12):
    """Generate batches of normal and deviating trajectories.

    Args:
        n_batches: Number of batches to produce.
        first_seed: Seed of the first batch; later batches use consecutive seeds.

    Returns:
        A list of ``n_batches`` JSON strings, one per batch, each built from
        10 normal trajectories and 5 deviating patterns.
    """
    return _generate_batches(
        generate_deviating_patterns,
        n_batches,
        first_seed,
        n_normal = 10,
        n_deviating = 5,
    )


def generate_batches_looping(n_batches, first_seed = 12):
    """Generate batches of normal and looping trajectories.

    Args:
        n_batches: Number of batches to produce.
        first_seed: Seed of the first batch; later batches use consecutive seeds.

    Returns:
        A list of ``n_batches`` JSON strings, one per batch; the pattern
        generator is called with ``n_normal = 10`` and ``n_looping = 5``.
    """
    return _generate_batches(
        generate_looping_patterns,
        n_batches,
        first_seed,
        n_normal = 10,
        n_looping = 5,
    )

In [8]:
# Generate the deviating batches and store them as one JSON array; each
# element is itself a JSON string produced by DataFrame.to_json.
batches0 = generate_batches_deviating(n_batches = 5)
with open("main_data_deviating.json", "w") as f:
    json.dump(batches0, f)

In [9]:
# Generate the looping batches and store them as one JSON array; each
# element is itself a JSON string produced by DataFrame.to_json.
batches1 = generate_batches_looping(n_batches = 5)
with open("main_data_looping.json", "w") as f:
    json.dump(batches1, f)