# Kaggle AXA Telematics analysis notebook

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

## General purpose functions

In [None]:
# Gets the angle from y to x
def angleOfVectors( x, y ):
    mx = numpy.linalg.norm(x)
    if mx == 0:
        return 0
    my = numpy.linalg.norm(y)
    if my == 0:
        return 0
    mxy = mx * my
    sint = ( x[0]*y[1] - x[1]*y[0] ) / mxy
    if sint > 1: sint = 1
    if sint < -1: sint = -1
    cost = ( x[0]*y[0] + x[1]*y[1] ) / mxy
    if cost > 1: cost = 1
    if cost < -1: cost = -1
    if cost >= 0:
        return numpy.arcsin( sint )
    else:
        if sint > 0:
            return numpy.pi - numpy.arcsin( sint )
        else:
            if cost > 1: cost=1
            if cost < -1: cost=-1
            return -( numpy.arccos( cost ) )

## Class definitions

In [None]:
import os
import os.path
import sys
import numpy
import pickle
import pandas
import multiprocessing
from queue import Empty
import scipy.fftpack

#######################################
# ProcessLogger class
#######################################

class ProcessLogger:
    def __init__( self, numberOfTasks, outputStream = sys.stdout ):
        self.__numberOfTasks = numberOfTasks
        self.__tasksCompleted = 0
        self.__outputStream = outputStream
        return

    def taskEnded( self ):
        if self.__tasksCompleted == self.__numberOfTasks:
            return
        self.__tasksCompleted += 1
        percentage = numpy.around( 100.0 * self.__tasksCompleted / self.__numberOfTasks, 1 )
        if self.__tasksCompleted > 1:
            self.__outputStream.write( '\r' )
        self.__outputStream.write( "Tasks processed: " + str(self.__tasksCompleted) + " / " + str( self.__numberOfTasks) + " ( " + str(percentage) + "% )" )
        if self.__tasksCompleted == self.__numberOfTasks:
            self.__outputStream.write( '\n' )
        self.__outputStream.flush()
        return


#######################################
# Segment class
#######################################
class Segment:
    def __init__(self, coordinates ):
        self.__origin = coordinates[0]
        self.__v = numpy.diff( coordinates, axis=0)
        return
    
    def rawData( self ):
        result = []
        p = self.__origin
        result.append( p )
        for v in self.__v:
            p = p + v
            result.append(p)
        return numpy.array(result)            

    # The time duration of the segment
    def duration( self ):
        return len( self.__v )
    
    # The distance (and speed) vector
    def speedValues( self ):
        return numpy.apply_along_axis( lambda x: numpy.sqrt( x[0]**2+x[1]**2), 1, self.__v )

    # The distance traveled
    def distanceTraveled( self ):
        return numpy.sum( self.speedValues() )

    # The acceleration values
    def accelerationValues( self ):
        speedValues = self.speedValues()
        return numpy.diff( speedValues )

    # The angular values
    def angularValues( self ):
        angles = numpy.zeros( len(self.__v) - 1 )
        for i in range(len(angles)):
            angles[i] = anglesOfVectors( self.__v[i], self.__v[i+1] )
        return angles
            
    # Fourier transformation of speed values
    def speedFFT( self ):
        sf = numpy.abs( scipy.fftpack.fft( self.speedValues() ) )
        n = int( numpy.around(0.1 + len(sf) / 2.0) )
        return sf[0:n]

    # Fourier transformation of anglular values
    def angleFFT( self ):
        af = numpy.abs( scipy.fftpack.fft( self.angularValues() ) )
        n = int( numpy.around(0.1 + len(af) / 2.0) )
        return af[0:n]


    
#######################################
# Trip class
#######################################

