In [25]:
from dotenv import load_dotenv
import os
import asyncpraw
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from textblob import TextBlob
from wordcloud import WordCloud
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Load environment variables from .env file
load_dotenv()

# Reddit API credentials are read from the environment so they never appear in
# the notebook itself. os.getenv returns None for a variable that is not set.
CLIENT_ID = os.getenv("REDDIT_CLIENT_ID")
CLIENT_SECRET = os.getenv("REDDIT_CLIENT_SECRET")
USER_AGENT = os.getenv("REDDIT_USER_AGENT")

# process_post relies on NLTK's tokenizer tables (word_tokenize) and on the
# English stop-word corpus. Fetch them here so that this cell works on a fresh
# kernel instead of depending on download cells that appear later in the notebook.
nltk.download("punkt_tab", quiet=True)
nltk.download("stopwords", quiet=True)

# Initialize Spark session (getOrCreate reuses a session that already exists)
spark = (
    SparkSession.builder
    .appName("SentimentAnalysis")
    .getOrCreate()
)

# Initialize Reddit client using asyncpraw
reddit = asyncpraw.Reddit(
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    user_agent=USER_AGENT,
)

# Function to fetch data from Reddit
async def fetch_data(subreddit_name, limit=10):
    """Fetch hot posts from a subreddit and analyze each post title.

    Uses the module-level ``reddit`` client. Every title is printed and then
    handed to ``process_post``.

    Args:
        subreddit_name: Name of the subreddit to read, e.g. ``"worldnews"``.
        limit: Maximum number of hot posts to request. Defaults to 10, the
            value that used to be hard-coded.
    """
    subreddit = await reddit.subreddit(subreddit_name)
    print(f"Fetching data from {subreddit_name}...")

    # subreddit.hot() returns an async iterator; posts are handled one by one
    # as they arrive.
    async for post in subreddit.hot(limit=limit):
        print(post.title)
        await process_post(post.title)

# Function to process the post title and analyze sentiment
async def process_post(post_title):
    """Clean a post title, score its sentiment, and run it through Spark.

    The title is reduced to lowercase alphabetic tokens with English stop
    words removed. TextBlob computes the polarity of the cleaned text, a word
    cloud image is written to ``sentiment_analysis_wordcloud.png``, and a
    one-row Spark DataFrame is filtered for positive sentiment and shown.

    Args:
        post_title: Raw title text of a Reddit post.
    """
    # Remove non-alphabetical characters and tokenize the text
    text = re.sub('[^a-zA-Z]', ' ', post_title).lower()

    # Read the stop-word list once per call rather than once per token, and
    # keep it in a set so each membership test is a hash lookup.
    english_stopwords = set(stopwords.words('english'))
    tokens = [word for word in word_tokenize(text) if word not in english_stopwords]

    words = " ".join(tokens)

    # Perform sentiment analysis with TextBlob
    blob = TextBlob(words)
    sentiment = blob.sentiment.polarity
    print(f"Sentiment polarity for post '{post_title}': {sentiment}")

    # WordCloud.generate raises ValueError when no usable words are left
    # (e.g. a title made only of digits or stop words). Report and skip the
    # image in that case so one title cannot abort the whole fetch loop.
    try:
        wcloud = WordCloud(width=800, height=600, random_state=101).generate(words)
    except ValueError as exc:
        print(f"Skipping word cloud for post '{post_title}': {exc}")
    else:
        # NOTE: every post writes to the same path, so the file on disk always
        # holds the word cloud of the most recently processed post.
        wcloud.to_file("sentiment_analysis_wordcloud.png")

    # Process data with Spark
    # Create a DataFrame from the processed post data
    data = [(post_title, sentiment)]
    columns = ["title", "sentiment"]
    df = spark.createDataFrame(data, columns)

    # Perform a simple Spark transformation: Filter posts with positive sentiment
    positive_posts = df.filter(col("sentiment") > 0)
    positive_posts.show()

# Function to run the streaming data fetch
async def stream_reddit_data(subreddit_name):
    """Run the fetch-and-analyze pipeline for ``subreddit_name``.

    Delegates to ``fetch_data`` and returns nothing.
    """
    pending_fetch = fetch_data(subreddit_name)
    await pending_fetch

# Run the async function in the current event loop
if __name__ == "__main__":
    subreddit_name = "worldnews"  # You can change this to any subreddit you like
    # Use await instead of asyncio.run()
    await stream_reddit_data(subreddit_name)


Fetching data from worldnews...
/r/WorldNews Live Thread: Russian Invasion of Ukraine Day 1041, Part 1 (Thread #1188)
Sentiment polarity for post '/r/WorldNews Live Thread: Russian Invasion of Ukraine Day 1041, Part 1 (Thread #1188)': 0.06818181818181818
+--------------------+-------------------+
|               title|          sentiment|
+--------------------+-------------------+
|/r/WorldNews Live...|0.06818181818181818|
+--------------------+-------------------+

/r/WorldNews Live Thread: Israel at War (Thread #83)
Sentiment polarity for post '/r/WorldNews Live Thread: Israel at War (Thread #83)': 0.13636363636363635
+--------------------+-------------------+
|               title|          sentiment|
+--------------------+-------------------+
|/r/WorldNews Live...|0.13636363636363635|
+--------------------+-------------------+

Ukraine's Defence Intelligence reports historic strike: naval drone destroys Russian Mi-8 helicopter in Crimea
Sentiment polarity for post 'Ukraine's Defenc

ERROR:asyncio:Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x78e32537e800>
ERROR:asyncio:Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x78e32997d6c0>
ERROR:asyncio:Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x78e32997d540>


+-----+---------+
|title|sentiment|
+-----+---------+
+-----+---------+

WHO urges China to share Covid origins data, five years on from pandemic’s emergence
Sentiment polarity for post 'WHO urges China to share Covid origins data, five years on from pandemic’s emergence': 0.0
+-----+---------+
|title|sentiment|
+-----+---------+
+-----+---------+

Sixty-mile drag mark found near damaged Baltic Sea cable, says Finland 
Sentiment polarity for post 'Sixty-mile drag mark found near damaged Baltic Sea cable, says Finland ': 0.0
+-----+---------+
|title|sentiment|
+-----+---------+
+-----+---------+

500 tonnes of Ukrainian grain to arrive in Syria on 31 December as part of humanitarian programme
Sentiment polarity for post '500 tonnes of Ukrainian grain to arrive in Syria on 31 December as part of humanitarian programme': 0.0
+-----+---------+
|title|sentiment|
+-----+---------+
+-----+---------+



In [18]:
# Download NLTK's "punkt_tab" tokenizer data. process_post calls word_tokenize,
# so this resource has to be available before any post is processed.
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

In [20]:
# Download NLTK's stop-word corpus. process_post reads stopwords.words('english'),
# so this resource has to be available before any post is processed.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True