## Initial Data Prep for Element 3
This file creates a SQLite database with 8 tables. Each table corresponds to a single message type from the raw data. The next file uses this SQLite database to downsample the data, and the downsampled data is what the visualization for Element 3 uses.

### Notes:
* Removed DjGran1X, FrcTowX, MarkX and ForceX, as the raw data for these message types does not appear to match the information in the documentation. As such, we cannot make assumptions about what is represented in the raw data.
* Kept MdtX: it is in the docs, but the data doesn't match the docs. Assumed that GPS Quality and Course were swapped in the data.

In [8]:
#import needed libraries
import os
import csv
import sqlalchemy
import dataset
from dateutil.parser import parse

#Set up the database (deleting an old version of it if it exists)
database = "fhwa_mn.sqlite"
path_database = os.path.join(os.getcwd(), database)

# Only a missing file is an acceptable reason to skip the delete.  Any other
# failure (permissions, file locked, path is a directory) is allowed to raise,
# because continuing would connect to the old database file instead of a
# fresh one.
if os.path.exists(path_database):
    os.remove(path_database)

database_uri = "sqlite:///" + path_database

db = dataset.connect(database_uri)

#Specify the paths to the directories of the extracted raw data
vehicle_path  = "RDE/"
weather_path  = "WxDE/"


def _list_files(directory, suffix=""):
    """Return the paths of the regular files directly inside *directory*.

    Only names ending with *suffix* are kept (the default "" keeps every
    file).  Names are sorted because os.listdir returns entries in arbitrary
    order; sorting makes the load order reproducible between runs.
    """
    paths = (os.path.join(directory, name) for name in sorted(os.listdir(directory))
             if name.endswith(suffix))
    return [path for path in paths if os.path.isfile(path)]


# Match on the extension itself rather than on ".csv" appearing anywhere in
# the file name.
weather_files = _list_files(weather_path, ".csv")

vehicle_files = _list_files(vehicle_path)

In [9]:
#Table headers from weather file and vehicle files
weather_header = [
    "SourceId", "ObsTypeID", "ObsTypeName", "SensorID", "SensorIndex", "StationID", "SiteID",
    "Category", "ContribID", "Contributor", "StationCode", "Timestamp", "Latitude", "Longitude",
    "Elevation", "Observation", "Units", "EnglishValue", "EnglishUnits", "ConfValue", "Flags",
]

# Fields that open every vehicle message, followed by the three GPS fields in
# the order used by all message types except MdtX.
_vehicle_id_fields = ['Agency', 'ESN', 'VehicleName', 'Timestamp', 'Latitude', 'Longitude']
_vehicle_common = _vehicle_id_fields + ['Course', 'GPS_Quality', 'Velocity']

vaix_headers = _vehicle_common + ['Mode', 'RoadTemp', 'AirTemp', 'DewPoint', 'Humidity']

actx_headers = _vehicle_common + [
    'IgnitionSignal', 'ActiveSignal1', 'ActiveSignal2', 'ActiveSignal3', 'ActiveSignal4',
]

rainx_headers = _vehicle_common + ['PrecipitationStatus']

canx_headers = _vehicle_common + ['SPN_List']

# MdtX lists GPS_Quality before Course, unlike the other message types.
mdtx_headers = _vehicle_id_fields + [
    'GPS_Quality', 'Course', 'Velocity',
    'CurrentLane', 'Material', 'PlowStatus', 'RoadCondition', 'WeatherCondition',
]

obdy_headers = _vehicle_common + [
    'CoolantTemp', 'FuelPressure', 'EngineRPM', 'VehicleSpeed', 'IntakeAirTemp',
    'AbsoluteThrottlePosition', 'TimeSinceEngineStart', 'FuelLevel', 'BarometricPressure',
    'AmbientAirTemp', 'EngineFuelRate', 'BrakePedal', 'TractionAssist', 'LateralAcceleration',
    'LongitudinalAcceleration', 'SteeringAngle', 'Yaw', 'Roll',
]

update_headers = _vehicle_common + ['EventCode', 'IdleStatus']

In [10]:
# Create Tables
# One table per kept vehicle message type plus one for the weather
# observations; the tables are created in the order the names are listed.
_table_names = ('vaix', 'actx', 'rainx', 'canx', 'mdtx', 'obdy', 'update', 'weather')

