# Group project : Dask

#### Bilal Kostet, Antoine Somerhausen, Pierre Hosselet, Pacome Van Overschelde, Romain Vandepopeliere - Group 3

## Index

0. [Starting Dask](#Starting-PySpark)
0. [Loading and preparing data : adding the zone column](#Loading-and-preparing-data)
0. [Question 1](#Question-1)
0. [Question 2](#Question-2)
    * [Average speed, travel time and travel distance by zone](#2.1-Dynamics)
    * [Average visited zones and exchanges](#2.2-Exchanges)
0. [Computation of the delayed quantities](#Computation-of-the-delayed-quantities)
0. [Comment on the warnings obtained](#Comment-on-the-warnings-obtained)

## Starting Dask

In [56]:
#We start by loading the Dask client, that allows to connect to a distributed cluster. 
#In our case, since we use a single machine, it is useful to visualize the progress of the tasks.
from dask.distributed import Client
client = Client()
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 59226 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:59226/status,

0,1
Dashboard: http://127.0.0.1:59226/status,Workers: 4
Total threads: 12,Total memory: 15.75 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:59227,Workers: 4
Dashboard: http://127.0.0.1:59226/status,Total threads: 12
Started: Just now,Total memory: 15.75 GiB

0,1
Comm: tcp://127.0.0.1:59258,Total threads: 3
Dashboard: http://127.0.0.1:59259/status,Memory: 3.94 GiB
Nanny: tcp://127.0.0.1:59231,
Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-ic2fbwhr,Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-ic2fbwhr

0,1
Comm: tcp://127.0.0.1:59267,Total threads: 3
Dashboard: http://127.0.0.1:59268/status,Memory: 3.94 GiB
Nanny: tcp://127.0.0.1:59230,
Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-jw9_id2m,Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-jw9_id2m

0,1
Comm: tcp://127.0.0.1:59264,Total threads: 3
Dashboard: http://127.0.0.1:59265/status,Memory: 3.94 GiB
Nanny: tcp://127.0.0.1:59233,
Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-nmf9p97x,Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-nmf9p97x

0,1
Comm: tcp://127.0.0.1:59261,Total threads: 3
Dashboard: http://127.0.0.1:59262/status,Memory: 3.94 GiB
Nanny: tcp://127.0.0.1:59232,
Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-5ghniqj3,Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-5ghniqj3


## Loading and preparing data

Let us import the packages and the functions that we'll need throughout this work

In [57]:
import pandas as pd
import numpy as np
import dask
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
from shapely.geometry import Point, Polygon
import json
import time

Dask can read .csv files and put them in a dask dataframe

In [58]:
%%time
gps=dd.read_csv('ProjectData/drivers.csv', dtype={'latitude' : 'float64', 'longitude' : 'float64', 'driver' : 'string'},parse_dates=['timestamp'])

Wall time: 7.98 ms


We also have to create the zones dataframe, based on the JSON file. This file is really small, and will be used in one function only. Dask advises that in these cases, it is not that useful and Pandas should be used instead. We will proceed to use dask anyway for the sake of scalability, as we may want to apply this analysis to a bigger region.

For this, we will need a custom function to convert the contents of the nested JSON file to something that can be easily interepreted into a dask dataframe.

In [59]:
%%time
def polyzone(dbag):
    polygonpts=[]
    for i in range(len(dbag['polygon'])):
        polygonpts.append((float(dbag['polygon'][i]['lat']),float(dbag['polygon'][i]['lng'])))
    return {
        'id_zone' : dbag['id_zone'],
        'polygonpts' : polygonpts
    }


Wall time: 0 ns


We apply this custom function to a string corresponding to the contents of our JSON file. This allows to obtain a dataframe with our zones, with the id of the zone as index for the sake of optimization.

In [60]:
%%time
zones=db.read_text('ProjectData/zones.json').map(json.loads)
zones=zones.pluck('zones').flatten().map(polyzone)
dzones=zones.to_dataframe().set_index('id_zone',sorted=True)

Wall time: 427 ms


We create a list with all the Polygon structures defining the zones that will be used as an iterator to check in which zone the data points belong.

In [61]:
%%time
polygonlist=[]
for i,j in dzones.iterrows():
    polygonlist.append(Polygon(j.values.tolist()[0]))

Wall time: 387 ms


We apply our custom function to generate the new column 'zone' by returning the zone in which a coordinate point is thanks to the fact that the zones are just indexed from 1 to 51.

In [62]:
%%time
def findzone(row): 
    point=Point(row.latitude,row.longitude)
    for i in range(len(polygonlist)):
        polygon=Polygon(polygonlist[i])
        if polygon.contains(point):
            return i+1
    return 0

gps['zone'] = gps.apply(findzone, axis=1).astype('int32')

Wall time: 20.9 ms


You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=(None, 'int64'))



## Question 1

The goal of this question is to know what are the ten zones that are visited by the most drivers. The strategy is simply to exhibit the different pairs (driver, zone) that are in the dataframe, dropping all the duplicates of this pair of values and then count the number of appearance of each zone identifier. We chose to not exclude points not belonging to any zone, here, to have an idea of how much GPS jitter there was, indicated by zone 0.

In [63]:
%%time
gps_mostvisited=gps[['driver','zone']].drop_duplicates().zone.value_counts().nlargest(11)

Wall time: 4.99 ms


## Question 2

In this question, we want to adress some other features of the dataset. 

Firstly, we would like to know what are the average speed, the average travel time and the average travel distance for the drivers in a given zone. For this purpose, we will need to compute some differences between data of distinct rows. In Dask, we can use the dataframe method .diff() to compute these. There are also products between values of distinct rows. This can be accessed by the .shift(period=) method on series that takes a series, and returns its shift by the amount of rows indicated in the key period.

Secondly, what about the exchanges between the zones? 

### 2.1 Dynamics

Let us first clean the dataframe of the drivers belonging to invalid zones:

In [64]:
%%time
gpsclean=gps[gps.zone > 0]

Wall time: 998 µs


We now define the function that will be applied to our subdataframe originating from grouping on driver and zone values.
This function will compute the total time and distance travelled by each driver in each zone, as well as their mean speed.

By inspecting the results, one realizes that some GPS recordings are outliers. Indeed, an .orderBy('speed') in decreasing order tells us that some instantaneous speeds are around 1500 m/s, and a non-negligible number of other speeds are impossible to reach by car. This feature is not a computation mistake nor a code mistake, it really belongs to the dataset : these are just bugs of the GPS. In order to obtain some average speeds which are realistic, we put a cut-off and we will throw away all the datapoints that have a speed bigger than 200 km/h.

Also, we will also choose a cut-off for the delay column. Sometimes there is a huge time gap ($\sim$ hours) between two consecutive timestamps for the same driver. We assume this is not due to his travel and that the speed computed during this big time gap is not a speed of travel: for example, the driver simply stopped the recording and restarted it somewhere else, hours later. Hence, one should get rid of this step. We choose 20 min (=1200s) as an upper bound for the difference between two consecutive timestamps, in order to be sure that they are kinematically meaningful.

Note that we chose to apply this filtering in this dynamical section only, the other sections are processed with the whole dataset (except that we dropped out the datapoints that are not in the zones).


In [65]:
%%time
def speed_compute(x):
    x=x.sort_values('timestamp')
    x['delay'] = x.timestamp.diff().dt.total_seconds()
    x['distances'] = da.arcsin(da.sqrt(da.sin(da.radians(x.longitude.diff().mul(0.5))).pow(2).mul(da.cos(da.radians(x.latitude))).mul(da.cos(da.radians(x.latitude.shift(periods=1)))).add(da.sin(da.radians(x.latitude.diff().mul(0.5))).pow(2)))).mul(2*6371)
    x['speeds'] = x.distances.div(x.delay).mul(3600)
    x=x[~(da.isinf(x.speeds) | (x.speeds > 200) | x.delay > 1200)]  
    x['total_time'] = x['delay'].sum()
    x['total_time'].iloc[1:] = np.nan
    x['total_distance'] = x['distances'].sum()
    x['total_distance'].iloc[1:] = np.nan
    x['meanspeed'] = x['total_distance'].div(x['total_time']).mul(3600)
    x=x.drop(columns=['driver','latitude','longitude','distances','delay','timestamp','speeds']).rename(columns={'zone':'dzone'})
    return x

Wall time: 0 ns


We can apply this function to the grouped data, and group again these results for each zone, taking the average on every separate driver:

In [66]:
gps_speeds=gpsclean.groupby(['driver','zone']).apply(speed_compute).groupby('dzone').mean().sort_values('meanspeed',ascending=False)

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  gps_speeds=gpsclean.groupby(['driver','zone']).apply(speed_compute).groupby('dzone').mean().sort_values('meanspeed',ascending=False)


### 2.2 Exchanges

Here we would like to have a better understanding of how the zones are connected. For instance, let us take the $i^{\textrm{th}}$ zone. We could ask ourselves: in average, how many different zones are visited by the drivers which pass through zone $i$? The second question that we want to adress is closely related but take the back and forth between zones into account. Now the question would be: in average, how many zone exchanges are made by the drivers which pass through zone $i$? 

Let us start with the amount of *visited zones* related to a given zone: 

In [67]:
%%time
def nb_zone(x):
    x['count'] = len(x)
    return x
gps_meanvisited = gpsclean[['driver','zone']].drop_duplicates().groupby('driver').apply(nb_zone).groupby('zone').mean().sort_values('count', ascending=False)

Wall time: 24.9 ms


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


Now, the *exchanges* between a given zone and the other zones. Here, we have to keep track of people coming back in the same zones. We will use the diff() as for the speeds, in order to spot the rows in which there is a change of zone.

In [68]:
%%time
def nb_zone2(x):
    x = x.sort_values('timestamp', ascending=False)
    x['count'] = x.zone.diff().ne(0).cumsum().max()
    return x
gps_meanexchange = gpsclean[['driver','zone','timestamp']].groupby('driver').apply(nb_zone2)
gps_meanexchange = gps_meanexchange.drop_duplicates(subset=['driver','zone']).groupby('zone').mean().sort_values('count', ascending=False)

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


Wall time: 52.8 ms


## Computation of the delayed quantities

In [69]:
%%time
(gps_mostvisited_c, gps_speeds_c, gps_meanvisited_c, gps_meanexchange_c) = dask.compute(gps_mostvisited,gps_speeds,gps_meanvisited,gps_meanexchange)
display(gps_mostvisited_c, gps_speeds_c, gps_meanvisited_c, gps_meanexchange_c)

22    10823
21     8039
14     4150
25     3638
13     3178
20     1591
23     1160
24     1064
26      747
0       547
15      512
Name: zone, dtype: int64

Unnamed: 0_level_0,total_time,total_distance,meanspeed
dzone,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2,206.0,2.943416,68.73452
11,1751.166667,8.410023,59.956685
8,2268.333333,8.147536,30.439522
30,1020.5,3.396694,23.760043
27,2022.272727,5.830054,23.199141
29,792.115385,2.798897,20.81895
20,2146.177876,4.317292,19.731458
15,1447.841797,3.550905,19.554468
40,754.352941,3.852713,18.390652
28,1909.116667,4.832468,17.858329


Unnamed: 0_level_0,count
zone,Unnamed: 1_level_1
12,6.25
8,6.166667
35,5.3
49,5.25
11,5.0
40,4.705882
30,4.333333
2,4.333333
29,4.288462
16,4.251366


Unnamed: 0_level_0,count
zone,Unnamed: 1_level_1
12,9.75
8,9.5
11,8.333333
30,7.666667
13,7.126809
15,6.933594
35,6.8
49,6.75
17,6.649123
16,6.546448


Wall time: 11min 10s


## Cleaning up

In [70]:
client.restart()



0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:59226/status,

0,1
Dashboard: http://127.0.0.1:59226/status,Workers: 4
Total threads: 12,Total memory: 15.75 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:59227,Workers: 4
Dashboard: http://127.0.0.1:59226/status,Total threads: 12
Started: 11 minutes ago,Total memory: 15.75 GiB

0,1
Comm: tcp://127.0.0.1:59362,Total threads: 3
Dashboard: http://127.0.0.1:59363/status,Memory: 3.94 GiB
Nanny: tcp://127.0.0.1:59231,
Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-m8hlv85r,Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-m8hlv85r

0,1
Comm: tcp://127.0.0.1:59372,Total threads: 3
Dashboard: http://127.0.0.1:59373/status,Memory: 3.94 GiB
Nanny: tcp://127.0.0.1:59230,
Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-gyfzt09m,Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-gyfzt09m

0,1
Comm: tcp://127.0.0.1:59359,Total threads: 3
Dashboard: http://127.0.0.1:59360/status,Memory: 3.94 GiB
Nanny: tcp://127.0.0.1:59233,
Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-sob5g77i,Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-sob5g77i

0,1
Comm: tcp://127.0.0.1:59369,Total threads: 3
Dashboard: http://127.0.0.1:59370/status,Memory: 3.94 GiB
Nanny: tcp://127.0.0.1:59232,
Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-w488wmck,Local directory: C:\Users\Bilal\AppData\Local\Temp\dask-worker-space\worker-w488wmck


## Comment on the warnings obtained

There are a few warnings left in this notebook. Namely, "UserWarning: `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected." regarding the .apply() method on groupby()." complains about the lack of explicit definition of the types used in the columns on which apply() is used. However, this is not an issue in this case as the type of data we have is not misleading, and will be easily inferred.