In [1]:
# imports

import numpy as np
import csv
from utils import absolute_path
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from position_fix_utils import filter_by_date, smooth_trajectory
from gb_spm import characteristic_indices, significant_place_mining
import webbrowser
from LabelPlot import LabelPlot
from ipyleaflet import (
    Map,
    CircleMarker,
    Polyline,
    Popup,
    Marker,
)
import ipywidgets as widgets
from MapPlot import MapPlot
from scipy import stats
from traitlets import directional_link


%reload_ext autoreload
%autoreload 2

In [2]:
# load data from csv

# Structured dtype for one position fix; every field is stored as float64.
position_fix_dtype = np.dtype([
    ('lat', np.float64),
    ('lon', np.float64),
    ('time', np.float64),
    ('altitude', np.float64),
    ('bearing', np.float64),
    ('speed', np.float64),
    ('accuracy', np.float64),
    ('vertical_accuracy', np.float64),
    ('bearing_accuracy', np.float64),
    ('speed_accuracy', np.float64),
])

# CSV column feeding each field of ``position_fix_dtype``, listed in field order.
_POSITION_FIX_CSV_COLUMNS = (
    'latitude',
    'longitude',
    'create_time_epoch',
    'altitude',
    'bearing',
    'speed',
    'accuracy',
    'vertical_accuracy',
    'bearing_accuracy',
    'speed_accuracy',
)


def position_fix_from_csv(file_path, remove_duplicates=True):
    """Read position fixes from a CSV file into a structured NumPy array.

    The file must have a header row containing every column named in
    ``_POSITION_FIX_CSV_COLUMNS``; each of those cells is parsed with
    ``float``.

    Args:
        file_path: Path of the CSV file to read.
        remove_duplicates: When true (the default), a row whose parsed values
            all equal those of the previously kept row is skipped. Only
            *consecutive* repeats are removed.

    Returns:
        A 1-D array of dtype ``position_fix_dtype`` with one element per kept
        row (empty if the file has no data rows).

    Raises:
        KeyError: If a required column is missing from the header.
        ValueError: If a cell cannot be parsed as a float.
    """
    data = []
    # newline='' hands newline handling to the csv module, as its docs require.
    with open(file_path, 'r', newline='') as file:
        prior = None
        for row in csv.DictReader(file):
            point = tuple(float(row[column]) for column in _POSITION_FIX_CSV_COLUMNS)
            # `prior` starts as None, so the very first row is never skipped.
            if remove_duplicates and point == prior:
                continue
            prior = point
            data.append(point)
    return np.array(data, dtype=position_fix_dtype)


# Resolve the CSV location through the project's absolute_path helper, then
# parse every fix (consecutive duplicate rows are dropped by default).
data_path = absolute_path("andrew-device-locations-all.csv")
location_data = position_fix_from_csv(data_path)
# Simple completion marker for the cell.
print('done')

In [3]:
# Show covariance of features

