In [1]:
import findspark
findspark.init()
import pyspark
import os

sc = pyspark.SparkContext()
spark = pyspark.SQLContext(sc)

working_dir = os.getcwd()

In [2]:
# Load parquet file into dataframe
df = spark.read.parquet("file://" + working_dir + "/testDataset/part-00000-711fabb0-5efc-4d83-afad-0e03a3156794.snappy.parquet")

In [3]:
# Count lines
df.count()

600000

In [4]:
# Show first 5 lines
df.show(5)

+--------------------+--------------------+-------------------+---------+--------+---------+
|            deviceId|             eventId|          eventTime|      lat|     lon|eventType|
+--------------------+--------------------+-------------------+---------+--------+---------+
|8f88ac99-8aa5-88a...|000c5bd2-d2d5-4cf...|2017-05-24 12:34:40|47.409271|8.547062| location|
|8f88ac99-8aa5-88a...|001b3982-ec22-4bb...|2017-05-24 16:21:41|47.417977|8.554384| location|
|8f88ac99-8aa5-88a...|00287463-b763-40b...|2017-05-24 16:02:50|47.416406| 8.55298| location|
|8f88ac99-8aa5-88a...|002946bb-19b8-40a...|2017-05-24 15:03:39|47.417743|8.554184| location|
|8f88ac99-8aa5-88a...|002947fd-bb93-434...|2017-05-24 12:56:02|47.406545|8.547231| location|
+--------------------+--------------------+-------------------+---------+--------+---------+
only showing top 5 rows



In [5]:
# Remove rows with any null or NaN value
df.dropna

<bound method DataFrame.dropna of DataFrame[deviceId: string, eventId: string, eventTime: timestamp, lat: double, lon: double, eventType: string]>

In [6]:
# Create Temp View from the dataset
df.createOrReplaceTempView("dataset")

### Remove unnecessary event types
Since we are going to predict estimation based on the start position, we won't need the rest of events (location, etc). Also, we remove rows with gps non valid values (0.0).

In [7]:
# 1. Create a view with only the values we need for processing
spark.sql("SELECT deviceId, eventTime, lat, lon, eventType FROM dataset WHERE (eventType='trip-start' OR eventType='trip-end') \
AND (lat != 0.0 and lon != 0.0) ORDER BY eventTime").createOrReplaceTempView("cleands")


## Explore data transformation for a single device
The purpose is to reduce the problem so that we are sure that the transformations that we apply to the data produce the desired results. 
The aim of these transformations is to clean the dataset and take only the correctly paired start-trip / end-trip data.

In [8]:
# Show an example of data for a certain device
singleDevice = spark.sql("SELECT * FROM cleands WHERE deviceId='D08699AE-BDAC-4AB4-2F15-177C74993133'")
singleDevice.createOrReplaceTempView("singledevice")
singleDevice.show(100, False)

+------------------------------------+-------------------+---------+----------+----------+
|deviceId                            |eventTime          |lat      |lon       |eventType |
+------------------------------------+-------------------+---------+----------+----------+
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-05-25 23:46:05|32.988837|-97.263735|trip-end  |
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-05-25 23:46:09|32.988837|-97.263735|trip-end  |
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-05-26 00:35:30|32.98925 |-97.263821|trip-end  |
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-05-26 00:35:39|32.989253|-97.263822|trip-start|
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-05-26 02:08:40|32.841525|-97.068658|trip-end  |
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-05-26 02:08:46|32.841525|-97.068658|trip-end  |
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-05-26 13:57:30|32.926142|-97.087224|trip-start|
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-05-26 19:17:41|32.98928 |-97.264482|trip-start|

### 1. Extract previous and next event for every row
To do this, we use LAG and LEAD window functions to shift the data. The result will add two new columns *previousEvent* and *nextEvent* to the dataset

In [9]:
# Shift values of eventType to add previous and next event
singleDevice = spark.sql("SELECT *, LAG(eventType, 1) OVER (ORDER BY eventTime) AS previousEvent, \
LEAD(eventType, 1) OVER (ORDER BY eventTime) AS nextEvent FROM singledevice")
singleDevice.createOrReplaceTempView("singledevice")
singleDevice.show(100)

