In [1]:
import numpy as np # linear algebra
import pandas as pd
import matplotlib.pyplot as plt
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import * 
from pyspark.sql import functions as f

In [2]:
# Build (or reuse) the Spark session against the cluster master.
# Dynamic allocation is on with shuffle tracking, the external shuffle service
# is explicitly off, and idle executors are released after 300s.
# Driver and block-manager ports are pinned to fixed values.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.172:7077")
    .appName("Investigation on Authors based on content length")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s")
    .config("spark.executor.cores", 2)
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Keep a handle on the underlying context and only log ERROR and above.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/07 16:12:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Bare expression: lets the notebook render the session object as the cell output.
spark_session

In [4]:
# Load the corpus JSON file from HDFS into a DataFrame.
# No explicit schema is passed here, so the column types come from Spark's JSON reader.
df = spark_session.read.json('hdfs://de-i-16-hdfs:9000/user/ubuntu/corpus-webis-tldr-17.json')

                                                                                

In [5]:
# Print the column names and types of the loaded DataFrame as a sanity check.
df.printSchema()

root
 |-- author: string (nullable = true)
 |-- body: string (nullable = true)
 |-- content: string (nullable = true)
 |-- content_len: long (nullable = true)
 |-- id: string (nullable = true)
 |-- normalizedBody: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- summary_len: long (nullable = true)
 |-- title: string (nullable = true)



In [6]:
# Largest content_len in the whole corpus, pulled back to the driver as a scalar.
df.agg(f.max('content_len')).first()[0]

                                                                                

9952

In [7]:
# Smallest content_len in the whole corpus, pulled back to the driver as a scalar.
df.agg(f.min('content_len')).first()[0]

                                                                                

2

In [8]:
# Drop posts whose author is the '[deleted]' placeholder.
df_filtered = df.where(f.col('author') != '[deleted]')

In [9]:
# One row per post (author, content_len), longest content first.
long_content_result = (
    df_filtered
    .select('author', 'content_len')
    .orderBy(f.col('content_len').desc())
)

In [10]:
# Run the sort and print the top rows; rows are individual posts, so an author
# can appear more than once.
long_content_result.show()
# Disabled plotting attempt, kept for reference.
# NOTE(review): long_content_result is a Spark DataFrame, so indexing it yields
# Column expressions rather than local values; plotting presumably needs a
# bounded result brought to the driver first (e.g. .limit(n).toPandas()) - confirm
# before re-enabling.
#plt.bar(long_content_result['author'],long_content_result['content_len'])



+--------------------+-----------+
|              author|content_len|
+--------------------+-----------+
|         Didimeister|       9952|
|PmMeUrGrammerMistake|       7259|
|         iluhdatmass|       7249|
|           Atlas2088|       7215|
|               nlofe|       7197|
|    doigotoaustralia|       7076|
|            POVsocks|       7052|
|    doigotoaustralia|       7027|
|         fakie_cakes|       7014|
|  supermanthrows1212|       6948|
| redditmakesmegiggle|       6829|
|           thedeejus|       6790|
|             jackou2|       6730|
|           interfail|       6713|
|             olicope|       6703|
|             olicope|       6694|
|     PM_ME_NUDE_BITS|       6642|
|         sorryfriend|       6627|
|        Candle_Jack_|       6597|
|Vincethatwaspromised|       6595|
+--------------------+-----------+
only showing top 20 rows



                                                                                

In [11]:
# Keep only posts whose content_len is below 1000.
df_filtered_2 = df_filtered.where(f.col('content_len') < 1000)

In [12]:
# Average content_len per author over the filtered posts
# (result column is named 'avg(content_len)').
author_grouped = (
    df_filtered_2
    .groupBy('author')
    .avg('content_len')
)

In [13]:
# Print 20 rows of the per-author averages.
# No sort is applied before show(), so these rows are a sample, not a ranking.
author_grouped.show(20)



+----------------+------------------+
|          author|  avg(content_len)|
+----------------+------------------+
|  hollaback_girl| 97.52380952380952|
|       burncycle|120.79310344827586|
|        Gonziago|             225.0|
|      scubamikey|             181.5|
|  The_ESC_artist|             165.0|
|      leethestud|             180.0|
|        vaughnm1| 646.6666666666666|
|       occamsrzr|            124.75|
|   twisted_spoon|             398.5|
|        malkouri| 188.2941176470588|
|       cucchiaio|378.85714285714283|
| MedeaDemonblood| 443.1818181818182|
|      Purrmaster|             148.0|
|  HiddenInSorrow|             332.0|
|             OIP| 112.0990099009901|
|       arcbeetle|158.33333333333334|
|Fearlessleader85| 289.8854166666667|
|     panthanator|             337.0|
|         ChoiceD|               4.0|
|     SpartacvsZA|             806.0|
+----------------+------------------+
only showing top 20 rows



                                                                                

In [35]:
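# Disabled alternative: number of posts per author (rows per 'author' group).
# The active definition of author_subreddit_grouped collects subreddit names instead.
#author_subreddit_grouped = df_filtered_2.groupBy('author').count()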

In [14]:
# For every author, gather the subreddit of each of their posts into one list
# (duplicates are kept). The alias spells out the column name Spark would
# generate by default, since later code refers to the column by that name.
author_subreddit_grouped = (
    df_filtered_2
    .groupBy('author')
    .agg(f.collect_list('subreddit').alias('collect_list(subreddit)'))
)

In [15]:
# Flatten the per-author subreddit lists back to one (author, subreddit) row
# per list element, overwriting the list column in place, then print a preview.
author_subreddit_grouped.withColumn(
    'collect_list(subreddit)',
    f.explode('collect_list(subreddit)'),
).show()



+---------------+-----------------------+
|         author|collect_list(subreddit)|
+---------------+-----------------------+
|   ---JustMe---|              worldnews|
|    ---annon---|          TrueChristian|
|    ---annon---|         rapecounseling|
|    ---annon---|              AskReddit|
|--Beetlejuice--|              HappyWars|
|        --Chaos|                  DotA2|
|         --MG--|          DunderMifflin|
|         --MG--|          clevelandcavs|
|       --Ping--|                   IAmA|
|       --Ping--|                 movies|
|         --TT--|                    40k|
|     --Unidan--|                 Nexus5|
|           -10-|             conspiracy|
|           -10-|                    law|
|           -10-|             conspiracy|
|           -3k-|             Guildwars2|
|  -AbracadaveR-|                nosleep|
| -Ace_Rockolla-|                  wakfu|
|         -Ahab-|              AskReddit|
|         -Ahab-|          AdviceAnimals|
+---------------+-----------------------+

                                                                                

In [16]:
# Shut down through the session rather than the bare context:
# SparkSession.stop() stops the underlying SparkContext (so spark_context is
# stopped too) and also clears the session, releasing the cluster executors.
spark_session.stop()