# Project 1 - Streams Processing

- André Bastos, Nº 56969
- Carolina Goldstein, Nº 57213
- Rafaela Cruz, Nº 56926

Run this notebook using the publisher `publisher-debs.sh`, with no additional parameters (i.e., the default).

## Util Functions

In [None]:
import socket
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, ceil, concat_ws, window, avg, sum, round
from pyspark.sql import Row
import math
    
# Longitude and latitude of the upper-left corner of the grid (smallest longitude,
# largest latitude). Cell indices are computed as offsets from this origin.
init_long = -74.916578
init_lat = 41.47718278
# Longitude and latitude of the lower-right grid boundary, used to discard trips
# that fall outside the grid. They equal the origin moved by 300 of the 500 m
# steps used by get_areas (0.005986 degrees of longitude, 0.004491556 of latitude).
limit_long = -73.120778
limit_lat = 40.12971598
    
def apply_filters(line):
    """Tell whether a raw CSV trip record should be kept.

    A record is kept when it is not empty, its pickup and dropoff coordinates
    lie strictly inside the grid bounds, and its trip time, trip distance and
    total amount are all strictly greater than 0.

    Args:
        line: One comma-separated record. Fields 4 to 9 and 16 are read.

    Returns:
        True when the record passes every check. False otherwise, including
        when the record is truncated or a checked field is not a number.
    """
    if not line:
        return False

    fields = line.split(',')
    try:
        trip_time = float(fields[4])
        trip_distance = float(fields[5])
        pickup_long = float(fields[6])
        pickup_lat = float(fields[7])
        dropoff_long = float(fields[8])
        dropoff_lat = float(fields[9])
        total_amount = float(fields[16])
    except (IndexError, ValueError):
        # A malformed record is dropped instead of failing the whole stream
        return False

    return (
        init_long < pickup_long < limit_long
        and limit_lat < pickup_lat < init_lat
        and init_long < dropoff_long < limit_long
        and limit_lat < dropoff_lat < init_lat
        and trip_distance > 0
        and trip_time > 0
        and total_amount > 0
    )
    

def get_areas(line, _type = "bigger"):
    """Split a CSV record and append its pickup and dropoff grid cells.

    Args:
        line: One comma-separated record with at least 17 fields.
        _type: "smaller" selects 250 m cells; any other value keeps 500 m cells.

    Returns:
        A tuple with the first 17 fields (as strings) followed by the pickup
        cell and the dropoff cell, each formatted as
        "<longitude index>-<latitude index>".
    """
    fields = line.split(',')

    # Degrees of longitude / latitude that correspond to a 500 m step
    long_step, lat_step = 0.005986, 0.004491556
    if _type == "smaller":
        # Half a step gives 250 m cells
        long_step, lat_step = long_step / 2, lat_step / 2

    def cell_of(longitude, latitude):
        # Indices are counted from the upper-left corner of the grid
        column = math.ceil((float(longitude) - init_long) / long_step)
        row = math.ceil((init_lat - float(latitude)) / lat_step)
        return "{}-{}".format(column, row)

    original_fields = tuple(fields[position] for position in range(17))
    pickup_cell = cell_of(fields[6], fields[7])
    dropoff_cell = cell_of(fields[8], fields[9])
    return original_fields + (pickup_cell, dropoff_cell)

def filter_locations_and_integers(lines):
    """Keep only the rows whose coordinates and amounts are valid.

    The CSV record is read from the "value" column. A row is kept when its
    pickup and dropoff coordinates lie strictly inside the grid bounds and its
    trip distance, trip time and total amount are strictly greater than 0.

    Every field is cast to double before it is compared, matching what
    get_areas_df and the RDD-based apply_filters do. Without the cast the
    comparison type is left to Spark's implicit string coercion, which for an
    integer literal such as 0 may not treat fractional values (for example a
    0.5 mile trip) as expected.

    Args:
        lines: Dataframe with a "value" column holding the CSV record.

    Returns:
        The filtered dataframe, with the same columns as the input.
    """
    fields = split(lines["value"], ",")

    def number(index):
        # Field at the given CSV position, as a double (null when not numeric)
        return fields.getItem(index).cast("double")

    # Pickup (6, 7) and dropoff (8, 9) must both fall inside the grid
    for longitude_index, latitude_index in ((6, 7), (8, 9)):
        lines = lines \
            .filter(number(longitude_index) < limit_long) \
            .filter(number(longitude_index) > init_long) \
            .filter(number(latitude_index) < init_lat) \
            .filter(number(latitude_index) > limit_lat)

    # Trip distance (5), trip time (4) and total amount (16) must be positive
    for index in (5, 4, 16):
        lines = lines.filter(number(index) > 0)

    return lines

