# Streaming YouTube Comments Using a Kafka-PySpark Integration

This project uses YouTube comments, scraped through the YouTube API and saved in a JSON file, to explore data streaming with a Kafka-PySpark integration.

## A. Set up the docker environment, open Jupyter notebook
After downloading and starting Docker Desktop, we used the `docker-compose.yaml` source file from [this GitHub repository](https://github.com/subhamkharwal/docker-images) to create the Docker containers required for our Kafka-PySpark pipeline.

First, we start the Docker containers from the command prompt, using the command:

`docker compose up`

Once the containers are up and running, we can open this Jupyter notebook at:

`localhost:8888`

## B. Create a Spark Session
We then create a Spark session and have it download the jar package needed to read from Kafka. In this case, the package is:

`'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0'`

In [1]:
#create a spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Streaming from Kafka")
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]")
    .getOrCreate()
)

#check if spark session is active
spark

The spark session is active, as the spark UI is visible. We can use the link "localhost:4040" to access the Spark UI and monitor the jobs in progress in the application. 
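
The connector coordinate passed to `spark.jars.packages` encodes both the Scala build and the Spark version, and the connector version should normally match the Spark version in the container. As a minimal sketch, the helper below (a hypothetical function, not part of the original notebook) builds the coordinate from those two values so they are not typed by hand:

```python
def kafka_connector_coordinate(spark_version: str, scala_version: str = "2.12") -> str:
    """Build the Maven coordinate for the Spark SQL Kafka connector.

    Hypothetical helper: the artifact name is the one used in this notebook,
    only the Scala and Spark versions are filled in.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


# Same coordinate as the one configured in the Spark session above.
print(kafka_connector_coordinate("3.3.0"))
# → org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0
```

In the notebook, `pyspark.__version__` could supply the Spark version before the session is created, assuming the installed PySpark matches the Spark runtime.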

## C. Configure the kafka container
Next, from the command prompt, we open a shell in the Kafka container using the following command (the container ID below is specific to our machine; `docker ps` lists yours):

`docker exec -it abd99b4a9cf2a00a745dc72aa2358b375e09a38f2506502650fc67290577f466 /bin/bash`

We then create a new topic named *comments-data*, using the command:

`kafka-topics --create --topic comments-data --bootstrap-server localhost:29092`

To view if the topic has been created, we use the command: 

`kafka-topics --list --bootstrap-server localhost:29092`

We then paste some data into the topic, using the command:

`kafka-console-producer --topic comments-data --bootstrap-server localhost:29092`
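
The console producer sends each line of input as one message, so a comment has to be pasted as a single-line JSON string. The sketch below shows one way to produce such a line. The record values are made up for illustration; only the field names come from the schema used later in this notebook.

```python
import json

# Made-up sample record; the field names follow the comment schema used below.
sample_comment = {
    "author": "@example_user",
    "published_at": "2024-04-04T03:35:00Z",
    "updated_at": "2024-04-04T03:35:00Z",
    "like_count": "3",
    "text": "Great video!\nThanks for sharing.",
}

# json.dumps emits one line by default; the newline inside the text
# is escaped as \n, so the message is not split in two.
line = json.dumps(sample_comment)
print(line)
```

Pressing Enter on an empty line most likely sends an empty message, which would explain a row with a blank `value` like the one at offset 1 in the sample output further down.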

## D. Create and configure Kafka_df
This is the streaming DataFrame that reads the comments from the Kafka topic.

In [2]:
#create the kafka_df to read from kafka

kafka_df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ed-kafka:29092")
    .option("subscribe", "comments-data")
    .option("startingOffsets", "earliest")
    .load()
)

In [3]:
#View schema for raw kafka_df
kafka_df.printSchema()
#kafka_df.show()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
#Parse value from binary to string into kafka_json_df
from pyspark.sql.functions import expr

kafka_json_df = kafka_df.withColumn("value", expr("cast(value as string)"))

In [7]:
#kafka_json_df.show()

+----+--------------------+-------------+---------+------+--------------------+-------------+
| key|               value|        topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-------------+---------+------+--------------------+-------------+
|null|{"author": "@bish...|comments-data|        0|     0|2024-04-04 03:35:...|            0|
|null|                    |comments-data|        0|     1|2024-04-04 03:35:...|            0|
|null|{"author": "@bish...|comments-data|        0|     2|2024-04-04 03:36:...|            0|
+----+--------------------+-------------+---------+------+--------------------+-------------+



We then specify the schema of the comments: 

In [5]:
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

json_schema = StructType([
    StructField('author', StringType(), True),
    StructField('published_at', StringType(), True),
    StructField('updated_at', StringType(), True),
    StructField('like_count', StringType(), True),
    StructField('text', StringType(), True)
])

 

In [6]:
#Apply the schema to payload to read data
from pyspark.sql.functions import from_json, col

streaming_df = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema)).selectExpr("values_json.*")

In [7]:
#To inspect the schema and data, use a sample JSON file and change readStream to read
streaming_df.printSchema()
#streaming_df.show(truncate=False)

root
 |-- author: string (nullable = true)
 |-- published_at: string (nullable = true)
 |-- updated_at: string (nullable = true)
 |-- like_count: string (nullable = true)
 |-- text: string (nullable = true)
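
To make the cast and `from_json` steps above more concrete, here is a plain-Python analogue. It only illustrates the behaviour and is not how Spark implements it. It assumes the default permissive handling, where a payload that cannot be parsed ends up as nulls in every expanded column, and it does not model Spark's type coercion. `FIELDS` and `parse_value` are hypothetical names.

```python
import json

# Field names taken from json_schema above.
FIELDS = ["author", "published_at", "updated_at", "like_count", "text"]


def parse_value(raw: bytes) -> dict:
    """Decode a Kafka message value and project it onto the schema fields.

    Payloads that are not a valid JSON object yield None for every field.
    """
    empty = {name: None for name in FIELDS}
    try:
        record = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return empty
    if not isinstance(record, dict):
        return empty
    # Fields missing from the payload become None; extra keys are dropped.
    return {name: record.get(name) for name in FIELDS}


rows = [
    parse_value(b'{"author": "@example_user", "text": "hi", "extra": 1}'),
    parse_value(b""),  # an empty message, like offset 1 in the sample output
]
for row in rows:
    print(row)
```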



## E. Post the streaming comments to Kafka
We then run the following commands in the command prompt to install the Kafka client and run `post_to_kafka.py`:

`pip install kafka-python`

`python /home/jovyan/kafka_spark_streaming/utils/post_to_kafka.py`
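
The contents of `post_to_kafka.py` are not shown here. As a rough sketch of what such a script might do, the code below uses `KafkaProducer` from `kafka-python` to send each comment as a JSON message. Several things are assumptions: that the JSON file holds a list of comment objects, the file name `comments.json`, the one-second delay, and the helper names. The broker address `ed-kafka:29092` is the one used by `readStream` above.

```python
import json
import time


def encode_comment(comment: dict) -> bytes:
    """Serialize one comment as UTF-8 encoded, single-line JSON."""
    return json.dumps(comment).encode("utf-8")


def post_comments(comments, topic="comments-data",
                  bootstrap_servers="ed-kafka:29092", delay_seconds=1.0):
    """Send each comment to the Kafka topic, pausing between messages.

    Hypothetical helper; requires `pip install kafka-python` and a
    reachable broker.
    """
    from kafka import KafkaProducer  # imported lazily so the module loads without it

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=encode_comment,
    )
    sent = 0
    for comment in comments:
        producer.send(topic, value=comment)
        sent += 1
        time.sleep(delay_seconds)  # space messages out to imitate a live stream
    producer.flush()  # make sure buffered messages reach the broker
    producer.close()
    return sent


# Example usage (assumed file name and layout: a JSON list of comment objects):
# with open("comments.json", encoding="utf-8") as fh:
#     post_comments(json.load(fh))
```

The delay is only there so that the messages arrive over several micro-batches rather than all at once.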

## F. Write the streaming comments to the console sink

In [None]:
#write the output to the console sink to check it, one micro-batch every 10 seconds
(streaming_df
.writeStream
.format("console")
.outputMode('append')
.trigger(processingTime = '10 seconds')
.option("checkpointLocation", "checkpoint_dir_kafka2")
.start()
.awaitTermination())