+--------------------+-------------------+---------+----------+----------+-------------+----------+
|            deviceId|          eventTime|      lat|       lon| eventType|previousEvent| nextEvent|
+--------------------+-------------------+---------+----------+----------+-------------+----------+
|D08699AE-BDAC-4AB...|2017-05-25 23:46:05|32.988837|-97.263735|  trip-end|         null|  trip-end|
|D08699AE-BDAC-4AB...|2017-05-25 23:46:09|32.988837|-97.263735|  trip-end|     trip-end|  trip-end|
|D08699AE-BDAC-4AB...|2017-05-26 00:35:30| 32.98925|-97.263821|  trip-end|     trip-end|trip-start|
|D08699AE-BDAC-4AB...|2017-05-26 00:35:39|32.989253|-97.263822|trip-start|     trip-end|  trip-end|
|D08699AE-BDAC-4AB...|2017-05-26 02:08:40|32.841525|-97.068658|  trip-end|   trip-start|  trip-end|
|D08699AE-BDAC-4AB...|2017-05-26 02:08:46|32.841525|-97.068658|  trip-end|     trip-end|trip-start|
|D08699AE-BDAC-4AB...|2017-05-26 13:57:30|32.926142|-97.087224|trip-start|     trip-end|trip-start|


### 2. Remove trip-start and trip-end non valid rows
The only valid data is that which defines a valid trip, that is, a trip-start followed by a trip-end event. Those trip-start with no trip-end after it, or those trip-end without a preceeding trip-start are removed from the dataset.

In [10]:
# Remove non valid trip-start and trip-end (those without its corresponding pair)
singleDevice = spark.sql("SELECT deviceId, eventTime, lat, lon, eventType FROM singledevice \
WHERE NOT ((eventType='trip-end' AND previousEvent='trip-end') \
OR (eventType='trip-start' AND nextEvent='trip-start'))")
singleDevice.createOrReplaceTempView("singledevice")
singleDevice.show(20, False)


+------------------------------------+-------------------+---------+----------+----------+
|deviceId                            |eventTime          |lat      |lon       |eventType |
+------------------------------------+-------------------+---------+----------+----------+
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-05-26 00:35:39|32.989253|-97.263822|trip-start|
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-05-26 02:08:40|32.841525|-97.068658|trip-end  |
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-05-26 19:17:41|32.98928 |-97.264482|trip-start|
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-05-26 19:19:30|32.988822|-97.263723|trip-end  |
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-06-01 00:11:22|32.989214|-97.26381 |trip-start|
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-06-01 00:38:47|32.841445|-97.06854 |trip-end  |
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-06-01 13:44:54|32.789698|-97.133493|trip-start|
|D08699AE-BDAC-4AB4-2F15-177C74993133|2017-06-02 01:17:43|32.84152 |-97.068662|trip-end  |

### 3. Put all the data of every complete trip in the same row
We move the position and time data of every complete trip in a single row. This way we will be able to process each row as a trip, and we'll no longer need deviceId or eventType

In [11]:
# Move start and end values of a trip to a single row. We won't need deviceId any more
singleDevice = spark.sql("SELECT eventTime AS eventTimeStart, \
LEAD(eventTime, 1) OVER (ORDER BY eventTime) AS eventTimeEnd, lat AS latStart, lon AS lonStart, \
LEAD(lat, 1) OVER (ORDER BY eventTime) AS latEnd, \
LEAD(lon, 1) OVER (ORDER BY eventTime) AS lonEnd, \
eventType FROM singledevice")
singleDevice.createOrReplaceTempView("singledevice")
singleDevice = spark.sql("SELECT eventTimeStart, eventTimeEnd, latStart, lonStart, latEnd, lonEnd \
FROM singledevice WHERE eventType='trip-start'")
singleDevice.createOrReplaceTempView("singledevice")
singleDevice.show(100, False)

+-------------------+-------------------+---------+----------+---------+----------+
|eventTimeStart     |eventTimeEnd       |latStart |lonStart  |latEnd   |lonEnd    |
+-------------------+-------------------+---------+----------+---------+----------+
|2017-05-26 00:35:39|2017-05-26 02:08:40|32.989253|-97.263822|32.841525|-97.068658|
|2017-05-26 19:17:41|2017-05-26 19:19:30|32.98928 |-97.264482|32.988822|-97.263723|
|2017-06-01 00:11:22|2017-06-01 00:38:47|32.989214|-97.26381 |32.841445|-97.06854 |
|2017-06-01 13:44:54|2017-06-02 01:17:43|32.789698|-97.133493|32.84152 |-97.068662|
|2017-06-02 14:07:44|2017-06-02 14:32:29|32.841552|-97.068303|32.988817|-97.263714|
|2017-06-08 02:19:02|2017-06-08 13:53:49|32.822621|-97.058403|32.847142|-97.076774|
|2017-06-09 00:04:53|2017-06-09 00:05:51|32.841374|-97.067877|32.841491|-97.068668|
|2017-06-10 00:58:05|2017-06-10 01:23:09|32.989001|-97.263582|32.841531|-97.068663|
+-------------------+-------------------+---------+----------+---------+----

