In [1]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook under the application name
# "reddit". getOrCreate() hands back an already-running session when one
# exists (the log output of this cell shows that this is what happens here).
session_builder = SparkSession.builder.appName("reddit")
spark = session_builder.getOrCreate()

25/03/10 20:31:03 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Location of the Reddit comment dump on HDFS.
# NOTE(review): an earlier remark on this line said a smaller subset is used
# for testing -- confirm whether this path is that subset or the full dataset.
file_path = "hdfs://de-project-g1-m:8020/data/reddit.json"

# Parse the JSON file into a DataFrame and record how many rows we start with.
json_reader = spark.read
df = json_reader.json(path=file_path)
rows_1 = df.count()
print(f"Current amount of rows: {rows_1}")



Current amount of rows: 3848330


                                                                                

In [3]:
from pyspark.sql.functions import col

# Keep only the two fields the analysis uses: the comment contents ("body")
# and the name of the subreddit it belongs to.
columns_of_interest = ["body", "subreddit"]
df = df.select(*columns_of_interest)

# Projection does not remove rows, so this count should match the previous one.
rows_2 = df.count()
print(f"Current amount of rows: {rows_2}")
df.show(5)

                                                                                

Current amount of rows: 3848330
+--------------------+-----------+
|                body|  subreddit|
+--------------------+-----------+
|I think it should...|       math|
|Art is about the ...|      funny|
|Ask me what I thi...|Borderlands|
|In Mechwarrior On...|   gamingpc|
|You are talking a...|     Diablo|
+--------------------+-----------+
only showing top 5 rows



In [4]:
# Discard every row whose comment body or subreddit name is NULL, then report
# how many rows were removed compared with the previous step (rows_2).
required_columns = ["body", "subreddit"]
df = df.na.drop(subset=required_columns)

rows_3 = df.count()
rows_dropped = rows_2 - rows_3
print(f"Current amount of rows: {rows_3}, NULL rows dropped: {rows_dropped}.")
df.show(10)



Current amount of rows: 3848194, NULL rows dropped: 136.
+--------------------+--------------------+
|                body|           subreddit|
+--------------------+--------------------+
|I think it should...|                math|
|Art is about the ...|               funny|
|Ask me what I thi...|         Borderlands|
|In Mechwarrior On...|            gamingpc|
|You are talking a...|              Diablo|
|All but one of my...|   RedditLaqueristas|
|I could give a sh...|               apple|
|So you're saying ...|               apple|
|I love this idea ...|RedditFilmsProduc...|
|Theres an entire ...|       AbandonedPorn|
+--------------------+--------------------+
only showing top 10 rows



                                                                                

In [5]:
# Preprocess the data - in accordance with SparkNLP tutorial
from pyspark.sql.functions import regexp_replace, length  # Only used by the disabled steps below.

# Remove empty strings (i.e. bodies containing only whitespace).
# The pattern is a raw string so that "\s" is passed to the regex engine
# verbatim and Python does not emit an invalid-escape-sequence warning.
df = df.filter(~col("body").rlike(r"^\s*$"))

# --- Disabled preprocessing steps -------------------------------------------
# Everything below is intentionally commented out and does NOT run; only the
# whitespace filter above is applied before the sentiment pipeline.
# Remove Non-ASCII characters like emojis and non-english characters and replaces with spaces.
#df = df.withColumn("body", regexp_replace(col("body"), "[^\x00-\x7F]+", " ")) 
# Keep comments with at least 5 letters in one word.
#df = df.filter(col("body").rlike("[a-zA-Z]{5,}"))  
# Remove punctuation and replace with an empty string to keep words intact
#df = df.withColumn("body", regexp_replace(col("body"), "[!\"#$%&'()*+,-./:;<=>?@\\[\\\\\\]^_`{|}~]", ""))
# Removes very short comments that don't give any meaning really.
#df = df.filter(length(col("body")) > 10)
#rows_4 = df.count()
#print(f"Rows after pre-processing finalized: {rows_4}, total rows dropped during pre-processing: {rows_2 - rows_4}")
#df.show(10)
# After this pre-processing has been finalized

In [6]:
# Expose the comment body under the name "text" (the column the NLP pipeline's
# DocumentAssembler reads) and carry the subreddit column along unchanged.
renamed = df.select(df["body"].alias("text"), df["subreddit"])

# Redistribute the rows into 150 partitions keyed by subreddit and mark the
# result for caching so later actions can reuse it.
df = renamed.repartition(150, "subreddit").cache()


In [7]:
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
 
# Start Spark NLP. The log output of this cell shows that an existing Spark
# session is reused rather than a fresh one being created.
spark = sparknlp.start()

# Stage 1: read the raw "text" column and emit it as the "document" column.
document = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Stage 2: split each document into tokens.
token = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

# Stage 3: normalise the tokens (default Normalizer settings) into "normal".
normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normal")
)

# Stage 4: pretrained Vivekn sentiment model (downloaded on first use); it
# consumes the document together with its normalised tokens.
vivekn = (
    ViveknSentimentModel.pretrained()
    .setInputCols(["document", "normal"])
    .setOutputCol("result_sentiment")
)

