In [1]:
import urllib
from pyspark import SparkContext
from pyspark.sql import SQLContext
import numpy as np
import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark.sql import Row
from functools import reduce
from pyspark.sql import DataFrame
import matplotlib.pyplot as plt
# S3 credentials (redacted placeholders here).
# NOTE(review): real keys should be read from a secret store or environment variables
# rather than being hard-coded in the notebook.
ACCESS_KEY = "XXXXXXXXXXXXXXXXXXXX"
SECRET_KEY = "XXXXXXXXXXXXXXXXXXXX"
# URL-encode the secret so it can be embedded in the s3n:// URI used by the mount call below.
ENCODED_SECRET_KEY = urllib.quote(SECRET_KEY, "")
AWS_BUCKET_NAME = "telematicsdata"
MOUNT_NAME = "telefonica"
# Mount of the bucket under /mnt/<MOUNT_NAME>; left commented out (presumably already mounted).
#dbutils.fs.mount("s3n://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)
# Reuse the running SparkContext / SQLContext when there is one.
sc=SparkContext.getOrCreate()
sqlContext = SQLContext.getOrCreate(sc)

def preparePlot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',
                gridWidth=1.0):
    """Template for generating the plot layout.

    Closes the current pyplot figure, creates a new white figure/axes pair, applies the
    given tick positions to both axes, draws a grid and hides all four spines.

    :param xticks: tick positions for the x axis
    :param yticks: tick positions for the y axis
    :param figsize: figure size passed to ``plt.subplots``
    :param hideLabels: when true, tick labels are removed from both axes
    :param gridColor: colour of the grid lines
    :param gridWidth: line width of the grid lines
    :return: ``(fig, ax)`` as returned by ``plt.subplots``
    """
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hideLabels: axis.set_ticklabels([])
    plt.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
    # Explicit loop: `map` is lazy on Python 3, so using it only for its side effects
    # would silently leave the spines visible.
    for position in ['bottom', 'top', 'left', 'right']:
        ax.spines[position].set_visible(False)
    return fig, ax
pass




In [2]:
# List the entries under data/1 on the mounted bucket.
display(dbutils.fs.ls("/mnt/"+MOUNT_NAME+"/data/1"))

In [3]:
# Read every <driver>/<trip>.csv under DATA_BASE_DIR into its own DataFrame and extend it
# with: t (row index within the file), driver, trip, driver_trip ("<driver>_<trip>") and
# x_y ([x, y] pair). The per-trip DataFrames are collected in trip_df_list.
DATA_BASE_DIR="/mnt/"+MOUNT_NAME+"/data/"
trip_df_list=[]
for d in dbutils.fs.ls(DATA_BASE_DIR):
  # Directory entries end with "/", which [:-1] strips to leave the driver id.
  driver=int(d.path.split(DATA_BASE_DIR,1)[1][:-1])
  driver_dir=DATA_BASE_DIR+str(driver)+"/"
  # The loop variable is deliberately not called `f`: that name is the module alias for
  # pyspark.sql.functions and would be clobbered for every later cell.
  for trip_file in dbutils.fs.ls(driver_dir):
    # [:-4] strips the ".csv" extension to leave the trip id.
    trip=int(trip_file.path.split(driver_dir,1)[1][:-4])
    df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(driver_dir+str(trip)+".csv")
    schema  = StructType(df.schema.fields[:] + [StructField("t", IntegerType(), False),StructField("driver", IntegerType(), True),StructField("trip", IntegerType(), True),StructField("driver_trip", StringType(), True),StructField("x_y", ArrayType(DoubleType()), True)])
    df = (df.rdd # Extract rdd
      .zipWithIndex() # Add index
      # driver/trip are bound as default arguments so each lambda keeps the values of its
      # own iteration, independent of when the closure is serialized.
      .map(lambda ri, driver=driver, trip=trip: Row(*list(ri[0]) + [ri[1],driver,trip,str(driver)+"_"+str(trip),[ri[0].x,ri[0].y]])) # Map to rows
      .toDF(schema))
    trip_df_list.append(df)


In [4]:
def unionAll(dfs):
    """Fold a sequence of DataFrames into one by applying DataFrame.unionAll pairwise."""
    combined = reduce(DataFrame.unionAll, dfs)
    return combined



In [5]:
# DF_list is only created by the following cell, so referencing it here raises NameError on
# a fresh kernel. Report the number of per-trip DataFrames loaded so far instead.
print(len(trip_df_list))

In [6]:
# Union the per-trip DataFrames in chunks of UNION_BATCH_SIZE. Stepping through the list
# with range(start, stop, step) also picks up the final partial chunk, which the previous
# len/1000 loop silently dropped (and which left DF_list empty for fewer than 1000 trips).
UNION_BATCH_SIZE=1000
DF_list=[]
for start in range(0,len(trip_df_list),UNION_BATCH_SIZE):
  DF_list.append(unionAll(trip_df_list[start:start+UNION_BATCH_SIZE]))

In [7]:
# Combine the chunk-level unions into one DataFrame holding every trip.
DF=unionAll(DF_list)

In [8]:
# Repartition into 4 partitions keyed on driver, sort each partition by driver, trip and
# the row index t, then mark the result for caching.
DF=DF.repartition(4,["driver"])
DF=DF.sortWithinPartitions(["driver","trip",'t'],ascending=True)
DF.cache()

In [9]:
# Persist the combined DataFrame as the table ALLCSVS.
DF.write.saveAsTable("ALLCSVS")

In [10]:
# Rebind DF to the saved table so later cells read from ALLCSVS.
DF=sqlContext.sql("SELECT * FROM ALLCSVS")

In [11]:
# Render the DataFrame in the notebook.
display(DF)

In [12]:

import pyspark.sql.functions as f
# Number of distinct trips per driver. The aggregate is aliased explicitly because the
# auto-generated column name differs between Spark versions, while the next cell looks the
# column up as 'count(trip)'.
counts=DF.groupBy("driver").agg(f.countDistinct(DF.trip).alias("count(trip)"))
display(counts)

In [13]:
# Show drivers whose distinct-trip count is below 200.
# NOTE(review): relies on the aggregate column of `counts` being named 'count(trip)'.
display(counts[counts['count(trip)']<200])

In [14]:
from pyspark.sql import Row
from pyspark.sql.functions import struct
from pyspark.sql import DataFrame
from collections import OrderedDict