(table_vaix, table_actx, table_rainx, table_canx,
 table_mdtx, table_obdy, table_update, table_weather) = map(db.create_table, _table_names)

In [11]:
# Insert Columns into tables
# Every table's schema is written once as an ordered list of
# (column name, SQLAlchemy type) pairs and applied by the loop at the bottom,
# so the columns are created in exactly the order they are listed.

# Columns that open every vehicle table.
_vehicle_id_columns = [
    ("Agency", sqlalchemy.String),
    ("ESN", sqlalchemy.Integer),
    ("VehicleName", sqlalchemy.String),
    ("Timestamp", sqlalchemy.DateTime),
    ("Latitude", sqlalchemy.Float),
    ("Longitude", sqlalchemy.Float),
]

# GPS columns in the order used by every vehicle table except mdtx.
_vehicle_common_columns = _vehicle_id_columns + [
    ("Course", sqlalchemy.Float),
    ("GPS_Quality", sqlalchemy.Integer),
    ("Velocity", sqlalchemy.Float),
]

_table_schemas = [
    (table_vaix, _vehicle_common_columns + [
        ("Mode", sqlalchemy.String),
        ("RoadTemp", sqlalchemy.Float),
        ("AirTemp", sqlalchemy.Integer),
        ("DewPoint", sqlalchemy.Float),
        ("Humidity", sqlalchemy.Integer),
    ]),
    (table_actx, _vehicle_common_columns + [
        ("IgnitionSignal", sqlalchemy.Integer),
        ("ActiveSignal1", sqlalchemy.Integer),
        ("ActiveSignal2", sqlalchemy.Integer),
        ("ActiveSignal3", sqlalchemy.Integer),
        ("ActiveSignal4", sqlalchemy.Integer),
    ]),
    (table_rainx, _vehicle_common_columns + [
        ("PrecipitationStatus", sqlalchemy.String),
    ]),
    (table_canx, _vehicle_common_columns + [
        ("SPN_List", sqlalchemy.String),
    ]),
    # mdtx keeps GPS_Quality ahead of Course, unlike the other vehicle tables.
    (table_mdtx, _vehicle_id_columns + [
        ("GPS_Quality", sqlalchemy.Integer),
        ("Course", sqlalchemy.Float),
        ("Velocity", sqlalchemy.Float),
        ("CurrentLane", sqlalchemy.Integer),
        ("Material", sqlalchemy.Integer),
        ("PlowStatus", sqlalchemy.Integer),
        ("RoadCondition", sqlalchemy.Integer),
        ("WeatherCondition", sqlalchemy.Integer),
    ]),
    (table_obdy, _vehicle_common_columns + [
        ("CoolantTemp", sqlalchemy.Float),
        ("FuelPressure", sqlalchemy.Float),
        ("EngineRPM", sqlalchemy.Float),
        ("VehicleSpeed", sqlalchemy.Float),
        ("IntakeAirTemp", sqlalchemy.Float),
        ("AbsoluteThrottlePosition", sqlalchemy.Float),
        ("TimeSinceEngineStart", sqlalchemy.Float),
        ("FuelLevel", sqlalchemy.Float),
        ("BarometricPressure", sqlalchemy.Float),
        ("AmbientAirTemp", sqlalchemy.Float),
        ("EngineFuelRate", sqlalchemy.Float),
        ("BrakePedal", sqlalchemy.Float),
        ("TractionAssist", sqlalchemy.Float),
        ("LateralAcceleration", sqlalchemy.Float),
        ("LongitudinalAcceleration", sqlalchemy.Float),
        ("SteeringAngle", sqlalchemy.Float),
        ("Yaw", sqlalchemy.Float),
        ("Roll", sqlalchemy.Float),
    ]),
    (table_update, _vehicle_common_columns + [
        ("EventCode", sqlalchemy.Integer),
        ("IdleStatus", sqlalchemy.Integer),
    ]),
    # ObsTypeID is listed once; the earlier version requested it a second
    # time after Category.
    (table_weather, [
        ("SourceId", sqlalchemy.Integer),
        ("ObsTypeID", sqlalchemy.Integer),
        ("ObsTypeName", sqlalchemy.String),
        ("SensorID", sqlalchemy.Integer),
        ("SensorIndex", sqlalchemy.Integer),
        ("StationID", sqlalchemy.Integer),
        ("SiteID", sqlalchemy.Integer),
        ("Category", sqlalchemy.String),
        ("ContribID", sqlalchemy.Integer),
        ("Contributor", sqlalchemy.String),
        ("StationCode", sqlalchemy.String),
        ("Timestamp", sqlalchemy.DateTime),
        ("Latitude", sqlalchemy.Float),
        ("Longitude", sqlalchemy.Float),
        ("Elevation", sqlalchemy.Integer),
        ("Observation", sqlalchemy.Float),
        ("Units", sqlalchemy.String),
        ("EnglishValue", sqlalchemy.Float),
        ("EnglishUnits", sqlalchemy.String),
        ("ConfValue", sqlalchemy.Float),
        ("Flags", sqlalchemy.String),
    ]),
]

