## Imports

In [None]:
import re
import csv
import time
import itertools

from tika import parser

# PySpark
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer as sp_cv
from pyspark.ml.clustering import LDA, DistributedLDAModel

# Gensim
import gensim
from gensim.models import Phrases
from gensim.corpora import Dictionary
from gensim.utils import simple_preprocess
from gensim.parsing.preprocessing import STOPWORDS


# NLTK
from nltk.stem import WordNetLemmatizer, SnowballStemmer
from nltk.stem.porter import *
# English Snowball stemmer shared by the lemmatization step.
stemmer = SnowballStemmer(language="english")

In [None]:
# Output sub-directory: trained models and CSV results are written under
# 'models/<directory>' by the save/store cells at the end of the notebook.
directory = "1987"

Initialize Spark

In [None]:
def init_spark(app_name="Annotation Platform"):
    """Create (or reuse) the SparkSession used throughout this notebook.

    Args:
        app_name: Spark application name. Defaults to the original
            "Annotation Platform", so existing callers are unaffected.

    Returns:
        The ``SparkSession`` returned by ``getOrCreate`` (an already running
        session is reused).
    """
    # The former ``.config("spark.some.config.option", "some-value")`` line
    # was a placeholder copied from the Spark example and configured nothing.
    return SparkSession.builder.appName(app_name).getOrCreate()


## Data Preparation

In [None]:
# Create (or attach to) the SparkSession used by all following cells.
spark = init_spark()

In [None]:
# Input PDFs are read from the local 'data' folder; binaryFiles yields one
# (path, raw bytes) pair per matching file.
filesRDD = spark.sparkContext.binaryFiles(path='data/*.pdf')

In [None]:
def process_files(filesRDD):
    """Extract lower-cased plain text from every PDF in ``filesRDD``.

    Args:
        filesRDD: RDD of ``(path, content)`` pairs as produced by
            ``SparkContext.binaryFiles``. Only the path is used; Tika
            re-reads the file from disk.

    Returns:
        RDD of lower-cased document strings, one per input file. A file for
        which Tika returns no content maps to an empty string.
    """
    file_scheme = 'file:'

    def to_local_path(uri):
        # binaryFiles reports local files as 'file:/abs/path.pdf', and Tika's
        # from_file needs a plain filesystem path. Strip the scheme only when
        # it is present instead of blindly dropping the first 5 characters.
        if uri.startswith(file_scheme):
            return uri[len(file_scheme):]
        return uri

    def tikanize_file(filename):
        file_data = parser.from_file(filename)
        # Tika may return None (or omit 'content') when a PDF has no
        # extractable text; calling .lower() on None would fail the job.
        text = file_data.get('content') or ''
        return text.lower()

    return filesRDD.map(lambda x: tikanize_file(to_local_path(x[0])))


## Pre-process Data

In [None]:
# Lower-cased text of every PDF. This is an RDD transformation, so nothing is
# parsed until an action runs.
filesContentRDD = process_files(filesRDD)

In [None]:
# Count the input PDFs (runs a Spark job over filesRDD).
number_of_documents = filesRDD.count()
print(f'The number of files:  {number_of_documents}')

