# Kafka Consumer

In [1]:
from kafka import KafkaConsumer
import json

# Plain kafka-python consumer subscribed to the MoviesRatings topic on the local broker.
# No group_id / auto_offset_reset is passed, so kafka-python's defaults apply.
consumer = KafkaConsumer(
    'MoviesRatings',
    bootstrap_servers=['localhost:9092'],
)

def process_msg(msg):
    """Print one consumed Kafka record: its offset, then its JSON payload as a dict.

    Args:
        msg: a record yielded by the consumer; only its ``offset`` and
            ``value`` (the raw JSON payload) attributes are read.
    """
    record_offset = msg.offset
    print(record_offset)
    decoded = json.loads(msg.value)
    as_mapping = dict(decoded)
    print(as_mapping)

# Consume forever; interrupt the kernel to stop. Iterating the consumer never ends on
# its own, so the `finally` is what releases the consumer's broker connections when
# the loop is interrupted, instead of leaving an open consumer behind in the kernel.
try:
    for msg in consumer:
        process_msg(msg)
finally:
    consumer.close()

79
{'userId': 79, 'movieId': 835, 'rating': 1.9}
80
{'userId': 3, 'movieId': 152, 'rating': 1.6}
81
{'userId': 45, 'movieId': 263, 'rating': 2.0}
82
{'userId': 48, 'movieId': 685, 'rating': 3.7}
83
{'userId': 78, 'movieId': 956, 'rating': 3.4}
84
{'userId': 10, 'movieId': 134, 'rating': 2.4}
85
{'userId': 33, 'movieId': 972, 'rating': 1.6}
86
{'userId': 17, 'movieId': 112, 'rating': 4.7}
87
{'userId': 17, 'movieId': 868, 'rating': 1.4}
88
{'userId': 73, 'movieId': 911, 'rating': 2.3}


KeyboardInterrupt: 

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, IntegerType, FloatType
from pyspark.ml.recommendation import ALSModel

# 1) SparkSession avec le connecteur Kafka
# 1) SparkSession with the Kafka source connector.
# NOTE: spark.jars.packages only takes effect when the session is first created; if a
# session is already running, getOrCreate() reuses it and ignores this setting.
spark = (
    SparkSession.builder
    .appName("KafkaConsumerTest")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
    .getOrCreate()
)

# 2) Expected structure of each JSON message value.
schema = (
    StructType()
    .add("userId", IntegerType())
    .add("movieId", IntegerType())
    .add("rating", FloatType())
)

# 3) Lecture du stream Kafka
# 3) Streaming source: the MoviesRatings topic, replayed from its earliest offset.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "MoviesRatings")
    .option("startingOffsets", "earliest")
    .load()
)

# 4) Kafka delivers the payload in the binary `value` column: cast it to a string,
#    parse it with `schema`, then flatten the struct into top-level columns.
parsed = (
    df.selectExpr("CAST(value AS STRING) AS json")
    .select(from_json(col("json"), schema).alias("data"))
    .select("data.*")
)

# 5) Load the saved ALS model from HDFS.
# NOTE(review): assumes it was trained with userCol="userId" / itemCol="movieId" — confirm.
model = ALSModel.load("hdfs://namenode:9000/models/als")

# 6) Score each incoming row; the model appends a `prediction` column.
# NOTE(review): ALSModel's default coldStartStrategy is "nan", so unseen users/movies
# would get NaN predictions — confirm this is acceptable.
predictions = model.transform(parsed)