def get_areas_df(lines, _type = "bigger"):
    """Add the pickup and dropoff grid cells to a dataframe of CSV records.

    The coordinates are read from the "value" column (fields 6 to 9). Both
    indices of a cell are 1-based offsets from the upper-left corner of the
    grid, so the latitude index agrees with the one computed by get_areas.

    Args:
        lines: Dataframe with a "value" column holding the CSV record.
        _type: "smaller" selects 250 m cells; any other value keeps 500 m cells.

    Returns:
        The dataframe with two extra string columns, "cell_pickup" and
        "cell_dropoff", formatted as "<latitude index>-<longitude index>".
    """
    # Longitude and latitude that correspond to a shift in 500 meters
    long_shift = 0.005986
    lat_shift = 0.004491556

    # Longitude and latitude that correspond to a shift in 250 meters
    if _type == "smaller":
        long_shift = long_shift / 2
        lat_shift = lat_shift / 2

    split_lines = split(lines["value"], ",")

    def longitude_index(field):
        return ceil((split_lines.getItem(field).cast("double") - init_long) / long_shift)

    def latitude_index(field):
        # Latitude decreases when moving down from the origin, so the offset is
        # init_lat - latitude. Negating ceil(latitude - init_lat) instead would
        # round the other way and give indices that are one too small.
        return ceil((init_lat - split_lines.getItem(field).cast("double")) / lat_shift)

    return lines \
        .withColumn("cell_pickup", concat_ws("-", latitude_index(7), longitude_index(6))) \
        .withColumn("cell_dropoff", concat_ws("-", latitude_index(9), longitude_index(8)))

def get_routes(lines):
    """Add a "route" column built as "<cell_pickup>/<cell_dropoff>".

    Args:
        lines: Dataframe that already has "cell_pickup" and "cell_dropoff".

    Returns:
        The dataframe with the extra "route" column.
    """
    route = concat_ws("/", lines["cell_pickup"], lines["cell_dropoff"])
    return lines.withColumn("route", route)

def pre_process_df(lines):
    """Apply the basic cleaning shared by the dataframe queries.

    Rows in which every column is null are removed first; the remaining rows
    go through filter_locations_and_integers.

    Args:
        lines: Dataframe with a "value" column holding the CSV record.

    Returns:
        The cleaned dataframe.
    """
    non_empty = lines.na.drop(how="all")
    return filter_locations_and_integers(non_empty)

def get_columns(lines, columns):
    """Extract typed columns from the CSV record stored in "value".

    Args:
        lines: Dataframe with a "value" column holding the CSV record.
        columns: Sequence of (column name, type) tuples. The name must be one
            of the known CSV field names; the type is passed to ``cast``.

    Returns:
        The dataframe with one extra column per requested entry.
    """
    # CSV field names, in the order in which they appear in a record
    field_names = [
        "medallion", "hack_license", "pickup_datetime", "dropoff_datetime",
        "trip_time", "trip_distance", "pickup_longitude", "pickup_latitude",
        "dropoff_longitude", "dropoff_latitude", "payment_type", "fare_amount",
        "surcharge", "mta_tax", "tip_amount", "tolls_amount", "total_amount",
    ]
    fields = split(lines["value"], ",")

    result = lines
    for spec in columns:
        position = field_names.index(spec[0])
        result = result.withColumn(spec[0], fields.getItem(position).cast(spec[1]))
    return result
    
