# Preprocessor
In data science parlance, this notebook performs the Extract, Transform, and Load portion of our workflow. If there is ever a change to our raw data format, we will only ever have to make changes here, or possibly create a fork of this file to support different raw data formats.

Extract, Transform, and Load is often shortened to ETL. In our ETL workflow, we will:
* Extract the data from the raw csv.
* Transform the data into a workable format by converting to SI units
* Transform the data by splitting the rolls into multiple datasets.
* Load the data into a standard format for our analysis notebooks to consume.

### Organization
Data is always read from `data/raw` and written out to `data/parsed`. 

It is important to always preserve the raw data so you can reprocess it if necessary. Your data is very valuable, and disk space is cheap! You never know what changes you or someone else might want to make to the analysis code in the future, and there could be bugs in the code that require reprocessing the data. 

In [None]:
%matplotlib widget
import os
import json
import numpy as np
import pandas as pd

import matplotlib.cm as cm
import matplotlib.pyplot as plt

from collections import defaultdict
from datetime import datetime
from ipywidgets import *
from pyproj import Proj

from constants import *
from plot_utils import *
# Exact conversion factor: 1 mile / hour = 1609.344 m / 3600 s = 0.44704 m / s
MPH_TO_MPS = 0.44704 # convert miles / hour to meters / second
# Cutoff compared against the X column when splitting rolls, and drawn as a
# vertical guide line on the birds-eye plots.
# NOTE(review): presumably element 0 of START_OF_BACK_HILLS (brought in by one of
# the star imports) is its X / easting coordinate in meters -- confirm.
CHUTE_THRESHOLD = START_OF_BACK_HILLS[0]

In [None]:
# List all of the raw data files we will process.
# Every entry of file_patterns is a glob pattern; add more entries to pick up
# additional recordings.
from glob import glob
file_patterns = [
    f'{DATA_DIR}/raw/yyyy-mm-dd_*.csv',
]
# glob() returns matches in arbitrary, OS-dependent order, and overlapping
# patterns can match the same file more than once. De-duplicate and sort so
# every file is processed exactly once and in a reproducible order.
raw_files = sorted({file for pattern in file_patterns for file in glob(pattern)})
print(f'Found {len(raw_files)} raw data files:')
for file in raw_files:
    print(file)

In [None]:
# Read each file and convert data to SI units
# Plot each file in its own figure to sanity check the data
%matplotlib widget
data = {}

pittsburgh = 17
utm_converter = Proj(proj='utm', zone=pittsburgh, ellps='WGS84', preserve_units=False)

for raw_file in raw_files:
    label = os.path.splitext(os.path.basename(raw_file))[0]
    raw = pd.read_csv(raw_file)
    df = pd.DataFrame()
    data[label] = df
    
    # convert data to SI units 
    df[T] = raw['timestamp']
    x, y = utm_converter(raw['longitude'], raw['latitude'])
    df[X] = x
    df[Y] = y
    # Z = altitude is already in meters
    df[Z] = raw['altitude']
    df[SPEED] = raw['ground_speed'] * MPH_TO_MPS
    df[STD_X] = raw['horizontal_accuracy'] # already in meters
    df[STD_Y] = raw['horizontal_accuracy'] # already in meters
    df[STD_Z] = raw['vertical_accuracy'] # already in meters
    df[STD_SPEED] = raw['speed_accuracy'] * MPH_TO_MPS
    df[FIX_TYPE] = raw['rtktype']
    
    t = df[T].to_numpy()
    s = df[SPEED].to_numpy()
    
    fig, (birds_eye, speed) = plt.subplots(nrows=2)
    fig.suptitle(label)
    add_birds_eye_view(birds_eye, x, y, df[T])
    birds_eye.axvline(x=CHUTE_THRESHOLD,ls='--', color='gray', lw=0.8)
    speed.plot(t,s)
    add_crosshairs(speed)
    
plt.show()

# Split the data into individual rolls
Rolls are counted by the number of times a buggy completes the chute turn. Conveniently, the chute is the westernmost point on the buggy course, so it is easy to detect by looking for points west of CHUTE_THRESHOLD. 

Starting from one data point known to be in the chute, we step backward and forward through time until an exponential moving average of the buggy's speed drops below the threshold at which it is considered stopped. This identifies the approximate start and end of the roll.

Don't worry too much about exactly where the finish line is, or exactly when the buggy starts moving. If you are having trouble finding suitable values for STOPPED_THRESHOLD and DECAY, consider stepping until X is less than some threshold.

In [None]:
# rolls maps each raw file label to a sorted list of (start, end) sample indices,
# one pair per detected roll. `end` is exclusive when used as a slice bound.
rolls = defaultdict(list)

STOPPED_THRESHOLD = 1 # m/s
DECAY = 0.9 # values closer to 1.0 will take a wider average