In [None]:
def clean_data(filesRDD):
    """Turn raw paper text into token lists suitable for topic modelling.

    Each document is cut to the part between the word 'abstract' and the
    first acknowledgement/reference heading. Then '-\\n\\n' sequences
    (presumably hyphenated line breaks), newlines, apostrophes, digits and a
    set of symbols are removed, and the text is tokenized with gensim's
    ``simple_preprocess``.

    Args:
        filesRDD: RDD of (lower-cased) document strings.

    Returns:
        RDD of token lists. gensim stop words and tokens of 3 characters or
        fewer are dropped.
    """
    # Headings that mark the end of the body text. Both spellings of
    # "acknowledgement" are searched for.
    end_markers = ('acknowledgement', 'acknowledgment', 'references')

    # Symbols, digits and braces to delete. The previous pattern ended with a
    # stray '{}' OUTSIDE the character class, so it only matched a symbol
    # immediately followed by the literal text "{}" and removed nothing.
    noise_pattern = r'[!@#$()©=\+\*:\[\]/0-9{}]'

    # Extract and clean up text
    def extract_and_clean_up(text):
        abstract_pos = text.find('abstract')
        # Skip 'abstract' plus one separator character. When there is no
        # abstract heading, keep the document from its beginning (find()
        # returns -1, which used to start the slice at index 8).
        start = abstract_pos + 9 if abstract_pos != -1 else 0
        # Earliest end marker found after the start. Missing markers
        # (find() == -1) are ignored: previously one missing marker made
        # min() return -1, which silently dropped the last character instead
        # of cutting at the marker that does exist.
        found = [pos for pos in (text.find(m, start) for m in end_markers) if pos != -1]
        end = min(found) if found else len(text)
        text = text[start:end] \
            .replace('-\n\n', '').replace('\n', ' ').replace('\'', '')
        return re.sub(noise_pattern, '', text)

    # Tokenize
    def tokenize(text):
        return [token for token in gensim.utils.simple_preprocess(text, deacc=True)
                if token not in STOPWORDS and len(token) > 3]

    return filesRDD.map(lambda x: tokenize(extract_and_clean_up(x)))

In [None]:
# Tokenized documents. Cached because this RDD feeds several actions (the
# bigram collect below and every later fit/count/collect). Without caching,
# each action would re-run Tika over every PDF.
cleanFilesRDD = clean_data(filesContentRDD).cache()

### Create bigrams

In [None]:
# gensim's phrase detection runs locally, so pull every tokenized document
# to the driver.
data_words = cleanFilesRDD.collect()

# Learn frequent two-word collocations (only a bigram model is built).
bigram = gensim.models.Phrases(sentences=data_words)

# Frozen, lighter version of the phrase model used to apply it to documents.
bigram_mod = gensim.models.phrases.Phraser(bigram)

In [None]:
# Apply the bigram model to every tokenized document.
bingramDataRDD = cleanFilesRDD.map(lambda tokens: bigram_mod[tokens])

In [None]:
def lemmatize_data(dataRDD):
    """Normalize every token: WordNet lemmatization followed by stemming.

    Args:
        dataRDD: RDD of token lists.

    Returns:
        RDD of token lists of the same length. Each token is replaced by
        ``stemmer.stem(WordNetLemmatizer().lemmatize(token))``.
    """
    def normalize_tokens(tokens):
        lemmatizer = WordNetLemmatizer()
        normalized = []
        for token in tokens:
            normalized.append(stemmer.stem(lemmatizer.lemmatize(token)))
        return normalized

    return dataRDD.map(normalize_tokens)

In [None]:
# Lemmatized and stemmed token lists, one per document.
dataRDD = lemmatize_data(bingramDataRDD)

Transform dataRDD into dataDF

In [None]:
# Number the documents with zipWithIndex and put the id first, giving rows of
# (index, words).
dataDF = (
    dataRDD.zipWithIndex()
    .map(lambda doc_and_id: (doc_and_id[1], doc_and_id[0]))
    .toDF(['index', 'words'])
)

### Count the frequency

In [None]:
# Bag-of-words vectorizer over the 'words' column. A term must occur in at
# least two documents to enter the vocabulary, so lower minDF to 1 when the
# data folder holds a single file (otherwise no term can qualify).
cv = sp_cv(minDF=2, inputCol="words", outputCol="features")

In [None]:
# Learn the initial vocabulary from all documents.
modelCV = cv.fit(dataset=dataDF)

In [None]:
# Vocabulary learned by the first CountVectorizer fit: a list of terms in
# which a term's position is its feature index.
initial_vocab = modelCV.vocabulary

In [None]:
# Size of the initial vocabulary (shown as the cell output)
len(initial_vocab)

### Remove 5% of the words (the 2.5% rarest and the 2.5% most frequent)

