In [4]:
import numpy as np
import os as os
import re as re

from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import keyword_only

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType, StringType, MapType
from pyspark.ml import Pipeline
from pyspark.sql import Row

__author__ = "Su-Young Hong AKA Da Masta Killa AKA Synth Pop Rocks of Locks AKA Intergalactic Chilympian"
__status__ = "Prototype"

# read directory of files and return a list of all driverIDs from csv's insid directory
def get_drivers(dirpath):
    """
    :param dirpath: string, path to directory containing driver csv's
    :return: list, contains all driverIDs as strings
    """
    try:
        allfiles = os.listdir(dirpath)
        drivers = [re.sub(r'[^0-9]', '', i) for i in allfiles]
        drivers.remove('')
        return drivers
    except Exception as e:
        print e

# produces random samples of driverIDs and tripIDs in two separate lists
def random_samples(targ_driv, driv_list, K=200):
    """
    :param targ_driv: str, driverID we want to make false trips for
    :param driv_list: list, list of all drivers, produced by get_drivers()
    :param K: number of trips we want to make for targ_driv
    :return: tuple of lists, first list is random driverIDs, second list is list of tripIDs, both are strings
    """
    try:
        driv_list.remove(targ_driv) #removes the target driver from list of drivers to sample from
        drivers = np.random.choice(driv_list, K, True)
        trips = np.random.choice(np.arange(1,K+1).astype(str), K, True)
        return (drivers, trips)
    except Exception as e:
        print e

# reads directory of files and returns RDD of observations from trips in the sample (driverID, tripID combo)
# NOTE: this function is VERY SLOW, it is what slows the entire workflow down
def sample_data(path, driverIDs, tripIDs):
    """
    :param path: string, path to directory containing driver.csv's
    :param driverIDs: list, list of randomly sampled driverIDs as strings, produced by random_sample()
    :param tripIDs: list, list of randomly sampled tripIDs as strings, produced by random_samples()
        NOTE: the above two zip into a list of (driverID, tripID) tuples, with each tuple being a single item in the
        sample
    :return: RDD, contains only observations from the sample
    """
    try:
        combos = zip(driverIDs, tripIDs)
        samplefiles = [path + '/' + 'driver_' + i + '.csv' for i in driverIDs]
        samplefiles = ','.join(set(samplefiles))  #### NOTE: this set() action is a hack for small num. files
        RDD = sc.textFile(samplefiles)   #### NOTE: with large num. files, might need to set num. partitions
        RDDsplit = RDD.map(lambda x: x.split(','))
        RDDsamples = RDDsplit.filter(lambda x: (x[2],x[3]) in combos)
        RDDsamples.cache()
        return RDDsamples
    except Exception as e:
        print e

# takes RDD of samples and assigns new driverID and tripID to observations in a new RDD
def ID_Data(targ_driver, RDD, K = 200):
    """
    :param targ_driver: string, target driver we used to generate samples
    :param RDD: RDD, trip data RDD produced by sample_data(), format will be original form (x,y,driverID,tripID,step)
    :param K: int, number of trips we sampled
    :return: RDD, in original format, but with driverID and tripID changed to look like new observations of the target
    driver
    """
    try:
        newID1 = [targ_driver] * K
        newID2 = np.arange(200, 201+K).astype(str)
        newID = zip(newID1, newID2)
        oldID = RDD.map(lambda x: (x[2],x[3])).distinct().collect()
        glossary = sc.parallelize(zip(oldID, newID))
        newRDD = RDD.map(lambda x: ((x[2],x[3]), ([x[0],x[1],x[4]]))).join(glossary)
        newID_RDD = newRDD.map(lambda x: (x[1][0][0], x[1][0][1], x[1][1][0], x[1][1][1], x[1][0][2]))
        return newID_RDD
    except Exception as e:
        print e


