In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, pandas_udf
import re
import nltk
import pandas as pd
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.sentiment import SentimentIntensityAnalyzer
from time import perf_counter

# Fetch the NLTK resources this notebook relies on: the English stop-word list,
# the WordNet data used by the lemmatizer, and the VADER lexicon used by the
# sentiment analyzer.
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('vader_lexicon')

[nltk_data] Downloading package stopwords to /home/ubuntu/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /home/ubuntu/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/ubuntu/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


True

In [2]:
# Spark session for the single-executor run.
# With dynamic allocation enabled, spark.executor.instances only sets the
# initial executor count, so spark.dynamicAllocation.maxExecutors is set to the
# same value to keep the run from scaling beyond it.
# NOTE(review): the driver log also recommends setting spark.executor.cores
# explicitly (SPARK-30299); the right value depends on the worker size, which
# is not visible from this notebook.
spark_session = SparkSession.builder \
    .master("spark://192.168.2.47:7077") \
    .appName("manual_test") \
    .config("spark.dynamicAllocation.enabled", True) \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True) \
    .config("spark.shuffle.service.enabled", False) \
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s") \
    .config("spark.executor.instances", 1) \
    .config("spark.dynamicAllocation.maxExecutors", 1) \
    .config("spark.executor.memory", "5G") \
    .config("spark.cores.max", 12) \
    .getOrCreate()

# Handle to the underlying SparkContext (RDD API); used here only to silence
# everything below ERROR in the driver log.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/20 01:48:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/20 01:48:28 WARN StandaloneSchedulerBackend: Dynamic allocation enabled without spark.executor.cores explicitly set, you may get more executors allocated than expected. It's recommended to set spark.executor.cores explicitly. Please check SPARK-30299 for more details.


In [4]:
# Read the Reddit sample from HDFS; DROPMALFORMED mode discards records that
# cannot be parsed instead of keeping them as corrupted rows.
df = (
    spark_session.read
    .option("mode", "DROPMALFORMED")
    .json("hdfs://192.168.2.47:9000/data-project/reddit_50k.json")
)

                                                                                

In [5]:
# Module-level NLTK helpers shared by the pandas UDFs defined in this cell.
sia = SentimentIntensityAnalyzer()           # VADER scorer
stop_words = {*stopwords.words('english')}   # set for fast membership tests
lemmatizer = WordNetLemmatizer()

@pandas_udf("string")
def preprocess_text_udf(text_series: pd.Series) -> pd.Series:
    """Normalize a batch of raw texts.

    Each text is lower-cased, runs of non-word characters are collapsed to a
    single space, English stop words are removed and the remaining tokens are
    lemmatized and re-joined with single spaces.

    Args:
        text_series: batch of raw text values; missing values are tolerated.

    Returns:
        A Series of cleaned strings, with ``None`` wherever the input was not
        a string.

    NOTE(review): the NLTK stop-word list contains negations such as "not",
    and lower-casing/punctuation stripping removes cues that VADER uses;
    confirm this cleaning is intended before VADER scoring.
    """
    def preprocess(text):
        # Missing bodies reach this function as None/NaN; propagate them as
        # null instead of failing on .lower().
        if not isinstance(text, str):
            return None

        # Lowercase the text
        text = text.lower()

        # Replace every run of non-word characters with one space
        text = re.sub(r'\W+', ' ', text)

        # Tokenize on whitespace, drop stop words, then lemmatize what is left
        words = [lemmatizer.lemmatize(w) for w in text.split() if w not in stop_words]

        return ' '.join(words)

    return text_series.apply(preprocess)

@pandas_udf("float")
def sentiment_score_udf(text_series: pd.Series) -> pd.Series:
    """Score a batch of texts with the shared VADER analyzer.

    Args:
        text_series: batch of texts to score; missing values are tolerated.

    Returns:
        A Series holding the 'compound' entry of ``sia.polarity_scores`` for
        every string input, and NaN for every non-string input.
    """
    def compound(text):
        # Do not hand None/NaN to the analyzer; emit NaN so the row stays
        # missing (presumably surfaced by Spark as a SQL null - TODO confirm).
        if not isinstance(text, str):
            return float('nan')
        return sia.polarity_scores(text)['compound']

    return text_series.apply(compound)

