In [10]:
from kafka import KafkaConsumer
import json
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import split, col
from pymongo import MongoClient
import time
import subprocess
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.ml import PipelineModel
from pyspark.ml.classification import LogisticRegression


conf = SparkConf().setAppName('myapp')
conf.set('spark.python.worker.timeout', '600')
conf.set('spark.executor.memory', '4g')
conf.set('spark.driver.memory', '4g')
conf.set('spark.executor.cores', '2')
conf.set('spark.python.worker.reuse', 'true')
conf.set('spark.executor.heartbeatInterval', '200s')
conf.set('spark.yarn.executor.memoryOverhead', '2048')
conf.set('spark.network.timeout', '300s')


sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()


model_path = 'C:/Users/hanane/Desktop/Kafka/Model/pipeline_model/'

try:
    pipeline_model = PipelineModel.load(model_path)
except Exception as e:
    print(f"Error loading model from {model_path}: {e}")
    raise

def process_with_model(data):
    try:
        
        row_data = Row(**data)
        df = spark.createDataFrame([row_data])
        prediction = pipeline_model.transform(df)
        return prediction.collect()
    except Exception as ex:
        print("Model processing error: ", ex)
        return None

In [11]:
client = MongoClient('mongodb://localhost:27017')
db = client.BigData
collection = db.new

In [12]:
consumer = KafkaConsumer('twitter_data', bootstrap_servers='localhost:9092', group_id='1', value_deserializer=lambda x: json.loads(x.decode('utf-8')))
try:
    for message in consumer:
        message_value = message.value
        print("Received message:", message_value)

        other_fields = {key: value for key, value in message_value.items() if key != 'Sentiment'}
        
        
        prediction = process_with_model(other_fields)
        per = prediction[0]['prediction']
        if per is not None:
                print("Prediction:", per)
           
        tweet_doc = {
        "id": message_value.get('Tweet ID'),
        "topic": message_value.get('Entity'),
        "content": message_value.get('Tweet content'),
        "prediction": per
            }
        print(tweet_doc)
        print("-------------------------")
        collection.insert_one(tweet_doc)

except KeyboardInterrupt:
    pass  

Received message: {'Tweet ID': 7925, 'Entity': 'MaddenNFL', 'Sentiment': 'Positive', 'Tweet content': 'Thank you @EAMaddenNFL!! \n\nNew TE Austin Hooper in the ORANGE & BROWN!! \n\n#Browns | @AustinHooper18 \n\n pic.twitter.com/GRg4xzFKOn'}
Prediction: 1.0
{'id': 7925, 'topic': 'MaddenNFL', 'content': 'Thank you @EAMaddenNFL!! \n\nNew TE Austin Hooper in the ORANGE & BROWN!! \n\n#Browns | @AustinHooper18 \n\n pic.twitter.com/GRg4xzFKOn', 'prediction': 1.0}
-------------------------
Received message: {'Tweet ID': 11332, 'Entity': 'TomClancysRainbowSix', 'Sentiment': 'Positive', 'Tweet content': 'Rocket League, Sea of Thieves or Rainbow Six: Siege🤔? I love playing all three on stream but which is the best? #stream #twitch #RocketLeague #SeaOfThieves #RainbowSixSiege #follow'}
Prediction: 1.0
{'id': 11332, 'topic': 'TomClancysRainbowSix', 'content': 'Rocket League, Sea of Thieves or Rainbow Six: Siege🤔? I love playing all three on stream but which is the best? #stream #twitch #RocketLeagu

TypeError: 'NoneType' object is not subscriptable

In [8]:
sc.stop()

In [9]:
spark.stop()