## 0. Preparation

In [54]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession
# from pyspark.sql.functions import udf, from_unixtime
from pyspark.sql import functions as F
from math import radians, cos, sin, asin, sqrt, atan2, degrees
from datetime import datetime
from pyspark.sql.window import Window
import pandas as pd

Initialize Spark

In [247]:
# sc.stop()

In [2]:
sc = SparkContext("local", "Simple App")
sqlContext = SQLContext(sc)

In [240]:
spark = SparkSession.builder.master("local").\
    appName("Simple App").\
    config("spark.some.config.option", "some-value").\
    getOrCreate()

Configure directories for source data files and log files

In [56]:
sample_data_1_path = 'xian_sample_1.csv'
full_data_1_path = 'gps_20161001.csv'
sample_dir = '../data/'
full_data_dir = '../full_data/'
output_dir = '../output/'
boundary_dir = '../log/boundary.csv'

In [76]:
data_file = sample_dir + sample_data_1_path

Some constant variables

In [45]:
DEBUG = True
SHOWLINES = 5
TIMEFORMAT = "yyyy-MM-dd HH:mm:ss"

Some utility functions

In [33]:
def check_df(df):
    if DEBUG:
        print(df.show(SHOWLINES))
        df.printSchema()

In [39]:
def get_distance(longit_a, latit_a, longit_b, latit_b):
    """
    Reference:
        https://medium.com/@nikolasbielski/using-a-custom-udf-in-pyspark-to-compute-haversine-distances-d877b77b4b18
    Return:
        double(,3): unit in km
    """ 
    # Transform to radians
    longit_a, latit_a, longit_b, latit_b = map(radians, [longit_a,  latit_a, longit_b, latit_b])
    dist_longit = longit_b - longit_a
    dist_latit = latit_b - latit_a
    # Calculate area
    area = sin(dist_latit/2)**2 + cos(latit_a) * sin(dist_longit/2)**2
    # Calculate the central angle
    central_angle = 2 * asin(sqrt(area))
    radius = 6371
    # Calculate Distance, unit = km
    distance = central_angle * radius
    return abs(round(distance, 3))

udf_get_distance = F.udf(get_distance)

In [44]:
def get_velocity(distance, time):
    """
    Return:
        double(,3): unit in km/h
    """ 
    return(round(distance/time*3600, 3))

udf_get_velocity = F.udf(get_velocity)

In [50]:
def get_direction(longit_a, latit_a, longit_b, latit_b):
    diffLong = longit_b - longit_a
        
    x = sin(diffLong) * cos(latit_b)
    y = cos(latit_a) * sin(latit_b) - (sin(latit_a)
            * cos(latit_b) * cos(diffLong))

    initial_bearing = atan2(x, y)

    # Now we have the initial bearing but math.atan2 return values
    # from -180° to + 180° which is not what we want for a compass bearing
    # The solution is to normalize the initial bearing as shown below
    initial_bearing = degrees(initial_bearing)
    compass_bearing = (initial_bearing + 360) % 360
    return round(compass_bearing, 3)

udf_get_direction = F.udf(get_direction)

## 1. Import data from csv using PySpark

Load Didi data with pre-defined schema

In [4]:
fields = [StructField('driver_id', StringType(), True), 
          StructField('order_id', StringType(), True),
         StructField('timestamp', StringType(), True),
         StructField('lon', DoubleType(), True),
         StructField('lat', DoubleType(), True)]
schema = StructType(fields)

In [77]:
didi_df = sqlContext.read.csv(data_file, schema=schema)

In [22]:
check_df(didi_df)

+--------------------+--------------------+----------+---------+--------+
|           driver_id|            order_id| timestamp|      lon|     lat|
+--------------------+--------------------+----------+---------+--------+
|a01b8439e1e42ffcd...|f11440a64a0f084fe...|1475277482|108.92466|34.27657|
|a01b8439e1e42ffcd...|f11440a64a0f084fe...|1475277488|108.92527|34.27658|
|a01b8439e1e42ffcd...|f11440a64a0f084fe...|1475277506| 108.9276|34.27659|
|a01b8439e1e42ffcd...|f11440a64a0f084fe...|1475277476|108.92399|34.27655|
|a01b8439e1e42ffcd...|f11440a64a0f084fe...|1475277515| 108.9291| 34.2766|
+--------------------+--------------------+----------+---------+--------+
only showing top 5 rows

None
root
 |-- driver_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- lat: double (nullable = true)



## 2. Data Processing

Convert unixtime to proper timestamp format

In [23]:
didi_df_1 = didi_df.withColumn('timestamp', F.from_unixtime("timestamp"))

In [24]:
check_df(didi_df_1)

