In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql import functions as F
from pyspark.mllib.stat import Statistics
from datetime import datetime, timedelta
from pyspark.sql.window import Window
from pyspark.sql.types import TimestampType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import DoubleType
from math import floor
import time

from GPSProcessing import *
from AccProcessing import *

In [3]:
# REFERENCE: https://spark.apache.org/docs/latest/configuration.html

# Every value is given as a string, which is how the configuration is reported
# back by getAll(); booleans use the lowercase 'true' spelling instead of a
# Python bool that would be stringified to 'True'.
conf = SparkConf().setAll([
    ('spark.memory.fraction', '0.6'),
    ('spark.executor.memory', '16g'),
    ('spark.driver.memory', '16g'),
    ('spark.sql.shuffle.partitions', '20'),
    ('spark.memory.offHeap.enabled', 'true'),
    ('spark.memory.offHeap.size', '16g'),
    ('spark.cleaner.referenceTracking.cleanCheckpoints', 'true'),
    # Optional settings, currently disabled:
    # ('spark.driver.cores', '4'),
    # ('spark.executor.cores', '4'),
    # ('spark.worker.cleanup.enabled', 'true'),
    # ('spark.sql.session.timeZone', 'UTC'),
])

# Local session using all available cores; checkpoints are written to the
# relative 'checkpoints' directory.
spark = SparkSession.builder.config(conf=conf).master("local[*]").appName("GPS+ACC").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
sc.setCheckpointDir('checkpoints')
sc.getConf().getAll()  # or sc._conf.getAll()

[('spark.sql.shuffle.partitions', '20'),
 ('spark.driver.port', '54863'),
 ('spark.cleaner.referenceTracking.cleanCheckpoints', 'True'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.memory', '16g'),
 ('spark.driver.host', '10.0.1.4'),
 ('spark.app.name', 'GPS+ACC'),
 ('spark.executor.memory', '16g'),
 ('spark.app.id', 'local-1557158181246'),
 ('spark.rdd.compress', 'True'),
 ('spark.memory.fraction', '0.6'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.deployMode', 'client'),
 ('spark.memory.offHeap.enabled', 'True'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.memory.offHeap.size', '16g')]

In [4]:
sc.defaultParallelism

8

In [5]:
sc.defaultMinPartitions

2

In [6]:
sc

## PALMS output

In [None]:
palms_path = '/Users/molinaro/Documents/GITHUB/PALMS/Calgary/PALMS_output.csv'

In [None]:
palms_out = spark.read.csv(palms_path, header=True, inferSchema=True, )
palms_out.cache();

In [None]:
palms_out.rdd.getNumPartitions()

In [None]:
palms_out.filter(palms_out['identifier']=='A01').select('identifier','lat','lon','dateTime').show(20,False)

In [None]:
palms_out.filter(palms_out['identifier']=='A01').count()

In [None]:
palms_out.filter(palms_out['identifier']=='A01').printSchema()

In [None]:
date_format = '%Y-%m-%d'
time_format = '%H:%M:%S'
datetime_format = date_format + ' ' + time_format
startdate = datetime.strptime('2016-08-16 18:23:25', datetime_format) 

In [None]:
palms_out.filter((palms_out.identifier=='A01') & (palms_out.activity==-2))\
.select('identifier','lat','lon','dateTime','activity','activityIntensity','activityBoutNumber').show(29000)

In [None]:
# NOTE(review): `df3` is not assigned in any cell above this one, so this cell
# fails on a fresh kernel run from the top — confirm where df3 is produced.
# `startdate` is the datetime parsed in the cell a few cells above.
df3.filter((F.col('activityIntensity')==-2) & (F.col('timestamp')>=startdate)).orderBy('timestamp').show(29000)

In [None]:
palms_out.filter((palms_out.identifier=='A01') & (palms_out.activityBoutNumber==7))\
.select('lat','lon','dateTime','activity','activityIntensity',
        'activityBoutNumber','sedentaryBoutNumber').show(2000)

In [None]:
df3.filter((F.col('activityBoutNumber')==7) & (F.col('timestamp')>=startdate)).orderBy('timestamp').show(2000)

In [None]:
palms_out.filter((palms_out.identifier=='A01') & (palms_out.sedentaryBoutNumber == 14))\
.select('identifier','lat','lon','dateTime','activity',
        'activityBoutNumber','sedentaryBoutNumber').show(20000)

In [None]:
df3.filter((F.col('sedentaryBoutNumber')==14) & (F.col('timestamp')>=startdate)).show(29000)

In [None]:
palms_out.filter((palms_out.identifier=='A01') & (palms_out.fixTypeCode==3))\
.select('lat','lon','dateTime','fixTypeCode','tripType','activity',
        'activityIntensity','activityBoutNumber').show(2000)

In [None]:
palms_out.filter((palms_out.identifier=='A01') & (palms_out.fixTypeCode==4))\
.select('lat','lon','dateTime','fixTypeCode','tripType','activity',
        'activityIntensity','activityBoutNumber').count()

In [None]:
gps_data.filter(F.col('fixTypeCode')==3).orderBy('timestamp').show(2000)

In [None]:
gps_data.filter(F.col('fixTypeCode')==4).orderBy('timestamp').count()

In [None]:
palms_out.filter((palms_out.identifier=='A01') )\
.select('lat','lon','dateTime','fixTypeCode','tripType','activity',
        'activityIntensity','activityBoutNumber').show(20000,False)

In [None]:
palms_out.select('lat','lon','dateTime','fixTypeCode','tripType','activity',
        'activityIntensity','activityBoutNumber').show(20000,False)

In [None]:
gps_data.show(20000,False)

In [None]:
palms_out.filter((palms_out.identifier=='A01') & (palms_out.tripType==1))\
.select('lat','lon','dateTime','fixTypeCode','tripType','activity',
        'activityIntensity','activityBoutNumber').show(2000)

In [None]:
df.drop(*['height','speed','heading','dow','lat','lon'])\
.filter((F.col('tripType')==4)).show(2000)

In [None]:
df.filter(F.col('tripType')==1).show(2000)

In [None]:
spark.catalog.clearCache()

In [None]:
palms_out.filter((palms_out.identifier=='A02') & (palms_out.fixTypeCode==5))\
.select('lat','lon','dateTime','fixTypeCode','tripType','activity',
        'activityIntensity','activityBoutNumber').count()

In [None]:
palms_out.filter((palms_out.identifier=='A02') & (palms_out.fixTypeCode==3))\
.select('lat','lon','dateTime','fixTypeCode','tripType','activity',
        'activityIntensity','activityBoutNumber').count()

In [None]:
palms_out.filter((palms_out.identifier=='A02') & (palms_out.fixTypeCode==2))\
.select('lat','lon','dateTime','fixTypeCode','tripType','activity',
        'activityIntensity','activityBoutNumber').count()

In [None]:
palms_out.filter((palms_out.identifier=='A02') & (palms_out.fixTypeCode==1))\
.select('lat','lon','dateTime','fixTypeCode','tripType','activity',
        'activityIntensity','activityBoutNumber').count()

In [None]:
palms_out.filter((palms_out.identifier=='A02') & (palms_out.fixTypeCode==4))\
.select('lat','lon','dateTime','fixTypeCode','tripType','activity',
        'activityIntensity','activityBoutNumber').count()

In [None]:
palms_out.filter((palms_out.identifier=='A02') & (palms_out.fixTypeCode==6))\
.select('lat','lon','dateTime','fixTypeCode','tripType','activity',
        'activityIntensity','activityBoutNumber').count()

In [None]:
palms_out.filter((palms_out.identifier=='A01')).count()

## GPS data processing

In [43]:
gps_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/raw/Calgary/gps/A01r.csv'

In [None]:
gps_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/PARC_data/NYC/GPS/2001.csv'

rdd = sc.textFile(gps_path_raw, 4)
gps_data_raw = spark.read.csv(rdd)
gps_data_raw.take(10)

In [None]:
#gps_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/raw/Barcelona_youth/gps/G01FWS6.csv'

In [44]:
gps_data_raw = spark.read.csv(gps_path_raw, header=True, inferSchema=True)

In [45]:
# Timestamp pattern handed to gen_gps_dataframe together with the raw GPS frame.
# NOTE(review): other cells rebind date_format / time_format / datetime_format
# to strptime-style patterns ('%Y-%m-%d %H:%M:%S'); this cell must be re-run
# before calling gen_gps_dataframe again after any of those cells.
date_format = 'yyyy/MM/dd'
time_format = 'HH:mm:ss'
datetime_format = date_format + ' ' + time_format

# gps_data is rebound by most of the processing cells below.
gps_data = gen_gps_dataframe(gps_data_raw, datetime_format)
gps_data.cache()

DataFrame[timestamp: timestamp, dow: string, lat: double, lon: double, distance: double, height: double, speed: double]

In [46]:
gps_data.show(20,False)

+-------------------+---+---------+-----------+--------+------+-----+
|timestamp          |dow|lat      |lon        |distance|height|speed|
+-------------------+---+---------+-----------+--------+------+-----+
|2016-08-16 18:23:29|2  |51.01609 |-114.099305|0.0     |0.0   |0.121|
|2016-08-16 18:23:34|2  |51.016075|-114.09925 |0.0     |1084.0|2.127|
|2016-08-16 18:23:39|2  |51.016078|-114.099263|0.0     |1076.0|0.312|
|2016-08-16 18:23:44|2  |51.016072|-114.09925 |0.0     |1077.0|0.544|
|2016-08-16 18:23:49|2  |51.016083|-114.099242|0.0     |1077.0|0.061|
|2016-08-16 18:23:54|2  |51.016085|-114.09924 |0.0     |1078.0|0.538|
|2016-08-16 18:23:59|2  |51.016088|-114.099247|0.0     |1077.0|0.515|
|2016-08-16 18:24:04|2  |51.016088|-114.099247|0.0     |1077.0|0.174|
|2016-08-16 18:24:09|2  |51.016092|-114.099217|0.0     |1076.0|4.102|
|2016-08-16 18:24:14|2  |51.016152|-114.099205|0.0     |1077.0|5.964|
|2016-08-16 18:24:19|2  |51.016205|-114.099188|0.0     |1077.0|4.868|
|2016-08-16 18:24:24

In [None]:
# Parse a start timestamp with Python's strptime and show the fixes from that
# moment onwards.
# NOTE(review): this rebinds date_format / time_format / datetime_format, which
# an earlier cell sets to the 'yyyy/MM/dd HH:mm:ss' pattern passed to
# gen_gps_dataframe; re-running that call after this cell would hand it the
# strptime pattern instead.
date_format = '%Y-%m-%d'
time_format = '%H:%M:%S'
datetime_format = date_format + ' ' + time_format
startdate = datetime.strptime('2016-06-29 13:00:00', datetime_format) 
# Prints up to 20000 untruncated rows.
gps_data.filter((F.col('timestamp')>=startdate)).show(20000,False)

In [None]:
gps_data.filter((F.col('timestamp')>=startdate)).show(20000,False)

In [None]:
gps_data.filter(F.col('fixTypeCode') == 1).count()

In [None]:
gps_data.filter(F.col('fixTypeCode') == 2).count()

In [None]:
gps_data.filter(F.col('fixTypeCode') == 3).count()

In [None]:
gps_data.filter(F.col('fixTypeCode') == 4).count()

In [None]:
gps_data.filter(F.col('fixTypeCode') == 6).count()

In [None]:
gps_data.filter(F.col('fixTypeCode') == -1).count()

In [None]:
list1 = gps_data.filter(F.col('fixTypeCode') == 6).select('timestamp').collect()

In [None]:
list2 = palms_out.filter((palms_out.identifier=='A02') & (palms_out.fixTypeCode==6))\
.select('dateTime').collect()

In [None]:
def Diff(li1, li2):
    """Return the items of ``li1`` that do not occur in ``li2``.

    The order and the duplicates of ``li1`` are preserved. This is a one-sided
    difference: items that appear only in ``li2`` are not reported.

    Membership is tested against a set when all items are hashable, and falls
    back to a linear scan of ``li2`` otherwise.
    """
    try:
        excluded = set(li2)
        return [item for item in li1 if item not in excluded]
    except TypeError:
        # Unhashable items (e.g. lists): compare by equality against li2 itself.
        return [item for item in li1 if item not in li2]
Diff(list1,list2)

In [None]:
gps_data.printSchema()

In [None]:
gps_data.count()

In [47]:
# Round seconds in timestamps according to the interval

# `interval` and `ws` are notebook-level settings; `ws` is not used in this
# cell but is read by the later fill_timestamp cell together with `interval`.
interval = 5 # seconds
ts_name = 'timestamp'
ws = 600 # seconds

print("====> align timestamps...")
start_time = time.time()
# gps_data is rebound in place, so re-running this cell applies
# round_timestamp to its own previous result.
gps_data = round_timestamp(gps_data, ts_name, interval).cache()
# count() is an action: it forces the cached frame to be computed inside the
# timed section.
gps_data.count()
elapsed_time = time.time() - start_time
print("      time elapsed: {}".format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))

====> align timestamps...
      time elapsed: 00:00:04


In [48]:
# Set fix type

ts_name = 'timestamp'
ws=600
print("====> set fix type...")
start_time = time.time()
gps_data = set_fix_type(gps_data, ts_name, ws).cache()
gps_data.count()
elapsed_time = time.time() - start_time
print("      time elapsed: {}".format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))

====> set fix type...
      time elapsed: 00:00:12


In [70]:
def set_distance_and_speed(df, dcol, scol, tscol):
    """
        Calculate distance and speed at each epoch.

        For every fix, ordered by ``tscol``, the distance to the previous fix
        is computed with ``calc_distance`` and stored in ``dcol`` as a double.
        The first and the last fix always get a distance of 0.0. The speed
        stored in ``scol`` is ``3.6 * distance / duration`` for fixes whose
        ``fixTypeCode`` equals 1 and 0.0 for all other fixes, where
        ``duration`` is the number of seconds since the previous fix.
        (The 3.6 factor presumably converts m/s to km/h, assuming
        ``calc_distance`` returns metres -- TODO confirm.)

        Rows whose speed is undefined (null) are removed; this happens for
        fixes of type 1 that share their timestamp with the previous fix,
        because their duration is zero.

        Parameters
        ----------
        df : DataFrame with at least ``tscol``, 'lat', 'lon' and 'fixTypeCode'
        dcol : name of the distance column to (re)write
        scol : name of the speed column to (re)write
        tscol : name of the timestamp column

        Returns
        -------
        DataFrame ordered by ``tscol``, without any 'duration' or 'total_sec'
        column.
    """

    def _distance(lat_a, lon_a, lat_b, lon_b):
        # The first fix has no predecessor, and Spark evaluates the UDF for
        # every row regardless of the surrounding when(); skip missing input.
        if lat_a is None or lon_a is None or lat_b is None or lon_b is None:
            return None
        dist = calc_distance(lat_a, lon_a, lat_b, lon_b)
        return None if dist is None else float(dist)

    # An explicit DoubleType keeps the distance numeric; without a return type
    # a UDF yields strings.
    distance_udf = F.udf(_distance, DoubleType())

    # Window over all fixes in time order. It has no partitioning, so Spark
    # processes every row in a single partition.
    w = Window.orderBy(tscol)
    w_all = w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    # Seconds elapsed since the previous fix (0 for the first fix)
    ts_sec = F.col(tscol).cast(IntegerType())
    df2 = df.withColumn('duration',
                        ts_sec - F.coalesce(F.lag(ts_sec, 1).over(w), ts_sec))

    lat0 = F.col('lat').cast(DoubleType())
    lon0 = F.col('lon').cast(DoubleType())
    prev_lat = F.lag(F.col('lat'), 1).over(w)
    prev_lon = F.lag(F.col('lon'), 1).over(w)

    # Identify the first and last fix by position rather than by comparing
    # latitude values, which would also zero any fix sharing that latitude.
    row_idx = F.row_number().over(w)
    n_rows = F.count(F.lit(1)).over(w_all)
    is_interior = (row_idx > 1) & (row_idx < n_rows)

    # Calculate the distance traveled from last fix
    df2 = df2.withColumn(dcol, F.when(is_interior,
                                      distance_udf(lat0, lon0, prev_lat, prev_lon)
                                     ).otherwise(0.0)
                        ).orderBy(tscol)

    # Calculate velocity (only for fixes with fixTypeCode == 1)
    df2 = df2.withColumn(scol, F.when(F.col('fixTypeCode') == 1,
                                      3.6*F.col(dcol)/F.col('duration')
                                     ).otherwise(0.0)
                        )

    # filter points where the velocity is not defined (points with the same timestamp)
    df2 = df2.filter(F.col(scol).isNotNull())

    # 'total_sec' is no longer created here, but is still dropped so the
    # output never carries either helper column.
    return df2.drop(*['duration', 'total_sec'])

In [58]:
# Parse a start timestamp with Python's strptime and show the fixes from that
# moment onwards.
# NOTE(review): this rebinds date_format / time_format / datetime_format, which
# an earlier cell sets to the 'yyyy/MM/dd HH:mm:ss' pattern passed to
# gen_gps_dataframe; re-running that call after this cell would hand it the
# strptime pattern instead.
date_format = '%Y-%m-%d'
time_format = '%H:%M:%S'
datetime_format = date_format + ' ' + time_format
startdate = datetime.strptime('2016-08-23 13:00:00', datetime_format) 
# Prints up to 20000 untruncated rows.
gps_data.filter((F.col('timestamp')>=startdate)).show(20000,False)

+-------------------+---+---------+-----------+--------+------+------+-----------+
|timestamp          |dow|lat      |lon        |distance|height|speed |fixTypeCode|
+-------------------+---+---------+-----------+--------+------+------+-----------+
|2016-08-23 13:00:00|2  |51.04613 |-114.076952|0.0     |1052.0|1.433 |1          |
|2016-08-23 13:00:05|2  |51.046107|-114.076978|0.0     |1051.0|1.009 |1          |
|2016-08-23 13:00:10|2  |51.046085|-114.07701 |0.0     |1049.0|0.493 |1          |
|2016-08-23 13:00:15|2  |51.046083|-114.076998|0.0     |1048.0|0.492 |1          |
|2016-08-23 13:00:20|2  |51.046085|-114.076993|0.0     |1047.0|0.145 |1          |
|2016-08-23 13:00:25|2  |51.046087|-114.076992|0.0     |1046.0|0.645 |1          |
|2016-08-23 13:00:30|2  |51.04609 |-114.076978|0.0     |1045.0|0.906 |1          |
|2016-08-23 13:00:35|2  |51.046093|-114.076968|0.0     |1044.0|0.172 |1          |
|2016-08-23 13:00:40|2  |51.04609 |-114.076978|0.0     |1044.0|0.933 |1          |
|201

In [73]:
# Build the recomputed distance/speed frame once and cache it, instead of
# re-running set_distance_and_speed for each of the three checks.
gps_speed_check = set_distance_and_speed(gps_data, 'distance', 'speed', 'timestamp').cache()

# Fixes with fixTypeCode 3, fixes with undefined speed, and fixes above 130.
gps_speed_check.filter(F.col('fixTypeCode') == 3).show(20000,False)
gps_speed_check.filter(F.col('speed').isNull()).show(20000,False)
gps_speed_check.filter(F.col('speed')>130).show(20000,False)

# Same threshold on the speed column already present in gps_data, for comparison.
gps_data.filter(F.col('speed')>130).show(20000,False)

# Release the cached copy; the trailing ';' hides the returned DataFrame repr.
gps_speed_check.unpersist();






+-------------------+---+---------+-----------+------------------+------+-----+-----------+
|timestamp          |dow|lat      |lon        |distance          |height|speed|fixTypeCode|
+-------------------+---+---------+-----------+------------------+------+-----+-----------+
|2016-08-17 13:17:00|3  |51.047448|-114.078842|27.485353313596434|975.0 |0.0  |3          |
|2016-08-18 13:03:35|4  |51.05041 |-114.084027|44.52968151596462 |1104.0|0.0  |3          |
|2016-08-19 16:02:25|5  |51.045728|-114.077383|4.783157412793543 |1008.0|0.0  |3          |
|2016-08-21 19:28:05|7  |51.01619 |-114.099638|0.0               |1070.0|0.0  |3          |
|2016-08-23 08:01:20|2  |51.045433|-114.077097|3.383119221354725 |1052.0|0.0  |3          |
|2016-08-23 15:07:00|2  |51.045995|-114.078078|2.5568322839888666|1071.0|0.0  |3          |
+-------------------+---+---------+-----------+------------------+------+-----+-----------+

+---------+---+---+---+--------+------+-----+-----------+
|timestamp|dow|lat|lo

In [31]:
# Apply filter on the velocity

vmax = 130 # km/h

print("====> apply velocity filter...")
start_time = time.time()
gps_data = filter_speed(gps_data, 'speed', vmax).cache()
gps_data.count()
elapsed_time = time.time() - start_time
print("      time elapsed: {}".format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))

====> apply velocity filter...
      time elapsed: 00:00:00


In [32]:
# Apply filter over max acceleration
                                             
scol = 'speed'
tscol = 'timestamp'

print("====> apply accelaration filter...")
start_time = time.time()
gps_data = filter_acceleration(gps_data, scol, tscol).cache()
gps_data.count()
elapsed_time = time.time() - start_time
print("      time elapsed: {}".format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))

