In [1]:
!pip install pyspark==3.5.2

Collecting pyspark==3.5.2
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812366 sha256=ae5664796f0bb431998641c27ab716436992478d4cb020ab0b4e4e578be3e77a
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.3
    Uninstalling pyspark-3.5.3:
      Successfully uninstalled pyspark-3.5.3
Successfully installed pyspark-3.5.2


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from transformers import pipeline
import logging
import os

# Root logger: only ERROR and above, with timestamped records.
logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s')

# Structured Streaming checkpoint location handed to the MongoDB sink.
checkpoint_dir = "/kaggle/working/checkpoints/kafka_to_mongo"
# exist_ok=True makes the cell idempotent and avoids the check-then-create race.
os.makedirs(checkpoint_dir, exist_ok=True)


def _require_env(name):
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty, so a missing secret is
            reported here instead of as an authentication failure later on.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name} is not set; define it (for example as a "
            "notebook secret) before running this cell."
        )
    return value


# Secrets are read from the environment rather than stored in the notebook.
# Any credential that was ever saved in plain text in this file should be rotated.
config = {
    "kafka": {
        "bootstrap.servers": os.environ.get(
            "KAFKA_BOOTSTRAP_SERVERS", "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
        ),
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": _require_env("KAFKA_SASL_USERNAME"),
        "sasl.password": _require_env("KAFKA_SASL_PASSWORD"),
        "client.id": "json-serial-producer",
    },
    "mongodb": {
        # Full connection string, including the database user's credentials.
        "uri": _require_env("MONGODB_URI"),
        "database": "yelpdb",
        "collection": "enriched_reviews_collection",
    },
}

# Built once at module level and reused by analyze_sentiment for every call.
sentiment_pipeline = pipeline("text-classification", model="distilbert-base-uncased-finetuned-sst-2-english")

def analyze_sentiment(text):
    """Classify ``text`` with the module-level ``sentiment_pipeline``.

    Args:
        text: Review text. Anything that is not a non-empty ``str`` is rejected
            without calling the model.

    Returns:
        The ``label`` field of the first pipeline result, ``"Error"`` if the
        pipeline raised (the exception is logged), or ``"Empty or Invalid"``
        for missing or non-string input.
    """
    if not (text and isinstance(text, str)):
        return "Empty or Invalid"
    try:
        # truncation=True clips inputs longer than the model's maximum sequence
        # length; without it such reviews raise inside the pipeline and would
        # all be labelled "Error" instead of being classified.
        result = sentiment_pipeline(text, truncation=True)[0]
        return result['label']
    except Exception as e:
        # Deliberate best effort: one bad record must not fail the whole stream.
        logging.error(f"Error in sentiment analysis: {e}")
        return "Error"

# Spark UDF wrapper around analyze_sentiment; the resulting column is a string.
sentiment_udf = udf(f=analyze_sentiment, returnType=StringType())

def read_from_kafka_and_write_to_mongo(spark, topic="raw_data_topic"):
    """Stream JSON reviews from Kafka, add a sentiment column, and append them to MongoDB.

    Connection settings come from the module-level ``config`` dict and the
    checkpoint location from ``checkpoint_dir``. The call blocks until the
    streaming query terminates.

    Args:
        spark: Active ``SparkSession`` with the Kafka and MongoDB connectors loaded.
        topic: Kafka topic to subscribe to.

    Returns:
        None.
    """
    # Fields extracted from the JSON payload carried in each Kafka message value.
    schema = StructType([
        StructField("review_id", StringType()),
        StructField("user_id", StringType()),
        StructField("business_id", StringType()),
        StructField("stars", FloatType()),
        StructField("useful", IntegerType()),
        StructField("funny", IntegerType()),
        StructField("cool", IntegerType()),
        StructField("text", StringType()),
        StructField("date", StringType())
    ])

    kafka_conf = config['kafka']
    mongo_conf = config['mongodb']

    # JAAS login line for SASL/PLAIN, built from the configured credentials.
    jaas_config = (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'username="{kafka_conf["sasl.username"]}" '
        f'password="{kafka_conf["sasl.password"]}";'
    )

    stream_df = (spark.readStream
                 .format("kafka")
                 .option("kafka.bootstrap.servers", kafka_conf['bootstrap.servers'])
                 .option("subscribe", topic)
                 .option("kafka.security.protocol", kafka_conf['security.protocol'])
                 .option("kafka.sasl.mechanism", kafka_conf['sasl.mechanisms'])
                 .option("kafka.sasl.jaas.config", jaas_config)
                 .option("failOnDataLoss", "false")
                 .load()
                )

    # The Kafka value is cast to a string, parsed as JSON, and flattened to columns.
    parsed_df = stream_df.select(from_json(col('value').cast("string"), schema).alias("data")).select("data.*")

    enriched_df = parsed_df.withColumn("sentiment", sentiment_udf(col('text')))

    # Keep the StreamingQuery handle itself (awaitTermination() returns None),
    # so the query can be stopped explicitly below.
    query = (enriched_df.writeStream
             .format("mongodb")
             .option("spark.mongodb.connection.uri", mongo_conf['uri'])
             .option("spark.mongodb.database", mongo_conf['database'])
             .option("spark.mongodb.collection", mongo_conf['collection'])
             .option("checkpointLocation", checkpoint_dir)
             .outputMode("append")
             .start()
            )
    try:
        query.awaitTermination()
    finally:
        # Interrupting the cell must not leave the stream running in the background.
        query.stop()
    
if __name__ == "__main__":
    # Name the application first, then register the Kafka source and MongoDB
    # sink connector packages before the session is created.
    session_builder = SparkSession.builder.appName("KafkaStreamToMongo")
    session_builder = session_builder.config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,org.mongodb.spark:mongo-spark-connector_2.12:10.4.0",
    )
    spark = session_builder.getOrCreate()

    # Blocks until the streaming query terminates.
    read_from_kafka_and_write_to_mongo(spark)