<h1>LAB3: SPARK STREAMING</h1>

<h2>Inormation team:</h2>
 21127456 - Vo Cao Tri <br>
 21127608 - Tran Trung Hieu <br>
 21127668 - Dinh Quang Phong <br>

<h3>IMPORT LIBRARY</h3>

In [2]:
import os

In [3]:
import findspark
findspark.init()
import pyspark
# Creating a SparkSession in Python
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local")\
          .appName("Spark Streaming Demonstration")\
          .config("spark.some.config.option", "some-value")\
          .getOrCreate()
# keep the size of shuffles small
spark.conf.set("spark.sql.shuffle.partitions", "2") 

In [4]:
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.functions import *

<h3>Task 1: Discover a method to simulate a stream by utilizing data sourced from files</h3>

1. Define input data path

In [None]:
# input path in local filesystem
inputPath = '../data/'

2. Get schema from file data

In [5]:
staticInputDF = (
  spark
    .read    
    .csv(inputPath)
)
schema = staticInputDF.schema

In [6]:
schema

StructType([StructField('_c0', StringType(), True), StructField('_c1', StringType(), True), StructField('_c2', StringType(), True), StructField('_c3', StringType(), True), StructField('_c4', StringType(), True), StructField('_c5', StringType(), True), StructField('_c6', StringType(), True), StructField('_c7', StringType(), True), StructField('_c8', StringType(), True), StructField('_c9', StringType(), True), StructField('_c10', StringType(), True), StructField('_c11', StringType(), True), StructField('_c12', StringType(), True), StructField('_c13', StringType(), True), StructField('_c14', StringType(), True), StructField('_c15', StringType(), True), StructField('_c16', StringType(), True), StructField('_c17', StringType(), True), StructField('_c18', StringType(), True), StructField('_c19', StringType(), True)])

In [7]:
entries = os.listdir("../data")
    
    # Filter out directories, leaving only files
files = [entry for entry in entries if os.path.isfile(os.path.join("../data", entry))]

number_file = len(files)

1440


3. Create spark streaming and check the status

In [None]:
# Similar to definition of staticInputDF above, just using `readStream` instead of `read`
streamingInputDF = (
  spark
    .readStream                       
    .schema(schema)                # Set the schema of the csv data
    .option("maxFilesPerTrigger", number_file)  # Treat a sequence of files as a stream by picking one file at a time
    .csv(inputPath)
)
# cast the Timestamp type since it is not automatically parsed
streamingInputDF = streamingInputDF.select(f.col('_c0').alias('Type'), f.col('_c3').alias('Time'))
# Same query as staticInputDF
streamingCountsDF = (                 
  streamingInputDF
    .groupBy(
      window(streamingInputDF.Time, "1 hour"))
    .count()
)
# Is this DF actually a streaming DF?
print('Using Spark streaming:', streamingCountsDF.isStreaming)

<h3>Task 2: Query that aggregates the number of trips by drop-off datetime for each hour. </h3>

In [None]:
# This query stores the aggregation results in memory then visualize it
strQuery = (
  streamingCountsDF
    .writeStream
    .format("memory")      
    .queryName("counts")   
    .outputMode("complete") 
    .option("truncate", "false")
    .start()
)
strQuery.awaitTermination(60)

strQuery.stop()

result = spark.sql('select * from counts order by window')

result.show(result.count(), truncate=False)

1. Create folders and save the result of count

In [None]:
countIncrement = 360000
count = countIncrement

for row in result.collect():
    path = f'./output-{count}'

    os.makedirs(path, exist_ok=True)

    with open(f"{path}/output-{count}.txt", "w") as file:
        file.write(str(row['count']))
        
    count += countIncrement


<h3>Task 3: Create a query that counts the number of taxi trips each hour that drop off at either the Goldman Sachs headquarters or the Citigroup headquarters.</h3>

1. Define the map location of Goldman Sachs and Citygroup

In [13]:
goldmanCondition = (
    (col("dropoff_longitude") >= -74.0144185) & (col("dropoff_longitude") <= -74.013777) &
    (col("dropoff_latitude") >= 40.7138745) & (col("dropoff_latitude") <= 40.7152275)
)