class Trip:
    
    # Constructor
    def __init__( self, tripId ):
        self.__lastPoint = None
        self.__segments = []
        self.__id = tripId
        self.__segmentsGenerated = False
        self.__extraTravelDuration = 0
        self.__extraTravelLength = 0
        return

    def __del__( self ):
        return

    # Returns the trip id
    def id( self ):
        return self.__id

    # Sets directly the raw data from a numpy array
    def setData( self, data ):
        self.__lastPoint = data[ len(data)-1]
        self.__tripDuration = len(data)-1
        self.__extraTravelDuration = 0
        self.__extraTravelLength = 0
        self.__generateSegments( data )
        return

    # The time duration of the trip
    def tripDuration( self ):
        result = 0
        for segment in self.__segments:
            result += segment.duration()        
        return result + self.__extraTravelDuration

    # The trip segments
    def segments( self ):
        return self.__segments

    # Returns the number of segments
    def numberOfSegments( self ):
        return len(self.__segments)

    # Returns the distance to the ending point
    def distanceToEndPoint( self ):
        return numpy.sqrt( self.__endPoint[0]**2 + self.__endPoint[1]**2 )

    # The total distance traveled, derived from the segments
    def distanceTraveled( self ):
        result = 0
        for segment in self.__segments:
            result += segment.distanceTraveled()
        return result + self.__extraTravelLength

    # The mean spead derived by the segments
    def meanSpeed( self ):
        dS = 0
        dt = 0
        for segment in self.__segments:
            dS += segment.distanceTraveled()
            dt += segment.duration()
        if dt == 0:
            return 0
        else:
            return dS/dt

    # The speed values
    def speedValues( self ):
        speedValues = numpy.array([])
        for segment in self.__segments:
            speedValues = numpy.hstack( (speedValues, segment.speedValues() ) )
        return speedValues

    # The speed values distribution
    def speedStatistics( self ):
        if len( speedValues ) < 3:
            return numpy.zeros(5)
        else:
            return numpy.percentile(speedValues,[5,25,50,75,95])

    # The acceleration values
    def accelerationValues( self ):
        accelerationValues = numpy.array([])
        for segment in self.__segments:
            accelerationValues = numpy.hstack( (accelerationValues, segment.accelerationValues() ) )
        return accelerationValues

    # The acceleration values distribution
    def accelerationStatistics( self ):
        accelerationValues = self.accelerationValues()
        if len( accelerationValues ) < 3:
            return numpy.zeros(5)
        else:
            return numpy.percentile(accelerationValues,[5,25,50,75,95])

    # The direction angles
    def angularValues( self ):
        angularValues = numpy.array([])
        for segment in self.__segments:
            angularValues = numpy.hstack( (angularValues, segment.angularValues() ) )
        return angularValues
        

    # The direction angles distribution
    def angularStatistics( self ):
        angularValues = self.angularValues()
        if len( angularValues ) < 3:
            return numpy.zeros(5)
        else:
            return numpy.percentile(angularValues,[5,25,50,75,95])        

    # Private function that generates the trip segments
    def __generateSegments(self, rawData ):
        if self.__segmentsGenerated :
            return
        
        self.__extraTravelDuration = 0
        self.__extraTravelLength = 0
        
        # First pass : remove the zero speed segments
        segments = self.__removeZeroSpeedSegments( rawData )

        # Second pass : identify gaps and correct speed jitter
        segments2 = []
        for segment in segments:
            subsegments = self.__identifyGapsCorrectJitter( segment )
            for subsegment in subsegments: segments2.append( subsegment )

        # Third pass : remove angular jitter
        segments = []
        for segment in segments2:
            subsegments = self.__removeAccuteAngleSegments( segment )
            for subsegment in subsegments: segments.append( subsegment )
        
        # Fourth pass : treat the gap and the jitters again
        segments2 = []
        for segment in segments:
            subsegments = self.__identifyGapsCorrectJitter( segment )
            for subsegment in subsegments: segments2.append( subsegment )        

        # Create the segment objects
        segments = segments2
        
        self.__segments = []
        for segment in segments:
            self.__segments.append( Segment( segment ) )
        
        self.__segmentsGenerated = True
        return

    # Function to apply the removal of the zero speed segments
    def __removeZeroSpeedSegments( self, tripData ):
        zeroSpeedTolerance = 1.5

        segments=[]
        segentStartingIndex = 0
        zeroSpeedCounter = 0
        p_previous = tripData[0]
        for i in range(1,len(tripData)):
            p_current = tripData[i]
            v_current = p_current - p_previous
            v_current = numpy.sqrt(v_current[0]**2 + v_current[1]**2)

            if v_current < zeroSpeedTolerance:
                if zeroSpeedCounter == 0 : # Mark the end of a segment and beginning of a zero speed sequence
                    if i - segentStartingIndex > 1:
                        segments.append( tripData[ segentStartingIndex : i] )
                zeroSpeedCounter += 1
                segentStartingIndex = i
            else:
                zeroSpeedCounter = 0

            p_previous = p_current

        if len(tripData) - segentStartingIndex > 2:
            segments.append( tripData[segentStartingIndex : len(tripData)] )

        return segments
    
    # Function to identify the gaps and corrects for the speed jitter
    def __identifyGapsCorrectJitter( self, tripData ):
        maxAcceleration = 5
        speedToTrigger = 10
        segments = []
        
        v_previous = 0
        p_previous = tripData[0]
        i = 1
        segmentStartingIndex = 0
        
        while i < len(tripData) :
            p_current = tripData[i]
            v_current = numpy.linalg.norm( p_current - p_previous )
            
            # Check for abrupt high speed and high acceleration in combination with no low speed
            if ( numpy.abs( v_current - v_previous ) > maxAcceleration ) & ( v_current > speedToTrigger ):
                # Signal the end of the previous segment
                nDiff = i - segmentStartingIndex
                if nDiff > 1:
                    segments.append( tripData[ segmentStartingIndex : i ] )
                else:
                    # Correct trip duration and length for the skipped mini-segment
                    if ( i < len(tripData) - 1 ):
                        p_next = tripData[i]
                        v_next = numpy.linalg.norm( p_next - p_current )
                        if numpy.abs( v_current - v_next ) < maxAcceleration:
                            self.__extraTravelDuration += 1
                            self.__extraTravelLength += numpy.lingalg.norm( p_next - p_previous )
                
                # Check whether this is a jitter or a gap and adjust the starting index accordingly, as well as the correction to the travel length and time.
                if i < len(tripData) -1 :
                    p_next = tripData[i+1]
                    v_next = numpy.linalg.norm( p_next - p_current )
                    if numpy.abs( v_current - v_next ) > maxAcceleration:
                        averageSpeedInGap = 0.5 * ( v_next + v_previous)
                        distanceSpentInGap = numpy.linalg.norm( p_current - p_previous )
                        self.__extraTravelLength += distanceSpentInGap
                        self.__extraTravelDuration += int( numpy.around( distanceSpentInGap / averageSpeedInGap ) )
                        segmentStartingIndex = i
                        i += 1
                        p_current = p_next
                        v_current = v_next
                    else: # This is a jitter. Check whether we have a sequence of spikes and skip them.
                        j = i + 2
                        endOfSpikesFound = False
                        while j < len(tripData) -1 :
                            p_nnext = tripData[j]
                            v_nnext = numpy.linalg.norm( p_next - p_nnext )
                            p_nnnext = tripData[j+1]
                            v_nnnext = numpy.linalg.norm( p_nnnext - p_nnext )
                            if numpy.abs( v_nnnext - v_nnext ) < maxAcceleration :
                                averageSpeedInGap = 0.5 * ( v_nnext + v_previous )
                                distanceSpentInGap = numpy.linalg.norm( p_next - p_previous )
                                self.__extraTravelLength += distanceSpentInGap
                                self.__extraTravelDuration += int( numpy.around( distanceSpentInGap / averageSpeedInGap ) )
                                i = j
                                segmentStartingIndex = i - 1
                                p_current = p_next
                                v_current = averageSpeedInGap
                                endOfSpikesFound = True
                                break
                            j += 1
                            p_next = p_nnext

                        if endOfSpikesFound == False :
                            i = j
                            segmentStartingIndex = i
            
            p_previous = p_current
            v_previous = v_current
            i += 1
        
        if len(tripData) - segmentStartingIndex > 2 :
            segments.append( tripData[ segmentStartingIndex : len(tripData) ] )
        
        return segments

    
    def __removeAccuteAngleSegments( self, tripData ):
        maxAngle = 100 * numpy.pi / 180
        segments = []
        segmentStartingIndex = 0
        v_previous = tripData[1] - tripData[0]
        i = 2
        while i < len(tripData) :
            v_current = tripData[i] - tripData[i-1]
            angle = angleOfVectors( v_current, v_previous )
            if numpy.abs(angle) > maxAngle:
                if i - segmentStartingIndex > 2:
                    segments.append( tripData[ segmentStartingIndex: i-1 ] )
                self.__extraTravelDuration += 2
                self.__extraTravelLength += numpy.linalg.norm( tripData[i-2]-tripData[i] )
                segmentStartingIndex = i
                i += 2
                if i < len(tripData) + 1:
                    v_previous = tripData[i-1] - tripData[i-2]
                continue
            v_previous = v_current
            i += 1
            
        if len(tripData) - segmentStartingIndex > 2:
            segments.append( tripData[ segmentStartingIndex : len(tripData) ] )
    
        return segments
    

    