====> apply accelaration filter...
      time elapsed: 00:00:01


In [33]:
# Apply filter on the height variation

dhmax=1000
print("====> apply height variation filter...")
start_time = time.time()
gps_data = filter_height(gps_data, 'height', 'timestamp', dhmax).cache()
gps_data.count()
elapsed_time = time.time() - start_time
print("      time elapsed: {}".format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))

====> apply height variation filter...
      time elapsed: 00:00:03


In [34]:
gps_data.count()

115122

In [35]:
# Apply filter over three fixes (it also recalculates distance column)

dcol = 'distance'
tscol = 'timestamp'
dmin = 10

print("====> apply three fixes filter...")
start_time = time.time()
gps_data = filter_change_dist_3_fixes(gps_data, dcol, tscol, dmin).cache()
gps_data.count()
elapsed_time = time.time() - start_time
print("      time elapsed: {}".format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))

====> apply three fixes filter...
      time elapsed: 00:00:04


In [36]:
gps_data.count()

115045

In [None]:
# Generate missing values up to maximum signal loss

print("====> fill in missing value...")
start_time = time.time()
gps_data = fill_timestamp(gps_data, 'timestamp', 'fixTypeCode', interval, ws).cache()
gps_data.count()
elapsed_time = time.time() - start_time
print("      time elapsed: {}".format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))

In [None]:
gps_data.show(20,False)

In [None]:
gps_data.printSchema()

