# Data Pre-Processing

Is your data too messy to be utilized in 02? Look no further! This notebook walks through the data pre-processing methodology for our datasets, particularly BDD100K. We also include some helpful tips to make your data more compatible with these notebooks.


In [3]:
import networkx as nx
import osmnx as ox 
import time
from shapely.geometry import Polygon
import os, io, sys
import numpy as np
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
from algorithms import mm_utils

%matplotlib inline
#ox.__version__



# Importing your Data

If you are seriously testing your algorithm against data, chances are your dataset is huge. Blindly trying to import it into a Pandas (Geo)DataFrame is going to cause some issues, because it will attempt to load it all into memory (which is likely impossible).

In our case, we use Dask to handle this.

(In general, if you are exclusively using Pandas, Modin might be easier, as it is drop-in compatible. But Dask can handle GeoDataFrames (unlike Modin), so we will use that here) 

In [4]:
# Fast JSON library
import ujson as json

## Note for future self: If you need to muck around with JSON formatting, jq might be the way to go
## Transforming to/from line-delimited, for example, is far simpler

# Import the Dask libraries we need
import dask.bag as db

In [5]:
# If you try to load the BDD100K files directly into Dask, you'll run into some issues.
# Dask Bags assume every line in a json file is a distinct json object.
# So because BDD100K uses pretty json formatting, it will not load properly.
# So we have to first remove all the newline characters in all of the files.

# Note that the database size is huge, so make sure you have adequate disk space

path = 'BDD100K/train/'

for filename in os.listdir(path):
    if ('json' in filename and not 'processed' in filename):
        filepath = os.path.join(path, filename)
        f = open(filepath, 'r')
        f = f.read().replace('\n', '')
        if 'gps' in f.read(): # Some of the files don't have GPS data.
# These are unusuable to us, so we don't want them
            if os.path.getsize(os.path.join(path,filename)) == 0: 
    # Fun fact: the BDD100K info dataset has a corrupted (empty) JSON file
    # This caused an enormous headache on my end while debugging
    # So now we will only process it if it's non-empty.
    # Otherwise, we skip the processing part, but still delete the file
    # (A more robust method would be to `try: json.loads`, but this would greatly increase processing time)
                with open(path + 'processed-' + filename, 'w') as fp:
                    print(f, file=fp)
        # These files take up a lot of space on my harddrive, so I will remove them here.
        # You may wish not to do this
        os.remove(filepath)


In [10]:

for filename in os.listdir(path):
        filepath = os.path.join(path, filename)
        if 'ipynb' in filepath:
            pass
        else:
            f = open(filepath, 'r')
            if 'FeatureCollection' not in f.read(): # Some of the files don't have GPS data.
    # These are unusuable to us, so we don't want them
                os.remove(filepath)

In [11]:
path = 'BDD100K/train/'

for filename in os.listdir(path):
        if not 'postprocessed' in filename:
            filepath = os.path.join(path, filename)
            if 'ipynb' in filepath:
                pass
            else:
                f = open(filepath, 'r')
                if 'gps' not in f.read(): # Some of the files don't have GPS data.
        # These are unusuable to us, so we don't want them
        # In BDD100K, there's about 20000 files like this... quite unfortunate
                    os.remove(filepath)


In [12]:
# Now we load all the JSON files

dfbag = db.read_text('BDD100K/train/processed-*.json').map(json.loads)

# This is a helper function for reformatting

def bdd_reformat(jsonf):        
    listf = []
    if jsonf.get('gps') != None:
        for item in jsonf['gps']:
            listf.append({"type": "Feature",
          "geometry": {
            "type": "Point",
            "coordinates": [item["longitude"], item["latitude"]]
          },
          "properties": {
            "timestamp": item["timestamp"],
            "altitude": item["altitude"],
            "speed": item["speed"],
            "vertical accuracy": item["vertical accuracy"],
            "horizontal accuracy": item["horizontal accuracy"]
          }})
        geojsonf = {"type": "FeatureCollection", "features": listf}
        return geojsonf
    else:
        return jsonf

ValueError: ('No files found', 'BDD100K/train/processed-*.json')

In [13]:
path = 'BDD100K/train/'
files = 'postprocessed-*.json'

# create a text trap and redirect stdout
text_trap = io.StringIO()
sys.stdout = text_trap

### TODO: Consider outputting to better file format, e.g. Avro, Parquet

dfbag.map(bdd_reformat).map(json.dumps).to_textfiles(path + files)

# now restore stdout function
sys.stdout = sys.__stdout__

for filename in os.listdir(path):
    if (not 'postprocessed' in filename):
        filepath = os.path.join(path, filename)
        os.remove(filepath)

# Note-- now may be a good time to zip and compress the processed files, in case something happens

NameError: name 'dfbag' is not defined