# takes RDD in original form and converts it into key-value tuple with values being x,y,step,label
def processRDD(RDD, label):
    """
    :param RDD: RDD in original format (x,y,driverID,tripID,step)
    :param label: category of observation, 1 for positive, 0 for negative
    # note, not sure if it needs to be int or float
    :return: RDD, RDD returned in new key/value format: (driverID, tripID), (x, y, step, label)
    # note, x, y, step, and label will be floats
    """
    try:
        newRDD = RDD.map(lambda x: ((x[2],x[3]),(float(x[0]),float(x[1]),float(x[4]),label)))
        return newRDD
    except Exception as e:
        print e

# takes a driver to target, path to directory of driver.csv's, and returns an RDD labeled with
# (driverID, tripID),(x,y,step,label), where a label 1 is from an actual trip, and label 0 is from
# a trip randomly sampled from other drivers
def labelRDDs(targ_driv, path, K=200):
    """
    :param targ_driv: string, driver we want to create positive and negative labeled data for
    :param path: string, path to directory where driver.csvs are stored
    :param K: int, number of negative (manufactured) trips to sample
    :return: RDD with key, value tuple where key is (driverID, tripID) and value is (x,y,step,label)
    """
    try:
        full_path = path + '/' + 'driver_' + targ_driv + '.csv'
        #print full_path
        target = sc.textFile(path + '/' + 'driver_' + targ_driv + '.csv') #load target driver's data
        target2 = target.map(lambda x: x.split(',')) #convert from string to list of strings
        positives = processRDD(target2, 1.0) #label target driver's RDD
        driv_lis = get_drivers(path) #get python list of all possible drivers to sample from
        #print driv_lis
        sampdriv, samptrip = random_samples(targ_driv, driv_lis, K) #generate random samples of drivers and tripIDs
        samples = sample_data(path, sampdriv, samptrip) #generate RDD of random samples
        #print "GETS HERE"
        samplesRDD = ID_Data(targ_driv, samples, K) #relabel samples to look like target driver's trips
        #print "GETS HERE TOO"
        negatives = processRDD(samplesRDD, 0.0) #label samples
        finalRDD = positives.union(negatives).cache() #join target driver and samples together
        return finalRDD
    except Exception as e:
        print e

In [12]:
def combine_tuples(row):
    data = row[1]
    x = []
    y = []
    steps = []
    label = None
    for d in data:
        x.append(d[0])
        y.append(d[1])
        steps.append(d[2])
        label = d[3]
    return Row(label=label, signature={"x":x, "y":y, "steps":steps})

In [13]:
path = '/Users/mayankkedia/code/kaggle/axa_telematics/sample_drivers'
driver = '1'
d1_RDD = labelRDDs('1', path).groupByKey().map(lambda x: combine_tuples(x))
d1_df = sqlContext.createDataFrame(d1_RDD)

In [56]:
d1_df.printSchema()

root
 |-- label: double (nullable = true)
 |-- signature: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: double (containsNull = true)



In [51]:
import math
def add_polar_coords(signature):
    x = signature["x"]
    y = signature["y"]
    
    r = np.sqrt(np.square(x) + np.square(y)).tolist()
    theta = map(lambda p:  math.atan2(p[1],p[0]), zip(x,y))
    
    return {"x":x, "y":y, "r":r, "theta":theta}

In [53]:
sparkUDF = udf(add_polar_coords, MapType(StringType(), ArrayType(FloatType())))
d1_df_nc = d1_df.withColumn("new_signature", sparkUDF("signature"))
d1_df_n = d1_df_nc.select(["label", "new_signature"])

In [54]:
d1_df_n.printSchema()

root
 |-- label: double (nullable = true)
 |-- new_signature: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: float (containsNull = true)