def reduce_by(self, by, cols, f, schema=None):
    """
    Group a DataFrame by key columns and combine the value columns with reduceByKey.

    :param self DataFrame
    :param by a list of grouping columns
    :param cols a list of columns to aggregate
    :param f aggregation function (Row, Row) => Row applied to the `cols` struct
    :param schema optional schema handed to toDF for the result
    :return DataFrame
    """
    def merge_kv(kv):
        # Flatten a (key Row, value Row) pair back into a single Row.
        key, value = kv
        names = key.__fields__ + value.__fields__
        values = key + value
        return Row(**OrderedDict(zip(names, values)))

    # Each record becomes a (struct(by), struct(cols)) pair, i.e. a key/value RDD.
    keyed_rdd = self.select(struct(*by), struct(*cols)).rdd
    reduced = keyed_rdd.reduceByKey(f)
    return reduced.map(merge_kv).toDF(schema)

# Expose the helper as a DataFrame method.
DataFrame.reduce_by = reduce_by

In [15]:
from pyspark.sql.types import *
def foo(row1, row2):
    """Combine two rows field by field into comma-joined strings.

    For every field the result holds ``str(value_from_row1) + "," + str(value_from_row2)``
    (for example the values 1 and None become the string "1,None"). Field names are taken
    from ``row1``. Used as the reduceByKey function that concatenates a trip's points.
    """
    return Row(**OrderedDict(zip(
      row1.__fields__, (str(x)+","+str(y) for (x, y) in zip(row1, row2))
    )))
# Result schema for reduce_by: the two grouping keys plus the comma-joined points string.
field = [StructField("driver", IntegerType(), True),StructField("trip", IntegerType(), True),StructField("trip_array", StringType(), True)]
schema = StructType(field)

# One row per (driver, trip); foo concatenates the string form of that trip's x_y values.
# NOTE(review): reduceByKey gives no ordering guarantee — confirm the points of a trip end
# up in time (t) order, since later cells treat trip_array as an ordered path.
grouped=DF.reduce_by(by=["driver","trip"], cols=["x_y"], f=foo,schema=schema)

def makeMatrix(x):
  """Parse a comma-joined string of ``[x, y]`` pairs into a list of lists.

  :param x: string such as ``"[1.0, 2.0],[3.0, 4.0]"``
  :return: the parsed list, e.g. ``[[1.0, 2.0], [3.0, 4.0]]``
  :raises ValueError / SyntaxError: when the string is not a plain Python literal
  """
  # literal_eval only accepts literals, so unlike eval it cannot execute code that might
  # be embedded in the data.
  import ast
  return ast.literal_eval("["+x+"]")

# UDF wrapping makeMatrix; replaces the string column trip_array with an array of
# [x, y] double pairs.
mMat=f.udf(lambda x : makeMatrix(x),ArrayType(ArrayType(DoubleType())))
grouped=grouped.withColumn("trip_array",mMat(grouped.trip_array))


In [16]:
# Write the per-trip point arrays to /mnt/all-output in Avro format.
grouped.write.format("com.databricks.spark.avro").save("/mnt/all-output")

In [17]:
# Reload the per-trip point arrays from the Avro output and show them.
grouped=sqlContext.read.format("com.databricks.spark.avro").load("/mnt/all-output")
display(grouped)

In [18]:
#Calculate Distances 
from scipy.spatial import distance

def calcDistance(trip_array):
  """Return the Euclidean distance between each pair of consecutive points.

  The result has one entry fewer than ``trip_array`` (empty for zero or one point).
  """
  return [distance.euclidean(prev_pt, cur_pt)
          for prev_pt, cur_pt in zip(trip_array, trip_array[1:])]

# trip_distances: distance between consecutive points of the trip.
get_distances=f.udf(calcDistance,ArrayType(DoubleType()))
# trip_absolute_distance: straight-line distance between the first and the last point.
get_abs_distance=f.udf(lambda arr:distance.euclidean(arr[0],arr[len(arr)-1]),DoubleType())
grouped=grouped.withColumn("trip_distances",get_distances(grouped.trip_array))
grouped=grouped.withColumn("trip_absolute_distance",get_abs_distance(grouped.trip_array))
display(grouped.select(grouped.driver,grouped.trip,grouped.trip_distances))

In [19]:
#Smoothed Speed (with MA)
def running_mean(x,N=10):
    """Moving average of ``x`` over a window of ``N`` samples, via a cumulative sum.

    :param x: sequence of numbers
    :param N: window length
    :return: list of ``len(x) - N + 1`` floats (empty when ``x`` is shorter than ``N``)
    """
    # Work in floats and divide by float(N) so integer input is not floor-divided
    # under Python 2.
    cumsum = np.cumsum(np.insert(np.asarray(x, dtype=float), 0, 0.0))
    return list(map(float,(cumsum[N:] - cumsum[:-N]) / float(N)))

# smoothed_speed: running mean (default window of 10) of the per-step distances.
calc_smoothed_speed = f.udf(lambda x:running_mean(x),ArrayType(FloatType()))

grouped=grouped.withColumn("smoothed_speed",calc_smoothed_speed(grouped.trip_distances))
display(grouped.select(grouped.driver,grouped.trip,grouped.smoothed_speed))

In [20]:
#Plot speed & smoothed speed
import matplotlib.pyplot as plt
# Take a single trip and plot its raw per-step speed against the smoothed speed.
cols=grouped.select(grouped.driver,grouped.trip,grouped.trip_distances.alias("speed_per_sec"),grouped.smoothed_speed).take(1)[0]
speed_per_sec=cols.speed_per_sec
# The running mean with window 10 is 9 samples shorter than its input; pad the front with
# 9 zeros so both series can share the same x axis.
smoothed_speed=list(np.zeros(9))+cols.smoothed_speed
t=range(len(speed_per_sec))
driver=str(cols.driver)
trip=str(cols.trip)
fig, ax = plt.subplots()
fig.set_figheight(5)
ax.plot(t, speed_per_sec,label="speed per second")
ax.plot(t, smoothed_speed,label="smoothed speed")
ax.set_title('Speed & 10 periods MA Smoothed Speed vs. time for driver '+driver+" trip "+trip)
ax.set_xlabel('time in seconds')
ax.set_ylabel('speed (m/s)')
ax.legend()
display(fig)

In [21]:
#Calculate Durations per Trip

