In [42]:
# Install Libraries
!pip install emoji
!pip install nx2vos
!pip install python-louvain
!pip install nx2vos



In [43]:
# All Imports
import os
import io
import re
import csv
import time
import hashlib
import unicodedata
import json
import requests
from decimal import Decimal
from itertools import islice, combinations
from collections import deque, defaultdict, OrderedDict, namedtuple, Counter
from typing import Any, List, Dict, Set, Optional
from enum import Enum
from emoji import EMOJI_DATA
import networkx as nx
import matplotlib.pyplot as plt
import pandas as pd
from pathlib import Path
import string
from math import factorial
from operator import itemgetter
from nx2vos import write_vos_json
import networkx as nx
from matplotlib.patches import FancyArrowPatch
from matplotlib.colors import to_rgba
import numpy as np
from community import community_louvain
from sklearn.manifold import MDS

In [44]:
# StopWordsRemover and Stopwords

class StopWordsRemover:
    """Decide whether a term or n-gram should be discarded as a stop word.

    NOTE: a later cell of this notebook defines another class with the same
    name; when the cells are run in order that later definition replaces
    this one.
    """

    def __init__(self, min_word_length: int, lang: str):
        """Load the stop-word lists for ``lang`` and build the lookup sets.

        Args:
            min_word_length: an n-gram containing a word shorter than this
                is removed.
            lang: language code handed to ``Stopwords.get_stop_words``.
        """
        self.min_word_length = min_word_length
        self.lang = lang
        # An n-gram of 3+ words is removed once its garbage score exceeds this.
        self.max_accepted_garbage = 3
        self.nb_stop_words = 5000
        self.nb_stop_words_short = 500

        self.set_stop_words_field_specific_or_short: Set[str] = set()
        self.set_stop_words_short: Set[str] = set()
        self.set_stopwords_field_specific: Set[str] = set()
        self.set_stop_words: Set[str] = set()
        self.set_keep_words: Set[str] = set()
        self.set_remove_words: Set[str] = set()

        stop_words_long_and_short = Stopwords.get_stop_words(lang)
        # The source is an unordered set: sort it so that any truncation below
        # selects the same words on every run.
        self.stopwords_long = sorted(stop_words_long_and_short.get("long", []))
        # Cap the limits at the number of available words. The previous
        # ``len(...) - 1`` silently dropped one stop word from the list.
        self.nb_stop_words_short = min(self.nb_stop_words_short, len(self.stopwords_long))
        self.nb_stop_words = min(self.nb_stop_words, len(self.stopwords_long))

        # Best effort: a failure while building the sets is reported, not raised.
        try:
            self.init()
        except Exception as ex:
            print(f"Exception: {ex}")

    def add_stop_words_to_keep(self, words_to_keep: Set[str]):
        """Register words that must never be removed (ignored when empty/None)."""
        if words_to_keep:
            self.set_keep_words.update(words_to_keep)

    def add_words_to_remove(self, words_to_remove: Set[str]):
        """Register words that must always be removed (ignored when empty/None)."""
        if words_to_remove:
            self.set_remove_words.update(words_to_remove)

    def use_user_supplied_stopwords(self, user_supplied_stopwords: Set[str], user_stopwords_replace_default: bool):
        """Apply user stop words, either replacing or extending the defaults.

        In extend mode ``set_stopwords_field_specific`` is left untouched.
        """
        if user_stopwords_replace_default:
            self.set_stop_words_field_specific_or_short = set(user_supplied_stopwords)
            self.set_stop_words_short = set(user_supplied_stopwords)
            self.set_stopwords_field_specific = set(user_supplied_stopwords)
            self.set_stop_words = set(user_supplied_stopwords)
        else:
            self.set_stop_words_field_specific_or_short.update(user_supplied_stopwords)
            self.set_stop_words_short.update(user_supplied_stopwords)
            self.set_stop_words.update(user_supplied_stopwords)

    def add_field_specific_stop_words(self, field_specific_stop_words_to_remove: Set[str]):
        """Add field-specific stop words (ignored when empty/None)."""
        if field_specific_stop_words_to_remove:
            self.set_stop_words_field_specific_or_short.update(field_specific_stop_words_to_remove)
            self.set_stop_words.update(field_specific_stop_words_to_remove)

    def init(self):
        """(Re)build the keep / short / general stop-word sets from the loaded lists."""
        self.set_keep_words = set()
        self.set_stop_words_short = set()

        list_general_stopwords_large = self.stopwords_long[:self.nb_stop_words]
        list_general_stopwords_short = self.stopwords_long[:self.nb_stop_words_short]

        self.set_stop_words.update(list_general_stopwords_large)
        self.set_stop_words.update(Stopwords.get_stopwords_valid_for_all_languages())

        # Use the dedicated short list when there is one, otherwise fall back
        # to the head of the long list.
        short_words = Stopwords.get_stop_words(self.lang).get("short", [])
        if not short_words:
            self.set_stop_words_short.update(list_general_stopwords_short)
        else:
            self.set_stop_words_short.update(short_words)

        self.set_stop_words_field_specific_or_short.update(self.set_stop_words_short)

    def should_it_be_removed(self, term: str) -> bool:
        """Return True when ``term`` (a single word or a space-separated n-gram) must be removed.

        Keep-words and remove-words are checked last and override every other
        rule; remove-words win over keep-words.
        """
        entry_word = term
        multiple_word = " " in entry_word

        write = True

        if multiple_word:
            words_ngrams = entry_word.split(" ")
            words_ngrams_length = len(words_ngrams)

            # Any word shorter than the minimum length disqualifies the n-gram.
            if any(len(word) < self.min_word_length for word in words_ngrams):
                write = False

            # Bigram: removed as soon as either word is a short/field-specific stop word.
            if words_ngrams_length == 2 and (
                words_ngrams[0].lower().strip() in self.set_stop_words_field_specific_or_short or
                words_ngrams[1].lower().strip() in self.set_stop_words_field_specific_or_short
            ):
                write = False

            # Longer n-grams accumulate a garbage score.
            if words_ngrams_length > 2:
                score_garbage = 0
                last_index = words_ngrams_length - 1

                for i, current_term in enumerate(words_ngrams):
                    current_term = current_term.lower().strip()
                    is_edge = i == 0 or i == last_index

                    # A stop word at either end pushes the score over the limit.
                    if is_edge and current_term in self.set_stop_words_field_specific_or_short:
                        score_garbage = self.max_accepted_garbage + 1
                        continue

                    if is_edge and current_term in self.set_stop_words_short:
                        write = False
                        continue

                    if current_term in self.set_stop_words_short:
                        score_garbage += 3
                        continue

                    if current_term in self.set_stopwords_field_specific:
                        score_garbage += 2
                        continue

                # The whole n-gram being listed as a stop word also disqualifies it.
                if entry_word in self.set_stop_words:
                    score_garbage = self.max_accepted_garbage + 1

                if score_garbage > self.max_accepted_garbage:
                    write = False

        elif entry_word in self.set_stop_words and entry_word not in self.set_keep_words:
            write = False

        if entry_word in self.set_keep_words:
            write = True
        if entry_word in self.set_remove_words:
            write = False

        return not write


In [45]:
import os
from pathlib import Path

class Stopwords:
    """Loader for stop-word lists stored as one-word-per-line UTF-8 text files.

    Every file is looked up in the folder returned by ``get_resource_path``.
    A missing file contributes no words.
    """

    twitter_stop_words = {"rt", "w/"}
    common_stop_words = {"and", "for", "nbsp", "http", "https", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "20", "25", "30", "40", "50", "100", "1000"}

    # Per-language results of get_stop_words, keyed by language code.
    cache = {}
    cache_twitter = {}

    @staticmethod
    def _read_words(filename):
        """Return the stripped, non-empty lines of ``filename`` from the resource folder.

        Blank lines are skipped so that the empty string never ends up being
        treated as a stop word. Returns an empty set when the file is missing.
        """
        path_resource = Path(Stopwords.get_resource_path()) / filename
        if not path_resource.exists():
            return set()
        with path_resource.open('r', encoding='utf-8') as file:
            stripped_lines = (line.strip() for line in file)
            return {word for word in stripped_lines if word}

    @staticmethod
    def get_stop_words(lang):
        """Return ``{"short": set, "long": set}`` of stop words for ``lang``.

        Both sets start from the common and Twitter stop words; ``<lang>.txt``
        extends the long set and ``<lang>_short.txt`` the short one. The result
        is cached per language and the same dict is returned on later calls.
        """
        if lang in Stopwords.cache:
            return Stopwords.cache[lang]

        universal = Stopwords.common_stop_words | Stopwords.twitter_stop_words
        stop_words = universal | Stopwords._read_words(f"{lang}.txt")
        short_stop_words = universal | Stopwords._read_words(f"{lang}_short.txt")

        pair = {"short": short_stop_words, "long": stop_words}
        Stopwords.cache[lang] = pair

        return pair

    @staticmethod
    def get_stop_words_useful_in_sentiment_analysis(lang):
        """Return the words listed in ``<lang>_stopword_sentiment.txt``."""
        return Stopwords._read_words(f"{lang}_stopword_sentiment.txt")

    @staticmethod
    def get_scientific_stopwords_in_english():
        """Return the words listed in ``scientificstopwords_en.txt``."""
        return Stopwords._get_scientific_stopwords('scientificstopwords_en.txt')

    @staticmethod
    def get_scientific_stopwords_in_french():
        """Return the words listed in ``scientificstopwords_fr.txt``."""
        return Stopwords._get_scientific_stopwords('scientificstopwords_fr.txt')

    @staticmethod
    def _get_scientific_stopwords(filename):
        """Return the words listed in the given scientific stop-word file."""
        return Stopwords._read_words(filename)

    @staticmethod
    def get_twitter_stopwords(long_list=True):
        """Return the words of ``twitter_long.txt`` (default) or ``twitter_short.txt``."""
        filename = "twitter_long.txt" if long_list else "twitter_short.txt"
        return Stopwords._read_words(filename)

    @staticmethod
    def get_stopwords_valid_for_all_languages():
        """Return the common and Twitter stop words plus those of ``stopwords_all_languages.txt``."""
        words = Stopwords.common_stop_words | Stopwords.twitter_stop_words
        words |= Stopwords._read_words("stopwords_all_languages.txt")
        return words

    @staticmethod
    def get_resource_path():
        """Return the folder holding the stop-word files (relative to the working directory)."""
        # Adjust this path according to your project's structure
        return "drive/MyDrive/Colab Notebooks/Functions App/text_files/stopwords/"

class StopWordsRemover:
    """Decide whether a term or n-gram should be discarded as a stop word."""

    def __init__(self, min_word_length, lang):
        """Load the stop-word lists for ``lang`` and build the lookup sets.

        Args:
            min_word_length: an n-gram containing a word shorter than this
                is removed.
            lang: language code handed to ``Stopwords.get_stop_words``.
        """
        self.min_word_length = min_word_length
        self.lang = lang
        # An n-gram of 3+ words is removed once its garbage score exceeds this.
        self.max_accepted_garbage = 3
        self.nb_stop_words = 5000
        self.nb_stop_words_short = 500

        self.set_stop_words_field_specific_or_short = set()
        self.set_stop_words_short = set()
        self.set_stopwords_field_specific = set()
        self.set_stop_words = set()
        self.set_keep_words = set()
        self.set_remove_words = set()
        self.list_general_stopwords_large = []
        self.list_general_stopwords_short = []
        self.stopwords_long = []
        self.stop_words_long_and_short = Stopwords.get_stop_words(lang)

        self.init()

    def add_stop_words_to_keep(self, words_to_keep):
        """Register words that must never be removed (None is ignored)."""
        if words_to_keep is not None:
            self.set_keep_words.update(words_to_keep)

    def add_words_to_remove(self, words_to_remove):
        """Register words that must always be removed (None is ignored)."""
        if words_to_remove is not None:
            self.set_remove_words.update(words_to_remove)

    def use_user_supplied_stopwords(self, user_supplied_stopwords, user_stopwords_replace_default):
        """Apply user stop words, either replacing or extending the defaults.

        In extend mode ``set_stopwords_field_specific`` is left untouched.
        """
        if user_stopwords_replace_default:
            self.set_stop_words_field_specific_or_short = set(user_supplied_stopwords)
            self.set_stop_words_short = set(user_supplied_stopwords)
            self.set_stopwords_field_specific = set(user_supplied_stopwords)
            self.set_stop_words = set(user_supplied_stopwords)
        else:
            self.set_stop_words_field_specific_or_short.update(user_supplied_stopwords)
            self.set_stop_words_short.update(user_supplied_stopwords)
            self.set_stop_words.update(user_supplied_stopwords)

    def add_field_specific_stop_words(self, field_specific_stop_words_to_remove):
        """Add field-specific stop words (None is ignored)."""
        if field_specific_stop_words_to_remove is not None:
            self.set_stop_words_field_specific_or_short.update(field_specific_stop_words_to_remove)
            self.set_stop_words.update(field_specific_stop_words_to_remove)

    def init(self):
        """(Re)build the keep / short / general stop-word sets from the loaded lists."""
        self.set_keep_words = set()
        self.set_stop_words_short = set()

        # The loaded lists are unordered sets: sort them so that truncation
        # keeps the same words on every run.
        self.stopwords_long = sorted(self.stop_words_long_and_short["long"])
        self.list_general_stopwords_large = self.stopwords_long[:self.nb_stop_words]

        self.set_stop_words.update(self.list_general_stopwords_large)
        self.set_stop_words.update(Stopwords.get_stopwords_valid_for_all_languages())

        short_words = self.stop_words_long_and_short["short"]
        if short_words:
            self.list_general_stopwords_short = sorted(short_words)[:self.nb_stop_words_short]
            self.set_stop_words_short.update(short_words)
        else:
            # No dedicated short list: fall back to the head of the long list.
            # (Slicing the empty short list, as before, made this fallback a no-op.)
            self.list_general_stopwords_short = self.stopwords_long[:self.nb_stop_words_short]
            self.set_stop_words_short.update(self.list_general_stopwords_short)

        self.set_stop_words_field_specific_or_short.update(self.set_stop_words_short)

    def should_it_be_removed(self, term):
        """Return True when ``term`` (a single word or a space-separated n-gram) must be removed.

        Keep-words and remove-words are checked last and override every other
        rule; remove-words win over keep-words.
        """
        entry_word = term
        write = True
        words_ngram = entry_word.split(" ")
        word_count = len(words_ngram)

        if word_count > 1:
            # Any word shorter than the minimum length disqualifies the n-gram.
            if any(len(word) < self.min_word_length for word in words_ngram):
                write = False

            # Bigram: removed as soon as either word is a short/field-specific stop word.
            if word_count == 2:
                first = words_ngram[0].lower().strip()
                second = words_ngram[1].lower().strip()
                if first in self.set_stop_words_field_specific_or_short or second in self.set_stop_words_field_specific_or_short:
                    write = False

            # Longer n-grams accumulate a garbage score.
            if word_count > 2:
                score_garbage = 0
                last_index = word_count - 1
                for i, current_term in enumerate(words_ngram):
                    current_term = current_term.lower().strip()
                    is_edge = i == 0 or i == last_index

                    # A stop word at either end pushes the score over the limit.
                    if is_edge and current_term in self.set_stop_words_field_specific_or_short:
                        score_garbage += self.max_accepted_garbage + 1
                        continue

                    if is_edge and current_term in self.set_stop_words_short:
                        write = False
                        continue

                    if current_term in self.set_stop_words_short:
                        score_garbage += 3
                        continue

                    if current_term in self.set_stopwords_field_specific:
                        score_garbage += 2
                        continue

                # The whole n-gram being listed as a stop word also disqualifies it.
                if entry_word in self.set_stop_words:
                    score_garbage = self.max_accepted_garbage + 1

                if score_garbage > self.max_accepted_garbage:
                    write = False

        elif entry_word in self.set_stop_words and entry_word not in self.set_keep_words:
            write = False

        if entry_word in self.set_keep_words:
            write = True
        if entry_word in self.set_remove_words:
            write = False

        return not write


