In [None]:
import os
import csv
import glob
import pandas as pd
from pathlib import Path
from scipy.stats import fisher_exact
import little_mallet_wrapper
import seaborn
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows

# Relative path to the MALLET launcher; train_topic_model hands this string to
# little_mallet_wrapper when training a model.
path_to_mallet = 'mallet-2.0.8/bin/mallet'
# NOTE(review): nothing in the visible code reads or assigns this name after
# this point - confirm whether it is still needed.
selected_stopwords_file_path = None

# --- Utility Functions ---

def tokenize_file(filepath):
    """
    Loads a UTF-8 text file and breaks it into whitespace-delimited tokens.

    Args:
        filepath (str): Location of the text file to read.

    Returns:
        list: The tokens in the order they appear in the file; an empty
        list when the file holds no non-whitespace characters.
    """
    contents = Path(filepath).read_text(encoding='utf-8')
    # str.split() without arguments splits on any run of whitespace and never
    # yields empty strings.
    tokens = contents.split()
    return tokens

def list_txt_files(directory):
    """
    Collects the names of directory entries that end in '.txt'.

    Only the immediate contents of the directory are inspected; the match on
    the extension is case-sensitive.

    Args:
        directory (str): Folder whose entries are inspected.

    Returns:
        list: Matching entry names (not full paths), sorted alphabetically.
    """
    txt_names = []
    for entry in os.listdir(directory):
        if entry.endswith('.txt'):
            txt_names.append(entry)
    txt_names.sort()
    return txt_names

def list_csv_files(directory):
    """
    Walks a directory tree and gathers every file whose name ends in '.csv'.

    Args:
        directory (str): Root folder at which the walk starts.

    Returns:
        list: Paths to the .csv files found, each joined onto the folder
        that os.walk reported it in, in the order os.walk visits them.
    """
    return [
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(directory)
        for name in names
        if name.endswith('.csv')
    ]

def choose_directory(prompt, choice=None):
    """
    Lets the user pick the current working directory or one of its immediate subdirectories.

    The subdirectories are listed in alphabetical order, followed by the
    working directory itself as the last option.

    Args:
        prompt (str): Message to display above the numbered menu.
        choice (int or str, optional): 1-based menu number to use instead of
            asking on standard input. When given, nothing is printed.

    Returns:
        str: Path to the selected directory.

    Raises:
        ValueError: If the choice is not a whole number.
        IndexError: If the choice is outside the range of the menu.
    """
    base_directory = os.getcwd()
    # Sort so the menu numbering is stable; os.listdir returns entries in
    # arbitrary order.
    subdirectories = sorted(
        os.path.join(base_directory, name)
        for name in os.listdir(base_directory)
        if os.path.isdir(os.path.join(base_directory, name))
    )
    subdirectories.append(base_directory)

    if choice is None:
        print(prompt)
        for i, subdir in enumerate(subdirectories, start=1):
            print(f"{i}. {subdir}")
        choice = input("Enter your choice: ")

    index = int(choice)
    # Reject 0 and negatives explicitly: plain list indexing would silently
    # wrap around to the end of the menu.
    if not 1 <= index <= len(subdirectories):
        raise IndexError(
            f"Choice {index} is out of range; enter a number between 1 and {len(subdirectories)}."
        )
    return subdirectories[index - 1]

