In [None]:
#
# Same as Pipeline 4. but with Topic Discovery results
#

In [0]:
%pip install nltk

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import *
from itertools import islice
import requests
import nltk
import re
nltk.download('stopwords')
from nltk.corpus import stopwords
StopWords = stopwords.words("english")

stopwords_list = requests.get("https://gist.githubusercontent.com/rg089/35e00abf8941d72d419224cfd5b5925d/raw/12d899b70156fd0041fa9778d657330b024b959c/stopwords.txt").content
stopwords_2 = set(stopwords_list.decode().splitlines()) 

In [0]:
def stop_words_filter(x):
    return (~x.isin(StopWords)) & (~x.isin(stopwords_2)) & (x.isNotNull()) & (F.length(x) > 2)

def clean_body(x):
    punc='!"#$%&\()*+,-./:;<=>?@[\\]^_`{|}~'
    cleaned = x.lower()
    cleaned = re.sub(r'https?:\/\/.*[\r\n]*', '', cleaned, flags=re.MULTILINE)
    cleaned = re.sub(r'[u|a]\d+.*', '', cleaned, flags=re.MULTILINE) # remove special line break characters
        for ch in punc:
            cleaned = cleaned.replace(ch, '')
    return cleaned

def take(n, iterable):
    "Return first n items of the iterable as a list"
    return list(islice(iterable, n))

def word_count_agg(token_lists):
    word_count = {}
    for token_list in token_lists:
        for token in token_list:
            if token not in word_count:
                word_count[token] = 0
            word_count[token] += 1
    sorted_word_count = dict(sorted(word_count.items(), key=lambda item: item[1], reverse=True))
    n_items = take(50, sorted_word_count.items()) #Top 50
    return dict(n_items)

udf_word_count_agg = F.udf(word_count_agg , MapType(StringType(), IntegerType()))
clean_body_udf = F.udf(clean_body , StringType())


In [0]:
data_output = spark.read.option("header","true").parquet("dbfs:/mnt/group12/topic_discovery/")
data_chunk_tokens = data_output.withColumn('cleaned_body', clean_body_udf(F.col('body')))
data_chunk_tokens = data_chunk_tokens.withColumn('tokens', F.filter(F.split(F.col('cleaned_body'), ' '), stop_words_filter))

In [0]:
data_grouped = data_chunk_tokens.orderBy(F.col('message_topic_weight').desc()).groupby("created_at_month", "topic_discovery_unique_id").agg(
    F.count("id").alias("number_of_messages"),
	F.sum("like_count").alias("likes_total"), 
	F.avg("like_count").alias("average_likes"),  
	F.sum("dislike_count").alias("dislike_total"), 
	F.avg("dislike_count").alias("average_dislike"),
	F.sum("score").alias("score_total"), 
    F.avg("score").alias("average_score"), 
	F.avg("vader_sentiment").alias("avg_vader_sentiment"), 
    F.first(F.col("topic_discovery_title")).alias("topic_discovery_title"),
    F.first(F.col("topic_desc")).alias("topic_desc"),
    F.first(F.col("topic_discovery_id")).alias("topic_discovery_id"),
    #F.slice(F.collect_list(F.col("message_topic_weight")), 1, 20).alias('message_topic_weight')
    F.create_map(
        F.lit('negative_emotions'), F.avg("liwc_sentiment.negemo"),
        F.lit('positive_emotions'), F.avg("liwc_sentiment.posemo"),
        F.lit('anger'), F.avg("liwc_sentiment.anger"), 
        F.lit('sad'), F.avg("liwc_sentiment.sad"), 
        F.lit('money'), F.avg("liwc_sentiment.money"),	
        F.lit('health'), F.avg("liwc_sentiment.health"), 
        F.lit('social'), F.avg("liwc_sentiment.social"),
        F.lit('anxiety'), F.avg("liwc_sentiment.anx"), 
        F.lit('humans'), F.avg("liwc_sentiment.friends"), 
        F.lit('family'), F.avg("liwc_sentiment.family"), 
        F.lit('friends'), F.avg("liwc_sentiment.friends"), 
        F.lit('focusfuture'), F.avg("liwc_sentiment.focusfuture"), 
        F.lit('focuspast'), F.avg("liwc_sentiment.focuspast"), 
        F.lit('focuspresent'), F.avg("liwc_sentiment.focuspresent"), 
        F.lit('work'), F.avg("liwc_sentiment.work"),
        F.lit('drives'), F.avg("liwc_sentiment.drives"), 
        F.lit('discrepancies'), F.avg("liwc_sentiment.discrep"),
        F.lit('time'), F.avg("liwc_sentiment.time"), 
        F.lit('leisure'), F.avg("liwc_sentiment.leisure"),
        F.lit('death'), F.avg("liwc_sentiment.death"), 
        F.lit('religion'), F.avg("liwc_sentiment.relig"),
    ).alias('liwc_sentiment_map'),
    udf_word_count_agg(F.collect_list("tokens")).alias('word_count'),
    F.slice(F.collect_list(F.col('body')), 1, 20).alias('messages_sample')
)

