In [1]:
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
import time
from operator import add
from pyspark import SparkConf, SparkContext
from pyspark_dist_explore import hist
import matplotlib.pyplot as plt
import dataframe_image as dfi
from pyspark.sql.types import IntegerType
from pyspark.ml import Pipeline
import numpy as np
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, StopWordsRemover

In [2]:
def preprocess(rdd):
    """Turn one comment body into a list of normalised word tokens.

    Despite its name, ``rdd`` is a single string (one comment body), not an
    RDD; the name is kept so existing callers keep working.

    The text is lower-cased, the characters ``. : , "`` are removed, and the
    result is split on any run of whitespace. Splitting on all whitespace
    (rather than on a single literal space) means words separated by a
    newline or tab become separate tokens instead of one token with an
    embedded space, and no empty tokens are produced for repeated spaces or
    punctuation-only words.

    Parameters
    ----------
    rdd : str
        Raw comment text.

    Returns
    -------
    list of str
        Lower-cased tokens without the stripped punctuation; ``[]`` for
        empty or punctuation-only input.
    """
    # Translation table that deletes the punctuation characters outright.
    strip_punctuation = str.maketrans("", "", '.:,"')
    # No debug print here: this function runs once per comment on the Spark
    # executors, where per-call printing only floods the worker logs.
    return rdd.lower().translate(strip_punctuation).split()

In [3]:
# Fraction of rows that get_most_popular and get_groups randomly sample from
# their input DataFrame before analysing it.
Sample_ratio=0.001 #Change this sample ratio to obtain a running time of about 2 minutes

def get_most_popular(df):
    """Summarise the 20 most-commented subreddits of a random sample.

    A fraction ``Sample_ratio`` of ``df`` is sampled. For every subreddit in
    the sample, the number of comments and the average comment length (number
    of space-separated pieces of ``body``, truncated to an integer) are
    computed in a single aggregation, and the 20 subreddits with the most
    comments are kept.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must provide the columns ``subreddit`` and ``body``.

    Returns
    -------
    pandas.io.formats.style.Styler
        Styled table with the columns ``Subreddit``, ``average_lenght`` and
        ``comment_count``, ordered by ``comment_count`` descending, with the
        index hidden.
    """
    # Local import with an explicit alias, so the function does not depend on
    # names pulled in by a wildcard import.
    from pyspark.sql import functions as F

    sampled = df.sample(Sample_ratio)

    # Same length measure as Python's body.split(" "): split on single spaces.
    pieces_per_comment = F.size(F.split(F.col("body"), " "))

    # One aggregation computes both statistics for all subreddits, instead of
    # launching a separate Spark job per subreddit.
    top_subreddits = (
        sampled.groupBy("Subreddit")
        .agg(
            # The misspelled column name is kept: it is part of the output.
            F.avg(pieces_per_comment).cast("int").alias("average_lenght"),
            F.count(F.lit(1)).alias("comment_count"),
        )
        # Secondary key makes the top-20 cut deterministic when counts tie.
        .orderBy(F.desc("comment_count"), "Subreddit")
        .limit(20)
        .select(
            F.col("Subreddit").alias("Subreddit"),
            "average_lenght",
            "comment_count",
        )
    )

    pandas_df = top_subreddits.toPandas()
    df_styled = pandas_df.style.background_gradient().hide(axis="index")
    return df_styled