In [None]:
# Filter timestamps over given interval
# NOTE(review): INTERVAL repeats the value of `interval` set in the
# timestamp-alignment cell; the two can drift apart if only one is edited.
INTERVAL = 5
ts_name = 'timestamp'
print("====> filter GPS data every {} seconds...".format(str(INTERVAL)))
start_time = time.time()
# NOTE(review): unlike the other timed cells, no cache()/count() follows this
# call, so if select_gps_intervals only builds a lazy DataFrame the elapsed
# time below does not include the actual computation — confirm.
gps_data = select_gps_intervals(gps_data, ts_name, INTERVAL)
elapsed_time = time.time() - start_time
print("      time elapsed: {}".format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))

In [37]:
# Cap the number of rows passed to the trip detection below.
# NOTE(review): no ordering is applied before limit(), and the next cell
# applies a second limit(1000); the execution counts (37 here, 18 there)
# suggest only one of the two was meant to run — confirm which.
gps_data = gps_data.limit(100000)

In [18]:
gps_data = gps_data.limit(1000)

In [None]:
gps_data.printSchema()

In [38]:
##%%time

# Trip-detection parameters.
vmax = 130 # km/h
# NOTE(review): max_dist_per_min is computed but not passed to detect_trips
# in this cell.
max_dist_per_min = vmax * 1000/60 # meters
min_dist_per_min = 25 # meters
min_pause_duration = 120 # second
max_pause_time = 180 # seconds

# Column names handed to detect_trips.
ts_name = 'timestamp'
dist_name = 'distance'
speed_name = 'speed'
fix_type_name = 'fixTypeCode'


###TEST#PARAMETERS###
#min_dist_per_min = 5 # meters
#min_pause_duration = 12 # second
#max_pause_time = 36 # seconds
####################

print("====> detect trips...")
start_time = time.time()
# The result goes to a new name (gps_data2), so gps_data itself is unchanged.
gps_data2 = detect_trips(gps_data, ts_name, dist_name, speed_name, fix_type_name, min_dist_per_min, 
                 min_pause_duration, max_pause_time, vmax).cache()
# count() forces the computation inside the timed section; the recorded run
# of this cell reports 03:07:40 elapsed.
gps_data2.count()
elapsed_time = time.time() - start_time
print("      time elapsed: {}".format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))

#gps_data2.show()

====> detect trips...
      time elapsed: 03:07:40


In [None]:
vmax = 130 # km/h
max_dist_per_min = vmax * 1000/60 # meters
min_dist_per_min = 25 # meters
min_pause_duration = 120 # second
max_pause_time = 180 # seconds

ts_name = 'timestamp'
dist_name = 'distance'
speed_name = 'speed'
fix_type_name = 'fixTypeCode'

detect_trips(gps_data, ts_name, dist_name, speed_name, fix_type_name, min_dist_per_min, 
                 min_pause_duration, max_pause_time, vmax).printSchema()

In [None]:
gps_data2.printSchema()

In [None]:
gps_data.drop(*['lat','lon','dow','distance',
         'cum_pause','total_sec','height','speed','heading']).show(40000, False)
#.filter(F.col('tripType')==4)
gps_data.persist()

In [None]:
#df.write.csv('partial.csv')
#gps_data.coalesce(1).write.format('com.databricks.spark.csv').save('partial.csv',header = 'true')
# Write gps_data as a single CSV part file with a header row under the
# 'mydata.csv' output directory. The former 'inferSchema' option was dropped:
# it only applies when reading CSV and has no effect on a writer.
# NOTE: with the default save mode this fails if 'mydata.csv' already exists.
gps_data.coalesce(1).write.option("header",True).option("timestampFormat", "yyyy-MM-dd HH:mm:ss").csv("mydata.csv")

In [None]:
gps_data.printSchema()

In [None]:
# Reload previously exported GPS data, letting Spark infer the column types.
gps_data_path = '/Users/molinaro/Documents/GITHUB/PALMS/partial/partial.csv'
gps_data = spark.read.csv(gps_data_path, header=True, inferSchema=True)
#gps_data = gps_data.limit(40000)
# printSchema() prints the schema itself and returns None, so wrapping it in
# print() only appended a stray "None" line to the output.
gps_data.printSchema()