# 7) Afficher les données entrantes
# 7) Echo the incoming rows to the console.
query_raw = (
    parsed.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

# 8) Echo the predictions to the console. Both sinks write to the same stdout, so
#    their batch tables can interleave.
query_pred = (
    predictions.select("userId", "movieId", "prediction")
    .writeStream
    .format("console")
    .outputMode("append")
    .start()
)

# 9) Block until the prediction query ends. Interrupting the kernel only raises
#    KeyboardInterrupt on the Python side: the queries themselves run in the JVM and
#    would keep running (and printing) in the shared SparkSession, so stop both
#    explicitly whatever way the wait ends.
try:
    query_pred.awaitTermination()
finally:
    query_raw.stop()
    query_pred.stop()


25/05/04 15:46:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/05/04 15:46:24 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-98dddfbc-ee00-4a0a-8dd3-2ce381e6d74e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/05/04 15:46:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/05/04 15:46:25 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7954df3c-e00c-4852-96bc-9bc4b0a873ae. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp 

-------------------------------------------
Batch: 0
-------------------------------------------
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|    44|     32|  4.142208|
|    92|    823| 3.6180375|
|    40|    732|   2.24864|
|     9|    924| 3.0846865|
|     3|    105| 3.2714458|
|    15|    345| 2.8742886|
|    80|    920| 3.8585312|
|    54|    147| 3.1860905|
|    88|    510| 1.2883444|
|    15|    584| 2.3848782|
|     8|    584| 3.3149283|
|    51|    474| 3.7456388|
|    71|    282| 3.6165023|
|    82|    917|  3.519478|
|    28|    627| 3.1859777|
|    98|    263| 3.4373155|
|    73|     15| 1.8240993|
|    27|    698|  2.658212|
|    87|    625| 2.4870253|
|    20|    790|   2.84205|
+------+-------+----------+
only showing top 20 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|    44|     32|   1.9|
|    92|    

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, IntegerType, FloatType
import pymongo

# 1. SparkSession avec packages nécessaires
# 1. SparkSession with the Kafka source connector.
# NOTE: if a session already exists in this kernel, getOrCreate() reuses it and the
# spark.jars.packages setting (and app name) are not applied.
spark = (
    SparkSession.builder
    .appName("KafkaToMongoTest")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
    .getOrCreate()
)

# 2. Structure of the JSON messages read from Kafka.
schema = (
    StructType()
    .add("userId", IntegerType())
    .add("movieId", IntegerType())
    .add("rating", FloatType())
)

# 3. Lecture du topic Kafka
# 3. Subscribe to the Kafka topic, starting from the earliest offset.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "MoviesRatings")
    .option("startingOffsets", "earliest")
    .load()
)

# 4. Decode the binary Kafka value into a string, parse it as JSON, and flatten
#    the resulting struct into userId / movieId / rating columns.
parsed = (
    df.selectExpr("CAST(value AS STRING) AS json")
    .select(from_json(col("json"), schema).alias("data"))
    .select("data.*")
)

# 5. Fonction pour écrire dans MongoDB
def write_to_mongo(batch_df, batch_id):
    """foreachBatch sink: insert one micro-batch of rows into MongoDB.

    Spark calls this once per micro-batch with that batch's DataFrame and id.
    Rows go to the ``ratings.test_predictions`` collection.
    NOTE(review): the stream feeding this sink carries raw ratings, not model
    predictions, despite the collection name — confirm the intended target.

    Args:
        batch_df: Spark DataFrame with the rows of this micro-batch.
        batch_id: Micro-batch id supplied by Spark (not used here).

    Raises:
        pymongo.errors.PyMongoError: not caught on purpose. The streaming query
            then fails instead of treating an unwritten batch as done.
    """
    if batch_df.isEmpty():
        return
    # collect() brings the whole batch onto the driver; acceptable for small batches.
    docs = [row.asDict() for row in batch_df.collect()]
    # Scope the client to this batch and close it afterwards. Previously a new
    # MongoClient (connection pool + monitoring) was opened per batch and never closed.
    with pymongo.MongoClient("mongodb://localhost:27017/") as client:
        client["ratings"]["test_predictions"].insert_many(docs)

# 6. Déclencher le stream
# 6. Start the stream: every micro-batch is handed to write_to_mongo. The checkpoint
#    directory records committed offsets so a restart resumes where it stopped.
query = (
    parsed.writeStream
    .foreachBatch(write_to_mongo)
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/test")
    .start()
)

# Interrupting awaitTermination() only interrupts Python; the query itself would keep
# running in the JVM, so stop it explicitly however the wait ends.
try:
    query.awaitTermination()
finally:
    query.stop()