In [57]:
def add_step_level_features(new_signature):
    x = new_signature["x"]
    y = new_signature["y"]
    r = new_signature["r"]
    theta = new_signature["theta"]  
    # x[0] represents the (x[i] - x[i-1])^2 
    # x[1] represents (y[i] - y[i-1])^2
    v = map(lambda x: (x[0] + x[1]) ** 0.5, 
                zip(
                    # Zipping the x coordinates with itself (one row behind) and then 
                    # calculating the velocity by calculating distance traveled in time step 1
                    map(lambda x: (x[0] - x[1]) ** 2, zip(x, [0.0] + x[:-1])), 
                    map(lambda x: (x[0] - x[1]) ** 2, zip(y, [0.0] + y[:-1]))
                   )
               )
    a =  map(lambda x: x[0] - x[1],
            zip(v, [0.0] + v[:-1]))
    
    return {"x":x, "y":y, "r":r, "theta":theta, "v":v, "a":a}

In [58]:
sparkUDF = udf(add_step_level_features, MapType(StringType(), ArrayType(FloatType())))
d1_df_sl = d1_df_n.withColumn("signature", sparkUDF("new_signature")).select(["label", "signature"])

In [59]:
d1_df_sl.printSchema()

root
 |-- label: double (nullable = true)
 |-- signature: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: float (containsNull = true)



In [61]:
def create_trip_level_features(signature):
    
    

def get_velocity_percentiles(row):
    """
    Generates the percentiles for 5, 10, 15 ... 95 for Velocity
    """
    v = row[1][4]
    return np.percentile(v, range(5, 100, 5))


def get_acceleration_percentiles(row):
    """
    Generates the percentiles for 5, 10, 15 ... 95 for Acceleration
    """
    a = row[1][5]
    return np.percentile(a, range(5, 100, 5))


def trip_features(x):
    """
    Calculates the features of the trip from a row which is of the form
    ((driver_id, trip_id), ([x coordinates], [y coordinates],
    [r coordinates], [theta coordinates], [v coordinates], [step numbers], label))
    This is the form of the rows of the output from step_level_features.

    :@param x:
    """
    min_v = min(x[1][4])
    max_v = max(x[1][4])
    min_a = min(x[1][5])
    max_a = max(x[1][5])
    trip_length = len(x[1][0])
    mean_v = np.mean(x[1][4])
    std_v = np.std(x[1][4])
    mean_a = np.mean(x[1][5])
    std_a = np.std(x[1][5])
    time_stop = sum([elem < 0.5 for elem in x[1][4]])
    label = x[1][7]

    numerical_features = (min_v, max_v,
                   min_a, max_a,
                   trip_length,
                   mean_v, std_v,
                   mean_a, std_a,
                   time_stop,
                   label)

    v_percentiles = get_velocity_percentiles(x)
    a_percentiles = get_acceleration_percentiles(x)
    percentiles = np.append(v_percentiles, a_percentiles)
    second_tuple = np.append(percentiles, numerical_features).tolist()
    return (x[0], second_tuple)

+-----+--------------------+
|label|           signature|
+-----+--------------------+
|  0.0|Map(x -> WrappedA...|
|  0.0|Map(x -> WrappedA...|
|  1.0|Map(x -> WrappedA...|
|  0.0|Map(x -> WrappedA...|
|  0.0|Map(x -> WrappedA...|
|  0.0|Map(x -> WrappedA...|
|  1.0|Map(x -> WrappedA...|
|  1.0|Map(x -> WrappedA...|
|  1.0|Map(x -> WrappedA...|
|  0.0|Map(x -> WrappedA...|
|  0.0|Map(x -> WrappedA...|
|  1.0|Map(x -> WrappedA...|
|  1.0|Map(x -> WrappedA...|
|  0.0|Map(x -> WrappedA...|
|  0.0|Map(x -> WrappedA...|
|  1.0|Map(x -> WrappedA...|
|  0.0|Map(x -> WrappedA...|
|  0.0|Map(x -> WrappedA...|
|  0.0|Map(x -> WrappedA...|
|  0.0|Map(x -> WrappedA...|
+-----+--------------------+
only showing top 20 rows