# RESULTS: altitude and vertical accuracy may be unnecessary.
def show_covariance_matrix(data):
    """Plot a heatmap of the covariance between standardized position-fix fields.

    Every field named in ``position_fix_dtype`` is centred on its mean and
    divided by its (population) standard deviation before ``np.cov`` is
    applied, so the fields are compared on a common scale.

    A field that is constant over ``data`` has zero standard deviation; dividing
    by it would fill that field's row and column with NaN. Such fields are left
    at their centred value (all zeros), which gives them zero covariance with
    everything.

    Args:
        data: Array indexable by the field names of ``position_fix_dtype``
            (e.g. the structured array returned by ``position_fix_from_csv``).

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    field_names = position_fix_dtype.names
    data_matrix = np.column_stack([data[field] for field in field_names])

    centered = data_matrix - np.mean(data_matrix, axis=0)
    spread = np.std(data_matrix, axis=0)
    # Avoid 0/0 for constant columns: dividing their all-zero centred values
    # by 1 keeps them at zero instead of producing NaN.
    safe_spread = np.where(spread == 0, 1.0, spread)
    data_matrix_standardized = centered / safe_spread

    # rowvar=False: columns are the variables, rows are the observations.
    cov_matrix = np.cov(data_matrix_standardized, rowvar=False)

    # Plot covariance matrix as heatmap
    tick_positions = np.arange(len(field_names))
    plt.figure(figsize=(8, 6))
    plt.imshow(cov_matrix, cmap='viridis', interpolation='nearest')
    plt.colorbar(label='Covariance')
    plt.title('Covariance Matrix')
    plt.xticks(tick_positions, field_names, rotation=45)
    plt.yticks(tick_positions, field_names)
    plt.show()


# Render the heatmap for the full data set loaded from the CSV.
show_covariance_matrix(location_data)

In [4]:
# gb-spm to find significant places

def gb_spm(data):
    """Run the GB-SPM pipeline on a trajectory and return its significant places.

    Steps, all delegated to project helpers:
      1. ``smooth_trajectory`` smooths the fixes.
      2. ``characteristic_indices`` picks characteristic points on the
         smoothed trajectory.
      3. ``significant_place_mining`` turns the smoothed trajectory and those
         indices into significant places.

    Args:
        data: Sequence of position fixes; must support ``len()``.
            Presumably a ``position_fix_dtype`` array (that is what the visible
            caller passes) — confirm against ``smooth_trajectory``.

    Returns:
        Whatever ``significant_place_mining`` returns for this trajectory.
    """
    # The smoothing factor `s` grows linearly with the number of fixes.
    smoothed = smooth_trajectory(data, s=5e-11 * len(data), weight='inverse')
    # NOTE(review): the meaning of the positional tuning constants below is
    # defined in the gb_spm module, not here. The original line also carried a
    # trailing `[45:47]` marker, presumably a slice used while debugging.
    cp_indices = characteristic_indices(smoothed, 4, 1)
    return significant_place_mining(smoothed, cp_indices, 3, 0.25, 120, 60)


In [5]:
"""
Recommended workflow for labelling:
    1) Set `date` to the day you would like to label
    2) Run the map-generation cell
    3) Select stop locations with the rectangle tool (hold Alt to disable snapping to points)
    4) Select individual points by clicking on them
    5) Mark locations as done by dragging their marker off of them
    6) Export the labels using the "Save Labels" button