In [46]:
# TypeOfTextFragmentEnum and TypeOfTextFragment
class TypeOfTextFragmentEnum(Enum):
    """Kinds of text fragment; each member's value is its own name."""

    # Word-like fragments
    TERM = "TERM"
    NGRAM = "NGRAM"

    # Informal / expressive fragments
    ONOMATOPAE = "ONOMATOPAE"
    TEXTO_SPEAK = "TEXTO_SPEAK"
    EMOTICON_IN_ASCII = "EMOTICON_IN_ASCII"

    # Separators and symbols
    WHITE_SPACE = "WHITE_SPACE"
    EMOJI = "EMOJI"
    PUNCTUATION = "PUNCTUATION"

    # Remaining kinds
    QUESTION = "QUESTION"
    TOO_SHORT = "TOO_SHORT"
    HASHTAG = "HASHTAG"

class TypeOfTextFragment:
    """Holder for a ``TypeOfTextFragmentEnum`` resolved from a member or from a member name."""

    def __init__(self, type_of_token_name: Optional[str] = None, type_of_token_enum: Optional[TypeOfTextFragmentEnum] = None):
        """Resolve the fragment type.

        The enum argument takes precedence over the name; when neither is
        given (or both are falsy) the type defaults to NGRAM.
        """
        if not type_of_token_enum and not type_of_token_name:
            self.type_of_text_fragment_enum = TypeOfTextFragmentEnum.NGRAM
        elif type_of_token_enum:
            self.type_of_text_fragment_enum = type_of_token_enum
        else:
            self.set_type_of_text_fragment_name(type_of_token_name)

    def set_type_of_text_fragment_name(self, type_of_token_name: str):
        """Set the type from a member name; an unknown name is reported and NGRAM is used."""
        resolved = TypeOfTextFragmentEnum.__members__.get(type_of_token_name)
        if resolved is None:
            print(f"Error: type of token name '{type_of_token_name}' is not a valid name")
            resolved = TypeOfTextFragmentEnum.NGRAM
        self.type_of_text_fragment_enum = resolved

    def get_type_of_text_fragment_enum(self) -> TypeOfTextFragmentEnum:
        """Return the resolved fragment type."""
        return self.type_of_text_fragment_enum


In [47]:
# TextFragment
class TextFragment:
    """Base record for a piece of text: its original form plus positional indexes.

    Two fragments compare equal, and hash alike, when their original forms
    are equal.
    """

    def __init__(self):
        # Text content
        self.original_form = ""
        self.original_form_lemmatized = ""

        # Positions and size; everything starts at zero
        self.index_cardinal = 0
        self.index_ordinal = 0
        self.index_cardinal_in_sentence = 0
        self.index_ordinal_in_sentence = 0
        self.sentence_like_fragment_index = 0
        self.length = 0

        # The base class carries no type of its own.
        self.type_of_text_fragment_enum: Optional[TypeOfTextFragmentEnum] = None

    # --- original form -----------------------------------------------------

    def get_original_form(self) -> str:
        """Return the original form."""
        return self.original_form

    def set_original_form(self, original_form: str):
        """Replace the original form."""
        self.original_form = original_form

    def add_char_to_original_form(self, c: str):
        """Append ``c`` to the original form; anything but a length-1 string is ignored."""
        if len(c) != 1:
            return
        self.original_form += c

    def add_string_to_original_form(self, s: str):
        """Append ``s`` to the original form."""
        self.original_form += s

    def get_original_form_lemmatized(self) -> str:
        """Return the lemmatized original form."""
        return self.original_form_lemmatized

    def set_original_form_lemmatized(self, original_form_lemmatized: str):
        """Replace the lemmatized original form."""
        self.original_form_lemmatized = original_form_lemmatized

    # --- indexes and length ------------------------------------------------

    def get_index_cardinal(self) -> int:
        """Return the cardinal index."""
        return self.index_cardinal

    def set_index_cardinal(self, index_cardinal: int):
        """Set the cardinal index."""
        self.index_cardinal = index_cardinal

    def get_index_ordinal(self) -> int:
        """Return the ordinal index."""
        return self.index_ordinal

    def set_index_ordinal(self, index_ordinal: int):
        """Set the ordinal index."""
        self.index_ordinal = index_ordinal

    def get_index_cardinal_in_sentence(self) -> int:
        """Return the cardinal index within the sentence."""
        return self.index_cardinal_in_sentence

    def set_index_cardinal_in_sentence(self, index_cardinal_in_sentence: int):
        """Set the cardinal index within the sentence."""
        self.index_cardinal_in_sentence = index_cardinal_in_sentence

    def get_index_ordinal_in_sentence(self) -> int:
        """Return the ordinal index within the sentence."""
        return self.index_ordinal_in_sentence

    def set_index_ordinal_in_sentence(self, index_ordinal_in_sentence: int):
        """Set the ordinal index within the sentence."""
        self.index_ordinal_in_sentence = index_ordinal_in_sentence

    def get_sentence_like_fragment_index(self) -> int:
        """Return the index of the sentence-like fragment."""
        return self.sentence_like_fragment_index

    def set_sentence_like_fragment_index(self, sentence_like_fragment_index: int):
        """Set the index of the sentence-like fragment."""
        self.sentence_like_fragment_index = sentence_like_fragment_index

    def get_length(self) -> int:
        """Return the stored length."""
        return self.length

    def set_length(self, length: int):
        """Set the stored length."""
        self.length = length

    # --- type --------------------------------------------------------------

    def get_type_of_text_fragment_enum(self) -> Optional[TypeOfTextFragmentEnum]:
        """Return the fragment type (None unless an instance or subclass sets it)."""
        return self.type_of_text_fragment_enum

    # --- identity ----------------------------------------------------------

    def __eq__(self, other) -> bool:
        return isinstance(other, TextFragment) and self.original_form == other.original_form

    def __hash__(self) -> int:
        return hash(self.original_form)

In [48]:
class Emoji(TextFragment):
    """Text fragment whose type is always EMOJI, with an optional ``semi_colon_form``."""

    def __init__(self):
        super().__init__()
        # Stays None until set_semi_colon_form is called.
        self.semi_colon_form: Optional[str] = None

    def get_type_of_text_fragment_enum(self) -> Optional['TypeOfTextFragmentEnum']:
        """Return ``TypeOfTextFragmentEnum.EMOJI`` regardless of instance state."""
        return TypeOfTextFragmentEnum.EMOJI

    def set_semi_colon_form(self, semi_colon_form: str):
        """Store the semi-colon form."""
        self.semi_colon_form = semi_colon_form

    def get_semi_colon_form(self) -> Optional[str]:
        """Return the semi-colon form, or None when it was never set."""
        return self.semi_colon_form


In [49]:
class Category:
    """A category resolved from its number, e.g. ``Category("11")`` is "positive tone"."""

    class CategoryEnum(Enum):
        """Known categories; the member name is the category number prefixed with ``_``."""

        _10 = "neutral tone"
        _11 = "positive tone"
        _111 = "positive tone, not promoted"
        _12 = "negative tone"
        _13 = "possibly ironic tone"
        _14 = "fun tone"
        _17 = "delight"
        _20 = "neutral intensity"
        _21 = "weak intensity"
        _22 = "strong intensity"
        _3 = "time"
        _30 = "neutral time"
        _31 = "past time"
        _311 = "immediate past"
        _320 = "present time"
        _321 = "immediate present: just now"
        _33 = "future time"
        _331 = "immediate future"
        _40 = "question"
        _50 = "neutral address"
        _51 = "subjective address"
        _52 = "direct address"
        _521 = "call to action"
        _60 = "neutral topic"
        _61 = "commercial tone / promoted"
        _611 = "commercial offer"
        _612 = "tweeted by the client"
        _6121 = "a retweet of the client's tweet"
        _62 = "factual statement"
        _621 = "factual statement - statistics cited"
        _9 = "not suitable for semantic analysis"
        _91 = "english text not detected"
        _92 = "text too short or garbled"

        def __str__(self):
            return self.value

    def __init__(self, cat_number: str):
        """Resolve ``cat_number`` (without the leading underscore) to a category."""
        self.category_enum: Category.CategoryEnum = self.set_category_enum_from_string(cat_number)

    def set_category_enum_from_string(self, category_enum_from_string: str) -> 'CategoryEnum':
        """Return the member named ``_<category_enum_from_string>``.

        Despite its name this method only returns the member; it does not
        assign it. An unknown number is reported on stdout and ``_10`` is
        returned instead.
        """
        member_name = f"_{category_enum_from_string}"
        found = Category.CategoryEnum.__members__.get(member_name)
        if found is not None:
            return found

        print("error in class Category")
        print(f"category name {category_enum_from_string} is not a valid name")
        return Category.CategoryEnum._10

    def set_category_enum(self, category_enum: 'CategoryEnum'):
        """Replace the stored category."""
        self.category_enum = category_enum

    def get_category_enum(self) -> 'CategoryEnum':
        """Return the stored category."""
        return self.category_enum


In [50]:
# PatternOfInterest: relies on TypeOfTextFragmentEnum, TypeOfTextFragment and Category defined in earlier cells
class PatternOfInterest:
    """A described regular expression with its categories and fragment type."""

    def __init__(self):
        self.description: str = ""
        self.regex: str = ""
        # Compiled form of ``regex``; None until set_regex or set_pattern is called.
        self.pattern: Optional[re.Pattern] = None
        self.should_apply_to_lower_case_text: bool = False
        self.categories: List['Category'] = []
        self.type_of_text_fragment_enum: Optional['TypeOfTextFragmentEnum'] = None
        self.matched: Optional[bool] = None

    # --- description -------------------------------------------------------

    def get_description(self) -> str:
        """Return the description."""
        return self.description

    def set_description(self, description: str):
        """Replace the description."""
        self.description = description

    # --- regex and compiled pattern ----------------------------------------

    def get_regex(self) -> str:
        """Return the regex source string."""
        return self.regex

    def set_regex(self, regex: str):
        """Store the regex source and compile it into ``pattern``."""
        compiled = re.compile(regex)
        self.regex = regex
        self.pattern = compiled

    def get_pattern(self) -> Optional[re.Pattern]:
        """Return the compiled pattern, or None when none was set."""
        return self.pattern

    def set_pattern(self, pattern: re.Pattern):
        """Replace the compiled pattern; the stored regex string is left as is."""
        self.pattern = pattern

    # --- flags -------------------------------------------------------------

    def is_should_apply_to_lower_case_text(self) -> bool:
        """Return the lower-case flag."""
        return self.should_apply_to_lower_case_text

    def set_should_apply_to_lower_case_text(self, should_apply_to_lower_case_text: bool):
        """Set the lower-case flag."""
        self.should_apply_to_lower_case_text = should_apply_to_lower_case_text

    def get_matched(self) -> Optional[bool]:
        """Return the matched flag (None until set)."""
        return self.matched

    def set_matched(self, matched: bool):
        """Set the matched flag."""
        self.matched = matched

    # --- categories --------------------------------------------------------

    def get_categories(self) -> List['Category']:
        """Return the list of categories."""
        return self.categories

    def set_categories(self, categories: List['Category']):
        """Replace the list of categories."""
        self.categories = categories

    # --- fragment type -----------------------------------------------------

    def get_type_of_text_fragment_enum(self) -> Optional['TypeOfTextFragmentEnum']:
        """Return the fragment type (None until set)."""
        return self.type_of_text_fragment_enum

    def set_type_of_text_fragment(self, type_of_token_name: str):
        """Set the fragment type from its name, resolved through ``TypeOfTextFragment``."""
        resolver = TypeOfTextFragment(type_of_token_name)
        self.type_of_text_fragment_enum = resolver.get_type_of_text_fragment_enum()

    def set_type_of_text_fragment_enum(self, type_of_text_fragment_enum: 'TypeOfTextFragmentEnum'):
        """Set the fragment type directly."""
        self.type_of_text_fragment_enum = type_of_text_fragment_enum


