In [34]:
#Read in the json file and covert to tabular format

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("JSON Reader") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()

df = spark.read.option("multiline", "true").json("reddit_posts_data.json")
# df = spark.read.option("multiline", "true").json("125.json")

df.show(5)
print(df.count())

23/12/03 05:30:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+---------------+--------------------+-------+--------------------+-------------+----------+-------------+-----+------+-------+-------------------+-------+---------------+------+----------+------------+-------+--------------------+-----+-----+-------+--------+--------------------+--------------------+---+--------------------+
|         author|   author_flair_text|clicked|            comments|  created_utc|      date|distinguished|downs|edited|     id|is_original_content|is_self|link_flair_text|locked|      name|num_comments|over_18|           permalink|saved|score|spoiler|stickied|           text_body|               title|ups|                 url|
+---------------+--------------------+-------+--------------------+-------------+----------+-------------+-----+------+-------+-------------------+-------+---------------+------+----------+------------+-------+--------------------+-----+-----+-------+--------+--------------------+--------------------+---+--------------------+
| DONGBONGER3000

[Stage 114:>                                                        (0 + 1) / 1]

99177


                                                                                

In [35]:
#Trying to see what an example permalink looks like in order to extract the subreddit from it

first_row_permalink = df.select("permalink").first()[0]
print("Permalink of the first row:", first_row_permalink)

[Stage 117:>                                                        (0 + 1) / 1]

Permalink of the first row: /r/HistoryMemes/comments/17fxoki/our_boy_teddy_dug_a_really_long_sideways_hole/


                                                                                

In [36]:
#extracting subreddit name from permalink

from pyspark.sql.functions import regexp_extract
df_with_subreddit = df.withColumn("subreddit", regexp_extract(df["permalink"], r'/r/([^/]+)/', 1))
random_post = df_with_subreddit.select("subreddit").collect()[780][0]
print("Subreddit of the random row:", random_post)

                                                                                

Subreddit of the random row: HistoryMemes


In [37]:
#adding subreddit column to df

df_with_subreddit = df.withColumn("subreddit", regexp_extract(df["permalink"], r'/r/([^/]+)/', 1))

df_with_subreddit.show(50)

[Stage 119:>                                                        (0 + 1) / 1]

+--------------------+--------------------+-------+--------------------+-------------+----------+-------------+-----+------+-------+-------------------+-------+---------------+------+----------+------------+-------+--------------------+-----+-----+-------+--------+--------------------+--------------------+-----+--------------------+------------+
|              author|   author_flair_text|clicked|            comments|  created_utc|      date|distinguished|downs|edited|     id|is_original_content|is_self|link_flair_text|locked|      name|num_comments|over_18|           permalink|saved|score|spoiler|stickied|           text_body|               title|  ups|                 url|   subreddit|
+--------------------+--------------------+-------+--------------------+-------------+----------+-------------+-----+------+-------+-------------------+-------+---------------+------+----------+------------+-------+--------------------+-----+-----+-------+--------+--------------------+------------------

                                                                                

In [38]:
#manually mapping categories to subreddits 

from pyspark.sql.functions import col, when

subreddit_categories = {
    "funny": "Funny/Humor",
    "AskReddit": "Learning and Education",
    "gaming": "Gaming",
    "aww": "Animals and Pets",
    "worldnews": "World News",
    "todayilearned": "Learning and Education",
    "Music": "Music",
    "movies": "Movies",
    "science": "Science",
    "pics": "Art",
    "Showerthoughts": "Funny/Humor",
    "memes": "Internet Culture and Memes",
    "Jokes": "Funny/Humor",
    "news": "World News",
    "videos": "Internet Culture and Memes",
    "askscience": "Science",
    "space": "Science",
    "EarthPorn": "Outdoors and Nature",
    "food": "Food and Drink",
    "books": "Reading, Writing, and Literature",
    "DIY": "Crafts and DIY",
    "nottheonion": "Funny/Humor",
    "mildlyinteresting": "Internet Culture and Memes",
    "explainlikeimfive": "Learning and Education",
    "IAmA": "Meta/Reddit",
    "LifeProTips": "Learning and Education",
    "gadgets": "Technology",
    "gifs": "Internet Culture and Memes",
    "sports": "Sports",
    "GetMotivated": "Ethics and Philosophy",
    "Documentaries": "Learning and Education",
    "dataisbeautiful": "Science",
    "UpliftingNews": "World News",
    "Futurology": "Technology",
    "photoshopbattles": "Funny/Humor",
    "tifu": "Meta/Reddit",
    "personalfinance": "Business, Economics, and Finance",
    "OldSchoolCool": "History",
    "listentothis": "Music",
    "nosleep": "Reading, Writing, and Literature",
    "history": "History",
    "philosophy": "Ethics and Philosophy",
    "WritingPrompts": "Reading, Writing, and Literature",
    "InternetIsBeautiful": "Internet Culture and Memes",
    "technology": "Technology",
    "creepy": "Internet Culture and Memes",
    "wholesomememes": "Internet Culture and Memes",
    "wallstreetbets": "Business, Economics, and Finance",
    "Damnthatsinteresting": "Internet Culture and Memes",
    "NatureIsFuckingLit": "Outdoors and Nature",
    "lifehacks": "Learning and Education",
    "interestingasfuck": "Internet Culture and Memes",
    "dadjokes": "Funny/Humor",
    "relationship_advice": "Family and Relationships",
    "oddlysatisfying": "Internet Culture and Memes",
    "AdviceAnimals": "Internet Culture and Memes",
    "pcmasterrace": "Technology",
    "travel": "Travel",
    "nba": "Sports",
    "MadeMeSmile": "Funny/Humor",
    "HistoryMemes": "History",
    "politics": "Politics",
    "anime": "Anime",
    "ContagiousLaughter": "Funny/Humor",
    "AnimalsBeingDerps": "Animals and Pets",
    "facepalm": "Funny/Humor",
    "BeAmazed": "Internet Culture and Memes",
    "tattoos": "Art",
    "CryptoCurrency": "Crypto",
    "EatCheapAndHealthy": "Food and Drink",
    "AnimalsBeingBros": "Animals and Pets",
    "leagueoflegends": "Gaming",
    "mildlyinfuriating": "Internet Culture and Memes",
    "buildapc": "Gaming",
    "AnimalsBeingJerks": "Animals and Pets",
    "FoodPorn": "Food and Drink",
    "NetflixBestOf": "Movies",
    "Tinder": "Family and Relationships",
    "gardening": "Home and Garden",
    "Bitcoin": "Crypto",
    "Parenting": "Family and Relationships",
    "WatchPeopleDieInside": "Funny/Humor",
    "programming": "Programming",
    "nfl": "Sports",
    "soccer": "Sports",
    "malefashionadvice": "Fashion",
    "PS4": "Gaming",
    "starterpacks": "Internet Culture and Memes",
    "Awwducational": "Animals and Pets",
    "HumansBeingBros": "Internet Culture and Memes",
    "bestof": "Meta/Reddit",
    "rarepuppers": "Animals and Pets",
    "itookapicture": "Art",
    "photography": "Art",
    "europe": "Place",
    "NintendoSwitch": "Gaming",
    "YouShouldKnow": "Learning and Education",
    "trippinthroughtime": "Internet Culture and Memes",
    "Overwatch": "Gaming",
    "woodworking": "Hobbies"
}

