# Sentiment Streaming with PySpark and Netcat

This notebook classifies incoming text messages as positive, negative, or neutral by counting matches against predefined keyword lists.

**Pipeline Overview:**
1. **Netcat** listens on a socket (`localhost:9999`) for input text messages.
2. **PySpark Streaming** (the DStream API) connects to the socket and receives the data in 5-second micro-batches.
3. Each incoming message is classified as **positive**, **negative**, or **neutral**.
4. The original message and its classification are printed to the console.


Import required libraries

In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

Initialize Spark Streaming Context

In [2]:
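# When running locally, the socket receiver permanently occupies one core, so the
# master needs at least two (e.g. local[2] or local[*]) for batches to be processed.
sc = SparkContext(appName="SentimentStreaming")
ssc = StreamingContext(sc, 5)  # 5-second micro-batch interval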

25/05/08 15:04:53 WARN Utils: Your hostname, Gloria resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/08 15:04:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/08 15:04:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable



----------------------------------------
Message: "I feel awesome today" → Sentiment: positive

----------------------------------------
Message: "This is such a good idea" → Sentiment: positive

----------------------------------------
Message: "I am very tired" → Sentiment: neutral

----------------------------------------
Message: "Hello world" → Sentiment: neutral

----------------------------------------
Message: "I had such a bad experience today" → Sentiment: negative

----------------------------------------
Message: "Nothing seems right anymore" → Sentiment: neutral

----------------------------------------
Message: "Terrible performance by the team" → Sentiment: negative

----------------------------------------
Message: "The movie was absolutely awful" → Sentiment: negative
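Each dashed block above is one micro-batch: `StreamingContext(sc, 5)` cuts the stream into 5-second intervals, and the lines received during one interval end up in the same RDD. The pure-Python sketch below mimics that grouping as an illustration only; `group_into_batches` and the arrival times are hypothetical, and real Spark assigns records to batches roughly by when its receiver gets them, not by a timestamp carried in the data.

```python
from collections import defaultdict


def group_into_batches(events, interval=5.0):
    """Group (arrival_seconds, line) pairs into fixed-width micro-batches.

    Hypothetical helper: batch k covers [k * interval, (k + 1) * interval).
    Returns a dict mapping each batch start time to its lines in arrival order.
    """
    batches = defaultdict(list)
    for arrival, line in sorted(events, key=lambda e: e[0]):
        start = (arrival // interval) * interval
        batches[start].append(line)
    return dict(batches)


# Made-up arrival times, in seconds since the stream started.
events = [
    (0.4, "I feel awesome today"),
    (3.9, "This is such a good idea"),
    (5.1, "I am very tired"),
    (12.7, "Hello world"),
]
for start, lines in group_into_batches(events).items():
    print(f"batch [{start:.0f}s, {start + 5:.0f}s): {lines}")
```

Intervals with no lines simply do not appear in the dict. In Spark they still produce an empty RDD, which is why the display function checks `rdd.isEmpty()` before printing.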


Define sentiment words

In [3]:
positive_words = ["happy", "awesome", "good", "great", "amazing", "love", "like"]
negative_words = ["sad", "terrible", "bad", "worst", "awful", "hate", "angry"]
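Because these keywords are plain words, how they are matched matters. A substring test such as `word in line` also fires inside longer words, so `like` matches `dislike` and `bad` matches `badge`. The sketch below contrasts substring matching with whole-word matching; `substring_hits`, `token_hits`, and the tokenizing regular expression are illustrative assumptions, not part of the original notebook.

```python
import re

positive_words = ["happy", "awesome", "good", "great", "amazing", "love", "like"]
negative_words = ["sad", "terrible", "bad", "worst", "awful", "hate", "angry"]


def substring_hits(line, keywords):
    """Keywords found anywhere in the text, even inside longer words."""
    text = line.lower()
    return [w for w in keywords if w in text]


def token_hits(line, keywords):
    """Keywords that appear as whole words (runs of letters and apostrophes)."""
    tokens = set(re.findall(r"[a-z']+", line.lower()))
    return [w for w in keywords if w in tokens]


msg = "I dislike this"
print(substring_hits(msg, positive_words), substring_hits(msg, negative_words))  # ['like'] []
print(token_hits(msg, positive_words), token_hits(msg, negative_words))          # [] []
```

With substring matching, "I dislike this" scores one positive hit and no negative ones, so a majority rule would call it positive. Whole-word matching avoids that, but it also misses inflected forms such as "loved" or "hated" unless they are added to the lists.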

Connect to socket stream from Netcat

In [4]:
data = ssc.socketTextStream("localhost", 9999)
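`socketTextStream` reads the TCP connection as UTF-8 text and splits it into lines, so every newline-terminated line typed into Netcat becomes one record. TCP delivers a byte stream rather than discrete messages, so one line may arrive split across several reads, or several lines in one read. The sketch below shows that reassembly in plain Python; `lines_from_chunks` is a hypothetical helper, not Spark's receiver code.

```python
def lines_from_chunks(chunks, encoding="utf-8"):
    """Yield complete newline-delimited lines from a sequence of byte chunks.

    Hypothetical helper: bytes after the last newline are buffered until the
    next chunk arrives; a trailing partial line is yielded when input ends.
    """
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete line currently held in the buffer.
        while b"\n" in buffer:
            raw, buffer = buffer.split(b"\n", 1)
            yield raw.decode(encoding)
    if buffer:
        yield buffer.decode(encoding)


# One message split across two reads, and two messages packed into one read.
chunks = [b"I feel awes", b"ome today\nHello world\nThe movie", b" was absolutely awful\n"]
print(list(lines_from_chunks(chunks)))
```

Splitting on the byte `\n` before decoding is safe for UTF-8, because that byte never occurs inside a multi-byte character.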

Define Sentiment Classification Function

In [5]:
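def classify_sentiment(line):
    """Label a message by comparing its positive and negative keyword counts."""
    line = line.lower()
    # Substring test: each keyword counts once if it appears anywhere in the
    # text, including inside longer words (e.g. "like" in "dislike").
    pos = sum(1 for word in positive_words if word in line)
    neg = sum(1 for word in negative_words if word in line)
    # Majority wins; a tie (including no matches at all) is neutral.
    # The second tuple element is a count of 1 per message; only the label is used.
    if pos > neg:
        return ("positive", 1)
    elif neg > pos:
        return ("negative", 1)
    else:
        return ("neutral", 1)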
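Since `classify_sentiment` is plain Python, it can be checked without Spark. The standalone copy below uses the same keyword lists and majority rule as the notebook, and runs it on two messages from the streamed output above plus two mixed messages of my own to show how ties and majorities resolve.

```python
positive_words = ["happy", "awesome", "good", "great", "amazing", "love", "like"]
negative_words = ["sad", "terrible", "bad", "worst", "awful", "hate", "angry"]


def classify_sentiment(line):
    """Majority vote between positive and negative keyword hits (substring match)."""
    line = line.lower()
    pos = sum(1 for word in positive_words if word in line)
    neg = sum(1 for word in negative_words if word in line)
    if pos > neg:
        return ("positive", 1)
    elif neg > pos:
        return ("negative", 1)
    else:
        return ("neutral", 1)


examples = [
    "I feel awesome today",                   # 1 positive, 0 negative
    "Nothing seems right anymore",            # no keywords at all -> tie
    "Great cast, amazing music, bad ending",  # 2 positive, 1 negative
    "I love the idea but hate the result",    # 1 positive, 1 negative -> tie
]
for msg in examples:
    print(f"{msg!r}: {classify_sentiment(msg)[0]}")
```

The mixed examples show that the rule only compares counts: one negative keyword is outweighed by two positive ones, and an equal number on each side falls back to neutral.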

Map Incoming Messages to Sentiment

In [6]:
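# Pair each raw message with its sentiment label: (message, label).
# No reduceByKey: reducing these pairs with a + b would concatenate the
# labels of a message repeated within one batch (e.g. "positivepositive").
sentiments = data.map(lambda line: (line, classify_sentiment(line)[0]))


Define Display Function

In [7]:
def display(rdd):
    """Print every (message, sentiment) pair in one micro-batch."""
    # Spark creates an RDD for every interval, so skip the empty ones.
    if not rdd.isEmpty():
        print("\n" + "-"*40)
        # collect() pulls the whole batch to the driver; fine for a small demo.
        for message, sentiment in rdd.collect():
            print(f"Message: \"{message}\" → Sentiment: {sentiment}")

# Run display() on each micro-batch RDD of the stream.
sentiments.foreachRDD(display)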
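`reduceByKey` merges all values that share a key using the supplied function. Keyed by message, `lambda a, b: a + b` joins the labels of a repeated message into a string like `"positivepositive"`; keyed by label with a 1 per message (the `(label, 1)` tuples that `classify_sentiment` already returns), the same function produces per-batch sentiment counts. The pure-Python `reduce_by_key` below is a hypothetical local stand-in used only to show both outcomes on one made-up batch; in PySpark the counting variant would look roughly like `data.map(classify_sentiment).reduceByKey(lambda a, b: a + b)`.

```python
from functools import reduce


def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for reduceByKey: merge values per key with func.

    Keys keep first-seen order here; Spark gives no ordering guarantee after a shuffle.
    """
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


batch = [
    ("I feel awesome today", "positive"),
    ("Hello world", "neutral"),
    ("I feel awesome today", "positive"),  # same message sent twice in one batch
]

# Keyed by message: string "+" concatenates the labels of the duplicate.
print(reduce_by_key(batch, lambda a, b: a + b))
# {'I feel awesome today': 'positivepositive', 'Hello world': 'neutral'}

# Keyed by label with a 1 per message: integer "+" counts sentiments.
print(reduce_by_key([(label, 1) for _, label in batch], lambda a, b: a + b))
# {'positive': 2, 'neutral': 1}
```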


#### Start Streaming

In [None]:
ssc.start()

25/05/08 15:07:00 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:9999
java.net.ConnectException: Connection refused (Connection refused)
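The `Connection refused` warning means nothing was listening on `localhost:9999` when the receiver started. Spark keeps retrying (here with a 2000 ms delay), so the fix is to start the listener, for example `nc -lk 9999` in a separate terminal, and type messages there. For scripted runs without Netcat, a small Python server can play the same role. The sketch below is a hypothetical stand-in (`start_line_server` is not part of Spark or the original notebook): it accepts a single connection, sends each message as a newline-terminated UTF-8 line, and then closes, whereas `nc -lk` keeps listening.

```python
import socket
import threading


def start_line_server(lines, host="localhost", port=9999):
    """Hypothetical Netcat stand-in: send `lines` to the first client, then close.

    Returns (thread, bound_port). Pass port=0 to let the OS choose a free port.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def serve():
        with server:
            conn, _ = server.accept()
            with conn:
                for line in lines:
                    # The stream is split into records on newlines, so end each message with one.
                    conn.sendall((line + "\n").encode("utf-8"))

    thread = threading.Thread(target=serve, daemon=True)
    thread.start()
    return thread, bound_port
```

In the notebook, calling `start_line_server(["I feel awesome today", "Hello world"])` before `ssc.start()` would let the receiver connect on port 9999. Once this server closes the connection, expect the receiver to log retry warnings again, since it tries to reconnect.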

In [None]:
ssc.awaitTermination()

This notebook demonstrates a basic streaming application with PySpark.

Text messages arriving on a socket were processed in 5-second micro-batches, and each one was labeled positive, negative, or neutral by keyword matching.