In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from shapely.geometry import Point, Polygon
import pyspark.sql.utils
import os

In [2]:
# change this parameter accordingly to your machine
FILE_DIR = "file:///home/p4stwi2x/Desktop/abs/taxi-data/"

spark = SparkSession.builder.master("local")\
          .appName("RegionEventCount")\
          .config("spark.some.config.option", "some-value")\
          .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", "2")

In [3]:
csvSchema = StructType([StructField("type", StringType(), True),
                        StructField("VendorID", IntegerType(), True),
                        StructField("tpep_pickup_datetime", TimestampType(), True),
                        StructField("tpep_dropoff_datetime",TimestampType(), True),

                        StructField("blankCol1", StringType(), True),
                        StructField("blankCol2", StringType(), True),
                        StructField("blankCol3", StringType(), True),
                        StructField("blankCol4", StringType(), True),

                        StructField("long_green",DoubleType(), True),
                        StructField("lat_green", DoubleType(), True),
                        StructField("long_yellow", DoubleType(), True),
                        StructField("lat_yellow", DoubleType(), True)])

In [4]:
streamingInputDF_ex03 = (
  spark
    .readStream
    .schema(csvSchema)
    # .option("maxFilesPerTrigger", 1)
    .csv(FILE_DIR)
)

df_with_lat_long = streamingInputDF_ex03.withColumn(
    "lat",
    when(col("type") == "green", col("lat_green"))
    .when(col("type") == "yellow", col("lat_yellow"))
).withColumn(
    "long",
    when(col("type") == "green", col("long_green"))
    .when(col("type") == "yellow", col("long_yellow"))
)

df_with_lat_long.printSchema()

root
 |-- type: string (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- blankCol1: string (nullable = true)
 |-- blankCol2: string (nullable = true)
 |-- blankCol3: string (nullable = true)
 |-- blankCol4: string (nullable = true)
 |-- long_green: double (nullable = true)
 |-- lat_green: double (nullable = true)
 |-- long_yellow: double (nullable = true)
 |-- lat_yellow: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)



In [5]:
goldman = [[-74.0141012, 40.7152191], [-74.013777, 40.7152275], [-74.0141027, 40.7138745], [-74.0144185, 40.7140753]]
citigroup = [[-74.011869, 40.7217236], [-74.009867, 40.721493], [-74.010140,40.720053], [-74.012083, 40.720267]]
def get_HQ(long, lat):
    pt = Point(long, lat)
    gd_p = Polygon(goldman)
    ct_p = Polygon(citigroup)

    
    if gd_p.contains(pt):
        return "goldman"
    elif ct_p.contains(pt):
        return "citigroup"
    return "unknown"

spark.udf.register("getHQ", get_HQ)

<function __main__.get_HQ(long, lat)>

In [6]:
df_with_lat_long = df_with_lat_long\
    .withColumn("drop_loc", expr("getHQ(long, lat)"))\
    .where(col("drop_loc") != "unknown")

df_with_lat_long.printSchema()