df_with_categories = df_with_subreddit.withColumn(
    "category",
    when(col("subreddit") == "funny", subreddit_categories["funny"])
    .when(col("subreddit") == "AskReddit", subreddit_categories["AskReddit"])
    .when(col("subreddit") == "gaming", subreddit_categories["gaming"])
    .when(col("subreddit") == "aww", subreddit_categories["aww"])
    .when(col("subreddit") == "worldnews", subreddit_categories["worldnews"])
    .when(col("subreddit") == "todayilearned", subreddit_categories["todayilearned"])
    .when(col("subreddit") == "Music", subreddit_categories["Music"])
    .when(col("subreddit") == "movies", subreddit_categories["movies"])
    .when(col("subreddit") == "science", subreddit_categories["science"])
    .when(col("subreddit") == "pics", subreddit_categories["pics"])
    .when(col("subreddit") == "Showerthoughts", subreddit_categories["Showerthoughts"])
    .when(col("subreddit") == "memes", subreddit_categories["memes"])
    .when(col("subreddit") == "Jokes", subreddit_categories["Jokes"])
    .when(col("subreddit") == "news", subreddit_categories["news"])
    .when(col("subreddit") == "videos", subreddit_categories["videos"])
    .when(col("subreddit") == "askscience", subreddit_categories["askscience"])
    .when(col("subreddit") == "space", subreddit_categories["space"])
    .when(col("subreddit") == "EarthPorn", subreddit_categories["EarthPorn"])
    .when(col("subreddit") == "food", subreddit_categories["food"])
    .when(col("subreddit") == "books", subreddit_categories["books"])
    .when(col("subreddit") == "DIY", subreddit_categories["DIY"])
    .when(col("subreddit") == "nottheonion", subreddit_categories["nottheonion"])
    .when(col("subreddit") == "mildlyinteresting", subreddit_categories["mildlyinteresting"])
    .when(col("subreddit") == "explainlikeimfive", subreddit_categories["explainlikeimfive"])
    .when(col("subreddit") == "IAmA", subreddit_categories["IAmA"])
    .when(col("subreddit") == "LifeProTips", subreddit_categories["LifeProTips"])
    .when(col("subreddit") == "gadgets", subreddit_categories["gadgets"])
    .when(col("subreddit") == "gifs", subreddit_categories["gifs"])
    .when(col("subreddit") == "sports", subreddit_categories["sports"])
    .when(col("subreddit") == "GetMotivated", subreddit_categories["GetMotivated"])
    .when(col("subreddit") == "Documentaries", subreddit_categories["Documentaries"])
    .when(col("subreddit") == "dataisbeautiful", subreddit_categories["dataisbeautiful"])
    .when(col("subreddit") == "UpliftingNews", subreddit_categories["UpliftingNews"])
    .when(col("subreddit") == "Futurology", subreddit_categories["Futurology"])
    .when(col("subreddit") == "photoshopbattles", subreddit_categories["photoshopbattles"])
    .when(col("subreddit") == "tifu", subreddit_categories["tifu"])
    .when(col("subreddit") == "personalfinance", subreddit_categories["personalfinance"])
    .when(col("subreddit") == "OldSchoolCool", subreddit_categories["OldSchoolCool"])
    .when(col("subreddit") == "listentothis", subreddit_categories["listentothis"])
    .when(col("subreddit") == "nosleep", subreddit_categories["nosleep"])
    .when(col("subreddit") == "history", subreddit_categories["history"])
    .when(col("subreddit") == "philosophy", subreddit_categories["philosophy"])
    .when(col("subreddit") == "WritingPrompts", subreddit_categories["WritingPrompts"])
    .when(col("subreddit") == "InternetIsBeautiful", subreddit_categories["InternetIsBeautiful"])
    .when(col("subreddit") == "technology", subreddit_categories["technology"])
    .when(col("subreddit") == "creepy", subreddit_categories["creepy"])
    .when(col("subreddit") == "wholesomememes", subreddit_categories["wholesomememes"])
    .when(col("subreddit") == "wallstreetbets", subreddit_categories["wallstreetbets"])
    .when(col("subreddit") == "Damnthatsinteresting", subreddit_categories["Damnthatsinteresting"])
    .when(col("subreddit") == "NatureIsFuckingLit", subreddit_categories["NatureIsFuckingLit"])
    .when(col("subreddit") == "lifehacks", subreddit_categories["lifehacks"])
    .when(col("subreddit") == "interestingasfuck", subreddit_categories["interestingasfuck"])
    .when(col("subreddit") == "dadjokes", subreddit_categories["dadjokes"])
    .when(col("subreddit") == "relationship_advice", subreddit_categories["relationship_advice"])
    .when(col("subreddit") == "oddlysatisfying", subreddit_categories["oddlysatisfying"])
    .when(col("subreddit") == "AdviceAnimals", subreddit_categories["AdviceAnimals"])
    .when(col("subreddit") == "pcmasterrace", subreddit_categories["pcmasterrace"])
    .when(col("subreddit") == "travel", subreddit_categories["travel"])
    .when(col("subreddit") == "nba", subreddit_categories["nba"])
    .when(col("subreddit") == "MadeMeSmile", subreddit_categories["MadeMeSmile"])
    .when(col("subreddit") == "HistoryMemes", subreddit_categories["HistoryMemes"])
    .when(col("subreddit") == "politics", subreddit_categories["politics"])
    .when(col("subreddit") == "anime", subreddit_categories["anime"])
    .when(col("subreddit") == "ContagiousLaughter", subreddit_categories["ContagiousLaughter"])
    .when(col("subreddit") == "AnimalsBeingDerps", subreddit_categories["AnimalsBeingDerps"])
    .when(col("subreddit") == "facepalm", subreddit_categories["facepalm"])
    .when(col("subreddit") == "BeAmazed", subreddit_categories["BeAmazed"])
    .when(col("subreddit") == "tattoos", subreddit_categories["tattoos"])
    .when(col("subreddit") == "CryptoCurrency", subreddit_categories["CryptoCurrency"])
    .when(col("subreddit") == "EatCheapAndHealthy", subreddit_categories["EatCheapAndHealthy"])
    .when(col("subreddit") == "AnimalsBeingBros", subreddit_categories["AnimalsBeingBros"])
    .when(col("subreddit") == "leagueoflegends", subreddit_categories["leagueoflegends"])
    .when(col("subreddit") == "mildlyinfuriating", subreddit_categories["mildlyinfuriating"])
    .when(col("subreddit") == "buildapc", subreddit_categories["buildapc"])
    .when(col("subreddit") == "AnimalsBeingJerks", subreddit_categories["AnimalsBeingJerks"])
    .when(col("subreddit") == "FoodPorn", subreddit_categories["FoodPorn"])
    .when(col("subreddit") == "NetflixBestOf", subreddit_categories["NetflixBestOf"])
    .when(col("subreddit") == "Tinder", subreddit_categories["Tinder"])
    .when(col("subreddit") == "gardening", subreddit_categories["gardening"])
    .when(col("subreddit") == "Bitcoin", subreddit_categories["Bitcoin"])
    .when(col("subreddit") == "Parenting", subreddit_categories["Parenting"])
    .when(col("subreddit") == "WatchPeopleDieInside", subreddit_categories["WatchPeopleDieInside"])
    .when(col("subreddit") == "programming", subreddit_categories["programming"])
    .when(col("subreddit") == "nfl", subreddit_categories["nfl"])
    .when(col("subreddit") == "soccer", subreddit_categories["soccer"])
    .when(col("subreddit") == "malefashionadvice", subreddit_categories["malefashionadvice"])
    .when(col("subreddit") == "PS4", subreddit_categories["PS4"])
    .when(col("subreddit") == "starterpacks", subreddit_categories["starterpacks"])
    .when(col("subreddit") == "Awwducational", subreddit_categories["Awwducational"])
    .when(col("subreddit") == "HumansBeingBros", subreddit_categories["HumansBeingBros"])
    .when(col("subreddit") == "bestof", subreddit_categories["bestof"])
    .when(col("subreddit") == "rarepuppers", subreddit_categories["rarepuppers"])
    .when(col("subreddit") == "itookapicture", subreddit_categories["itookapicture"])
    .when(col("subreddit") == "photography", subreddit_categories["photography"])
    .when(col("subreddit") == "europe", subreddit_categories["europe"])
    .when(col("subreddit") == "NintendoSwitch", subreddit_categories["NintendoSwitch"])
    .when(col("subreddit") == "YouShouldKnow", subreddit_categories["YouShouldKnow"])
    .when(col("subreddit") == "trippinthroughtime", subreddit_categories["trippinthroughtime"])
    .when(col("subreddit") == "Overwatch", subreddit_categories["Overwatch"])
    .when(col("subreddit") == "woodworking", subreddit_categories["woodworking"])
    .otherwise("Other")
)

df_with_categories.show(5)

[Stage 120:>                                                        (0 + 1) / 1]

+---------------+--------------------+-------+--------------------+-------------+----------+-------------+-----+------+-------+-------------------+-------+---------------+------+----------+------------+-------+--------------------+-----+-----+-------+--------+--------------------+--------------------+---+--------------------+------------+--------+
|         author|   author_flair_text|clicked|            comments|  created_utc|      date|distinguished|downs|edited|     id|is_original_content|is_self|link_flair_text|locked|      name|num_comments|over_18|           permalink|saved|score|spoiler|stickied|           text_body|               title|ups|                 url|   subreddit|category|
+---------------+--------------------+-------+--------------------+-------------+----------+-------------+-----+------+-------+-------------------+-------+---------------+------+----------+------------+-------+--------------------+-----+-----+-------+--------+--------------------+-------------------

                                                                                

In [39]:
#printing a random category

random_post = df_with_categories.select("Category").collect()[1651][0]
print("Category of the random row:", random_post)

                                                                                

Category of the random row: Anime


In [40]:
#sentiment analysis on text body (if text body is empty, using title instead)

from textblob import TextBlob
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def determinePolarity(title, text_body):
    input_text = title if not text_body or text_body == "" else text_body
    blob = TextBlob(input_text)
    sentiment = blob.sentiment
    polarity, subjectivity = sentiment

    return polarity

polarity_udf = udf(determinePolarity, FloatType())

new_test_df = df_with_categories.withColumn("Polarity", polarity_udf(df_with_categories["title"], df_with_categories["text_body"]))

filtered_df = new_test_df
filtered_df.show(5)


[Stage 122:>                                                        (0 + 1) / 1]

+---------------+--------------------+-------+--------------------+-------------+----------+-------------+-----+------+-------+-------------------+-------+---------------+------+----------+------------+-------+--------------------+-----+-----+-------+--------+--------------------+--------------------+---+--------------------+------------+--------+------------+
|         author|   author_flair_text|clicked|            comments|  created_utc|      date|distinguished|downs|edited|     id|is_original_content|is_self|link_flair_text|locked|      name|num_comments|over_18|           permalink|saved|score|spoiler|stickied|           text_body|               title|ups|                 url|   subreddit|category|    Polarity|
+---------------+--------------------+-------+--------------------+-------------+----------+-------------+-----+------+-------+-------------------+-------+---------------+------+----------+------------+-------+--------------------+-----+-----+-------+--------+--------------

                                                                                

In [41]:
#Query 1: Finding averge polarity of each subreddit

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("AveragePolarity").getOrCreate()

filtered_df.createOrReplaceTempView("reddit_posts")

result = spark.sql("""
    SELECT
        subreddit as subreddit,
        AVG(Polarity) as avg_polarity
    FROM
        reddit_posts
    GROUP BY
        subreddit
    ORDER BY
        avg_polarity DESC
""")

result.show(result.count(), truncate=False)

23/12/03 05:31:15 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
[Stage 129:>                                                        (0 + 1) / 1]

+--------------------+----------------------+
|subreddit           |avg_polarity          |
+--------------------+----------------------+
|wholesomememes      |0.2198957654511628    |
|NetflixBestOf       |0.1731863810482488    |
|malefashionadvice   |0.16902207882490725   |
|RichTogether        |0.15651741862297058   |
|MadeMeSmile         |0.14795933141814252   |
|photography         |0.14250632130808544   |
|anime               |0.14075309817573323   |
|BeAmazed            |0.13572215759127407   |
|GetMotivated        |0.131471516266633     |
|buildapc            |0.12942730780143696   |
|InternetIsBeautiful |0.1293749220947933    |
|EatCheapAndHealthy  |0.12863450191047432   |
|travel              |0.1265032302326617    |
|books               |0.11578407458430527   |
|investing           |0.11439308808253439   |
|AnimalsBeingBros    |0.11339307545560849   |
|aww                 |0.11322636955863453   |
|movies              |0.1109714375854843    |
|UndervaluedStonks   |0.1024370964

                                                                                

In [42]:
#Query 2: Finding average polarity of any post that includes a certain keyword

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("FindKeywordPosts").getOrCreate()

keyword = "tax"

keywordPosts = filtered_df.filter(col("text_body").contains(keyword.lower()))

selected_columns = ["Polarity", "text_body", "subreddit"]

average_polarity = keywordPosts.agg(avg("Polarity")).collect()[0][0]
print(f"Average Polarity of posts containing the keyword 'tax': {average_polarity}")

keywordPosts.select(selected_columns).show(truncate=False)

23/12/03 05:32:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Average Polarity of posts containing the keyword 'tax': 0.09186172920276034


[Stage 135:>                                                        (0 + 1) / 1]

+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [43]:
#Query 3: Finding average sentiment analysis of each category 

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("AveragePolarity").getOrCreate()

filtered_df.createOrReplaceTempView("reddit_posts")

result = spark.sql("""
    SELECT
        category as category,
        AVG(Polarity) as avg_polarity
    FROM
        reddit_posts
    GROUP BY
        category
    ORDER BY
        avg_polarity DESC
""")

result.show()

23/12/03 05:32:12 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


[Stage 136:>                                                        (0 + 1) / 1]

+--------------------+-------------------+
|            category|       avg_polarity|
+--------------------+-------------------+
|             Fashion|0.16902207882490725|
|              Movies|0.14265749467729708|
|               Anime|0.14075309817573323|
|              Travel| 0.1265032302326617|
|                 Art|0.10209752414490018|
|     Home and Garden|0.09058691778524253|
|Ethics and Philos...|0.08815171331864097|
|               Other|0.08799351714070137|
|    Animals and Pets|0.08727621410532745|
|              Gaming|0.08385787232678286|
|Business, Economi...|0.07562524158871534|
|              Crypto|0.07494473063031037|
|             Science|0.07377717372310089|
|Family and Relati...|   0.06985298815951|
|Internet Culture ...|0.06629375887439326|
|         Programming|0.06224433768355677|
|          Technology|0.06157291740459828|
| Outdoors and Nature|0.06138610749113426|
|Learning and Educ...|0.06093820211819578|
|             History|0.05889183806102227|
+----------

                                                                                

In [44]:
from pyspark.sql.functions import to_date
from pyspark.sql.types import DateType

formatted_df = filtered_df.withColumn("date", to_date(filtered_df["date"], "dd/MM/yyyy").cast(DateType()))

formatted_df.createOrReplaceTempView("reddit_posts")

formatted_df.show()


[Stage 139:>                                                        (0 + 1) / 1]

+--------------------+--------------------+-------+--------------------+-------------+----------+-------------+-----+------+-------+-------------------+-------+---------------+------+----------+------------+-------+--------------------+-----+-----+-------+--------+--------------------+--------------------+----+--------------------+------------+--------+------------+
|              author|   author_flair_text|clicked|            comments|  created_utc|      date|distinguished|downs|edited|     id|is_original_content|is_self|link_flair_text|locked|      name|num_comments|over_18|           permalink|saved|score|spoiler|stickied|           text_body|               title| ups|                 url|   subreddit|category|    Polarity|
+--------------------+--------------------+-------+--------------------+-------------+----------+-------------+-----+------+-------+-------------------+-------+---------------+------+----------+------------+-------+--------------------+-----+-----+-------+------

                                                                                

In [45]:
from pyspark.sql.functions import col, max, expr

def manual_sqrt(value):
    return value ** 0.5

max_score = formatted_df.agg(max("score").alias("max_score")).collect()[0]["max_score"]
max_score_sqrt = manual_sqrt(max_score)
print("Max Score:", max_score)

max_comments = formatted_df.agg(max("num_comments").alias("max_num_comments")).collect()[0]["max_num_comments"]
max_comments_sqrt = manual_sqrt(max_comments)
print("Max Num Comments:", max_comments)

formatted_df = formatted_df.withColumn(
    "interactiveness_score",
    (0.1 * expr("abs(polarity)") + 0.45 * (manual_sqrt(col("score")) / max_score_sqrt) + 0.45 * (manual_sqrt(col("num_comments")) / max_comments_sqrt)) * 100
)

formatted_df.show()




                                                                                

Max Score: 192262


                                                                                

Max Num Comments: 31014


[Stage 146:>                                                        (0 + 1) / 1]

+--------------------+--------------------+-------+--------------------+-------------+----------+-------------+-----+------+-------+-------------------+-------+---------------+------+----------+------------+-------+--------------------+-----+-----+-------+--------+--------------------+--------------------+----+--------------------+------------+--------+------------+---------------------+
|              author|   author_flair_text|clicked|            comments|  created_utc|      date|distinguished|downs|edited|     id|is_original_content|is_self|link_flair_text|locked|      name|num_comments|over_18|           permalink|saved|score|spoiler|stickied|           text_body|               title| ups|                 url|   subreddit|category|    Polarity|interactiveness_score|
+--------------------+--------------------+-------+--------------------+-------------+----------+-------------+-----+------+-------+-------------------+-------+---------------+------+----------+------------+-------+---

                                                                                

In [46]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("AveragePolarity2").getOrCreate()

formatted_df.createOrReplaceTempView("reddit_posts")

result = spark.sql("""
    SELECT
        category as category,
        date as date,
        AVG(Polarity) as avg_polarity,
        AVG(num_comments) as avg_num_comments,
        AVG(score) as avg_score
    FROM
        reddit_posts
    GROUP BY
        category, date
    ORDER BY
        category, date
""")

result.show()


23/12/03 05:33:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


[Stage 147:>                                                        (0 + 1) / 1]

+----------------+----------+--------------------+------------------+------------------+
|        category|      date|        avg_polarity|  avg_num_comments|         avg_score|
+----------------+----------+--------------------+------------------+------------------+
|Animals and Pets|2022-11-27| 0.19125000527128577|             134.5|            7584.0|
|Animals and Pets|2022-11-28| 0.20753969003756842|              47.0|3914.6666666666665|
|Animals and Pets|2022-11-29| 0.08392857387661934|             186.0|            9035.5|
|Animals and Pets|2022-11-30| 0.12780611962080002|             100.5|            4958.0|
|Animals and Pets|2022-12-01| 0.06666667014360428|             296.0|            6071.0|
|Animals and Pets|2022-12-02| 0.07999999821186066|              79.0|            8807.0|
|Animals and Pets|2022-12-03| 0.07023809726039569|116.33333333333333| 6441.666666666667|
|Animals and Pets|2022-12-04|  0.1190476194024086|             160.0|           11565.0|
|Animals and Pets|202

                                                                                

In [47]:
#Storing to mysql

from textblob import TextBlob
from pyspark.sql.functions import udf, sum, col
from pyspark.sql.types import FloatType
from pyspark.sql import SparkSession
import mysql.connector