def dumpBatchDF(df, epoch_id):
    """Print up to 20 rows of a micro-batch without truncating the values.

    Args:
        df: Dataframe holding the result of the current micro-batch.
        epoch_id: Identifier of the micro-batch; not used.
    """
    df.show(n=20, truncate=False)

## Query 1 - Spark Streaming Dataframes

Find the top 10 most frequent routes during the last 30 minutes:

- A route is represented by a starting grid cell and an ending grid cell.
- All routes completed within the last 30 minutes are considered for the query.
- Use a grid of 300 x 300 cells (each cell is a square of 500 x 500 m)

In [None]:
# Build (or reuse) the Spark session used by this query
spark = (
    SparkSession.builder
    .appName("Kafka Pstr Project 1")
    .getOrCreate()
)

# Stream the records published on the "debs" Kafka topic
lines = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "debs")
    .load()
)

# Typed columns to extract from the raw CSV record
columns = [
    ("dropoff_datetime", "timestamp"),
    ("dropoff_longitude", "double"),
    ("dropoff_latitude", "double"),
    ("pickup_longitude", "double"),
    ("pickup_latitude", "double"),
]

# Clean the rows, extract the columns, derive the 500 m cells and build the routes
lines = pre_process_df(lines)
lines = get_columns(lines, columns)
lines = get_areas_df(lines)
lines = get_routes(lines)

# Count the rides of each route over 30-minute windows that slide every
# 10 minutes, then keep the first 10 rows ordered by window and count,
# both descending
most_frequent_routes = (
    lines.withWatermark("dropoff_datetime", "30 minutes")
    .groupBy(window(lines["dropoff_datetime"], "30 minutes", "10 minutes"), "route")
    .count()
    .orderBy("window", "count", ascending=False)
    .limit(10)
)

# Print the complete result table every 10 seconds
query = (
    most_frequent_routes.writeStream
    .trigger(processingTime="10 seconds")
    .outputMode("complete")
    .foreachBatch(dumpBatchDF)
    .start()
)

# Let the query run for at most 60 seconds, then release the resources
query.awaitTermination(60)

query.stop()
spark.stop()

## Query 2 - Spark Streaming Dataframes

Identify areas that are currently most profitable for taxi drivers:

- The profitability of an area is determined by dividing the area profit by the number of dropoffs in that area within the last 15 minutes.
- The profit that originates from an area is computed by calculating the average fare + tip for trips that started in the area and ended within the last 15 minutes.
- For this problem, use a cell size of 250 m x 250 m, i.e., a 600 x 600 grid.

In [None]:
# Get spark session instance
spark = SparkSession \
    .builder \
    .appName("Kafka Pstr Project 1") \
    .getOrCreate()

# Stream the records published on the "debs" Kafka topic
lines = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:9092") \
  .option("subscribe", "debs") \
  .load()

# Apply filters
lines = pre_process_df(lines)

# Get specified columns only
columns = [("dropoff_datetime", "timestamp"), ("pickup_datetime", "timestamp"),
    ("pickup_longitude", "double"), ("pickup_latitude", "double"),
    ("dropoff_longitude", "double"), ("dropoff_latitude", "double"), 
    ("fare_amount", "float"), ("tip_amount", "float")]
lines = get_columns(lines, columns)

# Get the cells from locations (250 m cells)
lines = get_areas_df(lines, "smaller")

# Get the profit amount (fare + tip)
lines = lines.withColumn("profit", lines["fare_amount"] + lines["tip_amount"]) \
             .drop("value") 

# Select only needed columns
lines = lines.select("dropoff_datetime","pickup_datetime", "profit", "cell_pickup", "cell_dropoff")

# Compute the average profit by pickup area over 15-minute windows
profit_average = lines.groupBy(lines.cell_pickup,window(lines.dropoff_datetime,"15 minutes"))\
                      .agg(avg(lines.profit).alias("avg_profit"))
   