for label in data:
    df = data[label]
    count = len(df)
    # Plain array of speeds: much cheaper than a Series lookup per sample.
    # Like the mask below, this treats index labels as positions, i.e. it
    # assumes the default 0..count-1 index produced when the data was loaded.
    speeds = df[SPEED].to_numpy()
    # mask[i] stays True while sample i has not been claimed by a roll
    mask = np.ones(count, dtype=bool)

    def step_until_stopped(idx, step):
        """Walk from sample `idx` in direction `step` (+1 or -1) until stopped.

        Keeps an exponential moving average of the speed (weight DECAY on the
        running value) and returns the index at which that average is no longer
        above STOPPED_THRESHOLD, or the last valid index if the data runs out.
        """
        avg_speed = speeds[idx]
        while avg_speed > STOPPED_THRESHOLD and 0 <= idx + step < count:
            idx += step
            avg_speed = DECAY * avg_speed + (1.0 - DECAY) * speeds[idx]
        return idx

    # Repeatedly take the westernmost unclaimed sample; while it lies west of the
    # chute cutoff it belongs to a roll we have not extracted yet.
    # (min() of an empty selection is NaN, which compares False and ends the loop.)
    unclaimed_x = df[X][mask]
    while unclaimed_x.min() < CHUTE_THRESHOLD:
        idx = unclaimed_x.idxmin()
        start = step_until_stopped(idx, -1)
        end = step_until_stopped(idx, 1)
        # A chute sample where the buggy is already stopped gives start == end,
        # which is not a roll; recording it would later write an empty file.
        if end > start:
            rolls[label].append((start, end))
        # Claim the range *including* `end`. idx always lies in [start, end], so
        # this guarantees progress; an exclusive range leaves idx unclaimed when
        # idx == end (stopped sample, or last sample of the file) and the loop
        # would never terminate.
        mask[start:end + 1] = False
        unclaimed_x = df[X][mask]

    rolls[label].sort()
    print(f'{label} => {len(rolls[label])} rolls')
    for idx, (start, end) in enumerate(rolls[label]):
        print(f'Roll {idx} start={start}, end={end}')



# Sanity Check
Before we save the roll data to the output folder, look over each roll and check that it makes sense.

At this point, the roll data is just a start/end index into the raw dataframe. If there is an artifact you don't like, consider retuning STOPPED_THRESHOLD and DECAY to fix it programmatically, but don't be afraid to create a scratch cell and manually edit the start/end index as you see fit. 

In [None]:
%matplotlib widget
for label in rolls:
    for idx, (start, end) in enumerate(rolls[label]):
        df = data[label]
        
        fig, (birds_eye, speed) = plt.subplots(nrows=2)
        fig.suptitle(f'{label} roll {idx}')

        x = df[X][start:end].to_numpy()
        y = df[Y][start:end].to_numpy()
        t = df[T][start:end].to_numpy()
        s = df[SPEED][start:end].to_numpy()
        speed.plot(t, s)

        add_birds_eye_view(birds_eye, x, y, t)
        add_crosshairs(speed)
plt.show()

# CSV Output
We're ready to write the roll data out to disk. This duplicates some information from the raw data file, but it allows our analysis code to make a bunch of assumptions about the parsed data.

By default this code will not overwrite an existing file. You must set `DANGEROUS = True` if you want to overwrite. 

Note the metadata that is included to provide clues to when and how these reference points were generated.

In [None]:
# Write the roll data out with header info so we know when and how it was processed
import getpass

output_dir = f'{DATA_DIR}/parsed'
os.makedirs(output_dir, exist_ok=True)
DANGEROUS = False
# 'x' is exclusive creation: open() raises FileExistsError instead of
# overwriting a file that is already there.
mode = 'w' if DANGEROUS else 'x'
# os.getlogin() needs a controlling terminal and raises OSError in many Jupyter
# and container setups; getpass.getuser() falls back to environment variables
# and the password database instead.
user = getpass.getuser()
for raw in data:
    df = data[raw]
    for roll_number, (start, end) in enumerate(rolls[raw]):
        label = f'{raw}_{roll_number}'
        # newline='' turns off newline translation so the explicit '\n'
        # terminators are written as-is on every platform (pandas asks for this
        # when to_csv is handed an open text file).
        with open(f'{output_dir}/{label}.csv', mode, newline='', encoding='utf-8') as f:
            newline = '\n'
            # '#'-prefixed metadata header, one entry per line, ahead of the csv body
            f.write(newline.join([
                '# Single Roll Data',
                '# version: alpha',
                f'# label: {label}',
                f'# sample_start: {start}',
                f'# sample_end: {end}',
                f'# generated: {datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}',
                f'# user: {user}',
            ]))
            f.write(newline)
            # Only the rows belonging to this roll; `end` is exclusive
            df[start:end].to_csv(f, header=True, lineterminator='\n', index=False)
            print(f'Successfully wrote {label}')