In [0]:
data_grouped.count()

In [0]:
data_user = data_chunk_tokens.groupby("created_at_month", "topic_discovery_unique_id", "username").agg(F.sum("score").alias("user_score"))
user_order = data_user.orderBy(F.col('created_at_month').asc(), F.col('topic_discovery_unique_id'), F.col('user_score').desc())

data_ordered = data_grouped.orderBy(
  F.col('created_at_month').asc(), 
  F.col('score_total').desc()
)

data_user_order = data_ordered.alias("dO").join(user_order.alias("UO"),
                          ((F.col("dO.created_at_month") == F.col("UO.created_at_month")) & (F.col("dO.topic_discovery_unique_id") == F.col("UO.topic_discovery_unique_id"))), "inner").orderBy(F.col('dO.created_at_month').asc(), F.col('dO.topic_discovery_unique_id'), F.col('user_score').desc())

In [0]:
aggregated_result = data_user_order.groupby("dO.created_at_month", "dO.topic_discovery_unique_id").agg(
  F.slice(F.collect_list(F.col('username')), 1, 10).alias('users'),
  F.first('number_of_messages').alias('number_of_messages'),
  F.first('likes_total').alias('likes_total'),
  F.first('average_likes').alias('average_likes'),
  F.first('dislike_total').alias('dislike_total'),
  F.first('average_dislike').alias('average_dislike'),
  F.first('score_total').alias('score_total'),
  F.first('average_score').alias('average_score'),
  F.first('avg_vader_sentiment').alias('avg_vader_sentiment'),
  F.first('liwc_sentiment_map').alias('liwc_sentiment_map'),
  F.first('word_count').alias('word_count'),
  F.first('messages_sample').alias('messages_sample'),
  F.first(F.col("topic_discovery_title")).alias("topic_discovery_title"),
  F.first(F.col("topic_desc").alias("topic_desc")),
  F.first(F.col("topic_discovery_id").alias("topic_discovery_id")),
)

In [0]:
aggregated_result.count()

In [0]:
TOP_RESULTS = 10

windowed_aggregated_result = aggregated_result.withColumn("row_num", F.row_number().over(Window.partitionBy("created_at_month").orderBy(F.col('score_total').desc())))
windowed_aggregated_result = windowed_aggregated_result.filter(F.col('row_num') < TOP_RESULTS)
aggregated_result_top_n = windowed_aggregated_result.orderBy(F.col('created_at_month').asc(), F.col('score_total').desc())

In [0]:
aggregated_result_top_n.coalesce(1).write.mode('overwrite').json("dbfs:/mnt/group12/data_output/topics_discovery_top_10.json")

In [0]:
aggregated_result.coalesce(1).write.mode('overwrite').json("dbfs:/mnt/group12/data_output/topics_discovery.json")

In [0]:
display(dbutils.fs.ls('mnt/group12/data_output/topics_discovery_top_10.json'))

path,name,size
dbfs:/mnt/group12/data_output/topics_discovery_top_10.json/_SUCCESS,_SUCCESS,0
dbfs:/mnt/group12/data_output/topics_discovery_top_10.json/_committed_457039126842578981,_committed_457039126842578981,117
dbfs:/mnt/group12/data_output/topics_discovery_top_10.json/_started_457039126842578981,_started_457039126842578981,0
dbfs:/mnt/group12/data_output/topics_discovery_top_10.json/part-00000-tid-457039126842578981-7e7c615f-a89b-4e41-bf34-444fd976ce90-1255960-1-c000.json,part-00000-tid-457039126842578981-7e7c615f-a89b-4e41-bf34-444fd976ce90-1255960-1-c000.json,1030871