Now our data has been post-processed to a format that is compatible with map matching algorithms. The simplest way to utilize the data is to load it all into a Dask Bag, and `take(n,npartitions=n)` as needed (alternatively, you can load each GeoJSON as a partition in a Dask GeoDataFrame-- but caution needs to be exercised here). However, if you wish to do more in-depth data analysis on the dataset, thousands of JSON files aren't exactly optimal. We could try to apply functions on Dask Bags, but the simpler solution is to store the files into a SQLite database. Then we can access the database as needed and access filtered data quickly. 

Note: if you have no interest in utilizing the GeoJSON structure, you should create a database from the unprocessed files

In [14]:
# In our case, it makes more sense to store it into a SQLite database, but MySQL, MariaDB, or other formats work perfectly well.

# Fortunately, there are a lot of tools to convert GeoJSON to a spatially informed database
# So instead of trying to do it ourselves, we will use an external tool to do the heavy lifting
# Aren't you glad we processed the data into a more standard format?

# Run this only once
#! pip install geojson-to-sqlite
#! sudo pamac install spatialite-gui # Optional, but improves our database

In [None]:
path = 'BDD100K/train/'

# This for-loop takes a long time (maybe 24 hours?)
# I don't know if you can parallelize creating tables in SQL, but if so, you probably should
for filename in os.listdir(path):
    ! geojson-to-sqlite BDD100K/postprocessed-BDD100K.db {filename[:-5]} {path+filename} --spatial-index --spatialite_mod=/usr/lib/mod_spatialite.so 

In order to keep each track separate, we made them all individual tables. If you want them all in one table, simply change `{filename[:-5]}` to the name you would like.

Let's run a query to make sure it works.

In [None]:
import sqlite3
conn = sqlite3.connect('BDD100K/postprocessed-BDD100K.db')
conn.enable_load_extension(True)

# Now we load spatialite
conn.execute('SELECT load_extension("mod_spatialite")')
conn.execute('SELECT InitSpatialMetaData(1);')

# libspatialite
conn.execute('SELECT load_extension("libspatialite")')
conn.execute('SELECT InitSpatialMetaData();')

cur = conn.cursor()
cur.execute('SELECT ')

#conn.commit()
#conn.close()
#del conn

All done, right? Not quite. For example: is your data fused?

In [None]:
## Display data and see if fused

# Note: If you chose to store all your data in one SQL Table,
# Dask DataFrames can import from that.

dfbag = db.read_text('BDD100K/train/postprocessed-*.json').map(json.loads)
dfbag = dfbag.map(gpd.GeoDataFrame.from_features)
dfbag.take(1)[0]

In our case, our data is already fused. But often you will have several datasets with asynchronous data that you will have to fuse first. We implemented a barebones method in mm_utils to handle this; here is an example of how to apply it.

Note that your data needs to be a (Geo)DataFrame or GeoJSON. Also, the first column of all the datasets needs to be the time, and must all share the same time formatting. If you aren't sure your time format will work, we recommmend converting it all to Unix time (most languages have a built-in method to do this)

In [None]:
# We create simulated asynchronous data

# Get columns of data
df = dfbag.take(1)[0]
speed = origdf[['timestamp','speed']]

# Create rng to create noisy data
rng = np.random.default_rng()
for i in speed.index[:-1]:
    # Add a fake row consisting of a time between the timestamps, and a speed normally distributed around the average
    fakerow = pd.DataFrame([[speed.iloc[i][0] + rng.random()*1000,(speed.iloc[i][1] + speed.iloc[i+1][1])/2 + np.random.normal(0, (np.abs(speed.iloc[i][1] - speed.iloc[i+1][1]))/4)]], columns = ['timestamp','speed'], index = [i+0.5])
    speed = pd.concat([speed, fakerow])
# Reset indices
speed = speed.sort_index().reset_index(drop=True)
speed


Now we wish to fuse this DataFrame with our original dataframe (excluding the original speed column)

In [None]:
df1 = origdf[['geometry', 'timestamp', 'altitude', 'vertical accuracy', 'horizontal accuracy']]

df_prox = mm_utils.fuse(df1,speed)
df_avg = mm_utils.fuse(df1,speed)
df_blah = mm_utils.fuse(df1,speed)

Now let's see what the speed columns look like side-by-side

In [None]:
# This cell sets up styling to facilitate comparison
from IPython.display import display_html

df_sty = df.loc['speed'].style.set_table_attributes("style='display:inline'").set_caption('original df')
df_prox_sty = df_prox.loc['speed'].style.set_table_attributes("style='display:inline'").set_caption('proximity fuse')
df_avg_sty = df_prox.loc['speed'].style.set_table_attributes("style='display:inline'").set_caption('average fuse')
df_blah_sty = df_blah.loc['speed'].style.set_table_attributes("style='display:inline'").set_caption('df_blah')

In [None]:
space = "\xa0" * 10
display_html(df_sty._repr_html_() + space
             + df_prox_sty._repr_html_() + space
             + df_avg_sty._repr_html_() + space
             + df_blah_sty._repr_html_(), raw=True)