### This project parses a 3 TB nested JSON file into CSV using PySpark and Spark SQL for further analysis

# Attention! All data used here is test data: a partial sample from Dec 30, 2015. For the complete version, please check the other files.

# Read JSON and parse

## Import dependencies

In [291]:
import pyspark
from pyspark.sql import SQLContext
from scipy.interpolate import interp1d # import dependency

In [290]:
# `sc` is not created anywhere in this file; presumably the pyspark shell/kernel
# injects the SparkContext — confirm. The bare reference raises NameError early
# if it is missing.
sc
# SQLContext built on that SparkContext; the read and sql calls below go through it.
sqlContext = SQLContext(sc)

## Read JSON File

In [None]:
# Path of the JSON input read below.
bus_file='test.jsons'
# Load the file into a DataFrame through the SQLContext JSON reader.
bus = sqlContext.read.json(bus_file)
# Make the DataFrame queryable from SQL under the table name "bus".
bus.registerTempTable("bus")

## Show Schema

In [None]:
# Print the schema of the loaded JSON DataFrame.
bus.printSchema()

## load and apply SQL Query

In [None]:
# Read the full SQL text from spark_extract.sql; the file is closed when the
# with-block exits.
with open("spark_extract.sql") as fr:
     query = fr.read()
# Run the query through the SQLContext; `output` is what the cells below flatten.
output = sqlContext.sql(query)

## Flatten the list

### Method A

In [None]:
import itertools
def extract(parts):
    """Yield one flat tuple per observation from rows holding parallel lists.

    Each element of ``parts`` must expose the attributes ``Line``,
    ``Latitude``, ``Longitude``, ``RecordedAtTime``, ``vehicleID``, ``Trip``
    and ``TripDate``, each an iterable. The i-th items of those iterables are
    combined into the i-th yielded tuple, in that attribute order. Zipping
    stops at the shortest attribute.
    """
    # itertools.izip exists only on Python 2; on Python 3 the built-in zip is
    # already lazy, so fall back to it instead of failing with AttributeError.
    lazy_zip = getattr(itertools, "izip", zip)
    for p in parts:
        for o in lazy_zip(p.Line, p.Latitude, p.Longitude, p.RecordedAtTime,
                          p.vehicleID, p.Trip, p.TripDate):
            yield o

### Method B

In [None]:
def parse_list(p):
    """Flatten one row of parallel lists into a list of 13-field tuples.

    ``p`` must expose the attributes zipped below, each an iterable. The
    i-th tuple holds the i-th item of every attribute in this order:
    ROUTE_ID, latitude, longitude, recorded_time, vehicle_id, TRIP_ID,
    tripdate, SHAPE_ID, STOP_ID, distance_stop, distance_shape, status,
    destination.

    Returns an empty list when ``p.ROUTE_ID`` is None, so the function can
    be used directly with ``flatMap``.
    """
    if p.ROUTE_ID is None:
        return []
    # list(...) keeps the return type a list on Python 3, where zip is lazy.
    return list(zip(p.ROUTE_ID, p.latitude, p.longitude, p.recorded_time,
                    p.vehicle_id, p.TRIP_ID, p.tripdate, p.SHAPE_ID,
                    p.STOP_ID, p.distance_stop, p.distance_shape, p.status,
                    p.destination))

## Convert time to Unix time for interpolation

In [None]:
import calendar
import time
import dateutil.parser
def unix_time(x):
    """Convert a date/time string to Unix epoch seconds (float).

    ``x`` is parsed with ``dateutil.parser.parse``. Sub-second precision is
    dropped because the conversion goes through a time tuple.

    * If the parsed value carries a UTC offset (e.g. ``...-05:00``), the
      offset is honoured, so the result does not depend on the time zone of
      the machine running the code.
    * If the value is naive, it is interpreted in the machine's local time
      zone via ``time.mktime``.
    """
    dt = dateutil.parser.parse(x)
    if dt.utcoffset() is not None:
        # mktime() would treat the wall-clock fields as local time and ignore
        # the parsed offset; go through UTC instead.
        return float(calendar.timegm(dt.utctimetuple()))
    return time.mktime(dt.timetuple())

## Interpolate function

## Method A

In [None]:
def findIncreasingList(parts):
    """Yield records from ``parts`` while their last field never decreases.

    The last field of each record is compared with the last field of the
    previously yielded record (0 before the first one). At the first record
    whose last field is smaller, iteration stops for good: that record and
    everything after it are dropped.
    """
    last_value = 0
    for entry in parts:
        current = entry[-1]
        if current < last_value:
            break
        last_value = current
        yield entry