In [51]:
# NonWord: relies on TextFragment, TypeOfTextFragmentEnum and PatternOfInterest defined in earlier cells
class NonWord(TextFragment):
    """Text fragment identified by a pattern of interest rather than being a word."""

    def __init__(self):
        super().__init__()
        # Both stay None until a pattern of interest or a type is supplied.
        self.poi: Optional[PatternOfInterest] = None
        self.type_of_text_fragment_enum: Optional[TypeOfTextFragmentEnum] = None

    def get_poi(self) -> Optional[PatternOfInterest]:
        """Return the pattern of interest, or None when none was set."""
        return self.poi

    def set_poi(self, poi: 'PatternOfInterest'):
        """Store ``poi`` and adopt the fragment type it reports."""
        fragment_type = poi.get_type_of_text_fragment_enum()
        self.poi = poi
        self.type_of_text_fragment_enum = fragment_type

    def get_type_of_text_fragment_enum(self) -> Optional[TypeOfTextFragmentEnum]:
        """Return the fragment type (None until set)."""
        return self.type_of_text_fragment_enum

    def set_type_of_text_fragment_enum(self, type_of_text_fragment_enum: 'TypeOfTextFragmentEnum'):
        """Set the fragment type directly."""
        self.type_of_text_fragment_enum = type_of_text_fragment_enum


In [52]:
class Punctuation(TextFragment):
    """Text fragment whose type is always PUNCTUATION."""

    def get_type_of_text_fragment_enum(self) -> 'TypeOfTextFragmentEnum':
        """Return ``TypeOfTextFragmentEnum.PUNCTUATION`` regardless of instance state."""
        return TypeOfTextFragmentEnum.PUNCTUATION

    def to_non_word(self, poi: 'PatternOfInterest', string: str) -> 'NonWord':
        """Build a ``NonWord`` for ``string`` located where this punctuation is.

        Args:
            poi: pattern of interest attached to the new fragment; it also
                determines the fragment type.
            string: original form given to the new fragment.

        Returns:
            A new ``NonWord`` carrying this fragment's cardinal and ordinal
            indexes.
        """
        non_word = NonWord()
        # set_poi also assigns the fragment type reported by ``poi``, so the
        # type does not need to be set again afterwards.
        non_word.set_poi(poi)
        non_word.set_original_form(string)
        non_word.set_index_cardinal(self.get_index_cardinal())
        non_word.set_index_ordinal(self.get_index_ordinal())

        return non_word


In [53]:
class WhiteSpace(TextFragment):
    """Text fragment whose type is always WHITE_SPACE, flagged when it is a sentence or line break."""

    def __init__(self):
        super().__init__()
        # False until set_sentence_or_line_break says otherwise.
        self.sentence_or_line_break = False

    def set_sentence_or_line_break(self, sentence_or_line_break: bool):
        """Set the sentence-or-line-break flag."""
        self.sentence_or_line_break = sentence_or_line_break

    def is_sentence_or_line_break(self) -> bool:
        """Return the sentence-or-line-break flag."""
        return self.sentence_or_line_break

    def get_type_of_text_fragment_enum(self) -> 'TypeOfTextFragmentEnum':
        """Return ``TypeOfTextFragmentEnum.WHITE_SPACE`` regardless of instance state."""
        return TypeOfTextFragmentEnum.WHITE_SPACE


In [54]:
# Term
class Term(TextFragment):
    """Single-word text fragment with a cleaned and a cleaned-and-stripped form."""

    def __init__(self):
        super().__init__()
        # Both assignments go through the property setters below.
        self.cleaned_form: Optional[str] = None
        self.cleaned_and_stripped_form: Optional[str] = None

    @property
    def cleaned_form(self) -> Optional[str]:
        """Cleaned form of the term (None until set)."""
        return self._cleaned_form

    @cleaned_form.setter
    def cleaned_form(self, value: str):
        self._cleaned_form = value

    @property
    def cleaned_and_stripped_form(self) -> Optional[str]:
        """Cleaned and stripped form of the term (None until set)."""
        return self._cleaned_and_stripped_form

    @cleaned_and_stripped_form.setter
    def cleaned_and_stripped_form(self, value: str):
        self._cleaned_and_stripped_form = value

    def get_cleaned_and_stripped_if_condition(self, stripped: bool) -> Optional[str]:
        """Return the cleaned-and-stripped form when ``stripped`` is truthy, else the cleaned form."""
        if stripped:
            return self.cleaned_and_stripped_form
        return self.cleaned_form

    def get_type_of_text_fragment_enum(self) -> TypeOfTextFragmentEnum:
        """Return ``TypeOfTextFragmentEnum.TERM`` regardless of instance state."""
        return TypeOfTextFragmentEnum.TERM

    def to_ngram(self) -> 'NGram':
        """Wrap this term in a new one-term ``NGram`` with the same indexes and original form."""
        wrapper = NGram()
        wrapper.set_original_form(self.get_original_form())
        wrapper.set_index_cardinal(self.get_index_cardinal())
        wrapper.set_index_ordinal(self.get_index_ordinal())
        wrapper.set_index_cardinal_in_sentence(self.get_index_cardinal_in_sentence())
        wrapper.set_index_ordinal_in_sentence(self.get_index_ordinal_in_sentence())
        wrapper.terms.append(self)
        return wrapper

In [55]:
# Ngram
from typing import List

class NGram(TextFragment):
    """Text fragment made of an ordered list of terms."""

    def __init__(self):
        super().__init__()
        self.terms: List[Term] = []

    def get_type_of_text_fragment_enum(self) -> TypeOfTextFragmentEnum:
        """Return ``TypeOfTextFragmentEnum.NGRAM`` regardless of instance state."""
        return TypeOfTextFragmentEnum.NGRAM

    def get_terms(self) -> List[Term]:
        """Return the list of terms (the live list, not a copy)."""
        return self.terms

    def set_terms(self, terms: List[Term]):
        """Replace the list of terms."""
        self.terms = terms

    def get_cleaned_ngram(self) -> str:
        """Return the terms' cleaned forms joined by single spaces, then stripped."""
        cleaned_forms = [term.cleaned_form for term in self.terms]
        return ' '.join(cleaned_forms).strip()

    def get_cleaned_and_stripped_ngram(self) -> str:
        """Return the terms' cleaned-and-stripped forms joined by single spaces, then stripped."""
        stripped_forms = [term.cleaned_and_stripped_form for term in self.terms]
        return ' '.join(stripped_forms).strip()

    def get_cleaned_and_stripped_ngram_if_condition(self, stripped: bool) -> str:
        """Join each term's form chosen by ``stripped`` with single spaces, then strip."""
        chosen_forms = [term.get_cleaned_and_stripped_if_condition(stripped) for term in self.terms]
        return ' '.join(chosen_forms).strip()


In [56]:
class SentenceLike:
    """A sentence-like span: its text fragments, its n-grams and its two indexes."""

    def __init__(self):
        self.text_fragments: List[TextFragment] = []
        self.ngrams: List[NGram] = []
        self.index_cardinal: int = 0
        self.index_ordinal: int = 0

    def __str__(self) -> str:
        """Concatenate the original forms of the text fragments, in order."""
        original_forms = [fragment.get_original_form() for fragment in self.text_fragments]
        return ''.join(original_forms)

    # --- contents ----------------------------------------------------------

    def get_text_fragments(self) -> List[TextFragment]:
        """Return the list of text fragments."""
        return self.text_fragments

    def set_text_fragments(self, text_fragments: List[TextFragment]):
        """Replace the list of text fragments."""
        self.text_fragments = text_fragments

    def get_ngrams(self) -> List[NGram]:
        """Return the list of n-grams."""
        return self.ngrams

    def set_ngrams(self, ngrams: List[NGram]):
        """Replace the list of n-grams."""
        self.ngrams = ngrams

    # --- indexes -----------------------------------------------------------

    def get_index_cardinal(self) -> int:
        """Return the cardinal index."""
        return self.index_cardinal

    def set_index_cardinal(self, index_cardinal: int):
        """Set the cardinal index."""
        self.index_cardinal = index_cardinal

    def get_index_ordinal(self) -> int:
        """Return the ordinal index."""
        return self.index_ordinal

    def set_index_ordinal(self, index_ordinal: int):
        """Set the ordinal index."""
        self.index_ordinal = index_ordinal


In [57]:
class NGramFinderBisForTextFragments:
    """Expand a sequence of one-term n-grams into all n-grams up to a maximum size."""

    @staticmethod
    def generate_ngrams_upto(ngrams: List[NGram], max_gram_size: int) -> List[NGram]:
        """Return every unigram followed by the n-grams that end on it.

        For each input unigram (in order) the result contains the unigram
        itself, then the 2-gram, 3-gram, ... up to ``max_gram_size`` built by
        prepending the preceding unigrams' first terms. Each generated n-gram
        takes its indexes from its left-most unigram and its original form
        from the cleaned-and-stripped forms of its terms.

        Side effect: the ordinal-in-sentence index of every input unigram is
        set to its position in ``ngrams``.

        Args:
            ngrams: unigrams, each expected to hold its term at position 0 of
                ``get_terms()``. Items that are not ``NGram`` instances are
                reported on stdout and skipped.
            max_gram_size: largest number of terms in a generated n-gram.
        """
        # A list gives O(1) access by position; a deque is O(n) away from its
        # ends, which made the backward scan below needlessly slow.
        unigrams = list(ngrams)
        text_fragments_augmented_with_ngrams = []

        for i, unigram in enumerate(unigrams):
            if not isinstance(unigram, NGram):
                print("Alert: a non-NGram detected in method generate_ngrams_upto")
                print("TextFragment was:", unigram.get_original_form())
                continue

            unigram.set_index_ordinal_in_sentence(i)

            # 1. The word itself.
            text_fragments_augmented_with_ngrams.append(unigram)

            # 2. Grow a window of terms leftwards, emitting one n-gram per step.
            window_terms = [unigram.get_terms()[0]]
            for j in range(i - 1, -1, -1):
                if len(window_terms) >= max_gram_size:
                    break

                previous_unigram = unigrams[j]
                window_terms.insert(0, previous_unigram.get_terms()[0])

                new_ngram = NGram()
                # Copy the window so later growth does not alter this n-gram.
                new_ngram.set_terms(list(window_terms))
                new_ngram.set_index_cardinal(previous_unigram.index_cardinal)
                new_ngram.set_index_ordinal(previous_unigram.index_ordinal)
                new_ngram.set_index_cardinal_in_sentence(previous_unigram.index_cardinal_in_sentence)
                new_ngram.set_index_ordinal_in_sentence(previous_unigram.index_ordinal_in_sentence)
                new_ngram.set_original_form(new_ngram.get_cleaned_and_stripped_ngram_if_condition(stripped=True))
                text_fragments_augmented_with_ngrams.append(new_ngram)

        return text_fragments_augmented_with_ngrams

