In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
from pyspark.sql.functions import trim

# New API: build (or reuse) the SparkSession that talks to the standalone master.
# Dynamic allocation is switched on with shuffle tracking and without the
# external shuffle service; idle executors time out after 30s, each executor
# gets 2 cores, and at most 4 executors are requested. The driver and block
# manager ports are pinned to fixed values.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.156:7077")
    .appName("Group 7 Reddit Test Code")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 2)
    .config("spark.dynamicAllocation.maxExecutors", 4)
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Old API (RDD): grab the SparkContext behind the session and only log errors
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/06 14:08:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/06 14:08:53 WARN Utils: Service 'sparkDriver' could not bind on port 9999. Attempting port 10000.


In [2]:
# Read the Reddit corpus (the suggested dataset) from HDFS.
# An explicit schema is supplied so Spark does not need an extra pass over the
# whole JSON file just to infer the column types. The columns and types listed
# here are exactly the ones schema inference reported for this file.
reddit_schema = (
    "author STRING, body STRING, content STRING, content_len BIGINT, "
    "id STRING, normalizedBody STRING, subreddit STRING, subreddit_id STRING, "
    "summary STRING, summary_len BIGINT, title STRING"
)
df = (
    spark_session.read
    .schema(reddit_schema)
    .json("hdfs://192.168.2.156:9000/data/reddit/corpus-webis-tldr-17.json")
)
# Print the column names and types as a sanity check
df.printSchema()



root
 |-- author: string (nullable = true)
 |-- body: string (nullable = true)
 |-- content: string (nullable = true)
 |-- content_len: long (nullable = true)
 |-- id: string (nullable = true)
 |-- normalizedBody: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- summary_len: long (nullable = true)
 |-- title: string (nullable = true)



                                                                                

In [4]:
# Number of partitions of the RDD underlying the DataFrame
df.rdd.getNumPartitions()

147

In [5]:
# Preview the first rows of the raw DataFrame (show() with its default arguments)
df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+------------------+--------------------+--------------------+-----------+-------+--------------------+--------------------+------------+--------------------+-----------+--------+
|            author|                body|             content|content_len|     id|      normalizedBody|           subreddit|subreddit_id|             summary|summary_len|   title|
+------------------+--------------------+--------------------+-----------+-------+--------------------+--------------------+------------+--------------------+-----------+--------+
|  raysofdarkmatter|I think it should...|I think it should...|        178|c69al3r|I think it should...|                math|    t5_2qh0n|Shifting seasonal...|          8|    NULL|
|           Stork13|Art is about the ...|Art is about the ...|        148|c6a9nxd|Art is about the ...|               funny|    t5_2qh33|Personal opinions...|          4|    NULL|
|     Cloud_dreamer|Ask me what I thi...|Ask me what I thi...|         76|c6acx4l|Ask me what I thi.

                                                                                

In [None]:
# Total number of Reddit posts (rows) in the dataset
df.count()



In [None]:
# Count the posts per subreddit and sort so the largest subreddits come first
df_subreddit = (
    df.groupBy("subreddit")
    .count()
    .orderBy("count", ascending=False)
)

In [15]:
# Display the most popular subreddits by number of posts
# (df_subreddit is already sorted by count in descending order)
df_subreddit.show()



+-------------------+------+
|          subreddit| count|
+-------------------+------+
|          AskReddit|589947|
|      relationships|352049|
|    leagueoflegends|109307|
|               tifu| 52219|
|relationship_advice| 50416|
|              trees| 47286|
|             gaming| 43851|
|            atheism| 43268|
|      AdviceAnimals| 40783|
|              funny| 40171|
|           politics| 36518|
|               pics| 35098|
|                sex| 28806|
|                WTF| 25781|
|  explainlikeimfive| 25482|
|      todayilearned| 25004|
|            Fitness| 22694|
|               IAmA| 22689|
|          worldnews| 22577|
|              DotA2| 22405|
+-------------------+------+
only showing top 20 rows



                                                                                

In [16]:
# Average content length (mean of content_len over all posts)
from pyspark.sql.functions import avg

# The aggregate yields a single row with a single column; pull that value out
df.agg(avg("content_len")).first()[0]

                                                                                

271.93096278125836

