### Streaming Data from Kafka

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, IntegerType, BooleanType

In [5]:
spark = SparkSession.builder \
    .appName("KafkaSparkConsumer1") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/kafka_checkpoint") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4") \
    .getOrCreate()

:: loading settings :: url = jar:file:/opt/anaconda3/envs/data_eng/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/bharathvelamala/.ivy2/cache
The jars for the packages stored in: /Users/bharathvelamala/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-06f90e5c-8985-46d0-974b-f3bd8e2abe38;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.4 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.4 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 323ms :: a

In [6]:
channel_schema = StructType() \
    .add("channel_id", StringType()) \
    .add("title", StringType()) \
    .add("description", StringType()) \
    .add("custom_url", StringType()) \
    .add("published_at", StringType()) \
    .add("country", StringType()) \
    .add("subscriber_count", IntegerType()) \
    .add("view_count", IntegerType()) \
    .add("video_count", IntegerType()) \
    .add("hidden_subscriber_count", BooleanType()) \
    .add("high_thumbnail", StringType())

In [7]:
video_schema = StructType() \
    .add("video_id", StringType()) \
    .add("title", StringType()) \
    .add("description", StringType()) \
    .add("description_summary", StringType()) \
    .add("channel_id", StringType()) \
    .add("channel_title", StringType()) \
    .add("published_at", StringType()) \
    .add("published_year", StringType()) \
    .add("view_count", IntegerType()) \
    .add("like_count", IntegerType()) \
    .add("comment_count", IntegerType()) \
    .add("favorite_count", IntegerType()) \
    .add("engagement_ratio", IntegerType()) \
    .add("likes_per_view", IntegerType()) \
    .add("comments_per_view", IntegerType()) \
    .add("thumbnail_url", StringType()) \
    .add("thumbnail_width", IntegerType()) \
    .add("thumbnail_height", IntegerType()) \
    .add("duration", StringType()) \
    .add("definition", StringType()) \
    .add("caption", BooleanType()) \
    .add("licensed_content", BooleanType()) \
    .add("tags", StringType()) \
    .add("tag_count", IntegerType()) \
    .add("category_id", StringType()) \
    .add("live_broadcast_content", StringType()) \
    .add("default_language", StringType()) \
    .add("default_audio_language", StringType()) \
    .add("privacy_status", StringType()) \
    .add("upload_status", StringType()) \
    .add("embeddable", BooleanType()) \
    .add("made_for_kids", BooleanType()) \
    .add("title_length", IntegerType()) \
    .add("description_length", IntegerType()) \
    .add("has_hashtags", BooleanType())

In [8]:
comment_schema = StructType() \
    .add("comment_id", StringType()) \
    .add("video_id", StringType()) \
    .add("author", StringType()) \
    .add("content", StringType()) \
    .add("published_at", StringType())

In [9]:
captions_schema = StructType() \
    .add("video_id", StringType()) \
    .add("caption_id", StringType()) \
    .add("language", StringType()) \
    .add("caption_text", StringType())

In [10]:
kafka_video_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "youtube_video_info") \
    .option("startingOffsets", "earliest") \
    .load()

kafka_channel_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "youtube_channel_info") \
    .option("startingOffsets", "earliest") \
    .load()

kafka_comment_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "youtube_video_comments") \
    .option("startingOffsets", "earliest") \
    .load()

kafka_captions_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "youtube_video_captions") \
    .option("startingOffsets", "earliest") \
    .load()

In [11]:
video_parsed_df = kafka_video_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), video_schema).alias("data")) \
    .select("data.*")

channel_parsed_df = kafka_channel_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), channel_schema).alias("data")) \
    .select("data.*")

comment_parsed_df = kafka_comment_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), comment_schema).alias("data")) \
    .select("data.*")

captions_parsed_df = kafka_captions_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), captions_schema).alias("data")) \
    .select("data.*")

25/03/09 00:31:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [12]:
video_parsed_df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- description_summary: string (nullable = true)
 |-- channel_id: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- published_at: string (nullable = true)
 |-- published_year: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- like_count: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- favorite_count: integer (nullable = true)
 |-- engagement_ratio: integer (nullable = true)
 |-- likes_per_view: integer (nullable = true)
 |-- comments_per_view: integer (nullable = true)
 |-- thumbnail_url: string (nullable = true)
 |-- thumbnail_width: integer (nullable = true)
 |-- thumbnail_height: integer (nullable = true)
 |-- duration: string (nullable = true)
 |-- definition: string (nullable = true)
 |-- caption: boolean (nullable = true)
 |-- licensed_content: boolean (nullable = true)
 |-- tags

In [13]:
channel_parsed_df.printSchema()

