## 1b Trip Segmentation Notebook
Project: clustering-analysis-domain-agnostic-features-2018

Authors: Jordan Perr-Sauer, Caleb Phillips

License: BSD 3-Clause

Copyright (c) 2021 Alliance for Sustainable Energy LLC

## Description

This notebook uses a Spark context to add trip labels to the FleetDNA dataset. The notebook must be run with a Spark context; the easiest way to do this is to start Spark using start_spark_jupyter_notebook.sh.

In [1]:
# Slide Map collection of functions for RDD-level processing
# Author: Jordan Perr-Sauer <jordan.perr-sauer@nrel.gov>


from itertools import islice, count, chain
from pyspark.sql import Row
from collections import deque


def nextIndexNotNone(lst, index):
    """Return the first position >= ``index`` whose entry has a non-None second element.

    Every entry of ``lst`` must support ``entry[1]``. If ``index`` is already
    at or past the end of ``lst`` it is returned unchanged; if all remaining
    entries have a None second element, ``len(lst)`` is returned.
    """
    # Identity test (`is None`) instead of `== None`, so an element's custom
    # __eq__ can never make a real value look like None.
    while index < len(lst) and lst[index][1] is None:
        index += 1
    return index


def grabNextN(lst, index, n):
    """Collect up to ``n`` items from ``lst[index]``, ``lst[index + 1]``, ... in order.

    ``lst`` is a list of sequences; in this notebook it is the collected list
    of per-partition heads built by ``sliding``. Items are concatenated until
    ``n`` have been gathered or ``lst`` runs out, so fewer than ``n`` items
    may be returned. ``n <= 0`` or ``index >= len(lst)`` returns ``[]``.
    Returns a new list; ``lst`` is not modified.
    """
    # Iterative rather than recursive: each (possibly empty) partition used to
    # cost one stack frame, so many consecutive empty partitions could exceed
    # Python's recursion limit.
    collected = []
    remaining = n
    while remaining > 0 and index < len(lst):
        chunk = lst[index][:remaining]
        collected.extend(chunk)
        remaining -= len(chunk)
        index += 1
    return collected


def firstNPartitionMapper(i, it, n):
    """Partition mapper: emit a single list holding the first ``n`` elements of ``it``.

    ``i`` (the partition index) is unused; it is accepted only so the function
    matches the ``mapPartitionsWithIndex`` callback signature. Wrapping the
    prefix in a one-element list makes ``collect()`` return one entry per
    partition.
    """
    prefix = list(islice(it, n))
    return [prefix]


def slidingMapper(i, it, headsListBcast, n, mapFun):
    """Partition mapper yielding every window of ``n`` consecutive elements.

    The partition's own elements are followed by the first ``n - 1`` elements
    of the later partitions (taken from the broadcast list of partition heads
    via ``grabNextN``). This lets windows that straddle a partition boundary
    still be produced. Each window (a list) is passed through ``mapFun`` when
    ``mapFun`` is truthy; otherwise the list itself is yielded.
    """
    overlap = grabNextN(headsListBcast.value, i + 1, n - 1)
    window = deque()
    for element in chain(it, overlap):
        window.append(element)
        if len(window) != n:
            continue
        snapshot = list(window)
        yield mapFun(snapshot) if mapFun else snapshot
        window.popleft()


def sliding(rdd, n, mapFun = None):
    """Return an RDD of every window of ``n`` consecutive elements of ``rdd``.

    Windows follow the RDD's partition order and the element order within
    each partition. The first ``n`` elements of every partition are collected
    to the driver and broadcast, so each partition can complete the windows
    that run into the following partitions. When ``mapFun`` is given, each
    window (a list) is replaced by ``mapFun(window)``.

    Relies on a SparkContext named ``sc`` in the notebook's global scope.
    ``rdd`` is evaluated twice (once for the heads, once for the windows).
    NOTE(review): callers presumably cache ``rdd`` so both passes see the
    same partitioning; confirm.
    """
    heads = rdd.mapPartitionsWithIndex(
        lambda idx, part: firstNPartitionMapper(idx, part, n)
    ).collect()
    headsBcast = sc.broadcast(heads)
    return rdd.mapPartitionsWithIndex(
        lambda idx, part: slidingMapper(idx, part, headsBcast, n, mapFun)
    )


def leadMap(s, colMap, partitionColumn = None):
    """Build a Row from ``s[0]`` plus columns copied from the following row ``s[1]``.

    ``s`` is a two-element window ``[current, following]``. For each
    ``source -> target`` pair in ``colMap``, ``following[source]`` is stored
    under ``target``. When ``partitionColumn`` is given and the two rows hold
    different values in it, every target is set to None instead, so values
    never leak across groups.
    """
    merged = s[0].asDict()
    crossesGroup = not (partitionColumn == None or merged[partitionColumn] == s[1][partitionColumn])
    for source in colMap:
        merged[colMap[source]] = None if crossesGroup else s[1][source]
    return Row(**merged)


