In [1]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as f
findspark.init()

# Creating a SparkSession in Python
spark = SparkSession.builder\
    .master("local")\
    .appName("Spark Streaming Task 3")\
    .getOrCreate()

# Configures the number of partitions to use when shuffling data for joins or aggregations.
spark.conf.set("spark.sql.shuffle.partitions", "10")

24/05/17 15:43:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
import os
cwd = os.getcwd()
cwd

'/home/dat_21127240/BigData/Big-Data-With-Seaborn-Bokeh-Plotly/Lab_03/src'

In [3]:
inputPath = cwd + "/../data/taxi-data"
outputPath = cwd + "/../output_3"
checkpointPath = cwd + "/../checkpoint_3"

In [4]:
full_schema = StructType([StructField(f"_c{i}", StringType(), nullable=True) for i in range(22)])

# define schema for yellow taxi trips
yellow_taxi_schema = StructType([
    StructField("type", StringType(), nullable=False),
    StructField("VendorID", IntegerType()),
    StructField("tpep_pickup_datetime", TimestampType()),
    StructField("tpep_dropoff_datetime", TimestampType()),
    StructField("passenger_count", IntegerType()),
    StructField("trip_distance", FloatType()),
    StructField("pickup_longitude", FloatType()),
    StructField("pickup_latitude", FloatType()),
    StructField("RatecodeID", FloatType()),
    StructField("store_and_fwd_flag", StringType()),
    StructField("dropoff_longitude", FloatType()),
    StructField("dropoff_latitude", FloatType()),
    StructField("payment_type", IntegerType()),
    StructField("fare_amount", FloatType()),
    StructField("extra", FloatType()),
    StructField("mta_tax", FloatType()),
    StructField("tip_amount", FloatType()),
    StructField("tolls_amount", FloatType()),
    StructField("improvement_surcharge", FloatType()),
    StructField("total_amount", FloatType())
])
# define schema for green taxi trips
green_taxi_schema = StructType([
    StructField("type", StringType(), nullable=False),
    StructField("VendorID", IntegerType()),
    StructField("lpep_pickup_datetime", TimestampType()),
    StructField("Lpep_dropoff_datetime", TimestampType()),
    StructField("Store_and_fwd_flag", StringType()),
    StructField("RateCodeID", IntegerType()),
    StructField("Pickup_longitude", FloatType()),
    StructField("Pickup_latitude", FloatType()),
    StructField("Dropoff_longitude", FloatType()),
    StructField("Dropoff_latitude", FloatType()),
    StructField("Passenger_count", IntegerType()),
    StructField("Trip_distance", FloatType()),
    StructField("Fare_amount", FloatType()),
    StructField("Extra", FloatType()),
    StructField("MTA_tax", FloatType()),
    StructField("Tip_amount", FloatType()),
    StructField("Tolls_amount", FloatType()),
    StructField("Ehail_fee", FloatType()),
    StructField("improvement_surcharge", FloatType()),
    StructField("Total_amount", FloatType()),
    StructField("Payment_type", IntegerType()),
    StructField("Trip_type", IntegerType())
])

In [5]:
# read the entire data files first
full_df = spark.readStream\
    .option("maxFilesPerTrigger", 100)\
    .csv(f"file://{inputPath}",
         header=False,
         schema=full_schema)

# Filter rows based on the 'type' column
yellow_df = full_df.where(full_df["_c0"] == "yellow").selectExpr(
    *[f"cast(_c{i} as {field.dataType.simpleString()}) as {field.name}" \
        for i, field in enumerate(yellow_taxi_schema.fields)]
)

green_df = full_df.where(full_df["_c0"] == "green").selectExpr(
    *[f"cast(_c{i} as {field.dataType.simpleString()}) as {field.name}" \
        for i, field in enumerate(green_taxi_schema.fields)]
)

In [6]:
sub_yellow = yellow_df.selectExpr("tpep_dropoff_datetime as dropoff_datetime",
                                  "dropoff_longitude", 
                                  "dropoff_latitude")