def get_groups(df):
    """Cluster the 20 most-commented subreddits of a random sample into 2 groups.

    A fraction ``Sample_ratio`` of ``df`` is sampled. All sampled comments of
    each of the 20 most-commented subreddits are joined into one document per
    subreddit, the documents are turned into TF-IDF vectors, and KMeans with
    ``k=2`` assigns each subreddit to a group.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must provide the columns ``subreddit`` and ``body``.

    Returns
    -------
    pandas.io.formats.style.Styler
        Styled table with the columns ``Subreddit`` and ``group``, sorted by
        ``group``, with the index hidden.
    """
    from pyspark.ml.clustering import KMeans
    from pyspark.sql import functions as F

    sampled = df.sample(Sample_ratio)

    # Names of the 20 subreddits with the most comments in the sample.
    top_rows = (
        sampled.groupBy("Subreddit")
        .count()
        .orderBy(F.desc("count"), "Subreddit")
        .limit(20)
        .collect()
    )
    top_subreddits = [row[0] for row in top_rows]

    # One document per subreddit. Comments are joined with a space so the
    # last word of one comment is not fused with the first word of the next
    # (plain string concatenation would create such bogus tokens).
    documents = (
        sampled.where(F.col("Subreddit").isin(top_subreddits))
        .groupBy("Subreddit")
        .agg(F.concat_ws(" ", F.collect_list("body")).alias("text"))
        .select(F.col("Subreddit").alias("Subreddit"), "text")
        # The pipeline reads its input several times (fit, then transform).
        .cache()
    )

    try:
        tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
        remover = StopWordsRemover(inputCol="tokens", outputCol="stopWordsRemovedTokens")
        hashingTF = HashingTF(inputCol="stopWordsRemovedTokens", outputCol="rawFeatures", numFeatures=2000)
        # minDocFreq=5: a term must appear in at least 5 of the (at most 20)
        # subreddit documents to receive a non-zero weight.
        idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
        # Fixed seed so repeated runs on the same sample give the same groups.
        kmeans = KMeans(k=2, seed=42)

        pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, kmeans])
        results = pipeline.fit(documents).transform(documents)

        groups = (
            results.select("Subreddit", F.col("prediction").alias("group"))
            .sort("group")
            .toPandas()
        )
    finally:
        documents.unpersist()

    df_styled = groups.style.background_gradient().hide(axis="index")
    return df_styled

def get_comments_time(df):
    """Save a histogram of comments per hour of day to ``comments_time.png``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must provide the column ``hour``.

    Returns
    -------
    None
        The figure is written to ``comments_time.png`` in the working
        directory and then closed.
    """
    fig, ax = plt.subplots()

    hist(ax, df.select('hour'), bins = 24, color=['grey'])
    ax.set_xlabel("hour of day")
    ax.set_ylabel("number of comments")
    ax.set_title("Comments per hour of day")

    fig.savefig('comments_time.png')
    # Close the figure instead of only clearing it: a cleared figure stays
    # registered with pyplot and is rendered by the notebook as an empty
    # "<Figure ... with 0 Axes>" output.
    plt.close(fig)

def get_words_use(df, words=("night", "midnight", "morning")):
    """Plot how often selected words are used at each hour of the day.

    For every hour 0-23 the relative frequency of each word is computed as
    (occurrences of the word) / (total number of tokens), where tokens are
    produced by ``preprocess`` from the ``body`` column. One line per word is
    drawn and the figure is saved to ``words_frequency.png``.

    All counts are gathered in a single pass over the data; hours without any
    comments, and words that never occur, get a frequency of 0.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must provide the columns ``hour`` and ``body``.
    words : iterable of str, optional
        Words to track, compared against the lower-cased tokens returned by
        ``preprocess``. Defaults to ``("night", "midnight", "morning")``.

    Returns
    -------
    None
        The figure is written to ``words_frequency.png`` and then closed.
    """
    words = tuple(words)
    hours = list(range(24))

    def count_comment(row):
        # (hour, (total tokens, hits for words[0], hits for words[1], ...))
        tokens = preprocess(row[1])
        return row[0], (len(tokens),) + tuple(tokens.count(word) for word in words)

    def add_counts(left, right):
        # Element-wise sum of two count tuples.
        return tuple(a + b for a, b in zip(left, right))

    # A single Spark job yields the (at most 24) per-hour count tuples.
    counts_by_hour = (
        df.select("hour", "body").rdd
        .map(count_comment)
        .reduceByKey(add_counts)
        .collectAsMap()
    )

    no_comments = (0,) * (len(words) + 1)
    series = [[] for _ in words]
    for current_hour in hours:
        total, *hits = counts_by_hour.get(current_hour, no_comments)
        for values, word_hits in zip(series, hits):
            values.append(word_hits / total if total else 0)

    fig, ax = plt.subplots(facecolor=(1, 1, 1))
    for word, values in zip(words, series):
        ax.plot(hours, values, label=f"word={word}")
    ax.legend()
    ax.set_xlabel("hour time")
    ax.set_ylabel("word frequency")
    fig.savefig('words_frequency.png')
    # Close rather than clear, so no empty figure is left behind for the
    # notebook to render.
    plt.close(fig)