+--------------------+--------------------+-------------------+---------+--------+
|           driver_id|            order_id|          timestamp|      lon|     lat|
+--------------------+--------------------+-------------------+---------+--------+
|a01b8439e1e42ffcd...|f11440a64a0f084fe...|2016-10-01 07:18:02|108.92466|34.27657|
|a01b8439e1e42ffcd...|f11440a64a0f084fe...|2016-10-01 07:18:08|108.92527|34.27658|
|a01b8439e1e42ffcd...|f11440a64a0f084fe...|2016-10-01 07:18:26| 108.9276|34.27659|
|a01b8439e1e42ffcd...|f11440a64a0f084fe...|2016-10-01 07:17:56|108.92399|34.27655|
|a01b8439e1e42ffcd...|f11440a64a0f084fe...|2016-10-01 07:18:35| 108.9291| 34.2766|
+--------------------+--------------------+-------------------+---------+--------+
only showing top 5 rows

None
root
 |-- driver_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- lat: double (nullable = true)



Create index for each order_id, sorted by timestamp in descending order. <br/>
This is to identify nodes at time t and t+1

In [27]:
window = Window.partitionBy(didi_df_1['order_id']).orderBy(didi_df_1['timestamp'])

In [31]:
didi_df_2 = didi_df_1.withColumn('rank', F.dense_rank().over(window))
check_df(didi_df_2)

+--------------------+--------------------+-------------------+---------+--------+----+
|           driver_id|            order_id|          timestamp|      lon|     lat|rank|
+--------------------+--------------------+-------------------+---------+--------+----+
|a01b8439e1e42ffcd...|210acc529e4b9fc55...|2016-10-01 21:45:29|108.94276|34.27099|   1|
|a01b8439e1e42ffcd...|210acc529e4b9fc55...|2016-10-01 21:45:32|108.94276|34.27099|   2|
|a01b8439e1e42ffcd...|210acc529e4b9fc55...|2016-10-01 21:45:35|108.94262|34.27108|   3|
|a01b8439e1e42ffcd...|210acc529e4b9fc55...|2016-10-01 21:45:38|108.94242|34.27108|   4|
|a01b8439e1e42ffcd...|210acc529e4b9fc55...|2016-10-01 21:45:41|108.94225|34.27108|   5|
+--------------------+--------------------+-------------------+---------+--------+----+
only showing top 5 rows

None
root
 |-- driver_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- lat: double

Create a duplicate for t+1

In [34]:
didi_df_2_post = didi_df_2.\
    withColumnRenamed("rank", "rank_new").\
    withColumnRenamed("timestamp", "timestamp_new").\
    withColumnRenamed("lon", "lon_new").\
    withColumnRenamed("lat", "lat_new")

In [35]:
didi_df_2_post = didi_df_2_post.drop('driver_id')

In [36]:
check_df(didi_df_2_post)

+--------------------+-------------------+---------+--------+--------+
|            order_id|      timestamp_new|  lon_new| lat_new|rank_new|
+--------------------+-------------------+---------+--------+--------+
|210acc529e4b9fc55...|2016-10-01 21:45:29|108.94276|34.27099|       1|
|210acc529e4b9fc55...|2016-10-01 21:45:32|108.94276|34.27099|       2|
|210acc529e4b9fc55...|2016-10-01 21:45:35|108.94262|34.27108|       3|
|210acc529e4b9fc55...|2016-10-01 21:45:38|108.94242|34.27108|       4|
|210acc529e4b9fc55...|2016-10-01 21:45:41|108.94225|34.27108|       5|
+--------------------+-------------------+---------+--------+--------+
only showing top 5 rows

None
root
 |-- order_id: string (nullable = true)
 |-- timestamp_new: string (nullable = true)
 |-- lon_new: double (nullable = true)
 |-- lat_new: double (nullable = true)
 |-- rank_new: integer (nullable = true)



Join the two tables for t and t+1 on order ID

In [37]:
didi_df_3 = didi_df_2.alias('a').join(didi_df_2_post.alias('b'),on = 'order_id').where('a.rank == b.rank_new-1')
check_df(didi_df_3)

+--------------------+--------------------+-------------------+---------+--------+----+-------------------+---------+--------+--------+
|            order_id|           driver_id|          timestamp|      lon|     lat|rank|      timestamp_new|  lon_new| lat_new|rank_new|
+--------------------+--------------------+-------------------+---------+--------+----+-------------------+---------+--------+--------+
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:29|108.94276|34.27099|   1|2016-10-01 21:45:32|108.94276|34.27099|       2|
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:32|108.94276|34.27099|   2|2016-10-01 21:45:35|108.94262|34.27108|       3|
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:35|108.94262|34.27108|   3|2016-10-01 21:45:38|108.94242|34.27108|       4|
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:38|108.94242|34.27108|   4|2016-10-01 21:45:41|108.94225|34.27108|       5|
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-

Get distance using coordinates of $node_t$ and $node_{t+1}$

In [41]:
didi_df_4 = didi_df_3.withColumn('distance', udf_get_distance(
    didi_df_3.lon, didi_df_3.lat, 
    didi_df_3.lon_new, didi_df_3.lat_new).cast(DoubleType()))

In [42]:
check_df(didi_df_4)

