# Initialize a Spark Session

In [19]:
import findspark
findspark.init()
import pyspark
# Creating a SparkSession in Python
from pyspark.sql import SparkSession

# Local session for the streaming demo. The docs placeholder
# `.config("spark.some.config.option", "some-value")` was removed: it is not a
# real Spark option and had no effect.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Spark Streaming Demonstration")
    .getOrCreate()
)
# Keep the number of shuffle partitions small: this is a small local job, and
# each micro-batch's aggregation shuffles into this many partitions.
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Read data in streaming mode

In [20]:
# Directory holding the taxi CSV files; both streams below read from it.
inputPath = "./taxi-data/"

In [21]:
from pyspark.sql.types import *

Define two schemas:

- One for yellow

- One for green

- Each input file is read twice:

    - First one follows yellow schema

    - Next one follows green schema

    - Filter to keep only the rows whose `type` matches that schema


In [22]:
# Define schemas
# Every CSV file is read twice (once per schema); each stream later filters on
# the `type` column so it keeps only the rows that match its layout.
# Columns are listed as (name, type) pairs in file order; all are nullable.
# NOTE(review): coordinates are FloatType (32-bit); DoubleType would keep more
# precision for the small bounding boxes used in Task 3 — confirm before changing.
_yellow_columns = [
    ("type", StringType()),
    ("VendorID", IntegerType()),
    ("tpep_pickup_datetime", StringType()),
    ("tpep_dropoff_datetime", StringType()),
    ("passenger_count", IntegerType()),
    ("trip_distance", FloatType()),
    ("pickup_longitude", FloatType()),
    ("pickup_latitude", FloatType()),
    ("RatecodeID", IntegerType()),
    ("store_and_fwd_flag", StringType()),
    ("dropoff_longitude", FloatType()),
    ("dropoff_latitude", FloatType()),
    ("payment_type", IntegerType()),
    ("fare_amount", FloatType()),
    ("extra", FloatType()),
    ("mta_tax", FloatType()),
    ("tip_amount", FloatType()),
    ("tolls_amount", FloatType()),
    ("improvement_surcharge", FloatType()),
    ("total_amount", FloatType()),
]

_green_columns = [
    ("type", StringType()),
    ("VendorID", IntegerType()),
    ("lpep_pickup_datetime", StringType()),
    ("Lpep_dropoff_datetime", StringType()),
    ("Store_and_fwd_flag", StringType()),
    ("RateCodeID", IntegerType()),
    ("Pickup_longitude", FloatType()),
    ("Pickup_latitude", FloatType()),
    ("Dropoff_longitude", FloatType()),
    ("Dropoff_latitude", FloatType()),
    ("Passenger_count", IntegerType()),
    ("Trip_distance", FloatType()),
    ("Fare_amount", FloatType()),
    ("Extra", FloatType()),
    ("MTA_tax", FloatType()),
    ("Tip_amount", FloatType()),
    ("Tolls_amount", FloatType()),
    ("Ehail_fee", FloatType()),
    ("improvement_surcharge", FloatType()),
    ("Total_amount", FloatType()),
    ("Payment_type", IntegerType()),
    ("Trip_type", IntegerType()),
]

yellow_schema = StructType([StructField(name, dtype, True) for name, dtype in _yellow_columns])
green_schema = StructType([StructField(name, dtype, True) for name, dtype in _green_columns])

In [23]:
# Stream the CSV files under `inputPath` with the yellow layout, taking at most
# one new file per trigger so the counts build up batch by batch.
yellow_df = (
    spark.readStream
    .option('maxFilesPerTrigger', 1)
    .schema(yellow_schema)
    .csv(inputPath)
)


- Rename columns for `union()`

- Filter to keep only `yellow` type

In [24]:
# Give the drop-off column a name shared with the green stream, then keep only
# the rows that are actually yellow-taxi records.
yellow_renamed = yellow_df.withColumnRenamed("tpep_dropoff_datetime", "drop_off_datetime")
yellow_df = yellow_renamed.filter(yellow_renamed["type"] == 'yellow')