# Compute the number of dropoffs by dropoff area. The window must have the same
# length as the profit window (15 minutes) so that both results describe the
# same period and can be matched window by window.
taxis_total = lines.groupBy(lines.cell_dropoff,window(lines.dropoff_datetime,"15 minutes"))\
                   .count()

# Both aggregations are kept in in-memory tables ("profit" and "taxis") so that
# they can be joined with plain SQL
query = profit_average \
    .writeStream \
    .trigger(processingTime="15 seconds") \
    .queryName("profit") \
    .outputMode("complete") \
    .format("memory")\
    .start()


query2 = taxis_total \
    .writeStream \
    .trigger(processingTime="15 seconds") \
    .queryName("taxis") \
    .outputMode("complete") \
    .format("memory") \
    .start()

# Show the profitability six times, waiting up to 60 seconds between displays.
# The join matches the area AND the window; joining on the area alone would
# divide the profit of one window by the dropoffs of every other window.
for i in range(6) :
    spark.sql('''select profit.cell_pickup AS cell, profit.window, taxis.window, profit.avg_profit / taxis.count AS profitability 
    from taxis join profit on 
    taxis.cell_dropoff = profit.cell_pickup AND taxis.window = profit.window
    ORDER BY profit.window, profitability DESC''')\
        .show(5,False)
    query.awaitTermination(60)

query.stop()
query2.stop()
spark.stop()

## Query 3 - Spark Streaming

Detect "slow" areas:
- Compute the average idle time of taxis for each area:
    - The idle time of a taxi is the time elapsed between the dropoff of one ride and the pickup of the following ride.
    - It is assumed that a taxi is available if it had at least one ride in the last hour.

In [None]:
def updateFunctionAvailableTaxis(newValues, runningObj):
    """State update that flags whether a taxi was available before its last ride.

    A ride is flagged as not available (0) when more than 1 hour elapsed
    between the dropoff of the previous ride and the pickup of this one;
    otherwise it is flagged as available (1).

    Args:
        newValues: Rides received in this batch for one taxi, each as
            (pickup area, dropoff area, pickup datetime, dropoff datetime),
            with datetimes formatted as "%Y-%m-%d %H:%M:%S".
        runningObj: Previous state for the taxi, or None for a new taxi.

    Returns:
        (pickup area, dropoff area, pickup datetime, dropoff datetime,
        is_available) for the last ride processed, or the previous state
        unchanged when the batch has no rides for this taxi.
    """
    if not newValues:
        return runningObj

    time_format = "%Y-%m-%d %H:%M:%S"
    if runningObj is None:
        first = newValues[0]
        runningObj = (first[0], first[1], first[2], first[3], 0)

    for v in newValues:
        previous_dropoff = datetime.strptime(runningObj[3], time_format)
        pickup = datetime.strptime(v[2], time_format)
        # Idle time is the new pickup minus the previous dropoff; the reverse
        # difference is never positive for consecutive rides
        is_available = 0 if pickup - previous_dropoff > timedelta(hours=1) else 1
        runningObj = (v[0], v[1], v[2], v[3], is_available)
    return runningObj

def updateFunction(newValues, runningObj):
    """State update that pairs each ride with the dropoff of the previous ride.

    Args:
        newValues: Rides received in this batch for one taxi, each as
            (pickup area, dropoff area, pickup datetime, dropoff datetime).
        runningObj: Previous state for the taxi, or None for a new taxi.

    Returns:
        (pickup area, dropoff area, pickup datetime, dropoff datetime,
        old dropoff area, old dropoff datetime) for the last ride processed.
        The two "old" values are 0 when the taxi has no previous ride. The
        previous state is returned unchanged when the batch has no rides.
    """
    if not newValues:
        return runningObj

    pending = newValues
    if runningObj is None:
        # The first ride of a taxi has no previous dropoff to refer to
        first = newValues[0]
        runningObj = (first[0], first[1], first[2], first[3], 0, 0)
        # The remaining rides of the same batch still have to be chained
        pending = newValues[1:]

    for v in pending:
        runningObj = (v[0], v[1], v[2], v[3], runningObj[1], runningObj[3])
    return runningObj