In [39]:
def classify_trips(df, ts_name, dist_name, speed_name, vehicle_speed_cutoff, bicycle_speed_cutoff, 
                   walk_speed_cutoff, min_trip_length, min_trip_duration, speed_segment_length, speed_percentile):
    
    """
    
    
    """
    w = Window.orderBy(ts_name).rowsBetween(0, Window.unboundedFollowing)
    w1 = Window.orderBy(ts_name).rowsBetween(Window.unboundedPreceding, 0)
    w2 = Window.partitionBy('segment').orderBy(ts_name)
    w3 = Window.partitionBy('tripMOT').orderBy(ts_name)
    
    udf_round = F.udf(lambda x: floor(x+0.5)) # floor(x+0.5) == Math.round(x) in JavaScript
    
    app_fun = F.udf(lambda x: trip_mode_type(x, vehicle_speed_cutoff, bicycle_speed_cutoff, walk_speed_cutoff))
    
    df2 = df.withColumn('tripMOT', F.lit(0)) 
    
    df2 = df2.withColumn('trip', F.when(F.col('tripType') == 1,
                                         1)
                        )
    
    df2 = df2.withColumn('trip', F.when(F.col('tripType') == 4,
                                         4).otherwise(F.col('trip'))
                        )
    
    df2 = df2.withColumn('trip', F.when(F.col('trip').isNull() &
                                        (F.col('tripType') == 2),
                                        F.col('tripType')
                                       ).otherwise(F.col('trip'))
                        ).orderBy(ts_name)
    
    
    # set trip start according to speed (recalculate the speed)
    df2 = df2.withColumn(speed_name, F.when((F.col(dist_name) != 0.0),
                                          3.6*F.col(dist_name)/F.col('duration')
                                         ).otherwise(0.0)
                        ).orderBy(ts_name)
  
    df2 = df2.withColumn('roundSpeed', F.when(F.col('trip').isNotNull(), 
                                              udf_round(F.col(speed_name)).cast(IntegerType())))
                                              
    """
    df2 = df2.withColumn('trip', F.when((F.col('trip') == 2) &
                                         (F.lag('roundSpeed',1).over(Window.orderBy(ts_name)) == 0),
                                         4).otherwise(F.col('trip'))
                        ).orderBy(ts_name) ## use this block instead of the following if do not recalculate the speed
    """
    #######
    df2 = df2.withColumn('trip', F.when((F.col('trip') == 2) &
                                         (F.col('roundSpeed') == 0),
                                         4).otherwise(F.col('trip'))
                        ).orderBy(ts_name)
    #######
    
    df2 = df2.withColumn('trip', F.when((F.col('trip') == 4) &
                                        (F.lag('trip',1).over(Window.orderBy(ts_name)) == 4),
                                        F.col('tripType')
                                       ).otherwise(F.col('trip'))
                        )
    
    df2 = df2.withColumn('trip', F.when((F.col('tripType') == 3) &
                                         (F.lag('tripType',1).over(Window.orderBy(ts_name)) == 2),
                                        4).otherwise(F.col('trip'))
                        ).orderBy(ts_name)
    
    df2 = df2.withColumn('trip', F.when((F.col('trip') == 2) &
                                         (F.lag('trip',1).over(Window.orderBy(ts_name)) == 4),
                                         1).otherwise(F.col('trip'))
                        ).orderBy(ts_name)
    
    df2 = df2.withColumn('trip', F.when(F.col('trip').isNotNull() &
                                        (F.col('tripType') == 2) &
                                        (F.lag('tripType',1).over(Window.orderBy(ts_name)) == 3),
                                        1).otherwise(F.col('trip'))
                        ).orderBy(ts_name)
    
    df2 = df2.withColumn('trip', F.when(F.col('trip').isNotNull() &
                                        (F.col('tripType') == 4) &
                                        (F.lead('tripType',1).over(Window.orderBy(ts_name)) == 0),
                                        4).otherwise(F.col('trip'))
                        ).orderBy(ts_name)
    
    df2 = trip_segmentation(df2, ts_name, speed_segment_length).checkpoint()
    
    stop = (F.col('trip') == 1)
    
    ct = df2.filter(stop).count()
    s = ct
    s_ = -1
    
    while (s - s_ != 0):

        s_ = s
        
        ct_ = ct
        
        # identify segments within a trip
        df2 = trip_segmentation(df2, ts_name, speed_segment_length).checkpoint()
        
        ct = df2.filter(stop).count()
        
        s = ct+ct_
        
    df2 = trip_segmentation(df2, ts_name, speed_segment_length).checkpoint()
    
    # set trip mode
    #n_percentile = F.expr('percentile_approx(roundSpeed, {})'.format(str(speed_percentile*0.01)))
    n_percentile = F.expr('percentile(roundSpeed, {})'.format(str(speed_percentile*0.01)))
    
      
    df2b = df2.select(ts_name,'tripType','trip','segment','roundSpeed')
    df2b = df2b.withColumn('tmp', F.when((F.col('roundSpeed') == 0) &
                                         (F.col('trip') != 4) &
                                         (F.col('tripType') != 0),
                                         0)
                          )
    df2b = df2b.filter(F.col('tmp').isNull()).drop('tmp').orderBy(ts_name)
    
    df2b = df2b.withColumn('tmp', n_percentile.over(Window.partitionBy('segment'))).orderBy(ts_name)
    
     
    df2 = df2.join(df2b, [ts_name,'tripType','trip','roundSpeed','segment'], how='left').orderBy(ts_name)
    df2 = df2.withColumn('tmp', F.when(F.col('segment').isNotNull() &
                                       F.col('tmp').isNull(),
                                       F.first('tmp', ignorenulls=True)
                                        .over(Window.partitionBy('segment')
                                                    .rowsBetween(0, Window.unboundedFollowing)
                                             )
                                      ).otherwise(F.col('tmp'))
                         ).orderBy(ts_name)
   
    df2 = df2.withColumn('tripMOT', F.when(F.col('tmp').isNotNull(),
                                           app_fun(F.col('tmp'))
                                          ).otherwise(F.col('tripMOT'))
                        )
    
    df2 = df2.withColumn('tripMOT', F.when(F.col('tripType') == 3, None).otherwise(F.col('tripMOT')))
   
    df2 = df2.drop(*['segment','pause','pause_dist','tmp']).orderBy(ts_name)
    
    df3 = df2.select(ts_name,'lat','lon','duration','distance','cum_pause','tripType','trip','tripMOT')\
             .filter(F.col('tripType') != 0).orderBy(ts_name)
    df2 = df2.drop(*['trip','tripMOT'])
    df3 = df3.filter(F.col('tripMOT').isNotNull()).orderBy(ts_name)
    
    df3 = df3.withColumn('ch', F.when(F.col('tripType') == 1, F.monotonically_increasing_id())
                        ).orderBy(ts_name)
    df3 = df3.withColumn('ch', F.when(F.col('ch').isNull() &
                                       (F.col('tripType') != 0),
                                       F.last('ch', ignorenulls=True)
                                        .over(Window.orderBy(ts_name).rowsBetween(Window.unboundedPreceding, 0))
                                      ).otherwise(F.col('ch'))
                        ).orderBy(F.col('ch'))
    
    # merge adjacent segments with equal tripMOT
    w4 = Window.partitionBy('ch').orderBy(ts_name)
    df3 = df3.withColumn('trip', F.when((F.col('trip') == 4) &
                                            (F.lead('trip',1).over(w4) == 1) &
                                            (F.col('tripMOT') == F.lead('tripMOT',1).over(w4)),
                                            F.col('tripType')
                                           ).otherwise(F.col('trip'))
                            ).orderBy(ts_name)
    
    df3 = df3.withColumn('trip', F.when((F.col('trip') == 1) &
                                            (F.lag('trip',1).over(w4) == F.lag('tripType',1).over(w4)) &
                                            (F.col('tripMOT') == F.lag('tripMOT',1).over(w4)),
                                            F.col('tripType')
                                           ).otherwise(F.col('trip'))
                            ).orderBy(ts_name)
    
    df3 = df3.withColumn('trip', F.when((F.col('trip') == 1) &
                                            (F.lag('tripType',1).over(w4) == 3),
                                            F.col('tripType')
                                           ).otherwise(F.col('trip'))
                            ).orderBy(ts_name)
    
    df3 = df3.withColumn('trip', F.when(F.col('trip').isNull() &
                                            (F.col('tripType') == 3),
                                            F.col('tripType')
                                           ).otherwise(F.col('trip'))
                            ).orderBy(ts_name)
    
    # trip segmentation
    df3 = trip_segmentation(df3, ts_name, speed_segment_length)
    
    # remove short trips
    df3 = df3.withColumn('cum_dist', F.sum(dist_name).over(w4.rowsBetween(Window.unboundedPreceding,0))
                        ).orderBy(ts_name)
    
    df3 = df3.withColumn('tmp', F.when((F.col('tripType') == 4) &
                                       ((F.col('cum_dist') < min_trip_length) |
                                        (F.col('pause') < min_trip_duration)),
                                       0)
                        ).orderBy(ts_name)
    
    df3 = df3.withColumn('tmp', F.when(F.col('tmp').isNull(),
                                       F.last('tmp', ignorenulls=True)
                                        .over(w4.rowsBetween(0, Window.unboundedFollowing))
                                      ).otherwise(F.col('tmp'))
                        ).orderBy(ts_name)
    
    df3 = df3.withColumn('tmp', F.when(F.col('tmp').isNull() &
                                       (F.col('trip') == 4) &
                                       ((F.col('cum_dist') < min_trip_length) |
                                        (F.col('pause') < min_trip_duration)),
                                       1).otherwise(F.col('tmp'))
                        ).orderBy(ts_name)
                        

    df3 = df3.withColumn('tmp', F.when(F.col('tmp').isNull(),
                                       F.last('tmp', ignorenulls=True)
                                        .over(w2.rowsBetween(0, Window.unboundedFollowing))
                                      ).otherwise(F.col('tmp'))
                        ).orderBy(ts_name)
    
    ## reset short isolated trips
    df3 = df3.withColumn('trip', F.when(F.col('tmp') == 0, 0).otherwise(F.col('trip')))
    df3 = df3.withColumn('tripMOT', F.when(F.col('tmp') == 0, 0).otherwise(F.col('tripMOT')))
    
    df3 = df3.withColumn('tmp2', F.when((F.col('tmp') == 1) &
                                        (F.col('tripType') == 1),
                                       2)
                        )
    df3 = df3.withColumn('tmp2', F.when(F.col('tmp2').isNull(),
                                        F.last('tmp2', ignorenulls=True)
                                         .over(w2.rowsBetween(Window.unboundedPreceding, 0))
                                       ).otherwise(F.col('tmp2'))
                        ).orderBy(ts_name)
    
    df3 = df3.withColumn('trip', F.when(F.col('tmp2') == 2, 0).otherwise(F.col('trip')))
    df3 = df3.withColumn('tripMOT', F.when(F.col('tmp2') == 2, 0).otherwise(F.col('tripMOT')))
    df3 = df3.withColumn('tmp', F.when(F.col('tmp2') == 2, None).otherwise(F.col('tmp')))
    df3 = df3.drop('tmp2')
    
    ## merge short trip segments
    df3 = df3.withColumn('tripMOT', F.when(F.col('tmp') == 1, None).otherwise(F.col('tripMOT')))
    
    df3 = df3.withColumn('tripMOT', F.when((F.col('tmp') == 1) &
                                           (F.col('trip') == 1),
                                           F.lag('tripMOT',1).over(Window.orderBy(ts_name))
                                          ).otherwise(F.col('tripMOT'))
                        ).orderBy(ts_name)
    
    df3 = df3.withColumn('tripMOT', F.when((F.col('tmp') == 1) &
                                           F.col('tripMOT').isNull(),
                                           F.last('tripMOT', ignorenulls=True)
                                            .over(w2.rowsBetween(Window.unboundedPreceding,0))
                                          ).otherwise(F.col('tripMOT'))
                        ).orderBy(ts_name)
    
    df3 = df3.withColumn('trip', F.when((F.col('tmp') == 1) &
                                        (F.col('trip') == 1),
                                        F.col('tripType')
                                       ).otherwise(F.col('trip'))
                        )
    
    df3 = df3.withColumn('trip', F.when(F.col('tmp').isNull() & 
                                        (F.lead('tmp',1).over(Window.orderBy(ts_name)) == 1) &
                                        (F.col('trip') == 4),
                                        F.col('tripType')
                                       ).otherwise(F.col('trip'))
                        )
  
    ## merge adjacent segments
    df3 = df3.withColumn('trip', F.when((F.col('trip') == 4) &
                                            (F.lead('trip',1).over(w4) == 1) &
                                            (F.col('tripMOT') == F.lead('tripMOT',1).over(w4)),
                                            F.col('tripType')
                                           ).otherwise(F.col('trip'))
                            ).orderBy(ts_name)
    
    df3 = df3.withColumn('trip', F.when((F.col('trip') == 1) &
                                            (F.lag('trip',1).over(w4) == F.lag('tripType',1).over(w4)) &
                                            (F.col('tripMOT') == F.lag('tripMOT',1).over(w4)),
                                            F.col('tripType')
                                           ).otherwise(F.col('trip'))
                            ).orderBy(ts_name)
    
    df3 = df3.withColumn('trip', F.when((F.col('trip') == 1) &
                                            (F.lag('tripType',1).over(w4) == 3),
                                            F.col('tripType')
                                           ).otherwise(F.col('trip'))
                            ).orderBy(ts_name)
    
    df3 = df3.withColumn('trip', F.when(F.col('trip').isNull() &
                                            (F.col('tripType') == 3),
                                            F.col('tripType')
                                           ).otherwise(F.col('trip'))
                            ).orderBy(ts_name)
    
    # trip segmentation
    df3 = trip_segmentation(df3, ts_name, speed_segment_length)
    df3 = df3.drop('ch').orderBy(ts_name).cache()
    
    df2 = df2.join(df3, [ts_name,'lat','lon','duration','distance','cum_pause',
                         'tripType'], how='left').orderBy(ts_name)
    df2 = df2.withColumn('tripMOT', F.when(F.col('tripType') == 3, 0).otherwise(F.col('tripMOT')))
    df2 = df2.withColumn('tripMOT', F.when(F.col('tripMOT').isNull(), 0).otherwise(F.col('tripMOT')))
    df2 = df2.withColumn('trip', F.when(F.col('trip').isNull(), F.col('tripType')).otherwise(F.col('trip')))
    
    df3.unpersist()  
    
    df2 = df2.withColumn('trip', F.when((F.col('tmp') == 1) &
                                        (F.col('tripMOT') == 0),
                                        0).otherwise(F.col('trip'))
                        )
    
    #df2.select(ts_name,'trip','tripMOT','tmp').show(10000,False)
                         
    df2 = df2.drop(*['tmp','cum_dist','roundSpeed','pause','pause_dist','segment'])
    
    # compute trip number
    df2 = df2.withColumn('tripNumber', F.when(F.col('trip') == 1, F.monotonically_increasing_id()))
    df2 = df2.withColumn('tripNumber', F.when(F.col('tripNumber').isNull() &
                                              F.col('trip').isNotNull(),
                                              F.last('tripNumber', ignorenulls=True).over(w1)
                                             ).otherwise(F.col('tripNumber'))
                        ).orderBy(ts_name)
    df2 = df2.withColumn('tripNumber', F.when(F.col('tripNumber').isNotNull(),
                                              F.col('tripNumber') + F.lit(1)
                                             ).otherwise(F.col('tripNumber'))
                        )
    df2 = df2.withColumn('tripNumber', F.when(F.col('tripType') == 0, 0).otherwise(F.col('tripNumber')))
    df2 = df2.withColumn('tripNumber', F.when(F.col('tripNumber').isNull(), 0).otherwise(F.col('tripNumber')))
    
    # reset tripType
    df2 = df2.withColumn('tripType', F.col('trip')).orderBy(ts_name)
    df2 = df2.withColumn('tripNumber', F.when((F.col('tripMOT') == 0) &
                                              (F.col('tripType') == 0),
                                              0).otherwise(F.col('tripNumber'))
                         ).orderBy(ts_name)

    df2 = df2.select(ts_name,'dow','lat','lon','fixTypeCode','tripNumber','tripType','tripMOT')

    return df2