root
 |-- type: string (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- blankCol1: string (nullable = true)
 |-- blankCol2: string (nullable = true)
 |-- blankCol3: string (nullable = true)
 |-- blankCol4: string (nullable = true)
 |-- long_green: double (nullable = true)
 |-- lat_green: double (nullable = true)
 |-- long_yellow: double (nullable = true)
 |-- lat_yellow: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- drop_loc: string (nullable = true)



In [7]:
streamingCountsDF = (
  df_with_lat_long.select("drop_loc", "tpep_dropoff_datetime")
    .groupBy(
        df_with_lat_long.drop_loc,
        window(df_with_lat_long.tpep_dropoff_datetime, "10 minutes"))
    .count()
)

query = (
  streamingCountsDF
    .writeStream
    .format("memory")         # console or memory(= store in-memory table)
    .queryName("counts")      # counts = name of the in-memory table
    .outputMode("complete")
    # .option("truncate", "false")
    .start()
)

query.processAllAvailable()

In [8]:
query.stop()

In [9]:
try:
    spark.sql('select drop_loc , window, count from counts order by window').show(truncate=False)
    print ("Query executed")
except pyspark.sql.utils.AnalysisException:
    print("Unable to process your query!!")

+---------+------------------------------------------+-----+
|drop_loc |window                                    |count|
+---------+------------------------------------------+-----+
|citigroup|{2015-12-01 00:10:00, 2015-12-01 00:20:00}|1    |
|citigroup|{2015-12-01 00:20:00, 2015-12-01 00:30:00}|2    |
|citigroup|{2015-12-01 00:50:00, 2015-12-01 01:00:00}|2    |
|citigroup|{2015-12-01 01:00:00, 2015-12-01 01:10:00}|1    |
|citigroup|{2015-12-01 01:40:00, 2015-12-01 01:50:00}|1    |
|citigroup|{2015-12-01 02:40:00, 2015-12-01 02:50:00}|1    |
|citigroup|{2015-12-01 04:00:00, 2015-12-01 04:10:00}|1    |
|citigroup|{2015-12-01 05:10:00, 2015-12-01 05:20:00}|1    |
|goldman  |{2015-12-01 05:20:00, 2015-12-01 05:30:00}|1    |
|citigroup|{2015-12-01 05:40:00, 2015-12-01 05:50:00}|1    |
|goldman  |{2015-12-01 05:50:00, 2015-12-01 06:00:00}|2    |
|citigroup|{2015-12-01 05:50:00, 2015-12-01 06:00:00}|6    |
|goldman  |{2015-12-01 06:00:00, 2015-12-01 06:10:00}|1    |
|goldman  |{2015-12-01 0

In [10]:
try:
    spark.sql('SELECT drop_loc, window, count, prev FROM (SELECT  drop_loc, window, count, lag(count, 1, NULL) OVER (ORDER BY drop_loc, window) AS prev FROM counts) WHERE count >= 10 and count >= 2 * prev AND prev IS NOT NULL').show(100,truncate=False)
    print ("Query executed")
except pyspark.sql.utils.AnalysisException:
    print("Unable to process your query!!")

+---------+------------------------------------------+-----+----+
|drop_loc |window                                    |count|prev|
+---------+------------------------------------------+-----+----+
|citigroup|{2015-12-01 08:50:00, 2015-12-01 09:00:00}|12   |3   |
|citigroup|{2015-12-01 14:00:00, 2015-12-01 14:10:00}|10   |3   |
+---------+------------------------------------------+-----+----+

Query executed


In [11]:
try:
    spark.sql('SELECT drop_loc, window, count, prev FROM (SELECT  drop_loc, window, count, lag(count, 1, NULL) OVER (ORDER BY window) AS prev FROM counts WHERE count >= 10)').show(100, truncate=False)
    print ("Query executed")
except pyspark.sql.utils.AnalysisException:
    print("Unable to process your query!!")

+---------+------------------------------------------+-----+----+
|drop_loc |window                                    |count|prev|
+---------+------------------------------------------+-----+----+
|citigroup|{2015-12-01 06:40:00, 2015-12-01 06:50:00}|13   |NULL|
|citigroup|{2015-12-01 06:50:00, 2015-12-01 07:00:00}|18   |13  |
|citigroup|{2015-12-01 07:00:00, 2015-12-01 07:10:00}|15   |18  |
|citigroup|{2015-12-01 07:10:00, 2015-12-01 07:20:00}|10   |15  |
|citigroup|{2015-12-01 07:20:00, 2015-12-01 07:30:00}|10   |10  |
|citigroup|{2015-12-01 07:40:00, 2015-12-01 07:50:00}|10   |10  |
|citigroup|{2015-12-01 08:00:00, 2015-12-01 08:10:00}|11   |10  |
|citigroup|{2015-12-01 08:20:00, 2015-12-01 08:30:00}|14   |11  |
|citigroup|{2015-12-01 08:50:00, 2015-12-01 09:00:00}|12   |14  |
|citigroup|{2015-12-01 09:00:00, 2015-12-01 09:10:00}|16   |12  |
|goldman  |{2015-12-01 09:20:00, 2015-12-01 09:30:00}|11   |16  |
|citigroup|{2015-12-01 09:30:00, 2015-12-01 09:40:00}|12   |11  |
|goldman  

In [12]:
output_path_ex04 = 'file:///home/p4stwi2x/Desktop/abs/output_task_4'

trending = spark.sql('SELECT drop_loc, window, count, prev FROM (SELECT  drop_loc, window, count, lag(count, 1, NULL) OVER (ORDER BY drop_loc, window) AS prev FROM counts) WHERE count >= 10 and count >= 2 * prev AND prev IS NOT NULL')\
          .withColumn("temp", (minute('window.start')+10) * 6000)

minute_count = spark.sql('SELECT * from counts ORDER BY window')\
          .withColumn("temp", (minute('window.start')+10)*60000)

for trend in trending.collect():
    trend_timestamp_count = trend['temp']
    trend_headquarter = trend['drop_loc']
    trend_windows_count = trend['count']
    trend_windows_data = trend['window']
    prev_count = trend['prev']

    headquarter = "Goldman Sachs" if (trend_headquarter == "goldman") else "Citigroup"
    print(f"The number of arrivals to {headquarter} has doubled from {prev_count} to {trend_windows_count} at {trend_windows_data}!")

    output_dir = os.path.join(output_path_ex04, f"output-{trend_timestamp_count}")

    df_windows = spark.createDataFrame([(trend_headquarter,trend_windows_count,trend_timestamp_count,prev_count)], ["headquarter","current_value","timestamp","prev_value"])

    df_windows.write.mode("overwrite")\
            .format("csv")\
            .option("header", "true")\
            .save(output_dir)

    print(f"Timestamp {trend_timestamp_count} has been exported to folder {output_dir}")

log_path = os.path.join(output_path_ex04, "output.log")

if not os.path.exists(output_path_ex04):
    os.makedirs(output_path_ex04)
if not os.path.exists(log_path):
  with open(log_path, 'a') as file:  # /content/drive/MyDrive/data/output_task_4/output.log'
      file.write(f"Number of arrivals to Goldman Sachs : " + str(minute_count.filter(minute_count.drop_loc == "goldman").count()) + "\n")
      file.write(f"Number of arrivals to Citigroup : " + str(minute_count.filter(minute_count.drop_loc == "citigroup").count()))

print(f"Number of arrivals to Goldman Sachs : " + str(minute_count.filter(minute_count.drop_loc == "goldman").count()))
print(f"Number of arrivals to Citigroup : " + str(minute_count.filter(minute_count.drop_loc == "citigroup").count()))



The number of arrivals to Citigroup has doubled from 3 to 12 at Row(start=datetime.datetime(2015, 12, 1, 8, 50), end=datetime.datetime(2015, 12, 1, 9, 0))!
Timestamp 360000 has been exported to folder file:///home/p4stwi2x/Desktop/abs/output_task_4/output-360000
The number of arrivals to Citigroup has doubled from 3 to 10 at Row(start=datetime.datetime(2015, 12, 1, 14, 0), end=datetime.datetime(2015, 12, 1, 14, 10))!
Timestamp 60000 has been exported to folder file:///home/p4stwi2x/Desktop/abs/output_task_4/output-60000
Number of arrivals to Goldman Sachs : 59
Number of arrivals to Citigroup : 111


In [13]:
spark.stop()