# trip_duration: number of points recorded for the trip (length of trip_array).
get_length = f.udf(lambda x:len(x),IntegerType())
grouped=grouped.withColumn("trip_duration",get_length(grouped.trip_array))
#display(grouped.select(grouped.driver,grouped.trip,grouped.trip_duration))
#STOPs
def stops(bits):
  """Find the runs of consecutive values below 0.5 in ``bits``.

  :param bits: sequence of numbers (the smoothed speeds in this notebook)
  :return: list of ``[start, end, length]`` triples, one per run, where ``start`` is the
           index of the first value of the run and ``end`` the index just past its last
  """
  # Pad both ends with a value >= 0.5 so that every run has a start and an end edge.
  padded = np.hstack(([1], bits, [1]))
  is_stopped = (padded < 0.5).astype(int)
  # +1 marks the start of a run, -1 marks its end.
  edges = np.diff(is_stopped)

  starts = np.flatnonzero(edges > 0)
  ends = np.flatnonzero(edges < 0)
  return np.column_stack((starts, ends, ends - starts)).tolist()
# stop_info_array: [start, end, length] for every run of smoothed speed below 0.5.
get_info_array = f.udf(stops,ArrayType(ArrayType(IntegerType())))

# total_stop_duration: sum of the run lengths (third element of each triple).
get_stop_duration = f.udf(lambda x:sum([i[2] for i in x]),IntegerType())
grouped=grouped.withColumn("stop_info_array",get_info_array(grouped.smoothed_speed))
# stop_no: number of runs found.
grouped=grouped.withColumn("stop_no",get_length(grouped.stop_info_array))
grouped=grouped.withColumn("total_stop_duration",get_stop_duration(grouped.stop_info_array))
# stop_ratio: stopped samples relative to the number of trip points.
grouped=grouped.withColumn("stop_ratio",grouped.total_stop_duration/grouped.trip_duration)
display(grouped.select(grouped.driver,grouped.trip,grouped.stop_no,grouped.total_stop_duration,grouped.trip_duration,grouped.stop_ratio))
  

In [22]:
#Acceleration
def get_accel(x):
    """Return the first differences of ``x`` as a list of Python floats."""
    deltas = np.diff(x)
    return [float(delta) for delta in deltas]
def get_neg_accel(accel_s):
  """Return the strictly negative entries of ``accel_s`` as a list of Python floats."""
  values = np.array(accel_s)
  return [float(v) for v in values[values < 0]]
def get_pos_accel(accel_s):
  """Return the strictly positive entries of ``accel_s`` as a list of Python floats."""
  values = np.array(accel_s)
  return [float(v) for v in values[values > 0]]
from scipy.signal import savgol_filter
def get_circular_acceleration(ride):
  """Magnitude of the acceleration vector at every point of a 2-D path.

  The path is smoothed with a Savitzky-Golay filter (window 7, polynomial order 3), then
  the acceleration is assembled from its tangential component (d2s/dt2 along the unit
  tangent) and its normal component (curvature * speed**2 along the unit normal).

  :param ride: sequence of [x, y] points
  :return: list of floats, one per point
  """
  # NOTE(review): savgol_filter with a window of 7 presumably requires at least 7 points;
  # shorter rides would raise inside the UDF — confirm against the scipy documentation.
  ride = np.array(ride)
  ride = savgol_filter(ride.T, 7, 3).T

  # http://stackoverflow.com/questions/28269379/curve-curvature-in-numpy
  dx_dt = np.gradient(ride[:, 0])
  dy_dt = np.gradient(ride[:, 1])
  velocity = np.vstack((dx_dt, dy_dt)).T
  ds_dt = np.linalg.norm(velocity, axis=1)
  # errstate silences the division warnings only for this statement and then restores the
  # caller's numpy error settings; np.seterr(all='print') used to leave the whole process
  # in 'print' mode after the first call.
  with np.errstate(all='ignore'):
    tangent = np.array([1/ds_dt] * 2).T
  tangent = np.nan_to_num(tangent)
  tangent = tangent * velocity
  tangent_x = tangent[:, 0]
  tangent_y = tangent[:, 1]

  deriv_tangent_x = np.gradient(tangent_x)
  deriv_tangent_y = np.gradient(tangent_y)
  dT_dt = np.vstack((deriv_tangent_x, deriv_tangent_y)).T
  length_dT_dt = np.linalg.norm(dT_dt, axis=1)

  with np.errstate(all='ignore'):
    normal = np.array([1/length_dT_dt] * 2).T
  normal = np.nan_to_num(normal)
  normal = normal * dT_dt
  d2s_dt2 = np.gradient(ds_dt)
  d2x_dt2 = np.gradient(dx_dt)
  d2y_dt2 = np.gradient(dy_dt)

  with np.errstate(all='ignore'):
    curvature = np.abs(d2x_dt2 * dy_dt - dx_dt * d2y_dt2) / (dx_dt * dx_dt + dy_dt * dy_dt)**1.5
  curvature = np.nan_to_num(curvature)

  t_comp = d2s_dt2
  n_comp = curvature * ds_dt * ds_dt
  # Duplicate each scalar component into two columns so it scales both x and y.
  t_component = np.array([t_comp] * 2).T
  n_component = np.array([n_comp] * 2).T

  # Magnitude of the acceleration vector at each point.
  acceleration = [float(np.linalg.norm(v)) for v in (t_component * tangent + n_component * normal)]
  return acceleration
# UDF wrappers for the acceleration helpers defined above.
calc_acceleration = f.udf(get_accel,ArrayType(FloatType()))
calc_neg_acceleration = f.udf(get_neg_accel,ArrayType(FloatType()))
calc_pos_acceleration = f.udf(get_pos_accel,ArrayType(FloatType()))
calc_circular_acceleration = f.udf(get_circular_acceleration,ArrayType(FloatType()))
# accelerations: first differences of the smoothed speed, then split by sign.
grouped=grouped.withColumn("accelerations",calc_acceleration(grouped.smoothed_speed))
grouped=grouped.withColumn("pos_accelerations",calc_pos_acceleration(grouped.accelerations))
grouped=grouped.withColumn("neg_accelerations",calc_neg_acceleration(grouped.accelerations))
# circular_accelerations: computed from the raw trip points, not from the smoothed speed.
grouped=grouped.withColumn("circular_accelerations",calc_circular_acceleration(grouped.trip_array))
display(grouped.select(grouped.driver,grouped.trip,grouped.circular_accelerations))

In [23]:
#Speed Descriptive Statistics
import numpy as np