#######################################
# Driver class
#######################################

class Driver:
    def __init__( self, driverId ):
        self.__id = driverId
        self.__trips = {}
        self.__zeroSegmentTrips = 0
        return

    def __del__( self ):
        self.__clearTrips()
        return

    def id( self ):
        return self.__id

    def __clearTrips( self ):
        for i in self.__trips:
            o = self.__trips[i]
            self.__trips[i] = None
            del o
        self.__trips = {}
        self.__zeroSegmentTrips = 0

    def readTripsFromDirectory( self, directoryName ):
        self.__clearTrips()
        tripFile = os.path.join( directoryName, str(self.__id) + ".data" )
        file = open(tripFile, 'rb')
        while file.read(1):
            file.seek(-1,1)
            (tripId, tripData) = pickle.load( file )
            trip = Trip(tripId)
            trip.setData( tripData )
            if len(trip.segments()) == 0:
                self.__zeroSegmentTrips += 1
            self.__trips[tripId] = trip
        file.close()
        return

    def numberOfTrips( self ):
        return len(self.__trips)

    def getTrip( self, tripId ):
        return self.__trips[ tripId ]

    # Returns the number of zero segment trips
    def zeroSegmentTrips( self ):
        return self.__zeroSegmentTrips

    # Returns the total number of trips
    def numberOfTrips( self ):
        return len( self.__trips )

    # Collects the speed statistics
    def speedStatistics( self ):
        quantiles = numpy.zeros([(self.numberOfTrips() - self.zeroSegmentTrips() ), 5] )
        j = 0
        for i in self.__trips:
            trip = self.__trips[i]
            if trip.numberOfSegments() == 0:
                continue
            else:
                quantiles[j] = trip.speedStatistics()
                j += 1        
        return numpy.array([( numpy.mean( quantiles[:,k]), numpy.std( quantiles[:,k]) ) for k in range(5)]).reshape(10)

    # Collects the acceleration statistics
    def accelerationStatistics( self ):
        quantiles = numpy.zeros([(self.numberOfTrips() - self.zeroSegmentTrips() ), 5] )
        j = 0
        for i in self.__trips:
            trip = self.__trips[i]
            if trip.numberOfSegments() == 0:
                continue
            else:
                quantiles[j] = trip.accelerationStatistics()
                j += 1        
        return numpy.array([( numpy.mean( quantiles[:,k]), numpy.std( quantiles[:,k]) ) for k in range(5)]).reshape(10)

    # Collects the acceleration statistics
    def angularStatistics( self ):
        quantiles = numpy.zeros([(self.numberOfTrips() - self.zeroSegmentTrips() ), 5] )
        j = 0
        for i in self.__trips:
            trip = self.__trips[i]
            if trip.numberOfSegments() == 0:
                continue
            else:
                quantiles[j] = trip.angularStatistics()
                j += 1        
        return numpy.array([( numpy.mean( quantiles[:,k]), numpy.std( quantiles[:,k]) ) for k in range(5)]).reshape(10)

    
