# In [None]:
"""
Advanced Text Processing System
-------------------------------
A comprehensive NLP system that provides:
- N-gram language modeling
- Text generation
- Spelling correction with multiple methods
- Auto-completion
- Web interface using Gradio

Authors: El Guelta Mohamad Saber , El Hadifi Soukaina
Date: April 13, 2025
"""

import re
import numpy as np
import random
import multiprocessing
import tempfile
import os
import gradio as gr
from collections import defaultdict, Counter
from functools import partial


class TextProcessor:
    """Main class handling all text processing functionality."""

    def __init__(self, corpus_path, keyboard_graph_path, ngram_size=2, smoothing_k=0.1, min_frequency=2):
        """
        Initialize the text processor with necessary parameters and paths.

        Args:
            corpus_path: Path to the text corpus
            keyboard_graph_path: Path to the keyboard layout graph
            ngram_size: Size of n-grams to use (default: 2 for bigrams)
            smoothing_k: Smoothing parameter for add-k smoothing (default: 0.1)
            min_frequency: Minimum word frequency for dictionary inclusion (default: 2)
        """
        self.ngram_size = ngram_size
        self.smoothing_k = smoothing_k

        print("Loading data...")
        self.dictionary, self.word_frequencies = self._create_dictionary_from_corpus(corpus_path, min_frequency)
        self.keyboard_graph = self._load_keyboard_graph(keyboard_graph_path)

        print("Training the n-gram model...")
        sample_file = self._extract_sample_for_training(corpus_path)
        self.ngram_counts = self._train(sample_file, self.dictionary, ngram_size, smoothing_k)
        os.unlink(sample_file)  # Cleanup

        print(f"Model ready! {len(self.ngram_counts)} contexts in the model.")

    # ========== DATA PREPARATION METHODS ==========

    def _preprocess_text(self, text):
        """
        Clean and tokenize text.

        Args:
            text: Input text string

        Returns:
            List of tokens
        """
        text = text.lower()
        text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
        return text.split()

    def _create_dictionary_from_corpus(self, corpus_file, min_frequency=2):
        """
        Create a dictionary from frequent words in the corpus.

        Args:
            corpus_file: Path to corpus file
            min_frequency: Minimum count for a word to be included

        Returns:
            A tuple with (dictionary set, word frequency counter)
        """
        word_counts = Counter()

        with open(corpus_file, 'r', encoding='utf-8') as f:
            for line in f:
                words = self._preprocess_text(line)
                word_counts.update(words)

        # Only keep words that appear at least min_frequency times
        dictionary = {word for word, count in word_counts.items() if count >= min_frequency}
        return dictionary, word_counts

    def _extract_sample_for_training(self, corpus_file, sample_size=100000):
        """
        Extract a sample from corpus for faster training.

        Args:
            corpus_file: Path to corpus file
            sample_size: Number of characters to sample

        Returns:
            Path to temporary file with sample
        """
        with open(corpus_file, 'r', encoding='utf-8') as f:
            sample_text = f.read(sample_size)

        temp_sample = tempfile.NamedTemporaryFile(delete=False, mode='w', encoding='utf-8')
        temp_sample.write(sample_text)
        temp_sample.close()
        return temp_sample.name

    def _prepare_data(self, infile, vocab):
        """
        Read and prepare data for training.

        Args:
            infile: Path to input file
            vocab: Dictionary of valid words

        Returns:
            List of processed tokens
        """
        with open(infile, 'r', encoding='utf-8') as f:
            tokens = self._preprocess_text(f.read())
        tokens = [w if w in vocab else '<UNK>' for w in tokens]
        return ['<s>'] + tokens + ['</s>']

    # ========== LANGUAGE MODEL METHODS ==========

    def _train_worker(self, tokens, start, end, ngram_size):
        """
        Worker function for parallel n-gram counting.

        Args:
            tokens: List of tokens
            start: Start index
            end: End index
            ngram_size: Size of n-grams

        Returns:
            Tuple of (ngram_counts, total_counts)
        """
        local_ngram_counts = defaultdict(Counter)
        local_total_counts = defaultdict(int)
        for i in range(start, end - ngram_size + 1):
            context = tuple(tokens[i:i + ngram_size - 1])
            word = tokens[i + ngram_size - 1]
            local_ngram_counts[context][word] += 1
            local_total_counts[context] += 1
        return local_ngram_counts, local_total_counts

    def _train(self, infile, vocab, ngram_size=2, smoothing_k=0.1):
        """
        Train the n-gram language model with parallel processing.

        Args:
            infile: Path to input file
            vocab: Dictionary of valid words
            ngram_size: Size of n-grams
            smoothing_k: Parameter for add-k smoothing

        Returns:
            Dictionary of n-gram probabilities
        """
        tokens = self._prepare_data(infile, vocab)
        n_workers = multiprocessing.cpu_count()
        chunk_size = len(tokens) // n_workers
        args = [(tokens, i, min(i + chunk_size, len(tokens)), ngram_size) for i in range(0, len(tokens), chunk_size)]

        with multiprocessing.Pool(n_workers) as pool:
            results = pool.starmap(self._train_worker, args)

        ngram_counts = defaultdict(Counter)
        total_counts = defaultdict(int)
        vocab_size = len(vocab) + 1  # To include <UNK>

        for local_ngram_counts, local_total_counts in results:
            for context, words in local_ngram_counts.items():
                ngram_counts[context].update(words)
                total_counts[context] += local_total_counts[context]

        # Apply smoothing
        for context in ngram_counts:
            total = total_counts[context] + smoothing_k * vocab_size
            ngram_counts[context] = {word: (count + smoothing_k) / total
                                     for word, count in ngram_counts[context].items()}
        return ngram_counts

    def predict_sentence_probability(self, sentence):
        """
        Calculate probability of a sentence given the model.

        Args:
            sentence: Input sentence

        Returns:
            Log probability of the sentence
        """
        tokens = ['<s>'] + self._preprocess_text(sentence) + ['</s>']
        prob = 0
        for i in range(len(tokens) - self.ngram_size + 1):
            context = tuple(tokens[i:i + self.ngram_size - 1])
            word = tokens[i + self.ngram_size - 1]
            prob += self.ngram_counts.get(context, {}).get(word, np.log(1e-10))
        return prob

    def test_perplexity(self, test_file):
        """
        Calculate perplexity of a test corpus.

        Args:
            test_file: Path to test file

        Returns:
            Perplexity score
        """
        tokens = self._prepare_data(test_file, self.dictionary)
        sentence_probs = [self.predict_sentence_probability(' '.join(tokens[i:i+10]))
                         for i in range(0, len(tokens), 10)]
        return np.exp(-np.mean(sentence_probs) / 10)

    # ========== TEXT GENERATION METHODS ==========

    def generate_text(self, max_length=20):
        """
        Generate text using the trained model.

        Args:
            max_length: Maximum length of generated text in words

        Returns:
            Generated text string
        """
        sentence = ['<s>']
        while len(sentence) < max_length and sentence[-1] != '</s>':
            context = tuple(sentence[-(self.ngram_size - 1):])
            if context not in self.ngram_counts:
                break

            words = list(self.ngram_counts[context].keys())
            probs = list(self.ngram_counts[context].values())

            # Normalize probabilities to sum to 1
            probs_sum = sum(probs)
            if probs_sum > 0:
                probs = [p/probs_sum for p in probs]

            sentence.append(np.random.choice(words, p=probs))

        return ' '.join(sentence[1:-1])

    def generate_continuation(self, text, max_length=20):
        """
        Generate a continuation of the input text.

        Args:
            text: Input text to continue
            max_length: Maximum length of generated continuation

        Returns:
            Original text plus generated continuation
        """
        words = ['<s>'] + self._preprocess_text(text)

        # Create an initial context from the last words
        context_size = self.ngram_size - 1
        if len(words) >= context_size:
            context = tuple(words[-context_size:])
        else:
            # Padding if necessary
            context = tuple(['<s>'] * (context_size - len(words)) + words)

        # Generate the continuation
        continuation = []
        for _ in range(max_length):
            if context not in self.ngram_counts:
                break

            words_list = list(self.ngram_counts[context].keys())
            probs_list = list(self.ngram_counts[context].values())

            # Normalize probabilities
            probs_sum = sum(probs_list)
            if probs_sum > 0:
                probs_list = [p/probs_sum for p in probs_list]

            next_word = np.random.choice(words_list, p=probs_list)

            if next_word == '</s>':
                break

            continuation.append(next_word)

            # Update the context for the next word
            context = context[1:] + (next_word,)

        return text + " " + " ".join(continuation)

    def auto_complete(self, text):
        """
        Predict the next word after the input text.

        Args:
            text: Input text

        Returns:
            Most likely next word
        """
        tokens = self._preprocess_text(text)
        context = tuple(tokens[-(self.ngram_size - 1):])
        predictions = self.ngram_counts.get(context, {})

        if predictions:
            word, _ = max(predictions.items(), key=lambda item: item[1])
            return word
        else:
            return random.choice(list(self.dictionary))

    def autocomplete_text(self, text, num_words=5):
        """
        Auto-complete text with multiple words.

        Args:
            text: Input text
            num_words: Number of words to add

        Returns:
            Completed text
        """
        current_text = text
        result = current_text

        for _ in range(num_words):
            next_word = self.auto_complete(current_text)
            result += " " + next_word
            current_text += " " + next_word

        return result

    # ========== SPELLING CORRECTION METHODS ==========

    def _load_keyboard_graph(self, file_path):
        """
        Load keyboard layout graph for spelling correction.

        Args:
            file_path: Path to keyboard graph file

        Returns:
            Dictionary of adjacent keys
        """
        adjacency = {}
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                keys = line.strip().split()
                adjacency[keys[0]] = set(keys[1:])
        return adjacency

    def _soundex(self, word):
        """
        Compute Soundex code for phonetic matching.

        Args:
            word: Input word

        Returns:
            Soundex code string
        """
        soundex_dict = {
            "bfpv": "1", "cgjkqsxz": "2", "dt": "3",
            "l": "4", "mn": "5", "r": "6"
        }
        # Normalize the word and initialize with the first letter
        word = word.lower()
        first_letter = word[0]
        encoded = first_letter.upper()
        # Dictionary to map each letter to the corresponding Soundex number
        letter_to_code = {}
        for chars, code in soundex_dict.items():
            for char in chars:
                letter_to_code[char] = code
        # Iterate through the rest of the characters
        for char in word[1:]:
            if char in letter_to_code:
                code = letter_to_code[char]
                if encoded[-1] != code:  # Prevent consecutive identical codes
                    encoded += code
        # Ensure the code is exactly 4 characters long
        encoded = encoded.ljust(4, "0")[:4]
        return encoded

    def _levenshtein_distance(self, s1, s2, keyboard_graph=None):
        """
        Calculate Levenshtein edit distance between strings.

        Args:
            s1: First string
            s2: Second string
            keyboard_graph: Optional keyboard layout for weighted distance

        Returns:
            Edit distance as an integer
        """
        len_s1, len_s2 = len(s1), len(s2)
        dp = np.zeros((len_s1 + 1, len_s2 + 1))

        for i in range(len_s1 + 1):
            dp[i][0] = i
        for j in range(len_s2 + 1):
            dp[0][j] = j

        for i in range(1, len_s1 + 1):
            for j in range(1, len_s2 + 1):
                cost = 0 if s1[i - 1] == s2[j - 1] else 1

                if keyboard_graph and s1[i - 1] in keyboard_graph and s2[j - 1] in keyboard_graph[s1[i - 1]]:
                    cost = 0.5  # Adjust cost for adjacent keys

                dp[i][j] = min(
                    dp[i - 1][j] + 1,  # Deletion
                    dp[i][j - 1] + 1,  # Insertion
                    dp[i - 1][j - 1] + cost  # Substitution
                )

                if i > 1 and j > 1 and s1[i - 1] == s2[j - 2] and s1[i - 2] == s2[j - 1]:
                    dp[i][j] = min(dp[i][j], dp[i - 2][j - 2] + 1)  # Transposition

        return int(dp[len_s1, len_s2])

    def _correct_word(self, word, dictionary=None, keyboard_graph=None, k=5):
        """
        Basic spell correction using edit distance.

        Args:
            word: Word to correct
            dictionary: Dictionary to use (default is self.dictionary)
            keyboard_graph: Keyboard graph (default is self.keyboard_graph)
            k: Number of suggestions to return

        Returns:
            List of k closest corrections
        """
        dictionary = dictionary or self.dictionary
        keyboard_graph = keyboard_graph or self.keyboard_graph

        candidates = [(dict_word, self._levenshtein_distance(word, dict_word, keyboard_graph))
                      for dict_word in dictionary]
        candidates = sorted(candidates, key=lambda x: x[1])
        return [word for word, _ in candidates[:k]]

    def _smart_correct(self, word, k=5):
        """
        Optimized correction with initial filtering.

        Args:
            word: Word to correct
            k: Number of suggestions to return

        Returns:
            List of k closest corrections
        """
        filtered_dict = {w for w in self.dictionary if abs(len(w) - len(word)) <= 2 and w[0] == word[0]}
        return self._correct_word(word, filtered_dict, self.keyboard_graph, k)

    def _phonetic_correct(self, word, k=5):
        """
        Phonetic correction using Soundex.

        Args:
            word: Word to correct
            k: Number of suggestions to return

        Returns:
            List of k closest corrections
        """
        word_soundex = self._soundex(word)
        filtered_dict = {w for w in self.dictionary if self._soundex(w) == word_soundex}
        return self._correct_word(word, filtered_dict, self.keyboard_graph, k)

    def get_best_correction(self, word):
        """
        Get best spelling correction by combining methods.

        Args:
            word: Word to correct

        Returns:
            Best correction suggestion
        """
        suggestions = set(self._smart_correct(word) +
                          self._phonetic_correct(word) +
                          self._correct_word(word))

        suggestions_with_scores = [(s, self._levenshtein_distance(word, s, self.keyboard_graph) -
                                    0.1 * np.log(self.word_frequencies.get(s, 1)))
                                   for s in suggestions]

        return min(suggestions_with_scores, key=lambda x: x[1])[0] if suggestions_with_scores else word

    # ========== PUBLIC API METHODS ==========

    def correct_text(self, text):
        """
        Correct spelling errors in text.

        Args:
            text: Input text with potential errors

        Returns:
            Corrected text
        """
        words = self._preprocess_text(text)
        corrected_words = []

        for word in words:
            if word in self.dictionary:
                corrected_words.append(word)
            else:
                corrected_word = self.get_best_correction(word)
                corrected_words.append(corrected_word)

        return ' '.join(corrected_words)

    def correct_and_autocomplete(self, text, num_words=5):
        """
        Correct text and predict its continuation.

        Args:
            text: Input text with potential errors
            num_words: Number of words to add

        Returns:
            Dictionary containing original, corrected, and completed text
        """
        corrected_text = self.correct_text(text)
        completed_text = self.autocomplete_text(corrected_text, num_words)

        return {
            "original_text": text,
            "corrected_text": corrected_text,
            "completed_text": completed_text
        }