In [40]:
def trip_segmentation(df, ts_name, speed_segment_length):
    
    """
    Group fixes into segments and reclassify the boundary fixes of short ones.

    Every row with ``trip == 1`` opens a new segment; the segment id is carried
    forward (in ``ts_name`` order) over the following rows whose 'trip' is not
    null. For each row the function then derives

    * 'pause'      -- ``cum_pause`` accumulated since the segment's opening row,
    * 'pause_dist' -- ``calc_distance`` between the row and the segment's
                      opening row (0.0 on the opening row itself).

    Rows with ``trip == 4`` whose accumulated distance does not exceed
    ``speed_segment_length`` get their 'trip' value reset to 'tripType', and so
    does a ``trip == 1`` row that immediately follows such a row.

    NOTE(review): the codes 1 and 4 look like "trip start" / "trip end"
    markers -- confirm against the code that fills 'trip'.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must provide the columns read here: ``ts_name``, 'trip', 'tripType',
        'duration', 'cum_pause', 'lat' and 'lon'.
    ts_name : str
        Name of the timestamp column; every window is ordered by it.
    speed_segment_length : int or float
        Upper bound compared with the values returned by ``calc_distance``.
        Presumably metres (the calling cell annotates it "# m") -- confirm.

    Returns
    -------
    pyspark.sql.DataFrame
        The input rows ordered by ``ts_name``, with 'trip' updated and the
        helper columns 'segment', 'pause' and 'pause_dist' added. The
        temporaries 'lat2', 'lon2', 't1', 't2' and 't3' are dropped.
    """
    
    # running frame: every row up to and including the current one
    w1 = Window.orderBy(ts_name).rowsBetween(Window.unboundedPreceding, 0)
    # rows of one segment in time order
    w2 = Window.partitionBy('segment').orderBy(ts_name)
    # whole track in time order, used for lag/lead look-ups
    w3 = Window.orderBy(ts_name)
    
    # NOTE(review): no returnType is passed, so Spark treats the UDF output as
    # a string column; the numeric comparisons and sums below rely on Spark's
    # implicit casts. Declaring DoubleType() would be cleaner but is only safe
    # if calc_distance always returns a Python float -- confirm first.
    app_fun = F.udf(lambda a,b,c,d: calc_distance(a,b,c,d))
    
    # trip segmentation
    
    # a fresh id on every segment-opening row ...
    df2 = df.withColumn('segment', F.when(F.col('trip') == 1, F.monotonically_increasing_id()))
    # ... carried forward over the following rows that have a 'trip' value
    df2 = df2.withColumn('segment', F.when(F.col('segment').isNull() &
                                           F.col('trip').isNotNull(),
                                           F.last('segment', ignorenulls=True).over(w1)
                                           ).otherwise(F.col('segment'))
                        ).orderBy(ts_name)
    
    # compute duration and traveled distance for each segment
    
    # mark the opening rows: 'pause' = duration, 'pause_dist' = 0.0, null elsewhere
    df2 = df2.withColumn('pause', F.when(F.col('trip') == 1,
                                         F.col('duration')
                                        )
                        )
    
    df2 = df2.withColumn('pause_dist', F.when(F.col('trip') == 1,
                                              0.0
                                             )
                        )
    
    # on the opening rows keep the cum_pause value reached *before* that row
    df2 = df2.withColumn('pause', F.when(F.col('pause') == F.col('duration'),
                                         F.col('cum_pause') - F.col('duration')
                                        ).otherwise(F.col('pause'))
                        ).orderBy(ts_name)
        
    # propagate that baseline to the remaining rows of the segment
    df2 = df2.withColumn('pause', F.when(F.col('segment').isNotNull() &
                                         F.col('pause').isNull(), 
                                         F.last('pause', ignorenulls=True).over(w1)
                                        ).otherwise(F.col('pause'))
                        ).orderBy(ts_name)
        
    # cum_pause minus baseline = amount accumulated since the segment opened
    df2 = df2.withColumn('pause', F.when(F.col('segment').isNotNull(),
                                         F.col('cum_pause') - F.col('pause')
                                        ).otherwise(F.col('pause'))
                        )
    
         
    # lat2/lon2: coordinates of the segment's opening row, repeated on every
    # row of that segment (falls back to the row's own coordinates)
    df2 = df2.withColumn('lat2', F.when(F.col('pause_dist').isNotNull(), F.col('lat')))
    df2 = df2.withColumn('lat2', F.when(F.col('pause_dist').isNull(),
                                        F.last('lat2', ignorenulls=True).over(w2)
                                       )
                                  .otherwise(F.col('lat'))
                        ).orderBy(ts_name)
    df2 = df2.withColumn('lat2', F.when(F.col('lat2').isNull(), F.col('lat')).otherwise(F.col('lat2')))
                        
    df2 = df2.withColumn('lon2', F.when(F.col('pause_dist').isNotNull(), F.col('lon')))
    df2 = df2.withColumn('lon2', F.when(F.col('pause_dist').isNull(), 
                                        F.last('lon2', ignorenulls=True).over(w2)
                                       ).otherwise(F.col('lon'))
                            ).orderBy(ts_name)
    df2 = df2.withColumn('lon2', F.when(F.col('lon2').isNull(), F.col('lon')).otherwise(F.col('lon2')))
        
    # distance from the opening row to the current row
    df2 = df2.withColumn('pause_dist', F.when(F.col('segment').isNotNull() &
                                              F.col('pause_dist').isNull(),
                                              app_fun(F.col('lat'),F.col('lon'),F.col('lat2'),F.col('lon2'))
                                             ).otherwise(F.col('pause_dist'))
                        )
    df2 = df2.drop(*['lat2','lon2'])
    
    # remove short sub-trips 
    
    
    # t1: segment id, set on trip == 4 rows that lie within
    # speed_segment_length of their segment's opening row and have a
    # successor with a 'trip' value
    df2 = df2.withColumn('t1', F.when((F.col('trip') == 4) &
                                        (F.col('pause_dist') <= speed_segment_length) &
                                        F.lead('trip',1).over(w3).isNotNull(),
                                        F.col('segment') 
                                       )
                        )
    
    # Spread t1 backwards to the earlier rows of the same segment. The frame
    # "current row .. end of partition" is only well defined on an ordered
    # window; it was previously unordered, which made the set of "following"
    # rows depend on the physical row order after the shuffle.
    df2 = df2.withColumn('t1', F.when(F.col('segment').isNotNull() &
                                      F.col('t1').isNull(),
                                      F.first('t1', ignorenulls=True)
                                       .over(w2.rowsBetween(0, Window.unboundedFollowing))
                                      ).otherwise(F.col('t1'))
                         ).orderBy(ts_name)
    
    # a flagged trip == 4 row followed by another flagged row adopts the
    # follower's group id, chaining consecutive short segments together
    df2 = df2.withColumn('t1', F.when(F.col('t1').isNotNull() &
                                      (F.col('trip') == 4) &
                                      F.lead('t1',1).over(w3).isNotNull(),
                                      F.lead('t1',1).over(w3)
                                     ).otherwise(F.col('t1'))
                        ).orderBy(ts_name)
    
    # t2: distance contributed by each flagged trip == 4 row, 0.0 elsewhere
    df2 = df2.withColumn('t2', F.when((F.col('trip') == 4) &
                                        (F.col('pause_dist') <= speed_segment_length) &
                                        F.lead('trip',1).over(w3).isNotNull(),
                                        F.col('pause_dist')
                                       ).otherwise(0.0)
                        )
    # t3: running total of t2 inside each t1 group
    df2 = df2.withColumn('t3', F.when(F.col('t1').isNotNull(),
                                      F.sum('t2').over(Window.partitionBy('t1').orderBy(ts_name)
                                                             .rowsBetween(Window.unboundedPreceding,0)
                                                      )
                                     )
                        ).orderBy(ts_name)
    # where the running total equals the row's own contribution, add the
    # total carried by the previous row
    df2 = df2.withColumn('t2', F.when((F.col('trip') == 4) &
                                      (F.col('t2') == F.col('t3')),
                                     F.col('t2') + F.lag('t3',1).over(w3)
                                     ).otherwise(F.col('t2'))
                        ).orderBy(ts_name)
    # recompute the running total with the adjusted contributions
    df2 = df2.withColumn('t3', F.when(F.col('t1').isNotNull(),
                                      F.sum('t2').over(Window.partitionBy('t1').orderBy(ts_name)
                                                             .rowsBetween(Window.unboundedPreceding,0)
                                                      )
                                     )
                        ).drop(*['t1','t2']).orderBy(ts_name)
  
    # trip == 4 rows whose accumulated length is still within the limit fall
    # back to their tripType ...
    df2 = df2.withColumn('trip', F.when((F.col('trip') == 4) &
                                        (F.col('t3') <= speed_segment_length) &
                                        F.lead('trip',1).over(w3).isNotNull(),
                                        F.col('tripType')
                                       ).otherwise(F.col('trip'))
                        ).orderBy(ts_name)
    
    # ... and so does the trip == 1 row that directly follows such a row
    df2 = df2.withColumn('trip', F.when((F.col('trip') == 1) &
                                        (F.lag('t3',1).over(w3) <= speed_segment_length),
                                        F.col('tripType')
                                       ).otherwise(F.col('trip'))
                        ).drop('t3').orderBy(ts_name)
    
    return df2

In [41]:
#%%time

# speed thresholds used to tell the modes of transport apart
vehicle_speed_cutoff = 35 # km/h
bicycle_speed_cutoff = 10 # km/h 
walk_speed_cutoff = 1 # km/h
speed_percentile = 90

# geometric / temporal limits for a valid trip
speed_segment_length = 30 # m
min_trip_length = 100 # m
min_trip_duration = 180 # sec

# column names expected in gps_data2
ts_name = 'timestamp'
dist_name = 'distance'
speed_name = 'speed'

print("====> classify trips...")
start_time = time.time()
gps_data3 = classify_trips(
    gps_data2, ts_name, dist_name, speed_name,
    vehicle_speed_cutoff, bicycle_speed_cutoff, walk_speed_cutoff,
    min_trip_length, min_trip_duration, speed_segment_length, speed_percentile
).cache()
# count() is an action: it forces the whole plan to run so the timing is meaningful
gps_data3.count()
elapsed_time = time.time() - start_time
print("      time elapsed: {}".format(time.strftime("%H:%M:%S", time.gmtime(elapsed_time))))

====> classify trips...
      time elapsed: 01:09:17


In [42]:
# Print up to 10000 classified rows without truncating cell contents.
gps_data3.show(n=10000, truncate=False)

+-------------------+---+---------+-----------+-----------+----------+--------+-------+
|timestamp          |dow|lat      |lon        |fixTypeCode|tripNumber|tripType|tripMOT|
+-------------------+---+---------+-----------+-----------+----------+--------+-------+
|2016-08-16 18:23:25|2  |51.01609 |-114.099305|2          |0         |0       |0      |
|2016-08-16 18:23:30|2  |51.016075|-114.09925 |1          |0         |0       |0      |
|2016-08-16 18:23:35|2  |51.016078|-114.099263|1          |0         |0       |0      |
|2016-08-16 18:23:40|2  |51.016072|-114.09925 |1          |0         |0       |0      |
|2016-08-16 18:23:45|2  |51.016083|-114.099242|1          |0         |0       |0      |
|2016-08-16 18:23:50|2  |51.016085|-114.09924 |1          |0         |0       |0      |
|2016-08-16 18:23:55|2  |51.016088|-114.099247|1          |0         |0       |0      |
|2016-08-16 18:24:00|2  |51.016088|-114.099247|1          |0         |0       |0      |
|2016-08-16 18:24:05|2  |51.0160

In [None]:
# Trimmed view of the classification result (up to 20000 rows, untruncated).
# drop() is a no-op for names that are not columns of the frame.
gps_data3.drop(
    'lat', 'lon', 'dow', 'distance', 'speed2', 'fixTypeCode', 'cum_pause', 'duration'
).show(n=20000, truncate=False)
#gps_data.persist()

In [None]:
# Write gps_data to the directory "gps_data.csv" as a single part file with a header row.
# NOTE(review): "inferSchema" is documented as a CSV *read* option; it likely has no effect on write.
(
    gps_data.coalesce(1)
    .write
    .option("header", True)
    .option("inferSchema", "true")
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
    .csv("gps_data.csv")
)



In [None]:
# Print the schema (column names and types) of gps_data.
gps_data.printSchema()

In [None]:
# Number of rows in gps_data; count() is an action, so this runs a Spark job.
gps_data.count()

In [None]:
# Reload the GPS table from a previously exported CSV file.
# NOTE(review): absolute, machine-specific path -- it only resolves on the
# original author's machine; consider a path relative to a configurable data directory.
gps_data_path = '/Users/molinaro/Documents/GITHUB/PALMS/gps_data.csv/gps_data.csv'
gps_data = spark.read.csv(gps_data_path, header=True, inferSchema=True)
#gps_data = gps_data.limit(40000)
# printSchema() writes the schema itself and returns None, so wrapping it in
# print() only appended a stray "None" line to the output.
gps_data.printSchema()

### Process GPS data in Calgary/gps/A01r.csv

In [None]:
# segment 108

