In [3]:
import numpy as np
import pandas as pd
import os
import shutil
import json
import gensim
from gensim import corpora, models
import spacy
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from tqdm import tqdm

from collections import defaultdict
from datetime import datetime
import math
from operator import itemgetter
import os
import random
import re

In [5]:
def load_processed_text(file_name):
    '''
    read processed documents back from a utf-8 text file, one document per line

    every line is returned as one stripped string; it is not split into
    tokens (not needed for mallet)
    '''
    with open(file_name, 'r', encoding='utf-8') as handle:
        return [raw_line.strip() for raw_line in handle]

In [4]:
def generate_random_seeds(num_seeds=10):
    '''
    draw num_seeds integers from [0, 10000) using numpy's global random state

    the result is a plain python list of ints
    '''
    drawn = np.random.randint(0, 10000, num_seeds)
    return drawn.tolist()

In [None]:
def load_topic_words(topic_keys_file):
    '''
    read the topic keys file written by mallet and return the words per topic

    each line is split on whitespace; the first two fields (topic id and
    weight) are dropped and the remaining fields are kept as that line's words.
    returns one list of words per line, in file order
    '''
    with open(topic_keys_file, 'r') as handle:
        # str.split() with no argument already ignores leading/trailing whitespace
        return [row.split()[2:] for row in handle]

The two functions below are adapted from little-mallet-wrapper: https://github.com/maria-antoniak/little-mallet-wrapper/tree/master

In [None]:
def import_data(path_to_mallet,
                path_to_training_data,
                path_to_formatted_training_data,
                training_data,
                training_ids=None,
                use_pipe_from=None):
    '''
    import data to mallet

    writes the documents to path_to_training_data, one per line in the form
    "<id> no_label <document>", then runs mallet's import-file command on it.

    path_to_mallet: command used to start mallet (inserted into a shell command as-is)
    path_to_training_data: text file to (over)write with the documents
    path_to_formatted_training_data: passed to mallet as --output
    training_data: iterable of document strings
    training_ids: optional sequence of ids; when truthy, training_ids[i] is the
        id of the i-th document, otherwise the position i is used
    use_pipe_from: optional path passed to mallet as --use-pipe-from

    returns nothing; prints progress and whether the mallet command succeeded
    '''

    # `with` guarantees the file is flushed and closed before mallet reads it,
    # and also when a write fails part-way (e.g. a document that is not a string)
    with open(path_to_training_data, 'w') as training_data_file:
        for i, d in enumerate(training_data):
            doc_id = training_ids[i] if training_ids else i
            training_data_file.write(str(doc_id) + ' no_label ' + d + '\n')

    # both variants share everything except the optional --use-pipe-from flag
    command = (path_to_mallet
               + ' import-file --input "' + path_to_training_data + '"'
               + ' --output "' + path_to_formatted_training_data + '"'
               + ' --keep-sequence')

    if use_pipe_from:
        print('Importing data using pipe...')
        command += ' --use-pipe-from "' + use_pipe_from + '"'
    else:
        print('Importing data...')

    command += ' --preserve-case'

    # os.system hands back the command's exit status; do not claim success when it is non-zero
    exit_status = os.system(command)
    if exit_status == 0:
        print('Complete')
    else:
        print('Import failed: mallet exited with status ' + str(exit_status))

In [None]:
def train_topic_model(path_to_mallet,
                      path_to_formatted_training_data,
                      path_to_topic_keys,
                      path_to_topic_distributions,
                      num_topics,
                      interval,
                      burnin,
                      random_state,
                      num_top_words=50):
    '''
    train LDA model using mallet

    builds one `train-topics` shell command and runs it with os.system.

    path_to_mallet: command used to start mallet (inserted into the command as-is)
    path_to_formatted_training_data: passed as --input
    path_to_topic_keys: passed as --output-topic-keys
    path_to_topic_distributions: passed as --output-doc-topics
    num_topics: passed as --num-topics
    interval: passed as --optimize-interval
    burnin: passed as --optimize-burn-in
    random_state: passed as --random-seed
    num_top_words: passed as --num-top-words; defaults to 50, the value that
        used to be hard-coded

    returns nothing; the exit status of the command is not checked
    '''
    command = (path_to_mallet
               + ' train-topics --input "' + path_to_formatted_training_data + '"'
               + ' --num-topics ' + str(num_topics)
               + ' --num-top-words ' + str(num_top_words)
               + ' --random-seed ' + str(random_state)
               + ' --output-topic-keys "' + path_to_topic_keys + '"'
               + ' --output-doc-topics "' + path_to_topic_distributions + '"'
               + ' --optimize-interval ' + str(interval)
               + ' --optimize-burn-in ' + str(burnin))
    os.system(command)