## Apply the transformations to the whole dataset

### 1. Extract previous and next event for every row
To do this, we use LAG and LEAD window functions to shift the data. The result will add two new columns *previousEvent* and *nextEvent* to the dataset

In [12]:
# Shift values of eventType to add previous and next event
allDevices = spark.sql("SELECT *, LAG(eventType, 1) OVER (PARTITION BY deviceId ORDER BY eventTime) AS previousEvent, \
LEAD(eventType, 1) OVER (PARTITION BY deviceId ORDER BY eventTime) AS nextEvent FROM cleands")
allDevices.createOrReplaceTempView("alldevices")
allDevices.show(20)

+--------------------+-------------------+----------+---------+----------+-------------+---------+
|            deviceId|          eventTime|       lat|      lon| eventType|previousEvent|nextEvent|
+--------------------+-------------------+----------+---------+----------+-------------+---------+
|11111111-1b2a-4bc...|2017-03-22 16:50:25|38.7735368|-9.168737|trip-start|         null| trip-end|
|11111111-1b2a-4bc...|2017-03-22 17:14:26|  38.76522|-9.098054|  trip-end|   trip-start| trip-end|
|11111111-1b2a-4bc...|2017-03-22 17:14:26|  38.76522|-9.098054|  trip-end|     trip-end|     null|
|11111111-2605-430...|2017-03-22 17:14:26|  38.76522|-9.098054|  trip-end|         null| trip-end|
|11111111-2605-430...|2017-03-22 17:14:26|  38.76522|-9.098054|  trip-end|     trip-end|     null|
|11111111-3306-4d3...|2017-03-22 16:50:25|38.7735368|-9.168737|trip-start|         null| trip-end|
|11111111-3306-4d3...|2017-03-22 17:14:26|  38.76522|-9.098054|  trip-end|   trip-start| trip-end|
|11111111-

### 2. Remove trip-start and trip-end non valid rows
The only valid data is that which defines a valid trip, that is, a trip-start followed by a trip-end event. Those trip-start with no trip-end after it, or those trip-end without a preceeding trip-start are removed from the dataset.

In [13]:
# Remove non valid trip-start and trip-end (those without its corresponding pair)
allDevices = spark.sql("SELECT deviceId, eventTime, lat, lon, eventType FROM alldevices \
WHERE NOT ((eventType='trip-end' AND previousEvent='trip-end') \
OR (eventType='trip-start' AND nextEvent='trip-start'))")
allDevices.createOrReplaceTempView("alldevices")
allDevices.show(20, False)

+------------------------------------+-------------------+----------+---------+----------+
|deviceId                            |eventTime          |lat       |lon      |eventType |
+------------------------------------+-------------------+----------+---------+----------+
|11111111-1b2a-4bc0-9818-2b62e59a1a4a|2017-03-22 16:50:25|38.7735368|-9.168737|trip-start|
|11111111-1b2a-4bc0-9818-2b62e59a1a4a|2017-03-22 17:14:26|38.76522  |-9.098054|trip-end  |
|11111111-3306-4d31-b992-cd8b79d45e58|2017-03-22 16:50:25|38.7735368|-9.168737|trip-start|
|11111111-3306-4d31-b992-cd8b79d45e58|2017-03-22 17:14:26|38.76522  |-9.098054|trip-end  |
|11111111-74d8-4eb3-ae71-693eb56cc990|2017-03-22 16:50:25|38.7735368|-9.168737|trip-start|
|11111111-74d8-4eb3-ae71-693eb56cc990|2017-03-22 17:14:26|38.76522  |-9.098054|trip-end  |
|11111111-a924-49c8-84b4-60b0c2e7b5d3|2017-03-22 16:50:25|38.7735368|-9.168737|trip-start|
|11111111-a924-49c8-84b4-60b0c2e7b5d3|2017-03-22 17:14:26|38.76522  |-9.098054|trip-end  |