+--------------------+--------------------+-------------------+---------+--------+----+-------------------+---------+--------+--------+--------+
|            order_id|           driver_id|          timestamp|      lon|     lat|rank|      timestamp_new|  lon_new| lat_new|rank_new|distance|
+--------------------+--------------------+-------------------+---------+--------+----+-------------------+---------+--------+--------+--------+
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:29|108.94276|34.27099|   1|2016-10-01 21:45:32|108.94276|34.27099|       2|     0.0|
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:32|108.94276|34.27099|   2|2016-10-01 21:45:35|108.94262|34.27108|       3|   0.017|
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:35|108.94262|34.27108|   3|2016-10-01 21:45:38|108.94242|34.27108|       4|    0.02|
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:38|108.94242|34.27108|   4|2016-10-01 21:45:41|108.94225|34.27108|    

Get time difference between $node_t$ and $node_{t+1}$

In [47]:
timeDiff = (F.unix_timestamp('timestamp_new', format=TIMEFORMAT)
            - F.unix_timestamp('timestamp', format=TIMEFORMAT))
didi_df_5 = didi_df_4.withColumn("eclipse_time", timeDiff)

Get velocity between $node_t$ and $node_{t+1}$

In [48]:
didi_df_6 = didi_df_5.withColumn("velocity", udf_get_velocity(
    didi_df_5.distance, didi_df_5.eclipse_time).cast(DoubleType()))

In [49]:
check_df(didi_df_6)

+--------------------+--------------------+-------------------+---------+--------+----+-------------------+---------+--------+--------+--------+------------+--------+
|            order_id|           driver_id|          timestamp|      lon|     lat|rank|      timestamp_new|  lon_new| lat_new|rank_new|distance|eclipse_time|velocity|
+--------------------+--------------------+-------------------+---------+--------+----+-------------------+---------+--------+--------+--------+------------+--------+
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:29|108.94276|34.27099|   1|2016-10-01 21:45:32|108.94276|34.27099|       2|     0.0|           3|     0.0|
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:32|108.94276|34.27099|   2|2016-10-01 21:45:35|108.94262|34.27108|       3|   0.017|           3|    20.4|
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:35|108.94262|34.27108|   3|2016-10-01 21:45:38|108.94242|34.27108|       4|    0.02|           3|    24.0

Get compass bearing when travelling from $node_t$ to $node_{t+1}$

In [51]:
didi_df_7 = didi_df_6.withColumn('direction', udf_get_direction(
    didi_df_6.lon, didi_df_6.lat, 
    didi_df_6.lon_new, didi_df_6.lat_new).cast(DoubleType()))

In [52]:
check_df(didi_df_7)

+--------------------+--------------------+-------------------+---------+--------+----+-------------------+---------+--------+--------+--------+------------+--------+---------+
|            order_id|           driver_id|          timestamp|      lon|     lat|rank|      timestamp_new|  lon_new| lat_new|rank_new|distance|eclipse_time|velocity|direction|
+--------------------+--------------------+-------------------+---------+--------+----+-------------------+---------+--------+--------+--------+------------+--------+---------+
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:29|108.94276|34.27099|   1|2016-10-01 21:45:32|108.94276|34.27099|       2|     0.0|           3|     0.0|      0.0|
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:32|108.94276|34.27099|   2|2016-10-01 21:45:35|108.94262|34.27108|       3|   0.017|           3|    20.4|   56.172|
|210acc529e4b9fc55...|a01b8439e1e42ffcd...|2016-10-01 21:45:35|108.94262|34.27108|   3|2016-10-01 21:45:38|108.9424

## 3. Construct the Grid

Load the boundaries from existing file

In [80]:
boundary_df = pd.read_csv(boundary_dir, skipinitialspace=True )
boundary_df.head()

Unnamed: 0,filename,execution_time,lat_max,lat_min,lon_max,lon_min,time_max,time_min
../data/xian_sample_1.csv,03/08/2019,00:13:26,34.27678,34.21653,108.98842,108.92246,2016-10-01 23:13:11,2016-10-01 07:17:26
../data/xian_sample_2.csv,03/08/2019,00:13:27,34.23308,34.21647,108.93969,108.9117,2016-10-02 05:52:25,2016-10-02 05:47:09
../data_full/xian/gps_20161002,03/08/2019,00:19:22,34.28017,34.20531,108.9986,108.91119,2016-10-03 00:01:11,2016-10-02 00:01:01
../data_full/xian/gps_20161001,03/08/2019,00:22:49,34.28022,34.20531,108.9986,108.91118,2016-10-02 00:01:10,2016-10-01 00:02:12


In [73]:
lat_max = boundary_df["lat_max"].max()
lat_min = boundary_df["lat_min"].min()
lon_max = boundary_df["lon_max"].max()
lon_min = boundary_df["lon_min"].min()
time_max = boundary_df["time_max"].max()
time_min = boundary_df["time_min"].min()

In [74]:
print(lat_max, lat_min, lon_max, lon_min, time_max, time_min)

34.28022 34.20531 108.9986 108.91118 2016-10-03 00:01:11 2016-10-01 00:02:12