def choose_files(files, choices=None):
    """
    Lets the user select files from a list by number, numeric range, or name prefix.

    The selection is a comma-separated string. Each part is interpreted as:
      * "A-B" with two whole numbers: the 1-based inclusive range A..B
        (an upper bound past the end of the list is clamped);
      * a whole number: the file at that 1-based position;
      * anything else: every file whose name starts with that text.
    Empty parts (for example from a trailing comma) are ignored.

    Args:
        files (list): List of files available for selection.
        choices (str, optional): Selection string to use instead of asking on
            standard input. When given, nothing is printed.

    Returns:
        list: A sorted list of selected files without duplicates.

    Raises:
        IndexError: If a number, or the start of a range, is not a valid
            1-based position in ``files``.
    """
    if choices is None:
        print("Select target .txt files (enter numbers separated by commas or ranges, or starting text patterns):")
        for i, file in enumerate(files, start=1):
            print(f"{i}. {file}")
        choices = input("Enter your choices: ")

    selected_files = set()
    for part in choices.split(','):
        part = part.strip()
        if not part:
            # An empty pattern would match every file via startswith('').
            continue

        start_text, dash, end_text = part.partition('-')
        start_text, end_text = start_text.strip(), end_text.strip()
        if dash and start_text.isdecimal() and end_text.isdecimal():
            # Only treat the part as a range when both sides are numbers, so
            # prefixes such as "chapter-" still work as name patterns.
            start, end = int(start_text), int(end_text)
            if start < 1:
                raise IndexError(f"Range '{part}' must start at 1 or higher.")
            selected_files.update(files[start - 1:end])
        elif part.isdecimal():
            index = int(part)
            # Reject 0 explicitly: files[-1] would silently pick the last file.
            if not 1 <= index <= len(files):
                raise IndexError(
                    f"Choice {index} is out of range; enter a number between 1 and {len(files)}."
                )
            selected_files.add(files[index - 1])
        else:
            selected_files.update(file for file in files if file.startswith(part))
    return sorted(selected_files)

def choose_csv_file(csv_files, choice=None):
    """
    Lets the user pick one .csv file from a numbered list.

    Args:
        csv_files (list): List of .csv file paths available for selection.
        choice (int or str, optional): 1-based menu number to use instead of
            asking on standard input. When given, nothing is printed.

    Returns:
        str: Path to the selected .csv file.

    Raises:
        ValueError: If the choice is not a whole number.
        IndexError: If ``csv_files`` is empty or the choice is outside the
            range of the menu.
    """
    if not csv_files:
        # Fail before prompting: no answer could be valid for an empty menu.
        raise IndexError("No .csv files are available to choose from.")

    if choice is None:
        print("Select a .csv file to use as a stopwords file:")
        for i, file in enumerate(csv_files, start=1):
            print(f"{i}. {file}")
        choice = input("Enter your choice: ")

    index = int(choice)
    # Reject 0 and negatives explicitly: plain list indexing would silently
    # wrap around to the end of the list.
    if not 1 <= index <= len(csv_files):
        raise IndexError(
            f"Choice {index} is out of range; enter a number between 1 and {len(csv_files)}."
        )
    return csv_files[index - 1]

def read_stopwords(filepath):
    """
    Reads a stopwords CSV file and extracts the stopwords into a list.

    Every cell of every row contributes stopwords. A cell that itself
    contains commas (for example a quoted "of,to") is split further, and
    surrounding whitespace is removed from each word. Empty entries, such as
    those produced by blank cells or trailing commas, are dropped.

    Args:
        filepath (str): Path to the stopwords file.

    Returns:
        list: A list of stopwords in file order (duplicates are kept).
    """
    # 'utf-8-sig' strips a leading byte-order mark if present, which would
    # otherwise stay attached to the first stopword; newline='' is what the
    # csv module expects for files it reads.
    with open(filepath, 'r', encoding='utf-8-sig', newline='') as file:
        stopwords = []
        for row in csv.reader(file):
            for cell in row:
                stopwords.extend(word.strip() for word in cell.split(','))
    return [word for word in stopwords if word]

def get_fishers(word, count_dict, rate_dict, alternative='greater'):
    """
    Calculates the p-value for Fisher's Exact Test for a given word.

    The test compares the word's observed count in the target document with
    the count expected from its corpus rate, using the 2x2 table
    [[observed, total - observed], [expected, total - expected]], where
    total is the number of tokens in the target document and expected is
    the corpus rate times total, rounded to the nearest integer.

    Args:
        word (str): The word to evaluate.
        count_dict (dict): A dictionary of word counts in the target document.
        rate_dict (dict): A dictionary of word rates across the corpus. Words
            missing from it are treated as having a rate of 0.
        alternative (str): The type of hypothesis test ('greater', 'less', or 'two-sided').

    Returns:
        float: The p-value from Fisher's Exact Test.
    """
    total_words = sum(count_dict.values())
    observed = count_dict.get(word, 0)
    expected = round(rate_dict.get(word, 0) * total_words)
    table = [
        [observed, total_words - observed],
        [expected, total_words - expected],
    ]
    # Unpack instead of reading `.pvalue`: the result unpacks as
    # (statistic, p-value) both on SciPy releases that return a plain tuple
    # and on those that return a result object.
    _, p_value = fisher_exact(table, alternative=alternative)
    return p_value