# Array -> scalar summary UDFs. They are created through the `f` module alias because the
# bare name `udf` is never imported in this notebook. The empty-array fallback is the float
# 0.0 so the returned value matches the declared FloatType.
avg=f.udf(lambda xs: float(np.mean(xs)) if len(xs)>0 else 0.0, FloatType())
median=f.udf(lambda xs: float(np.median(xs)) if len(xs)>0 else 0.0, FloatType())
max_udf = f.udf(lambda xs: float(np.max(xs))if len(xs)>0 else 0.0, FloatType())
min_udf = f.udf(lambda xs: float(np.min(xs))if len(xs)>0 else 0.0, FloatType())
std_udf = f.udf(lambda xs: float(np.std(xs))if len(xs)>0 else 0.0, FloatType())
calc_total_trip_length = f.udf(lambda xs: float(np.sum(xs)), FloatType())

# Keep the existing columns and add per-trip summary statistics of speed, positive
# acceleration, negative acceleration ("breaking" in the column names) and circular
# acceleration. Average/median speed are taken from smoothed_speed, max/std/total from the
# raw trip_distances. "max_breaking" is the minimum (most negative) value of
# neg_accelerations.
grouped=grouped.select(grouped.driver,grouped.trip,grouped.trip_array,grouped.trip_distances
                       ,grouped.accelerations,grouped.pos_accelerations,grouped.neg_accelerations,grouped.circular_accelerations
                       ,grouped.smoothed_speed,grouped.trip_absolute_distance,grouped.trip_duration
                       ,grouped.stop_info_array,grouped.stop_no,grouped.total_stop_duration,grouped.stop_ratio
                       ,avg(grouped.smoothed_speed).alias("average_trip_speed")
                       ,median(grouped.smoothed_speed).alias("median_trip_speed")
                       ,max_udf(grouped.trip_distances).alias("max_trip_speed")
                      ,std_udf(grouped.trip_distances).alias("std_trip_speed_per_sec")
                      ,calc_total_trip_length(grouped.trip_distances).alias("total_trip_distance")
                       ,avg(grouped.pos_accelerations).alias("average_acceleration")
                       ,median(grouped.pos_accelerations).alias("median_acceleration")
                       ,max_udf(grouped.pos_accelerations).alias("max_acceleration")
                       ,std_udf(grouped.pos_accelerations).alias("std_acceleration")
                       ,avg(grouped.neg_accelerations).alias("average_breaking")
                       ,median(grouped.neg_accelerations).alias("median_breaking")
                       ,min_udf(grouped.neg_accelerations).alias("max_breaking")
                       ,std_udf(grouped.neg_accelerations).alias("std_breaking")
                      ,avg(grouped.circular_accelerations).alias("average_circular_acceleration")
                       ,median(grouped.circular_accelerations).alias("median_circular_acceleration")
                       ,max_udf(grouped.circular_accelerations).alias("max_circular_acceleration")
                       ,std_udf(grouped.circular_accelerations).alias("std_circular_acceleration"))
# Show only the scalar statistics.
display(grouped.select(grouped.driver,grouped.trip,grouped.average_trip_speed,grouped.median_trip_speed
                       ,grouped.max_trip_speed,grouped.std_trip_speed_per_sec,grouped.total_trip_distance
                      ,grouped.average_acceleration,grouped.median_acceleration,grouped.max_acceleration,grouped.std_acceleration
                      ,grouped.average_breaking,grouped.median_breaking,grouped.max_breaking,grouped.std_breaking
                      ,grouped.average_circular_acceleration, grouped.median_circular_acceleration, grouped.max_circular_acceleration,
                       grouped.std_circular_acceleration))

In [24]:
#ANGLES (Changed)

import math
def get_angle(p1, p2, p3):
  """Angle in degrees at ``p2`` between the segments ``p2->p1`` and ``p2->p3``.

  The product of the two segment lengths is floored at 0.1 so coincident points do not
  cause a division by zero, and the cosine is clamped to [-1, 1] before ``acos``.
  """
  to_p1_x, to_p1_y = p1[0] - p2[0], p1[1] - p2[1]
  to_p3_x, to_p3_y = p3[0] - p2[0], p3[1] - p2[1]
  dot_product = to_p1_x * to_p3_x + to_p1_y * to_p3_y
  denominator = max(distance.euclidean(p1, p2) * distance.euclidean(p2, p3), 0.1)

  # Rounding can push the ratio marginally outside the domain of acos.
  ratio = dot_product / denominator
  if ratio > 1:
    ratio = 1
  elif ratio < -1:
    ratio = -1

  return math.acos(ratio) * 180 / math.pi

def calcAngles(trip_array):
  """Return the angle at every interior point, from each run of three consecutive points.

  The result has two entries fewer than ``trip_array``.
  """
  return [get_angle(first, middle, last)
          for first, middle, last in zip(trip_array, trip_array[1:], trip_array[2:])]

# get_angles: angle at every interior point of the trip.
get_angles=f.udf(calcAngles,ArrayType(DoubleType()))
# get_angle_changes: absolute difference between consecutive angles.
get_angle_changes=f.udf(lambda x:np.abs(np.diff(x)).tolist(),ArrayType(DoubleType()))
def getTurn(x):
  """Count the values above 15 in ``x``, ignoring its first and last 10 entries."""
  interior = np.array(x[10:-10])
  return int(np.count_nonzero(interior > 15))

get_turn_no=f.udf(getTurn,IntegerType())
grouped=grouped.withColumn("trip_angles",get_angles(grouped.trip_array))
grouped=grouped.withColumn("trip_angle_changes",get_angle_changes(grouped.trip_angles))
# NOTE(review): the column is named avg_trip_angle_changes but is computed from
# trip_angles, not trip_angle_changes — confirm which input was intended.
grouped=grouped.withColumn("avg_trip_angle_changes",avg(grouped.trip_angles))
# turn_no: number of angle changes above 15 (first and last 10 samples ignored by getTurn).
grouped=grouped.withColumn("turn_no",get_turn_no(grouped.trip_angle_changes))
grouped=grouped.withColumn("turn_ratio",grouped.turn_no.cast("float")/grouped.trip_duration)

display(grouped.select(grouped.driver,grouped.trip,grouped.trip_angle_changes,grouped.turn_no,grouped.turn_ratio))

In [25]:
#Calculate RDP Smoothed Distances to match the Trips, 
#it might be better to keep EPSILON THRESHOLD HIGHER
# Tolerance passed to rdp as `epsilon`.
EPSILON=10
from rdp import rdp 
# NOTE: this rebinds get_distances, which earlier held the consecutive-distance UDF; from
# here on it returns the rdp-simplified list of [x, y] points instead.
get_distances=f.udf(lambda arr:rdp(arr,epsilon=EPSILON),ArrayType(ArrayType(DoubleType())))
grouped=grouped.withColumn("rdp_smt_trip_array",get_distances(grouped.trip_array))
display(grouped.select(grouped.driver,grouped.trip,grouped.rdp_smt_trip_array))