# Create the Spark context and a streaming context with a batch duration of 5.
# A checkpoint directory is set because the pipeline uses updateStateByKey.
sc = SparkContext("local[2]", "KafkaExample")
ssc = StreamingContext(sc, 5)
ssc.checkpoint('checkpoint')
lines = KafkaUtils.createDirectStream(ssc, ["debs"], \
            {"metadata.broker.list": "kafka:9092"})

try:
    # Each Kafka record arrives as a tuple; the CSV row is its second element
    filtered_lines = lines.map(lambda line: line[1])
    # Apply filters
    filtered_lines = filtered_lines.filter(lambda line: apply_filters(line))
    # Get areas
    lines_areas = filtered_lines.map(lambda line: get_areas(line))

    # We map the needed values to show them in the shape:
    # (taxi, (pickup area, dropoff area, pickup datetime, dropoff datetime))
    datetime_per_taxi = lines_areas.map(lambda line: (line[1],(line[17],line[18],line[2],line[3])))

    # Get only the available taxis - We obtain tuples in the shape:
    # (taxi, (pickup area, dropoff area, pickup datetime, dropoff datetime, is_available))
    # And filter them to keep only those with the parameter is_available = 1
    available_taxis = datetime_per_taxi.updateStateByKey(updateFunctionAvailableTaxis) \
                                       .filter(lambda a: a[1][4] == 1) \
                                       .map(lambda a: (a[0],(a[1][0], a[1][1], a[1][2], a[1][3])))

    # For each ride, append the dropoff area and the dropoff datetime of the last ride
    # We get the tuples:
    # (taxi, (pickup area, dropoff area, pickup datetime, dropoff datetime, old dropoff area, old dropoff datetime))
    # We filter out the tuples in which the old dropoff area is 0 (no previous ride)
    extended_rides = available_taxis.updateStateByKey(updateFunction) \
                                    .filter(lambda a: a[1][4] != 0)

    # New pickup area must be the same as old dropoff area
    # Old dropoff datetime must be earlier than the new pickup datetime
    processed_extended_rides = extended_rides.filter(lambda a: a[1][0] == a[1][4]) \
                                             .filter(lambda a: datetime.strptime(a[1][5], "%Y-%m-%d %H:%M:%S").timestamp() < datetime.strptime(a[1][2], "%Y-%m-%d %H:%M:%S").timestamp())

    # Idle time of each ride, keyed by the pickup area:
    # (pickup area, new pickup datetime - old dropoff datetime in seconds)
    # total_seconds() is used because the seconds attribute of a timedelta
    # leaves out whole days. No sorting is done here: the reduceByKey below
    # regroups the values anyway.
    idle_time_per_ride_and_area = processed_extended_rides \
        .map(lambda a: (a[1][0], datetime.strptime(a[1][2], "%Y-%m-%d %H:%M:%S") - datetime.strptime(a[1][5], "%Y-%m-%d %H:%M:%S"))) \
        .map(lambda a: (a[0], int(a[1].total_seconds())))

    # (pickup area, idle time)
    # With the idle time for every ride in each area, we can compute the average
    # idle time for each area, shown from the slowest area to the fastest
    avg_idle_time_per_area = idle_time_per_ride_and_area.map(lambda a: (a[0],(a[1],1))) \
                                                        .reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1])) \
                                                        .map(lambda a: (a[0], a[1][0]/a[1][1])) \
                                                        .transform(lambda rdd: rdd.sortBy(lambda a: a[1], ascending=False))

    avg_idle_time_per_area.pprint()

    # Run for at most 60 seconds
    ssc.start()
    ssc.awaitTermination(60)

except Exception as e:
    print(e)

finally:
    # Release the contexts whether or not the pipeline failed
    ssc.stop()
    sc.stop()

## Query 4 - Spark Streaming

Detect congested areas (routes):
- Areas (routes) in which ride durations increase once taxis enter them.
- To detect them, raise an alert when a taxi has a peak in ride duration that is followed by at least 2 rides, each longer than the previous one and above the area's average duration over the last 4 hours.
- The alert should contain the location where the taxi started the ride that had the peak duration.