25/05/04 16:15:19 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/05/04 16:15:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/05/04 16:15:20 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/05/04 16:15:54 ERROR MicroBatchExecution: Query [id = 3ab88abf-17ea-4711-bf66-58807e854b66, runId = 5c075444-d4f4-4cff-8a04-e8975bcf1821] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 120, in call
    raise e


StreamingQueryException: [STREAM_FAILED] Query [id = 3ab88abf-17ea-4711-bf66-58807e854b66, runId = 5c075444-d4f4-4cff-8a04-e8975bcf1821] terminated with exception: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 120, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 117, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/tmp/ipykernel_4584/1257270886.py", line 39, in write_to_mongo
    collection.insert_many(docs)
  File "/usr/local/lib/python3.10/dist-packages/pymongo/_csot.py", line 119, in csot_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/collection.py", line 975, in insert_many
    blk.execute(write_concern, session, _Op.INSERT)
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/bulk.py", line 751, in execute
    return self.execute_command(generator, write_concern, session, operation)
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/bulk.py", line 604, in execute_command
    _ = client._retryable_write(
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/mongo_client.py", line 2061, in _retryable_write
    return self._retry_with_session(retryable, func, s, bulk, operation, operation_id)
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/mongo_client.py", line 1947, in _retry_with_session
    return self._retry_internal(
  File "/usr/local/lib/python3.10/dist-packages/pymongo/_csot.py", line 119, in csot_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/mongo_client.py", line 1993, in _retry_internal
    ).run()
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/mongo_client.py", line 2730, in run
    return self._read() if self._is_read else self._write()
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/mongo_client.py", line 2840, in _write
    self._server = self._get_server()
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/mongo_client.py", line 2823, in _get_server
    return self._client._select_server(
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/mongo_client.py", line 1812, in _select_server
    server = topology.select_server(
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/topology.py", line 409, in select_server
    server = self._select_server(
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/topology.py", line 387, in _select_server
    servers = self.select_servers(
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/topology.py", line 294, in select_servers
    server_descriptions = self._select_servers_loop(
  File "/usr/local/lib/python3.10/dist-packages/pymongo/synchronous/topology.py", line 344, in _select_servers_loop
    raise ServerSelectionTimeoutError(
pymongo.errors.ServerSelectionTimeoutError: localhost:27017: [Errno 111] Connection refused (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms), Timeout: 30s, Topology Description: <TopologyDescription id: 6817929badf9ca1d90201b9b, topology_type: Unknown, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:27017: [Errno 111] Connection refused (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms)')>]>


In [8]:
from pymongo import MongoClient

def write_to_mongo(batch_df, batch_id, fail_fast=False):
    """foreachBatch sink: insert one micro-batch into MongoDB and report the outcome.

    This redefines (and shadows) the earlier ``write_to_mongo``. It writes to the
    ``MongoTest.ratings`` collection and prints a status line per batch.
    NOTE(review): a streaming query that is already started keeps the function it
    was given, so the writeStream cell must be re-run for this version to be used.

    Args:
        batch_df: Spark DataFrame with the rows of this micro-batch.
        batch_id: Micro-batch id supplied by Spark, shown in the status messages.
        fail_fast: When False (default, the original behaviour), MongoDB errors are
            printed and swallowed. Spark then considers the batch complete and moves
            past it, so its rows are NOT retried. Pass True (e.g. through
            ``functools.partial``) to re-raise instead. That fails the query so the
            batch is replayed from the checkpoint on restart.
    """
    # DataFrame.isEmpty() avoids the DataFrame -> RDD conversion of .rdd.isEmpty().
    if batch_df.isEmpty():
        return
    docs = [row.asDict() for row in batch_df.collect()]
    try:
        # `with` closes the client after each batch. Previously one client (connection
        # pool + monitoring) was opened per batch and never closed.
        with MongoClient("mongodb://localhost:27017/") as client:
            client["MongoTest"]["ratings"].insert_many(docs)
        print(f"✅ Batch {batch_id} inséré dans MongoDB")
    except Exception as e:
        print(f"❌ Erreur MongoDB pour batch {batch_id} :", e)
        if fail_fast:
            raise