root
 |-- channel_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- custom_url: string (nullable = true)
 |-- published_at: string (nullable = true)
 |-- country: string (nullable = true)
 |-- subscriber_count: integer (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- video_count: integer (nullable = true)
 |-- hidden_subscriber_count: boolean (nullable = true)
 |-- high_thumbnail: string (nullable = true)



In [14]:
comment_parsed_df.printSchema()

root
 |-- comment_id: string (nullable = true)
 |-- video_id: string (nullable = true)
 |-- author: string (nullable = true)
 |-- content: string (nullable = true)
 |-- published_at: string (nullable = true)



In [15]:
captions_parsed_df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- caption_id: string (nullable = true)
 |-- language: string (nullable = true)
 |-- caption_text: string (nullable = true)



In [16]:
video_query = video_parsed_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Await termination for 10 seconds
video_query.awaitTermination(10)

25/03/09 00:31:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/09 00:31:41 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+----------+----------+-------------+--------------+----------------+--------------+-----------------+--------------------+---------------+----------------+--------+----------+-------+----------------+--------------------+---------+-----------+----------------------+----------------+----------------------+--------------+-------------+----------+-------------+------------+------------------+------------+
|   video_id|               title|         description| description_summary|          channel_id|       channel_title|        published_at|published_year|view_count|like_count|comment_count|favorite_count|engagement_ratio|likes_per_view|comments_per_view|       thumbnail_url|thumbnail_width|thumbnail_height|duration|definition|caption|licens

False

In [17]:

channel_query = channel_parsed_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

channel_query.awaitTermination(10)

25/03/09 00:31:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/09 00:31:51 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+--------------------+--------------------+--------------------+--------------------+-------+----------------+----------+-----------+-----------------------+--------------------+
|          channel_id|               title|         description|          custom_url|        published_at|country|subscriber_count|view_count|video_count|hidden_subscriber_count|      high_thumbnail|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------+----------------+----------+-----------+-----------------------+--------------------+
|UCY1kMZp36IQSyNx_...|          Mark Rober|Former NASA engin...|          @markrober|2011-10-20T06:17:58Z|     US|        65000000|      NULL|        199|                  false|https://yt3.ggpht...|
|UCHnyfMqiRRG1u-2M...|          Veritasium|An element of tru...|         @veritasium|2010-07-21T07:18:0

False

In [18]:
comment_query = comment_parsed_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

comment_query.awaitTermination(10)

25/03/09 00:32:01 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/09 00:32:01 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+-----------+------+-------+--------------------+
|          comment_id|   video_id|author|content|        published_at|
+--------------------+-----------+------+-------+--------------------+
|Ugxqcek3RUzxWv-4V...|rfscVS0vtbw|  NULL|   NULL|2021-03-09T18:41:00Z|
|Ugwq4YOvmRyQ2fsLw...|rfscVS0vtbw|  NULL|   NULL|2025-03-08T20:43:55Z|
|UgwVFMfeiPB31-qw2...|rfscVS0vtbw|  NULL|   NULL|2025-03-08T17:51:36Z|
|Ugzawl_y07yvoVdXG...|rfscVS0vtbw|  NULL|   NULL|2025-03-08T17:12:55Z|
|Ugz7QegE71kp7xEFe...|rfscVS0vtbw|  NULL|   NULL|2025-03-07T12:56:53Z|
|Ugz2N2o8Ng9dxWCkc...|HeQX2HjkcNo|  NULL|   NULL|2025-03-08T20:21:54Z|
|Ugxr_KxzAi9S1Pea5...|HeQX2HjkcNo|  NULL|   NULL|2025-03-08T18:13:12Z|
|UgxCz9B4OURllu82-...|HeQX2HjkcNo|  NULL|   NULL|2025-03-08T00:25:02Z|
|Ugz64UrqfYiZAEYxd...|HeQX2HjkcNo|  NULL|   NULL|2025-03-07T22:31:47Z|
|UgwZyfVDT6rVGiqZW...|HeQX2HjkcNo|  NULL|   NULL|20

False

In [19]:
captions_query = captions_parsed_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

captions_query.awaitTermination(10)

25/03/09 00:32:11 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/09 00:32:11 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+--------------------+--------+------------+
|   video_id|          caption_id|language|caption_text|
+-----------+--------------------+--------+------------+
|rfscVS0vtbw|AUieDaZvswHvw8s1O...|      es|        NULL|
|rfscVS0vtbw|AUieDaalEVHfBYqan...|      ro|        NULL|
|rfscVS0vtbw|AUieDaafT99UerDMT...|      hi|        NULL|
|rfscVS0vtbw|AUieDaZMHyzB52Qby...|   zh-CN|        NULL|
|rfscVS0vtbw|AUieDaYrzqWNIrQUq...|      bg|        NULL|
|rfscVS0vtbw|AUieDabvb8bdpB5FA...|      id|        NULL|
|rfscVS0vtbw|AUieDaZRdp8cgoDta...|      pl|        NULL|
|rfscVS0vtbw|AUieDabmSOqF0gefj...|      pt|        NULL|
|rfscVS0vtbw|AUieDaY01nROBDzz-...|      ru|        NULL|
|rfscVS0vtbw|AUieDaZx0-GUIUgil...|      ko|        NULL|
|rfscVS0vtbw|AUieDaYbLR4VDZKJI...|      ar|        NULL|
|rfscVS0vtbw|AUieDabnPBrrcphB9...|      en|        NULL|
|rfscVS0vtbw|AUieDabnU-QoWqFDg...|      iw|     

False