In [1]:
#importing spark and creating spark sessions
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("streaming").master("local[*]").getOrCreate()



In [2]:
#importing required libraries
import math
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import Row
import datetime
from time import sleep
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from functools import reduce
from pyspark.sql.functions import date_format

#importing HTML for better display
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))



In [3]:
#Creating the schema for data to be received via spark streaming
dataSchema = StructType([StructField("medallion",StringType(),True), 
           StructField("hack_license",StringType(),True), 
           StructField("pickup_datetime",TimestampType(),True), 
           StructField("dropoff_datetime",TimestampType(),True), 
           StructField("trip_time_in_secs",StringType(),True), 
           StructField("trip_distance",StringType(),True), 
           StructField("pickup_longitude",FloatType(),True), 
           StructField("pickup_latitude",FloatType(),True), 
           StructField("dropoff_longitude",FloatType(),True), 
           StructField("dropoff_latitude",FloatType(),True), 
           StructField("payment_type",StringType(),True), 
           StructField("fare_amount",FloatType(),True), 
           StructField("surcharge",FloatType(),True), 
           StructField("mta_tax",FloatType(),True), 
           StructField("tip_amount",FloatType(),True), 
           StructField("tolls_amount",FloatType(),True), 
           StructField("total_amount",FloatType(),True)
                       ])

In [4]:
#Reading the csv data into the dataframe
df = (spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1).csv("sampled_data", timestampFormat= "dd-MM-yyyy HH:mm"))


In [5]:
#Giving the specifications of latitudes and longitudes
start_lat = 41.474937      #latitude for starting cell
start_long = -74.913585    #longitude for starting cell
lat_degree = 0.004491556/2 #Change in degrees for latitude
long_degree = 0.004491556/2#Change in degrees for longitude
Grid_size = 150            #Grid size for finding most frequent routes in kms
cell_size = 0.5            #Cell size for finding most frequent routes in kms

In [6]:
#Function defined for calculating grid cells
def get_grids(longitude, latitude):
    if longitude == None or latitude == None:               #Removing distorted data
        return None
    long = int(abs(longitude - start_long)/long_degree)     #Generating first half of grid cell
    lat = int(abs(latitude - start_lat)/lat_degree)         #Generating second half of grid cell
    if (long <= 600 and long >= 0 and lat <= 600 and lat >= 0):
        result = (str(long) + "." + str(lat))               # Getting the grid cells in proper format
        return float(result)

In [7]:
#Creating a UDF for pyspark streaming dataframe
grid = udf(get_grids, FloatType())
df = df.withColumn("start_cell", grid(df.pickup_longitude, df.pickup_latitude))\
       .withColumn("end_cell", grid(df.dropoff_longitude, df.dropoff_latitude))
df = df.where(df.start_cell != df.end_cell)            #Removing the discrepency of data

In [8]:
from pyspark.sql.functions import *