sub_green = green_df.selectExpr("Lpep_dropoff_datetime as dropoff_datetime",
                                "Dropoff_longitude as dropoff_longitude", 
                                "Dropoff_latitude as dropoff_latitude")

sub_df = sub_yellow.union(sub_green)

In [7]:
goldman = [[-74.0141012, 40.7152191], [-74.013777, 40.7152275], 
           [-74.0141027, 40.7138745], [-74.0144185, 40.7140753]]
citigroup = [[-74.011869, 40.7217236], [-74.009867, 40.721493], 
             [-74.010140,40.720053], [-74.012083, 40.720267]]

In [8]:
@f.udf(returnType=BooleanType())
def in_polygon(x, y, polygon):
    num_vertices = len(polygon)
    inside = False
    
    # first point
    p1 = polygon[0]
    
    # For each edge
    for i in range(1, num_vertices + 1):
        # Next point
        p2 = polygon[i % num_vertices]
        
        # if is above the minimum latitude (y)
        if y > min(p1[1], p2[1]):
            # if is below the maximum latitude (y)
            if y <= max(p1[1], p2[1]):
                # if is to the left of the maximum longitude (x)
                if x <= max(p1[0], p2[0]):
                    # get intersection between horizontal line and the edge
                    x_intersection = (y - p1[1]) * (p2[0] - p1[0]) / (p2[1] - p1[1]) + p1[0]
                    # if the point is on the same line as the edge or to the left of the x-intersection
                    if p1[0] == p2[0] or x <= x_intersection:
                        # modify flag
                        inside = not inside
                        
        # Store the current point as the first point for the next iteration
        p1 = p2
    # Return final value of the flag
    return inside

In [9]:
processed_df = sub_df\
    .withColumn("goldman", in_polygon("dropoff_longitude", "dropoff_latitude", f.lit(goldman)))\
    .withColumn("citigroup", in_polygon("dropoff_longitude", "dropoff_latitude", f.lit(citigroup)))
    
# filter out
processed_df = processed_df.where("goldman == 1 OR citigroup == 1")
processed_df = processed_df.withColumn("headquarter", f.when(f.col("goldman") == 1, "goldman").otherwise("citigroup"))

by_dropoff = processed_df.groupBy("headquarter", f.window(f.col("dropoff_datetime"), "1 hour"))\
    .count()

In [10]:
def foreach_batch_function(batch_df, epoch_id):
    # get endHour column
    tmp_df = batch_df.withColumn("startHour", f.hour(f.col("window.start")))
    
    # for each endHour, write to different directory
    start_hours = tmp_df.select("startHour").distinct().collect()
    
    if len(start_hours) != 0:
        for start_hour in start_hours:
            h_num = start_hour['startHour']
            hour_df = tmp_df.where(f.col("startHour") == h_num).select("headquarter", "count")
            output_dir = f"file://{outputPath}/output-{(h_num + 1) * 60 * 60 * 1000}"
            
            hour_df.write.mode("overwrite").csv(output_dir)

In [11]:
query = by_dropoff.writeStream\
    .outputMode("complete")\
    .foreachBatch(foreach_batch_function)\
    .queryName("Region_Event_Count")\
    .option("checkpointLocation", f"file://{checkpointPath}")\
    .start()
query.awaitTermination()

24/05/17 15:43:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/05/17 15:43:56 WARN HDFSBackedStateStoreProvider: The state for version 1 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
24/05/17 15:43:56 WARN HDFSBackedStateStoreProvider: The state for version 1 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
24/05/17 15:43:56 WARN HDFSBackedStateStoreProvider: The state for version 1 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
24/05/17 15:43:56 WARN HDFSBackedStateStoreProvider: The state for version 1 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first b

KeyboardInterrupt: 

In [12]:
query.stop()
spark.stop()

24/05/17 15:50:51 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:632)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:610)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:453)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor