# Vulgarity Analysis of Subreddits

**Author:** Noah Wassberg

**Date Created:** March 4, 2024

**Description:** This notebook analyzes the average profanity severity of posts in each subreddit, using the Reddit post dataset and the Surge AI profanity list dataset. The analysis aims to find which subreddits have, on average, the most vulgar text content.

**Output:**
- A list of subreddits ranked by the average language severity of their posts.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct, explode, split, col, lower, sum as spark_sum, first, broadcast, concat_ws

#Init Spark session
spark_session = SparkSession.builder\
        .master("spark://192.168.2.193:7077") \
        .appName("test")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores", 4)\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

#Get Spark context
spark_context = spark_session.sparkContext
#Set log level to error
spark_context.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/06 09:40:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
#Import profanity list
profanity_unfiltered = spark_session.read.csv("hdfs://192.168.2.193:9000/user/hadoop/input/input/profanity_en.csv", 
                                         header='true', inferSchema='true')

#Import reddit post dataset
posts_unfiltered = spark_session.read.json("hdfs://192.168.2.193:9000/user/hadoop/input/input/corpus-webis-tldr-17.json")

                                                                                

In [3]:
#Remove unnecessary fields from profanity list.
profanity = profanity_unfiltered.drop("canonical_form_1", "canonical_form_2", "canonical_form_3",
                          "category_1","category_2","category_3","severity_description")

#Remove unnecessary fields from reddit post dataset.
posts_unfiltered = posts_unfiltered.drop("summary","summary_len","content_len","author","body","normalizedBody")

In [4]:
#Get the count of unique posts for each subreddit in the dataset
#and remove subreddits with fewer than 200 posts to avoid outliers
post_counts = posts_unfiltered.groupBy("subreddit_id").agg(countDistinct("id").alias("total_posts")) \
    .filter(col("total_posts") >= 200)

#Remove all posts from subreddits with fewer than 200 posts
posts = posts_unfiltered.join(broadcast(post_counts), "subreddit_id")
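
The filtering step above can be sketched in plain Python, without Spark, to make its effect easier to see. This is only an illustration under stated assumptions: the sample rows, the helper name `filter_by_min_posts`, and the threshold of 2 are hypothetical and not taken from the dataset.

```python
from collections import defaultdict

# Hypothetical sample rows: (subreddit_id, post_id). Not taken from the dataset.
rows = [
    ("t5_a", "p1"),
    ("t5_a", "p2"),
    ("t5_a", "p2"),  # repeated post id, counted once
    ("t5_b", "p3"),
]

def filter_by_min_posts(rows, min_posts):
    """Keep rows whose subreddit has at least `min_posts` distinct post ids."""
    ids_per_subreddit = defaultdict(set)
    for subreddit_id, post_id in rows:
        ids_per_subreddit[subreddit_id].add(post_id)
    # Same role as the notebook's post_counts DataFrame
    post_counts = {
        s: len(ids) for s, ids in ids_per_subreddit.items() if len(ids) >= min_posts
    }
    # Same role as the inner join: rows from filtered-out subreddits disappear
    kept = [row for row in rows if row[0] in post_counts]
    return post_counts, kept

post_counts, kept = filter_by_min_posts(rows, min_posts=2)
print(post_counts)
print(len(kept))
```

As in the join, rows are kept or dropped by subreddit only, so a repeated post id would stay in the result even though it is counted once.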

In [5]:
#Preprocess the content of each reddit post and the profanity list

#Combine 'content' and 'title' into a single column for tokenization
posts_combined = posts.withColumn("combined", concat_ws(" ", col("content"), col("title")))

#Tokenize the combined content and title and convert to lowercase
posts_tokenized = posts_combined.withColumn("word", explode(split(lower(col("combined")), "\\s+")))

#Lowercase the 'text' in profanity DataFrame for case-insensitive matching
profanity = profanity.withColumn("text", lower(col("text")))
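
To illustrate what whitespace tokenization produces, here is a minimal plain-Python sketch that approximates the `lower` and `split` expression above with `re.split`. The sample sentence and the one-word lexicon are hypothetical. One consequence it makes visible: punctuation stays attached to a token, so a word followed by a comma does not match a lexicon entry exactly.

```python
import re

def tokenize(text):
    """Lowercase the text and split it on runs of whitespace."""
    return re.split(r"\s+", text.lower())

# Hypothetical one-word lexicon, standing in for the profanity list's 'text' column
lexicon = {"damn"}

tokens = tokenize("Damn, that was a DAMN good game")
matches = [t for t in tokens if t in lexicon]

print(tokens)
print(matches)
```

Only the second occurrence matches here, because the first token is `damn,` with the comma attached. By the same reasoning, any multi-word entries in the list would never equal a single token. Stripping punctuation before matching would be one possible refinement, though the notebook does not do this.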

In [6]:
#Join the tokenized posts with the profanity DataFrame on the word matching 'text'
#Using a broadcast join since profanity is much smaller than posts_tokenized
joined_df = posts_tokenized.join(broadcast(profanity), posts_tokenized.word == profanity.text)

#Note: Spark's sum function is imported as spark_sum to avoid shadowing Python's built-in sum
#Aggregate the severity ratings for each word within each subreddit
severity_totals_per_subreddit = joined_df.groupBy("subreddit_id") \
                                         .agg(first("subreddit").alias("subreddit"), 
                                              spark_sum("severity_rating").alias("total_severity"))

In [7]:
#Join 'severity_totals_per_subreddit' with 'post_counts' on 'subreddit_id'
#calculate average severity per post for each subreddit
#and select relevant columns
avg_severity_per_post = severity_totals_per_subreddit.join(post_counts, "subreddit_id") \
    .withColumn("avg_severity", col("total_severity") / col("total_posts")) \
    .select("subreddit_id", "subreddit", "avg_severity")
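
The metric computed in cells 6 and 7 is the summed severity of all matched words in a subreddit divided by that subreddit's number of distinct posts. A small plain-Python sketch of the same arithmetic, using hypothetical words, ratings, and counts (none taken from the datasets):

```python
from collections import defaultdict

# Hypothetical inputs, not taken from the datasets
severity = {"damn": 1.0, "hell": 1.5}   # lexicon: word -> severity_rating
post_counts = {"t5_a": 2, "t5_b": 1}    # distinct posts per subreddit
tokens = [
    ("t5_a", "damn"),
    ("t5_a", "good"),
    ("t5_a", "hell"),
    ("t5_b", "nice"),
]

def average_severity(tokens, severity, post_counts):
    """Sum severities of matched tokens per subreddit, then divide by post count."""
    totals = defaultdict(float)
    for subreddit_id, word in tokens:
        if word in severity:  # plays the role of the inner join on word == text
            totals[subreddit_id] += severity[word]
    return {
        s: total / post_counts[s] for s, total in totals.items() if s in post_counts
    }

avg = average_severity(tokens, severity, post_counts)
print(avg)
```

In this sketch `t5_b` has no matching words, so it is absent from the result rather than scored as 0. The inner joins in the notebook behave the same way, which means a subreddit with no matches at all would not appear in the ranking.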

In [8]:
#Cache the DataFrame to avoid recomputations
avg_severity_per_post.cache()

#Show the 20 least severe subreddits
print("Top 20 least severe subreddits")
avg_severity_per_post.orderBy(col("avg_severity").asc()).show()
#Show the 20 most severe subreddits
print("Top 20 most severe subreddits")
avg_severity_per_post.orderBy(col("avg_severity").desc()).show()

#clear the cache
avg_severity_per_post.unpersist()

Top 20 least severe subreddits

+------------+---------------+--------------------+
|subreddit_id|      subreddit|        avg_severity|
+------------+---------------+--------------------+
|    t5_2r6f3|   HomeworkHelp|0.021634615384615384|
|    t5_2r8ot|    learnpython|0.024733475479744135|
|    t5_2uas2|latterdaysaints| 0.03271028037383177|
|    t5_3c2d7|   TheSilphRoad| 0.03433734939759036|
|    t5_2qhwy|        grammar|0.034482758620689655|
|    t5_2zn9o|    rocketbeans| 0.03747454175152749|
|    t5_2zgq3|     dogemining|0.039647577092511016|
|    t5_2vrf0|       churning| 0.04832214765100671|
|    t5_2qmsf|           Hair| 0.04897959183673469|
|    t5_2r1wh|       chromeos|0.049999999999999996|
|    t5_2qkhk|          italy|0.052278177458033585|
|    t5_2srow|         spacex| 0.05649350649350649|
|    t5_2qhjq|      Wordpress|0.056708860759493676|
|    t5_2vv1m|     mtgfinance|0.057492354740061154|
|    t5_2qwj8|        Unity3D| 0.05817409766454353|
|    t5_2qm6c|        crochet| 0.06063348416289592|
|    t5_2te6

DataFrame[subreddit_id: string, subreddit: string, avg_severity: double]