In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StringType
from pymongo import MongoClient

In [2]:
packages = [
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1',
    'org.apache.kafka:kafka-clients:3.2.0'
]

spark = SparkSession.builder\
   .master("local")\
   .appName("kafka-example")\
   .config("spark.jars.packages", ",".join(packages))\
   .getOrCreate()

kafka_params = {
    "kafka.bootstrap.servers": "kafka:9092",
    "subscribe": "test-topic",
    "startingOffsets": "earliest"
}



:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b7306dd3-a40e-441c-9437-13e1f0ae5bcd;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
	found org.apache.kafka#kafka-clients;3.2.0 in central


In [3]:
# Read data from Kafka
raw_data = spark \
    .readStream \
    .format("kafka") \
    .options(**kafka_params) \
    .load()

In [4]:
schema = "user_id STRING, video_id STRING, watched_at TIMESTAMP"

In [5]:
# Parse JSON data
parsed_data = raw_data \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# Group by date and video_id to count occurrences
trending_videos = parsed_data \
    .groupBy(window(col("watched_at"), "1 day"), "video_id") \
    .count() \
    .orderBy(col("window"), col("count").desc())

# Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["bigdata"]
collection = db["trending_videos"]

# Define a function to write top 10 videos to MongoDB
def write_to_mongo(rows):
    top_videos = [row.video_id for row in rows[:10]]  # Extract only the top 10 videos
    collection.insert_one({"top_videos": top_videos})

# Start streaming query to continuously update the results
query = trending_videos \
    .writeStream \
    .outputMode("complete") \
    .foreachBatch(write_to_mongo) \
    .start()

# Await termination
query.awaitTermination()

24/05/09 01:48:46 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-85a89290-b673-44e4-9776-feb4c5469970. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/05/09 01:48:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/05/09 01:49:48 ERROR MicroBatchExecution: Query [id = 77750f59-96a1-4930-9797-d12e95cbfabc, runId = dd33a1be-5d82-4dd7-aad1-b0b67f2aebb8] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 581, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/conda/lib/python3.8/s

StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 581, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/conda/lib/python3.8/site-packages/pyspark/sql/utils.py", line 196, in call
    raise e
  File "/opt/conda/lib/python3.8/site-packages/pyspark/sql/utils.py", line 193, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
TypeError: write_to_mongo() takes 1 positional argument but 2 were given

=== Streaming Query ===
Identifier: [id = 77750f59-96a1-4930-9797-d12e95cbfabc, runId = dd33a1be-5d82-4dd7-aad1-b0b67f2aebb8]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[test-topic]]: {"test-topic":{"0":20}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Sort [window#29 ASC NULLS FIRST, count#34L DESC NULLS LAST], true
+- Aggregate [window#35, video_id#24], [window#35 AS window#29, video_id#24, count(1) AS count#34L]
   +- Filter isnotnull(watched_at#25)
      +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(watched_at#25, TimestampType, LongType) - 0) as double) / cast(86400000000 as double))) as double) = (cast((precisetimestampconversion(watched_at#25, TimestampType, LongType) - 0) as double) / cast(86400000000 as double))) THEN (CEIL((cast((precisetimestampconversion(watched_at#25, TimestampType, LongType) - 0) as double) / cast(86400000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(watched_at#25, TimestampType, LongType) - 0) as double) / cast(86400000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 86400000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(watched_at#25, TimestampType, LongType) - 0) as double) / cast(86400000000 as double))) as double) = (cast((precisetimestampconversion(watched_at#25, TimestampType, LongType) - 0) as double) / cast(86400000000 as double))) THEN (CEIL((cast((precisetimestampconversion(watched_at#25, TimestampType, LongType) - 0) as double) / cast(86400000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(watched_at#25, TimestampType, LongType) - 0) as double) / cast(86400000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 86400000000) + 0) + 86400000000), LongType, TimestampType)) AS window#35, user_id#23, video_id#24, watched_at#25]
         +- Project [data#21.user_id AS user_id#23, data#21.video_id AS video_id#24, data#21.watched_at AS watched_at#25]
            +- Project [from_json(StructField(user_id,StringType,true), StructField(video_id,StringType,true), StructField(watched_at,TimestampType,true), cast(value#8 as string), Some(Etc/UTC)) AS data#21]
               +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@5d648f88, KafkaV2[Subscribe[test-topic]]
