In [1]:
# Import necessary libraries
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.clustering import StreamingKMeans
from textblob import TextBlob
import json
import sqlite3

# Function to create a database table for storing messages
def create_db_table():
    conn = sqlite3.connect("messages.db")
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS messages
                 (date TEXT, username TEXT, sentiment TEXT, message TEXT, cluster INTEGER)''')
    conn.commit()
    conn.close()

# Function to save a message to the database
def save_message(date, username, sentiment, message, cluster):
    conn = sqlite3.connect("messages.db")
    c = conn.cursor()
    c.execute("INSERT INTO messages (date, username, sentiment, message, cluster) VALUES (?, ?, ?, ?, ?)",
              (date, username, sentiment, message, cluster))
    conn.commit()
    conn.close()

# Function to analyze the sentiment of a message
def analyze_sentiment(message):
    analysis = TextBlob(message)
    sentiment = 0
    if analysis.sentiment.polarity > 0:
        sentiment = 1
    elif analysis.sentiment.polarity < 0:
        sentiment = -1
    return sentiment

# Function to process each RDD in the DStream
def process_rdd(rdd):
    if not rdd.isEmpty():
        messages = rdd.collect()
        for sentiment_value, msg in messages:
            msg_data = json.loads(msg)
            message = msg_data['message']
            sentiment = analyze_sentiment(message)
            cluster = model.latestModel().predict(sentiment_value)

            # Print the cluster sizes in the sliding window
            cluster_counts = rdd.map(lambda x: (model.latestModel().predict(x[0]), 1)).reduceByKey(lambda a, b: a + b).collect()
            print("Cluster sizes in the sliding window:")
            for cluster, count in cluster_counts:
                print(f"  Cluster {cluster}: {count} messages")
            print("\n")

            # Convert sentiment value to string
            if sentiment == 0:
                sentiment = "Neutral"
            elif sentiment == 1:
                sentiment = "Positive"
            elif sentiment == -1:
                sentiment = "Negative"

            # Save the message to the database and print it
            save_message(msg_data['date'], msg_data['username'], sentiment, message, cluster)

            print("--------------------------------------------------")
            print(f"Timestamp:\t[{msg_data['date']}]")
            print(f"Username:\t{msg_data['username']}")
            print(f"Sentiment:\t{sentiment}")
            print(f"Message:\t{message}")
            print(f"Cluster:\t{cluster}")
            print("--------------------------------------------------\n")

# Create the database table
create_db_table()

# Initialize the Spark context and streaming context
sc = SparkContext("local[2]", "TwitchSentimentAnalysis")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)  # 1-second window

# Server configuration
SERVER_HOST = "127.0.0.1"
SERVER_PORT = 8081
lines = ssc.socketTextStream(SERVER_HOST, SERVER_PORT)

# Initialize the streaming k-means model
model = StreamingKMeans(k=3, decayFactor=1.0).setRandomCenters(1, 1.0, 0)
sentiment_values = lines.map(lambda msg: (Vectors.dense(analyze_sentiment(json.loads(msg)['message'])), msg))
model.trainOn(sentiment_values.map(lambda x: x[0]))

# Create a windowed DStream with a window duration of 10 seconds and a slide duration of 5 seconds
windowed_sentiments = sentiment_values.window(windowDuration=10, slideDuration=5)
windowed_sentiments.foreachRDD(process_rdd)

# Start the streaming context and wait for it to terminate
ssc.start()
ssc.awaitTermination()


23/05/02 02:26:27 WARN Utils: Your hostname, etiennebobo-Prestige-15-A10SC resolves to a loopback address: 127.0.1.1; using 172.20.10.3 instead (on interface wlo1)
23/05/02 02:26:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/02 02:26:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 1 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:31]
Username:	estiimated
Sentiment:	Neutral
Message:	MyHonestReaction 󠀀
Cluster:	1
--------------------------------------------------



                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 6 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:31]
Username:	estiimated
Sentiment:	Neutral
Message:	MyHonestReaction 󠀀
Cluster:	1
--------------------------------------------------



                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 6 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:34]
Username:	zsungat
Sentiment:	Neutral
Message:	2:56 Pirate_hat: @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat
Cluster:	1
--------------------------------------------------



                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 6 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:34]
Username:	estiimated
Sentiment:	Neutral
Message:	MyHonestReaction
Cluster:	1
--------------------------------------------------



                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 6 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:34]
Username:	ersieeswambo
Sentiment:	Neutral
Message:	Gushii is my alt
Cluster:	1
--------------------------------------------------



                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 6 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:36]
Username:	zsungat
Sentiment:	Neutral
Message:	catEat nymnCorn
Cluster:	1
--------------------------------------------------



                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 6 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:37]
Username:	pirate_hat
Sentiment:	Neutral
Message:	@Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat
:nate_thesnake!nate_thesnake@nate_thesnake.tmi.twitch.tv PRIVMSG #pokelawls :ban everyone tbh
:pirate_hat!pirate_hat@pirate_hat.tmi.twitch.tv PRIVMSG #pokelawls :@Pirate_hat:  Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat
Cluster:	1
--------------------------------------------------

                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 7 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:34]
Username:	zsungat
Sentiment:	Neutral
Message:	2:56 Pirate_hat: @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat
Cluster:	1
--------------------------------------------------



                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 7 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:34]
Username:	estiimated
Sentiment:	Neutral
Message:	MyHonestReaction
Cluster:	1
--------------------------------------------------



                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 7 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:34]
Username:	ersieeswambo
Sentiment:	Neutral
Message:	Gushii is my alt
Cluster:	1
--------------------------------------------------



                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 7 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:36]
Username:	zsungat
Sentiment:	Neutral
Message:	catEat nymnCorn
Cluster:	1
--------------------------------------------------



                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 7 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:37]
Username:	pirate_hat
Sentiment:	Neutral
Message:	@Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat
:nate_thesnake!nate_thesnake@nate_thesnake.tmi.twitch.tv PRIVMSG #pokelawls :ban everyone tbh
:pirate_hat!pirate_hat@pirate_hat.tmi.twitch.tv PRIVMSG #pokelawls :@Pirate_hat:  Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat
Cluster:	1
--------------------------------------------------

                                                                                

Cluster sizes in the sliding window:
  Cluster 1: 7 messages


--------------------------------------------------
Timestamp:	[2023-05-02 02:26:39]
Username:	zsungat
Sentiment:	Negative
Message:	catEat  nymnCorn
:estiimated!estiimated@estiimated.tmi.twitch.tv PRIVMSG #pokelawls :MyHonestReaction 󠀀
:pirate_hat!pirate_hat@pirate_hat.tmi.twitch.tv PRIVMSG #pokelawls :@Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat @Pirate_hat: Nerdge Nerdge @zsungat @HiMemeMan @riotskate @Pirate_hat: Nerdge @zsungat
:ersieeswambo!ersieeswambo@ersieeswambo.tmi.twitch.tv PRIVMSG #pokelawls :ill stop
Cluster:	1
--------------------------------------------------



[Stage 0:>                                                          (0 + 1) / 1]