#Creating the function to run on each batch
def batch_operation(df ,epoch_id):
    start = datetime.now()                 #Start time for analysing the delay
    df = df.withColumn('profit', df.total_amount+df.tip_amount)   #Calculating the profit by summing tip and total amount
    
    #getting the median prfoit for each area, checking the difference being lesser than 15 minutes
    med_df = df.filter((minute(df.dropoff_datetime) - minute(df.pickup_datetime)) < 15)\
               .groupBy(window("dropoff_datetime", "30 minutes"),"start_cell")\
               .sum("profit").withColumnRenamed("sum(profit)", "median_profit").orderBy(col("window").desc())

    #getting the number of empty taxis, checking the difference to be lesser than 30 minutes
    empty_df =  df.filter((minute(df.dropoff_datetime) - minute(df.pickup_datetime)) < 30)\
                    .groupBy(window("dropoff_datetime", "30 minutes"),"medallion","end_cell").count()\
                    .orderBy(col("window").desc())\
                    .where(col("count") == 1 ).groupBy("end_cell").sum("count").withColumnRenamed("sum(count)", "empty_taxis")


    #joined df getting the data of empty taxis
    df = med_df.join(empty_df.select("end_cell", "empty_taxis")\
               .distinct(), med_df.start_cell == empty_df.end_cell , how="left")
    
    #calculating profitability by dividing median profit by number of empty taxis
    df = df.withColumn("profitability", df.median_profit/df.empty_taxis).orderBy(df["window"], ascending = False)
    df = df.drop("end_cell").withColumnRenamed("start_cell", "profitable_areas")
    
    #partioning the window, ordering by profitability
    window_df = Window.partitionBy(df["window"]).orderBy(col("profitability").desc())
    df = df.withColumn("profitable_cell_id", row_number().over(window_df))\
           .filter(col("profitable_cell_id") <= 10)
    #changing the type of window column
    df = df.withColumn("window", col("window").cast(StringType())) 
    
    #getting correct format of data
    df = df.withColumn("pickup_datetime", split(col("window"), ",").getItem(0))\
           .withColumn("dropoff_datetime", split(col("window"), ",").getItem(1)).drop("window")
    df = df.withColumn("pickup_datetime",expr("substring(pickup_datetime, 2, 20)"))\
           .withColumn("dropoff_datetime",expr("substring(dropoff_datetime, 2, 20)"))
        
    end = datetime.now()            #End time for analysing the delay
    delay = end - start             #generating the delay
    
    df = df.withColumn("delay(in seconds)", lit(str(delay.total_seconds())))       #adding delay column for each data set
    
    #selecting the required columns in a particular format
    df = df.select("pickup_datetime", "dropoff_datetime","profitable_cell_id","profitable_areas", "empty_taxis","median_profit", "profitability", "delay(in seconds)" )
    
    #writing the data in csv format into output folder
    df.repartition(1).write.mode("append").csv('/home/ec2-user/output_folder_query2/', header = True)
    df.show(truncate = False)


In [9]:
#querying the data for batch processing
query = (df.writeStream.format("memory")\
           .foreachBatch(batch_operation)\
           .queryName("query2")\
           .outputMode("update")\
           .option("truncate",False)\
           .start())
df.isStreaming

True

In [10]:
#displaying the streaming data for 5 minutes
end_time = datetime.now() + timedelta(minutes=5)
df.createOrReplaceTempView("query2")      #replacing the view if already created
while True:                               #runs for displaying streaming data
    current_time = datetime.now()
    if current_time > end_time:           #if the time limit is exceeded then stops the loop
        break
    display(query.status)
    df_final = spark.sql('SELECT * FROM query2')  #using spark sql to display the data
    display(df_final)                             #displaying the final data
    sleep(3)                                     #adding a sleep time of 3 seconds

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

DataFrame[medallion: string, hack_license: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, trip_time_in_secs: string, trip_distance: string, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float, start_cell: float, end_cell: float]

+-------------------+--------------------+------------------+----------------+-----------+------------------+------------------+-----------------+
|pickup_datetime    |dropoff_datetime    |profitable_cell_id|profitable_areas|empty_taxis|median_profit     |profitability     |delay(in seconds)|
+-------------------+--------------------+------------------+----------------+-----------+------------------+------------------+-----------------+
|2013-03-31 20:30:00|2013-03-31 21:00:00}|1                 |408.385         |2          |11.0              |5.5               |2.16222          |
|2013-03-31 20:30:00|2013-03-31 21:00:00}|2                 |505.368         |34         |136.4600067138672 |4.013529609231388 |2.16222          |
|2013-03-31 20:30:00|2013-03-31 21:00:00}|3                 |502.371         |17         |57.83000183105469 |3.401764813591452 |2.16222          |
|2013-03-31 20:30:00|2013-03-31 21:00:00}|4                 |502.368         |27         |57.83000183105469 |2.1418519

+-------------------+--------------------+------------------+----------------+-----------+------------------+-------------------+-----------------+
|pickup_datetime    |dropoff_datetime    |profitable_cell_id|profitable_areas|empty_taxis|median_profit     |profitability      |delay(in seconds)|
+-------------------+--------------------+------------------+----------------+-----------+------------------+-------------------+-----------------+
|2013-04-30 20:30:00|2013-04-30 21:00:00}|1                 |393.313         |1          |10.5              |10.5               |0.319541         |
|2013-04-30 20:30:00|2013-04-30 21:00:00}|2                 |390.323         |12         |76.5              |6.375              |0.319541         |
|2013-04-30 20:30:00|2013-04-30 21:00:00}|3                 |473.328         |15         |80.93000030517578 |5.395333353678385  |0.319541         |
|2013-04-30 20:30:00|2013-04-30 21:00:00}|4                 |428.326         |74         |57.83000183105469 |0.7