In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType, FloatType, ArrayType
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import col, unix_timestamp, to_date
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
import re
from pyspark.ml.clustering import LDA
import string
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel, IDF
import matplotlib.pyplot as plt
import numpy as np
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator

In [3]:
# Start (or reuse) a local Spark session for this notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Topic Modelling")
    .getOrCreate()
)

# Handle to the SparkContext that backs the session.
sc = spark.sparkContext
# Keep cell output readable: only ERROR-level Spark messages are shown.
sc.setLogLevel("ERROR")

# SQLContext bound to the same SparkContext.
sqlContext = SQLContext(sc)

In [4]:
# Schema of the input CSV: a paper identifier and its text, both nullable strings.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("Id", "Text")
])

In [5]:
# Load the corpus into a DataFrame using the pre-defined schema.
df = spark.read.schema(schema).csv(
    "final_df.csv",
    header=True,
    sep=",",
    multiLine=True,
)

In [None]:
#Make text lowercase, remove text in square brackets, replace punctuation with spaces, then collapse and strip whitespace
def clean_text(text):
    """Normalise raw document text before tokenisation.

    Lower-cases the text, removes anything enclosed in square brackets
    (for example ``[12]``), replaces every run of non-alphanumeric characters
    (punctuation, underscores, whitespace) with a single space and trims the
    ends.

    Args:
        text: The raw text, or ``None`` for a missing value.

    Returns:
        The cleaned text. ``None`` input yields an empty string so that a
        missing value does not raise inside a Spark UDF.
    """
    if text is None:
        return ''
    text = text.lower()
    text = re.sub(r'\[.*?\]', '', text)
    # Every run of non-word characters or underscores becomes one space. This
    # already removes all punctuation and collapses repeated whitespace, so
    # only the leading/trailing space is left to trim.
    text = re.sub(r'[\W_]+', ' ', text)
    return text.strip()

# Expose clean_text as a string-returning Spark UDF and overwrite the Text column with its output.
clean_udf = udf(clean_text, StringType())
df = df.withColumn("Text", clean_udf(col("Text")))

In [None]:
# Boilerplate terms seen in the papers plus common English stop words.
stop_words = {
    'doi', 'preprint', 'copyright', 'peer', 'reviewed', 'org', 'https', 'et',
    'al', 'author', 'figure', 'rights', 'reserved', 'permission', 'used',
    'using', 'biorxiv', 'medrxiv', 'license', 'fig', 'Elsevier', 'PMC', 'CZI',
    'www',
    "i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you", "your",
    "yours", "yourself", "yourselves", "he", "him", "his", "himself", "she",
    "her", "hers", "herself", "it", "its", "itself", "they", "them", "their",
    "theirs", "themselves", "what", "which", "who", "whom", "this", "that",
    "these", "those", "am", "is", "are", "was", "were", "be", "been", "being",
    "have", "has", "had", "having", "do", "does", "did", "doing", "a", "an",
    "the", "and", "but", "if", "or", "because", "as", "until", "while", "of",
    "at", "by", "for", "with", "about", "against", "between", "into",
    "through", "during", "before", "after", "above", "below", "to", "from",
    "up", "down", "in", "out", "on", "off", "over", "under", "again",
    "further", "then", "once", "here", "there", "when", "where", "why", "how",
    "all", "any", "both", "each", "few", "more", "most", "other", "some",
    "such", "no", "nor", "not", "only", "own", "same", "so", "than", "too",
    "very", "s", "t", "can", "will", "just", "don", "should", "now",
}

# Clean the stop words the same way as the documents so they match the cleaned
# tokens (clean_text lower-cases, e.g. 'Elsevier' -> 'elsevier'). Kept as a set
# because membership is tested once per token of every document; the previous
# list was built from a set, so its order was arbitrary anyway.
stopwords = {clean_text(word) for word in stop_words}

In [None]:
#Removing stop words (and tokens of one or two characters) with a UDF and a manual set of stop words
def stop_word_removal(text):
    """Drop stop words and very short tokens from whitespace-separated text.

    A token is kept only when it is longer than two characters and is not
    contained in the module-level ``stopwords`` collection.

    Args:
        text: Whitespace-separated text.

    Returns:
        The surviving tokens joined by single spaces.
    """
    kept_tokens = []
    for token in text.split():
        if len(token) <= 2 or token in stopwords:
            continue
        kept_tokens.append(token)
    return ' '.join(kept_tokens)

# Expose stop_word_removal as a string-returning Spark UDF and overwrite the Text column with its output.
stop_words_udf = udf(stop_word_removal, StringType())
df = df.withColumn("Text", stop_words_udf(col("Text")))

In [6]:
# Keep at most 100,000 of the roughly 220,000 entries.
df = df.limit(100_000)

In [None]:
# Count how often every word occurs in the corpus, order the words by
# descending count, and plot the 15 most frequent ones.