spark = SparkSession.builder.appName("example").getOrCreate()

pandas_df = result.toPandas()

import mysql.connector

db_connection = mysql.connector.connect(user="group7", password="group7pass")
db_cursor = db_connection.cursor()
db_cursor.execute("USE cs179g;")

db_cursor.execute("""
    DROP TABLE IF EXISTS average_insights4
""")

db_cursor.execute("""
    CREATE TABLE IF NOT EXISTS average_insights4 (
        id INT AUTO_INCREMENT PRIMARY KEY,
        category VARCHAR(255),
        date DATE,
        Polarity FLOAT,
        num_comments FLOAT,
        score FLOAT
    )
""")


for row in result.collect():
    query = "INSERT INTO average_insights4 (category, date, Polarity, num_comments, score) VALUES (%s, %s, %s, %s, %s)"
    values = (row['category'], row['date'], row['avg_polarity'], row['avg_num_comments'], row['avg_score'])
    db_cursor.execute(query, values)

# db_cursor.execute("SELECT category, date, AVG(score) FROM average_insights3 WHERE date BETWEEN 10-1-2023 AND 10-31-2023 AND category = 'finance' GROUP BY date")
# print(db_cursor.fetchall())

db_connection.commit()
db_connection.close()


23/12/03 05:34:55 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [51]:
#Storing to mysql

from textblob import TextBlob
from pyspark.sql.functions import udf, sum, col
from pyspark.sql.types import FloatType
from pyspark.sql import SparkSession
import mysql.connector

spark = SparkSession.builder.appName("example").getOrCreate()

pandas_df = result.toPandas()

import mysql.connector

db_connection = mysql.connector.connect(user="group7", password="group7pass")
db_cursor = db_connection.cursor()
db_cursor.execute("USE cs179g;")

db_cursor.execute("SELECT date, score FROM average_insights4 WHERE category = 'Animals and Pets' AND date >= '2023-10-01' AND date <= '2023-10-31'")
print(db_cursor.fetchall())

db_connection.commit()
db_connection.close()


[(datetime.date(2023, 10, 1), 5285.83), (datetime.date(2023, 10, 2), 2368.0), (datetime.date(2023, 10, 3), 2302.67), (datetime.date(2023, 10, 4), 2774.6), (datetime.date(2023, 10, 5), 4451.29), (datetime.date(2023, 10, 6), 2214.46), (datetime.date(2023, 10, 7), 1319.5), (datetime.date(2023, 10, 8), 1639.11), (datetime.date(2023, 10, 9), 5618.33), (datetime.date(2023, 10, 10), 2359.0), (datetime.date(2023, 10, 11), 1907.5), (datetime.date(2023, 10, 12), 10661.2), (datetime.date(2023, 10, 13), 2977.6), (datetime.date(2023, 10, 14), 10035.8), (datetime.date(2023, 10, 15), 6980.12), (datetime.date(2023, 10, 16), 4229.5), (datetime.date(2023, 10, 17), 4312.8), (datetime.date(2023, 10, 18), 4598.12), (datetime.date(2023, 10, 19), 1121.47), (datetime.date(2023, 10, 20), 1226.13), (datetime.date(2023, 10, 21), 983.445), (datetime.date(2023, 10, 22), 1621.6), (datetime.date(2023, 10, 23), 1218.01), (datetime.date(2023, 10, 24), 969.267), (datetime.date(2023, 10, 25), 8.45455)]


In [20]:
#Query 4: Finding top 10 most upvoted posts

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs

spark = SparkSession.builder.appName("MostUpvotedPost").getOrCreate()

selected_columns = ["author", "title", "subreddit", "permalink", "ups"]

Most_Upvoted_Post = filtered_df.orderBy("ups", ascending=False).limit(10).select(selected_columns)

Most_Upvoted_Post.show(truncate=False)

23/12/02 02:12:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


[Stage 72:>                                                         (0 + 1) / 1]

+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+------------------------------------------------------------------------------------------+------+
|author            |title                                                                                                                                                                                                   |subreddit           |permalink                                                                                 |ups   |
+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+----------------------------------------------------------------------------

                                                                                

In [21]:
#Query 5: Finding top subreddits based on number of comments
from pyspark.sql.functions import sum, col

filtered_df = filtered_df.withColumn("num_comments", filtered_df["num_comments"].cast("int"))

total_comments_df = filtered_df.groupBy("subreddit").agg(sum("num_comments").alias("total_comments"))

total_comments_df = total_comments_df.orderBy(col("total_comments").desc())

total_comments_df.show()

[Stage 73:>                                                         (0 + 1) / 1]

+--------------------+--------------+
|           subreddit|total_comments|
+--------------------+--------------+
|      wallstreetbets|        690397|
|WatchPeopleDieInside|        593507|
|      CryptoCurrency|        363186|
|            politics|        357182|
|     oddlysatisfying|        323622|
|                 nfl|        288292|
|           worldnews|        255113|
|            facepalm|        253834|
|     dataisbeautiful|        217145|
|          technology|        212245|
|         nottheonion|        204393|
|Damnthatsinteresting|        189216|
|            BeAmazed|        179849|
|                IAmA|        176994|
|       todayilearned|        164635|
|             science|        161322|
|     HumansBeingBros|        160029|
|          Futurology|        136621|
|           lifehacks|        135307|
|              stocks|        132463|
+--------------------+--------------+
only showing top 20 rows



                                                                                

In [22]:
#Query 6: Open ended text from user, determine average polarity based on input

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg

spark = SparkSession.builder.appName("AveragePolarity").getOrCreate()
text_df = filtered_df.filter(col("text_body").contains("Israel"))
average_polarity = text_df.agg(avg("Polarity").alias("average_polarity"))

average_polarity.show(truncate=False)


23/12/02 02:12:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


[Stage 76:>                                                         (0 + 1) / 1]

+-------------------+
|average_polarity   |
+-------------------+
|0.05225340135075385|
+-------------------+



                                                                                

In [23]:
#Query 6 (part 2): Show the top 5 posts ordered by polarity given a word (In this case, Palestine)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("TopPostsByPolarity").getOrCreate()


test_df = filtered_df.filter(col("text_body").contains("Palestine"))


top_posts_by_polarity = test_df.orderBy("Polarity", ascending=True).limit(5).select("text_body", "score", "Polarity")

top_posts_by_polarity.show(truncate=False)


23/12/02 02:12:50 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
[Stage 79:>                                                         (0 + 1) / 1]

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

spark = SparkSession.builder.appName("CustomScoreCalculation").getOrCreate()

custom_score_df = (
    filtered_df
    .withColumn("Downvotes", (col("ups") - col("score")))
    .withColumn("CustomScore", (col("score") * 100 + col("num_comments") * 100) * abs(col("polarity")))
    .select("author", "title", "subreddit", "permalink", "ups", "score", "polarity", "CustomScore")
)

custom_score_df.show(truncate=False)


23/12/02 02:12:53 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


[Stage 80:>                                                         (0 + 1) / 1]

+--------------------+-------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------------------------------------------------------+----+-----+------------+-----------+
|author              |title                                                                                                                                      |subreddit   |permalink                                                                          |ups |score|polarity    |CustomScore|
+--------------------+-------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------------------------------------------------------+----+-----+------------+-----------+
|DONGBONGER3000      |Our boy Teddy dug a really long sideways hole.                                            

                                                                                

In [25]:
#Counting number of posts within 1 month time frame of fetch day

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import DateType

filtered_df = filtered_df.withColumn("date", to_date(col("date"), "dd/MM/yyyy").cast(DateType()))

october_posts_df = filtered_df.filter(col("date").between("2023-09-25", "2023-10-25"))

october_posts_df.count()

                                                                                

53379

In [26]:
#Connecting mysql 

import mysql.connector

db_connection = mysql.connector.connect(user="group7", password="group7pass")
db_cursor = db_connection.cursor()
db_cursor.execute("USE cs179g;")


In [28]:
# Insert filtered_Df into MYSQL

from textblob import TextBlob
from pyspark.sql.functions import udf, sum, col
from pyspark.sql.types import FloatType
from pyspark.sql import SparkSession
import mysql.connector


new_pandas_df = formatted_df.toPandas()

db_cursor.execute("""
    DROP TABLE IF EXISTS whole_df2
""")

db_cursor.execute("""
CREATE TABLE IF NOT EXISTS whole_df2 (
    id INT AUTO_INCREMENT PRIMARY KEY,
    author VARCHAR(255),
    date DATE,
    num_comments INT,
    score INT,
    text_body TEXT,
    title TEXT,
    subreddit VARCHAR(255),
    category VARCHAR(255),
    polarity DOUBLE, 
    interactiveness_score FLOAT
);
""")

for index, row in new_pandas_df.iterrows():
    query = """
        INSERT INTO whole_df2 (
            author, date, num_comments, score, text_body, title, subreddit, category, polarity, interactiveness_score
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    
    values = (
        row['author'], row['date'], row['num_comments'],
        row['score'], row['text_body'], row['title'], 
        row['subreddit'], row['category'], row['Polarity'], row['interactiveness_score']
    )

    db_cursor.execute(query, values)

db_cursor.execute("SELECT * FROM whole_df2 LIMIT 5;")
print(db_cursor.fetchall())

db_connection.commit()
db_connection.close()




                                                                                

[(1, 'DONGBONGER3000', datetime.date(2023, 10, 24), 1, 11, '', 'Our boy Teddy dug a really long sideways hole.', 'HistoryMemes', 'History', -0.05000000074505806, 1.0959), (2, 'Redbaron1701', datetime.date(2023, 10, 24), 1, 10, '', "I'd say let's string him up, but...", 'HistoryMemes', 'History', 0.0, 0.580063), (3, 'DanPowah', datetime.date(2023, 10, 24), 1, 57, '', 'That is actually their name, I kid you not', 'HistoryMemes', 'History', 0.0, 1.03035), (4, 'Titab-talaiy', datetime.date(2023, 10, 24), 34, 136, '', 'Why they are so stupid', 'HistoryMemes', 'History', -0.800000011920929, 10.6868), (5, 'One-Childs-Path', datetime.date(2023, 10, 24), 0, 11, ' The New Dance of Death-The Green Wreath and Dress Mongers. In a world before easily synthesized color, there was actual gold at the end of the 🌈 . In 1775, Carl Wilhelm Scheele experimented with arsenic & produced a green pigment out of copper arsenites. These copper arsenic greens are pigments, but used to color fabrics. Arsenic is mo

In [29]:
#Storing a random query on mysql

#Connecting mysql 

import mysql.connector

db_connection = mysql.connector.connect(user="group7", password="group7pass")
db_cursor = db_connection.cursor()
db_cursor.execute("USE cs179g;")

from textblob import TextBlob
from pyspark.sql.functions import udf, sum, col
from pyspark.sql.types import FloatType
from pyspark.sql import SparkSession
import mysql.connector

spark = SparkSession.builder.appName("example").getOrCreate()

pandas_df = total_comments_df.toPandas()

db_cursor.execute("""
    CREATE TABLE IF NOT EXISTS total_comments (
        id INT AUTO_INCREMENT PRIMARY KEY,
        subreddit VARCHAR(255),
        total_comments INT
    )