class TextProcessorApp:
    """Gradio front end exposing a TextProcessor through four tabs."""

    def __init__(self, processor):
        """
        Store the processor and build the interface right away.

        Args:
            processor: TextProcessor instance
        """
        self.processor = processor
        self.demo = self._create_interface()

    def _create_interface(self):
        """Assemble the Blocks layout: header texts followed by the tabs."""
        with gr.Blocks(title="Text Correction and Auto-completion") as demo:
            gr.Markdown("# Spell Checker and Auto-completion")
            gr.Markdown(f"Dictionary loaded with {len(self.processor.dictionary)} words. "
                        f"{self.processor.ngram_size}-gram model with {len(self.processor.ngram_counts)} contexts.")

            # Tabs are created in display order inside the Blocks context.
            self._build_correction_tab()
            self._build_completion_tab()
            self._build_combined_tab()
            self._build_generation_tab()

        return demo

    def _build_correction_tab(self):
        """Tab that runs spelling correction on the entered text."""
        with gr.Tab("Text correction"):
            with gr.Row():
                source_box = gr.Textbox(label="Text to correct", lines=3,
                                        placeholder="Enter text with some spelling errors...")
                result_box = gr.Textbox(label="Corrected text", lines=3)
            trigger = gr.Button("Correct the text")
            trigger.click(fn=self.processor.correct_text, inputs=source_box, outputs=result_box)

    def _build_completion_tab(self):
        """Tab that appends a chosen number of predicted words."""
        with gr.Tab("Auto-completion"):
            with gr.Row():
                source_box = gr.Textbox(label="Beginning of text", lines=2,
                                        placeholder="Enter the beginning of a sentence...")
                word_count = gr.Slider(minimum=1, maximum=10, value=5, step=1,
                                       label="Number of words to generate")
                result_box = gr.Textbox(label="Completed text", lines=3)
            trigger = gr.Button("Complete the text")
            trigger.click(fn=self.processor.autocomplete_text,
                          inputs=[source_box, word_count], outputs=result_box)

    def _build_combined_tab(self):
        """Tab that corrects the text and then completes it; shows JSON."""
        with gr.Tab("Correction + Auto-completion"):
            with gr.Row():
                source_box = gr.Textbox(label="Text with errors", lines=3,
                                        placeholder="Enter text with errors...")
                word_count = gr.Slider(minimum=1, maximum=10, value=5, step=1,
                                       label="Number of words to generate")
            with gr.Row():
                result_view = gr.JSON(label="Results")
            trigger = gr.Button("Correct and complete")
            trigger.click(fn=self.processor.correct_and_autocomplete,
                          inputs=[source_box, word_count], outputs=result_view)

    def _build_generation_tab(self):
        """Tab that samples a continuation of the entered text."""
        with gr.Tab("Text generation"):
            with gr.Row():
                source_box = gr.Textbox(label="Beginning of the text", lines=2,
                                        placeholder="Enter the beginning of a text...")
                length_limit = gr.Slider(minimum=5, maximum=50, value=20, step=5,
                                         label="Maximum length (words)")
                result_box = gr.Textbox(label="Generated text", lines=5)
            trigger = gr.Button("Generate the continuation")
            trigger.click(fn=self.processor.generate_continuation,
                          inputs=[source_box, length_limit], outputs=result_box)

    def launch(self, **kwargs):
        """Launch the Gradio interface, forwarding all keyword arguments."""
        self.demo.launch(**kwargs)


def main(corpus_path="/content/sample_data/big_data.txt",
         keyboard_graph_path="/content/sample_data/qwerty_graph.txt",
         share=True):
    """
    Build the text processor and launch the web application.

    Called without arguments it behaves as before; the parameters allow
    running with other data files without editing the source.

    Args:
        corpus_path: Path to the text corpus
        keyboard_graph_path: Path to the keyboard layout graph
        share: Forwarded to the Gradio launch call as its share argument
    """
    # Create the text processor
    processor = TextProcessor(
        corpus_path=corpus_path,
        keyboard_graph_path=keyboard_graph_path,
        ngram_size=2,
        smoothing_k=0.1,
        min_frequency=2
    )

    # Create and launch the app
    app = TextProcessorApp(processor)
    app.launch(share=share)

# Run the application only when executed as a script or notebook cell.
# Without the guard, importing this module starts training and the web UI,
# and child processes created by multiprocessing with the spawn start method
# re-import the main module and would call main() again.
if __name__ == "__main__":
    main()