In [None]:
# Number of ranks cut from EACH end of the term-frequency ranking:
# 1/40 = 2.5% of the vocabulary from the rare end plus 2.5% from the
# frequent end, i.e. 5% of the words in total.
threshold = len(initial_vocab) / 40

In [None]:
# Rank every vocabulary term by its total count over the corpus, from rarest
# (rank 0) to most frequent (rank len(initial_vocab) - 1). Collect the terms
# whose distance from either end of the ranking is below `threshold`.
# Both ends are now measured the same way. The former `rank > len - threshold`
# test removed one word fewer from the frequent end than from the rare end.
words_to_remove = (
    modelCV.transform(dataDF)
    .select('features').rdd
    .flatMap(lambda row: list(zip(row[0].indices, row[0].values)))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda term_count: term_count[1])
    .zipWithIndex()
    .filter(lambda ranked: ranked[1] < threshold
            or len(initial_vocab) - 1 - ranked[1] < threshold)
    .map(lambda ranked: initial_vocab[ranked[0][0]])
    .collect()
)

In [None]:
# Number of terms selected for removal (shown as the cell output)
len(words_to_remove)

In [None]:
# Drop the extreme-frequency terms by treating them as stop words.
word_remover = StopWordsRemover(stopWords=words_to_remove, inputCol="words", outputCol="new_words")

In [None]:
# Keep the document id and expose the filtered tokens under the name 'words'.
clean_dataDF = word_remover.transform(dataDF).select(
    col('index'),
    col("new_words").alias("words"),
)

In [None]:
# Refit the vectorizer on the filtered documents; this replaces modelCV.
modelCV = cv.fit(dataset=clean_dataDF)

In [None]:
# Vocabulary of the CountVectorizer refitted on clean_dataDF
clean_vocab = modelCV.vocabulary

In [None]:
# Size of the clean final vocabulary (shown as the cell output)
len(clean_vocab)

In [None]:
# Term-count feature vectors for every filtered document.
counter_vectorized_dataDF = modelCV.transform(dataset=clean_dataDF)

Select needed columns

In [None]:
# LDA only needs the document id and its feature vector.
ldaDataDF = counter_vectorized_dataDF.select('index', 'features')

## LDA 

## Grid Search

In [None]:
def search_grid(param_dict, dataDF, vocab, dataRDD):
    """Fit one LDA model per parameter combination and score its topics.

    Args:
        param_dict: dict whose values are, in this order, the candidate
            values for ``k``, ``maxIter``, the LDA optimizer and the number
            of terms per topic. Every combination (cartesian product) is tried.
        dataDF: DataFrame with the ``features`` column the LDA model is fit on.
        vocab: vocabulary list mapping term indices back to words.
        dataRDD: RDD of token lists, used as the reference corpus for the
            c_v coherence score.

    Returns:
        One dict per combination with keys ``model``, ``k``,
        ``max_iteration``, ``optimizer``, ``terms_per_topic``, ``time``
        (seconds spent in ``fit``) and ``coherence``.
    """
    # The reference corpus and its gensim Dictionary are the same for every
    # grid point. Collect the RDD once instead of twice per iteration (each
    # collect re-runs the RDD's lineage unless it is cached), and build the
    # Dictionary once instead of once per iteration.
    texts = dataRDD.collect()
    dictionary = Dictionary(texts)

    params_coherence_list = []
    params_list = itertools.product(*param_dict.values())
    for count, params in enumerate(params_list, start=1):
        k, max_iter, optimizer, terms_per_topic = params[:4]
        print('The number of iteration is ', count, '========>')
        lda = LDA(k=k, maxIter=max_iter, optimizer=optimizer)  # Create LDA model
        start = time.time()
        ldaModel = lda.fit(dataDF)  # Fit the data
        end = time.time()
        # Top `terms_per_topic` term indices of each topic (one row per
        # topic), mapped to words on the driver.
        topic_rows = ldaModel.describeTopics(maxTermsPerTopic=terms_per_topic).collect()
        coherence_topics = [[vocab[i] for i in row[1]] for row in topic_rows]
        coherence_model_lda = gensim.models.CoherenceModel(
            topics=coherence_topics, texts=texts,
            dictionary=dictionary, coherence='c_v')
        coherence_lda = coherence_model_lda.get_coherence()
        params_coherence_list.append({
            'model': ldaModel,
            'k': k,
            'max_iteration': max_iter,
            'optimizer': optimizer,
            'terms_per_topic': terms_per_topic,
            'time': end - start,
            'coherence': coherence_lda,
        })
    return params_coherence_list