citigroupCondition = (
    (col("dropoff_longitude") >= -74.012083) & (col("dropoff_longitude") <= -74.009867) &
    (col("dropoff_latitude") >= 40.720053) & (col("dropoff_latitude") <= 40.7217236)
)

2. Create spark streaming with schema of input data file

In [8]:
streamingInputDF = (
  spark
    .readStream
    .schema(schema)          
    .csv(inputPath)
)

3. Get the column that need to calculate

In [11]:
yellowRecordsDF = streamingInputDF.filter(f.col('_c0') == 'yellow')
yellowRecordsDF = yellowRecordsDF.select(f.col('_c0').alias('Type'),f.col('_c10').alias('dropoff_longitude'), f.col('_c11').alias('dropoff_latitude'), f.col('_c3').alias('dropoff_datetime'))

greenRecordsDF = streamingInputDF.filter(f.col('_c0') == 'green')
greenRecordsDF = greenRecordsDF.select(f.col('_c0').alias('Type'),f.col('_c8').alias('dropoff_longitude'), f.col('_c9').alias('dropoff_latitude'), f.col('_c3').alias('dropoff_datetime'))


4. Filter the taxi trips that drop-off either Goldman Sachs headquarters or the Citigroup headquarters

In [14]:
df = yellowRecordsDF.union(greenRecordsDF)

goldmanDF = df.filter(goldmanCondition).withColumn("headquarters", lit("goldman"))
citigroupDF = df.filter(citigroupCondition).withColumn("headquarters", lit("citigroup"))


In [15]:
filteredDF = goldmanDF.union(citigroupDF)

5. Count the number of trips using spark streaming

In [16]:
streamingCount = (                 
    filteredDF
    .groupBy( 
      filteredDF.Type,
      window(filteredDF.dropoff_datetime, "1 hour"), filteredDF.headquarters)
    .count()
)

# This query stores the aggregation results in memory then visualize it
query = (
  streamingCount
    .writeStream
    .format("memory")         # console or memory(= store in-memory table) 
    .queryName("counts")      # counts = name of the in-memory table
    .outputMode("complete")   
    .option("truncate", "false")
    .start()
)
query.awaitTermination(60)

query.stop()

result = spark.sql('select * from counts order by window')


6. Show the result

In [17]:
result.show()

