## 0. Preparation

In [1]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from math import radians, cos, sin, asin, sqrt, atan2, degrees
from datetime import datetime
from time import mktime
from pyspark.sql.window import Window
import pandas as pd

Initialize Spark

In [2]:
# sc.stop()

In [3]:
conf = SparkConf().setAll([('spark.executor.memory', '8g'), ('spark.executor.cores', '3'), ('spark.cores.max', '3'), ('spark.driver.memory','8g')])

sc = SparkContext("local", "Simple App", conf=conf)
sqlContext = SQLContext(sc)

In [4]:
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.driver.port', '65419'),
 ('spark.executor.id', 'driver'),
 ('spark.cores.max', '3'),
 ('spark.app.id', 'local-1551988008103'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', 'zis-mbp-2'),
 ('spark.driver.memory', '8g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.memory', '8g'),
 ('spark.executor.cores', '3'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'Simple App'),
 ('spark.ui.showConsoleProgress', 'true')]

Configure directories for source data files and log files

In [5]:
sample_data_1_file = 'xian_sample_1.csv'
full_data_1_file = 'gps_20161001'
sample_dir = '../data/'
full_data_dir = '../data_full/xian/'
output_dir = '../output/'
boundary_path = '../log/boundary.csv'

In [44]:
# data_file = sample_dir + sample_data_1_file
data_file = full_data_dir + full_data_1_file
output_file = output_dir + full_data_1_file

Some constant variables

In [7]:
DEBUG = False
SHOWLINES = 5
TIMEFORMAT = "yyyy-MM-dd HH:mm:ss"
grid_x_size = 1 # Grid with horizontal length of 1 km
grid_y_size = 1 # Grid with vertical length of 1 km
time_interval = 600 # Unit: sec

Some utility functions

In [8]:
def check_df(df):
    if DEBUG:
        print(df.show(SHOWLINES))
        df.printSchema()

In [9]:
def get_distance(longit_a, latit_a, longit_b, latit_b):
    """
    Reference:
        https://medium.com/@nikolasbielski/using-a-custom-udf-in-pyspark-to-compute-haversine-distances-d877b77b4b18
    Return:
        double(,3): unit in km
    """ 
    # Transform to radians
    longit_a, latit_a, longit_b, latit_b = map(radians, [longit_a,  latit_a, longit_b, latit_b])
    dist_longit = longit_b - longit_a
    dist_latit = latit_b - latit_a
    # Calculate area
    area = sin(dist_latit/2)**2 + cos(latit_a) * sin(dist_longit/2)**2
    # Calculate the central angle
    central_angle = 2 * asin(sqrt(area))
    radius = 6371
    # Calculate Distance, unit = km
    distance = central_angle * radius
    return abs(round(distance, 3))

udf_get_distance = F.udf(get_distance)

In [10]:
def get_velocity(distance, time):
    """
    Return:
        double(,3): unit in km/h
    """ 
    return(round(distance/time*3600, 3))

udf_get_velocity = F.udf(get_velocity)

In [11]:
def get_direction(longit_a, latit_a, longit_b, latit_b):
    """
    Reference: https://gist.github.com/jeromer/2005586

    Return:
        (0,1,2,3) => "S, W, N, E"
        -1 => Stationary
    """
    diffLong = longit_b - longit_a
    
    if diffLong == 0:
        return -1
        
    x = sin(diffLong) * cos(latit_b)
    y = cos(latit_a) * sin(latit_b) - (sin(latit_a)
            * cos(latit_b) * cos(diffLong))

    initial_bearing = atan2(x, y)

    # Now we have the initial bearing but math.atan2 return values
    # from -180° to + 180° which is not what we want for a compass bearing
    # The solution is to normalize the initial bearing as shown below
    initial_bearing = degrees(initial_bearing)
#     compass_bearing = (initial_bearing + 360) % 360

#     return round(compass_bearing, 3)

    # Version 2: Return (0,1,2,3) => "S, W, N, E"
    # First make the initial bearing positive, which is from 0° to 360°
    # Then add a 45° offset to get the approx. direction
    direction = (initial_bearing + 180 + 45) % 360 // 90
    return round(direction, 0)

udf_get_direction = F.udf(get_direction)