def lead(df, colMap, partitionColumn = None):
    """Add "lead" columns holding values taken from the following row.

    ``colMap`` maps each source column name to the name of the new column
    that receives the next row's value of that source. Rows are paired in the
    RDD's partition order, so ``df`` should already be sorted. When
    ``partitionColumn`` is given, ``leadMap`` sets the lead values to None
    where the next row belongs to a different group. The final row has no
    successor, so it is emitted last with every lead column set to None; the
    output keeps every input row (as ``lag`` does).

    Relies on a SparkContext named ``sc`` in the notebook's global scope.
    """
    mapFun = lambda row: leadMap(row, colMap, partitionColumn)
    rdd = sliding(df.rdd, 2, mapFun)
    # sliding() yields one window per consecutive pair, i.e. one row fewer
    # than df. Re-append the last row with empty lead columns so it is not
    # silently dropped.
    lastRows = df.rdd.mapPartitions(lambda it: deque(it, maxlen=1)).collect()
    if not lastRows:
        return rdd.toDF()
    lastRow = lastRows[-1].asDict()
    for key in colMap:
        lastRow[colMap[key]] = None
    lastRowRDD = sc.parallelize([Row(**lastRow)])
    return rdd.union(lastRowRDD).toDF()


def lagMap(s, colMap, partitionColumn = None):
    """Build a Row from ``s[1]`` plus columns copied from the preceding row ``s[0]``.

    ``s`` is a two-element window ``[previous, current]``. For each
    ``source -> target`` pair in ``colMap``, ``previous[source]`` is stored
    under ``target``. When ``partitionColumn`` is given and the two rows hold
    different values in it, every target is set to None instead, so values
    never leak across groups.
    """
    previous, current = s[0], s[1]
    row = current.asDict()
    # The group test must compare the previous row with the current one;
    # comparing the current row with itself is always true and would never
    # reset the lag values at a group boundary.
    sameGroup = partitionColumn is None or previous[partitionColumn] == current[partitionColumn]
    for key in colMap:
        row[colMap[key]] = previous[key] if sameGroup else None
    return Row(**row)


def lag(df, colMap, partitionColumn = None):
    """Add "lag" columns holding values taken from the preceding row.

    ``colMap`` maps each source column name to the name of the new column
    that receives the previous row's value of that source. Rows are paired in
    the RDD's partition order, so ``df`` should already be sorted. The first
    row has no predecessor: it is emitted first with every lag column set to
    None. It is followed by one row per consecutive pair built by ``lagMap``,
    which also handles ``partitionColumn`` boundaries.

    Relies on a SparkContext named ``sc`` in the notebook's global scope.
    Raises IndexError when ``df`` is empty (``take(1)`` returns no rows).
    """
    headRow = df.rdd.take(1)[0].asDict()
    headRow.update((colMap[key], None) for key in colMap)
    pairs = sliding(df.rdd, 2, lambda window: lagMap(window, colMap, partitionColumn))
    return sc.parallelize([Row(**headRow)]).union(pairs).toDF()

## Routines to fill column with last known good value across all partitions
# Inspired by https://stackoverflow.com/a/33622083/832770


def getLastNonNull(i, iter, col):
    """Partition mapper: return ``[v]`` where ``v`` is the last non-None value of ``col``.

    ``i`` (the partition index) is unused; ``iter`` yields objects with an
    ``asDict()`` method (Rows). ``[None]`` is returned when the partition is
    empty or ``col`` is None in every row, so ``collect()`` gives exactly one
    entry per partition.
    """
    lastSeen = None
    for record in iter:
        value = record.asDict()[col]
        if value != None:
            lastSeen = value
    return [lastSeen]


def fill(i, iter, col, carry):
    """Partition mapper that forward-fills None values of ``col``.

    ``i`` is the partition index and ``iter`` yields the partition's Rows.
    ``carry`` is a broadcast whose ``value`` lists, for each partition in
    order, that partition's last non-None value of ``col`` (or None). The
    fill value is seeded from the nearest *earlier* partition that has one
    (None for the first partition), then updated with every non-None value
    seen in this partition. Yields one new Row per input row.
    """
    carried = carry.value
    lastGoodValue = None
    # Search strictly earlier partitions (i-1 down to 0). carried[i] is this
    # partition's own last value, which comes after any leading None rows and
    # must not be used to fill them.
    for j in range(i - 1, -1, -1):
        if carried[j] is not None:
            lastGoodValue = carried[j]
            break
    for record in iter:
        row = record.asDict()
        if row[col] is None:
            row[col] = lastGoodValue
        else:
            lastGoodValue = row[col]
        yield Row(**row)