In [None]:
# First hyper-parameter grid; search_grid fits and scores every combination.
k = [2, 3, 5, 10]            # numbers of topics
maxIter = [20, 30, 50]       # maximum LDA iterations
optimizer = ['em']           # LDA optimizer(s)
terms_per_topic = [5, 8]     # top terms per topic used for coherence

# NOTE(review): `vocab` is not used below. search_grid receives clean_vocab,
# which holds the same terms because modelCV was last fitted on clean_dataDF.
vocab = modelCV.vocabulary

param_dict = dict(k=k, maxIter=maxIter, optimizer=optimizer,
                  terms_per_topic=terms_per_topic)
param_coherence_list = search_grid(
    param_dict, ldaDataDF, clean_vocab,
    clean_dataDF.rdd.map(lambda row: row[1]),
)

In [None]:
# Total number of tokens (repetitions included) fed to the models.
print('Number of words to process ', clean_dataDF.rdd.map(lambda row: len(row[1])).sum())

In [None]:
# Show every grid-search result (model, parameters, fit time, coherence).
for model in param_coherence_list:
    print(model)

In [None]:
def get_best_n_models(models_list, n):
    """Return the ``n`` entries of ``models_list`` with the highest coherence.

    Each entry is a dict holding a numeric ``'coherence'`` value. Ties keep
    their original relative order, and the input list is not modified.
    """
    def by_descending_coherence(entry):
        return -entry.get('coherence')

    ranking = sorted(models_list, key=by_descending_coherence)
    return ranking[:n]

In [None]:
def print_topics(model, number_of_words_per_topic, vocab):
    """Print the top terms of every topic of a fitted LDA model.

    Args:
        model: fitted model exposing ``describeTopics``.
        number_of_words_per_topic: how many terms to show per topic.
        vocab: vocabulary list mapping term indices to words.
    """
    described = model.describeTopics(maxTermsPerTopic=number_of_words_per_topic)
    # Resolve every topic before printing anything.
    topics = []
    for row in described.rdd.collect():
        words = [vocab[term_index] for term_index in row[1]]
        topics.append((row[0], list(zip(words, row[2]))))

    for topic_id, word_weights in topics:
        print('===Topic====', topic_id)
        for word, weight in word_weights:
            print('{: <20} {}'.format(word, weight))
            
    

In [None]:
def save_models(models, path):
    """Save the model of every result dict in ``models`` below ``path``.

    Each model is written to ``<path>/<k>_<max_iteration>_<terms_per_topic>``,
    with the values taken from its result dict. A confirmation line is
    printed at the end.
    """
    signature_keys = ('k', 'max_iteration', 'terms_per_topic')
    for entry in models:
        signature = '_'.join(str(entry.get(key)) for key in signature_keys)
        entry.get('model').save('/'.join((path, signature)))
    print('All models have been saved in ', path)

In [None]:
# Keep the three results with the highest coherence.
best = get_best_n_models(models_list=param_coherence_list, n=3)

In [None]:
# Show the selected best results.
for model in best:
    print(model)

In [None]:
# Print the topics of the best model (highest coherence). get_best_n_models
# sorts by descending coherence, so the best entry is index 0; the previous
# best[1] printed the runner-up instead.
print_topics(best[0].get('model'), best[0].get('terms_per_topic'), clean_vocab)

In [None]:
# save all grid-search models (uncomment to enable)
#save_models(param_coherence_list, 'models/' + directory)