#######################################
# DriverData class
#######################################

class DriverData:
    def __init__( self, driversDirectory ):
        self.__dir = driversDirectory
        return

    # Loads all the data from the files for all drivers
    def loadAllData( self, numberOfThreads = 7 ):
        
        # Put the driver directories in a queue
        ctx = multiprocessing.get_context('fork')
        driversInQueue = ctx.Queue()
        driverFiles = os.listdir( self.__dir )
        numberOfDriversToProcess = 0
        for driverFile in driverFiles:
            driverId = int(int(driverFile.split('.')[0]))
            driver = Driver( driverId )
            driversInQueue.put( driver )
            numberOfDriversToProcess += 1

        # The thread reading function
        def readDataFunction( inputQueue, outputQueue, driverTopDir ):
            while True:
                driver = inputQueue.get()
                driver.readTripsFromDirectory( self.__dir )
                outputQueue.put( driver )
            return

        # Start the reading threads
        threads = []
        driversOutQueue = ctx.Queue()

        for i in range( numberOfThreads):
            thread = ctx.Process( target = readDataFunction, args = (driversInQueue, driversOutQueue, self.__dir ) )
            thread.start()
            threads.append( thread )

        drivers = []
        log = ProcessLogger( numberOfDriversToProcess )
        for i in range( numberOfDriversToProcess ):
            drivers.append( driversOutQueue.get() )
            log.taskEnded()

        for t in threads:
            t.terminate()
            
        return drivers


    # Loads the speed statistics data per driver
    def loadSpeedStatisticsData( self, numberOfThreads = 7 ):
        
        # Put the driver directories in a queue
        ctx = multiprocessing.get_context('fork')
        driversInQueue = ctx.Queue()
        driverFiles = os.listdir( self.__dir )
        numberOfDriversToProcess = 0
        for driverFile in driverFiles:
            driverId = int(int(driverFile.split('.')[0]))
            driver = Driver( driverId )
            driversInQueue.put( driver )
            numberOfDriversToProcess += 1
        
        # The thread reading function
        def readDataFunction( inputQueue, outputQueue, driverTopDir ):
            while True:
                driver = inputQueue.get()
                driver.readTripsFromDirectory( self.__dir )
                speedStatistics = driver.speedStatistics()
                accelerationStatistics = driver.accelerationStatistics()
                angularStatistics = driver.angularStatistics()
                outputQueue.put(numpy.hstack((driver.id(), driver.numberOfTrips(), driver.zeroSegmentTrips(), speedStatistics, accelerationStatistics, angularStatistics)))
            return

        # Start the reading threads
        threads = []
        driversOutQueue = ctx.Queue()

        for i in range( numberOfThreads):
            thread = ctx.Process( target = readDataFunction, args = (driversInQueue, driversOutQueue, self.__dir ) )
            thread.start()
            threads.append( thread )

        drivers = numpy.zeros([numberOfDriversToProcess, 33])
        log = ProcessLogger( numberOfDriversToProcess )
        for i in range( numberOfDriversToProcess ):
            drivers[i] = driversOutQueue.get()
            log.taskEnded()

        for t in threads:
            t.terminate()

        columns=('id','ntrips','nosegs',
                 'sm05','ss05','sm25','ss25','sm50','ss50','sm75','ss75','sm95','ss95',
                 'am05','as05','am25','as25','am50','as50','am75','as75','am95','as95',
                 'tm05','ts05','tm25','ts25','tm50','ts50','tm75','ts75','tm95','ts95')
        return pandas.DataFrame(drivers, columns=columns)