In [None]:
def updateFunction(newValues, runningObj):
    """State update that counts consecutive rides of increasing duration per route.

    The counter goes back to 0 whenever a ride is not longer than the previous
    one, so no artificial window is needed. The ride that resets the counter
    becomes the start of the next sequence: its location and duration are kept
    in the state until the next reset.

    Args:
        newValues: Rides received in this batch for one route, each as
            (duration, longitude, latitude),
            e.g. (120.0, '-73.989525', '40.741528').
        runningObj: Previous state for the route, or None for a new route.

    Returns:
        (last duration, counter, start longitude, start latitude,
        start duration), or the previous state unchanged when the batch has no
        rides for this route.
    """
    if not newValues:
        return runningObj

    if runningObj is None:
        # Same 5-field layout as the state written by the loop below
        first = newValues[0]
        runningObj = (first[0], 0, first[1], first[2], first[0])

    for v in newValues:
        prev_dur = runningObj[0]
        dur = v[0]
        if dur <= prev_dur:
            # Sequence broken: this ride starts a new one
            runningObj = (dur, 0, v[1], v[2], v[0])
        else:
            # One more ride that is longer than the previous one
            runningObj = (dur, runningObj[1] + 1, runningObj[2], runningObj[3], runningObj[4])
    return runningObj