In [25]:
# Confirm the rename: `drop_off_datetime` should replace `tpep_dropoff_datetime`.
yellow_df.printSchema()

root
 |-- type: string (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- drop_off_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- extra: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- total_amount: float (nullable = true)



In [26]:
# Second pass over the same files, this time parsed with the green layout
# (at most one new file per trigger, like the yellow stream).
green_df = (
    spark.readStream
    .option('maxFilesPerTrigger', 1)
    .schema(green_schema)
    .csv(inputPath)
)


- Rename columns for `union()`

- Filter to keep only `green` type

In [27]:
# Align the drop-off column name with the yellow stream, then keep only the rows
# that are actually green-taxi records.
green_renamed = green_df.withColumnRenamed("Lpep_dropoff_datetime", "drop_off_datetime")
green_df = green_renamed.filter(green_renamed["type"] == 'green')

In [28]:
# Confirm the rename: `drop_off_datetime` should replace `Lpep_dropoff_datetime`.
green_df.printSchema()

root
 |-- type: string (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- drop_off_datetime: string (nullable = true)
 |-- Store_and_fwd_flag: string (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- Pickup_longitude: float (nullable = true)
 |-- Pickup_latitude: float (nullable = true)
 |-- Dropoff_longitude: float (nullable = true)
 |-- Dropoff_latitude: float (nullable = true)
 |-- Passenger_count: integer (nullable = true)
 |-- Trip_distance: float (nullable = true)
 |-- Fare_amount: float (nullable = true)
 |-- Extra: float (nullable = true)
 |-- MTA_tax: float (nullable = true)
 |-- Tip_amount: float (nullable = true)
 |-- Tolls_amount: float (nullable = true)
 |-- Ehail_fee: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- Total_amount: float (nullable = true)
 |-- Payment_type: integer (nullable = true)
 |-- Trip_type: integer (nullable = true)



Union the two DataFrames into a single stream of drop-off times

In [29]:
# Both streams now expose `drop_off_datetime`; stack them into one stream.
yellow_dropoffs = yellow_df.select('drop_off_datetime')
green_dropoffs = green_df.select('drop_off_datetime')
taxi_df = yellow_dropoffs.union(green_dropoffs)


In [30]:
# Sanity check: taxi_df is derived from readStream sources, so this should be True.
taxi_df.isStreaming

True

In [31]:
from pyspark.sql.functions import *      # for window() function


# Task 2

In [32]:
# Count drop-offs per 1-hour tumbling window. `window` comes from the
# `pyspark.sql.functions` star import above; the string column is implicitly
# cast to a timestamp.
hourly_window = window(taxi_df.drop_off_datetime, '1 hour')
streaming_count_df = taxi_df.groupBy(hourly_window).count()

In [33]:
# The aggregation keeps the streaming nature of its input, so this should be True.
streaming_count_df.isStreaming

True

In [None]:
# Print the running hourly counts to the console (complete mode re-prints the
# whole result table after every micro-batch).
query = (
    streaming_count_df
    .writeStream
    .format('console')
    .outputMode('complete')
    .option('truncate', 'false')
    .start()
)
# Drain every file currently in inputPath, then stop. Without this the query
# keeps running in the background, and it is orphaned (it can no longer be
# stopped) once `query` is reassigned in a later cell.
query.processAllAvailable()
query.stop()

24/05/30 14:21:43 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/0l/z7rsxg3518l42bxfyj8d9j5m0000gn/T/temporary-5fe3da51-d338-4a38-85c9-52a7d3d7224f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/05/30 14:21:43 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|{2015-12-01 03:00:00, 2015-12-01 04:00:00}|43   |
+------------------------------------------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|{2015-12-01 03:00:00, 2015-12-01 04:00:00}|43   |
|{2015-12-01 04:00:00, 2015-12-01 05:00:00}|42   |
+------------------------------------------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+

In [66]:
# Write the hourly counts into an in-memory table named `counts` so they can be
# queried with spark.sql(...) below.
query = (
    streaming_count_df
    .writeStream
    .format('memory')
    .queryName('counts')
    .outputMode('complete')
    .start()
)
# Block until every file currently in inputPath has been processed into `counts`.
# A fixed awaitTermination(60) can return before all files are processed: a
# file-source query never terminates by itself, so the call just times out.
# ('truncate' was dropped: it only affects the console sink.)
query.processAllAvailable()

24/05/30 14:47:33 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/0l/z7rsxg3518l42bxfyj8d9j5m0000gn/T/temporary-20de55b8-6c91-42ad-b4c1-554f1c083e27. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/05/30 14:47:33 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answe

KeyboardInterrupt: 

In [67]:
# Check whether the memory-sink query started above is still running.
query.isActive

True

In [68]:
# Stop the streaming query; the rows already in the `counts` table are still
# queried with spark.sql in the following cells.
query.stop()

In [69]:
# Show up to 24 hourly windows from the in-memory `counts` table, in time order.
counts_by_window_sql = '''SELECT * 
        FROM counts
        ORDER BY window'''
spark.sql(counts_by_window_sql).show(24, truncate=False)

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|{2015-12-01 00:00:00, 2015-12-01 01:00:00}|7396 |
|{2015-12-01 01:00:00, 2015-12-01 02:00:00}|5780 |
|{2015-12-01 02:00:00, 2015-12-01 03:00:00}|3605 |
|{2015-12-01 03:00:00, 2015-12-01 04:00:00}|2426 |
|{2015-12-01 04:00:00, 2015-12-01 05:00:00}|2505 |
|{2015-12-01 05:00:00, 2015-12-01 06:00:00}|3858 |
|{2015-12-01 06:00:00, 2015-12-01 07:00:00}|10258|
|{2015-12-01 07:00:00, 2015-12-01 08:00:00}|19007|
|{2015-12-01 08:00:00, 2015-12-01 09:00:00}|23799|
|{2015-12-01 09:00:00, 2015-12-01 10:00:00}|24003|
|{2015-12-01 10:00:00, 2015-12-01 11:00:00}|21179|
|{2015-12-01 11:00:00, 2015-12-01 12:00:00}|20219|
|{2015-12-01 12:00:00, 2015-12-01 13:00:00}|20522|
|{2015-12-01 13:00:00, 2015-12-01 14:00:00}|20556|
|{2015-12-01 14:00:00, 2015-12-01 15:00:00}|21712|
|{2015-12-01 15:00:00, 2015-12-01 16:00:00}|22016|
|{2015-12-01 16:00:00, 2015-12-

Read the result table, ordered by window, to write the outputs

In [70]:
# Ordered view of the `counts` memory table, consumed by the export cell below.
counts_ordered_sql = 'SELECT * FROM counts ORDER BY window'
result_df = spark.sql(counts_ordered_sql)

In [71]:
import pandas as pd
import os

Write each window's count into its own output folder (format `output-xxxxxx`, where `xxxxxx` is the window's end time in milliseconds)

In [72]:
# Write one CSV per hourly window to `output-<ms>/output.csv`, where <ms> is the
# window end measured in milliseconds from midnight of the first day in the
# result (e.g. the 00:00-01:00 window -> output-3600000).
import datetime as dt

rows = result_df.collect()
if rows:
    # result_df is ordered by window, so the first row holds the earliest start.
    day_zero = dt.datetime.combine(rows[0][0]['start'].date(), dt.time.min)
    for window_struct, count in rows:
        start_time = window_struct['start']
        end_time = window_struct['end']
        # Milliseconds = seconds * 1000 (the previous `* 10**2` was off by 10x).
        # Measuring from a fixed origin also keeps the 23:00-24:00 window from
        # collapsing to 0, which happened when only end_time.hour was used.
        ms = int((end_time - day_zero).total_seconds() * 1000)
        res_df = pd.DataFrame(data=[[start_time, end_time, count]], columns=['start_time', 'end_time', 'count'])
        dir_name = f'output-{ms}'
        os.makedirs(dir_name, exist_ok=True)
        res_df.to_csv(os.path.join(dir_name, 'output.csv'))

# Task 3

In [None]:
# Corner points, as [longitude, latitude], of the two drop-off regions in Task 3.
goldman = [
    [-74.0141012, 40.7152191],
    [-74.013777, 40.7152275],
    [-74.0141027, 40.7138745],
    [-74.0144185, 40.7140753],
]
citigroup = [
    [-74.011869, 40.7217236],
    [-74.009867, 40.721493],
    [-74.010140, 40.720053],
    [-74.012083, 40.720267],
]

In [None]:
# Display the Goldman Sachs region corners ([longitude, latitude] pairs).
goldman

In [None]:
# Display the Citigroup region corners ([longitude, latitude] pairs).
citigroup

Rename columns so both streams match for `union()`

In [None]:
# Inspect the green column names before lower-casing the drop-off coordinates below.
green_df.printSchema()

In [None]:
# Lower-case the green drop-off coordinate columns to match the yellow schema.
green_df = (
    green_df
    .withColumnRenamed('Dropoff_longitude', 'dropoff_longitude')
    .withColumnRenamed('Dropoff_latitude', 'dropoff_latitude')
)

The yellow schema does not need to change

In [None]:
# The yellow schema already names these columns dropoff_longitude / dropoff_latitude.
yellow_df.printSchema()

In [None]:
# Columns needed for the regional analysis; both streams now use these names.
region_columns = ['dropoff_longitude', 'dropoff_latitude', 'drop_off_datetime']
region_taxi_df = yellow_df.select(*region_columns).union(green_df.select(*region_columns))


In [None]:
def _bounding_box(corners):
    """Return (min_lon, max_lon, min_lat, max_lat) for a list of [lon, lat] corners."""
    # `min`/`max` are shadowed by `from pyspark.sql.functions import *`, so use
    # the Python builtins explicitly.
    import builtins
    lons = [lon for lon, _ in corners]
    lats = [lat for _, lat in corners]
    return builtins.min(lons), builtins.max(lons), builtins.min(lats), builtins.max(lats)


def _inside_box(df, corners):
    """Column predicate: the drop-off point of `df` lies in the bounding box of `corners`.

    The box is axis-aligned, so it slightly over-approximates the quadrilateral
    the corners describe. Bounds are inclusive (Column.between).
    """
    min_lon, max_lon, min_lat, max_lat = _bounding_box(corners)
    return (
        df.dropoff_longitude.between(min_lon, max_lon)
        & df.dropoff_latitude.between(min_lat, max_lat)
    )


# Derived from the corner lists instead of re-typed literals; the numeric
# bounds are identical to the previous hard-coded ones.
in_goldman = _inside_box(region_taxi_df, goldman)
in_groupciti = _inside_box(region_taxi_df, citigroup)

In [None]:
# Keep only drop-offs inside either region, then label each row with its region
# (anything not in the Goldman box must be in the Citigroup box after the filter).
region_taxi_df = (
    region_taxi_df
    .filter(in_goldman | in_groupciti)
    .withColumn('region', when(in_goldman, 'goldman').otherwise('citigroup'))
)

In [None]:
# Hourly drop-off counts, broken down by region.
hour_window = window(region_taxi_df.drop_off_datetime, '1 hour')
streaming_count_df = region_taxi_df.groupBy(hour_window, 'region').count()

In [None]:
# Should be True: the regional aggregation is still built on the streaming sources.
streaming_count_df.isStreaming

In [None]:
# Write the per-region hourly counts into the in-memory table `regionalCounts`.
query = (
    streaming_count_df
    .writeStream
    .format('memory')
    .queryName('regionalCounts')
    .outputMode('complete')
    .start()
)
# Block until every file currently in inputPath has been processed. The previous
# awaitTermination() (no timeout) never returned for a file-source query, so the
# notebook hung here and the stop() cell below was unreachable.
# ('truncate' was dropped: it only affects the console sink.)
query.processAllAvailable()

In [None]:
# Stop the regional query; the `regionalCounts` table is queried in the next cell.
query.stop()

In [None]:
# Show up to 24 (window, region) rows from the in-memory table, in time order.
regional_counts_sql = '''
    SELECT * 
    FROM regionalCounts
    ORDER BY window
    '''
spark.sql(regional_counts_sql).show(24, truncate=False)