In [26]:
#ROTATE RDP smoothed Routes
def removeRotation(XY):
    """Express a path in a basis whose x axis points from the origin to the last point.

    Param: XY must be an N x 2 numpy array
    Return: N x 2 nested list of the points in the new basis

    The last point is used directly as the direction vector, i.e. the path is assumed to
    start at the origin.
    """
    # Direction of the new x axis, and the same vector turned by 90 degrees for the y axis.
    end_point = XY[-1]
    perpendicular = np.array([-end_point[1], end_point[0]])

    end_norm = np.linalg.norm(end_point)
    perp_norm = np.linalg.norm(perpendicular)

    # A round trip ends where it started (zero-length vector): keep the standard basis.
    u = end_point / end_norm if end_norm > 0 else np.array([1,0])
    v = perpendicular / perp_norm if perp_norm > 0 else np.array([0,1])

    # Rows are the new unit vectors, so multiplying by this matrix yields the coordinates
    # of each point in the new basis.
    basis = np.array([[u[0],u[1]],[v[0],v[1]]])

    # (2 x 2) . (2 x N), hence the transposes on the way in and out.
    rotated = basis.dot(np.array(XY).T)
    return rotated.T.tolist()
def rotate_path( route, angle_to_rotate=90):
  """Multiply every point of ``route`` (as a row vector) by a 2 x 2 rotation matrix.

  NOTE(review): the angle is handed straight to np.cos / np.sin, which work in radians,
  while the default value of 90 looks like degrees — confirm the intended unit.
  """
  cos_a = np.cos(angle_to_rotate)
  sin_a = np.sin(angle_to_rotate)
  rotation_matrix = [[cos_a, -1 * sin_a],
                     [sin_a, cos_a]]
  return np.dot(route, rotation_matrix).tolist()
# rotated_rdp_smt_trip_array: the simplified path expressed in the basis produced by
# removeRotation (x axis pointing at the last point).
get_rotated_array=f.udf(lambda x:removeRotation(np.array(x)),ArrayType(ArrayType(DoubleType())))
grouped=grouped.withColumn("rotated_rdp_smt_trip_array",get_rotated_array(grouped.rdp_smt_trip_array))
display(grouped.select(grouped.driver,grouped.trip,grouped.rotated_rdp_smt_trip_array))

In [27]:
# Write all computed per-trip columns to /mnt/all-feat-output2 in Avro format.
grouped.write.format("com.databricks.spark.avro").save("/mnt/all-feat-output2")

In [28]:
# Reload the saved per-trip features as `featurized` and show them.
featurized=sqlContext.read.format("com.databricks.spark.avro").load("/mnt/all-feat-output2")
display(featurized)

In [29]:
#CORRELATION ANALYSIS BETWEEN FEATURES

# Columns left out of the feature table: the array-valued columns followed by the
# identifier / label columns.
drop=(['rdp_smt_trip_array',"trip_distances","trip_angles", "trip_array","accelerations","pos_accelerations","neg_accelerations","circular_accelerations","smoothed_speed",'stop_info_array',"trip_angle_changes",'rotated_rdp_smt_trip_array']
      + ["driver","trip","driver_trip","dbucket","target"])
keep = [name for name in featurized.columns if name not in drop]
onlyfeatureDF=featurized.select(keep)


In [30]:
#Write Features
# Persist the scalar feature columns as the table all_feats2.
onlyfeatureDF.write.saveAsTable("all_feats2")

In [31]:
# Remove the array-valued columns from `featurized`; driver and trip are kept because the
# train/test split below needs them.
drop=['rdp_smt_trip_array',"trip_distances","trip_angles", "trip_array","accelerations","pos_accelerations","neg_accelerations","circular_accelerations","smoothed_speed",'stop_info_array',"trip_angle_changes",'rotated_rdp_smt_trip_array']
keep = [c for c in featurized.columns if c not in drop]
featurized=featurized.select(keep)