@pandas_udf("string")
def sentiment_label_udf(score_series: pd.Series) -> pd.Series:
    """Map each numeric score to a coarse sentiment label.

    Scores strictly above 0.05 become 'positive', scores strictly below -0.05
    become 'negative', and everything else becomes 'neutral'.
    """
    def to_label(value):
        # Guard-clause form: positive first, then split the remainder.
        if value > 0.05:
            return 'positive'
        return 'negative' if value < -0.05 else 'neutral'

    return score_series.apply(to_label)

In [8]:
# Wall-clock timer for the whole pipeline, including the final show() action
init_time = perf_counter()

# Discard the columns that are not read anywhere below
df_prep = df.drop('body', 'content', 'id', 'subreddit_id', 'title', 'author', 'content_len', 'summary', 'summary_len')

k = 25 # number of subreddits to consider

# Rank subreddits by number of posts and keep the k largest
df_top = df_prep.select('subreddit') \
                .groupBy('subreddit').count() \
                .sort('count', ascending=False) \
                .limit(k)

# Collect exactly once: every collect() call re-runs the ranking job, and
# iterating over the returned rows also works when fewer than k subreddits exist.
topk_subreddits = [row['subreddit'] for row in df_top.collect()]

# Keep only posts that belong to the selected subreddits
df_topk = df_prep.filter(df_prep['subreddit'].isin(topk_subreddits))

# Clean the post text, then score and label every post
df_topk = df_topk.withColumn("clean_text", preprocess_text_udf(df_topk['normalizedBody']))

df_vader = df_topk.withColumn("sentiment_score", sentiment_score_udf(df_topk['clean_text']))
df_vader = df_vader.withColumn("sentiment_label", sentiment_label_udf(df_vader['sentiment_score']))

# Average score per subreddit, rounded to 4 decimals
df_avg = df_vader.groupBy("subreddit") \
                 .agg(F.round(F.avg("sentiment_score"), 4).alias("avg_sentiment_score")) \
                 .limit(k)

# Label each subreddit from its average score
df_avg = df_avg.withColumn("sentiment_label", sentiment_label_udf(df_avg['avg_sentiment_score']))

# show() is the action that triggers the scoring and aggregation work
df_avg.show(k)

end_time = perf_counter()

# Elapsed seconds for this run
elapsed_time = end_time - init_time
print(elapsed_time)


Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)

+-------------------+-------------------+---------------+
|          subreddit|avg_sentiment_score|sentiment_label|
+-------------------+-------------------+---------------+
|         offmychest|             0.2412|       positive|
|          AskReddit|             0.1281|       positive|
|             videos|             0.0718|       positive|
|              DotA2|             0.3839|       positive|
|      todayilearned|             0.1156|       positive|
|      AdviceAnimals|              0.074|       positive|
|     DestinyTheGame|              0.377|       positive|
|      relationships|             0.4971|       positive|
|               pics|             0.0994|       positive|
|            Fitness|             0.3721|       positive|
|         reddit.com|              0.242|       positive|
|          worldnews|            -0.1374|       negative|
|    TwoXChromosomes|             0.3153|       positive|
|           politics|             0.0821|       positive|
|             

                                                                                

In [9]:
# Shut down the current Spark session before the next cell builds a new one
spark_session.stop()

In [10]:
# Spark session for the two-executor run.
# With dynamic allocation enabled, spark.executor.instances only sets the
# initial executor count, so spark.dynamicAllocation.maxExecutors is set to the
# same value to keep the run from scaling beyond it.
# NOTE(review): Spark recommends setting spark.executor.cores explicitly when
# dynamic allocation is used (SPARK-30299); the right value depends on the
# worker size, which is not visible from this notebook.
spark_session = SparkSession.builder \
    .master("spark://192.168.2.47:7077") \
    .appName("manual_test") \
    .config("spark.dynamicAllocation.enabled", True) \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True) \
    .config("spark.shuffle.service.enabled", False) \
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s") \
    .config("spark.executor.instances", 2) \
    .config("spark.dynamicAllocation.maxExecutors", 2) \
    .config("spark.executor.memory", "5G") \
    .config("spark.cores.max", 12) \
    .getOrCreate()

# Handle to the underlying SparkContext (RDD API); used here only to silence
# everything below ERROR in the driver log.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

