# jupyter lab --ip=0.0.0.0
# start-master.sh
# start-worker.sh spark://yash-kukrejade-6:7077

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Options for the standalone cluster session; they are applied to the
# builder one by one, in the order they are listed here.
session_options = {
    "spark.dynamicAllocation.enabled": True,
    "spark.dynamicAllocation.shuffleTracking.enabled": True,
    "spark.shuffle.service.enabled": False,
    "spark.dynamicAllocation.executorIdleTimeout": "30s",
    "spark.executor.cores": 2,
    "spark.driver.host": "192.168.2.14",
    "spark.driver.port": 9999,
    "spark.blockManager.port": 10005,
}

session_builder = (
    SparkSession.builder
    .master("spark://192.168.2.14:7077")
    .appName("Lecture1_Example0_with_spark")
)
for option_name, option_value in session_options.items():
    session_builder = session_builder.config(option_name, option_value)

# Reuses an already running session if there is one, otherwise creates it.
spark_session = session_builder.getOrCreate()

# RDD API
spark_context = spark_session.sparkContext
spark_context.setLogLevel("INFO")

In [None]:
# lines = spark_context.textFile('hdfs://192.168.2.14:9000/home/ubuntu/i-have-a-dream.txt')
# lines.first()

In [None]:
# root
#  |-- author: string (nullable = true) 
#  |-- body: string (nullable = true)
#  |-- content: string (nullable = true)
#  |-- content_len: long (nullable = true)
#  |-- id: string (nullable = true)
#  |-- normalizedBody: string (nullable = true)
#  |-- subreddit: string (nullable = true)
#  |-- subreddit_id: string (nullable = true)
#  |-- summary: string (nullable = true)
#  |-- summary_len: long (nullable = true) 
#  |-- title: string (nullable = true)

# Location of the Reddit sample on HDFS.
reddit_json_path = "hdfs://192.168.2.14:9000/home/ubuntu/input/sample-2000.json"

# Columns that are not needed by the rest of the notebook. Going by the
# schema listed above, this leaves only `normalizedBody`.
unused_columns = (
    "content_len",
    "summary_len",
    "id",
    "subreddit_id",
    "body",
    "content",
    "summary",
    "title",
    "subreddit",
    "author",
)

reddit_data = spark_session.read.json(reddit_json_path).drop(*unused_columns)

# Print the first rows as a quick sanity check of the load.
reddit_data.show()

In [None]:
# https://www.cs.cmu.edu/~biglou/resources/bad-words.txt
# Read the word list as an RDD with one element per line of the file.
bad_words_rdd = spark_context.textFile("hdfs://192.168.2.14:9000/home/ubuntu/bad_words.txt")
# bad_words = spark_session.createDataFrame(bad_words, ['bad_words'])
# Discard empty lines and bring the remaining words to the driver as a list.
bad_words = bad_words_rdd.filter(lambda line: line != '').collect()
# print(bad_words)

In [None]:
# reddit_data = reddit_data.withColumn("splited_words", F.split("normalizedBody", ' '))
import re

# The entries of `bad_words` are literal words, not regular expressions, so
# each one is escaped before the alternation is built. Without this, regex
# metacharacters in the list (`*`, `$`, `.`, ...) are interpreted as syntax
# and can change what matches or make the pattern invalid.
# re.escape only puts a backslash in front of non-alphanumeric characters,
# which the Java regex engine used by rlike also reads as a literal.
bad_word_pattern = '|'.join(re.escape(bad_word) for bad_word in bad_words)

if bad_word_pattern:
    # Keep only the rows whose normalizedBody contains at least one bad word.
    # NOTE(review): this is a case-sensitive substring match; confirm that is
    # intended, since the word count further down lower-cases the text.
    reddit_data = reddit_data.filter(F.col('normalizedBody').rlike(bad_word_pattern))
else:
    # An empty pattern would match every row and silently disable the
    # filter; with no bad words to look for, no row can qualify.
    reddit_data = reddit_data.filter(F.lit(False))

In [None]:
# Read the bad-word list as a one-column DataFrame (`bad_word`).
# Entries are trimmed and lower-cased so that they are comparable with the
# lower-cased comment words below, empty lines are discarded, and duplicates
# are removed so that a word listed twice is not counted twice by the join.
bad_words = (
    spark_session.read.text("hdfs://192.168.2.14:9000/home/ubuntu/bad_words.txt")
    .select(F.lower(F.trim(F.col("value"))).alias("bad_word"))
    .filter(F.col("bad_word") != "")
    .distinct()
)

# Lower-case every comment, split it on whitespace and emit one row per word.
words = reddit_data.select(
    F.explode(F.split(F.lower(F.col("normalizedBody")), "\\s+")).alias("word")
)

# Keep only the words that appear in the bad-word list (the inner join drops
# every word without a match), then count how often each bad word occurs,
# most frequent first.
bad_words_counts = (
    words.join(bad_words, words.word == bad_words.bad_word, "inner")
    .groupBy("bad_word")
    .count()
    .orderBy("count", ascending=False)
)

# Show bad words counts
bad_words_counts.show()

# Stop SparkSession
spark_session.stop()
