<h1 style="color:blue">Structured Streaming using Python for capturing NYC taxi data</h1>

I'd like to compute real-time metrics such as the peak times of taxi pickups and drop-offs, the average ride time and the most popular NYC boroughs by number of taxi orders.

<h2 style="color:blue">Sample Data</h2>

I use sample taxi data, stored as CSV files in /FileStore/tables/streaming/, to build this application. Let's see what the folder looks like:

In [3]:
%fs ls "/FileStore/tables/streaming/"

There are 10 CSV files from January 2018 in the directory, plus an additional NYC taxi zones mapping CSV file in the Lookup subfolder. Each January file contains 5000 rows.
All the trip files have the same structure, so let's look at the first one to see what they contain.

In [5]:
%fs head "/FileStore/tables/streaming/fhv_tripdata_2018_01_1-b3a55.csv"

Let's have a look at the zone lookup file as well.

In [7]:
%fs head "/FileStore/tables/streaming/Lookup/NYC_Taxi_zones.csv"


Each line in the trip files contains a record with 7 fields, of which only 4 are used in this analysis: `Pickup_DateTime`, `DropOff_datetime`, `PUlocationID` and `DOlocationID`.

<h1 style="color:blue">Batch/Interactive Processing</h1>

Let's investigate the static data first by creating a DataFrame over the files, called `staticDataFrame`.

In [10]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *

inputPath = "/FileStore/tables/streaming/"

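rawStaticDF = spark.read.format("csv")\
   .option("header", "true")\
   .option("inferSchema","true") \
   .load(inputPath)

# Keep the schema of ALL columns: a user-supplied CSV schema is applied by column
# position, so the streaming reader below needs the full file schema, not just the 4 selected columns
OrderSchema = rawStaticDF.schema

staticDataFrame = rawStaticDF.select("Pickup_DateTime", "DropOff_datetime", "PUlocationID", "DOlocationID")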
display(staticDataFrame)

Pickup_DateTime,DropOff_datetime,PUlocationID,DOlocationID
2018-01-30T21:20:14.000+0000,2018-01-30T21:31:20.000+0000,41.0,116
2018-01-30T21:30:35.000+0000,2018-01-30T21:37:53.000+0000,97.0,181
2018-01-30T21:51:29.000+0000,2018-01-30T22:14:31.000+0000,181.0,36
2018-01-30T21:53:33.000+0000,2018-01-30T22:06:41.000+0000,97.0,37
2018-01-30T21:42:22.000+0000,2018-01-30T22:04:30.000+0000,144.0,143
2018-01-30T21:06:23.000+0000,2018-01-30T21:16:47.000+0000,237.0,141
2018-01-30T21:33:08.000+0000,2018-01-30T21:39:48.000+0000,161.0,48
2018-01-30T21:13:54.000+0000,2018-01-30T21:56:48.000+0000,79.0,265
2018-01-30T21:05:59.000+0000,2018-01-30T21:21:44.000+0000,64.0,15
2018-01-30T21:32:14.000+0000,2018-01-30T21:42:10.000+0000,15.0,171


Now let's load the mapping table of NYC zone and borough names, which we use below.

In [12]:
LocationPath = "/FileStore/tables/streaming/Lookup/NYC_Taxi_zones.csv"
df_Location = spark.read.option("header", "true").csv(LocationPath)

display(df_Location)

LocationID,zone,borough
1,Newark Airport,EWR
3,Allerton/Pelham Gardens,Bronx
4,Alphabet City,Manhattan
5,Arden Heights,Staten Island
6,Arrochar/Fort Wadsworth,Staten Island
7,Astoria,Queens
8,Astoria Park,Queens
9,Auburndale,Queens
24,Bloomingdale,Manhattan
10,Baisley Park,Queens


For the analysis we stack pickups and drop-offs into a single DataFrame: `Pickup_DateTime` and `DropOff_datetime` are combined into one column, `ServiceTime`, and a new constant column, `ServiceType`, records whether each row is a pickup or a drop-off.
We also left-join the result with the location mapping table to retrieve the borough and zone names.

In [14]:
df_Pickup = (staticDataFrame
    .select(col("Pickup_DateTime").alias("ServiceTime"), col("PUlocationID").alias("Location"))
    .withColumn("ServiceType", lit("Pickup")))
df_Dropoff = (staticDataFrame
    .select(col("DropOff_datetime").alias("ServiceTime"), col("DOlocationID").alias("Location"))
    .withColumn("ServiceType", lit("DropOff")))

# union() matches columns by position, so both halves use the same column order
df_final = df_Pickup.union(df_Dropoff)

# Left join keeps events without a matching LocationID (zone and borough become null)
df_final = (df_final
    .join(df_Location, df_final.Location == df_Location.LocationID, "left")
    .withColumn("ServiceHour", hour("ServiceTime")))

df_final.createOrReplaceTempView("taxi_data")
staticSchema= df_final.schema

display(df_final)

ServiceTime,Location,ServiceType,LocationID,zone,borough,ServiceHour
2018-01-30T21:20:14.000+0000,41.0,Pickup,41.0,Central Harlem,Manhattan,21
2018-01-30T21:30:35.000+0000,97.0,Pickup,97.0,Fort Greene,Brooklyn,21
2018-01-30T21:51:29.000+0000,181.0,Pickup,181.0,Park Slope,Brooklyn,21
2018-01-30T21:53:33.000+0000,97.0,Pickup,97.0,Fort Greene,Brooklyn,21
2018-01-30T21:42:22.000+0000,144.0,Pickup,144.0,Little Italy/NoLiTa,Manhattan,21
2018-01-30T21:06:23.000+0000,237.0,Pickup,237.0,Upper East Side South,Manhattan,21
2018-01-30T21:33:08.000+0000,161.0,Pickup,161.0,Midtown Center,Manhattan,21
2018-01-30T21:13:54.000+0000,79.0,Pickup,79.0,East Village,Manhattan,21
2018-01-30T21:05:59.000+0000,64.0,Pickup,,,,21
2018-01-30T21:32:14.000+0000,15.0,Pickup,15.0,Bay Terrace/Fort Totten,Queens,21
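To make the reshaping concrete outside Spark, here is a minimal plain-Python sketch of what the union and the left join do to two of the trip rows shown earlier. It is only an illustration, not how Spark executes the query. The `zones` dictionary is a partial lookup hand-copied from the joined output above, and `stack_and_join` is a hypothetical helper name.

```python
from datetime import datetime

# Two trip rows from the staticDataFrame output above:
# (Pickup_DateTime, DropOff_datetime, PUlocationID, DOlocationID)
trips = [
    ("2018-01-30 21:30:35", "2018-01-30 21:37:53", 97, 181),
    ("2018-01-30 21:05:59", "2018-01-30 21:21:44", 64, 15),
]

# Partial LocationID -> (zone, borough) lookup copied from the joined output above.
# ID 64 is deliberately absent, as it had no match in the real join.
zones = {
    97: ("Fort Greene", "Brooklyn"),
    181: ("Park Slope", "Brooklyn"),
    15: ("Bay Terrace/Fort Totten", "Queens"),
}

def stack_and_join(trips, zones):
    """Hypothetical helper: union pickups and drop-offs, then left-join on location ID."""
    # Same field order for both halves, like the positional union() in Spark
    pickups = [(pu_time, pu_id, "Pickup") for pu_time, _, pu_id, _ in trips]
    dropoffs = [(do_time, do_id, "DropOff") for _, do_time, _, do_id in trips]
    rows = []
    for ts, location, service_type in pickups + dropoffs:
        service_time = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
        zone, borough = zones.get(location, (None, None))  # left join: no match -> nulls
        rows.append({
            "ServiceTime": service_time,
            "Location": location,
            "ServiceType": service_type,
            "zone": zone,
            "borough": borough,
            "ServiceHour": service_time.hour,
        })
    return rows

for row in stack_and_join(trips, zones):
    print(row["ServiceType"], row["Location"], row["borough"], row["ServiceHour"])
```

The pickup at location 64 comes back with no zone or borough. This matches the empty zone and borough values in the Spark output for that ID.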


Now we can compute the number of orders per borough, service hour and day in one-hour windows. To do this, we group by the `Service_Day`, `ServiceType`, `ServiceHour` and `borough` columns and by one-hour windows over the `ServiceTime` column.
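The grouping key can be sketched in plain Python to show what each group looks like. This is an illustration under simplifying assumptions, not Spark's implementation:

- Timestamps are naive datetimes already in the session time zone.
- One-hour windows start on the hour. This is what `window(..., "1 hour")` produces when the zone offset is a whole number of hours.
- `strftime("%A")` stands in for `date_format(..., 'EEEE')` under an English locale.

The helper names are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta

def hour_window(ts):
    """Return the [start, end) one-hour tumbling window that contains ts."""
    start = ts.replace(minute=0, second=0, microsecond=0)
    return start, start + timedelta(hours=1)

def count_by_window(events):
    """events: iterable of (ServiceTime, ServiceType, borough) tuples.

    Builds the same grouping key as the Spark query:
    (Service_Day, ServiceType, ServiceHour, borough, window) -> count
    """
    counts = Counter()
    for ts, service_type, borough in events:
        key = (ts.strftime("%A"), service_type, ts.hour, borough, hour_window(ts))
        counts[key] += 1
    return counts

# Four pickups taken from the joined output above
events = [
    (datetime(2018, 1, 30, 21, 20, 14), "Pickup", "Manhattan"),
    (datetime(2018, 1, 30, 21, 30, 35), "Pickup", "Brooklyn"),
    (datetime(2018, 1, 30, 21, 51, 29), "Pickup", "Brooklyn"),
    (datetime(2018, 1, 30, 21, 53, 33), "Pickup", "Brooklyn"),
]
for key, n in sorted(count_by_window(events).items()):
    print(key[:4], n)
```

The three Brooklyn pickups fall into the same Tuesday 21:00 to 22:00 window, so they collapse into one group with a count of 3.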

In [16]:
from pyspark.sql.functions import *      # for window() function

staticCountsDF = (
    df_final
      .selectExpr("Borough", "ServiceType", "ServiceHour", "ServiceTime")
      .withColumn("Service_Day", date_format(col("ServiceTime"), "EEEE"))  # full weekday name
      .groupBy(
          col("Service_Day"),
          col("ServiceType"),
          col("ServiceHour"),
          col("Borough"),
          window("ServiceTime", "1 hour"))  # one-hour tumbling windows over ServiceTime
      .count()
)
 
staticCountsDF.cache()

# Register the DataFrame as view 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")

Let's check what the static data looks like after grouping; note the new one-hour `window` column.

In [18]:
display(staticCountsDF)

Now we can query the view directly with SQL. For example, here we show the total number of orders per service hour, separated by service type.

In [20]:
%sql
select ServiceType,Servicehour, sum(count) as total_count from static_counts group by ServiceType,Servicehour order by Servicehour, ServiceType

ServiceType,Servicehour,total_count
DropOff,0,4
Pickup,0,4
DropOff,1,8
Pickup,1,6
DropOff,2,4
Pickup,2,5
DropOff,3,7
Pickup,3,6
DropOff,4,9
Pickup,4,10


How about checking the number of pickup orders made in each borough per day of the week?

In [22]:
%sql
select Borough,Service_Day, sum(count) as total_count from static_counts where Servicetype = 'Pickup' group by Service_Day, Borough

Borough,Service_Day,total_count
Manhattan,Wednesday,14700
,Wednesday,7299
Manhattan,Thursday,245
,Tuesday,1299
Manhattan,Sunday,180
Brooklyn,Sunday,120
Queens,Sunday,100
Bronx,Sunday,55
Staten Island,Tuesday,32
,Friday,1


Note that the most popular pickup boroughs are Manhattan and Brooklyn, and the peak pickup hours are 4 pm and 5 pm (`ServiceHour` 16 and 17).
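Picking out the peak hours can also be done programmatically rather than by eye. Below is a minimal sketch. It assumes the `(ServiceType, Servicehour, total_count)` rows of the hourly query have been collected into Python, for example with `spark.sql(...).collect()`, whose `Row` objects unpack like tuples. `peak_hours` is a hypothetical helper. The sample rows are only the first ten rows shown above, so they cover hours 0 to 4 only.

```python
def peak_hours(rows, service_type, top_n=2):
    """Return the top_n hours with the highest total_count for one service type.

    rows: iterable of (ServiceType, ServiceHour, total_count) tuples.
    Ties are broken by the earlier hour.
    """
    hourly = [(hour, total) for st, hour, total in rows if st == service_type]
    hourly.sort(key=lambda hour_total: (-hour_total[1], hour_total[0]))
    return [hour for hour, _ in hourly[:top_n]]

# First rows of the static hourly query output above (hours 0-4 only)
rows = [
    ("DropOff", 0, 4), ("Pickup", 0, 4),
    ("DropOff", 1, 8), ("Pickup", 1, 6),
    ("DropOff", 2, 4), ("Pickup", 2, 5),
    ("DropOff", 3, 7), ("Pickup", 3, 6),
    ("DropOff", 4, 9), ("Pickup", 4, 10),
]
print(peak_hours(rows, "Pickup"))
```

On the full result set, hours are in 24-hour form, so afternoon peaks such as 4 pm and 5 pm appear as 16 and 17.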

<h1 style="color:blue">Stream Processing </h1>

Now that we have analyzed the data interactively, let's convert it into a streaming query that continuously updates as data arrives. Since we only have a static set of files, we emulate a stream by reading one file per trigger (`maxFilesPerTrigger` = 1), so each new file's rows are added to the running counts. The query is almost the same as the interactive query above.
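The effect of `maxFilesPerTrigger` combined with the `complete` output mode used below can be emulated in a few lines of plain Python. This is a conceptual sketch, not Spark's engine:

- Each inner list stands for one input file.
- The running `Counter` plays the role of the aggregation state Spark keeps between triggers.

`run_micro_batches` is a hypothetical name. For contrast, it also supports an `update`-style output that emits only the groups changed by the latest batch.

```python
from collections import Counter

def run_micro_batches(files, key_fn, output_mode="complete"):
    """Yield one result snapshot per trigger, processing one file per trigger.

    files: list of files, each a list of records.
    key_fn: maps a record to its grouping key.
    output_mode: "complete" emits every group seen so far,
                 "update" emits only the groups touched by the current batch.
    """
    if output_mode not in ("complete", "update"):
        raise ValueError(f"unsupported output mode: {output_mode}")
    state = Counter()  # running counts kept across triggers
    for batch in files:  # maxFilesPerTrigger = 1: one file per micro-batch
        batch_counts = Counter(key_fn(record) for record in batch)
        state.update(batch_counts)
        if output_mode == "complete":
            yield dict(state)
        else:
            yield {key: state[key] for key in batch_counts}

# Three tiny "files" of (ServiceType, borough) records
files = [
    [("Pickup", "Bronx"), ("DropOff", "Bronx")],
    [("Pickup", "Bronx")],
    [("Pickup", "Queens")],
]
for snapshot in run_micro_batches(files, key_fn=lambda record: record):
    print(snapshot)
```

This is why the snapshots queried below only ever grow. In complete mode, the in-memory `counts` table is replaced with the full, cumulative result after every file.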

In [25]:
from pyspark.sql.types import *
from pyspark.sql.functions import *  

spark.conf.set("spark.sql.shuffle.partitions", "2") 


streamingDataframe = (
  spark
    .readStream
    .schema(OrderSchema)              # streaming file sources need an explicit schema
    .option("maxFilesPerTrigger", 1)  # read one file per micro-batch to emulate a stream
    .format("csv")
    .option("header", "true")
    .load(inputPath)
)

df_StreamingPickup = (streamingDataframe
    .select(col("Pickup_DateTime").alias("ServiceTime"), col("PUlocationID").alias("Location"))
    .withColumn("ServiceType", lit("Pickup")))
df_StreamingDropoff = (streamingDataframe
    .select(col("DropOff_datetime").alias("ServiceTime"), col("DOlocationID").alias("Location"))
    .withColumn("ServiceType", lit("DropOff")))
df_Streamingfinal = df_StreamingPickup.union(df_StreamingDropoff)

# Join the streaming data with the static location mapping table to retrieve the NYC zone and borough names
df_Streamingfinal = (df_Streamingfinal
    .join(df_Location, df_Streamingfinal.Location == df_Location.LocationID, "left")
    .withColumn("ServiceHour", hour("ServiceTime")))

# Same aggregation as staticCountsDF, applied to the stream
OrderByBoroughPerDayAndServiceType = (                 
  df_Streamingfinal
   .groupBy(
     date_format(col("ServiceTime"),'EEEE').alias("Service_Day"),
       df_Streamingfinal.ServiceType,
       df_Streamingfinal.ServiceHour,
       df_Streamingfinal.borough,
       window(df_Streamingfinal.ServiceTime, "1 hour")
          ) 
    .count()
)

OrderByBoroughPerDayAndServiceType.isStreaming

In [26]:
spark.conf.set("spark.sql.shuffle.partitions", "2") 

query = (
  OrderByBoroughPerDayAndServiceType
    .writeStream
    .format("memory")        # memory = store results in an in-memory table (intended for testing/debugging)
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = rewrite the full, cumulative result table on every trigger
    .start()
)

In [27]:
from time import sleep
sleep(5)
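A fixed `sleep(5)` only works if the cluster happens to finish a few micro-batches in that time. Below is a minimal polling sketch. It makes two assumptions, as in PySpark 3.x:

- `query.lastProgress` is `None` before the first batch and is otherwise a dict with a 0-based `batchId` key.
- `query.isActive` reports whether the query is still running.

`wait_for_batches` is a hypothetical helper. If you simply want everything processed, `query.processAllAvailable()` blocks until all available input has been processed, but you then lose the intermediate snapshots.

```python
import time

def wait_for_batches(query, min_batches, timeout_s=120.0, poll_s=0.5):
    """Wait until `query` reports at least `min_batches` completed micro-batches.

    Assumes query.lastProgress is None before the first batch and otherwise a
    dict-like progress report with a "batchId" key (0-based), as in PySpark 3.x.
    Returns True once enough batches have completed, False on timeout or if the
    query stops first.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        progress = query.lastProgress
        if progress is not None and progress["batchId"] + 1 >= min_batches:
            return True
        if not query.isActive:  # query failed or was stopped
            return False
        time.sleep(poll_s)
    return False

# In the notebook, e.g. instead of sleep(5):
# wait_for_batches(query, min_batches=3)
```

With `maxFilesPerTrigger` = 1, each micro-batch reads one file. Waiting for three batches therefore means at least three files have been counted before the `%sql` cell runs.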

In [28]:
%sql
-- select ServiceType, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by ServiceType, date_format(window.end, "MMM-dd HH:mm")
select ServiceType,Servicehour, sum(count) as total_count from counts group by ServiceType,Servicehour order by Servicehour, ServiceType

ServiceType,Servicehour,total_count
DropOff,0,1
DropOff,1,3
Pickup,1,2
DropOff,2,2
Pickup,2,2
DropOff,3,3
Pickup,3,3
DropOff,4,4
Pickup,4,4
DropOff,5,7


In [29]:
%sql select Borough,Service_Day, sum(count) as total_count from counts where Servicetype = 'Pickup' group by Service_Day, Borough

Borough,Service_Day,total_count
Staten Island,Wednesday,159
Queens,Wednesday,3158
EWR,Wednesday,8
Bronx,Wednesday,2183
,Tuesday,1299
Brooklyn,Thursday,98
Brooklyn,Wednesday,6305
Bronx,Tuesday,518
Bronx,Sunday,49
,Sunday,708


In [30]:
sleep(5)

In [31]:
%sql 
select ServiceType,Servicehour, sum(count) as total_count from counts group by ServiceType,Servicehour order by Servicehour, ServiceType


ServiceType,Servicehour,total_count
DropOff,0,4
Pickup,0,4
DropOff,1,8
Pickup,1,6
DropOff,2,4
Pickup,2,5
DropOff,3,7
Pickup,3,6
DropOff,4,9
Pickup,4,10


In [32]:
%sql select Borough,Service_Day, sum(count) as total_count from counts where Servicetype = 'Pickup' group by Service_Day, Borough

Borough,Service_Day,total_count
Staten Island,Wednesday,179
Queens,Wednesday,3663
EWR,Wednesday,8
Bronx,Wednesday,2532
,Tuesday,1299
,Sunday,731
Brooklyn,Thursday,98
Brooklyn,Wednesday,7480
Bronx,Tuesday,518
Bronx,Sunday,55