In [None]:
from scipy.interpolate import interp1d # import dependency
def predict(x):
    """Interpolate a time for each record of one group.

    ``x`` is an iterable of records. When called from the grouping pipeline
    in this notebook the records are
    ``(ROUTE_ID, recorded_time, TRIP_ID, STOP_ID, distance_stop,
    distance_shape)``. Only positions are relied on here: ``p[1]`` is a
    time string accepted by ``unix_time``, ``p[-1]`` and ``p[-2]`` are
    numbers or None.

    Records with ``p[-1]`` set are the interpolation samples
    (``p[-1]`` -> ``unix_time(p[1])``). For every such record that also has
    ``p[-2]`` set, the time at ``p[-1] + p[-2]`` is interpolated, provided
    that value does not exceed the last sample's ``p[-1]``.

    Returns ``[]`` when fewer than two samples exist. Otherwise returns the
    generator produced by ``findIncreasingList`` over
    ``(p[0], p[2], p[3], interpolated_time)`` tuples.
    """
    # Materialise once: the group is scanned several times, and this also
    # makes one-shot iterators safe to pass in.
    samples = [p for p in x if p[-1] is not None]
    if len(samples) < 2:
        return []

    pre_x = [p[-1] for p in samples]
    pre_y = [unix_time(p[1]) for p in samples]
    f = interp1d(pre_x, pre_y)
    last_seen = pre_x[-1]

    estimates = []
    for p in samples:
        if p[-2] is None:
            # Without the second distance no target position can be formed.
            continue
        target = p[-1] + p[-2]
        if target <= last_seen:
            estimates.append((p[0], p[2], p[3], f(target)))
    return findIncreasingList(estimates)

## Method B

In [None]:
from scipy.interpolate import interp1d
def predict_map(x):
    """Interpolate a time for each record of one group, vectorised.

    ``x`` is an iterable of records indexed positionally: ``p[3]`` is a time
    string accepted by ``unix_time``, ``p[-3]`` and ``p[-4]`` are numbers or
    None, and ``p[-5]`` is the identifier returned with each estimate. With
    the 13-field tuples produced by ``parse_list`` these positions are
    recorded_time, distance_shape, distance_stop and STOP_ID.

    Records with ``p[-3]`` set are the interpolation samples
    (``p[-3]`` -> ``unix_time(p[3])``). For each such record that also has
    ``p[-4]`` set and whose ``p[-3] + p[-4]`` does not exceed the last
    sample's ``p[-3]``, the time at that position is interpolated.

    Returns a list of ``(p[-5], interpolated_time)`` pairs. The list is
    empty when fewer than two samples exist or no record qualifies.
    """
    samples = [p for p in x if p[-3] is not None]
    if len(samples) < 2:
        return []

    train_x = [p[-3] for p in samples]
    train_y = [unix_time(p[3]) for p in samples]
    f = interp1d(train_x, train_y)
    last_seen = train_x[-1]

    # Build ids and positions in the same pass so they stay aligned; filtering
    # them separately paired ids with times belonging to other records.
    stops = []
    distance = []
    for p in samples:
        if p[-4] is None:
            continue
        target = p[-3] + p[-4]
        if target <= last_seen:
            stops.append(p[-5])
            distance.append(target)
    if not distance:
        return []

    stoptimes = f(distance)
    return list(zip(stops, stoptimes))

## Groupby Date and Line & Apply Interpolation

## Simple Extraction

In [None]:
# Preview: the first two flattened records.
output.flatMap(parse_list).take(2)

## Group By TRIP_ID and Date

In [None]:
# Preview: key each flattened record by (x[5], x[6]) — TRIP_ID and tripdate in
# parse_list's field order — group by that key and keep only the grouped values.
(
    output.flatMap(parse_list)
    .map(lambda rec: ((rec[5], rec[6]), rec))
    .groupByKey()
    .map(lambda pair: pair[1])
    .take(10)
)

## Groupbykey and Apply Interpolation

In [None]:
# Preview: group the six fields predict() indexes by (x[5], x[6]) and run the
# interpolation on every group.
(
    output.flatMap(parse_list)
    .map(lambda rec: ((rec[5], rec[6]),
                      (rec[0], rec[3], rec[5], rec[8], rec[-4], rec[-3])))
    .groupByKey()
    .flatMap(lambda pair: predict(pair[1]))
    .take(10)
)