""")

for index, row in pandas_df.iterrows():
    query = "INSERT INTO total_comments (subreddit, total_comments) VALUES (%s, %s)"
    values = (row['subreddit'], row['total_comments'])
    db_cursor.execute(query, values)

db_cursor.execute("SELECT * FROM total_comments LIMIT 5;")
print(db_cursor.fetchall())

db_connection.commit()
db_connection.close()


23/12/02 02:17:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

[(1, 'worldnews', 255113), (2, 'nottheonion', 204393), (3, 'IAmA', 176994), (4, 'todayilearned', 164635), (5, 'gadgets', 109514)]


In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("AveragePolarity3").getOrCreate()

formatted_df.createOrReplaceTempView("reddit_posts")

result = spark.sql("""
    SELECT
        category as category,
        AVG(Polarity) as avg_polarity,
        AVG(num_comments) as avg_num_comments,
        AVG(score) as avg_score,
        date as date
    FROM
        reddit_posts
    GROUP BY
        category, date
""")

result.show()

print("Number of rows in the result DataFrame:", result.count())


23/12/02 02:17:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
[Stage 94:>                                                         (0 + 1) / 1]

+--------------------+--------------------+------------------+------------------+----------+
|            category|        avg_polarity|  avg_num_comments|         avg_score|      date|
+--------------------+--------------------+------------------+------------------+----------+
|    Animals and Pets| 0.03369791805744171|            129.25|          16358.75|2023-04-28|
|    Animals and Pets| 0.06773964166641236|              52.4|            6539.0|2023-04-02|
|              Movies| 0.22621001799901327|120.83333333333333|160.16666666666666|2023-09-04|
|              Movies|  0.4808223843574524|119.66666666666667|60.666666666666664|2023-07-15|
|     Home and Garden| 0.06480016073369434|20.154929577464788|135.33098591549296|2023-10-20|
|         Funny/Humor|                 0.0|              32.0|             127.0|2022-09-28|
|Internet Culture ...| 0.11297670686765322|  96.3015873015873| 3052.222222222222|2023-09-02|
|    Animals and Pets|-0.02844576537609...|              18.0| 513.666

[Stage 97:>                                                         (0 + 1) / 1]

Number of rows in the result DataFrame: 3468


                                                                                

In [31]:
#Storing a random query on mysql

#Connecting mysql 

import mysql.connector

db_connection = mysql.connector.connect(user="group7", password="group7pass")
db_cursor = db_connection.cursor()
db_cursor.execute("USE cs179g;")

from textblob import TextBlob
from pyspark.sql.functions import udf, sum, col
from pyspark.sql.types import FloatType
from pyspark.sql import SparkSession
import mysql.connector

spark = SparkSession.builder.appName("example").getOrCreate()

print("TEST 1")
pandas_df = result.toPandas()
print("TEST 2")

db_cursor.execute("""
    DROP TABLE IF EXISTS averages3
""")

print("TEST 3")

db_cursor.execute("""
    CREATE TABLE IF NOT EXISTS averages3 (
        id INT AUTO_INCREMENT PRIMARY KEY,
        category VARCHAR(255),
        avg_polarity FLOAT,
        avg_num_comments FLOAT,
        avg_score FLOAT,
        date DATE
    );
""")

print("TEST 4")

for index, row in pandas_df.iterrows():
    print(f"Processing row {index + 1}/{len(pandas_df)}")
    query = "INSERT INTO averages3 (category, avg_polarity, avg_num_comments, avg_score, date) VALUES (%s, %s, %s, %s, %s)"
    values = (row['category'], row['avg_polarity'], row['avg_num_comments'], row['avg_score'], row['date'])
    db_cursor.execute(query, values)

db_cursor.execute("SELECT * FROM averages3 LIMIT 5;")
print(db_cursor.fetchall())

db_connection.commit()
db_connection.close()


23/12/02 02:18:42 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


TEST 1


                                                                                

TEST 2
TEST 3
TEST 4
Processing row 1/3468
Processing row 2/3468
Processing row 3/3468
Processing row 4/3468
Processing row 5/3468
Processing row 6/3468
Processing row 7/3468
Processing row 8/3468
Processing row 9/3468
Processing row 10/3468
Processing row 11/3468
Processing row 12/3468
Processing row 13/3468
Processing row 14/3468
Processing row 15/3468
Processing row 16/3468
Processing row 17/3468
Processing row 18/3468
Processing row 19/3468
Processing row 20/3468
Processing row 21/3468
Processing row 22/3468
Processing row 23/3468
Processing row 24/3468
Processing row 25/3468
Processing row 26/3468
Processing row 27/3468
Processing row 28/3468
Processing row 29/3468
Processing row 30/3468
Processing row 31/3468
Processing row 32/3468
Processing row 33/3468
Processing row 34/3468
Processing row 35/3468
Processing row 36/3468
Processing row 37/3468
Processing row 38/3468
Processing row 39/3468
Processing row 40/3468
Processing row 41/3468
Processing row 42/3468
Processing row 43/3468