# --- Core Functions ---

def calculate_rate_dictionary(rate_files, rate_directory):
    """
    Builds a table of relative word frequencies over a set of reference files.

    Args:
        rate_files (list): Names of the files to read.
        rate_directory (str): Folder the file names are joined onto.

    Returns:
        dict: Maps each distinct token to its count divided by the total
        number of tokens read from all files. Empty when no tokens were read.
    """
    tallies = {}
    for name in rate_files:
        for token in tokenize_file(os.path.join(rate_directory, name)):
            tallies[token] = tallies.get(token, 0) + 1

    # Every token read added exactly one to some tally, so the sum of the
    # tallies is the overall token count.
    grand_total = sum(tallies.values())
    return {token: tally / grand_total for token, tally in tallies.items()}

def prepare_training_data(target_files, target_directory, stopwords, rate_dict, alpha):
    """
    Filters and prepares training data by removing stopwords and words with p-values at or above the alpha threshold.

    For every target file, a token is kept only if it is not a stopword and
    its Fisher's Exact Test p-value (from get_fishers with its default
    alternative) is strictly below alpha. Kept tokens retain their original
    order and repetitions.

    Args:
        target_files (list): List of target files to process.
        target_directory (str): Directory containing the target files.
        stopwords (list): List of stopwords to exclude.
        rate_dict (dict): Rate dictionary for calculating p-values.
        alpha (float): Alpha threshold for Fisher's Exact Test.

    Returns:
        list: One space-joined string of kept tokens per target file, in the
        same order as target_files.
    """
    # A set gives constant-time membership tests; built once for all files.
    stopword_set = set(stopwords)

    filtered_data = []
    for file in target_files:
        words = tokenize_file(os.path.join(target_directory, file))

        # Count every token once per file instead of rebuilding the whole
        # count table for each token.
        word_counts = {}
        for word in words:
            word_counts[word] = word_counts.get(word, 0) + 1

        # The p-value depends only on the word and the file, so evaluate it
        # once per distinct word and reuse the verdict for every repetition.
        keep = {
            word: get_fishers(word, word_counts, rate_dict) < alpha
            for word in word_counts
            if word not in stopword_set
        }

        filtered_data.append(' '.join(word for word in words if keep.get(word, False)))
    return filtered_data

def train_topic_model(training_data, num_topics, output_directory):
    """
    Runs MALLET topic-model training through little_mallet_wrapper and loads the resulting topic keys.

    Args:
        training_data (list): Preprocessed documents handed to the trainer.
        num_topics (int): Number of topics requested from the trainer.
        output_directory (str): Folder for the model output; created
            (including missing parents) if it does not exist yet.

    Returns:
        list: Whatever little_mallet_wrapper.load_topic_keys returns for the
        topic-keys file found in the output folder.
    """
    # The output folder must exist before the trainer is pointed at it.
    Path(output_directory).mkdir(parents=True, exist_ok=True)

    little_mallet_wrapper.quick_train_topic_model(
        path_to_mallet,
        output_directory,
        num_topics,
        training_data,
    )

    # The topic-keys file is looked up under the output folder with the
    # topic count as its suffix.
    topic_keys_path = f"{output_directory}/mallet.topic_keys.{num_topics}"
    return little_mallet_wrapper.load_topic_keys(topic_keys_path)

def _unique_sheet_title(name, used_titles):
    """Turns name into a worksheet title Excel accepts and that is not in used_titles yet."""
    # Excel forbids \ / * ? : [ ] in sheet names and limits them to 31 characters.
    cleaned = name.translate(str.maketrans({char: '_' for char in '\\/*?:[]'}))
    base = cleaned[:31]
    title = base
    counter = 1
    # Sheet names are compared case-insensitively; add a numeric suffix
    # (keeping within the length limit) until the title is unused.
    while title.lower() in used_titles:
        suffix = f"_{counter}"
        title = base[:31 - len(suffix)] + suffix
        counter += 1
    used_titles.add(title.lower())
    return title