for _table, _columns in _table_schemas:
    for _column_name, _column_type in _columns:
        _table.create_column(_column_name, _column_type)

In [12]:
%%time
# Put weather data into database

# Number of rows at the top of each weather file that are skipped before the
# observation rows start.
_WEATHER_SKIPPED_ROWS = 6

# Fields from this index onward are re-joined with commas into the single
# "Flags" value (the last entry of weather_header).
_WEATHER_FLAGS_INDEX = 20


def _null_to_none(value):
    """Return None for the literal string "null", otherwise *value* unchanged."""
    return None if value == "null" else value


# Conversion applied to each weather column, in order.  Columns not listed
# here are stored as the raw string read from the file.
_WEATHER_CONVERTERS = [
    ("SourceId", int),
    ("ObsTypeID", int),
    ("SensorID", int),
    ("SensorIndex", int),
    ("StationID", int),
    ("SiteID", int),
    ("ContribID", int),
    ("Timestamp", parse),
    ("Latitude", float),
    ("Longitude", float),
    ("Elevation", int),
    ("Observation", float),
    ("Units", _null_to_none),
    ("EnglishValue", float),
    ("EnglishUnits", _null_to_none),
    ("ConfValue", float),
]


def _parse_weather_row(line):
    """Turn one csv row (a list of strings) into a dict keyed by weather_header.

    Raises ValueError if a numeric field cannot be converted, and KeyError if
    the row has too few fields to fill every converted column.
    """
    fields = line[:_WEATHER_FLAGS_INDEX]
    fields.append(",".join(line[_WEATHER_FLAGS_INDEX:]))
    record = dict(zip(weather_header, fields))
    for column, convert in _WEATHER_CONVERTERS:
        record[column] = convert(record[column])
    return record


for weather_file in weather_files:
    weather_data = []
    with open(weather_file) as source:
        csvreader = csv.reader(source, delimiter=',', quotechar='"')

        # Skip the leading rows; the default keeps a file shorter than that
        # from raising StopIteration.
        for _ in range(_WEATHER_SKIPPED_ROWS):
            next(csvreader, None)

        for line in csvreader:
            if not line:
                # A blank row carries no data; parsing it would fail on int("").
                continue
            if len(line) == 1:
                # A single-field row ends the data section of the file.
                break
            weather_data.append(_parse_weather_row(line))

    # One bulk insert per file.
    table_weather.insert_many(weather_data)

CPU times: user 6min 16s, sys: 1.82 s, total: 6min 18s
Wall time: 6min 21s


In [13]:
%%time
# Put vehicle data into database
obdy_check = ['CoolantTemp', 'FuelPressure', 'EngineRPM', 'VehicleSpeed', 'IntakeAirTemp', 'AbsoluteThrottlePosition', \
              'TimeSinceEngineStart', 'FuelLevel', 'BarometricPressure', 'AmbientAirTemp', 'EngineFuelRate', 'BrakePedal', \
              'TractionAssist', 'LateralAcceleration', 'LongitudinalAcceleration', 'SteeringAngle', 'Yaw', 'Roll']

#Ignore any lines that include DjGran1X, FrcTowX, MarkX, ForceX as these message types do not match anything
#in the documentation.  Therefore we can not make assumptions about what the data in those messages means.
_IGNORED_MESSAGE_TAGS = ("DjGran1X", "FrcTowX", "MarkX", "ForceX")


def _optional(convert, missing):
    """Build a converter that maps the *missing* marker to None and applies *convert* otherwise."""
    def _convert(value):
        return None if value == missing else convert(value)
    return _convert


_int_or_null = _optional(int, 'null')
_float_or_null = _optional(float, 'null')
_float_or_star = _optional(float, '*')