In [58]:
class SentenceLikeFragmentsDetector:
    """
    Groups a stream of text fragments into sentence-like units.

    A unit is closed when a stop punctuation sign is met, and units are also cut around
    paired signs (parentheses, quotes, brackets...). The detector is stateful: results
    accumulate in ``list_of_sentence_like_fragments``, which is only initialised in
    ``__init__``.
    """

    def __init__(self):
        # Signs that end the current sentence-like fragment.
        self.stop_punctuations: Set[str] = {".", ":", ";", ",", "(", ")", "\"", "«", "»", "“", "”", "•", "‘", "’", "'", "„", "[", "]", "", "<", ">"}
        # Opening sign -> closing sign expected to end the enclosed fragment.
        self.matching_punctuations: Dict[str, str] = {
            "(": ")", "\"": "\"", "«": "»", "“": "”", "‘": "’", "'": "'", "„": "“", "[": "]", "<": ">"
        }
        self.list_of_sentence_like_fragments: List[SentenceLike] = []
        # N-grams collected for the sentence currently being built.
        self.list_of_n_grams: List[NGram] = []
        self.sentence_like = SentenceLike()
        # Per-fragment guard so a fragment is attached to at most one sentence.
        self.text_fragment_already_added_to_a_sentence = False
        # True while an opening sign is waiting for its closing counterpart.
        self.opening_punctuation_identified = False
        self.expected_closing_punctuation_sign = ""
        self.sentence_like_fragments_counter = 0

    def return_sentence_like_fragments(self, text_fragments: List[TextFragment]) -> List[SentenceLike]:
        """
        Split ``text_fragments`` into sentence-like fragments.

        :param text_fragments: Fragments in reading order.
        :return: The detector's accumulated list of sentence-like fragments (the same
                 list object on every call, so repeated calls append to earlier results).
        """
        self.sentence_like.set_index_cardinal(0)
        self.sentence_like.set_index_ordinal(0)

        for next_text_fragment in text_fragments:
            self.text_fragment_already_added_to_a_sentence = False
            type_of_text_fragment = next_text_fragment.get_type_of_text_fragment_enum()

            if type_of_text_fragment == TypeOfTextFragmentEnum.TERM:
                self.add_term_to_n_grams_of_current_sentence(next_text_fragment)
            elif type_of_text_fragment == TypeOfTextFragmentEnum.PUNCTUATION:
                punctuation_sign = next_text_fragment.get_original_form()
                if punctuation_sign in self.matching_punctuations and not self.opening_punctuation_identified:
                    # Opening sign with nothing pending: cut here and let the sign
                    # start the new sentence.
                    self.opening_punctuation_identified = True
                    self.expected_closing_punctuation_sign = self.matching_punctuations[punctuation_sign]
                    self.close_current_sentence_and_open_new_one(False)
                    self.add_text_fragment_to_current_sentence(next_text_fragment)
                elif punctuation_sign in self.stop_punctuations:
                    if self.opening_punctuation_identified:
                        # Inside a pair: keep the sign in the current sentence and only
                        # cut when it is the expected closing sign.
                        self.add_text_fragment_to_current_sentence(next_text_fragment)
                        if punctuation_sign == self.expected_closing_punctuation_sign:
                            self.close_current_sentence_and_open_new_one(True)
                    else:
                        # Plain stop sign: cut first, so the sign is attached (below)
                        # to the freshly opened sentence.
                        self.close_current_sentence_and_open_new_one(False)

            # No-op if the fragment was already attached in one of the branches above.
            self.add_text_fragment_to_current_sentence(next_text_fragment)

        # Flush whatever is still open at the end of the input.
        self.close_current_sentence_and_open_new_one(False)
        return self.list_of_sentence_like_fragments

    def close_current_sentence_and_open_new_one(self, is_end_of_matching_signs: bool):
        """
        Store the current sentence (if it holds any text fragment) and start a new one.

        :param is_end_of_matching_signs: When True, also clears the pending
            opening/closing punctuation state.
        """
        self.sentence_like.get_ngrams().extend(self.list_of_n_grams)
        if self.sentence_like.get_text_fragments():
            self.list_of_sentence_like_fragments.append(self.sentence_like)
        # The counter advances even when the sentence was empty and therefore not stored.
        self.sentence_like_fragments_counter += 1
        self.sentence_like = SentenceLike()
        self.sentence_like.set_index_ordinal(len(self.list_of_sentence_like_fragments))
        self.list_of_n_grams = []
        if is_end_of_matching_signs:
            self.opening_punctuation_identified = False
            self.expected_closing_punctuation_sign = ""

    def add_text_fragment_to_current_sentence(self, tf: TextFragment):
        """
        Attach ``tf`` to the current sentence unless it was already attached.

        The first fragment of a sentence also gives the sentence its cardinal index.
        """
        if self.text_fragment_already_added_to_a_sentence:
            return
        if not self.sentence_like.get_text_fragments():
            self.sentence_like.set_index_cardinal(tf.get_index_cardinal())
        tf.set_index_ordinal_in_sentence(len(self.sentence_like.get_text_fragments()))
        tf.set_sentence_like_fragment_index(self.sentence_like_fragments_counter)
        self.sentence_like.get_text_fragments().append(tf)
        self.text_fragment_already_added_to_a_sentence = True

    def add_term_to_n_grams_of_current_sentence(self, next_text_fragment: TextFragment):
        """Wrap a term into a single-term NGram and queue it for the current sentence."""
        term = next_text_fragment
        ngram = NGram()
        ngram.set_index_cardinal(term.get_index_cardinal())
        ngram.set_index_ordinal(term.get_index_ordinal())
        # NOTE(review): the caller invokes this before attaching the term to the sentence,
        # so the in-sentence ordinal read here predates the value assigned on attachment.
        ngram.set_index_ordinal_in_sentence(term.get_index_ordinal_in_sentence())
        ngram.set_sentence_like_fragment_index(self.sentence_like_fragments_counter)
        ngram.get_terms().append(term)
        ngram.set_original_form(term.get_original_form())
        self.list_of_n_grams.append(ngram)