"""
pass

In [6]:
# Generate labels for a given day via map

# Load appropriate data
date = datetime(2024, 5, 11)
def load_data(date):
    """Return the fixes recorded on ``date`` together with one label per fix.

    Labels are read from ``<YYYY-MM-DD>-labels.csv`` (relative to the current
    working directory). If that file is missing or cannot be parsed, every
    fix starts with label 0.

    Args:
        date: ``datetime`` selecting the day; passed to ``filter_by_date`` and
            used to build the label file name.

    Returns:
        ``(day_data, labels)`` where ``labels`` is a 1-D integer array with
        ``len(labels) == len(day_data)``.

    Raises:
        AssertionError: If a label file exists but its length does not match
            the number of fixes for that day.
    """
    day_data = filter_by_date(location_data, date)
    file_path = date.strftime("%Y-%m-%d") + "-labels.csv"
    try:
        # ndmin=1 keeps a single-label file from collapsing to a 0-d array,
        # which would make the len() check below raise TypeError.
        labels = np.loadtxt(file_path, delimiter=',', ndmin=1).astype(int)
    except (OSError, ValueError):
        # No (readable) label file yet: start unlabelled, using the same
        # integer dtype as labels loaded from disk.
        labels = np.zeros(len(day_data), dtype=int)
    assert len(day_data) == len(labels), (
        f"{file_path} holds {len(labels)} labels but {len(day_data)} fixes were found"
    )
    return day_data, labels
day_data, day_labels = load_data(date)


# Output widget used as a context manager by the callbacks below, so their
# prints (and any tracebacks) are routed to one visible place.
label_plot_output = widgets.Output()
# Project-local LabelPlot instance; the callbacks below read its `points`,
# `markers` and `labels` attributes.
label_plot = LabelPlot()


# Toggle the given indices as a group: set all of their labels to the opposite of the most common (mode) label currently among them
def click_indices(indices):
    """Toggle the labels of ``indices`` as a group and recolour their markers.

    The most common current label among the selected points decides the new
    label: if it is 1 every selected point becomes 0, otherwise every selected
    point becomes 1. Markers are coloured red for label 1 and green for label 0.

    Args:
        indices: Sequence of integer positions into ``label_plot.labels`` /
            ``label_plot.markers``. An empty sequence is a no-op.
    """
    if len(indices) == 0:
        # Nothing selected (e.g. a rectangle drawn over no points): skip
        # computing the mode of an empty array, which is undefined.
        return

    mode = stats.mode(label_plot.labels[indices], keepdims=False)
    new_label = 0 if mode.mode == 1 else 1
    label_plot.labels[indices] = new_label

    color = 'red' if new_label == 1 else 'green'
    opacity = 1.0  # same opacity for both labels
    for index in indices:
        marker = label_plot.markers[index]
        marker.color = color
        marker.opacity = opacity

def handle_click(**kwargs):
    """Toggle the label of the plotted point closest to a map click.

    Args:
        **kwargs: Event payload; ``kwargs['coordinates']`` must be indexable,
            with element 0 compared against ``lat`` and element 1 against
            ``lon``.

    The closest point is the one with the smallest taxicab (L1) distance in
    raw lat/lon — not geodesically exact, but good enough to pick a marker.
    Clicks whose closest marker is not currently visible are ignored.
    """
    with label_plot_output:
        clicked = kwargs.get('coordinates')
        lat_offset = np.abs(label_plot.points['lat'] - clicked[0])
        lon_offset = np.abs(label_plot.points['lon'] - clicked[1])
        nearest = np.argmin(lat_offset + lon_offset)
        if not label_plot.markers[nearest].visible:
            return
        click_indices([nearest])

def handle_rectangle(b, action, geo_json):
    """Toggle every currently shown point that lies inside a drawn rectangle.

    Args:
        b: Unused first callback argument.
        action: Draw action name; only ``'create'`` triggers a selection.
        geo_json: Drawn shape(s); the first entry's first coordinate ring is
            read as a sequence of ``(lon, lat, ...)`` vertices.

    The drawn shape is cleared from the draw control after every action,
    whether or not it was a ``'create'``.
    """
    with label_plot_output:
        if action == 'create':
            ring = geo_json[0]['geometry']['coordinates'][0]
            lons = [vertex[0] for vertex in ring]
            lats = [vertex[1] for vertex in ring]
            west, east = min(lons), max(lons)
            south, north = min(lats), max(lats)

            # Only the slice of points currently shown on the map is searched;
            # offsets within the slice are shifted back to absolute indices.
            first_visible = label_plot.visible_start_index
            shown = label_plot.points[first_visible:label_plot.visible_end_index]
            inside = [
                offset + first_visible
                for offset, fix in enumerate(shown)
                if west <= fix['lon'] <= east and south <= fix['lat'] <= north
            ]
            click_indices(inside)
        label_plot.draw_control.clear()


# Populate the plot. The add_* methods belong to the project-local LabelPlot;
# judging by their names they register the rectangle callback, draw the day's
# trajectory, overlay the stop regions found by gb_spm, and add the clickable
# points with their current labels — confirm against LabelPlot.
label_plot.add_rectangle_tool(handle_rectangle)
label_plot.add_curve(day_data)
label_plot.add_stop_regions(gb_spm(day_data), markers=False, color='magenta', draggable=True)
label_plot.add_points_clickable(day_data, handle_click, labels=day_labels)


# Labelling tools
# button to save labels to csv
def save_labels(b):
    """Write the current labels to ``<YYYY-MM-DD>-labels.csv`` for the global ``date``.

    Args:
        b: The clicked button (unused).

    Labels are cast to int and written one per line; a confirmation is
    printed into ``label_plot_output``.
    """
    with label_plot_output:
        day_stamp = date.strftime("%Y-%m-%d")
        file_path = day_stamp + "-labels.csv"
        int_labels = label_plot.labels.astype(int)
        np.savetxt(file_path, int_labels, fmt='%d', delimiter=',')
        print("Saved labels to", file_path)


save_button = widgets.Button(description="Save Labels")
save_button.on_click(save_labels)

# Range slider
# The slider spans the timestamps of the first and last plotted fix and
# starts collapsed onto the first one.
first_fix_time = label_plot.points[0]['time']
last_fix_time = label_plot.points[-1]['time']
time_slider = widgets.IntRangeSlider(
    value=[first_fix_time, first_fix_time],
    min=first_fix_time,
    max=last_fix_time,
    step=1,
    continuous_update=True,
    readout=False,
)

# Text boxes that show the two ends of the selected range.
start_label = widgets.Text(description='Start:', continuous_update=False)
end_label = widgets.Text(description='End:', continuous_update=False)
slider_label = widgets.HBox([start_label, end_label])

# Link slider and line edits
# UTC calendar date of the first plotted fix. Not referenced again in the
# cells visible here.
# NOTE(review): datetime.utcfromtimestamp is deprecated as of Python 3.12 —
# switch to a timezone-aware conversion if this value is still needed.
base_date = datetime.utcfromtimestamp(label_plot.points[0]['time']).date()
# Timestamp layout shown in, and parsed back from, the Start/End text boxes.
TIME_LABEL_FORMAT = '%Y-%m-%d %H:%M:%S'
# Fixed offset from UTC used for the text boxes (UTC-4).
DISPLAY_UTC_OFFSET = timedelta(hours=-4)
UNIX_EPOCH = datetime(1970, 1, 1)


def string_to_epoch(text):
    """Parse a ``TIME_LABEL_FORMAT`` string (in the display offset) to epoch seconds.

    Exact inverse of ``epoch_to_string`` for whole seconds, independent of the
    time zone of the machine running the notebook.

    Raises:
        ValueError: If ``text`` does not match ``TIME_LABEL_FORMAT``.
    """
    shown = datetime.strptime(text, TIME_LABEL_FORMAT)
    # shown = utc + offset  =>  utc = shown - offset
    return int((shown - DISPLAY_UTC_OFFSET - UNIX_EPOCH).total_seconds())


def get_range_epoch(x):
    """Return ``(start, end)`` epoch seconds parsed from both text boxes.

    Args:
        x: Ignored. Both boxes are always re-read so the pair stays consistent
            whichever of them triggered the update.

    Raises:
        ValueError: If either box does not hold a ``TIME_LABEL_FORMAT`` string.
    """
    return (string_to_epoch(start_label.value), string_to_epoch(end_label.value))


def epoch_to_string(epoch_time):
    """Format epoch seconds as a ``TIME_LABEL_FORMAT`` string in the display offset.

    Uses plain datetime arithmetic rather than the deprecated
    ``datetime.utcfromtimestamp``.
    """
    shown = UNIX_EPOCH + timedelta(seconds=float(epoch_time)) + DISPLAY_UTC_OFFSET
    return shown.strftime(TIME_LABEL_FORMAT)
    
# Slider -> text boxes: mirror each end of the slider range as a formatted
# timestamp string.
start_link_label = directional_link((time_slider, "value"), (start_label, "value"), lambda x: epoch_to_string(x[0]))
end_link_label = directional_link((time_slider, "value"), (end_label, "value"), lambda x: epoch_to_string(x[1]))
# Text boxes -> slider: a change in either box re-parses BOTH boxes
# (get_range_epoch ignores its argument) and writes the pair to the slider.
# NOTE(review): these are created after the slider -> text links, presumably
# so both boxes already hold parseable text when get_range_epoch first runs —
# confirm before reordering.
start_link_slider = directional_link((start_label, "value"), (time_slider, "value"), get_range_epoch)
end_link_slider = directional_link((end_label, "value"), (time_slider, "value"), get_range_epoch)

def show_range(change):
    """Observer for the slider's ``value``: show only the selected time window.

    Args:
        change: Change notification whose ``'new'`` entry holds the
            ``(start, end)`` pair to pass to ``label_plot.show_time``.
    """
    with label_plot_output:
        window = change['new']
        label_plot.show_time(window[0], window[1])

time_slider.style = {'description_width': 'initial'}
# Re-filter the plot whenever the slider's value changes.
time_slider.observe(show_range, names='value')
# Stretch the slider across the full width of the output area.
time_slider.layout = widgets.Layout(width='100%')

# Show the callback output area, the map, and the labelling controls together.
display(label_plot_output, label_plot, time_slider, slider_label, save_button)