In [None]:
def store_to_csv(path, filename, models_info):
    """Write grid-search result dicts to ``<path>/<filename>`` as CSV.

    Args:
        path: existing output directory.
        filename: name of the CSV file to (over)write.
        models_info: iterable of dicts whose keys are among ``cols``. Non-string
            values, such as the model object, are written via ``str()``.

    I/O failures are reported on stdout rather than raised, so the notebook
    keeps running.
    """
    cols = ['model', 'k', 'max_iteration', 'optimizer', 'terms_per_topic', 'time', 'coherence']
    try:
        # The csv module requires newline=''; without it the '\r\n' row
        # terminator is translated to '\r\r\n' (blank rows) on Windows.
        with open(path + '/' + filename, 'w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=cols)
            writer.writeheader()
            writer.writerows(models_info)
    except OSError as err:
        # Still best-effort, but report what actually failed.
        print("I/O error:", err)

In [None]:
#store_to_csv('models/' + directory, 'data.csv', param_coherence_list)

### Optimization: remove irrelevant words spotted in the topics above

In [None]:
# Vectorizer for the optimization pass. It uses the same minDF as `cv`, so
# the only difference from the first pass is the removal of
# `irrelevant_words`. Without it, terms occurring in a single document
# (dropped by `cv` but still present in clean_dataDF) would re-enter the
# vocabulary.
opt_cv = sp_cv(inputCol="words", outputCol="features", minDF=cv.getMinDF())

In [None]:
# Words judged irrelevant after inspecting the printed topics (empty by default).
irrelevant_words = list()

In [None]:
# Remove the hand-picked irrelevant words from every document.
optimizing_remover = StopWordsRemover(stopWords=irrelevant_words, inputCol="words", outputCol="new_words")

In [None]:
# Keep the document id and expose the filtered tokens under the name 'words'.
optimized_dataDF = optimizing_remover.transform(clean_dataDF).select(
    col('index'),
    col("new_words").alias("words"),
)

In [None]:
# Fit the optimization-pass vectorizer; this replaces modelCV.
modelCV = opt_cv.fit(dataset=optimized_dataDF)

In [None]:
# Vocabulary of the vectorizer fitted on optimized_dataDF
optimized_vocab = modelCV.vocabulary

In [None]:
# Term-count feature vectors for the optimization pass.
optimized_cv_dataDF = modelCV.transform(dataset=optimized_dataDF)

In [None]:
# LDA only needs the document id and its feature vector.
optimized_ldaDataDF = optimized_cv_dataDF.select('index', 'features')

In [None]:
# Second grid: reuse only the parameter values found among the best models
# of the first search. Every combination of them is tried again.
k = list({entry.get('k') for entry in best})
maxIter = list({entry.get('max_iteration') for entry in best})
optimizer = ['em']
terms_per_topic = list({entry.get('terms_per_topic') for entry in best})

param_dict = dict(k=k, maxIter=maxIter, optimizer=optimizer,
                  terms_per_topic=terms_per_topic)
opt_param_coherence_list = search_grid(
    param_dict, optimized_ldaDataDF, optimized_vocab,
    optimized_dataDF.rdd.map(lambda row: row[1]),
)

In [None]:
# Total number of tokens (repetitions included) in the optimization pass.
print('Number of words to process ', optimized_dataDF.rdd.map(lambda row: len(row[1])).sum())

In [None]:
# Show every result of the second grid search.
for i in opt_param_coherence_list:
    print(i)

In [None]:
# Keep only the single best result of the second search.
opt_best = get_best_n_models(opt_param_coherence_list, n=1)

In [None]:
# Display the best result of the second search (cell output).
opt_best

In [None]:
# Print the topics of the best optimized model.
print_topics(
    model=opt_best[0].get('model'),
    number_of_words_per_topic=opt_best[0].get('terms_per_topic'),
    vocab=optimized_vocab,
)

In [None]:
# Persist the best optimized model under models/<directory>/best.
save_models(opt_best, path=f'models/{directory}/best')

In [None]:
# Record every result of the second grid search as CSV.
store_to_csv(path=f'models/{directory}', filename='best_data.csv', models_info=opt_param_coherence_list)

### END