In [59]:
class Clock:
    """Stopwatch for a named action, able to print or return start, progress and end messages."""

    def __init__(self, action_being_clocked, start_silent=False):
        """
        Create the clock and, unless silent, announce the start right away.
        :param action_being_clocked: Label of the action being timed.
        :param start_silent: When True nothing is printed by this clock.
        """
        self.action_being_clocked = action_being_clocked
        self.silent = start_silent
        self.start = time.time()
        self.log_text = []
        self.intermediate_text = []

        if self.silent:
            return
        self.start_clock()

    def start_clock(self):
        """
        Restart the timer, log the opening line and print it unless silent.
        """
        self.start = time.time()
        opening_line = f"{self.action_being_clocked}..."
        self.log_text.append(opening_line)
        if not self.silent:
            print(opening_line)

    def start_clock_to_string(self):
        """
        Restart the timer and hand back the opening line instead of printing it.
        :return: The opening line.
        """
        self.start = time.time()
        return f"{self.action_being_clocked}..."

    def print_intermediary_text(self, intermediary_text):
        """
        Queue a progress note, print the queue unless silent, then empty the queue.
        :param intermediary_text: The progress note.
        """
        pending = self.intermediate_text
        pending.append(intermediary_text)
        if not self.silent:
            print('\n'.join(pending))
        self.intermediate_text = []

    def print_intermediary_text_to_string(self, it):
        """
        Replace the queue of progress notes with ``it`` and return the queue as text.
        :param it: The progress note.
        :return: The queued text.
        """
        self.intermediate_text = [it]
        return '\n'.join(self.intermediate_text)

    def get_elapsed_time(self):
        """
        Whole milliseconds elapsed since the timer was last (re)started.
        :return: Elapsed milliseconds as an int.
        """
        elapsed_seconds = time.time() - self.start
        return int(elapsed_seconds * 1000)

    def print_elapsed_time(self):
        """
        Print the human-readable elapsed time unless silent.
        """
        if self.silent:
            return
        print(self.compute_elapsed_time())

    def print_elapsed_time_to_string(self):
        """
        Same text as print_elapsed_time, returned rather than printed.
        :return: Human-readable elapsed time.
        """
        return self.compute_elapsed_time()

    def compute_elapsed_time(self):
        """
        Format the time elapsed since the last (re)start.
        Below 10 s the value is rounded, from 10 s upward it is truncated.
        :return: Human-readable elapsed time.
        """
        elapsed = time.time() - self.start

        if elapsed < 10:
            return f"{round(elapsed)} seconds"
        if elapsed < 60:
            return f"{int(elapsed)} seconds"

        whole_seconds = int(elapsed % 60)
        if elapsed < 3600:
            whole_minutes = int(elapsed // 60)
            return f"{whole_minutes} minutes {whole_seconds} seconds"

        whole_hours = int(elapsed // 3600)
        whole_minutes = int((elapsed % 3600) // 60)
        return f"{whole_hours} hours {whole_minutes} minutes {whole_seconds} seconds"

    def close_and_print_clock(self, closing_message=""):
        """
        Print the closing report unless silent (when silent, nothing is computed at all).
        :param closing_message: Optional first line of the report.
        """
        if self.silent:
            return
        print(self.write_log_text_closing(closing_message))

    def close_and_print_clock_to_string(self, closing_message=""):
        """
        Build the closing report and return it instead of printing it.
        :param closing_message: Optional first line of the report.
        :return: The closing report.
        """
        return self.write_log_text_closing(closing_message)

    def write_log_text_closing(self, closing_message):
        """
        Clear the log and build the closing report: message line, then action and duration.
        :param closing_message: First line of the report.
        :return: The closing report.
        """
        self.log_text = []
        first_line = f"{closing_message}\n"
        second_line = f"finished {self.action_being_clocked}. [Duration: {self.compute_elapsed_time()}]"
        return first_line + second_line

    def get_action(self):
        """
        Label given to the action when the clock was created.
        :return: The action label.
        """
        return self.action_being_clocked


In [60]:
class Multiset:
    """Bag of elements: each distinct element is mapped to its number of occurrences."""

    def __init__(self, max_elements=None):
        """
        :param max_elements: Optional cap used only by add_one_with_limit_to_max_elements.
        """
        self.max_elements = max_elements
        self.internal_map = defaultdict(int)

    def get_internal_map(self):
        """Return a plain-dict copy of the element -> count mapping."""
        return dict(self.internal_map)

    def set_internal_map(self, internal_map):
        """Replace the content with a copy of ``internal_map``."""
        self.internal_map = defaultdict(int, internal_map)

    def set_count(self, element, count):
        """Force the count of ``element`` to ``count``."""
        self.internal_map[element] = count

    def add_one(self, element):
        """Count one more occurrence of ``element``."""
        self.add_several(element, 1)

    def add_one_with_limit_to_max_elements(self, element):
        """Count one more occurrence unless the number of distinct elements has reached the cap."""
        cap = self.max_elements
        if cap is None or not len(self.internal_map) >= cap:
            self.internal_map[element] += 1

    def add_several(self, element, count):
        """Count ``count`` more occurrences of ``element``."""
        self.internal_map[element] += count

    def add_all_from_multiset(self, other_multiset):
        """Merge the counts of another multiset into this one."""
        for entry in other_multiset.get_entry_set():
            self.add_several(entry[0], entry[1])

    def add_all_from_map(self, map):
        """Merge the counts of an element -> count mapping into this multiset."""
        for entry in map.items():
            self.add_several(entry[0], entry[1])

    def add_all_from_list_or_set(self, collection):
        """Count one occurrence for every item of ``collection``."""
        for item in collection:
            self.add_one(item)

    def remove_one(self, element):
        """Remove one occurrence; the element disappears when its count would drop below 1."""
        self.remove_several(element, 1)

    def remove_several(self, element, number_to_be_removed):
        """Remove several occurrences; the element disappears when not enough would remain."""
        if element not in self.internal_map:
            return
        if self.internal_map[element] > number_to_be_removed:
            self.internal_map[element] -= number_to_be_removed
            return
        del self.internal_map[element]

    def get_count(self, element):
        """Occurrences of ``element`` (0 when absent; the lookup does not create an entry)."""
        return self.internal_map.get(element, 0)

    def get_size(self):
        """Number of distinct elements."""
        return len(self.internal_map)

    def get_element_set(self):
        """Distinct elements, as a new set."""
        return set(self.internal_map)

    def get_entry_set(self):
        """Live view of the (element, count) pairs."""
        return self.internal_map.items()

    def sort_by_freq(self):
        """Entries of this multiset as an OrderedDict, least frequent first."""
        ascending = sorted(self.internal_map.items(), key=itemgetter(1))
        return OrderedDict(ascending)

    def sort_desc(self, multiset):
        """(element, count) pairs of ``multiset``, most frequent first."""
        return sorted(multiset.get_entry_set(), key=itemgetter(1), reverse=True)

    def sort_asc(self, multiset):
        """(element, count) pairs of ``multiset``, least frequent first."""
        return sorted(multiset.get_entry_set(), key=itemgetter(1))

    def sort_desc_keep_most_frequent(self, multiset, n):
        """The ``n`` most frequent (element, count) pairs of ``multiset``."""
        ranked = self.sort_desc(multiset)
        return list(islice(ranked, n))

    def keep_most_frequent(self, multiset, n):
        """New Multiset holding only the ``n`` most frequent elements of ``multiset``."""
        trimmed = Multiset()
        for entry in self.sort_desc_keep_most_frequent(multiset, n):
            trimmed.add_several(entry[0], entry[1])
        return trimmed

    def sort_desc_keep_above_min_freq(self, multiset, n):
        """Pairs of ``multiset`` whose count is strictly greater than ``n``, most frequent first."""
        kept = []
        for entry in self.sort_desc(multiset):
            if entry[1] > n:
                kept.append(entry)
        return kept

    def to_list_of_elements(self):
        """Distinct elements, as a list."""
        return list(self.internal_map)

    def to_list_of_all_occurrences(self):
        """Every element repeated as many times as its count."""
        occurrences = []
        for element, count in self.internal_map.items():
            occurrences.extend(element for _ in range(count))
        return occurrences

    def print_top_ranked_elements(self, top_rank):
        """Print the ``top_rank`` most frequent elements, one per line."""
        for element, count in self.sort_desc_keep_most_frequent(self, top_rank):
            print(f"{element} x {count}")

    def top_ranked_elements_to_string(self, top_rank):
        """Comma-separated ``element x count`` for the ``top_rank`` most frequent elements."""
        parts = []
        for element, count in self.sort_desc_keep_most_frequent(self, top_rank):
            parts.append(f"{element} x {count}")
        return ", ".join(parts)

    def top_ranked_elements_to_string_without_counts(self, top_rank):
        """Comma-separated names of the ``top_rank`` most frequent elements."""
        ranked = self.sort_desc_keep_most_frequent(self, top_rank)
        return ", ".join(str(entry[0]) for entry in ranked)


In [61]:
class NGramDuplicatesCleaner:
    """Drops n-grams that are mostly seen as a part of a longer, comparably frequent n-gram."""

    def __init__(self, stop_words: Set[str] = None):
        """
        :param stop_words: Terms treated as stop words; an empty set is used when omitted.
        """
        self.stop_words = set() if stop_words is None else stop_words
        self.multiset_words = Multiset()
        self.words_to_be_removed = set()

    def remove_duplicates(self, map_ngrams: Dict[str, int], max_grams: int, remove_single_terms: bool) -> Dict[str, int]:
        """
        Filter an n-gram -> frequency mapping.

        :param map_ngrams: Frequencies keyed by space-separated n-gram.
        :param max_grams: Size of the longest n-grams present in the mapping.
        :param remove_single_terms: Accepted but not used by this method.
        :return: Frequencies of the n-grams that were kept.
        """
        # Thresholds depend on how many distinct n-grams were supplied.
        nb_ngrams = len(map_ngrams)
        if nb_ngrams < 500:
            factor_removing_irrelevant_unigrams = 2.0
            min_occurrences = 2
        elif nb_ngrams > 10_000:
            factor_removing_irrelevant_unigrams = 1.5
            min_occurrences = 3
        else:
            factor_removing_irrelevant_unigrams = 1.5
            min_occurrences = 2

        # Discard n-grams that are too rare to be considered at all.
        map_ngrams = {ngram: freq for ngram, freq in map_ngrams.items() if freq >= min_occurrences}

        # Walk from the longest n-grams (most spaces) down to bigrams (one space).
        for nb_spaces in range(max_grams - 1, 0, -1):
            for raw_ngram, freq in map_ngrams.items():
                curr_word = raw_ngram.strip()
                if curr_word.count(' ') != nb_spaces:
                    continue

                if nb_spaces != 1:
                    # Longer n-gram: flag its sub-n-grams that are not frequent enough
                    # on their own, unless the n-gram starts with a stop word.
                    sub_ngrams = NGramFinder.ngrams_finder_just_a_given_length(nb_spaces, curr_word).get_element_set()
                    for inner_ngram in sub_ngrams:
                        inner_ngram = inner_ngram.strip()
                        if inner_ngram not in map_ngrams:
                            continue
                        if not map_ngrams[inner_ngram] < freq * factor_removing_irrelevant_unigrams:
                            continue
                        if curr_word.split(" ")[0] not in self.stop_words:
                            self.words_to_be_removed.add(inner_ngram)
                    continue

                # Bigram: look at its two unigrams.
                pieces = curr_word.split(" ")
                term1 = pieces[0].strip()
                term2 = pieces[1].strip()

                first_is_stop = term1 in self.stop_words
                second_is_stop = term2 in self.stop_words
                if first_is_stop and second_is_stop:
                    self.words_to_be_removed.add(curr_word)
                if first_is_stop:
                    self.words_to_be_removed.add(term1)
                if second_is_stop:
                    self.words_to_be_removed.add(term2)

                # A unigram barely more frequent than the bigram containing it is flagged.
                for term in (term1, term2):
                    term_count = map_ngrams.get(term)
                    if term_count is not None and term_count < freq * factor_removing_irrelevant_unigrams:
                        self.words_to_be_removed.add(term)

        # Flagged n-grams are only dropped when stop words were supplied.
        keep_everything = not self.stop_words
        for curr_word, count in map_ngrams.items():
            if keep_everything or curr_word not in self.words_to_be_removed:
                self.multiset_words.add_several(curr_word.strip(), count)

        return self.multiset_words.get_internal_map()



In [62]:
class RepeatedCharactersRemover:
    """Shortens exaggerated character repetitions ("coooool") when that yields a known term."""

    @staticmethod
    def repeated_characters(curr_term: str, terms_that_should_not_be_modified: Set[str]) -> str:
        """
        Collapse runs of three or more identical alphanumeric characters.

        Each run is first reduced to two characters, then to one; a reduction is kept only
        if the resulting term, lower-cased, belongs to ``terms_that_should_not_be_modified``.
        Runs are handled left to right, each one working on the result of the previous one.

        :param curr_term: Term to examine.
        :param terms_that_should_not_be_modified: Lower-case reference terms a reduction
            must land on to be accepted.
        :return: The shortened term, or ``curr_term`` unchanged when no reduction is accepted.
        """
        # A list keeps the runs in reading order. Storing them in a set made the
        # processing order depend on string hashing, which varies between interpreter
        # runs and could change the result when a term holds several runs.
        runs = [
            (match.group(1), len(match.group(0)))
            for match in re.finditer(r'(.)\1+', curr_term, flags=re.DOTALL)
            if RepeatedCharactersRemover.is_alphanumeric(match.group(1))
        ]

        to_return = curr_term
        for letter, run_length in runs:
            if run_length <= 2:
                continue  # doubled letters are left alone
            stretched = letter * run_length
            # Try the double-letter form first, then the single-letter form.
            for replacement in (letter * 2, letter):
                candidate = to_return.replace(stretched, replacement)
                if candidate.lower() in terms_that_should_not_be_modified:
                    to_return = candidate
                    break

        return to_return

    @staticmethod
    def is_alphanumeric(s: str) -> bool:
        """Return True when ``s`` is non-empty and made only of letters and digits."""
        return s.isalnum()

In [63]:
class TextCleaningOps:
    """Stateless helpers that strip URLs, punctuation, accents and markup from text."""

    @staticmethod
    def clean(status):
        """Replace ellipses and commas with spaces, then remove URLs. None gives ""."""
        if status is None:
            return ""
        status = status.replace("...", " ")
        status = status.replace(",", " ")
        status = status.replace("..", " ")
        return TextCleaningOps.remove_urls(status)

    @staticmethod
    def remove_punctuation_signs(string):
        """Replace every listed punctuation sign with a space and trim the result. None gives ""."""
        if string is None:
            return ""
        punctuation = "!?.@'’`+<>\"«»:-“”—+,|$;_/~&()[]{}#=*"
        for char in punctuation:
            string = string.replace(char, " ")
        return string.strip()

    @staticmethod
    def detach_camel_case_words_and_put_in_lower_case(string):
        """
        Insert a space before each upper-case letter that follows a non-upper-case character.

        Strings containing "LeMonde" or "PhD" are returned untouched. Despite the method
        name, no lower-casing is performed here.
        """
        if "LeMonde" in string or "PhD" in string:
            return string
        result = []
        for i, char in enumerate(string):
            if i > 0 and char.isupper() and not string[i - 1].isupper():
                result.append(' ')
            result.append(char)
        return ''.join(result)

    @staticmethod
    def remove_small_words(terms, less_or_equal_to_number):
        """Keep the entries whose term has at least ``less_or_equal_to_number`` characters."""
        return {term: count for term, count in terms.items() if len(term) >= less_or_equal_to_number}

    @staticmethod
    def should_it_be_removed(string, less_or_equal_to_number):
        """True when the trimmed string is shorter than the threshold or contains a digit."""
        return len(string.strip()) < less_or_equal_to_number or bool(re.search(r'\d', string))

    @staticmethod
    def remove_urls(status):
        """
        Replace everything from "http" up to the next space with a space, then collapse spaces.

        NOTE: the match only stops at a space character, so text glued to the URL by a
        tab or a line break is removed together with it.
        """
        # One pass is enough: once every "http..." run is replaced by a space, no "http"
        # is left for a second URL pattern to match.
        status = re.sub(r'http[^ ]*', ' ', status)
        return re.sub(r' +', ' ', status)

    @staticmethod
    def remove_start_and_final_apostrophs(string):
        """
        Drop a trailing "'s" and replace the elided forms l' d' m' t' j' c' n' s' with a space.

        NOTE(review): the elided forms are replaced wherever they occur in the string,
        not only at its start — confirm this is intended.
        """
        string = string.replace("’", "'")
        if string.endswith("'s"):
            string = string[:-2]
        replacements = ["l'", "d'", "m'", "t'", "j'", "c'", "n'", "s'"]
        for rep in replacements:
            string = string.replace(rep, " ")
        return string.strip()

    @staticmethod
    def normalize_apostrophs(string):
        """Turn typographic apostrophes (’) into straight ones (')."""
        return string.replace("’", "'")

    @staticmethod
    def remove_terms_between_quotes(string):
        """Replace each quoted passage ("…", «…», “…”) with a space and trim the result."""
        string = re.sub(r'"[^"]*"', ' ', string)
        string = re.sub(r'«[^»]*»', ' ', string)
        string = re.sub(r'“[^”]*”', ' ', string)
        return string.strip()

    @staticmethod
    def is_it_cleaned(status):
        """True when the string holds an even number (possibly zero) of straight double quotes."""
        return status.count('"') % 2 == 0

    @staticmethod
    def remove_small_words_or_numeric(terms, max_letters):
        """Keep the entries whose term has at least ``max_letters`` characters and no digit."""
        return {
            term: count
            for term, count in terms.items()
            if len(term) >= max_letters and not re.search(r'\d', term)
        }

    @staticmethod
    def remove_numeric(string):
        """Delete every digit."""
        return re.sub(r'\d', '', string)

    @staticmethod
    def remove_hashtags(status):
        """Delete each "#" immediately followed by letters, together with those letters."""
        # Python's `re` has no \p{L}; [^\W\d_] is the equivalent "any Unicode letter" class.
        return re.sub(r'#[^\W\d_]+', '', status)

    @staticmethod
    def put_in_lower_case(input_string):
        """Lower-case the string."""
        return input_string.lower()

    @staticmethod
    def put_in_lower_case_map(map_of_lines):
        """Lower-case every value of the mapping; None or blank values become ""."""
        if map_of_lines is None:
            return {}
        return {k: (v.lower() if v is not None and not v.strip() == '' else '') for k, v in map_of_lines.items()}

    @staticmethod
    def remove_xml_escaped(input_string):
        """Replace the five predefined XML entities with a space. None is returned as is."""
        if input_string is None:
            return input_string
        replacements = {
            "&gt;": " ",
            "&lt;": " ",
            "&amp;": " ",
            "&apos;": " ",
            "&quot;": " "
        }
        for old, new in replacements.items():
            input_string = input_string.replace(old, new)
        return input_string

    @staticmethod
    def remove_emojis_between_semi_colons(cleaned):
        """Replace every ":...:" span (shortest match between two colons) with a space."""
        pattern = re.compile(r':(.*?):')
        return pattern.sub(lambda m: ' ', cleaned)

    @staticmethod
    def remove_null_chars(string):
        """Delete NUL characters and trim the result."""
        return string.replace('\0', '').strip()

    @staticmethod
    def flatten_to_ascii(string):
        """Decompose accented characters (NFD) and drop everything outside ASCII."""
        if not string or not string.strip():
            return string
        normalized = unicodedata.normalize('NFD', string)
        ascii_only = ''.join(c for c in normalized if ord(c) <= 0x7F)
        return ascii_only

    @staticmethod
    def flatten_to_ascii_and_remove_apostrophs(string):
        """Same as flatten_to_ascii, additionally dropping apostrophes."""
        if not string or not string.strip():
            return string
        normalized = unicodedata.normalize('NFD', string)
        ascii_only = ''.join(c for c in normalized if ord(c) <= 0x7F and c not in ("'", "’"))
        return ascii_only

    @staticmethod
    def _apply_cleaning_pipeline(status, remove_non_ascii):
        """Run the shared cleaning steps on one non-None string."""
        status = TextCleaningOps.remove_urls(status)
        status = TextCleaningOps.normalize_apostrophs(status)
        status = TextCleaningOps.remove_null_chars(status)
        status = re.sub(r' +', ' ', status)
        status = TextCleaningOps.remove_punctuation_signs(status)
        if remove_non_ascii:
            status = TextCleaningOps.flatten_to_ascii(status)
        return re.sub(r' +', ' ', status)

    @staticmethod
    def do_all_cleaning_ops(map_of_lines):
        """Clean every value of the mapping, including ASCII flattening. None gives {}."""
        return TextCleaningOps.do_all_cleaning_ops_with_optional_ascii(map_of_lines, remove_non_ascii=True)

    @staticmethod
    def do_all_cleaning_ops_with_optional_ascii(map_of_lines, remove_non_ascii=False):
        """
        Clean every value of the mapping; None or blank values become "".

        :param map_of_lines: Mapping of key -> text (None gives {}).
        :param remove_non_ascii: When True, values are also flattened to ASCII.
        :return: New mapping with the same keys and cleaned values.
        """
        cleaned_lines = {}
        if map_of_lines is None:
            return cleaned_lines
        for k, status in map_of_lines.items():
            if status is None or not status.strip():
                cleaned_lines[k] = ""
            else:
                cleaned_lines[k] = TextCleaningOps._apply_cleaning_pipeline(status, remove_non_ascii)
        return cleaned_lines

    @staticmethod
    def do_all_cleaning_ops_string(status):
        """Clean a single string, including ASCII flattening. None gives ""."""
        if status is None:
            return ""
        return TextCleaningOps._apply_cleaning_pipeline(status, True)

    @staticmethod
    def do_all_cleaning_ops_set(lines):
        """Clean every line and return the distinct cleaned values. None gives an empty set."""
        if lines is None:
            return set()
        map_of_lines = {i: line for i, line in enumerate(lines)}
        cleaned_map = TextCleaningOps.do_all_cleaning_ops(map_of_lines)
        return set(cleaned_map.values())


In [64]:
class CombinationGenerator:
    """
    Enumerates, in lexicographic order, the ways to pick ``r`` indices out of ``range(n)``.

    Typical use: ``while gen.has_more(): indices = gen.get_next()``.
    """

    def __init__(self, n: int, r: int):
        """
        :param n: Size of the pool the indices are drawn from (at least 1).
        :param r: Number of indices per combination (at most ``n``).
        :raises ValueError: If ``r > n`` or ``n < 1``.
        """
        if r > n:
            raise ValueError("r cannot be greater than n")
        if n < 1:
            raise ValueError("n must be at least 1")

        self.n = n
        self.r = r
        self.a = list(range(r))
        # Integer arithmetic keeps the count exact; dividing Decimals rounds the result
        # to the context precision (28 significant digits by default).
        self.total = Decimal(factorial(n) // (factorial(r) * factorial(n - r)))
        self.num_left = self.total
        self.reset()

    def reset(self):
        """Rewind to the first combination."""
        self.a = list(range(self.r))
        self.num_left = Decimal(self.total)

    def get_num_left(self) -> Decimal:
        """Number of combinations not yet returned."""
        return self.num_left

    def has_more(self) -> bool:
        """True while at least one combination remains."""
        return self.num_left > 0

    def get_total(self) -> Decimal:
        """Total number of combinations, i.e. n! / (r! (n-r)!)."""
        return self.total

    def get_next(self) -> List[int]:
        """
        Return the next combination as a new list of ``r`` increasing indices.

        :raises IndexError: If every combination has already been returned.
        """
        if self.num_left <= 0:
            # Advancing past the last combination would produce out-of-range indices.
            raise IndexError("no more combinations; call reset() to start over")

        # The very first call returns the initial state [0, 1, ..., r-1] as is.
        if self.num_left == self.total:
            self.num_left -= 1
            return list(self.a)

        # Find the right-most index that has not reached its maximum value...
        i = self.r - 1
        while self.a[i] == self.n - self.r + i:
            i -= 1

        # ...advance it, and reset everything to its right to the smallest valid values.
        self.a[i] += 1
        for j in range(i + 1, self.r):
            self.a[j] = self.a[i] + j - i

        self.num_left -= 1
        # Hand out a copy so callers can keep or modify results without affecting
        # the generator's internal state (or each other).
        return list(self.a)


In [65]:
class PatternOfInterestChecker:
    """Holds a set of regex-based patterns of interest and matches text against them."""

    def __init__(self):
        self.patterns_of_interest: Set[PatternOfInterest] = set()

    def load_patterns_of_interest(self, file_path: str):
        """
        Load patterns from a tab-separated file whose first row is a header.

        Columns read per row: description, regex, "true"/"false" flag for lower-case
        matching, comma-separated category ids, and an optional type of text fragment
        ("stop" is used when that column is empty or missing). Blank lines are skipped.
        A file that cannot be opened or read is reported on stdout and otherwise ignored.

        :param file_path: Path of the tab-separated patterns file.
        """
        try:
            # newline='' lets the csv module do its own line-ending handling.
            with open(file_path, mode='r', encoding='utf-8', newline='') as file:
                reader = csv.reader(file, delimiter='\t')
                # Skip the header row; the default keeps an empty file from raising StopIteration.
                next(reader, None)
                for row in reader:
                    if not row:
                        continue  # csv.reader yields an empty list for a blank line

                    poi = PatternOfInterest()
                    poi.set_description(row[0])
                    poi.set_regex(row[1])
                    poi.set_should_apply_to_lower_case_text(row[2].lower() == 'true')

                    category_ids = row[3].split(',')
                    categories = [Category(cat_id) for cat_id in category_ids]
                    poi.set_categories(categories)

                    # The fifth column may be empty or absent altogether.
                    type_of_text_fragment = row[4] if len(row) > 4 and row[4] else "stop"
                    poi.set_type_of_text_fragment(type_of_text_fragment)

                    self.patterns_of_interest.add(poi)
        except IOError:
            print("Error when loading patterns in tokenizer")

    def contains_percentage(self, text: str) -> Optional[str]:
        """
        Classify a text that contains a percentage (a digit directly followed by "%").

        :return: "611" when the percentage is followed by " off" or " cash back"
                 (case-insensitive), "621" for any other percentage, None when the
                 text holds no percentage.
        """
        # Do we find a percentage?
        if re.search(r'\d%', text):
            # If so, is it followed by "off"?
            if re.search(r'\d% (off|cash back)', text, re.IGNORECASE):
                return "611"
            else:
                return "621"
        return None

    def returns_match_or_not(self, text: str) -> PatternOfInterest:
        """
        Return a fresh PatternOfInterest describing the first loaded pattern that fully
        matches ``text`` (``matched`` set to True), or an otherwise empty one with
        ``matched`` set to False when nothing matches.
        """
        for poi in self.patterns_of_interest:
            pattern = poi.get_pattern()
            if pattern and pattern.fullmatch(text):
                # Copy the relevant fields so the stored pattern is never handed out.
                to_return = PatternOfInterest()
                to_return.set_categories(poi.get_categories())
                to_return.set_type_of_text_fragment_enum(poi.get_type_of_text_fragment_enum())
                to_return.set_description(poi.get_description())
                to_return.set_matched(True)
                return to_return

        poi = PatternOfInterest()
        poi.set_matched(False)
        return poi


In [66]:
class UmigonTokenizer:
    initialized = False
    poi_checker = None

    class CurrentFragment(Enum):
        WHITE_SPACE = 1
        PUNCTUATION = 2
        NON_WORD = 3
        TERM = 4
        NOT_STARTED = 5

    @staticmethod
    def initialize(file_path: Optional[str] = None):
        if not UmigonTokenizer.initialized:
            UmigonTokenizer.poi_checker = PatternOfInterestChecker()
            if file_path:
                UmigonTokenizer.poi_checker.load_patterns_of_interest(file_path)
            UmigonTokenizer.initialized = True

    @staticmethod
    def tokenize(text: str, language_specific_lexicon: Optional[Set[str]] = None) -> List['TextFragment']:
        if not UmigonTokenizer.initialized:
            UmigonTokenizer.initialize()

        text_fragments = []
        if language_specific_lexicon is None:
            language_specific_lexicon = set()

        text_fragment_started = False
        dash_like_characters = {"-", "‐", "‑", "‒", "–", "—", "︱", "﹘", "﹣", "－", "_", "\\", "/", "|"}

        curr_fragment = UmigonTokenizer.CurrentFragment.NOT_STARTED

        white_space = None
        term = None
        punctuation = None
        non_word = None
        emoji = None

        code_points = [ord(c) for c in text]

        for index, code_point in enumerate(code_points):
            char = chr(code_point)

            is_white_space = char.isspace()
            is_emoji = char in EMOJI_DATA
            is_punctuation = bool(re.match(r"[^\w\s]", char))

            if curr_fragment == UmigonTokenizer.CurrentFragment.WHITE_SPACE:
                if is_white_space:
                    white_space.add_string_to_original_form(char)
                    if char == "\n":
                        white_space.set_sentence_or_line_break(True)
                else:
                    text_fragments.append(white_space)
                    text_fragment_started = False
                    curr_fragment = UmigonTokenizer.CurrentFragment.NOT_STARTED

            elif curr_fragment == UmigonTokenizer.CurrentFragment.TERM:
                if is_white_space or is_emoji or is_punctuation:
                    UmigonTokenizer.process_term(term, language_specific_lexicon, text_fragments)
                    text_fragment_started = False
                    curr_fragment = UmigonTokenizer.CurrentFragment.NOT_STARTED
                else:
                    term.add_string_to_original_form(char)

            elif curr_fragment == UmigonTokenizer.CurrentFragment.NON_WORD:
                if is_white_space or is_emoji:
                    text_fragments.append(non_word)
                    text_fragment_started = False
                    curr_fragment = UmigonTokenizer.CurrentFragment.NOT_STARTED
                    non_word = NonWord()
                else:
                    curr_non_word = non_word.get_original_form() + char
                    if UmigonTokenizer.poi_checker.returns_match_or_not(curr_non_word).matched:
                        non_word.add_string_to_original_form(char)
                    else:
                        text_fragments.append(non_word)
                        text_fragment_started = False
                        curr_fragment = UmigonTokenizer.CurrentFragment.NOT_STARTED
                        non_word = NonWord()

            elif curr_fragment == UmigonTokenizer.CurrentFragment.PUNCTUATION:
                if is_punctuation:
                    punctuation.add_string_to_original_form(char)
                else:
                    pattern_of_interest = UmigonTokenizer.poi_checker.returns_match_or_not(punctuation.get_original_form())
                    if pattern_of_interest.matched:
                        non_word = punctuation.to_non_word(pattern_of_interest, punctuation.get_original_form())
                        text_fragments.append(non_word)
                    else:
                        text_fragments.append(punctuation)
                    text_fragment_started = False
                    curr_fragment = UmigonTokenizer.CurrentFragment.NOT_STARTED

            if not text_fragment_started:
                if is_white_space:
                    text_fragment_started = True
                    white_space = WhiteSpace()
                    white_space.index_cardinal = index
                    white_space.index_ordinal = len(text_fragments)
                    white_space.add_string_to_original_form(char)
                    if char == "\n":
                        white_space.set_sentence_or_line_break(True)
                    curr_fragment = UmigonTokenizer.CurrentFragment.WHITE_SPACE

                elif is_emoji:
                    text_fragment_started = True
                    emoji = Emoji()
                    emoji.index_cardinal = index
                    emoji.index_ordinal = len(text_fragments)
                    emoji.add_string_to_original_form(char)
                    text_fragments.append(emoji)
                    curr_fragment = UmigonTokenizer.CurrentFragment.NOT_STARTED

                elif is_punctuation:
                    text_fragment_started = True
                    punctuation = Punctuation()
                    punctuation.index_cardinal = index
                    punctuation.index_ordinal = len(text_fragments)
                    punctuation.add_string_to_original_form(char)
                    curr_fragment = UmigonTokenizer.CurrentFragment.PUNCTUATION

                elif not is_white_space and not is_punctuation:
                    text_fragment_started = True
                    term = Term()
                    term.index_cardinal = index
                    term.index_ordinal = len(text_fragments)
                    term.add_string_to_original_form(char)
                    curr_fragment = UmigonTokenizer.CurrentFragment.TERM

                elif not is_punctuation and is_white_space:
                    non_word = NonWord()
                    non_word.index_cardinal = index
                    non_word.index_ordinal = len(text_fragments)
                    non_word.add_string_to_original_form(char)
                    curr_fragment = UmigonTokenizer.CurrentFragment.NON_WORD

        # Process remaining fragment
        if curr_fragment == UmigonTokenizer.CurrentFragment.TERM:
            UmigonTokenizer.process_term(term, language_specific_lexicon, text_fragments)

        elif curr_fragment == UmigonTokenizer.CurrentFragment.WHITE_SPACE:
            text_fragments.append(white_space)

        elif curr_fragment == UmigonTokenizer.CurrentFragment.PUNCTUATION:
            pattern_of_interest = UmigonTokenizer.poi_checker.returns_match_or_not(punctuation.get_original_form())
            if pattern_of_interest.matched:
                non_word = punctuation.to_non_word(pattern_of_interest, punctuation.get_original_form())
                text_fragments.append(non_word)
            else:
                text_fragments.append(punctuation)

        elif curr_fragment == UmigonTokenizer.CurrentFragment.NON_WORD:
            text_fragments.append(non_word)

        return text_fragments

    @staticmethod
    def process_term(term, language_specific_lexicon, text_fragments):
        """Finalize a term and append the resulting fragment to ``text_fragments``.

        The term's original form is passed through
        ``RepeatedCharactersRemover.repeated_characters`` and
        ``TextCleaningOps.flatten_to_ascii``; both results are stored on the term
        (``cleaned_form`` and ``cleaned_and_stripped_form``). The stripped form is
        then checked by ``UmigonTokenizer.poi_checker``: on a match a ``NonWord``
        carrying the term's indices, original form and the matched pattern is
        appended instead of the term.

        Args:
            term: the term being closed; must expose ``get_original_form()``,
                ``original_form``, ``index_cardinal`` and ``index_ordinal``.
            language_specific_lexicon: forwarded unchanged to
                ``RepeatedCharactersRemover.repeated_characters``.
            text_fragments: list that receives the resulting fragment (mutated).
        """
        original_form = term.get_original_form()
        cleaned_form = RepeatedCharactersRemover.repeated_characters(original_form, language_specific_lexicon)
        # str.replace() matches its argument literally, so the former
        # replace("[’ʼ]", "'") only fired on the 4-character text "[’ʼ]" and never
        # normalised anything. Replace each typographic apostrophe explicitly:
        # U+2019 (right single quotation mark) and U+02BC (modifier letter apostrophe).
        cleaned_form = cleaned_form.replace("\u2019", "'").replace("\u02bc", "'")
        cleaned_and_stripped_form = TextCleaningOps.flatten_to_ascii(cleaned_form)
        term.cleaned_form = cleaned_form
        term.cleaned_and_stripped_form = cleaned_and_stripped_form

        pattern_of_interest = UmigonTokenizer.poi_checker.returns_match_or_not(term.cleaned_and_stripped_form)
        if pattern_of_interest.matched:
            # The term is really a pattern of interest: re-wrap it as a NonWord,
            # keeping its position and original text.
            non_word = NonWord()
            non_word.index_cardinal = term.index_cardinal
            non_word.index_ordinal = term.index_ordinal
            non_word.original_form = term.original_form
            non_word.type_of_text_fragment_enum = pattern_of_interest.type_of_text_fragment_enum
            non_word.poi = pattern_of_interest
            text_fragments.append(non_word)
        else:
            text_fragments.append(term)


In [67]:
class DataManager:
    """Plain container for the intermediate data structures of a text run.

    Every attribute has a matching getter and setter; the setters replace the
    stored object wholesale (they do not copy or merge).
    """

    def __init__(self):
        # Per-line inputs and derived fragments / n-grams.
        self.original_strings_per_line: Dict[int, str] = {}
        self.text_fragments_per_line: Dict[int, List[TextFragment]] = defaultdict(list)
        self.cleaned_and_stripped_ngrams_per_line: Dict[int, Set[str]] = defaultdict(set)
        # Corpus-wide n-gram list and counts.
        self.list_of_n_grams_global: List[NGram] = []
        self.n_grams_and_global_count: Dict[NGram, int] = defaultdict(int)
        # Lookups between string forms and n-grams.
        self.stringified_cleaned_and_stripped_ngram_to_lemmatized_form: Dict[str, str] = {}
        self.mapping_non_lemmatized_form_to_ngram: Dict[str, NGram] = {}
        self.mapping_lemmatized_form_to_ngram: Dict[str, NGram] = {}

    # --- original strings per line -------------------------------------------
    def get_original_strings_per_line(self) -> Dict[int, str]:
        """Return the stored ``original_strings_per_line`` mapping."""
        return self.original_strings_per_line

    def set_map_of_lines(self, original_strings_per_line: Dict[int, str]):
        """Replace ``original_strings_per_line`` with the given mapping."""
        self.original_strings_per_line = original_strings_per_line

    # --- text fragments per line ---------------------------------------------
    def get_text_fragments_per_line(self) -> Dict[int, List[TextFragment]]:
        """Return the stored ``text_fragments_per_line`` mapping."""
        return self.text_fragments_per_line

    def set_text_fragments_per_line(self, text_fragments_per_line: Dict[int, List[TextFragment]]):
        """Replace ``text_fragments_per_line`` with the given mapping."""
        self.text_fragments_per_line = text_fragments_per_line

    # --- cleaned and stripped n-grams per line -------------------------------
    def get_cleaned_and_stripped_ngrams_per_line(self) -> Dict[int, Set[str]]:
        """Return the stored ``cleaned_and_stripped_ngrams_per_line`` mapping."""
        return self.cleaned_and_stripped_ngrams_per_line

    def set_cleaned_and_stripped_ngrams_per_line(self, cleaned_and_stripped_ngrams_per_line: Dict[int, Set[str]]):
        """Replace ``cleaned_and_stripped_ngrams_per_line`` with the given mapping."""
        self.cleaned_and_stripped_ngrams_per_line = cleaned_and_stripped_ngrams_per_line

    # --- global n-gram list --------------------------------------------------
    def get_list_of_n_grams_global(self) -> List[NGram]:
        """Return the stored ``list_of_n_grams_global`` list."""
        return self.list_of_n_grams_global

    def set_list_of_n_grams_global(self, list_of_n_grams_global: List[NGram]):
        """Replace ``list_of_n_grams_global`` with the given list."""
        self.list_of_n_grams_global = list_of_n_grams_global

    # --- stringified n-gram -> lemmatized form -------------------------------
    def get_stringified_cleaned_and_stripped_ngram_to_lemmatized_form(self) -> Dict[str, str]:
        """Return the stored stringified-n-gram to lemmatized-form mapping."""
        return self.stringified_cleaned_and_stripped_ngram_to_lemmatized_form

    def set_stringified_cleaned_and_stripped_ngram_to_lemmatized_form(self, stringified_cleaned_and_stripped_ngram_to_lemmatized_form: Dict[str, str]):
        """Replace the stringified-n-gram to lemmatized-form mapping."""
        self.stringified_cleaned_and_stripped_ngram_to_lemmatized_form = stringified_cleaned_and_stripped_ngram_to_lemmatized_form

    # --- non-lemmatized form -> n-gram ---------------------------------------
    def get_mapping_non_lemmatized_form_to_ngram(self) -> Dict[str, NGram]:
        """Return the stored ``mapping_non_lemmatized_form_to_ngram`` mapping."""
        return self.mapping_non_lemmatized_form_to_ngram

    def set_mapping_non_lemmatized_form_to_ngram(self, mapping_non_lemmatized_form_to_ngram: Dict[str, NGram]):
        """Replace ``mapping_non_lemmatized_form_to_ngram`` with the given mapping."""
        self.mapping_non_lemmatized_form_to_ngram = mapping_non_lemmatized_form_to_ngram

    # --- lemmatized form -> n-gram -------------------------------------------
    def get_mapping_lemmatized_form_to_ngram(self) -> Dict[str, NGram]:
        """Return the stored ``mapping_lemmatized_form_to_ngram`` mapping."""
        return self.mapping_lemmatized_form_to_ngram

    def set_mapping_lemmatized_form_to_ngram(self, mapping_lemmatized_form_to_ngram: Dict[str, NGram]):
        """Replace ``mapping_lemmatized_form_to_ngram`` with the given mapping."""
        self.mapping_lemmatized_form_to_ngram = mapping_lemmatized_form_to_ngram

    # --- n-gram global counts ------------------------------------------------
    def get_n_grams_and_global_count(self) -> Dict[NGram, int]:
        """Return the stored ``n_grams_and_global_count`` mapping."""
        return self.n_grams_and_global_count

    def set_n_grams_and_global_count(self, n_grams_and_global_count: Dict[NGram, int]):
        """Replace ``n_grams_and_global_count`` with the given mapping."""
        self.n_grams_and_global_count = n_grams_and_global_count


In [68]:
class Cooc:
    """Unordered pair of two n-grams (a co-occurrence).

    Two ``Cooc`` objects are equal when they hold the same two lemmatized forms,
    compared case-insensitively and regardless of which one is ``a`` or ``b``.
    ``__hash__`` is derived from the same case-insensitive, order-independent key
    so that equal pairs always land in the same dict/set bucket.
    """

    def __init__(self, a: NGram = None, b: NGram = None):
        """Store the pair; when both are given, the one whose lower-cased
        lemmatized form is greater becomes ``a`` and the other becomes ``b``."""
        if a is not None and b is not None:
            if a.get_original_form_lemmatized().lower() > b.get_original_form_lemmatized().lower():
                self.a = a
                self.b = b
            else:
                self.a = b
                self.b = a
        else:
            self.a = a
            self.b = b

    def get_a(self) -> NGram:
        """Return the first member of the pair."""
        return self.a

    def set_a(self, a: NGram):
        """Replace the first member (no re-ordering is performed)."""
        self.a = a

    def get_b(self) -> NGram:
        """Return the second member of the pair."""
        return self.b

    def set_b(self, b: NGram):
        """Replace the second member (no re-ordering is performed)."""
        self.b = b

    def _key(self):
        """Return the two lower-cased lemmatized forms as an ordered tuple.

        Sorting makes the key independent of which n-gram is ``a`` or ``b``;
        this matters because ``set_a``/``set_b`` can break the constructor's ordering.
        """
        first = self.a.get_original_form_lemmatized().lower()
        second = self.b.get_original_form_lemmatized().lower()
        return (first, second) if first <= second else (second, first)

    def __hash__(self) -> int:
        # Must use the same normalisation as __eq__ (lower-cased forms): the
        # previous implementation hashed the raw forms, so pairs that compared
        # equal but differed in case could hash differently.
        return hash(self._key())

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, Cooc):
            return False
        return self._key() == other._key()

    def __str__(self) -> str:
        return f"{self.a.get_original_form_lemmatized()}{{--}}{self.b.get_original_form_lemmatized()}"


In [69]:
class PerformCombinationsOnNGrams:
    """Builds a ``Cooc`` for every index pair produced over a list of n-grams."""

    def __init__(self, table: List[NGram]):
        self.table = table

    def call(self) -> List[Cooc]:
        """Return one ``Cooc`` per pair of positions yielded by
        ``CombinationGenerator(len(table), 2)``, in the generator's order."""
        ngrams = self.table
        pair_indices = CombinationGenerator(len(ngrams), 2)

        coocs = []
        while pair_indices.has_more():
            picked = pair_indices.get_next()
            coocs.append(Cooc(ngrams[picked[0]], ngrams[picked[1]]))
        return coocs


In [70]:
class LemmatizerInterface:
    """Base class for language-specific lemmatizers."""

    def lemmatize_term(self, term: str) -> str:
        """Return the lemma of ``term``; this base version always raises
        ``NotImplementedError`` and is meant to be overridden."""
        message = "Subclasses should implement this method"
        raise NotImplementedError(message)

In [71]:
class BritishAmericanMerger:
    """Rewrites British ``-our`` spellings to American ``-or`` (colour -> color)."""

    # Words that end in "our" in both spellings and must be left alone.
    # "your", "dour", "scour", "devour", "velour", "amour", "paramour" and
    # "troubadour" were previously missing, so e.g. "your" was turned into "yor".
    dont_merge = {
        "our", "flour", "four", "hour", "pour", "sour", "tour",
        "your", "dour", "scour", "devour", "velour", "amour", "paramour", "troubadour",
    }

    # Any word ending in one of these is also left alone (contour, detour, downpour...).
    _protected_suffixes = ("tour", "hour", "pour")

    @staticmethod
    def merge_to_american_english(input_string):
        """Return ``input_string`` with a trailing ``our`` replaced by ``or``.

        The string is returned unchanged when it does not end in ``our``, when
        its last word is listed in ``dont_merge``, or when it ends in ``tour``,
        ``hour`` or ``pour``. Only the end of the string is ever rewritten, so
        for a multi-word n-gram only the last word can change.

        Args:
            input_string: a single word or a space/hyphen separated n-gram.

        Returns:
            The (possibly) Americanised string.
        """
        if not input_string.endswith("our"):
            return input_string

        # Check the exception list against the last word only, so that n-grams
        # such as "twenty four" or "of our" are protected like single words.
        # split() cannot be empty here: the string ends with "our".
        last_word = input_string.replace("-", " ").split()[-1]
        if last_word in BritishAmericanMerger.dont_merge:
            return input_string

        if input_string.endswith(BritishAmericanMerger._protected_suffixes):
            return input_string

        return input_string[:-3] + "or"


In [72]:

class LemmatizerEN(LemmatizerInterface):
    """Rule-based English lemmatizer built from an ordered cascade of suffix tests.

    The rules are applied in a fixed order and later rules see the output of
    earlier ones, so their order is part of the behavior.
    """

    def __init__(self, merge_to_american_english):
        # When truthy, the result is passed through BritishAmericanMerger at the end.
        self.merge_to_american_english = merge_to_american_english

    def dont_merge_to_american_english(self):
        """Switch off the final British -> American spelling step."""
        self.merge_to_american_english = False

    def lemmatize_term(self, term):
        """Return ``term`` after plural/possessive, -ed, -ing and -ier reduction.

        Steps, in order:
          1. strip a final ``s`` / ``s'`` (unless the term ends in us, as, ss,
             sses, ies or is), or else a lone trailing apostrophe;
          2. ``sses`` -> drop ``es`` (classes -> class);
          3. exactly one of: ``ies`` -> ``y`` (``movies`` -> ``movie``),
             ``'s`` removal, ``ed`` rules, ``ing`` rules, ``ier`` -> ``y``;
          4. optional British -> American merge.

        Note: a possessive such as ``cat's`` leaves step 1 as ``cat'``; the
        trailing apostrophe is not removed by this method.
        """
        term = self._strip_plural_or_apostrophe(term)

        if term.endswith("sses"):
            term = term[:-2]

        if term.endswith("ies"):
            term = term[:-1] if term.endswith("movies") else term[:-3] + "y"
        elif term.endswith("'s"):
            term = term[:-2]
        elif term.endswith("ed"):
            term = self._reduce_ed(term)
        elif term.endswith("ing"):
            term = self._reduce_ing(term)
        elif term.endswith("ier"):
            term = term[:-3] + "y"

        if self.merge_to_american_english:
            term = BritishAmericanMerger.merge_to_american_english(term)

        return term

    @staticmethod
    def _strip_plural_or_apostrophe(term):
        """Step 1: drop a plural/possessive ending, or a bare trailing apostrophe."""
        looks_plural = term.endswith(("s", "s'"))
        if looks_plural and not term.endswith(("us", "as", "ss", "sses", "ies", "is")):
            # Both checks run in sequence: the second one sees the result of the first.
            if term.endswith("s"):
                term = term[:-1]
            if term.endswith("s'"):
                term = term[:-2]
        elif term.endswith("'"):
            term = term[:-1]
        return term

    @staticmethod
    def _reduce_ed(term):
        """Reduce a term ending in ``ed`` (caller guarantees the suffix)."""
        if term.endswith(("rred", "mmed")):
            # doubled consonant + ed: drop "ed" and one of the doubled letters
            return term[:-3]

        drops_ed = (
            term.endswith(("lked", "cked", "pted", "ssed", "lled", "iased"))
            or (
                term.endswith("red")
                and not (term.endswith(("ired", "ured", "ared")) or term == "clustered")
            )
            or (
                term.endswith("med")
                and not term.endswith(("framed", "lamed", "named", "shamed"))
            )
            or term.endswith(("aired", "used", "ned"))
            or (term.endswith("ded") and not term.endswith("ided"))
            or (term.endswith("ted") and not term.endswith("ated"))
        )
        if drops_ed:
            return term[:-2]
        if term.endswith("ied"):
            return term[:-3] + "y"
        if term.endswith("eed"):
            return term  # left untouched (as in: exceed, proceed)
        return term[:-1]  # purchased -> purchase

    @staticmethod
    def _reduce_ing(term):
        """Reduce a term ending in ``ing`` (caller guarantees the suffix)."""
        takes_final_e = (
            term.endswith("king")
            or (term.endswith("ging") and not term.endswith("gging"))
            or term.endswith(("sing", "zing", "cing", "ving"))
            or (term.endswith("oding") and not term.endswith("ooding"))
            or (
                term.endswith("ting")
                and not term.endswith(("sting", "tting", "nting", "cting"))
            )
            or (
                term.endswith("ming")
                and (
                    term.endswith("framing")
                    or not term.endswith(("laming", "naming", "shaming"))
                )
            )
            or (term.endswith("ring") and not term.endswith("during"))
        )
        if takes_final_e:
            return term[:-3] + "e"

        stem = term[:-3]
        if len(stem) > 1:
            # running -> runn -> run: collapse a doubled final letter
            if stem[-2] == stem[-1]:
                stem = stem[:-1]
            # Stems ending in at / ot / id get their "e" back,
            # except the two listed words, which are returned as they are.
            if (
                len(stem) > 1
                and stem not in ("pivot", "format")
                and stem[-2:] in ("at", "ot", "id")
            ):
                stem = stem + "e"
        return stem


In [73]:
class Lemmatizer:
    """Language-dispatching lemmatizer with a list of terms that are never lemmatized.

    ``lang`` selects the delegate: ``"fr"`` -> ``LemmatizerFR``, ``"es"`` ->
    ``LemmatizerES``, anything else (including ``"en"``) -> ``LemmatizerEN``.
    ``no_lemma_set`` is the union of the language-agnostic exceptions
    (``no_lemma``) and the exceptions of the selected language.
    """

    def __init__(self, lang: str):
        self.no_lemma_en = {
            "access", "accumbens", "addresses", "afterwards", "always", "amazing",
            "approaches", "analyses", "biases", "businesses", "ceiling", "classes",
            "crises", "daunting", "discusses", "during", "economics", "elsevier",
            "ethics", "focuses", "fries", "goes", "humanities", "hundred",
            "hypotheses", "inches", "king", "lens", "linguistics", "lies",
            "losses", "marketing", "morning", "news", "outlier", "outstanding",
            "physics", "politics", "premises", "processes", "red", "rigged", "ries",
            "series", "sometimes", "something", "species", "spring", "status",
            "ted", "themselves", "neural processes", "united", "wales", "witnesses"
        }

        self.no_lemma_fr = {
            "accès", "alors", "alpes", "ailleurs", "apres", "après", "aupres", "auprès",
            "Calvados", "concours", "corps", "cours", "dans", "discours", "divers", "etes",
            "êtes", "ethos", "éthos", "gens", "gros", "lors", "outils", "pays", "parcours",
            "pres", "près", "proces", "procès", "propos", "puis", "sans", "secours", "sens",
            "sommes", "succès", "succes", "temps", "toujours", "travers", "très", "tres",
            "univers", "viens", "vos"
        }

        self.no_lemma_es = {
            "revés", "atrás", "país", "gafas", "años", "adiós", "peces", "tres", "azulgris",
            "compás", "menos", "mes", "tijeras", "avis", "anís", "vals", "compás", "alas",
            "análisis", "oasis", "paréntesis", "estrés", "colchones", "espejuelos", "martes",
            "lunes", "miércoles", "jueves", "viernes", "calcetines", "álbumes", "nueces", "veces",
            "coches", "alfileres", "lazos", "pistaches", "pañales", "prismas", "bolsos", "panes",
            "alfileres", "golpes", "jardines", "manos", "ojos", "dedos", "radios"
        }

        # Language-agnostic exceptions (names, acronyms, technical terms).
        self.no_lemma = {
            "analytics", "accumbens", "aws", "bayes", "business", "charles", "ects", "cnrs",
            "cosmos", "cowles", "deep learning", "developer", "ethos", "faas", "forbes",
            "iaas", "james", "keynes", "koopmans", "nhs", "paas", "paris", "programming",
            "reactjs", "saas", "siemens", "sanders", "ted", "virus", "vuejs", "united states"
        }

        self.merge_to_american_english = True

        if lang == "fr":
            language_exceptions = self.no_lemma_fr
            self.lemmatizer_interface = LemmatizerFR()
        elif lang == "es":
            language_exceptions = self.no_lemma_es
            self.lemmatizer_interface = LemmatizerES()
        else:
            # "en" and every unsupported language code fall back to English.
            language_exceptions = self.no_lemma_en
            self.lemmatizer_interface = LemmatizerEN(self.merge_to_american_english)

        # Previously every branch overwrote no_lemma_set with the language set
        # alone, so the language-agnostic exceptions above were never consulted.
        self.no_lemma_set = self.no_lemma | language_exceptions

    def dont_merge_to_american_english(self):
        """Disable British -> American merging here and on an English delegate."""
        self.merge_to_american_english = False
        if isinstance(self.lemmatizer_interface, LemmatizerEN):
            self.lemmatizer_interface.dont_merge_to_american_english()

    def lemmatize(self, term: str) -> str:
        """Lemmatize one term (a word or an n-gram string).

        Terms found in ``no_lemma_set`` (exact, case-sensitive match) are
        returned unchanged. Otherwise the delegate's result is returned with one
        trailing apostrophe removed and surrounding whitespace stripped.
        """
        if term in self.no_lemma_set:
            return term
        term = self.lemmatizer_interface.lemmatize_term(term)

        if term.endswith("'"):
            term = term[:-1]
        return term.strip()

    def sentence_lemmatizer(self, sentence: str) -> str:
        """Lemmatize every whitespace-separated word of ``sentence``.

        Each word goes through ``lemmatize`` so that the exception list and the
        trailing-apostrophe clean-up apply here as well (the delegate used to be
        called directly, bypassing both). Words are re-joined with single spaces.
        """
        return " ".join(self.lemmatize(term) for term in sentence.split()).strip()

In [74]:
class CurvedEdge(FancyArrowPatch):
    """Arc-shaped edge patch plus static helpers to build and draw a clustered graph."""

    def __init__(self, p1, p2, rad=0.2, **kwargs):
        """Create a patch from ``p1`` to ``p2`` using the ``arc3`` connection style.

        Args:
            p1, p2: start and end positions.
            rad: curvature passed to ``arc3`` (``rad=0`` is a straight line).
            **kwargs: forwarded to ``FancyArrowPatch``.
        """
        super().__init__(p1, p2, connectionstyle=f"arc3,rad={rad}", **kwargs)

    @staticmethod
    def interpolate_color(color1, color2, alpha):
        """Linearly blend two colors: ``color1 * (1 - alpha) + color2 * alpha``.

        Returns an RGBA tuple. If anything fails (e.g. an unparsable color) the
        error is printed and ``to_rgba(color1)`` is returned instead.
        """
        try:
            c1 = np.array(to_rgba(color1))
            c2 = np.array(to_rgba(color2))
            interpolated_color = c1 * (1 - alpha) + c2 * alpha
            return tuple(interpolated_color)
        except Exception as e:
            print(f"Error in color interpolation: {e}")
            return to_rgba(color1)  # Fallback to color1 if there's an error

    @staticmethod
    def get_cluster_colors(seed, num_clusters):
        """Return ``num_clusters`` colors taken from the 'tab10' colormap, cycling
        through it when there are more clusters than colormap entries."""
        # NOTE(review): the colors below do not depend on `seed`; this call only
        # resets NumPy's global RNG. Kept so that any code relying on that side
        # effect keeps behaving the same — consider removing it.
        np.random.seed(seed)
        cmap = plt.get_cmap('tab10')
        num_colors = cmap.N
        return [cmap(i % num_colors) for i in range(num_clusters)]

    @staticmethod
    def draw_curved_edges(G, pos, ax, rad=0.2, edge_colors=None, linewidth=1):
        """Add one ``CurvedEdge`` patch to ``ax`` per edge of ``G``.

        Args:
            G: graph whose ``edges()`` are drawn.
            pos: mapping node -> position.
            ax: target axes.
            rad: curvature of every edge.
            edge_colors: one color per edge, in ``G.edges()`` order; defaults to
                black for all edges.
            linewidth: line width of every edge.
        """
        if edge_colors is None:
            edge_colors = ['black'] * len(G.edges())

        for (u, v), color in zip(G.edges(), edge_colors):
            p1, p2 = pos[u], pos[v]
            arrow = CurvedEdge(p1, p2, rad=rad, color=color, linewidth=linewidth)
            ax.add_patch(arrow)

    @staticmethod
    def _ordered_cluster_ids(cluster_values):
        """Return the distinct cluster ids in a stable order.

        Iterating a ``set`` directly gives an order that depends on hashing (and
        on the hash seed for strings); sorting makes the cluster -> color
        assignment the same on every run.
        """
        distinct = set(cluster_values)
        try:
            return sorted(distinct)
        except TypeError:
            # ids of mixed, non-comparable types: fall back to ordering by repr
            return sorted(distinct, key=repr)

    @staticmethod
    def visualize_graph_to_variable(G, seed):
        """Draw ``G`` with a spring layout and return the matplotlib figure.

        Every node must carry ``weights['countTerms']`` (scaled by 10 for the
        node size) and ``cluster`` (drives node and edge colors); the ``label``
        attribute, where present, is used as the node label. Edges joining two
        clusters get the midpoint of the two cluster colors.

        Args:
            G: graph, e.g. as built by ``create_graph_from_json``.
            seed: seed for ``nx.spring_layout``.

        Returns:
            The created ``Figure``.
        """
        pos = nx.spring_layout(G, seed=seed)

        # Node sizes come from the 'countTerms' weight, scaled up for visibility
        node_sizes = [G.nodes[node]['weights']['countTerms'] * 10 for node in G.nodes()]

        # One color per cluster, assigned in a deterministic (sorted) order
        node_clusters = nx.get_node_attributes(G, 'cluster')
        cluster_ids = CurvedEdge._ordered_cluster_ids(node_clusters.values())
        cluster_colors = CurvedEdge.get_cluster_colors(seed=42, num_clusters=len(cluster_ids))
        cluster_color_map = dict(zip(cluster_ids, cluster_colors))

        node_colors = [cluster_color_map[G.nodes[node]['cluster']] for node in G.nodes()]

        # Edge color: the cluster color, or a 50/50 blend across clusters
        edge_colors = []
        for u, v in G.edges():
            cluster_u = G.nodes[u]['cluster']
            cluster_v = G.nodes[v]['cluster']
            if cluster_u != cluster_v:
                color_u = cluster_color_map[cluster_u]
                color_v = cluster_color_map[cluster_v]
                edge_color = CurvedEdge.interpolate_color(color_u, color_v, alpha=0.5)
            else:
                edge_color = cluster_color_map[cluster_u]
            edge_colors.append(edge_color)

        fig, ax = plt.subplots(figsize=(12, 12))

        # Use the explicit figure/axes objects rather than pyplot's "current"
        # figure, so the result does not depend on global pyplot state.
        fig.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.1)

        # Draw nodes
        nx.draw_networkx_nodes(G, pos, ax=ax, node_size=node_sizes, node_color=node_colors, edgecolors='k', alpha=0.6)

        # Draw labels on a faint white box so they stay readable
        labels = nx.get_node_attributes(G, 'label')
        nx.draw_networkx_labels(G, pos, labels=labels, ax=ax, font_size=10, font_color='black', bbox=dict(facecolor='white', alpha=0.2, edgecolor='none', pad=1))

        # Draw curved edges with color based on cluster IDs
        CurvedEdge.draw_curved_edges(G, pos, ax, rad=0.3, edge_colors=edge_colors, linewidth=2)

        ax.set_title("Network Visualization", fontsize=15)
        ax.axis('off')  # Turn off the axis
        return fig

    @staticmethod
    def create_graph_from_json(json_data):
        """Build an undirected ``nx.Graph`` from a network description.

        Args:
            json_data: a dict, or a JSON string that is parsed first. Expected
                layout: ``json_data['network']['items']`` is a list of dicts
                with keys ``id``, ``label``, ``cluster``, ``x``, ``y``,
                ``weights`` and ``scores``; ``json_data['network']['links']`` is
                a list of dicts with keys ``source_id``, ``target_id`` and
                ``strength``.

        Returns:
            Graph whose nodes carry the item fields as attributes and whose
            edges carry ``weight`` (the link's ``strength``).

        Raises:
            KeyError: if any of the keys listed above is missing.
        """
        if isinstance(json_data, str):
            json_data = json.loads(json_data)

        G = nx.Graph()

        # Nodes, with every item field copied as a node attribute
        for item in json_data['network']['items']:
            G.add_node(item['id'],
                      label=item['label'],
                      cluster=item['cluster'],
                      x=item['x'],
                      y=item['y'],
                      weights=item['weights'],
                      scores=item['scores'])

        # Edges, with the link strength stored as 'weight'
        for link in json_data['network']['links']:
            G.add_edge(link['source_id'], link['target_id'], weight=link['strength'])

        return G


In [75]:
def calculate_node_sizes(G):
    """Map each node of ``G`` to the sum of the ``'weight'`` attribute over the
    edges returned by ``G.edges(node, data=True)`` (0 for a node without edges)."""
    return {
        node: sum(attrs['weight'] for _, _, attrs in G.edges(node, data=True))
        for node in G.nodes()
    }

def apply_mds(similarity_matrix):
    """Embed the items of a similarity matrix in 2D with MDS.

    Similarities are turned into dissimilarities as ``max(similarity) -
    similarity`` and fed to ``MDS(dissimilarity='precomputed')`` with a fixed
    ``random_state`` of 42.

    Args:
        similarity_matrix: square, symmetric array-like of pairwise
            similarities (larger means more similar). It is not modified.

    Returns:
        The array returned by ``MDS.fit_transform`` (one 2D point per row of
        the input).

    Raises:
        ValueError: from ``np.max`` if the matrix is empty.
    """
    similarity = np.asarray(similarity_matrix, dtype=float)
    mds = MDS(n_components=2, dissimilarity='precomputed', random_state=42, normalized_stress='auto')
    dissimilarity_matrix = np.max(similarity) - similarity
    # An item is at distance 0 from itself. Without this, a zero diagonal in the
    # similarity matrix became `max` on the diagonal of the dissimilarities.
    np.fill_diagonal(dissimilarity_matrix, 0.0)
    pos = mds.fit_transform(dissimilarity_matrix)
    return pos

def create_similarity_matrix(G):
    """Build a symmetric node-by-node matrix of edge weights.

    Returns a tuple ``(matrix, node_index)`` where ``node_index`` maps each node
    to its row/column (in ``G.nodes()`` order) and ``matrix[i, j]`` holds the
    ``'weight'`` of the edge between nodes ``i`` and ``j`` (0 when there is none).
    """
    size = len(G.nodes())
    node_index = {node: position for position, node in enumerate(G.nodes())}
    similarity_matrix = np.zeros((size, size))

    for source, target, attrs in G.edges(data=True):
        row, col = node_index[source], node_index[target]
        # write both (row, col) and (col, row) so the matrix stays symmetric
        similarity_matrix[[row, col], [col, row]] = attrs['weight']

    return similarity_matrix, node_index