+------+--------------------+------------+-----+
|Action|              window|headquarters|count|
+------+--------------------+------------+-----+
|yellow|{2015-12-01 00:00...|   citigroup|    6|
|yellow|{2015-12-01 01:00...|   citigroup|    2|
|yellow|{2015-12-01 02:00...|   citigroup|    2|
|yellow|{2015-12-01 03:00...|   citigroup|    2|
|yellow|{2015-12-01 04:00...|   citigroup|    1|
|yellow|{2015-12-01 05:00...|   citigroup|   11|
|yellow|{2015-12-01 05:00...|     goldman|    8|
|yellow|{2015-12-01 06:00...|     goldman|   28|
|yellow|{2015-12-01 06:00...|   citigroup|   70|
|yellow|{2015-12-01 07:00...|   citigroup|   93|
|yellow|{2015-12-01 07:00...|     goldman|   44|
| green|{2015-12-01 07:00...|   citigroup|    2|
|yellow|{2015-12-01 08:00...|   citigroup|   75|
| green|{2015-12-01 08:00...|   citigroup|    1|
|yellow|{2015-12-01 08:00...|     goldman|   59|
| green|{2015-12-01 09:00...|     goldman|    2|
|yellow|{2015-12-01 09:00...|   citigroup|   75|
|yellow|{2015-12-01 

In [19]:
goldman_res = result.filter(f.col('headquarters') == 'goldman')
goldman_res.show(truncate=False)

+------+------------------------------------------+------------+-----+
|Action|window                                    |headquarters|count|
+------+------------------------------------------+------------+-----+
|yellow|{2015-12-01 05:00:00, 2015-12-01 06:00:00}|goldman     |8    |
|yellow|{2015-12-01 06:00:00, 2015-12-01 07:00:00}|goldman     |28   |
|yellow|{2015-12-01 07:00:00, 2015-12-01 08:00:00}|goldman     |44   |
|yellow|{2015-12-01 08:00:00, 2015-12-01 09:00:00}|goldman     |59   |
|green |{2015-12-01 09:00:00, 2015-12-01 10:00:00}|goldman     |2    |
|yellow|{2015-12-01 09:00:00, 2015-12-01 10:00:00}|goldman     |70   |
|yellow|{2015-12-01 10:00:00, 2015-12-01 11:00:00}|goldman     |56   |
|green |{2015-12-01 10:00:00, 2015-12-01 11:00:00}|goldman     |2    |
|green |{2015-12-01 11:00:00, 2015-12-01 12:00:00}|goldman     |2    |
|yellow|{2015-12-01 11:00:00, 2015-12-01 12:00:00}|goldman     |32   |
|yellow|{2015-12-01 12:00:00, 2015-12-01 13:00:00}|goldman     |26   |
|yello

In [20]:
citigroup_res = result.filter(f.col('headquarters') == 'citigroup')
citigroup_res.show(truncate=False)

+------+------------------------------------------+------------+-----+
|Action|window                                    |headquarters|count|
+------+------------------------------------------+------------+-----+
|yellow|{2015-12-01 00:00:00, 2015-12-01 01:00:00}|citigroup   |6    |
|yellow|{2015-12-01 01:00:00, 2015-12-01 02:00:00}|citigroup   |2    |
|yellow|{2015-12-01 02:00:00, 2015-12-01 03:00:00}|citigroup   |2    |
|yellow|{2015-12-01 03:00:00, 2015-12-01 04:00:00}|citigroup   |2    |
|yellow|{2015-12-01 04:00:00, 2015-12-01 05:00:00}|citigroup   |1    |
|yellow|{2015-12-01 05:00:00, 2015-12-01 06:00:00}|citigroup   |11   |
|yellow|{2015-12-01 06:00:00, 2015-12-01 07:00:00}|citigroup   |70   |
|yellow|{2015-12-01 07:00:00, 2015-12-01 08:00:00}|citigroup   |93   |
|green |{2015-12-01 07:00:00, 2015-12-01 08:00:00}|citigroup   |2    |
|yellow|{2015-12-01 08:00:00, 2015-12-01 09:00:00}|citigroup   |75   |
|green |{2015-12-01 08:00:00, 2015-12-01 09:00:00}|citigroup   |1    |
|yello

7. Create folders and save the result of count

In [21]:
count_increment = 360000
count = count_increment

for row, row1 in zip(goldman_res.collect(), citigroup_res.collect()):
    new_path = f'./output-{count}'

    os.makedirs(new_path, exist_ok=True)

    with open(f"{new_path}/output-{count}.txt", "w") as file:
        file.write(f"goldman: {str(row['count'])}\ncitigroup: {row1['count']}")
        
    count += count_increment

In [None]:
streamingCount = (                 
    filteredDF
    .groupBy( 
      filteredDF.Type,
      window(filteredDF.dropoff_datetime, "10 minutes"), filteredDF.headquarters)
    .count()
)

# This query stores the aggregation results in memory then visualize it
query = (
  streamingCount
    .writeStream
    .format("memory")         # console or memory(= store in-memory table) 
    .queryName("counts")      # counts = name of the in-memory table
    .outputMode("complete")   
    .option("truncate", "false")
    .start()
)
query.awaitTermination(60)

query.stop()

In [None]:
def process_batch(batch_df, batch_id):
        batch_df.persist()
        goldman_trends = detect_trends(batch_df, "goldman")
        citigroup_trends = detect_trends(batch_df, "citigroup")
        goldman_trends.show(truncate=False)
        citigroup_trends.show(truncate=False)
        batch_df.unpersist()

def detect_trends(df, headquarters):
    from pyspark.sql.window import Window
    from pyspark.sql.functions import lag

    w = Window.partitionBy("headquarters").orderBy("window")

    df = df.filter(col("headquarters") == headquarters)
    df = df.withColumn("prev_count", lag("count").over(w))

    df = df.filter((col("count") >= 10) & (col("prev_count").isNotNull()))
    df = df.filter(col("count") >= 2 * col("prev_count"))

    df.selectExpr(
        "headquarters",
        "count as current_count",
        "window.start as timestamp",
        "prev_count"
    ).write.mode("append").json(output_path)

    return df.selectExpr(
        "headquarters",
        "prev_count",
        "count as current_count",
        "window.start as timestamp"
    )