In [32]:
#CREATING TRAINING AND TEST SETS
import itertools
# For every driver build a binary data set ("dbucket"): the driver's own trips are labelled
# target=1 and an equally sized random sample of other drivers' trips is labelled target=0.
# Both halves are split train/test according to train_ratio.
# Fixed seed so the random split is reproducible between runs.
SEED=123
np.random.seed(SEED)
driver_trip_count=200
zeros_sample_count=200
train_ratio=0.9
all_drivers_trip_count=featurized.select(["trip"]).count()
other_drivers_trip_count=all_drivers_trip_count-driver_trip_count
trip_arr=list(range(1,driver_trip_count+1))
train_trip_count=int(train_ratio*driver_trip_count)
makeStr=f.udf(lambda x,y:str(x)+"_"+str(y),StringType())
train_df_list=[]
test_df_list=[]
# Sorted so the seeded random draws are assigned to drivers in a stable order.
driver_list=sorted(d.driver for d in featurized.select(["driver"]).distinct().collect())
for driver in driver_list:
  driverdf=(featurized.filter(featurized.driver==driver).withColumn("driver_trip",makeStr(featurized.driver,featurized.trip))
            .withColumn("dbucket",f.lit(driver)).withColumn("target",f.lit(1)))
  # .tolist() turns numpy scalars into plain Python values before they reach isin().
  train_trips=np.random.choice(trip_arr, train_trip_count,replace=False).tolist()
  test_trips=list(set(trip_arr)-set(train_trips))
  train_onesdf=driverdf.filter(driverdf.trip.isin(train_trips))
  test_onesdf=driverdf.filter(driverdf.trip.isin(test_trips))
  # Sample distinct (other driver, trip) pairs by index instead of materialising the full
  # cross product of "<driver>_<trip>" strings for every driver.
  other_drivers=[d for d in driver_list if d!=driver]
  pair_indices=np.random.choice(len(other_drivers)*driver_trip_count, int(driver_trip_count),replace=False).tolist()
  random_other_driv_trips=[str(other_drivers[i//driver_trip_count])+"_"+str(trip_arr[i%driver_trip_count]) for i in pair_indices]
  train_other_driv_trips=np.random.choice(random_other_driv_trips, train_trip_count,replace=False).tolist()
  test_other_driv_trips=list(set(random_other_driv_trips)-set(train_other_driv_trips))
  zerosdf=(featurized.filter(featurized.driver!=driver).withColumn("driver_trip",makeStr(featurized.driver,featurized.trip))
           .withColumn("dbucket",f.lit(driver)).withColumn("target",f.lit(0)))
  train_zerosdf=zerosdf.filter(zerosdf.driver_trip.isin(train_other_driv_trips))
  test_zerosdf=zerosdf.filter(zerosdf.driver_trip.isin(test_other_driv_trips))
  train_df_list.append(train_onesdf)
  train_df_list.append(train_zerosdf)
  test_df_list.append(test_onesdf)
  test_df_list.append(test_zerosdf)

from pyspark.sql import DataFrame
def unionAll(dfs):
    """Fold a sequence of DataFrames into one with DataFrame.unionAll."""
    return reduce(DataFrame.unionAll, dfs)
TRAIN=unionAll(train_df_list)
TEST=unionAll(test_df_list)
#TRAIN.cache()
#TEST.cache()
#TRAIN.cache()
#TEST.cache()

In [33]:
# Persist both sets as tables, going through the DataFrameWriter (`.write`) like the other
# saveAsTable calls in this notebook.
TRAIN.write.saveAsTable("TRAIN")
TEST.write.saveAsTable("TEST")


In [36]:
# Rebind TRAIN / TEST to the saved tables.
TRAIN=sqlContext.sql("SELECT * FROM TRAIN")
TEST=sqlContext.sql("SELECT * FROM TEST")
#IMPORTANT HERE I used table caching in Databricks for these two tables!!!!

In [37]:
# CHECK AGAIN
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

feature_cols=TRAIN.columns
#REMOVING non-feature columns
for non_feature in ("driver","trip","driver_trip","dbucket","target"):
  feature_cols.remove(non_feature)

# The assembler only depends on feature_cols, so it is built once rather than per bucket.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features")

# For every driver bucket, turn the TRAIN and TEST rows into (label, features) DataFrames.
dbucket_list=[d.dbucket for d in TRAIN.select(["dbucket"]).distinct().collect()]
LabelledTRAIN_DF_list=[]
LabelledTEST_DF_list=[]
for dbucket in dbucket_list:
  dbucket_TRAIN=sqlContext.sql("SELECT * FROM TRAIN WHERE dbucket="+str(dbucket))
  dbucket_TEST=sqlContext.sql("SELECT * FROM TEST WHERE dbucket="+str(dbucket))

  transformedTRAIN = assembler.transform(dbucket_TRAIN)
  # Map over the underlying RDD explicitly (as the other cells do with `.rdd`) instead of
  # calling map on the DataFrame itself.
  LabelledTRAIN=(transformedTRAIN.select(col("target").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features)))
  LabelledTRAIN_DF_list.append((dbucket,LabelledTRAIN.toDF()))

  transformedTEST = assembler.transform(dbucket_TEST)
  LabelledTEST=(transformedTEST.select(col("target").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features)))
  LabelledTEST_DF_list.append((dbucket,LabelledTEST.toDF()))

In [38]:
#RANDOM FOREST CLASSIFIER
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier,DecisionTreeClassifier,GBTClassifier,GBTClassificationModel
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
#from pyspark.mllib.tree import RandomForest
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorIndexer

def train_driver_model(LabelledTRAIN_DF):
  """Fit a cross-validated random-forest pipeline on one driver bucket.

  The pipeline indexes the label, indexes the feature vector (features with at most 2
  distinct values are treated as categorical) and trains a RandomForestClassifier. A
  10-fold CrossValidator searches maxDepth in [5, 6, 7] and numTrees in range(9, 15, 2)
  with maxBins fixed at 100, scored by area under the ROC curve.

  :param LabelledTRAIN_DF: DataFrame with `label` and `features` columns
  :return: (fitted CrossValidatorModel, the evaluator used for scoring)
  """
  # References:
  #http://stackoverflow.com/questions/28818692/pyspark-mllib-class-probabilities-of-random-forest-predictions
  #http://spark.apache.org/docs/latest/ml-classification-regression.html#output-columns-predictions
  #https://www.mapr.com/blog/churn-prediction-pyspark-using-mllib-and-ml-packages
  label_indexer = StringIndexer(inputCol='label',
                                outputCol='indexedLabel').fit(LabelledTRAIN_DF)
  feature_indexer = VectorIndexer(inputCol='features',
                                  outputCol='indexedFeatures',
                                  maxCategories=2).fit(LabelledTRAIN_DF)
  forest = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",impurity='gini')

  grid_builder = ParamGridBuilder()
  grid_builder = grid_builder.addGrid(forest.maxDepth, [5,6,7])
  grid_builder = grid_builder.addGrid(forest.numTrees, range(9,15,2))
  grid_builder = grid_builder.addGrid(forest.maxBins,[100])
  param_grid = grid_builder.build()

  evaluator = BinaryClassificationEvaluator(labelCol='indexedLabel', metricName='areaUnderROC')
  pipeline = Pipeline(stages=[label_indexer, feature_indexer, forest])
  cross_validator = CrossValidator(
      estimator=pipeline,
      estimatorParamMaps=param_grid,
      evaluator=evaluator,
      numFolds=10)

  cv_model = cross_validator.fit(LabelledTRAIN_DF)
  # Stage 2 of the best pipeline is the fitted random forest.
  print(cv_model.bestModel.stages[2])
  return cv_model,evaluator

In [40]:
#MULTI THREAD TRAINING-->try AGAIN
from multiprocessing.pool import ThreadPool
# Train one model per driver bucket, up to N_THREADS buckets concurrently.
N_THREADS=5
tpool = ThreadPool(processes=N_THREADS)
def execute_training_thread(DF_tuple):
  # DF_tuple is (dbucket, labelled training DataFrame); returns (dbucket, (model, evaluator)).
  cv_model,evaluator=train_driver_model(DF_tuple[1])
  return (DF_tuple[0],(cv_model,evaluator))

CV_MODELS_LIST = tpool.map(execute_training_thread,LabelledTRAIN_DF_list)
CV_MODELS_LIST

In [42]:
#EVALUATION -->Try this
#vectorized_test_data = vectorizeData(final_test_data)
def validate_driver_model(cv_model,LabelledTEST_DF,dbucket,evaluator):
  """Score a fitted model on one driver bucket's test set.

  Prints the bucket id with its ROC AUC and a five-row preview of the predictions together
  with the probability of class 1.

  :param cv_model: fitted model exposing ``transform``
  :param LabelledTEST_DF: DataFrame with `label` and `features` columns
  :param dbucket: id of the driver bucket (only used in the printed message)
  :param evaluator: evaluator exposing ``evaluate``
  :return: the value returned by ``evaluator.evaluate``
  """
  transformed_data = cv_model.transform(LabelledTEST_DF)
  auc=evaluator.evaluate(transformed_data)
  # Formatted string rather than a Python 2 print statement, so the cell also parses on
  # Python 3; the output text is unchanged.
  print("Driver Bucket: %s  ROC_AUC: %s" % (dbucket, auc))
  # Only five rows are shown, so only five are collected to the driver (previously the full
  # prediction set went pandas -> Spark -> pandas before head(5)).
  preview = transformed_data.select('prediction', "probability").limit(5).toPandas()
  # Element 1 of the probability vector is the probability of class index 1.
  preview["probability"] = preview["probability"].apply(lambda vec: float(vec[1]))
  print(preview)
  return auc

In [43]:
#MULTI THREADED VALIDATION-->Try This
from multiprocessing.pool import ThreadPool
# Validate every driver bucket's model on its test set, up to N_THREADS at a time.
N_THREADS=5
tpool = ThreadPool(processes=N_THREADS)
def execute_validation_thread(CV_MODELS_LIST,DF_tuple):
  # DF_tuple is (dbucket, labelled test DataFrame); pick the (model, evaluator) pair that
  # was trained for the same bucket.
  model_tuple=[v for k,v  in CV_MODELS_LIST if k==DF_tuple[0]][0]
  cv_model=model_tuple[0]
  evaluator=model_tuple[1]
  auc=validate_driver_model(cv_model,DF_tuple[1],DF_tuple[0],evaluator)
  return (DF_tuple[0],auc)
AUC_LIST = tpool.map(lambda x:execute_validation_thread(CV_MODELS_LIST,x),LabelledTEST_DF_list)
print(AUC_LIST)
# Mean AUC over all buckets.
AVG_AUC=np.mean([v for k,v in AUC_LIST])
print("Average AUC",str(AVG_AUC))

In [44]:
#PREDICTING ALL PROBABILITIES AND CREATING OUTPUT DF

from pyspark.sql import DataFrame
assembler = VectorAssembler(
      inputCols=feature_cols,
      outputCol="features")
driver_list=[d.driver for d in featurized.select(["driver"]).distinct().collect()]
# Index the trained models by bucket once (first entry wins) instead of scanning
# CV_MODELS_LIST for every driver.
cv_models_by_bucket={}
for bucket_id,model_tuple in CV_MODELS_LIST:
  cv_models_by_bucket.setdefault(bucket_id,model_tuple[0])
driver_final_df_list=[]
for driver in driver_list:
  # Score all of this driver's trips with the model trained for the driver's bucket.
  driver_featurized=featurized.filter(featurized.driver==driver)
  driver_transformed = assembler.transform(driver_featurized)
  # Map over the underlying RDD explicitly, as the other cells do with `.rdd`.
  labelled_driver=(driver_transformed.select(f.lit(1).alias("label"), col("features"))
      .rdd
      .map(lambda row: LabeledPoint(row.label, row.features)))

  cv_model=cv_models_by_bucket[driver]
  transformed_data=cv_model.transform(labelled_driver.toDF())
  predictions = transformed_data.select('indexedLabel', 'prediction', "probability")
  predictions=predictions.toPandas()
  # Keep the probability of class index 1, stored as a string as before.
  predictions["probability"] =predictions["probability"].apply(lambda vec:str(vec[1]))
  pan_driver_featurized=driver_featurized.toPandas()
  pan_driver_featurized["driver_trip"] =pan_driver_featurized[["driver","trip"]].apply(lambda x: str(x[0])+'_'+str(x[1]), axis=1)
  # NOTE(review): this index join assumes both toPandas() calls return the driver's rows in
  # the same order — confirm, or carry driver/trip through the prediction DataFrame.
  driver_final=sqlContext.createDataFrame(pan_driver_featurized.join(predictions))
  driver_final=driver_final.select(["driver_trip","probability"])
  driver_final_df_list.append(driver_final)

def unionAll(dfs):
  """Fold a sequence of DataFrames into one with DataFrame.unionAll."""
  return reduce(DataFrame.unionAll, dfs)
finalDF=unionAll(driver_final_df_list)
finalDF.cache()
display(finalDF)

In [45]:
#TRAIN.repartition(4,["dbucket"])
#TRAIN.cache()

In [46]:
# Preview how the driver buckets are split into batches of BATCH_SIZE. Stepping with
# range(start, stop, step) also shows the final partial batch, which the previous len/4
# loop skipped.
dbucket_list=[d.dbucket for d in TRAIN.select(["dbucket"]).distinct().collect()]
BATCH_SIZE=4
for batch_no,start in enumerate(range(0,len(dbucket_list),BATCH_SIZE)):
  print("Start Driver Bucket Batch no:",str(batch_no+1))
  print(dbucket_list[start:start+BATCH_SIZE])

  print("End Driver Bucket Batch no:",str(batch_no+1))

In [47]:
#MULTI THREAD TRAINING
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col
import itertools
feature_cols=TRAIN.columns
# Identifier / label columns are not model inputs.
for non_feature in ("driver","trip","driver_trip","dbucket","target"):
  feature_cols.remove(non_feature)

# Features removed because of their high correlation with other features.
for correlated_feature in ("max_breaking","average_circular_acceleration",
                           "average_trip_speed","max_circular_acceleration"):
  feature_cols.remove(correlated_feature)
#feature_cols.remove("std_circular_acceleration")

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
########################################
def execute_training_thread(dbucket):
  """Train the model for one driver bucket.

  Reads the bucket's rows from the TRAIN table, assembles the feature vector, converts the
  rows to (label, features) and fits the cross-validated model.

  :param dbucket: id of the driver bucket
  :return: (dbucket, (cv_model, evaluator))
  """
  dbucket_TRAIN=sqlContext.sql("SELECT * FROM TRAIN WHERE dbucket="+str(dbucket))
  transformedTRAIN = assembler.transform(dbucket_TRAIN)
  # Map over the underlying RDD explicitly, as the other cells do with `.rdd`.
  LabelledTRAIN=(transformedTRAIN.select(col("target").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features))).toDF()
  cv_model,evaluator=train_driver_model(LabelledTRAIN)
  return (dbucket,(cv_model,evaluator))