def updateFunctionMean(newValues, runningCount):
    """State update that accumulates ride durations per route.

    The accumulation restarts from the current ride whenever its dropoff is
    more than 4 hours after the start of the current accumulation, which acts
    as an artificial 4-hour window.

    Args:
        newValues: Rides received in this batch for one route, each as
            (duration, pickup datetime, dropoff datetime), with datetimes
            formatted as "%Y-%m-%d %H:%M:%S".
        runningCount: Previous state for the route, or None for a new route.

    Returns:
        (summed duration, number of rides, start datetime of the accumulation,
        dropoff datetime of the last ride).
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    window_length = timedelta(hours=4)

    state = runningCount
    if state is None:
        first = newValues[0]
        state = (0.0, 0.0, first[1], first[2])

    for ride in newValues:
        duration, dropoff = ride[0], ride[2]
        accumulated = (duration + state[0], state[1] + 1, state[2], dropoff)
        elapsed = datetime.strptime(dropoff, time_format) - datetime.strptime(accumulated[2], time_format)
        if elapsed > window_length:
            # Window expired: start again from this ride
            state = (duration, 1, ride[1], dropoff)
        else:
            state = accumulated

    return state

# Create the Spark context and the streaming context, and set the checkpoint
# directory used by the stateful updateStateByKey operations below
sc = SparkContext("local[2]", "KafkaExample")
ssc = StreamingContext(sc, 5)
ssc.checkpoint('checkpoint')
lines = KafkaUtils.createDirectStream(ssc, ["debs"], \
            {"metadata.broker.list": "kafka:9092"})

try:
    
    # Each Kafka record arrives as a tuple; keep its second element, which holds the CSV row
    # NOTE(review): presumably the tuple is (key, value) - confirm against the KafkaUtils documentation
    filtered_lines = lines.map(lambda line: line[1])
    # Apply filters
    filtered_lines = filtered_lines.filter(lambda line: apply_filters(line))
    # Get areas (fields 17 and 18 are the pickup and dropoff cells)
    lines_areas = filtered_lines.map(lambda line: get_areas(line))

    # Key every ride by its route "<pickup cell>:<dropoff cell>" and keep
    # (trip time, pickup longitude, pickup latitude)
    desired_columns = lines_areas.map(lambda line: (str(line[17]) + ":" + str(line[18]), (float(line[4]), line[6], line[7])))

    # Per route, count the consecutive rides whose duration keeps increasing
    counters = desired_columns.updateStateByKey(updateFunction)

    # Only keep the routes whose counter of consecutive increasing rides is 2 or more
    # Example: ('315-325:379-372', (560.0, 2, '-73.97625', '40.748528', 234.0))
    counters_filtered = counters.filter(lambda line: line[1][1] >= 2)

    # Key every ride by its route and keep (trip time, pickup datetime, dropoff datetime)
    desired_columns_means = lines_areas.map(lambda line: (str(line[17]) + ":" + str(line[18]), (float(line[4]), line[2], line[3])))

    # Get the mean duration per route: summed duration divided by number of rides
    # NOTE(review): this emits 4-element tuples (route, mean, start, end), not (key, value) pairs;
    # the join below presumably uses only the first two elements - confirm
    means = desired_columns_means.updateStateByKey(updateFunctionMean) \
            .map(lambda line: (line[0], float(line[1][0]) / float(line[1][1]), line[1][2], line[1][3]))

    # Join the counters with the means by route and flatten the result
    joined = counters_filtered.join(means) \
            .map(lambda line: (line[0], line[1][0][1], line[1][0][2], line[1][0][3], line[1][0][4], line[1][1]))

    # Per position meanings: (area, nr_rides, first_long, first_lat, first_dur, area_mean_dur)
    # Keep only the rows whose first ride duration is above the mean duration of the same route
    peak_filter = joined.filter(lambda line: float(line[4]) > float(line[5]))
    
    # Get only useful information to show: (area, nr_rides, first_long, first_lat)
    peak_filter = peak_filter.map(lambda line: (line[0], line[1], line[2], line[3]))

    peak_filter.pprint()

    # Run for at most 60 seconds, then release the contexts
    ssc.start()
    ssc.awaitTermination(60)
    ssc.stop()
    sc.stop()

except Exception as e:
    # Report the failure and still release the contexts
    print(e)
    ssc.stop()
    sc.stop()

## Query 5 - Spark Streaming Dataframes

Select the most pleasant taxi drivers:

- To identify the most pleasant taxi driver of a given day, show the taxi driver with the highest percentage of tips on that day.

In [None]:
# Build (or reuse) the Spark session used by this query
spark = (
    SparkSession.builder
    .appName("Kafka Pstr Project 1")
    .getOrCreate()
)

# Stream the records published on the "debs" Kafka topic
lines = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "debs")
    .load()
)

# Clean the rows
lines = pre_process_df(lines)

# Typed columns to extract from the raw CSV record
columns = [
    ("dropoff_datetime", "timestamp"),
    ("hack_license", "string"),
    ("tip_amount", "double"),
    ("total_amount", "double"),
]
lines = get_columns(lines, columns)

# Tip percentage of each ride: tip amount divided by total amount.
# The Kafka metadata columns are not needed afterwards.
tip_percentage = lines["tip_amount"] / lines["total_amount"]
lines = (
    lines.withColumn("tip_percentage", tip_percentage)
    .drop("value", "key", "topic", "partition", "offset", "timestamp", "timestampType")
)

# Average the tip percentage per driver and per day, and keep only the
# row with the highest average
avg_tip_percentage = (
    lines.groupBy(window("dropoff_datetime", "1 days"), "hack_license")
    .avg("tip_percentage")
    .orderBy('avg(tip_percentage)', ascending=False)
    .limit(1)
)

# Print the complete result table every 60 seconds
query = (
    avg_tip_percentage.writeStream
    .outputMode("complete")
    .trigger(processingTime="60 seconds")
    .foreachBatch(dumpBatchDF)
    .start()
)

# Let the query run for at most 60 seconds, then release the resources
query.awaitTermination(60)

query.stop()
spark.stop()

## Query 6 - Spark Streaming Dataframes

What does the price discrimination look like for the pickup areas where the taxi drivers make most money, in each 2 hour window?

In [None]:
# Get spark session instance
spark = SparkSession \
    .builder \
    .appName("Kafka Pstr Project 1") \
    .getOrCreate()

# Stream the records published on the "debs" Kafka topic
lines = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:9092") \
  .option("subscribe", "debs") \
  .load()

# Apply filters
lines = pre_process_df(lines)

# Get specified columns only
columns = [("pickup_datetime", "timestamp"), ("fare_amount", "float"), ("surcharge", "float"), 
          ("mta_tax", "float"), ("tip_amount", "float"), ("tolls_amount", "float"), ("total_amount", "float")]
lines = get_columns(lines, columns)

# Get the cells from locations
lines = get_areas_df(lines)

lines = lines.drop("value","key","topic","partition","offset","timestamp","timestampType","cell_pickup_latitude", "cell_pickup_longitude")

# For each pickup area and each time interval of 2 hours, we compute the sum of the total amount and, of that
# total amount, what fraction corresponds to the fare amount, the surcharge, the mta_tax, the tip_amount 
# and the tolls_amount. Every figure is rounded to 3 decimals, including the tolls fraction, which would
# otherwise be rounded to a whole number.
# We order by the summed total amount, to show these statistics only for the areas where taxi drivers make
# more money
statistics = lines.groupBy("cell_pickup", window("pickup_datetime","2 hours")) \
                  .agg(round(sum("total_amount"),3).alias("sum_total_amount"), \
                       round(sum("fare_amount")/sum("total_amount"),3).alias("fare"), \
                       round(sum("surcharge")/sum("total_amount"),3).alias("surcharge"),\
                       round(sum("mta_tax")/sum("total_amount"),3).alias("mta_tax"),\
                       round(sum("tip_amount")/sum("total_amount"),3).alias("tip"),\
                       round(sum("tolls_amount")/sum("total_amount"),3).alias("tolls"))\
                  .orderBy("sum_total_amount", ascending=False) \
                  .limit(5)

# Print the complete result table every 30 seconds
query = statistics \
            .writeStream \
            .outputMode("complete") \
            .trigger(processingTime="30 seconds") \
            .foreachBatch(dumpBatchDF) \
            .start()

# Let the query run for at most 60 seconds, then release the resources
query.awaitTermination(60)

query.stop()
spark.stop()

## Query 7 - Spark Streaming Dataframes

On which routes is the total amount per mile the highest?

In [None]:
# Build (or reuse) the Spark session used by this query
spark = (
    SparkSession.builder
    .appName("Kafka Pstr Project 1")
    .getOrCreate()
)

# Stream the records published on the "debs" Kafka topic
lines = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "debs")
    .load()
)

# Clean the rows
lines = pre_process_df(lines)

# Typed columns to extract from the raw CSV record
columns = [
    ("pickup_datetime", "timestamp"),
    ("pickup_longitude", "double"),
    ("pickup_latitude", "double"),
    ("dropoff_longitude", "double"),
    ("dropoff_latitude", "double"),
    ("trip_distance", "float"),
    ("total_amount", "float"),
]
lines = get_columns(lines, columns)

# Derive the 500 m cells, build the routes and discard the Kafka metadata
lines = get_areas_df(lines)
lines = get_routes(lines)
lines = lines.drop("value", "key", "topic", "partition", "offset", "timestamp", "timestampType")

# Summed total amount and summed trip distance per route and per hour
total_per_route = (
    lines.groupBy(window("pickup_datetime", "1 hours"), "route")
    .agg(
        sum("total_amount").alias("sum_total_amount"),
        sum("trip_distance").alias("total_distance"),
    )
)

# Amount per mile: summed total amount divided by summed trip distance.
# The two intermediate sums are not shown.
amount_per_mile = total_per_route["sum_total_amount"] / total_per_route["total_distance"]
profit_per_route = (
    total_per_route.withColumn("amount_per_mile", amount_per_mile)
    .drop("sum_total_amount", "total_distance")
)

# Top 10 routes on which a taxi driver makes the most money per mile
most_profitable_routes = (
    profit_per_route.orderBy("amount_per_mile", ascending=False)
    .limit(10)
)

# Print the complete result table every 5 seconds
query = (
    most_profitable_routes.writeStream
    .outputMode("complete")
    .trigger(processingTime="5 seconds")
    .foreachBatch(dumpBatchDF)
    .start()
)

# Let the query run for at most 60 seconds, then release the resources
query.awaitTermination(60)

query.stop()
spark.stop()