## Remove the prefix, timezones and save as CSV

In [None]:
# Full run: interpolate per (x[5], x[6]) group, render each result tuple as a
# comma-separated line, strip the agency prefixes and the '-05:00' suffix, and
# write the lines to the 'stoptimes' output directory.
# The value tuple carries the six fields predict() indexes, as in the preview
# cell above this one. The earlier `(x[0],)` was truncated and the call
# parentheses were unbalanced.
(
    output.flatMap(parse_list)
    .map(lambda x: ((x[5], x[6]), (x[0], x[3], x[5], x[8], x[-4], x[-3])))
    .groupByKey()
    .flatMap(lambda x: predict(x[1]))
    .map(lambda x: ",".join(map(str, x)))
    .map(lambda x: x.replace('MTA NYCT_', '').replace('MTABC_','').replace('MTA_','').replace('-05:00',''))
    .saveAsTextFile('stoptimes')
)

# Read from CSV and SQL Manipulation

## Import dependencies

In [5]:
from pyspark.sql.types import *

## Reset Schemas and Indexing

In [None]:
# Schema with the thirteen field names parse_list zips, in the same order.
# Every field is nullable.
customSchema = StructType([
    StructField("ROUTE_ID", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("recorded_time", StringType(), True),
    StructField("vehicle_id", StringType(), True),
    StructField("TRIP_ID", StringType(), True),
    StructField("tripdate", DateType(), True),
    StructField("SHAPE_ID", StringType(), True),
    StructField("STOP_ID", StringType(), True),
    StructField("distance_stop", StringType(), True),
    StructField("distance_shape", StringType(), True),
    StructField("status", StringType(), True),
    StructField("destination", StringType(), True),
])

In [20]:
# Schema used when loading stop_times.txt: seven nullable string columns.
stop_times_schema = StructType([
    StructField("trip_id", StringType(), True),
    StructField("arrival_time", StringType(), True),
    StructField("departure_time", StringType(), True),
    StructField("stop_id", StringType(), True),
    StructField("stop_sequence", StringType(), True),
    StructField("pickup_type", StringType(), True),
    StructField("drop_off_type", StringType(), True),
])

In [21]:
# Schema used when loading stops.csv: three string ids plus an integer `time`
# column (later passed to from_unixtime).
real_stoptimes_schema = StructType([
    StructField("ROUTE_ID", StringType(), True),
    StructField("TRIP_ID", StringType(), True),
    StructField("STOP_ID", StringType(), True),
    StructField("time", IntegerType(), True),
])

## Use CSV=>DF tool to read saved csv

In [26]:
# Load stops.csv through the spark-csv data source; the first line is treated
# as a header and columns are typed by real_stoptimes_schema.
real_stoptimes = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true')
    .load('stops.csv', schema=real_stoptimes_schema)
)

In [27]:
# Load stop_times.txt through the spark-csv data source; the first line is
# treated as a header and columns are typed by stop_times_schema.
stoptimes = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true')
    .load('stop_times.txt', schema=stop_times_schema)
)

In [28]:
# Peek at the first 10 rows of `stoptimes`.
stoptimes.take(10)

