In [1]:
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
from pyspark.sql.types import *
import seaborn as sns
import pandas as pd
import numpy as np
import geopy.distance
import matplotlib
import datetime
import os

import fastplot
%matplotlib inline

In [2]:
# Column layout of the raw CSV records, passed to Spark as a DDL schema string.
schema_inital_segment = (
    'idRequest string, deviceId string, dateTime timestamp, '
    'latitude double, longitude double, speedKmh Integer, heading Integer,'
    'accuracyDrop Integer, EngineStatus Integer, Type Integer, segmentDistance double,'
    'segmentDuration double, segmentSpeedKmH double'
)
# The input files are read from <current working directory>/all_merged/.
path = os.path.abspath(os.getcwd())
df = spark.read.csv(
    'file:///%s/all_merged/*' % path,
    schema=schema_inital_segment,
    sep=",",
)

<h2>1 - Reading the input data used to build the segment table</h2>

In [3]:
# Cast the dateTime column to Spark's timestamp type, then count the loaded rows.
df = df.withColumn('dateTime', F.to_timestamp(F.col('dateTime')))
df.count()

333631879

<h2>2 - Building the segment table</h2>

In [4]:
# Output schema (Spark DDL string) of the segment_table_maker pandas UDF:
# one row per segment, i.e. per pair of consecutive points of a device.
# The "start*" fields describe the earlier point of the pair, the "end*"
# fields the later one. dateTime is declared as a string because the UDF
# emits the start point's timestamp already formatted as text.
schema_segment_inital ='deviceId String,\
                      type Integer,\
                      dateTime string,\
                      startLatitude double,\
                      startLongitude double,\
                      startEngineStatus Integer,\
                      startAccuracyDrop Integer,\
                      endAccuracyDrop Integer,\
                      endEngineStatus Integer,\
                      endLatitude double,\
                      endLongitude double,\
                      segmentDistance double,\
                      segmentDuration double,\
                      segmentSpeedKmH double'

@F.pandas_udf(schema_segment_inital, functionType=F.PandasUDFType.GROUPED_MAP)
def segment_table_maker(df):
    """Turn the points of one group into segments between consecutive points.

    Runs as a grouped-map pandas UDF, so ``df`` is a pandas DataFrame holding
    every row of a single group. The rows are sorted by ``dateTime`` and each
    pair of neighbouring rows (previous, current) yields one segment:

    * ``dateTime`` and the ``start*`` fields come from the previous row
      (``dateTime`` is rendered with ``str()``);
    * ``deviceId``, the ``end*`` fields and the three ``segment*`` measures
      come from the current row;
    * ``type`` is the ``Type`` of the first row after sorting, repeated on
      every segment.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns deviceId, dateTime, latitude, longitude,
        EngineStatus, accuracyDrop, Type, segmentDistance, segmentDuration
        and segmentSpeedKmH.

    Returns
    -------
    pandas.DataFrame
        ``len(df) - 1`` rows (no rows when the group has fewer than two
        points), with the columns listed in ``schema_segment_inital`` in
        that same order.
    """
    # Kept from the original code; presumably the Arrow IPC compatibility
    # switch needed when an older Spark runs with pyarrow >= 0.15 -- confirm.
    os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

    # drop=True: the previous index is not needed and must not become a column.
    points = df.sort_values(by="dateTime").reset_index(drop=True)

    # Pair row i-1 (start) with row i (end) by slicing instead of iterating
    # row by row in Python; both slices are re-indexed 0..n-2 so they align.
    start = points.iloc[:-1].reset_index(drop=True)
    end = points.iloc[1:].reset_index(drop=True)

    # Guard the scalar look-up so an empty group yields an empty result.
    first_type = points['Type'].iloc[0] if len(points) else None

    # Iterating a datetime column yields Timestamp objects, so str() gives
    # the same text the per-row implementation produced.
    start_time_text = pd.Series([str(ts) for ts in start['dateTime']], dtype=object)

    # Column order follows schema_segment_inital so the result is valid
    # whether Spark matches the returned columns by name or by position.
    ordered_columns = [
        'deviceId', 'type', 'dateTime',
        'startLatitude', 'startLongitude',
        'startEngineStatus', 'startAccuracyDrop',
        'endAccuracyDrop', 'endEngineStatus',
        'endLatitude', 'endLongitude',
        'segmentDistance', 'segmentDuration', 'segmentSpeedKmH',
    ]
    segments = pd.DataFrame(
        {
            'deviceId': end['deviceId'],
            'type': first_type,
            'dateTime': start_time_text,
            'startLatitude': start['latitude'],
            'startLongitude': start['longitude'],
            'startEngineStatus': start['EngineStatus'],
            'startAccuracyDrop': start['accuracyDrop'],
            'endAccuracyDrop': end['accuracyDrop'],
            'endEngineStatus': end['EngineStatus'],
            'endLatitude': end['latitude'],
            'endLongitude': end['longitude'],
            'segmentDistance': end['segmentDistance'],
            'segmentDuration': end['segmentDuration'],
            'segmentSpeedKmH': end['segmentSpeedKmH'],
        },
        columns=ordered_columns,
    )
    return segments