In [None]:
def prepare_mallet_tests(seed, topic_words):
    '''
    generate intrusion tests for a lda model

    for every topic, 5 words are sampled without replacement from its first 10
    words, and one "intruder" word is taken from another topic: the first word
    of a randomly picked other topic that does not occur anywhere in the
    current topic's word list. further topics are tried if the picked one
    offers no such word.

    seed: seed used for every random draw
    topic_words: list of word lists, one per topic

    returns a dict {topic index: {'top 5 sample': [...], 'intruder': word or None}};
    the intruder is None when no other topic has a usable word (e.g. only one topic)

    raises ValueError (from numpy) when a topic has fewer than 5 words to sample from
    '''
    num_topics = len(topic_words)  # Determine the number of topics dynamically
    intrusion_tests = {}

    for i, words in enumerate(topic_words):

        # A private RandomState seeded with `seed` yields exactly the draw that
        # np.random.seed(seed) + np.random.choice(...) would, but does not
        # overwrite numpy's global random state for the rest of the notebook.
        # NOTE(review): every draw restarts from the same seed, so topics with
        # equally long top-10 lists get the same rank positions sampled; kept
        # as-is so results for a given seed do not change.
        sample_words = np.random.RandomState(seed).choice(words[:10], 5, replace=False).tolist()

        # set gives constant-time "is this word already in the current topic" checks
        current_words = set(words)

        # Candidate topics to draw the intruder from: all but the current one
        other_topics = [topic for topic in range(num_topics) if topic != i]

        intruder_word = None
        while other_topics and intruder_word is None:
            intruder_topic = np.random.RandomState(seed).choice(other_topics)
            other_topics.remove(intruder_topic)  # do not try this topic again

            # first word of the picked topic that the current topic does not contain
            intruder_word = next(
                (word for word in topic_words[intruder_topic] if word not in current_words),
                None)

        # Save the sample and the intruder into a dictionary
        intrusion_tests[i] = {'top 5 sample': sample_words,
                              'intruder': intruder_word}

    return intrusion_tests

In [None]:
def tune_lda_mallet(path_to_mallet, path_to_formatted_training_data, texts, id2word, num_topics, seeds, grid,
                    output_dir=None):
    '''
    tune lda for each num_topics and seeds using grid search

    for every (number of topics, seed) pair, a mallet model is trained for each
    (interval, burnin) combination in the grid, scored with gensim's c_v
    coherence, and the best-scoring combination is kept. intrusion tests are
    then generated from the best model's topic words.

    path_to_mallet, path_to_formatted_training_data: forwarded to train_topic_model
    texts, id2word: forwarded to CoherenceModel as `texts` and `dictionary`
    num_topics: sized collection of topic counts to try
    seeds: sized collection of random seeds to try
    grid: mapping with sized collections under the keys 'interval' and 'burnin'
    output_dir: directory for the mallet topic-keys and topic-distribution
        files; when None, the notebook-level variable `output_directory_path`
        is used, as before

    returns a DataFrame with one row per (num_topics, seed) and the columns
    num_topics, seed, score, interval, burnin, tests. when no combination
    produced a comparable score (empty grid, or only NaN scores) the row keeps
    score=-inf, interval=None, burnin=None and an empty tests dict.
    '''
    if output_dir is None:
        # backward compatibility: the function used to read this global directly
        output_dir = output_directory_path

    results = []

    total_iter = len(num_topics)*len(seeds)*len(grid['interval'])*len(grid['burnin'])

    # the context manager closes the bar even if a mallet run or the coherence
    # computation raises part-way through the search
    with tqdm(total=total_iter, desc="Total progress") as progress_bar:

        for k in num_topics:

            for seed in seeds:

                # tune lda
                best_score = float('-inf')
                best_interval = None
                best_burnin = None
                best_topic_words = None

                for interval in grid['interval']:
                    for burnin in grid['burnin']:
                        # both output files share the same run-specific suffix
                        run_suffix = str(k) + '_' + str(seed) + '_' + str(interval) + '_' + str(burnin) + '.txt'
                        path_to_topic_keys           = output_dir + '/mallet.topic_keys.' + run_suffix
                        path_to_topic_distributions  = output_dir + '/mallet.topic_distributions.' + run_suffix

                        train_topic_model(
                            path_to_mallet,
                            path_to_formatted_training_data,
                            path_to_topic_keys,
                            path_to_topic_distributions,
                            num_topics = k,
                            interval = interval,
                            burnin = burnin,
                            random_state = seed)

                        topic_words = load_topic_words(path_to_topic_keys)
                        coherence_model = models.CoherenceModel(topics=topic_words, texts=texts, dictionary=id2word, coherence='c_v')
                        score = coherence_model.get_coherence()

                        # a NaN score never compares greater, so it is never selected
                        if score > best_score:
                            best_score = score
                            best_interval = interval
                            best_burnin = burnin
                            best_topic_words = topic_words

                        progress_bar.update(1)

                # make tests (only possible when some model was actually selected)
                if best_topic_words is not None:
                    tests = prepare_mallet_tests(seed, best_topic_words)
                else:
                    tests = {}

                results.append({
                        'num_topics': k,
                        'seed': seed,
                        'score': best_score,
                        'interval': best_interval,
                        'burnin': best_burnin,
                        'tests': tests})

    df = pd.DataFrame(results)
    return df

In [None]:
def df_to_csv(df, file_path):
    '''write df to a csv file at file_path, leaving out the index column'''
    df.to_csv(path_or_buf=file_path, index=False)