In [None]:
# Trimmed view of df (up to 20000 rows, untruncated).
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i1', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE3" mode keyed on 'i3' (not checkpointed); print a trimmed view.
df2 = proc_segment(df, 'i3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 0, 0, "CASE3")
df2.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i1', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# segment 64

In [None]:
# set_pause then check_case, each checkpointed; print a trimmed view.
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case, each checkpointed; print a trimmed view.
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case, each checkpointed; print a trimmed view.
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case, each checkpointed; print a trimmed view.
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case, each checkpointed; print a trimmed view.
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case, each checkpointed; print a trimmed view.
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case, each checkpointed; print a trimmed view.
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case, each checkpointed; print a trimmed view.
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case, each checkpointed; print a trimmed view.
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE4" mode keyed on 'j4', checkpointed into df2; print a trimmed view.
df2 = proc_segment(df, 'j4', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 0, 0, "CASE4").checkpoint()
df2.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j3'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case on df2, each checkpointed; print a trimmed view.
df2 = set_pause(df2, 'i1', ts_name).checkpoint()
df2 = check_case(df2, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df2.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE2" mode keyed on 'j2', checkpointed; print a trimmed view.
df2 = proc_segment(df2, 'j2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 3, "CASE2").checkpoint()
df2.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case on df2, each checkpointed; print a trimmed view.
df2 = set_pause(df2, 'i1', ts_name).checkpoint()
df2 = check_case(df2, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df2.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df2 = proc_segment(df2, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df2.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case on df2, each checkpointed; print a trimmed view.
df2 = set_pause(df2, 'i1', ts_name).checkpoint()
df2 = check_case(df2, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df2.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE3" mode keyed on 'j3', checkpointed into df3; print a trimmed view.
df3 = proc_segment(df2, 'j3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 0, 0, "CASE3").checkpoint()
df3.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case on df3, each checkpointed; print a trimmed view.
df3 = set_pause(df3, 'i1', ts_name).checkpoint()
df3 = check_case(df3, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df3.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df3 = proc_segment(df3, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df3.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case on df3, each checkpointed; print a trimmed view.
df3 = set_pause(df3, 'i1', ts_name).checkpoint()
df3 = check_case(df3, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df3.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE3" mode keyed on 'j3', checkpointed into df4; print a trimmed view.
df4 = proc_segment(df3, 'j3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 0, 0, "CASE3").checkpoint()
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case on df4, each checkpointed; print a trimmed view.
df4 = set_pause(df4, 'i1', ts_name).checkpoint()
df4 = check_case(df4, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE3" mode keyed on 'j3', checkpointed; print a trimmed view.
df4 = proc_segment(df4, 'j3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 0, 0, "CASE3").checkpoint()
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case on df4, each checkpointed; print a trimmed view.
df4 = set_pause(df4, 'i1', ts_name).checkpoint()
df4 = check_case(df4, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df4 = proc_segment(df4, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# set_pause then check_case on df4, each checkpointed; print a trimmed view.
df4 = set_pause(df4, 'i1', ts_name).checkpoint()
df4 = check_case(df4, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df4 = proc_segment(df4, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i3', 'j4'
).show(n=20000, truncate=False)

In [None]:
# segment 28

In [None]:
# set_pause, checkpointed; print a trimmed view.
df = set_pause(df, 'i1', ts_name).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2'
).show(n=20000, truncate=False)

In [None]:
# check_case, checkpointed; print a trimmed view.
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i1', 'j4'
).show(n=20000, truncate=False)

In [None]:
# Discard the j1..j4 columns, run set_pause (checkpointed) and print a trimmed view.
df = df.drop('j1', 'j2', 'j3', 'j4')
df = set_pause(df, 'i1', ts_name).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2'
).show(n=20000, truncate=False)

In [None]:
# check_case, checkpointed; print a trimmed view.
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i1', 'j4'
).show(n=20000, truncate=False)

In [None]:
# Discard the j1..j4 columns, run set_pause (checkpointed) and print a trimmed view.
df = df.drop('j1', 'j2', 'j3', 'j4')
df = set_pause(df, 'i1', ts_name).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2'
).show(n=20000, truncate=False)

In [None]:
# check_case, checkpointed; print a trimmed view.
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i1', 'j4'
).show(n=20000, truncate=False)

In [None]:
# Discard the j1..j4 columns, run set_pause (checkpointed) and print a trimmed view.
df = df.drop('j1', 'j2', 'j3', 'j4')
df = set_pause(df, 'i1', ts_name).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2'
).show(n=20000, truncate=False)

In [None]:
# check_case, checkpointed; print a trimmed view.
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i1', 'j4'
).show(n=20000, truncate=False)

In [None]:
# Discard the j1..j4 columns, then set_pause and check_case (each checkpointed); print a trimmed view.
df = df.drop('j1', 'j2', 'j3', 'j4')
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i1', 'j4'
).show(n=20000, truncate=False)

In [None]:
# Discard the j1..j4 columns, then set_pause and check_case (each checkpointed); print a trimmed view.
df = df.drop('j1', 'j2', 'j3', 'j4')
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
# proc_segment in "CASE1" mode keyed on 'j1', checkpointed; print a trimmed view.
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i2', 'i1', 'j4'
).show(n=20000, truncate=False)

In [None]:
# Discard the j1..j4 columns, then set_pause and check_case (each checkpointed); print a trimmed view.
df = df.drop('j1', 'j2', 'j3', 'j4')
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode',
    'cum_pause', 'total_sec', 'height', 'speed', 'heading',
    'state_cp', 'tripType_cp', 'i3', 'i2', 'j4'
).show(n=20000, truncate=False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 3, "CASE2").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df2 = proc_segment(df, 'j3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 0, 0, "CASE3").checkpoint()
df2.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
# segment 13

In [None]:
df = set_pause(df, 'i1', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2']).show(20000, False)

In [None]:
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2']).show(20000, False)

In [None]:
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2']).show(20000, False)

In [None]:
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2']).show(20000, False)

In [None]:
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i1', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2']).show(20000, False)

In [None]:
df = check_case(df, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 0, 0, "CASE3").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df2 = df.drop(*['j1','j2','j3','j4'])
df2 = set_pause(df2, 'i1', ts_name).checkpoint()
df2.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2']).show(20000, False)

In [None]:
df2 = check_case(df2, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df2.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df2 = proc_segment(df2, 'j2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 3, "CASE2").checkpoint()
df2.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df3 = df2.drop(*['j1','j2','j3','j4'])
df3 = set_pause(df3, 'i1', ts_name).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2']).show(20000, False)

In [None]:
df3 = check_case(df3, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df3 = proc_segment(df3, 'j2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 3, "CASE2").checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df3 = df3.drop(*['j1','j2','j3','j4'])
df3 = set_pause(df3, 'i1', ts_name).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2']).show(20000, False)

In [None]:
df3 = check_case(df3, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df3 = proc_segment(df3, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df3 = df3.drop(*['j1','j2','j3','j4'])
df3 = set_pause(df3, 'i1', ts_name).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2']).show(20000, False)

In [None]:
df3 = check_case(df3, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i3','i2','j4']).show(20000, False)

In [None]:
df3 = proc_segment(df3, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
# segment 108
df = set_pause(df, 'i3', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2']).show(20000, False)

In [None]:
df = check_case(df, 'i3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i3', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2']).show(20000, False)

In [None]:
df = check_case(df, 'i3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i3', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2']).show(20000, False)

In [None]:
df = check_case(df, 'i3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i3', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2']).show(20000, False)

In [None]:
df = check_case(df, 'i3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i3', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2']).show(20000, False)

In [None]:
df = check_case(df, 'i3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i3', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2']).show(20000, False)

In [None]:
df = check_case(df, 'i3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2','j4']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i3', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2']).show(20000, False)

In [None]:
df = check_case(df, 'i3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2','j4']).show(20000, False)

In [None]:
df2 = proc_segment(df, 'j2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 3, "CASE2").checkpoint()
df2.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df2 = df2.drop(*['j1','j2','j3','j4'])
df2 = set_pause(df2, 'i3', ts_name).checkpoint()
df2.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2']).show(20000, False)

In [None]:
df2 = check_case(df2, 'i3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df2.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2','j4']).show(20000, False)

In [None]:
df3 = proc_segment(df2, 'j3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 0, 0, "CASE3").checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i1','j4']).show(20000, False)

In [None]:
df3 = df3.drop(*['j1','j2','j3','j4'])
df3 = set_pause(df3, 'i3', ts_name).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i2']).show(20000, False)

In [None]:
# segment 166
df = set_pause(df, 'i2', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = check_case(df, 'i2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i2', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = check_case(df, 'i2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = proc_segment(df, 'j2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 3, "CASE2").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i2', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = check_case(df, 'i2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i2', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = check_case(df, 'i2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i2', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = check_case(df, 'i2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i2', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = check_case(df, 'i2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = proc_segment(df, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = df.drop(*['j1','j2','j3','j4'])
df = set_pause(df, 'i2', ts_name).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df = check_case(df, 'i2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3','j3']).show(20000, False)

In [None]:
df2 = proc_segment(df, 'j4', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 0, 0, "CASE4").checkpoint()
df2.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3','j3']).show(20000, False)

In [None]:
df2 = df2.drop(*['j1','j2','j3'])
df2 = set_pause(df2, 'j4', ts_name).checkpoint()
df2.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3','j3']).show(20000, False)

In [None]:
df2 = check_case(df2, 'i2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df2.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3','j4']).show(20000, False)

In [None]:
df2 = proc_segment(df2, 'j3', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 0, 0, "CASE3").checkpoint()
df2.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3','j4','j1','j2']).show(20000, False)

In [None]:
df3 = df2.drop(*['j1','j2','j3','j4'])
df3 = set_pause(df3, 'i2', ts_name).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df3 = check_case(df3, 'i2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3','j4']).show(20000, False)

In [None]:
df3 = proc_segment(df3, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df3 = df3.drop(*['j1','j2','j3','j4'])
df3 = set_pause(df3, 'i2', ts_name).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df3 = check_case(df3, 'i2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3','j4']).show(20000, False)

In [None]:
df3 = proc_segment(df3, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df3 = df3.drop(*['j1','j2','j3','j4'])
df3 = set_pause(df3, 'i2', ts_name).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3']).show(20000, False)

In [None]:
df3 = check_case(df3, 'i2', ts_name, min_dist_per_min, min_pause_duration, max_pause_time).checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3','j4']).show(20000, False)

In [None]:
df3 = proc_segment(df3, 'j1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i1','i3','j4','j1','j2']).show(20000, False)

In [None]:
df3.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i3','j4','j1','j2','j3']).show(20000, False)

In [None]:
# segment 180
df4 = proc_segment(df3, 'i1', ts_name, min_dist_per_min, min_pause_duration, max_pause_time, 3, 2, "CASE1").checkpoint()
df4.drop(*['lat','lon','dow','distance','fixTypeCode',
         'cum_pause','total_sec','height','speed','heading','state_cp','tripType_cp','i2','i3','j4']).show(20000, False)

In [None]:
# Discard the per-case helper columns before re-deriving the pause state.
df4 = df4.drop('j1', 'j2', 'j3', 'j4')

# Project helper set_pause keyed on 'i1' (defined outside this chunk);
# checkpoint to truncate the Spark plan.
df4 = set_pause(df4, 'i1', ts_name).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3',
).show(n=20000, truncate=False)

In [None]:
# check_case pass keyed on 'i1' (project helper, defined outside this chunk),
# checkpointed to keep the Spark plan short.
df4 = check_case(
    df4, 'i1', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3', 'j4',
).show(n=20000, truncate=False)

In [None]:
# "CASE1" pass (3, 2) keyed on 'j1' through the project helper proc_segment,
# followed by a checkpoint.
df4 = proc_segment(
    df4, 'j1', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
    3, 2, "CASE1",
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3', 'j4',
).show(n=20000, truncate=False)

In [None]:
# Reset the per-case helper columns ahead of the next pause computation.
df4 = df4.drop('j1', 'j2', 'j3', 'j4')

# Re-derive pauses keyed on 'i1' with the project helper set_pause, then
# checkpoint.
df4 = set_pause(df4, 'i1', ts_name).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3',
).show(n=20000, truncate=False)

In [None]:
# Repeat the check_case pass keyed on 'i1' after the pause update, then
# checkpoint.
df4 = check_case(
    df4, 'i1', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3', 'j4',
).show(n=20000, truncate=False)

In [None]:
# "CASE2" pass (3, 3) keyed on 'j2' through the project helper proc_segment,
# followed by a checkpoint.
df4 = proc_segment(
    df4, 'j2', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
    3, 3, "CASE2",
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i1', 'i3',
).show(n=20000, truncate=False)

In [None]:
# Clear the per-case helper columns left by the CASE2 pass.
df4 = df4.drop('j1', 'j2', 'j3', 'j4')

# Pause update keyed on 'i1' (project helper set_pause), checkpointed.
df4 = set_pause(df4, 'i1', ts_name).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df4.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3',
).show(n=20000, truncate=False)

In [None]:
# Start the df5 stage from df4: check_case pass keyed on 'i1' (project
# helper, defined outside this chunk), then checkpoint.
df5 = check_case(
    df4, 'i1', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df5.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3', 'j4',
).show(n=20000, truncate=False)

In [None]:
# "CASE2" pass (3, 3) keyed on 'j2' through the project helper proc_segment,
# followed by a checkpoint.
df5 = proc_segment(
    df5, 'j2', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
    3, 3, "CASE2",
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df5.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i1', 'i3',
).show(n=20000, truncate=False)

In [None]:
# Discard the per-case helper columns before re-deriving the pause state.
df5 = df5.drop('j1', 'j2', 'j3', 'j4')

# Project helper set_pause keyed on 'i1'; checkpoint to truncate the plan.
df5 = set_pause(df5, 'i1', ts_name).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df5.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3',
).show(n=20000, truncate=False)

In [None]:
# check_case pass keyed on 'i1' after the pause update, then checkpoint.
df5 = check_case(
    df5, 'i1', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df5.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3', 'j4',
).show(n=20000, truncate=False)

In [None]:
# Another "CASE2" pass (3, 3) keyed on 'j2' via proc_segment, checkpointed.
df5 = proc_segment(
    df5, 'j2', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
    3, 3, "CASE2",
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df5.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i1', 'i3',
).show(n=20000, truncate=False)

In [None]:
# Reset the per-case helper columns ahead of the next pause computation.
df5 = df5.drop('j1', 'j2', 'j3', 'j4')

# Pause update keyed on 'i1' (project helper set_pause), checkpointed.
df5 = set_pause(df5, 'i1', ts_name).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df5.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3',
).show(n=20000, truncate=False)

In [None]:
# Third check_case pass on df5 keyed on 'i1', then checkpoint.
df5 = check_case(
    df5, 'i1', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df5.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3', 'j4',
).show(n=20000, truncate=False)

In [None]:
# "CASE3" pass (0, 0) keyed on 'j3' through the project helper proc_segment,
# followed by a checkpoint.
df5 = proc_segment(
    df5, 'j3', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
    0, 0, "CASE3",
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df5.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i1', 'i3', 'j4',
).show(n=20000, truncate=False)

In [None]:
# Inspect df5 with 'i1' and the j1-j3 columns visible; 'i2', 'i3', 'j4' and
# the bulky GPS columns are hidden.
df5.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3', 'j4',
).show(n=20000, truncate=False)

In [None]:
#segment 188
# Start the df6 stage from df5: "CASE1" pass (3, 2) keyed on 'j1' through the
# project helper proc_segment, followed by a checkpoint.
df6 = proc_segment(
    df5, 'j1', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
    3, 2, "CASE1",
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df6.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i1', 'i3', 'j4',
).show(n=20000, truncate=False)

In [None]:
# Discard the per-case helper columns before re-deriving the pause state.
df6 = df6.drop('j1', 'j2', 'j3', 'j4')

# Project helper set_pause keyed on 'i1'; checkpoint to truncate the plan.
df6 = set_pause(df6, 'i1', ts_name).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df6.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3',
).show(n=20000, truncate=False)

In [None]:
# check_case pass on df6 keyed on 'i1' (project helper, defined outside this
# chunk), then checkpoint.
df6 = check_case(
    df6, 'i1', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df6.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i2', 'i3', 'j4',
).show(n=20000, truncate=False)

In [None]:
# "CASE3" pass (0, 0) keyed on 'j3' through the project helper proc_segment,
# followed by a checkpoint.
df6 = proc_segment(
    df6, 'j3', ts_name,
    min_dist_per_min, min_pause_duration, max_pause_time,
    0, 0, "CASE3",
).checkpoint()

# Debug view: hide bulky columns and print up to 20000 untruncated rows.
df6.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'i1', 'i3', 'j4',
).show(n=20000, truncate=False)

In [None]:
# Inspect df6 with the i* columns visible; all j* helper columns and the
# bulky GPS columns are hidden.
df6.drop(
    'lat', 'lon', 'dow', 'distance', 'fixTypeCode', 'cum_pause', 'total_sec',
    'height', 'speed', 'heading', 'state_cp', 'tripType_cp',
    'j4', 'j1', 'j2', 'j3',
).show(n=20000, truncate=False)

In [None]:
def push_and_pop(rdd):
    """Return a new RDD with the head element moved to the tail.

    Two transformations are stacked on ``rdd``: a filter that removes the
    head and a union that appends it again. Note that the filter compares by
    value, so every element equal to the head is removed, not only the first.

    rdd: the RDD to rotate; ``rdd.first()`` is evaluated eagerly.
    Returns the rotated RDD (lazy; lineage grows by two steps per call).
    """
    head = rdd.first()

    def differs_from_head(obj):
        return obj != head

    without_head = rdd.filter(differs_from_head)
    return without_head.union(sc.parallelize([head]))

def serialize_and_deserialize(rdd):
    """Materialize ``rdd`` on the driver and rebuild it as a fresh RDD.

    ``collect()`` is an action, so the whole lineage of ``rdd`` is evaluated;
    the RDD returned by ``sc.parallelize`` starts from the collected list.
    """
    collected = rdd.collect()
    return sc.parallelize(collected)

def do_test(serialize=False):
    """Time 25 successive push_and_pop rounds on a 1000-element RDD.

    Each round stacks two more transformations on the RDD and prints the
    elapsed wall-clock seconds for that round.

    serialize: when True, the RDD is collected and re-parallelized after
        every round (see serialize_and_deserialize), which resets its lineage.
    Returns None; timings are printed.
    """
    rdd = sc.parallelize(range(1000))
    # range() instead of xrange(): xrange does not exist on Python 3 and
    # raised NameError there; range works on both Python 2 and 3.
    for _ in range(25):
        t0 = time.time()
        rdd = push_and_pop(rdd)
        if serialize:
            rdd = serialize_and_deserialize(rdd)
        print("%.3f" % (time.time() - t0))

do_test()

In [None]:
# Tumbling windows: window length and slide are both 60 seconds.
tw = '{} seconds'.format(60)
sw = '{} seconds'.format(60)

# Align the window grid with the seconds component of the first row's first
# column (expected to be a timestamp, since .second is read from it).
st_sec = str(df.first()[0].second)
offset = '{} seconds'.format(st_sec)

param_name = 'distance'

# Sum `param_name` per window and show each window's start/end bounds.
(
    df.groupBy(F.window(ts_name, tw, sw, offset))
      .sum(param_name)
      .select('window', 'sum({})'.format(param_name))
      .withColumn('start', F.col('window').start)
      .withColumn('end', F.col('window').end)
      .drop('window')
      .show(20, False)
)
"""
df.groupBy(
            F.window(ts_name, tw, sw, offset)
          ).sum()\
           .sort('window.start')\
           .filter(F.col('sum({})'.format(param_name))>=min_dist_per_min)\
           .select('window')\
           .withColumn('start', F.col('window').start)\
           .withColumn('end', F.col('window').end)\
           .drop('window')\
           .show(20, False)
"""

## Accelerometer data processing

# Toy frame: one row per day for 2015-01-01 .. 2015-01-16, ids 1..16.
row = Row("id", "date", "value")
df = sc.parallelize([
    row(day, "2015-01-{:02d}".format(day), value)
    for day, value in enumerate(
        [20.0, 30.0, 0.0, 30.0, 50.0, 0.0, 0.0, 0.0,
         0.0, 0.0, 20.0, 0.0, 0.0, 40.0, 8.0, 0.0],
        1,
    )
]).toDF().withColumn("date", F.col("date").cast("timestamp"))

df.show(20, False)

# One-day tumbling windows, expressed once in minutes and once in seconds.
tw = '{} minutes'.format(24*60)
sw = '{} seconds'.format(24*3600)
offset = '{} seconds'.format(0)
datetime_name = 'date'
param_name = 'value'
param_value = 0

# Keep only the windows whose average equals `param_value` and list their
# start/end bounds.
(
    df.groupBy(F.window(datetime_name, tw, sw, offset))
      .avg(param_name)
      .sort('window.start')
      .filter(F.col('avg({})'.format(param_name)) == param_value)
      .select('window')
      .withColumn('start', F.col('window').start)
      .withColumn('end', F.col('window').end)
      .drop('window')
      .show(20, False)
)

In [None]:
# Raw accelerometer input file. Exactly one assignment is active; the
# commented-out lines are alternative datasets kept for quick switching.
# NOTE(review): these are absolute paths on one developer's machine -- the
# notebook will not run elsewhere without editing this cell.
#acc_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/raw/Calgary/acc/A01r_5sec.csv'
#acc_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/raw/APEN baseline/acc/14A1.csv'
#acc_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/raw/APEN follow-up/acc/24A02.csv'
#acc_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/raw/Barcelona_youth/acc/G01FWS6.csv'
#acc_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/raw/DPS baseline/acc/117101.csv'
#acc_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/raw/DPS follow_up/acc/217101.csv'
#acc_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/raw/IPEN_Y_BE/acc/32_0006.csv'
#acc_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/raw/IPEN_Y_DK/acc/160101.csv'
#acc_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/raw/NBBB baseline/acc/wcmc201003a1_cdt.csv'
#acc_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/raw/NBBB follow_up/acc/201004a1.csv'
acc_path_raw = '/Users/molinaro/Documents/GITHUB/PALMS/data/PARC_data/NYC/accelerometer/2001_1s.csv'

In [None]:
# Load the accelerometer file as plain text, one row per line (no CSV
# parsing here).
acc_data_raw = spark.read.text(acc_path_raw)
# cache() is lazy: the data is only materialized by the first action.
acc_data_raw.cache()

In [None]:
# First action on the cached frame: materializes it and reports the number
# of lines in the raw file.
acc_data_raw.count()

In [None]:
# Parse the raw text lines with the project helper gen_acc_dataframe, which
# returns a pair unpacked here as (interval, acc_data).
# NOTE(review): presumably `interval` is the recording epoch in seconds and
# `acc_data` the parsed DataFrame -- confirm against the helper.
interval, acc_data = gen_acc_dataframe(acc_data_raw)

In [None]:
# Quick sanity check of the parsed accelerometer data.
print(interval)
acc_data.show(20,False)

In [None]:
# Column names and types of the parsed accelerometer frame.
acc_data.printSchema()

In [None]:
# Column names handed to the project helper split_acc_data.
# NOTE(review): presumably the helper splits a packed record into these named
# columns -- confirm against its definition.
acc_columns = ['axis1','axis2','axis3','steps','lux','incl_off','incl_standing','incl_sitting','incl_lying']
acc_data_ext = split_acc_data(acc_data, acc_columns)
# Lazy cache; materialized by the first action on acc_data_ext.
acc_data_ext.cache()

In [None]:
# Name of the timestamp column and the target epoch length.
ts_name = 'timestamp'
window = 5 #seconds

# Project helper select_acc_intervals; the two trailing booleans are
# positional flags whose meaning is defined in the helper (not visible here).
acc_data_act = select_acc_intervals(acc_data_ext, ts_name, interval, window, False, True)
# Prints up to 10000 rows without truncation (large notebook output).
acc_data_act.show(10000, False)

In [None]:
# Activity-intensity cut points passed to activity_count. The commented-out
# tuples are alternative threshold sets kept for reference.
#LightCO, ModerateCO, HardCO, VeryHardCO = (100, 1953, 5725, 99999)
#LightCO, ModerateCO, HardCO, VeryHardCO = (100, 1953, 5725, 9498)
#LightCO, ModerateCO, HardCO, VeryHardCO = (500, 2000, 3000, 4500)
LightCO, ModerateCO, HardCO, VeryHardCO = (133, 193, 233, 9999)
window = 5 #seconds
# Project helper activity_count; the trailing False is a positional flag
# whose meaning is defined in the helper (not visible here).
acc_data_act = activity_count(acc_data_act, 'timestamp', window, LightCO, ModerateCO, HardCO, VeryHardCO, False)
# Lazy cache; materialized by the next action on acc_data_act.
acc_data_act.cache()

In [None]:
# Print up to 10000 rows of the activity frame without truncation.
acc_data_act.show(10000, False)

In [None]:
# Print up to 10000 rows of the extended accelerometer frame for comparison.
acc_data_ext.show(10000, False)

In [None]:
# DETERMINE NON-WEAR PERIOD
# Column names and parameters handed to the project helper non_wear_filter.
ts_name = 'timestamp'
AC_name = 'activity'
AI_name = 'activityIntensity'
new_col = 'non_wear'  # not used in this cell
window = 5
minutes_zeros_row = 90

acc_data_act = non_wear_filter(acc_data_act, ts_name, AC_name, AI_name, window, minutes_zeros_row)
# The next line is a no-op expression: the .cache() call is commented out.
acc_data_act#.cache()
# Action that forces evaluation of the filter.
acc_data_act.count()

# Re-run the filter on the rows at or after a fixed start time.
date_format = '%Y-%m-%d'
time_format = '%H:%M:%S'
datetime_format = date_format + ' ' + time_format
startdate = datetime.strptime('2016-08-16 18:23:25', datetime_format) 
# NOTE(review): this call passes 5 arguments (ending with `interval`) while
# the call above passes 6 (`window`, `minutes_zeros_row`) -- confirm against
# the helper's signature that this is intended. It also rebinds `df`.
df = non_wear_filter(acc_data_act.filter(F.col('timestamp')>=startdate), 
                                         ts_name, AC_name, AI_name, interval)

In [None]:
# DETERMINE ACTIVITY BOUT NUMBER
# Column names and thresholds handed to the project helper
# activity_bout_filter; the result is expected in column `new_col`.
# NOTE(review): presumably UP/LOW bound the activity counts of a bout and
# DURATION/TOL are its minimum length and tolerance -- units are defined in
# the helper, confirm there.
ts_name = 'timestamp'
AC_name = 'activity'
new_col = 'activityBoutNumber'
window = 5
UP=9999
LOW=1953
DURATION=10
TOL=2

acc_data_act = activity_bout_filter(acc_data_act, ts_name, AC_name, new_col, window,
                                    UP, LOW, DURATION, TOL)
# Cache and immediately materialize with count().
acc_data_act.cache()
acc_data_act.count()

In [None]:
# Peek at the first 20 rows after the activity-bout step.
acc_data_act.show(20, False)

In [None]:
# Left-join the activity frame with the extended accelerometer columns on
# 'timestamp' and show the first 20 rows in time order.
acc_data_act.join(acc_data_ext, ['timestamp'], how='left' ).orderBy('timestamp').show(20,False)

# NOTE(review): `df2` is not defined anywhere in this part of the notebook;
# this line only works if an earlier cell created it -- confirm or replace.
df2.filter(F.col('activityBoutNumber')==1).orderBy('timestamp').show(2000)

In [None]:
import glob


def merge_csv_parts(filenames, out_path):
    """Concatenate CSV part files into ``out_path`` with a single header row.

    The first line of the first non-empty file is written as the header; the
    first line of every later file is treated as a repeated header and is
    dropped. Empty files are skipped (previously ``next(fin)`` raised
    StopIteration on them), and a line without a trailing newline is
    terminated so it cannot fuse with the first row of the next file.

    filenames: iterable of paths to the part files, in the order to merge.
    out_path: path of the merged file; it is created or overwritten.
    Returns True if a header was written, False if no input had any content.
    """
    header_saved = False
    with open(out_path, 'w') as fout:
        for filename in filenames:
            with open(filename) as fin:
                head = next(fin, None)
                if head is None:
                    # Zero-length part file: nothing to copy.
                    continue
                if not header_saved:
                    fout.write(head if head.endswith('\n') else head + '\n')
                    header_saved = True
                for line in fin:
                    fout.write(line if line.endswith('\n') else line + '\n')
    return header_saved


# Merge every part file under PALMS_output/ into one CSV in the working dir.
list_procs = sorted(glob.glob("PALMS_output/*.csv"))
print(list_procs)
header_saved = merge_csv_parts(list_procs, 'PALMS_output.csv')

In [None]:
# DETERMINE SEDENTARY BOUT NUMBER
# Column names and thresholds handed to the project helper
# sedentary_bout_filter; the result is expected in column `new_col`.
# NOTE(review): the units of UP/LOW/DURATION/TOL are defined in the helper,
# which is not visible here -- confirm there.
ts_name = 'timestamp'
AC_name = 'activity'
new_col = 'sedentaryBoutNumber'
window = 5
UP=180
LOW=0
DURATION=30
TOL=1

acc_data_act = sedentary_bout_filter(acc_data_act, ts_name, AC_name, new_col, window,
                                     UP, LOW, DURATION, TOL)
# Cache and immediately materialize with count().
acc_data_act.cache()
acc_data_act.count()

# NOTE(review): the bout column was just added to `acc_data_act`, yet this
# line filters `df3` -- confirm that df3 really carries 'sedentaryBoutNumber'.
df3.filter(F.col('sedentaryBoutNumber')==5).orderBy('timestamp').show(2000)

In [None]:
# Print up to 20000 rows of the activity frame without truncation.
acc_data_act.show(20000,False)

# Ten-minute tumbling windows.
tw = '10 minutes'
sw = '10 minutes'
# Fetch the first row once: first() is a Spark action, and reading minute and
# second from the same row keeps them consistent (it used to run twice).
first_ts = acc_data_act.first()[0]
st_min = first_ts.minute
st_sec = first_ts.second
# Seconds elapsed since the last full ten-minute mark; used as window offset.
start_time = (st_min-10*(st_min//10))*60 + st_sec
offset = '{} seconds'.format(start_time)
param_name = 'activityIntensity'
datetime_name = 'timestamp'

# Row count per window with explicit start/end columns.
# NOTE(review): the offset is derived from `acc_data_act` but the grouping
# runs on `df` -- confirm which frame is intended.
df_tws = (
    df.groupBy(F.window(datetime_name, tw, sw, offset))
      .count()
      .sort('window.start')
      .withColumn('start', F.col('window').start)
      .withColumn('end', F.col('window').end)
      .drop('window')
)

df_tws.show(100, False)

from pyspark.sql import Row
from pyspark.sql.window import Window

def test_window(ws):
    """
        Demo of a range-based sliding window on a 13-row toy dataframe.

        Adds ``tot_sec`` (the date cast to long), ``diff`` (tot_sec minus the
        previous row's tot_sec, 0 being the default for the first row) and
        ``new_value``: the row's own value when present, otherwise the last
        non-null value among the rows whose tot_sec lies in
        [tot_sec - ws, tot_sec].

        ws: look-back range, in the units of ``tot_sec``.
        Returns the result of ``df.show()`` (None); the table is printed.
    """
    make_row = Row("id", "date", "value")
    samples = [
        (1, "2015-01-01", 20.0),
        (2, "2015-01-06", 10.0),
        (3, "2015-01-07", None),
        (4, "2015-01-12", 30.0),
        (5, "2015-01-13", 5.0),
        (6, "2015-01-14", None),
        (7, "2015-01-15", None),
        (8, "2015-01-16", None),
        (9, "2015-01-17", None),
        (10, "2015-01-18", 20.0),
        (11, "2015-01-19", 20.0),
        (12, "2015-01-20", None),
        (13, "2015-01-21", None),
    ]
    df = sc.parallelize([make_row(*sample) for sample in samples]).toDF()
    df = df.withColumn("date", F.col("date").cast("timestamp"))
    df = df.withColumn('tot_sec', F.col('date').cast('long'))

    # Gap to the previous row, computed through SQL on a temporary view.
    df.createOrReplaceTempView('df')
    df = spark.sql("""select *, tot_sec - lag(tot_sec, 1, 0)
                            OVER (ORDER by date) AS diff
                            FROM df""")

    # Fill nulls from the look-back frame; non-null values are kept as-is.
    lookback = Window.orderBy('tot_sec').rangeBetween(-ws, 0)
    value_col = df['value']
    last_known = F.last(value_col, ignorenulls=True).over(lookback)
    filled = F.when(value_col.isNotNull(), value_col).otherwise(last_known)
    df = df.withColumn('new_value', filled)
    return df.show()

test_window(9000)

## Merge dataframes

In [None]:
# Left-join GPS fixes with the parsed accelerometer rows on 'timestamp'
# (every GPS row is kept), ordered by time.
merge_data = gps_data.join(acc_data, 'timestamp', how='left' ).orderBy('timestamp')
# Cache and immediately materialize with count().
merge_data.cache()
merge_data.count()

In [None]:
# Peek at the first 20 merged rows without truncation.
merge_data.show(20, False)

In [None]:
# Column names and types of the merged GPS + accelerometer frame.
merge_data.printSchema()

In [None]:
# Row count of the merged frame (the same count was already taken right
# after the join was cached).
merge_data.count()

In [None]:
# Left-join GPS fixes with the activity frame on 'timestamp', ordered by time.
merge_data_act = gps_data.join(acc_data_act, ['timestamp'], how='left' ).orderBy('timestamp')
# Action that forces evaluation of the join.
merge_data_act.count()

In [None]:
# Column names and types of the GPS + activity join.
merge_data_act.printSchema()

In [None]:
# NOTE: toPandas() collects the whole frame to the driver before head(40)
# trims it to 40 rows.
merge_data_act.toPandas().head(40)

In [None]:
# Rebinds merge_data_act: this time GPS fixes are left-joined with `df3`
# instead of acc_data_act.
# NOTE(review): confirm which `df3` is meant here -- the name is also used
# for the GPS segment-processing frame earlier in the notebook.
merge_data_act = gps_data.join(df3, ['timestamp'], how='left' ).orderBy('timestamp')
# Lazy cache; materialized by the next action.
merge_data_act.cache()

In [None]:
# NOTE: toPandas() collects the whole frame to the driver before head(40)
# trims it to 40 rows.
merge_data_act.toPandas().head(40)

In [None]:
# Write the merged frame to the directory "merged_data" as a single CSV part
# file (coalesce(1)) with a header row and second-resolution timestamps.
# The former "inferSchema" option was removed: it is a reader-only CSV option
# and has no effect on a writer.
# NOTE: with the default save mode this fails if "merged_data" already exists.
(
    merge_data_act.coalesce(1)
    .write
    .option("header", True)
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
    .csv("merged_data")
)


In [None]:
# Left-join GPS fixes with the extended accelerometer columns on 'timestamp',
# ordered by time.
merge_data_ext = gps_data.join(acc_data_ext, 'timestamp', how='left').orderBy('timestamp')

In [None]:
# NOTE: toPandas() collects the whole frame to the driver before head(20)
# trims it to 20 rows.
merge_data_ext.toPandas().head(20)