In [5]:
# Build the segment table: segment_table_maker is applied once per deviceId
# group and the per-group results are combined into one Spark DataFrame.
df_segment_temp = df.groupby("deviceId").apply(segment_table_maker)

In [6]:
# Number of segments produced, before any filtering is applied.
df_segment_temp.count()

333341694

In [7]:
# df_segment_temp.write.csv('file:///%s/segment_table_no_filter'%path, sep=",")

In [7]:
'''applying filter: dropping segment durations longer than 70s
as well as segments with speed higher than 130KmH or equal to absolute zero'''
# A segment is kept only when all three conditions hold; rows where a
# compared column is null are dropped, exactly as with chained filters.
df_segment_temp = df_segment_temp.filter(
    (F.col('segmentDuration') <= 70)
    & (F.col('segmentSpeedKmH') <= 130)
    & (F.col('segmentSpeedKmH') != 0)
)

In [8]:
# Number of segments remaining after the duration and speed filters.
df_segment_temp.count()

300841429

In [9]:
# Preview of the filtered segments. limit(3) caps the result at three rows,
# so show(15) prints at most three.
df_segment_temp.limit(3).show(15)

+--------+----+-------------------+-------------+--------------+-----------------+-----------------+---------------+---------------+-----------+------------+---------------+---------------+-------------------+
|deviceId|type|           dateTime|startLatitude|startLongitude|startEngineStatus|startAccuracyDrop|endAccuracyDrop|endEngineStatus|endLatitude|endLongitude|segmentDistance|segmentDuration|    segmentSpeedKmH|
+--------+----+-------------------+-------------+--------------+-----------------+-----------------+---------------+---------------+-----------+------------+---------------+---------------+-------------------+
| 2507297|   2|2019-04-02 10:15:28|     45.08672|       7.60882|                1|                1|              1|              1|   45.08673|     7.60883|           1.36|           27.0|0.18159155668585095|
| 2507297|   2|2019-04-02 10:15:55|     45.08673|       7.60883|                1|                1|              1|              1|   45.08674|     7.60884|   

In [10]:
# Sanity check on the speed filter: highest segment speed still present.
max_value = df_segment_temp.agg(F.max("segmentSpeedKmH")).first()[0]
max_value

129.999996335764

In [16]:
'''adding columns related to grid table'''
# Output schema (Spark DDL string) of the segment_position_calculator UDF:
# the segment columns followed by the grid-related columns it adds.
#   xtmp / ytmp           - fractional grid offsets (scratch columns)
#   start_x / start_y     - grid indices of the segment's start point
#   end_x / end_y         - grid indices of the segment's end point
#   cell_id_start / _end  - textual cell ids built from the y and x indices
schema_segments_ultimate ='deviceId String,\
                          type Integer,\
                          dateTime string,\
                          startLatitude double,\
                          startLongitude double,\
                          startEngineStatus Integer,\
                          startAccuracyDrop Integer,\
                          endAccuracyDrop Integer,\
                          endEngineStatus Integer,\
                          endLatitude double,\
                          endLongitude double,\
                          segmentDistance double,\
                          segmentDuration double,\
                          segmentSpeedKmH double,\
                          xtmp double,\
                          ytmp double,\
                          start_x integer,\
                          start_y integer,\
                          end_x integer,\
                          end_y integer,\
                          cell_id_start string,\
                          cell_id_end string'