#Conversions for the columns that every message type shares
_COMMON_CONVERTERS = [
    ("ESN", int),
    ("Timestamp", parse),
    ("Latitude", _float_or_null),
    ("Longitude", _float_or_null),
    ("Course", _float_or_null),
    ("Velocity", _float_or_null),
    ("GPS_Quality", int),
]

#One entry per kept message type: (tag searched for in the first field, column headers,
#destination table, conversions for that type's own columns followed by the shared ones).
#The first tag found in a line's first field decides both how the line is parsed and
#which table receives it.
_MESSAGE_SPECS = [
    (tag, headers, table, own_converters + _COMMON_CONVERTERS)
    for tag, headers, table, own_converters in [
        ("VaiX", vaix_headers, table_vaix,
         [("RoadTemp", float), ("AirTemp", int), ("DewPoint", float), ("Humidity", int)]),
        ("ActX", actx_headers, table_actx,
         [(column, _int_or_null) for column in
          ("IgnitionSignal", "ActiveSignal1", "ActiveSignal2", "ActiveSignal3", "ActiveSignal4")]),
        ("RainX", rainx_headers, table_rainx, []),
        ("CanX", canx_headers, table_canx, []),
        ("MdtX", mdtx_headers, table_mdtx,
         [(column, int) for column in
          ("CurrentLane", "Material", "PlowStatus", "RoadCondition", "WeatherCondition")]),
        ("ObdY", obdy_headers, table_obdy,
         [(column, _float_or_star) for column in obdy_check]),
        ("Update", update_headers, table_update,
         [("EventCode", int), ("IdleStatus", int)]),
    ]
]


def _is_corrupted(line):
    """Return True for garbage lines that do not include all the data they are supposed to.

    A line is rejected when any field after the first contains ">".  ActX lines
    are also rejected when the last field contains "0-0" or when any of the
    last five fields contains "." (presumably because those fields are parsed
    as integers).
    """
    if any(">" in part for part in line[1:]):
        return True
    if "ActX" in line[0]:
        if "0-0" in line[-1]:
            return True
        if any("." in part for part in line[-5:]):
            return True
    return False


def _find_spec(first_field):
    """Return the _MESSAGE_SPECS entry whose tag occurs in *first_field*, or None."""
    for spec in _MESSAGE_SPECS:
        if spec[0] in first_field:
            return spec
    return None


def _build_record(line, tag, headers, converters):
    """Turn one csv line (a list of at least two strings) into a typed dict keyed by *headers*.

    Raises ValueError if a field cannot be converted, and KeyError if the line
    has too few fields to fill every converted column.
    """
    fields = ["MnDot"]
    if tag == "Update":
        #Update messages are laid out differently from all the other message types: the
        #ESN is taken from the first field, after its first 8 characters
        fields.append(line[0][8:])
    fields.extend(line[1:])
    #Drop any ';' from the last field
    fields[-1] = fields[-1].replace(';', '')

    record = dict(zip(headers, fields))
    for column, convert in converters:
        record[column] = convert(record[column])
    return record


#For every vehicle file collect the parsed lines of each message type in a list.
#Then add the lines in each list to the correct database table
for vehicle_file in vehicle_files:
    pending = dict((spec[0], []) for spec in _MESSAGE_SPECS)

    with open(vehicle_file) as source:
        #NUL characters are stripped before the text reaches the csv reader
        csvreader = csv.reader((x.replace('\0', '') for x in source), delimiter=',', quotechar='"')

        for line in csvreader:
            if not line:
                #A blank row has no first field to inspect, so skip it
                continue
            if len(line) == 1:
                #A single-field row ends the data section of the file
                break

            #Ignore the undocumented message types and corrupted lines
            if any(tag in line[0] for tag in _IGNORED_MESSAGE_TAGS) or _is_corrupted(line):
                continue

            spec = _find_spec(line[0])
            if spec is None:
                #Sanity check to make sure that the line is from one of the message types that we want to keep.
                #Report it and move on; there is no record to convert or store for it.
                print('error: ',line)
                continue

            tag, headers, _table, converters = spec
            pending[tag].append(_build_record(line, tag, headers, converters))

    #Insert all the lines from the current file into the correct tables
    for tag, _headers, table, _converters in _MESSAGE_SPECS:
        table.insert_many(pending[tag])

CPU times: user 22min 42s, sys: 9.15 s, total: 22min 51s
Wall time: 23min 14s