### 3. Put all the data of every complete trip in the same row
We move the position and time data of every complete trip in a single row. This way we will be able to process each row as a trip, and we'll no longer need deviceId or eventType

In [14]:
# Move start and end values of a trip to a single row. We won't need deviceId any more
allDevices = spark.sql("SELECT eventTime AS eventTimeStart, \
LEAD(eventTime, 1) OVER (PARTITION BY deviceId ORDER BY eventTime) AS eventTimeEnd, lat AS latStart, lon AS lonStart, \
LEAD(lat, 1) OVER (PARTITION BY deviceId ORDER BY eventTime) AS latEnd, \
LEAD(lon, 1) OVER (PARTITION BY deviceId ORDER BY eventTime) AS lonEnd, \
eventType FROM alldevices")
allDevices.createOrReplaceTempView("alldevices")
allDevices = spark.sql("SELECT eventTimeStart, eventTimeEnd, latStart, lonStart, latEnd, lonEnd \
FROM alldevices WHERE eventType='trip-start'")
allDevices.createOrReplaceTempView("alldevices")
allDevices.show(20, False)

+-------------------+-------------------+----------+---------+--------+---------+
|eventTimeStart     |eventTimeEnd       |latStart  |lonStart |latEnd  |lonEnd   |
+-------------------+-------------------+----------+---------+--------+---------+
|2017-03-22 16:50:25|2017-03-22 17:14:26|38.7735368|-9.168737|38.76522|-9.098054|
|2017-03-22 16:50:25|2017-03-22 17:14:26|38.7735368|-9.168737|38.76522|-9.098054|
|2017-03-22 16:50:25|2017-03-22 17:14:26|38.7735368|-9.168737|38.76522|-9.098054|
|2017-03-22 16:50:25|2017-03-22 17:14:26|38.7735368|-9.168737|38.76522|-9.098054|
|2017-03-22 16:50:25|2017-03-22 17:14:26|38.7735368|-9.168737|38.76522|-9.098054|
|2017-03-22 16:50:25|2017-03-22 17:14:26|38.7735368|-9.168737|38.76522|-9.098054|
|2017-03-22 16:50:25|2017-03-22 17:14:26|38.7735368|-9.168737|38.76522|-9.098054|
|2017-03-22 16:50:25|2017-03-22 17:14:26|38.7735368|-9.168737|38.76522|-9.098054|
|2017-03-22 16:50:25|2017-03-22 17:14:26|38.7735368|-9.168737|38.76522|-9.098054|
|2017-03-22 16:5

In [16]:
allDevices.count()

3154

### Remove duplicates
After examining the resulting data, we observe that there many registers with exactly the same data for the time start and de starting and end position, so we remove the redundant data to avoid bias on the predicting model

In [17]:
allDevices = allDevices.dropDuplicates(['eventTimeStart', 'latStart', 'lonStart', 'latEnd', 'lonEnd'])
allDevices.show(20, False)

+-------------------+-------------------+---------+----------+---------+----------+
|eventTimeStart     |eventTimeEnd       |latStart |lonStart  |latEnd   |lonEnd    |
+-------------------+-------------------+---------+----------+---------+----------+
|2017-05-24 12:56:34|2017-05-24 12:56:37|47.409291|8.546942  |47.423743|8.555213  |
|2017-05-24 15:43:57|2017-05-24 15:44:00|47.409291|8.546942  |47.423743|8.555213  |
|2017-05-24 15:57:30|2017-05-24 15:57:30|47.409291|8.546942  |47.423743|8.555213  |
|2017-05-24 15:59:53|2017-05-24 15:59:55|47.409291|8.546942  |47.423743|8.555213  |
|2017-05-24 16:19:29|2017-05-24 16:19:31|47.409291|8.546942  |47.423743|8.555213  |
|2017-05-24 16:26:59|2017-05-24 16:27:00|47.409291|8.546942  |47.423743|8.555213  |
|2017-05-24 12:30:43|2017-05-24 12:30:48|47.409291|8.546942  |47.423743|8.555213  |
|2017-05-24 12:33:54|2017-05-24 12:33:54|47.409291|8.546942  |47.423743|8.555213  |
|2017-05-24 13:02:34|2017-05-24 13:02:36|47.409291|8.546942  |47.423743|8.55

In [18]:
allDevices.count()

1537

In [None]:
import pandas as pd

In [None]:
dfPanda = singleDevice.toPandas()

In [None]:
dfPanda