def save_results_to_excel(filename, topics, topic_distributions, target_files):
    """
    Saves topics and topic distributions to an Excel file with multiple sheets.

    The first sheet, 'Topics', holds one row per topic: a "Topic N" label
    followed by the topic's entries. Each target file then gets its own
    sheet, named after the file's stem (sanitised and shortened where Excel
    would reject the name), containing that file's distribution as written
    out by pandas, including a header row.

    Args:
        filename (str): Path to the output Excel file.
        topics (list): List of generated topics, each a sequence of entries.
        topic_distributions (list): Topic distributions for each document,
            paired with target_files by position.
        target_files (list): List of target file names.
    """
    workbook = Workbook()

    # Save topics
    sheet = workbook.active
    sheet.title = 'Topics'
    for topic_number, topic in enumerate(topics):
        sheet.append([f"Topic {topic_number}"] + list(topic))

    # Save topic distributions, one sheet per target file
    used_titles = {'topics'}
    for file, distribution in zip(target_files, topic_distributions):
        sheet_name = _unique_sheet_title(Path(file).stem, used_titles)
        ws = workbook.create_sheet(title=sheet_name)
        for row in dataframe_to_rows(pd.DataFrame(distribution), index=False, header=True):
            ws.append(row)

    workbook.save(filename)

def display_success_message():
    """
    Prints a one-line confirmation that the notebook ran to completion.
    """
    completion_notice = "✅ The notebook has successfully completed processing!"
    print(completion_notice)

# --- Main Function ---

def main():
    """
    Main function to execute the integrated workflow for Fisher's Exact Test-based
    topic modeling with MALLET. Handles user input, file selection, preprocessing,
    model training, and result output.

    The steps run in order and each prompt blocks on standard input. Results
    are written to '<output subfolder>/topic_model_results.xlsx'.
    """
    # Step 1: Select stopwords file
    stopwords_file = choose_csv_file(list_csv_files(os.getcwd()))
    stopwords = read_stopwords(stopwords_file)

    # Step 2: Select rate dictionary files
    rate_directory = choose_directory("Select the directory for rate dictionary files:")
    rate_files = choose_files(list_txt_files(rate_directory))
    rate_dict = calculate_rate_dictionary(rate_files, rate_directory)

    # Step 3: Select target files
    target_directory = choose_directory("Select the directory for target files:")
    target_files = choose_files(list_txt_files(target_directory))

    # Step 4: Set alpha threshold
    alpha = float(input("Enter the alpha threshold for Fisher's Exact Test: "))

    # Step 5: Prepare training data
    training_data = prepare_training_data(target_files, target_directory, stopwords, rate_dict, alpha)

    # Step 6: Specify number of topics
    num_topics = int(input("Enter the number of topics to generate: "))

    # Step 7: Train topic model
    output_directory = input("Enter a name for the output subfolder: ").strip()
    # An empty name would turn every "<folder>/..." output path into a path
    # at the filesystem root, so keep asking until a name is given.
    while not output_directory:
        output_directory = input("Enter a name for the output subfolder: ").strip()
    topics = train_topic_model(training_data, num_topics, output_directory)

    # Step 8: Save results to Excel
    # The per-document topic distributions are read back from the training
    # output; the filtered training text itself is not a distribution and
    # cannot be written as one.
    # NOTE(review): assumes the trainer emits one distribution per entry of
    # training_data, in order - confirm for documents left empty by filtering.
    topic_distributions = little_mallet_wrapper.load_topic_distributions(
        f"{output_directory}/mallet.topic_distributions.{num_topics}"
    )
    output_excel = f"{output_directory}/topic_model_results.xlsx"
    save_results_to_excel(output_excel, topics, topic_distributions, target_files)

    # Step 9: Display success message
    display_success_message()

# --- Run the Notebook ---

# Start the interactive workflow only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()