In [13]:
# Read the Reddit sample from HDFS; DROPMALFORMED mode discards records that
# cannot be parsed instead of keeping them as corrupted rows.
df = (
    spark_session.read
    .option("mode", "DROPMALFORMED")
    .json("hdfs://192.168.2.47:9000/data-project/reddit_50k.json")
)

                                                                                

In [14]:
# Wall-clock timer for the whole pipeline, including the final show() action
init_time = perf_counter()

# Discard the columns that are not read anywhere below
df_prep = df.drop('body', 'content', 'id', 'subreddit_id', 'title', 'author', 'content_len', 'summary', 'summary_len')

k = 25 # number of subreddits to consider

# Rank subreddits by number of posts and keep the k largest
df_top = df_prep.select('subreddit') \
                .groupBy('subreddit').count() \
                .sort('count', ascending=False) \
                .limit(k)

# Collect exactly once: every collect() call re-runs the ranking job, and
# iterating over the returned rows also works when fewer than k subreddits exist.
topk_subreddits = [row['subreddit'] for row in df_top.collect()]

# Keep only posts that belong to the selected subreddits
df_topk = df_prep.filter(df_prep['subreddit'].isin(topk_subreddits))

# Clean the post text, then score and label every post
df_topk = df_topk.withColumn("clean_text", preprocess_text_udf(df_topk['normalizedBody']))

df_vader = df_topk.withColumn("sentiment_score", sentiment_score_udf(df_topk['clean_text']))
df_vader = df_vader.withColumn("sentiment_label", sentiment_label_udf(df_vader['sentiment_score']))

# Average score per subreddit, rounded to 4 decimals
df_avg = df_vader.groupBy("subreddit") \
                 .agg(F.round(F.avg("sentiment_score"), 4).alias("avg_sentiment_score")) \
                 .limit(k)

# Label each subreddit from its average score
df_avg = df_avg.withColumn("sentiment_label", sentiment_label_udf(df_avg['avg_sentiment_score']))

# show() is the action that triggers the scoring and aggregation work
df_avg.show(k)

end_time = perf_counter()

# Elapsed seconds for this run
elapsed_time = end_time - init_time
print(elapsed_time)

25/03/20 01:57:36 ERROR DAGScheduler: Failed to update accumulator 0 (org.apache.spark.api.python.PythonAccumulatorV2) for task 3
org.apache.spark.SparkException: EOF reached before Python server acknowledged
	at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:751)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1(DAGScheduler.scala:1694)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1$adapted(DAGScheduler.scala:1685)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1685)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1838)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3054)
	at o

+-------------------+-------------------+---------------+
|          subreddit|avg_sentiment_score|sentiment_label|
+-------------------+-------------------+---------------+
|         offmychest|             0.2412|       positive|
|          AskReddit|             0.1281|       positive|
|             videos|             0.0718|       positive|
|              DotA2|             0.3839|       positive|
|      todayilearned|             0.1156|       positive|
|      AdviceAnimals|              0.074|       positive|
|     DestinyTheGame|              0.377|       positive|
|      relationships|             0.4971|       positive|
|               pics|             0.0994|       positive|
|            Fitness|             0.3721|       positive|
|         reddit.com|              0.242|       positive|
|          worldnews|            -0.1374|       negative|
|    TwoXChromosomes|             0.3153|       positive|
|           politics|             0.0821|       positive|
|             

25/03/20 01:57:39 ERROR DAGScheduler: Failed to update accumulator 0 (org.apache.spark.api.python.PythonAccumulatorV2) for task 0
java.net.SocketException: Broken pipe (Write failed)
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at java.io.DataOutputStream.flush(DataOutputStream.java:123)
	at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:747)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1(DAGScheduler.scala:1694)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1$adapted(DAGScheduler.scala:1685)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.fore

In [15]:
# Shut down the current Spark session before the next cell builds a new one
spark_session.stop()

In [17]:
# Spark session for the three-executor run.
# With dynamic allocation enabled, spark.executor.instances only sets the
# initial executor count, so spark.dynamicAllocation.maxExecutors is set to the
# same value to keep the run from scaling beyond it.
# NOTE(review): Spark recommends setting spark.executor.cores explicitly when
# dynamic allocation is used (SPARK-30299); the right value depends on the
# worker size, which is not visible from this notebook.
spark_session = SparkSession.builder \
    .master("spark://192.168.2.47:7077") \
    .appName("manual_test") \
    .config("spark.dynamicAllocation.enabled", True) \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True) \
    .config("spark.shuffle.service.enabled", False) \
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s") \
    .config("spark.executor.instances", 3) \
    .config("spark.dynamicAllocation.maxExecutors", 3) \
    .config("spark.executor.memory", "5G") \
    .config("spark.cores.max", 12) \
    .getOrCreate()

