# Spark Streaming

* Name: Benedictus Bimo Cahyo Wicaksono<br>
* Student ID: 5025201097<br>
* Class: Big Data<br>
* Lecturer: Abdul Munif, S.Kom., M.Sc.

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=2632ac6534d8dfdfea233a85ed5a2e61ab6df3d3e153cfe6ed68ab5b268f3fff
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("News Streaming") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

After installing PySpark and importing the necessary classes, I create the Spark session. It runs in local mode on all available cores and logs only errors.

In [None]:
schema = StructType([
    StructField("link", StringType(), True),
    StructField("headline", StringType(), True),
    StructField("category", StringType(), True),
    StructField("short_description", StringType(), True),
    StructField("authors", StringType(), True),
    StructField("date", StringType(), True),
])

For the data structure, I use this schema: six nullable string fields, one for each attribute of a news record.
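To make the schema concrete, here is a minimal sketch in plain Python (no Spark needed) that checks one JSON line against the six field names. The sample record and the helper name `check_record` are made up for illustration and are not taken from the dataset. As far as I understand Spark's JSON source, it expects one JSON object per line by default; with an explicit schema, missing fields come back as null and extra fields are ignored.

```python
import json

# Field names taken from the schema above; every field is a nullable string.
SCHEMA_FIELDS = ["link", "headline", "category", "short_description", "authors", "date"]

# Made-up record for illustration only; it is not taken from the dataset.
sample_line = json.dumps({
    "link": "https://example.com/article",
    "headline": "Example headline",
    "category": "TECH",
    "short_description": "Example description.",
    "authors": "Jane Doe",
    "date": "2022-01-31",
})


def check_record(line):
    """Return (missing, unexpected) field names for one JSON line."""
    record = json.loads(line)
    missing = [f for f in SCHEMA_FIELDS if f not in record]
    unexpected = [k for k in record if k not in SCHEMA_FIELDS]
    return missing, unexpected


missing, unexpected = check_record(sample_line)
print("missing:", missing, "unexpected:", unexpected)
```

Running a check like this on a few lines of the input files before starting the stream can reveal misspelled field names, which would otherwise show up only as null columns.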

In [23]:
from google.colab import files

uploadedData = files.upload()

Saving news.zip to news (1).zip


Because importing the folder from my Google Drive proved difficult, I upload the zip file directly instead.

In [24]:
import zipfile
import io

for file_name in uploadedData:
    if file_name.endswith('.zip'):
        with zipfile.ZipFile(io.BytesIO(uploadedData[file_name]), 'r') as z:
            z.extractall('./input-5025201097')

The uploaded file is stored in *uploadedData*, and every zip archive in it is extracted into *./input-5025201097*.
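The extraction step can be tried outside Colab with a small stand-in. The sketch below builds a zip archive in memory, extracts it the same way, and lists the resulting files. The file name `news/part-0001.json`, the dictionary `uploaded`, and the temporary target folder are assumptions made for this example only.

```python
import io
import os
import tempfile
import zipfile

# Build a small zip archive in memory, standing in for the uploaded bytes.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as z:
    # Hypothetical file name and content, for illustration only.
    z.writestr("news/part-0001.json", '{"headline": "Example"}\n')

# Assumed to have the same shape as the upload result: file name -> bytes.
uploaded = {"news.zip": buffer.getvalue()}

# A temporary folder is used here instead of ./input-5025201097.
target_dir = tempfile.mkdtemp()
for file_name, content in uploaded.items():
    if file_name.endswith(".zip"):
        with zipfile.ZipFile(io.BytesIO(content), "r") as z:
            z.extractall(target_dir)

# Walk the folder to see where the JSON files ended up.
extracted = sorted(
    os.path.relpath(os.path.join(root, name), target_dir)
    for root, _, names in os.walk(target_dir)
    for name in names
)
print(extracted)
```

Listing the extracted paths shows whether the archive contains a top-level folder. If the JSON files sit in a subfolder, *input_folder* may need to point to that subfolder.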

In [25]:
input_folder = "./input-5025201097"
output_folder = "/content/output-5025201097"

In [26]:
news_df = spark.readStream.schema(schema).json(input_folder)

After defining the input and output folders, I read the JSON files in the input folder as a streaming DataFrame, using the schema defined earlier.

In [28]:
from pyspark.sql.functions import unix_timestamp, window

# Parse the "date" string (yyyy-MM-dd) into an event-time timestamp column
news_df_with_timestamp = news_df.withColumn("timestamp", unix_timestamp("date", "yyyy-MM-dd").cast("timestamp"))

# Accept data that arrives up to 1 minute late; older data is dropped
watermarked_news_df = news_df_with_timestamp.withWatermark("timestamp", "1 minutes")

# Count articles per 1-minute event-time window and category
grouped_news_df = watermarked_news_df.groupBy(window("timestamp", "1 minutes"), "category").count()

# Write the aggregated results to the output folder
query = grouped_news_df.writeStream \
    .format("json") \
    .option("path", output_folder) \
    .option("checkpointLocation", "/content/checkpoints") \
    .outputMode("append") \
    .start()

This cell converts each article's *date* into a timestamp, applies a 1-minute watermark so that data arriving too late is dropped, and counts the articles per category in 1-minute windows. The aggregated results are written to the output folder in JSON format. Because the query runs in append mode, the count for a window is written only once the watermark has passed the end of that window, so results appear as newer articles arrive.
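The windowing and watermark logic can be illustrated with a small pure-Python model. This is a simplified sketch, not Spark's implementation: it parses dates as UTC midnight (Spark would use the session time zone), treats a window as final once the watermark has reached its end, and uses made-up events. The names `to_event_time` and `tumbling_window` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=1)
WATERMARK_DELAY = timedelta(minutes=1)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_event_time(date_str):
    """Parse a yyyy-MM-dd string as midnight; UTC is an assumption of this sketch."""
    return datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)


def tumbling_window(event_time):
    """Return the [start, end) 1-minute window that contains event_time."""
    start = EPOCH + ((event_time - EPOCH) // WINDOW) * WINDOW
    return start, start + WINDOW


# Made-up (date, category) pairs for illustration only.
events = [("2022-01-31", "TECH"), ("2022-01-31", "TECH"), ("2022-02-01", "SPORTS")]

counts = {}
max_event_time = EPOCH
for date_str, category in events:
    event_time = to_event_time(date_str)
    max_event_time = max(max_event_time, event_time)
    key = (tumbling_window(event_time), category)
    counts[key] = counts.get(key, 0) + 1

# Simplified model: the watermark trails the latest event time by the delay,
# and a window counts as final once the watermark has reached its end.
watermark = max_event_time - WATERMARK_DELAY
final = {key: n for key, n in counts.items() if key[0][1] <= watermark}

for ((start, end), category), n in sorted(final.items()):
    print(start.isoformat(), end.isoformat(), category, n)
```

Two things stand out in this model. Since *date* has only day resolution, every article of a given day falls into the same 1-minute window starting at midnight, so the result is effectively a count per day and category. And the window for the most recent day stays pending until a later date advances the watermark, which is consistent with append mode writing a window only after it is complete.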

In [None]:
query.awaitTermination()

In [30]:
console_query = grouped_news_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime="5 seconds") \
    .start()

# Write the aggregated results to the output folder
json_query = grouped_news_df.writeStream \
    .format("json") \
    .option("path", output_folder) \
    .option("checkpointLocation", "/content/checkpoints") \
    .outputMode("append") \
    .trigger(processingTime="5 seconds") \
    .start()

# Wait for the streaming to terminate
console_query.awaitTermination()
json_query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.9/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

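I set up two write streams for the grouped news data: the first writes to the console and the second writes JSON files to the output folder. Both are triggered every 5 seconds and append new results to their respective outputs. The code then waits for the streams to terminate. Because *awaitTermination()* blocks, the cell keeps running until it is interrupted manually, which is what produced the KeyboardInterrupt shown in the output.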
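Interrupting the cell stops the waiting on the Python side, but the streaming queries themselves may keep running in the background. A minimal sketch of a cleanup helper follows, assuming the session object exposes *streams.active* and each query offers *name*, *id*, and *stop()*, as PySpark's streaming queries do. The function name `stop_active_queries` is hypothetical.

```python
def stop_active_queries(spark):
    """Stop every streaming query that is still active on this session.

    Relies only on spark.streams.active and each query's name, id, and stop().
    Returns the names (or ids, for unnamed queries) of the stopped queries.
    """
    stopped = []
    for q in spark.streams.active:
        q.stop()
        stopped.append(q.name or str(q.id))
    return stopped


# In the notebook, after interrupting the waiting cell:
# stop_active_queries(spark)
```

Stopping the queries explicitly before rerunning a cell avoids having several queries write to the same output folder at once.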