In [12]:
def get_grid_interval(lat_max, lat_min, lon_max, lon_min, x_size, y_size):
    # Get width and length
    total_x_xian = get_distance(lon_max, lat_max, lon_min, lat_max)
    total_y_xian = get_distance(lon_max, lat_max, lon_max, lat_min)
    
    # Get number of grids
    num_grid_x = total_x_xian//grid_x_size + 1
    num_grid_y = total_y_xian//grid_y_size + 1
    
    # Get interval in coordinates
    lon_interval = round((lon_max - lon_min)/num_grid_x, 6)
    lat_interval = round((lat_max - lat_min)/num_grid_y, 6)
    
    return num_grid_x, num_grid_y, lon_interval, lat_interval

In [13]:
def get_grid_coord(x, x_min, interval):
    coord_index = max((x - x_min)//interval, 0)
    return coord_index

udf_get_grid_coord = F.udf(get_grid_coord)

## 1. Import data from csv using PySpark

Load Didi data with pre-defined schema

In [14]:
fields = [StructField('driver_id', StringType(), True), 
          StructField('order_id', StringType(), True),
         StructField('timestamp', IntegerType(), True),
         StructField('lon', DoubleType(), True),
         StructField('lat', DoubleType(), True)]
schema = StructType(fields)

In [15]:
didi_df = sqlContext.read.csv(data_file, schema=schema)

In [16]:
check_df(didi_df)

## 2. Data Processing

Convert unixtime to proper timestamp format

In [17]:
# didi_df_1 = didi_df.withColumn('timestamp', F.from_unixtime("timestamp"))

# Removed timestamp conversion
# It's easier to compute time difference using the original integer format
didi_df_1 = didi_df

In [18]:
check_df(didi_df_1)

Create index for each order_id, sorted by timestamp in descending order. <br/>
This is to identify nodes at time t and t+1

In [19]:
window = Window.partitionBy(didi_df_1['order_id']).orderBy(didi_df_1['timestamp'])

In [20]:
didi_df_2 = didi_df_1.withColumn('rank', F.dense_rank().over(window))
check_df(didi_df_2)

Create a duplicate for t+1

In [21]:
didi_df_2_post = didi_df_2.\
    withColumnRenamed("rank", "rank_new").\
    withColumnRenamed("timestamp", "timestamp_new").\
    withColumnRenamed("lon", "lon_new").\
    withColumnRenamed("lat", "lat_new")

In [22]:
didi_df_2_post = didi_df_2_post.drop('driver_id')

In [23]:
check_df(didi_df_2_post)

Join the two tables for t and t+1 on order ID

In [24]:
didi_df_3 = didi_df_2.alias('a').join(didi_df_2_post.alias('b'),on = 'order_id').where('a.rank == b.rank_new-1')
check_df(didi_df_3)

Get distance using coordinates of $node_t$ and $node_{t+1}$

In [25]:
didi_df_4 = didi_df_3.withColumn('distance', udf_get_distance(
    didi_df_3.lon, didi_df_3.lat, 
    didi_df_3.lon_new, didi_df_3.lat_new).cast(DoubleType()))

In [26]:
check_df(didi_df_4)

Get time difference between $node_t$ and $node_{t+1}$

In [27]:
# timeDiff = (F.unix_timestamp('timestamp_new', format=TIMEFORMAT)
#             - F.unix_timestamp('timestamp', format=TIMEFORMAT))
timeDiff = didi_df_4.timestamp_new - didi_df_4.timestamp
didi_df_5 = didi_df_4.withColumn("eclipse_time", timeDiff)
check_df(didi_df_5)

Get velocity between $node_t$ and $node_{t+1}$

In [28]:
didi_df_6 = didi_df_5.withColumn("velocity", udf_get_velocity(
    didi_df_5.distance, didi_df_5.eclipse_time).cast(DoubleType()))

In [29]:
check_df(didi_df_6)

Get compass bearing when travelling from $node_t$ to $node_{t+1}$

In [30]:
didi_df_7 = didi_df_6.withColumn('direction', udf_get_direction(
    didi_df_6.lon, didi_df_6.lat, 
    didi_df_6.lon_new, didi_df_6.lat_new).cast(DoubleType()))
check_df(didi_df_7)

## 3. Construct the Grid

Load the boundaries from existing file

In [31]:
boundary_df = pd.read_csv(boundary_path, skipinitialspace=True, index_col=False)
boundary_df.head()

Unnamed: 0,filename,execution_time,lat_max,lat_min,lon_max,lon_min,time_max,time_min
0,../data/xian_sample_1.csv,03/08/2019 00:13:26,34.27678,34.21653,108.98842,108.92246,2016-10-01 23:13:11,2016-10-01 07:17:26
1,../data/xian_sample_2.csv,03/08/2019 00:13:27,34.23308,34.21647,108.93969,108.9117,2016-10-02 05:52:25,2016-10-02 05:47:09
2,../data_full/xian/gps_20161002,03/08/2019 00:19:22,34.28017,34.20531,108.9986,108.91119,2016-10-03 00:01:11,2016-10-02 00:01:01
3,../data_full/xian/gps_20161001,03/08/2019 00:22:49,34.28022,34.20531,108.9986,108.91118,2016-10-02 00:01:10,2016-10-01 00:02:12


In [32]:
lat_max = boundary_df["lat_max"].max()
lat_min = boundary_df["lat_min"].min()
lon_max = boundary_df["lon_max"].max()
lon_min = boundary_df["lon_min"].min()
time_max = boundary_df["time_max"].max()
time_min = boundary_df["time_min"].min()
print(lat_max, lat_min, lon_max, lon_min, time_max, time_min)

34.28022 34.20531 108.9986 108.91118 2016-10-03 00:01:11 2016-10-01 00:02:12


In [33]:
# Convert timestamp format to unix time
time_min = datetime.strptime(time_min, "%Y-%m-%d %H:%M:%S").timestamp()
print(time_min)

1475251332.0


Construct the grids

In [34]:
num_grid_x, num_grid_y, lon_interval, lat_interval = get_grid_interval(
    lat_max, lat_min, lon_max, lon_min, grid_x_size, grid_y_size)

In [35]:
didi_df_8 = didi_df_7.withColumn('lat_min', F.lit(lat_min)).\
    withColumn('lon_min', F.lit(lon_min)).\
    withColumn('time_min', F.lit(time_min)).\
    withColumn('lat_interval', F.lit(lat_interval)).\
    withColumn('lon_interval', F.lit(lon_interval)).\
    withColumn('time_interval', F.lit(time_interval))    
check_df(didi_df_8)

In [36]:
didi_df_9 = didi_df_8.withColumn('x', udf_get_grid_coord(didi_df_8.lon, didi_df_8.lon_min, didi_df_8.lon_interval)
                             .cast(IntegerType())).\
    withColumn('y', udf_get_grid_coord(didi_df_8.lat, didi_df_8.lat_min, didi_df_8.lat_interval)
                             .cast(IntegerType())).\
    withColumn('t', udf_get_grid_coord(didi_df_8.timestamp, didi_df_8.time_min, didi_df_8.time_interval)
                             .cast(IntegerType()))
check_df(didi_df_9)

## 4. Aggregate for the metrics

Dimensions of a grid:
- x
- y
- t
- direction:
    - -1: Stationary
    - 0: S
    - 1: W
    - 2: N
    - 3: E


Metrics of a grid:
- Number of nodes/instances
- Average velocity

In [37]:
didi_aggr_1 = didi_df_9.groupBy("x", "y", "t", "direction").agg({
    "order_id": 'count', 'velocity': 'avg'
})
check_df(didi_aggr_1)

In [38]:
didi_aggr_2 = didi_aggr_1.withColumnRenamed("avg(velocity)", "avg_velocity").\
    withColumnRenamed("count(order_id)", "cnt_nodes")
check_df(didi_aggr_2)

## 5. Write to output folder

In [39]:
didi_aggr_2.show(5)

+---+---+---+---------+------------------+---------+
|  x|  y|  t|direction|      avg_velocity|cnt_nodes|
+---+---+---+---------+------------------+---------+
|  3|  7|130|     -1.0|               0.0|        1|
|  7|  1|139|      1.0|              35.6|        3|
|  1|  8| 43|      1.0|48.934714285714286|       14|
|  7|  1|139|      3.0|57.900000000000006|        6|
|  7|  1|139|      2.0|              48.6|        2|
+---+---+---+---------+------------------+---------+
only showing top 5 rows



In [40]:
didi_aggr_2.write.csv(output_file)

In [None]:
# Read back from saved files
# didi_aggr_3 = sqlContext.read.csv(output_file)