# Stage 5: convert the sentiment annotations into the plain output column
# "final_sentiment" that the aggregation cells group by.
finisher = (
    Finisher()
    .setInputCols(["result_sentiment"])
    .setOutputCols("final_sentiment")
)

sentiment_vivekn download started this may take some time.


25/03/10 20:40:30 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Approximate size to download 873.6 KB
[ | ]

25/03/10 20:40:35 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
25/03/10 20:40:36 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.


sentiment_vivekn download started this may take some time.
Approximate size to download 873.6 KB
Download done! Loading the resource.
[ / ]

                                                                                

[ — ]



[OK!]


In [8]:
# Assemble the five stages in execution order, fit the pipeline on the
# prepared comments, and annotate those same comments with a sentiment label.
nlp_stages = [document, token, normalizer, vivekn, finisher]
pipeline = Pipeline(stages=nlp_stages)
pipelineModel = pipeline.fit(df)
result = pipelineModel.transform(df)



In [9]:
from pyspark.sql import functions as F

# A (subreddit, sentiment) pair must have MORE than this many comments to be
# kept; smaller groups are considered too sparse to compare.
MIN_COMMENTS_PER_SENTIMENT = 50

# Count comments per (subreddit, sentiment label) and drop the sparse pairs.
# No sort is applied here: this intermediate result is only aggregated and
# joined below, and the final result is sorted once at the end.
sentiment_count = (
    result.groupBy("subreddit", "final_sentiment")
    .count()
    .filter(F.col("count") > MIN_COMMENTS_PER_SENTIMENT)
)

# Number of distinct sentiment labels that survived the threshold per subreddit.
subreddit_counts = sentiment_count.groupBy("subreddit").count()

# Keep only subreddits that still have more than one sentiment label, so that
# a comparison between labels is possible.
filtered_subreddits = subreddit_counts.filter(F.col("count") > 1).select("subreddit")

# Restrict the per-label counts to those subreddits and sort for readability.
sentiment_count = (
    sentiment_count.join(filtered_subreddits, on="subreddit", how="inner")
    .orderBy("subreddit", "final_sentiment")
)

In [10]:
# Reshape from one row per (subreddit, sentiment) to one row per subreddit,
# with a separate count column for every sentiment label.
per_subreddit = sentiment_count.groupBy("subreddit")
pivoted = per_subreddit.pivot("final_sentiment").sum("count")

# A label missing for a subreddit shows up as NULL after the pivot; treat it
# as a count of 0.
sentiment_counts = pivoted.na.fill(0)

                                                                                

In [11]:
from pyspark.sql.functions import asc, desc, asc_nulls_last, desc_nulls_last

# The pivot names its columns after the rendered sentiment value, e.g.
# "[positive]"; give the two columns we use plain identifiers.
sentiment_counts = sentiment_counts.withColumnRenamed("[positive]", "positive_count") \
                                   .withColumnRenamed("[negative]", "negative_count")

# Compute the positive-to-negative ratio.
# negative_count can be 0 (missing labels were filled with 0 after the pivot),
# so the division is guarded: such rows get a NULL ratio instead of relying on
# division-by-zero behaviour, which raises an error when ANSI mode is enabled.
negative_count_col = F.col("negative_count")
sentiment_ratio = sentiment_counts.withColumn(
    "ratio",
    F.when(negative_count_col > 0, F.col("positive_count") / negative_count_col),
).cache()

# NULL ratios are sorted last in both rankings. With a plain ascending sort
# Spark puts NULLs first, which would list subreddits that have no negative
# comments at all at the top of the "Most negative" table.
print("\n== Most positive ==")
sentiment_ratio.orderBy(desc_nulls_last("ratio")).show(10)


print("\n== Most negative ==")
sentiment_ratio.orderBy(asc_nulls_last("ratio")).show(10)


== Most positive ==


                                                                                

+----------------+----+--------------+--------------+------------------+
|       subreddit|[na]|negative_count|positive_count|             ratio|
+----------------+----+--------------+--------------+------------------+
|       wowguilds|   0|           106|           308|2.9056603773584904|
|FFXIVRECRUITMENT|   0|            67|           181| 2.701492537313433|
|         Romania|   0|           226|           575|2.5442477876106193|
| booksuggestions|   0|            76|           190|               2.5|
|        dirtyr4r|   0|            73|           152|2.0821917808219177|
|         penpals|   0|            63|           131|2.0793650793650795|
|guildrecruitment|   0|            60|           114|               1.9|
|            EVEX|   0|            95|           179|1.8842105263157896|
|       RecruitCS|   0|            51|            93|1.8235294117647058|
|     MakeupRehab|   0|            51|            93|1.8235294117647058|
+----------------+----+--------------+-------------



In [47]:
# Stop the Spark session held in `spark` now that the analysis is finished.
# NOTE(review): this cell's execution count (47) is out of sequence with the
# cells above (1-11); re-run the notebook top to bottom on a fresh kernel to
# confirm that every cell still works in order.
spark.stop()