In [16]:

# Connect to the standalone Spark master. The application is capped at 8 cores
# in total (the setup the author found optimal: 4 workers and 8 cores).
spark = (
    SparkSession.builder
    .master("spark://192.168.2.102:7077")
    .appName("Analysis")
    .config("spark.cores.max", "8")
    .getOrCreate()
)

# SQL entry point bound to the session's SparkContext.
sqlContext = SQLContext(spark.sparkContext)

In [24]:
import pyspark.sql.types as T

# Schema made of four nullable string columns.
data_schema = T.StructType([
    T.StructField(field_name, T.StringType(), True)
    for field_name in (
        'author',
        'author_flair_css_class',
        'author_flair_text',
        'body',
    )
])

In [26]:
# Read every Reddit JSON file from HDFS and register the result as the SQL
# view "park".
df = spark.read.json('hdfs://130.238.29.139:9000/reddit/*.json/')
df.createOrReplaceTempView('park')

# Keep only the analysed columns, drop comments whose body is "[deleted]",
# and add the hour of day derived from the created_utc timestamp.
df = (
    sqlContext.sql("select body,created_utc,subreddit from park")
    .where(col("body") != "[deleted]")
    .withColumn("hour", hour(from_unixtime(col("created_utc"))))
)

                                                                                

In [28]:
# Preview the prepared DataFrame (columns: body, created_utc, subreddit, hour).
df.show()

+--------------------+-----------+---------------+----+
|                body|created_utc|      subreddit|hour|
+--------------------+-----------+---------------+----+
|    Good lord.  Yes.| 1309478400|leagueoflegends|   0|
|I don't know abou...| 1309478401|      runescape|   0|
|Explain something...| 1309478400|         Israel|   0|
|I would add that ...| 1309478400|         loseit|   0|
|care to explain #...| 1309478402|            PHP|   0|
|A society where t...| 1309478402|      AskReddit|   0|
|Normally, when I ...| 1309478402|            WTF|   0|
|No one wants to t...| 1309478401|          codbo|   0|
|    I don't get it. | 1309478404|    Philippines|   0|
|I wonder whether ...| 1309478405|    environment|   0|
|I would suggest j...| 1309478409|        writing|   0|
|Remember, don't c...| 1309478409|        Fitness|   0|
|I made it [here](...| 1309478408|     California|   0|
|I think you may h...| 1309478407|            tf2|   0|
|Not clever but "I...| 1309478408|        firefl

                                                                                

In [29]:
# Histogram of comments per hour of day; the figure is saved to comments_time.png.
get_comments_time(df)

                                                                                

<Figure size 640x480 with 0 Axes>

In [30]:
# Hourly relative frequency of the tracked words; the figure is saved to words_frequency.png.
get_words_use(df)

                                                                                

<Figure size 640x480 with 0 Axes>

In [31]:
# Cluster the most-commented subreddits of a random sample into two groups.
get_groups(df)

23/03/20 21:24:18 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/03/20 21:24:18 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

Subreddit,group
videos,0
IAmA,0
gonewild,0
worldnews,0
reddit.com,0
gaming,0
funny,0
leagueoflegends,0
politics,0
mylittlepony,0


In [32]:
# Comment count and average comment length for the most-commented subreddits of a random sample.
get_most_popular(df)

                                                                                

Subreddit,average_lenght,comment_count
AskReddit,32,6493
pics,19,3533
gaming,27,1852
funny,18,1769
reddit.com,25,1708
fffffffuuuuuuuuuuuu,17,1492
IAmA,37,1295
politics,43,1285
atheism,45,1028
trees,22,977


In [36]:
# Number of comments left after removing bodies equal to "[deleted]".
df.count()


                                                                                

44667631