[Row(trip_id=u'EN_U5-Weekday-008000_B12_1', arrival_time=u'01:20:00', departure_time=u'01:20:00', stop_id=u'901471', stop_sequence=u'1', pickup_type=u'0', drop_off_type=u'0'),
 Row(trip_id=u'EN_U5-Weekday-008000_B12_1', arrival_time=u'01:21:28', departure_time=u'01:21:28', stop_id=u'301386', stop_sequence=u'2', pickup_type=u'0', drop_off_type=u'0'),
 Row(trip_id=u'EN_U5-Weekday-008000_B12_1', arrival_time=u'01:22:00', departure_time=u'01:22:00', stop_id=u'301387', stop_sequence=u'3', pickup_type=u'0', drop_off_type=u'0'),
 Row(trip_id=u'EN_U5-Weekday-008000_B12_1', arrival_time=u'01:22:55', departure_time=u'01:22:55', stop_id=u'301388', stop_sequence=u'4', pickup_type=u'0', drop_off_type=u'0'),
 Row(trip_id=u'EN_U5-Weekday-008000_B12_1', arrival_time=u'01:23:13', departure_time=u'01:23:13', stop_id=u'301389', stop_sequence=u'5', pickup_type=u'0', drop_off_type=u'0'),
 Row(trip_id=u'EN_U5-Weekday-008000_B12_1', arrival_time=u'01:23:53', departure_time=u'01:23:53', stop_id=u'301390', sto

In [98]:
from pyspark.sql.functions import split

In [33]:
# from_unixtime renders `time` as a "date time" string. Splitting on the space
# gives the date at index 0 and the clock time at index 1.
timestamp_parts = split(pyspark.sql.functions.from_unixtime(real_stoptimes.time), ' ')
new_time = (
    real_stoptimes
    .withColumn('realtime', timestamp_parts[1])
    .withColumn('date', timestamp_parts[0])
)

In [82]:
# Peek at the first row of `stoptimes`.
stoptimes.take(1)

[Row(trip_id=u'EN_U5-Weekday-008000_B12_1', arrival_time=u'01:20:00', departure_time=u'01:20:00', stop_id=u'901471', stop_sequence=u'1', pickup_type=u'0', drop_off_type=u'0')]

In [34]:
# Show two rows to check the derived realtime/date columns.
new_time.show(2)

+--------+--------------------+-------+----------+--------+----------+
|ROUTE_ID|             TRIP_ID|STOP_ID|      time|realtime|      date|
+--------+--------------------+-------+----------+--------+----------+
|     Q46|QV_W5-Weekday-110...| 502327|1451520016|19:00:16|2015-12-30|
|     Q46|QV_W5-Weekday-110...| 502331|1451520119|19:01:59|2015-12-30|
+--------+--------------------+-------+----------+--------+----------+
only showing top 2 rows



In [118]:
# Add a route_id column holding the third "_"-separated piece of trip_id
# (e.g. "B12" for "EN_U5-Weekday-008000_B12_1", as in the sample rows shown in
# this notebook).
new_stoptimes = stoptimes.withColumn('route_id', split(stoptimes.trip_id,'_')[2])

In [133]:
# Show the first row to check the derived route_id column. The previous
# `new_stoptimes.arrival_time.` was an unfinished expression (SyntaxError);
# the pasted output is that of show(1).
new_stoptimes.show(1)

+--------------------+------------+--------------+-------+-------------+-----------+-------------+--------+
|             trip_id|arrival_time|departure_time|stop_id|stop_sequence|pickup_type|drop_off_type|route_id|
+--------------------+------------+--------------+-------+-------------+-----------+-------------+--------+
|EN_U5-Weekday-008...|    01:20:00|      01:20:00| 901471|            1|          0|            0|     B12|
+--------------------+------------+--------------+-------+-------------+-----------+-------------+--------+
only showing top 1 row



In [35]:
# Make new_time queryable from SQL under the table name 'new_time'.
new_time.registerTempTable('new_time')

In [132]:
# Show one row of new_time.
new_time.show(1)

+--------+--------------------+-------+----------+--------+----------+
|ROUTE_ID|             TRIP_ID|STOP_ID|      time|realtime|      date|
+--------+--------------------+-------+----------+--------+----------+
|     Q46|QV_W5-Weekday-110...| 502327|1451520016|19:00:16|2015-12-30|
+--------+--------------------+-------+----------+--------+----------+
only showing top 1 row



In [120]:
# Make new_stoptimes (stop times plus route_id) queryable from SQL under the
# table name 'stoptimes'.
new_stoptimes.registerTempTable('stoptimes')

In [121]:
from pyspark.sql.functions import udf
def get_sec(s):
    """Convert an ``H:M:S`` string to a number of seconds.

    The first three ``:``-separated fields are read as integers; any further
    fields are ignored. Hours are not capped at 23.
    """
    fields = s.split(':')
    hours, minutes, seconds = (int(fields[i]) for i in range(3))
    return hours * 3600 + minutes * 60 + seconds
# Expose get_sec to SQL as the integer-returning function "getsec".
sqlContext.registerFunction("getsec", get_sec, IntegerType())

## Apply getsec function to sec and calculate the delays

In [294]:
import pyspark.sql.functions as func

In [249]:
# Scratch check of what enumerate() yields: (index, value) pairs.
a = [1,3,4,5]
list(enumerate(a))

[(0, 1), (1, 3), (2, 4), (3, 5)]

In [295]:
from pyspark.sql.window import Window

In [254]:
def headway(a, threshold=180):
    """Count adjacent pairs in ``a`` that differ by more than ``threshold``.

    ``a`` is a sequence of numbers. Each element is compared with the one
    immediately before it; the first element has no predecessor and is never
    compared with the last one. Sequences with fewer than two elements give 0.

    ``threshold`` defaults to 180 (presumably seconds, i.e. 3 minutes —
    confirm against the caller).
    """
    return sum(1 for previous, current in zip(a, a[1:])
               if abs(current - previous) > threshold)
# Expose headway to SQL as the integer-returning function "headway".
sqlContext.registerFunction("headway", headway, IntegerType())

In [None]:
def min_sum(a):
    """Placeholder for a helper that was never finished.

    The original cell stopped right after the loop header
    ``for i in enumerate(a):``, which is a SyntaxError and gives no hint of
    the intended result. Until the logic is written, calling this raises
    instead of silently returning None.
    """
    # TODO: implement; the intended computation over enumerate(a) is unknown.
    raise NotImplementedError("min_sum was left unfinished in the notebook")

In [281]:
# Inner-join the observed times (new_time) with the stop-times table
# (stoptimes) on trip and stop.
# The join columns are table-qualified: the two tables' key names differ only
# by letter case, so the unqualified `TRIP_ID = trip_id` resolves as intended
# only when identifier resolution is case-sensitive.
join = sqlContext.sql('SELECT * \
                       FROM new_time\
                       INNER JOIN stoptimes\
                       ON new_time.TRIP_ID = stoptimes.trip_id AND new_time.STOP_ID = stoptimes.stop_id')

In [None]:
# Make the joined DataFrame queryable from SQL under the table name 'new_join'.
join.registerTempTable('new_join')

## Calculate the performance on 3 ways:

1. On-time performance: a bus is on time if it arrives no more than 1 min ahead of schedule and no more than 5 min after it.
2. Peak-hour wait assessment: a bus is on time if it arrives within 3 min (ahead or after) of the scheduled time during 6-9 or 16-19.
3. Off-peak-hour wait assessment: a bus is on time if it arrives within 5 min of the schedule outside the peak hours.

In [None]:
# Per (ROUTE_ID, STOP_ID, date) share of on-time arrivals, three ways:
#   ontime_ratio  - delay between -60 and 300, over all rows with a delay
#   peak_wait     - delay within +/-180 among rows with hour 6-9 or 16-19
#   offpeak_wait  - delay within +/-300 among rows outside both hour ranges
# Fixes relative to the earlier query:
#   * the peak condition is parenthesised (AND binds tighter than OR, so the
#     delay test only applied to the 16-19 branch);
#   * off-peak uses AND between the two NOT BETWEEN tests (with OR the test
#     was true for every hour);
#   * the peak tolerance is 180 to match the 3-minute rule stated in the
#     notebook text;
#   * the third column gets its own alias instead of a second `peak_wait`;
#   * `new` holds the DataFrame, since .show() returns None.
# NOTE(review): `delay` is not produced by any cell visible in this notebook;
# new_join must provide it (presumably observed minus scheduled seconds, via
# getsec) — confirm before running.
new = sqlContext.sql('''
    SELECT ROUTE_ID, STOP_ID, date,
           COUNT(IF((delay BETWEEN -60 AND 300), 1, null)) / COUNT(delay) AS ontime_ratio,
           COUNT(IF(((HOUR(realtime) BETWEEN 6 AND 9) OR (HOUR(realtime) BETWEEN 16 AND 19))
                    AND (delay BETWEEN -180 AND 180), 1, null))
             / COUNT(IF((HOUR(realtime) BETWEEN 6 AND 9) OR (HOUR(realtime) BETWEEN 16 AND 19), 1, null)) AS peak_wait,
           COUNT(IF((HOUR(realtime) NOT BETWEEN 6 AND 9) AND (HOUR(realtime) NOT BETWEEN 16 AND 19)
                    AND (delay BETWEEN -300 AND 300), 1, null))
             / COUNT(IF((HOUR(realtime) NOT BETWEEN 6 AND 9) AND (HOUR(realtime) NOT BETWEEN 16 AND 19), 1, null)) AS offpeak_wait
    FROM new_join
    GROUP BY ROUTE_ID, STOP_ID, date
''')
new.show()

## Time Transfer to Unix Timestamp

## Count the trips of each line for every day to test the data integrity

In [None]:
# Count recorded_time values per (Route_Id, tripdate), newest date first.
# NOTE(review): no cell visible in this notebook registers a table named
# `record`; presumably it is registered elsewhere — confirm.
gaps = sqlContext.sql('SELECT Route_Id, tripdate, count(recorded_time) AS trips\
                       FROM record\
                       GROUP BY Route_Id, tripdate\
                       ORDER BY tripdate DESC') # run the SQL query