#########################################

# Train every driver bucket, N buckets per batch, N_THREADS at a time.
dbucket_list=[d.dbucket for d in TRAIN.select(["dbucket"]).distinct().collect()]
models_agg_list=[]
from multiprocessing.pool import ThreadPool
N_THREADS=4
N=4
# A single pool serves all batches (previously a new, never-closed pool was created per
# batch). It is left open because the validation cell reuses `tpool`.
tpool = ThreadPool(processes=N_THREADS)
# Stepping by N also covers the final partial batch; the previous len/N loop left those
# buckets without a model, which breaks the per-bucket lookups in later cells.
for batch_no,start in enumerate(range(0,len(dbucket_list),N)):
  print("Start Driver Bucket Batch no:",str(batch_no+1))
  models_sub_list=tpool.map(execute_training_thread,dbucket_list[start:start+N])
  models_agg_list.append(models_sub_list)
  print("End Driver Bucket Batch no:",str(batch_no+1))
# Flatten the per-batch results into one list of (dbucket, (cv_model, evaluator)).
CV_MODELS_LIST = [e for e in itertools.chain(*models_agg_list)]
CV_MODELS_LIST


In [48]:
#MULTI THREADED VALIDATION
def execute_validation_thread(CV_MODELS_LIST,dbucket):
  """Validate the model of one driver bucket on that bucket's TEST rows.

  :param CV_MODELS_LIST: list of (dbucket, (cv_model, evaluator)) tuples
  :param dbucket: id of the driver bucket
  :return: (dbucket, auc)
  """
  dbucket_TEST=sqlContext.sql("SELECT * FROM TEST WHERE dbucket="+str(dbucket))
  transformedTEST = assembler.transform(dbucket_TEST)
  # Map over the underlying RDD explicitly, as the other cells do with `.rdd`.
  LabelledTEST=(transformedTEST.select(col("target").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features))).toDF()
  model_tuple=[v for k,v  in CV_MODELS_LIST if k==dbucket][0]
  cv_model=model_tuple[0]
  evaluator=model_tuple[1]
  auc=validate_driver_model(cv_model,LabelledTEST,dbucket,evaluator)
  return (dbucket,auc)