# Handle to the underlying SparkContext (RDD API); used here only to silence
# everything below ERROR in the driver log.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

In [18]:
# Read the Reddit sample from HDFS; DROPMALFORMED mode discards records that
# cannot be parsed instead of keeping them as corrupted rows.
df = (
    spark_session.read
    .option("mode", "DROPMALFORMED")
    .json("hdfs://192.168.2.47:9000/data-project/reddit_50k.json")
)

                                                                                

In [19]:
# Wall-clock timer for the whole pipeline, including the final show() action
init_time = perf_counter()

# Discard the columns that are not read anywhere below
df_prep = df.drop('body', 'content', 'id', 'subreddit_id', 'title', 'author', 'content_len', 'summary', 'summary_len')

k = 25 # number of subreddits to consider

# Rank subreddits by number of posts and keep the k largest
df_top = df_prep.select('subreddit') \
                .groupBy('subreddit').count() \
                .sort('count', ascending=False) \
                .limit(k)

# Collect exactly once: every collect() call re-runs the ranking job, and
# iterating over the returned rows also works when fewer than k subreddits exist.
topk_subreddits = [row['subreddit'] for row in df_top.collect()]

# Keep only posts that belong to the selected subreddits
df_topk = df_prep.filter(df_prep['subreddit'].isin(topk_subreddits))

# Clean the post text, then score and label every post
df_topk = df_topk.withColumn("clean_text", preprocess_text_udf(df_topk['normalizedBody']))

df_vader = df_topk.withColumn("sentiment_score", sentiment_score_udf(df_topk['clean_text']))
df_vader = df_vader.withColumn("sentiment_label", sentiment_label_udf(df_vader['sentiment_score']))

# Average score per subreddit, rounded to 4 decimals
df_avg = df_vader.groupBy("subreddit") \
                 .agg(F.round(F.avg("sentiment_score"), 4).alias("avg_sentiment_score")) \
                 .limit(k)

# Label each subreddit from its average score
df_avg = df_avg.withColumn("sentiment_label", sentiment_label_udf(df_avg['avg_sentiment_score']))

# show() is the action that triggers the scoring and aggregation work
df_avg.show(k)

end_time = perf_counter()

# Elapsed seconds for this run
elapsed_time = end_time - init_time
print(elapsed_time)

25/03/20 01:59:36 ERROR DAGScheduler: Failed to update accumulator 0 (org.apache.spark.api.python.PythonAccumulatorV2) for task 3
java.net.SocketException: Broken pipe (Write failed)
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at java.io.DataOutputStream.flush(DataOutputStream.java:123)
	at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:747)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1(DAGScheduler.scala:1694)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1$adapted(DAGScheduler.scala:1685)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.fore

+-------------------+-------------------+---------------+
|          subreddit|avg_sentiment_score|sentiment_label|
+-------------------+-------------------+---------------+
|         offmychest|             0.2412|       positive|
|          AskReddit|             0.1281|       positive|
|             videos|             0.0718|       positive|
|              DotA2|             0.3839|       positive|
|      todayilearned|             0.1156|       positive|
|      AdviceAnimals|              0.074|       positive|
|     DestinyTheGame|              0.377|       positive|
|      relationships|             0.4971|       positive|
|               pics|             0.0994|       positive|
|            Fitness|             0.3721|       positive|
|         reddit.com|              0.242|       positive|
|          worldnews|            -0.1374|       negative|
|    TwoXChromosomes|             0.3153|       positive|
|           politics|             0.0821|       positive|
|             

25/03/20 01:59:39 ERROR DAGScheduler: Failed to update accumulator 0 (org.apache.spark.api.python.PythonAccumulatorV2) for task 0
java.net.SocketException: Broken pipe (Write failed)
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at java.io.DataOutputStream.flush(DataOutputStream.java:123)
	at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:747)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1(DAGScheduler.scala:1694)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$updateAccumulators$1$adapted(DAGScheduler.scala:1685)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.fore

In [None]:
# Shut down the Spark session now that the last run has finished
spark_session.stop()