In [2]:
import os
import nltk

# https://stackoverflow.com/questions/51390676/how-to-visualize-pyspark-mls-lda-or-other-clustering

nltk.download('stopwords')
from nltk.corpus import stopwords

from pyspark import SparkConf, SparkContext,SQLContext
from pyspark.sql import SparkSession, functions
from pyspark.ml.feature import Word2Vec,CountVectorizer,Tokenizer, StopWordsRemover
from pyspark.ml.clustering import LDA, LDAModel
from pyspark.sql.functions import col, udf, countDistinct, regexp_replace
from pyspark.sql.types import IntegerType,ArrayType,StringType
import pandas as pd
import numpy as np
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit
import csv

def ith_(v, i):
    """Return element ``i`` of ``v`` as a float, or None if it cannot be read.

    This is the body of the ``ith`` UDF used to split the ``topicDistribution``
    vector column into scalar ``topic_<i>`` columns. Returning None (SQL null)
    instead of raising keeps a single bad row from failing the Spark task.

    Args:
        v: An indexable vector/sequence, or None.
        i: Position of the element to read.

    Returns:
        ``float(v[i])``, or None when ``v`` is None, ``i`` is out of range,
        or the element cannot be converted to float.
    """
    try:
        return float(v[i])
    except (ValueError, TypeError, IndexError):
        return None

spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Number of LDA topics. Used both for the model and for the per-topic
# probability columns so the two cannot drift apart.
NUM_TOPICS = 10

months = ["January", "February", "March", "April", "May", "June", "July",
          "August", "September", "October", "November", "December"]
years = ["2010", "2011", "2012", "2013", "2014", "2015", "2016", "2017",
         "2018", "2019"]

# Pipeline stages do not depend on the month being processed: build them once.
english_stopwords = stopwords.words('english')
tokenizer = Tokenizer(inputCol="Title", outputCol="words")
remover = StopWordsRemover(stopWords=english_stopwords, inputCol="words", outputCol="filtered")
cv = CountVectorizer(inputCol="filtered", outputCol="features")
lda = LDA(maxIter=20, k=NUM_TOPICS)
ith = udf(ith_, DoubleType())