In [19]:
# Keep only the posts that belong to the AskReddit subreddit
df_askReddit = df.filter(df["subreddit"] == "AskReddit")

In [20]:
# Preview the first rows of the AskReddit-only DataFrame
df_askReddit.show()

+----------------+--------------------+--------------------+-----------+-------+--------------------+---------+------------+--------------------+-----------+--------------------+
|          author|                body|             content|content_len|     id|      normalizedBody|subreddit|subreddit_id|             summary|summary_len|               title|
+----------------+--------------------+--------------------+-----------+-------+--------------------+---------+------------+--------------------+-----------+--------------------+
|        phyzishy|Yeah, but most fo...|Yeah, but most fo...|         75|c6b52m8|Yeah, but most fo...|AskReddit|    t5_2qh1i|       stupid stuff.|          2|                NULL|
|     fallsuspect|You probably won'...|You probably won'...|         79|c6bncqn|You probably won'...|AskReddit|    t5_2qh1i|just get both of ...|         11|                NULL|
|    SinglesRazor|I want to say thi...|I want to say thi...|        375|c6c72m7|I want to say thi...|AskR

In [21]:
# Rank AskReddit authors by how many posts they have, most active first
df_askReddit_most_frequent_user = (
    df_askReddit.groupBy("author")
    .count()
    .orderBy("count", ascending=False)
)

In [22]:
# Display the AskReddit authors with the most posts
# (the DataFrame is already sorted by count in descending order)
df_askReddit_most_frequent_user.show()



+-------------------+-----+
|             author|count|
+-------------------+-----+
|          [deleted]|29118|
|    RamsesThePigeon|  700|
|          redweasel|  245|
|         Viperbunny|  205|
|          laterdude|  150|
|            cobysev|  126|
|            DesCo83|  125|
|     Business-Socks|  124|
|           mortaine|  108|
|   Fearlessleader85|  108|
|Late_Night_Grumbler|  102|
|      bowhunter_fta|  101|
|         MadLintElf|  101|
|       josiahpapaya|   98|
|       HalfysReddit|   94|
|             Lots42|   93|
| lil-praying-mantis|   84|
|        cromemako83|   83|
|     IrrelevantTLDR|   83|
|           m_bishop|   80|
+-------------------+-----+
only showing top 20 rows



                                                                                

The most frequent author is `[deleted]`, but this is not an actual user, so it doesn't count. The next most frequent user is RamsesThePigeon. Let's look at some of their posts.

In [24]:
# Restrict the AskReddit posts to those written by RamsesThePigeon
df_frequent_user = df_askReddit.filter(df_askReddit["author"] == "RamsesThePigeon")

In [40]:
# RDD holding only the content column of this user's posts
rdd = df_frequent_user.select("content").rdd
# The data is now small (a single user's posts), so pull it onto the driver
content = rdd.collect()

                                                                                

In [56]:
import numpy as np
# Turn the collected rows into a NumPy array and flatten it to one dimension so
# that each post can be reached with a single index. Note that this rebinds
# `content` from the collected list to the flattened array.
# NOTE(review): presumably each collected Row holds just the content string — confirm
content = np.array(content).flatten()
# Display one sample post; index 99 appears to be an arbitrary example, and
# the trailing [:] takes a full slice of that entry
content[99][:]

'Gandalf \n "You\'re an idiot," you might say. "Or, at the very least, you misread the question." \n Everyone views Gandalf as being a hero. Sure, the fellow might seem a little bit aloof at times, but that\'s only because he\'s working behind the scenes to ensure victory for The Fellowship (or for Thorin and Company, depending on which story you\'re trying to follow). Even when his own life is on the line, the wizard known as Mithrandir was willing to sacrifice himself for the good of his compatriots. \n Here\'s the thing, though: Via a series of various of events that were  seemingly  beyond his control, Gandalf managed to ensure that he was the most powerful being in all of Middle Earth. \n First of all, we know that Gandalf was one of five wizards in the known world, the most powerful of which was Saruman. He was ostensibly tasked with making sure that Sauron - the floating eyeball dude who made The One Ring - didn\'t get up to too much mischief... and in the course of that quest, 

In [9]:
 # Stop the Spark session once the analysis is finished
 spark_session.stop()