def filldown(df, col):
    """Forward-fill None values of column ``col`` across all partitions of ``df``.

    Makes two passes over ``df.rdd``. The first collects each partition's
    last non-None value of ``col`` (``getLastNonNull``) and broadcasts that
    list. The second runs ``fill`` on every partition with that broadcast.
    Rows are processed in partition order, so ``df`` should already be
    sorted.

    Relies on a SparkContext named ``sc`` in the notebook's global scope.
    """
    source = df.rdd
    perPartitionLast = source.mapPartitionsWithIndex(
        lambda idx, part: getLastNonNull(idx, part, col)
    ).collect()
    carry = sc.broadcast(perPartitionLast)
    return source.mapPartitionsWithIndex(
        lambda idx, part: fill(idx, part, col, carry)
    ).toDF()


In [3]:
from pyspark.sql import functions as func
from pyspark.sql.functions import col

# Parquet input/output locations for this notebook.
INPUT = "./data/fullRes"
OUTPUT = "./data/FleetDNAETL_CoDA_epaprime"

df1 = sqlContext.read.parquet(INPUT)

### Extract to Unix Timestamp
# Keep only the first 19 characters of `ts`, append an explicit " UTC" zone,
# and parse to Unix epoch seconds.
fdna_to_ts = func.unix_timestamp(
    func.concat(
        func.substring(col('ts'), 1, 19),  # Take care of timestamps with milliseconds
        func.lit(" UTC")
    ),
    "yyyy-MM-dd'Z'HH:mm:ss z"
)
df2 = df1.withColumn('ts', fdna_to_ts).cache()

### Create Lagged Columns
# Sort by vehicle, then time, so lag() pairs each sample with the one before
# it. This adds lag_ts, lag_speed and lag_vdir.
columns = ['ts', 'speed', 'vdir']
df3 = df2.orderBy(col('vdir').asc(), col('ts').asc()).cache()
df4 = lag(df3, {c: "lag_" + c for c in columns}, "vdir").cache()

### Calculate Trips
# A row starts a new trip when more than 300 s passed since the previous
# sample, or when the previous sample is from another vehicle or missing.
# Trip-start rows get trip = ts; other rows get None and are filled down.
timeFilter = col("lag_ts") < col("ts") - 300
vidFilter = col("lag_vdir") != col("vdir")
noPrevVid = func.isnull(col("lag_vdir"))
isTripStart = timeFilter | vidFilter | noPrevVid
df5 = df4.withColumn("trip", func.when(isTripStart, col("ts")).otherwise(None)).cache()
# Re-sort so partition order matches (vdir, ts) before the order-dependent filldown.
df6 = df5.orderBy(col('vdir').asc(), col('ts').asc()).cache()
df7 = filldown(df6, "trip")

### There are vehicles in fullRes which are not in the original study. Filter out these vehicles so the datasets are comparable.
# Vehicle IDs to exclude, written as explicit IDs plus inclusive ranges in the
# same order as the original comma-separated list. The former multi-line
# single-quoted string literal was a SyntaxError.
_excluded_vehicle_ids = (
    [7, 13, 42, 58, 70, 74, 83, 87, 115, 124, 154, 187, 216, 230, 268, 274,
     286, 309, 437, 438, 489, 498, 545, 546, 547, 569]
    + list(range(4972, 5380))     # 4972..5379
    + [9867, 9868, 9879]
    + list(range(9911, 10123))    # 9911..10122
    + [10890, 10891, 10897, 10903, 10905, 10907, 10908, 10910, 10917, 10920,
       10923, 10925, 10930, 10931, 10932, 10934, 10935, 10936, 10939, 10940,
       10941, 10942, 10943, 10944, 10945, 10948, 10949, 10950, 10953, 10954,
       10956, 10957, 10962, 10963, 10968, 10970, 10974, 10975, 10976, 10977,
       10979, 10980, 10984, 10985, 10994, 10995, 10996, 10997, 10998, 10999,
       11000, 11002, 11003, 11005, 11006, 11593, 11594, 11616]
    + list(range(11639, 11649))   # 11639..11648
    + [11805, 11813, 12108, 12121, 12122]
    + list(range(12129, 12136))   # 12129..12135
    + list(range(12151, 12163))   # 12151..12162
    + list(range(12164, 12193))   # 12164..12192
    + list(range(12194, 12221))   # 12194..12220
    + list(range(12222, 12230))   # 12222..12229
    # NOTE(review): 12210000 may be a typo (12210 is already covered above);
    # kept as-is to preserve the original filter.
    + [12210000]
)
diff_to_old_study = ','.join(str(v) for v in _excluded_vehicle_ids)
df8 = df7.filter(~col("vdir").isin(["v_{}".format(i) for i in diff_to_old_study.split(',')]))

### Save as parquet file for archive and analysis
# Write df8 to OUTPUT as parquet, replacing any existing data there.
df8.write.parquet(OUTPUT, mode="overwrite")


In [None]:
# Count the rows of the filtered output; the number in the trailing comment is
# presumably the count observed on an earlier run of this notebook.
df8.count() # -> 746507968