In [17]:
'''shift by 100m in degrees'''
# Grid cell size. Despite the "InMeter" names these values are angular steps
# in degrees: the latitude step is annotated as 100 m by the original author;
# the longitude step is presumably the same 100 m evaluated at the latitude
# of the study area -- TODO confirm how it was derived.
shiftInMeterLat = 0.0008983152841182118 #100m
shiftInMeterLon = 0.001270644533487797
# ============= Torino and countryside ========================================
# Bounding box of the study area in degrees. minLat / minLon are the grid
# origin used by segment_position_calculator; maxLat / maxLon are not
# referenced by the cells visible here.
minLat = 44.96282106687191
minLon = 7.502048016422193
maxLat = 45.19265016665321 
maxLon = 7.791812422724604
# =============================================================================
# =============================================================================

In [18]:
@F.pandas_udf(schema_segments_ultimate, functionType=F.PandasUDFType.GROUPED_MAP)
def segment_position_calculator(df_segments):
    """Attach grid-cell coordinates to the segments of one group.

    Runs as a grouped-map pandas UDF, so ``df_segments`` is a pandas
    DataFrame holding every segment of a single group. The grid is anchored
    at the module-level origin (``minLon``, ``minLat``) with a step of
    ``shiftInMeterLon`` / ``shiftInMeterLat`` degrees per cell, and cell
    indices are 1-based: ``floor((coordinate - origin) / step) + 1``.

    Added columns:

    * ``start_x``, ``start_y``, ``end_x``, ``end_y`` -- integer cell indices
      of the segment's start and end point;
    * ``cell_id_start``, ``cell_id_end`` -- ``"<y>_<x>"`` strings built from
      those indices;
    * ``xtmp``, ``ytmp`` -- fractional grid offsets of the END point (the
      original code first stored the start offsets in these columns and then
      overwrote them, so only the end offsets were ever returned).

    Parameters
    ----------
    df_segments : pandas.DataFrame
        Must contain dateTime, startLongitude, startLatitude, endLongitude
        and endLatitude.

    Returns
    -------
    pandas.DataFrame
        The input rows sorted by ``dateTime`` with the columns above added.

    Raises
    ------
    ValueError
        If a coordinate is missing (NaN); the original implementation failed
        with the same exception type while formatting the cell ids.
    """
    # Kept from the original code; presumably the Arrow IPC compatibility
    # switch needed when an older Spark runs with pyarrow >= 0.15 -- confirm.
    os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

    df_segments = df_segments.sort_values(by="dateTime").reset_index(drop=True)

    # Fractional position of each end point on the grid, in cells.
    start_offset_x = (df_segments['startLongitude'] - minLon) / shiftInMeterLon
    start_offset_y = (df_segments['startLatitude'] - minLat) / shiftInMeterLat
    end_offset_x = (df_segments['endLongitude'] - minLon) / shiftInMeterLon
    end_offset_y = (df_segments['endLatitude'] - minLat) / shiftInMeterLat

    # Only the end-point offsets survive in the scratch columns (see docstring).
    df_segments['xtmp'] = end_offset_x
    df_segments['ytmp'] = end_offset_y

    # np.floor works on whole columns; the explicit integer cast matches the
    # integer types declared in schema_segments_ultimate.
    df_segments['start_x'] = np.floor(start_offset_x).astype('int64') + 1
    df_segments['start_y'] = np.floor(start_offset_y).astype('int64') + 1
    df_segments['end_x'] = np.floor(end_offset_x).astype('int64') + 1
    df_segments['end_y'] = np.floor(end_offset_y).astype('int64') + 1

    # Cell ids are "<row>_<column>", i.e. the y index first, then the x index.
    df_segments['cell_id_start'] = (
        df_segments['start_y'].astype(str) + '_' + df_segments['start_x'].astype(str)
    )
    df_segments['cell_id_end'] = (
        df_segments['end_y'].astype(str) + '_' + df_segments['end_x'].astype(str)
    )
    return df_segments

In [19]:
# Add the grid-cell columns: segment_position_calculator is applied once per
# deviceId group of the filtered segment table.
df_segment = df_segment_temp.groupby("deviceId").apply(segment_position_calculator)

In [20]:
# Persist the final segment table as comma-separated files under
# <working directory>/segment_table. No header or save mode is passed, so
# Spark's defaults apply -- NOTE(review): with the default save mode a re-run
# presumably fails if the target directory already exists; confirm.
df_segment.write.csv('file:///%s/segment_table'%path, sep=",")

In [14]:
# Row count of the final segment table.
# NOTE(review): the saved output of this cell carries execution number 14,
# i.e. it was produced before the cells numbered 16-20 above were last run,
# so the displayed figure may not describe the current df_segment. Re-run
# the notebook top to bottom to confirm.
df_segment.count()

69743658