# (word, occurrences) pairs for every distinct word in the Text column.
count_rdd = (
    df.select('Text').rdd
    .flatMap(lambda row: row)
    .flatMap(lambda text: text.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)
# Swap to (count, word) so sortByKey orders by count, then swap back so
# count_rdd keeps its (word, count) shape.
count_rdd = (
    count_rdd
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(ascending=False)
    .map(lambda pair: (pair[1], pair[0]))
)
top_15_words = count_rdd.take(15)

# Build the two plot sequences without zip(*...) unpacking, which fails when no
# words were found, and without rebinding `count`, which would shadow the
# pyspark.sql.functions.count import.
words = [word for word, _ in top_15_words]
top_15_counts = [occurrences for _, occurrences in top_15_words]

plt.figure(figsize=(15,10))
plt.bar(words, top_15_counts, color='navy')
plt.xlabel("Words")
plt.ylabel("Count of the Words")
plt.title("Top 15 most occuring words")
plt.show()


In [None]:
#Distribution of word counts across all documents

def doc_word_counts(text):
    """Return the number of whitespace-separated tokens in ``text``."""
    return len(text.split())
# Add a per-document word count column and bring the counts to the driver.
doc_len_udf = udf(doc_word_counts, IntegerType())
df = df.withColumn("DocLen", doc_len_udf(col('Text')))
answer = df.select("DocLen").collect()
lengths = sorted((row.DocLen for row in answer), reverse=True)

plt.figure(figsize=(10,10))
plt.hist(lengths, bins=1000, color='navy')

# Summary statistics written onto the plot at x=750, one line every 50 y-units
# going down from y=700.
summary_lines = [
    "Mean   : " + str(round(np.mean(lengths))),
    "Median : " + str(round(np.median(lengths))),
    "Stdev   : " + str(round(np.std(lengths))),
    "1%ile    : " + str(round(np.quantile(lengths, q=0.01))),
    "99%ile  : " + str(round(np.quantile(lengths, q=0.99))),
]
for position, summary_line in enumerate(summary_lines):
    plt.text(750, 700 - 50 * position, summary_line)

# Only the 0-1000 word range is shown on the x axis.
plt.gca().set(xlim=(0, 1000), xlabel='Document Word Count', ylabel='Number of Documents')
plt.tick_params(size=16)
plt.xticks(np.linspace(0,1000,9))
plt.title('Distribution of Document Word Counts', fontdict=dict(size=22))
plt.show()

In [7]:
# Feature pipeline: Tokenizer -> CountVectorizer -> IDF.
tokenizer = Tokenizer(inputCol="Text", outputCol="Text_Tokens")
cv = CountVectorizer(
    inputCol="Text_Tokens",
    outputCol="Vectors",
    vocabSize=10000,
    minTF=2,
    minDF=0.005,
    maxDF=0.9,
)
idf = IDF(inputCol="Vectors", outputCol="IDFVectors")

# Fit the three stages on the corpus, then add the feature columns to it.
pipeline = Pipeline(stages=[tokenizer, cv, idf])
model = pipeline.fit(df)
df_final = model.transform(df)

In [None]:
# Fit LDA for several candidate topic counts and record the log-likelihood and
# log-perplexity of each fit, to help choose the number of topics.

k_trial = [10,25,30,35,40,50,75]
ll_final = []
lp_final = []
for k in k_trial:
    # Fixed seed so the fits being compared are reproducible across runs.
    lda = LDA(featuresCol='IDFVectors', k=k, seed=42)
    model_lda = lda.fit(df_final)
    ll = model_lda.logLikelihood(df_final)
    lp = model_lda.logPerplexity(df_final)
    print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
    print("The upper bound on perplexity: " + str(lp))
    ll_final.append(ll)
    lp_final.append(lp)

# Plot both metrics against the actual k values (not the list index) so the
# x axis can be read as "number of topics".
fig, axes = plt.subplots(1, 2, figsize=(10,10))
axes[0].plot(k_trial, ll_final, marker='o')
axes[0].set(xlabel='Number of topics (k)', ylabel='Log likelihood',
            title='Log likelihood vs. k')
axes[1].plot(k_trial, lp_final, marker='o')
axes[1].set(xlabel='Number of topics (k)', ylabel='Log perplexity',
            title='Log perplexity vs. k')
plt.show()

In [None]:
# Fit the final LDA model with the chosen number of topics (k = 25).
# The seed is fixed so topic numbering, and every figure derived from it,
# stays the same when the notebook is re-run.
lda = LDA(featuresCol='IDFVectors', k=25, seed=42)
model_lda = lda.fit(df_final)

In [None]:
# Describe every topic of the fitted model (10 terms per topic, the documented default).
topics = model_lda.describeTopics(maxTermsPerTopic=10)

In [None]:
#Function to find the term in every topic from the vocabulary
def termsIdx2Term(termIndices):
    """Translate vocabulary indices into their terms.

    Each index is converted to ``int`` and looked up in the module-level
    ``vocab`` sequence.

    Args:
        termIndices: Iterable of vocabulary indices.

    Returns:
        A list with the term for every index, in the same order.
    """
    resolved_terms = []
    for raw_index in termIndices:
        resolved_terms.append(vocab[int(raw_index)])
    return resolved_terms

In [None]:
# Pull the fitted vocabulary out of the pipeline and attach readable terms to every topic.
vectorizer = model.stages[1]     # second pipeline stage: the fitted CountVectorizer
vocab = vectorizer.vocabulary
# The UDF gets its own name. Rebinding termsIdx2Term to the UDF would replace
# the plain function, so running this cell again would wrap a UDF in a UDF.
terms_idx_to_term_udf = udf(termsIdx2Term, ArrayType(StringType()))
topics = topics.withColumn('Terms', terms_idx_to_term_udf('termIndices'))
# One list of terms per topic, in the row order of `topics`.
terms = [row.Terms for row in topics.select('Terms').collect()]

In [None]:
# Plot one word cloud per topic.

# Size the grid from the number of topics: a fixed 12x2 grid has only 24 cells,
# which drops the last of 25 topics and raises IndexError for fewer than 24.
n_topics = len(terms)
n_cols = 2
n_rows = max(1, (n_topics + n_cols - 1) // n_cols)
# squeeze=False keeps `axes` two-dimensional even when there is a single row.
fig, axes = plt.subplots(n_rows, n_cols, figsize=(30,30), squeeze=False)


cloud = WordCloud(background_color='white',
                  width=2500,
                  height=1800,
                  prefer_horizontal=1.0)


for i, ax in enumerate(axes.flatten()):
    ax.axis('off')
    if i >= n_topics:
        # Spare cell when the topic count does not fill the grid.
        continue
    text = ' '.join(terms[i])
    wordcloud = cloud.generate(text)
    ax.imshow(wordcloud)
    ax.set_title('Topic ' + str(i), fontdict=dict(size=16))

plt.subplots_adjust(wspace=0, hspace=0)
plt.margins(x=0, y=0)
plt.tight_layout()
plt.show()

In [None]:
# For every topic, keep the corpus-wide (word, count) pairs whose word is one of that topic's terms.
word_counts = count_rdd.collect()
final_counts = [
    [pair for pair in word_counts if pair[0] in topic_terms]
    for topic_terms in terms
]

In [None]:
# Assign every document to its dominant topic (the position of the largest value
# in its topic distribution) and count the documents per topic.
def _dominant_topic(distribution):
    """Return the position of the largest value in ``distribution`` as an int."""
    return int(np.argmax([float(weight) for weight in distribution]))

getMainTopic = udf(_dominant_topic, IntegerType())

doc_topics = model_lda.transform(df_final).select(
    getMainTopic("topicDistribution").alias("idxMainTopic")
)
countTopDocs = doc_topics.groupBy("idxMainTopic").count().sort("idxMainTopic")

In [None]:
# Plot the number of documents assigned to every topic.

countTopDocs = countTopDocs.withColumnRenamed('count','counts')
# Collect once: a single Spark job, and topic ids and counts come from the same rows.
sorting = [(row.idxMainTopic, row.counts) for row in countTopDocs.collect()]

# Order the (topic, count) pairs from the largest to the smallest document count.
sorting = sorted(sorting, key=lambda x: x[1], reverse=True)
topics = tuple(topic for topic, _ in sorting)
count_topics = tuple(n_docs for _, n_docs in sorting)

plt.figure(figsize = (10,10))
plt.bar(range(len(count_topics)), count_topics, color='navy', align='center')
plt.xlabel("Topics")
plt.ylabel("Count of the Topics")
plt.title("Number of Documents every topic")
# One tick per bar. A hard-coded 24 positions does not match the labels when
# the number of topics with documents differs from 24.
plt.xticks(ticks=np.arange(len(topics)), labels=topics)
plt.show()

In [None]:
# Plot, for every topic, the corpus-wide count of each of its terms.

# Size the grid from the number of topics: a fixed 12x2 grid has only 24 cells,
# which drops the last of 25 topics and raises IndexError for fewer than 24.
n_topics = len(final_counts)
n_cols = 2
n_rows = max(1, (n_topics + n_cols - 1) // n_cols)
# squeeze=False keeps `axes` two-dimensional even when there is a single row.
fig, axes = plt.subplots(n_rows, n_cols, figsize=(70,70), sharex=False, sharey=False, squeeze=False)
plt.rcParams.update({'font.size': 38})

for i, ax in enumerate(axes.flatten()):
    if i >= n_topics:
        # Spare cell when the topic count does not fill the grid.
        ax.axis('off')
        continue
    words = [item[0] for item in final_counts[i]]
    counts = [item[1] for item in final_counts[i]]
    ax.bar(words, counts)
    ax.set_title('Topic ' + str(i), fontdict=dict(size=32))

plt.subplots_adjust(wspace=0, hspace=0)
plt.margins(x=0, y=0)
plt.tight_layout()
plt.show()