# Reuses the thread pool created by the training cell.
AUC_LIST = tpool.map(lambda x:execute_validation_thread(CV_MODELS_LIST,x),dbucket_list)
print(AUC_LIST)
# Mean AUC over all buckets.
AVG_AUC=np.mean([v for k,v in AUC_LIST])
print(AVG_AUC)

In [50]:
#Correlation Matrix to observe the correlated features
from pyspark.mllib.stat import Statistics
import pandas as pd
# Build the feature vectors with the assembler currently in scope so that the matrix has
# exactly one row/column per entry of feature_cols. The previous version reused a leftover
# `LabelledTRAIN` variable from an earlier cell, whose vector width need not match
# feature_cols once features have been removed.
# NOTE(review): this uses every row of TRAIN rather than a single driver bucket.
featTRAIN=(assembler.transform(TRAIN).select("features")
  .rdd
  .map(lambda row: row.features))
correlation_matrix = Statistics.corr(featTRAIN, method="spearman")
#display(correlation_matrix)

pd.set_option('display.max_columns', 50)

corr_df = pd.DataFrame(correlation_matrix, index=feature_cols, columns=feature_cols)
# Copy before adding the label column; otherwise corr_disp_df is just another name for
# corr_df and the insert would add a 'features' column to corr_df as well.
corr_disp_df=corr_df.copy()
corr_disp_df.insert(0, 'features',corr_disp_df.index)
display(sqlContext.createDataFrame(corr_disp_df))

In [51]:
# get a boolean dataframe where true means that a pair of variables is highly correlated
# Boolean frame: True where a pair of variables has |correlation| above 0.8, excluding
# entries equal to 1.0 such as the diagonal.
corr_values = corr_df[feature_cols]
highly_correlated_df = (corr_values.abs() > .8) & (corr_values < 1.0)
# Variables that take part in at least one highly correlated pair.
correlated_vars_index = highly_correlated_df.any()
correlated_var_names = correlated_vars_index[correlated_vars_index].index
# Restrict the frame to those variables.
highly_correlated_df.loc[correlated_var_names,correlated_var_names]

In [57]:
#PREDICTING ALL PROBABILITIES AND CREATING OUTPUT DF

from pyspark.sql import DataFrame
assembler = VectorAssembler(
      inputCols=feature_cols,
      outputCol="features")
driver_list=[d.driver for d in featurized.select(["driver"]).distinct().collect()]
# Index the trained models by bucket once (first entry wins) instead of scanning
# CV_MODELS_LIST for every driver.
cv_models_by_bucket={}
for bucket_id,model_tuple in CV_MODELS_LIST:
  cv_models_by_bucket.setdefault(bucket_id,model_tuple[0])
driver_final_df_list=[]
for driver in driver_list:
  # Score all of this driver's trips with the model trained for the driver's bucket.
  driver_featurized=featurized.filter(featurized.driver==driver)
  driver_transformed = assembler.transform(driver_featurized)
  # Map over the underlying RDD explicitly, as the other cells do with `.rdd`.
  labelled_driver=(driver_transformed.select(f.lit(1).alias("label"), col("features"))
      .rdd
      .map(lambda row: LabeledPoint(row.label, row.features)))

  cv_model=cv_models_by_bucket[driver]
  transformed_data=cv_model.transform(labelled_driver.toDF())
  predictions = transformed_data.select('indexedLabel', 'prediction', "probability")
  predictions=predictions.toPandas()
  # Keep the probability of class index 1, stored as a string as before.
  predictions["probability"] =predictions["probability"].apply(lambda vec:str(vec[1]))
  pan_driver_featurized=driver_featurized.toPandas()
  pan_driver_featurized["driver_trip"] =pan_driver_featurized[["driver","trip"]].apply(lambda x: str(x[0])+'_'+str(x[1]), axis=1)
  # NOTE(review): this index join assumes both toPandas() calls return the driver's rows in
  # the same order — confirm, or carry driver/trip through the prediction DataFrame.
  driver_final=sqlContext.createDataFrame(pan_driver_featurized.join(predictions))
  driver_final=driver_final.select(["driver_trip","probability"])
  driver_final_df_list.append(driver_final)

def unionAll(dfs):
  """Fold a sequence of DataFrames into one with DataFrame.unionAll."""
  return reduce(DataFrame.unionAll, dfs)
finalDF=unionAll(driver_final_df_list)
finalDF.cache()
display(finalDF)