In [None]:
# Function for plotting the raw data of the segments
def plotSegmentData( segments ):
    
    tripData = numpy.array([]).reshape([0,2])
    tAngles = numpy.array([])
    tSpeed = numpy.array([])
    tAcceleration = numpy.array([])
    
    for segment in segments:
        speedVectors = numpy.diff( segment, axis = 0 )
        speedValues = 3.6 * numpy.apply_along_axis( lambda x: numpy.sqrt( x[0]**2+x[1]**2), 1, speedVectors )
        if len(speedValues) > 1:
            accelerationValues = numpy.diff( speedValues ) / 3.6
            angles = numpy.zeros( len(speedVectors) - 1 )
            for i in range(len(angles)):
                v1 = speedVectors[i]
                v2 = speedVectors[i+1]
                angles[i] = angleOfVectors(v1,v2) * 180 / numpy.pi
            tAngles = numpy.hstack( (tAngles, angles) )
            tAcceleration = numpy.hstack( (tAcceleration, accelerationValues) )

        tSpeed = numpy.hstack( (tSpeed, speedValues) )
        tripData = numpy.vstack( (tripData, segment) )

    # Draw the raw data
    plt.figure( figsize = (10,10), facecolor='lightgrey')
    ax = plt.subplot(321)
    ax.plot( tripData[0:,0], tripData[0:,1], 'r.', alpha = 0.1 )
    ax.set_title("path")
    ax.grid(True)
    ax = plt.subplot(322)
    ax.plot( numpy.arange(0,len(tripData)), tripData[0:,1], 'b.', alpha = 0.1 )
    ax.set_title("x coordinate")
    ax.grid(True)
    ax = plt.subplot(323)
    ax.plot( tripData[0:,0], numpy.arange(0,len(tripData)), 'b.', alpha = 0.1 )
    ax.set_title("y coordinate")
    ax.grid(True)
    ax = plt.subplot(324)
    ax.plot( numpy.arange(0,len(tAngles)), tAngles, 'r-' )
    ax.plot( numpy.arange(0,len(tAngles)), tAngles, 'r.' )
    ax.set_title("direction")
    ax.grid(True)
    ax = plt.subplot(325)
    ax.plot( numpy.arange(0,len(tSpeed)), tSpeed, 'g-' )
    ax.plot( numpy.arange(0,len(tSpeed)), tSpeed, 'g.', alpha=0.15 )
    ax.set_title("speed")
    ax.grid(True)
    ax = plt.subplot(326)
    ax.plot( numpy.arange(0,len(tAcceleration)), tAcceleration, 'b-' )
    ax.plot( numpy.arange(0,len(tAcceleration)), tAcceleration, 'b.', alpha=0.15 )
    ax.set_title("acceleration")
    ax.grid(True)
    return

In [None]:
driverTopDir = 'drivers_compressed_data'
driverId = 1
driverFileName = os.path.join( driverTopDir, str(driverId)+".data")
file = open(driverFileName, 'rb')
trips=[]
while file.read(1):
    file.seek(-1,1)
    trips.append( pickle.load(file) )
file.close()

In [None]:
tripD = trips[7]
trip = Trip( tripD[0] )
trip.setData( tripD[1] )
segments = trip.segments()
print(str(len(segments)) + " segments for trip " + str(tripD[0]) )
segmentData = []
for segment in segments:
    segmentData.append( segment.rawData() )

plotSegmentData( segmentData )


In [None]:
driverData = DriverData(driverTopDir)
drivers = driverData.loadAllData()

In [None]:
len(drivers)

In [None]:
trip = driver.getTrip(14)
segments = trip.segments()
print(str(len(segments)) + " segments for trip " + str(trip.id()) )
segmentData = []
for segment in segments:
    segmentData.append( segment.rawData() )
plotSegmentData( segmentData )