for year in years:
    # Per-year output directories (no-op when they already exist).
    os.makedirs("../processed_data/" + year, exist_ok=True)
    os.makedirs("../document_topics/" + year, exist_ok=True)

    for month in months:
        csv_path = "/" + year + "/" + month + ".csv"
        json_path = "/" + year + "/" + month + ".json"
        data_path = "../data" + csv_path  # input CSV for this month

        spark_df = spark.read.csv(data_path, inferSchema=True, header=True)
        # Strip double quotes from titles before tokenizing.
        spark_df = spark_df.withColumn('Title', regexp_replace('Title', '"', ''))

        # Topic modelling on Title (potentially on the description if available).
        node = "Title"
        # Spark's Tokenizer fails on null strings, so only model rows with a title.
        title_data = spark_df.select(node).filter(functions.col(node).isNotNull())

        result = remover.transform(tokenizer.transform(title_data))
        cvModel = cv.fit(result)
        cvResult = cvModel.transform(result)

        ldaModel = lda.fit(cvResult)
        dist = ldaModel.transform(cvResult)
        transformed = dist.select("topicDistribution")  # lazy; not written out

        # --- Top terms per topic -------------------------------------------
        vocab = cvModel.vocabulary
        topics = ldaModel.describeTopics()
        # Collect once and map indices to words on the driver: keeps indices,
        # weights and topic ids from the same rows and avoids shipping the
        # whole vocabulary to executors inside a closure.
        topic_rows = topics.collect()
        topics_words = [[vocab[idx] for idx in row['termIndices']] for row in topic_rows]
        topic_weights = [row['termWeights'] for row in topic_rows]

        term_rows = []
        for row, words, weights in zip(topic_rows, topics_words, topic_weights):
            term_rows.extend([word, weight, row['topic']] for word, weight in zip(words, weights))

        file_path = "../processed_data" + csv_path
        header = ["term", "probability", "topic"]
        # newline='' is required by the csv module; explicit utf-8 so
        # non-ASCII terms do not depend on the platform default encoding.
        with open(file_path, 'w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(header)
            writer.writerows(term_rows)

        # Build the JSON from the in-memory rows rather than re-reading the
        # CSV: read_csv's default NA handling turns terms such as "", "nan"
        # or "null" into NaN.
        json_df = pd.DataFrame(term_rows, columns=header)
        json_df.to_json("../processed_data" + json_path, orient="table")

        # --- Dominant topic per document -----------------------------------
        # Per-topic probability columns (lazy; not written out here).
        df = dist.select(
            ["Title"]
            + [ith("topicDistribution", lit(i)).alias('topic_' + str(i)) for i in range(NUM_TOPICS)]
        )

        # Fetch Title and distribution in ONE query so each title stays paired
        # with its own distribution (two toPandas() calls re-run the pipeline
        # and rely on both returning rows in the same order).
        doc_pdf = dist.select('Title', 'topicDistribution').toPandas()
        final_df = pd.DataFrame({
            'Title': doc_pdf['Title'],
            'doc': range(len(doc_pdf)),
            'topic': [int(np.array(vec).argmax()) for vec in doc_pdf['topicDistribution']],
        })
        topic_path = "../document_topics" + csv_path
        final_df.to_csv(topic_path, index=False)

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/alirahman/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


22/12/12 01:18:04 WARN DAGScheduler: Broadcasting large task binary with size 1930.4 KiB
22/12/12 01:18:08 WARN DAGScheduler: Broadcasting large task binary with size 1783.6 KiB
22/12/12 01:18:12 WARN DAGScheduler: Broadcasting large task binary with size 1930.8 KiB
22/12/12 01:18:16 WARN DAGScheduler: Broadcasting large task binary with size 1839.5 KiB
22/12/12 01:18:20 WARN DAGScheduler: Broadcasting large task binary with size 1868.5 KiB
22/12/12 01:18:24 WARN DAGScheduler: Broadcasting large task binary with size 1826.0 KiB
22/12/12 01:18:27 WARN DAGScheduler: Broadcasting large task binary with size 1844.7 KiB
22/12/12 01:18:31 WARN DAGScheduler: Broadcasting large task binary with size 1815.6 KiB
22/12/12 01:18:35 WARN DAGScheduler: Broadcasting large task binary with size 1776.5 KiB
22/12/12 01:18:39 WARN DAGScheduler: Broadcasting large task binary with size 1795.8 KiB
22/12/12 01:18:43 WARN DAGScheduler: Broadcasting large task binary with size 1713.9 KiB
22/12/12 01:18:47 WAR

[Stage 9510:>                                                       (0 + 1) / 1]                                                                                

22/12/12 01:22:01 WARN DAGScheduler: Broadcasting large task binary with size 1595.6 KiB
22/12/12 01:22:04 WARN DAGScheduler: Broadcasting large task binary with size 1653.2 KiB
22/12/12 01:22:08 WARN DAGScheduler: Broadcasting large task binary with size 1677.6 KiB
22/12/12 01:22:12 WARN DAGScheduler: Broadcasting large task binary with size 1659.2 KiB
22/12/12 01:22:16 WARN DAGScheduler: Broadcasting large task binary with size 1644.8 KiB
22/12/12 01:22:19 WARN DAGScheduler: Broadcasting large task binary with size 1619.6 KiB
22/12/12 01:22:23 WARN DAGScheduler: Broadcasting large task binary with size 1684.6 KiB
22/12/12 01:22:27 WARN DAGScheduler: Broadcasting large task binary with size 1680.6 KiB
22/12/12 01:22:31 WARN DAGScheduler: Broadcasting large task binary with size 1641.4 KiB
22/12/12 01:22:34 WARN DAGScheduler: Broadcasting large task binary with size 1684.7 KiB
22/12/12 01:22:38 WARN DAGScheduler: Broadcasting large task binary with size 1692.8 KiB
22/12/12 01:22:42 WAR

[Stage 10902:>                                                      (0 + 1) / 1]                                                                                

22/12/12 01:23:30 WARN DAGScheduler: Broadcasting large task binary with size 1565.9 KiB
22/12/12 01:23:34 WARN DAGScheduler: Broadcasting large task binary with size 1619.2 KiB
22/12/12 01:23:37 WARN DAGScheduler: Broadcasting large task binary with size 1584.3 KiB
22/12/12 01:23:41 WARN DAGScheduler: Broadcasting large task binary with size 1574.9 KiB
22/12/12 01:23:45 WARN DAGScheduler: Broadcasting large task binary with size 1608.3 KiB
22/12/12 01:23:48 WARN DAGScheduler: Broadcasting large task binary with size 1558.4 KiB
22/12/12 01:23:52 WARN DAGScheduler: Broadcasting large task binary with size 1596.1 KiB
22/12/12 01:23:56 WARN DAGScheduler: Broadcasting large task binary with size 1635.2 KiB
22/12/12 01:23:59 WARN DAGScheduler: Broadcasting large task binary with size 1475.8 KiB
22/12/12 01:24:03 WARN DAGScheduler: Broadcasting large task binary with size 1564.4 KiB
22/12/12 01:24:06 WARN DAGScheduler: Broadcasting large task binary with size 1541.1 KiB
22/12/12 01:24:10 WAR