# SpectraSync: Neural Intelligence Meets Multi-Dimensional Topic Analysis

This notebook preprocesses documents for the SpectraSync pipeline so that they can be ingested by `SpectraSync.py` and related scripts. The pipeline integrates topic modeling, diachronic analysis, and visualization across diverse textual corpora. This notebook standardizes the documents so that they are compatible with SpectraSync's topic modeling and evaluation stages.

### Notebook Objectives
- **Load and Transform Documents**: Imports and structures text from HTML and JSON files, preparing them for SpectraSync's topic analysis.
- **Data Cleansing**: Standardizes text by removing curly quotes, non-printable characters, and other inconsistencies.
- **Content Structuring**: Extracts text using BeautifulSoup and regex, arranging it into paragraphs and sentences.
- **Output Preparation**: Produces a processed corpus in a format optimized for `SpectraSync.py` ingestion, supporting downstream analysis and visualization.

### Dependencies and Related Components
This notebook is designed to work alongside the following scripts:
- **`SpectraSync.py`**: Coordinates topic modeling, diachronic analysis, and evaluation.
- **`alpha_eta.py`**: Supports hyperparameter tuning for model optimization.
- **`process_futures.py`**: Manages asynchronous processing, essential for handling large datasets efficiently.
- **`topic_model_trainer.py`**: Defines and trains the topic models used in SpectraSync.
- **`visualization.py`**: Generates visualizations for model insights and evaluation.
- **`write_to_postgres.py`**: Facilitates data persistence into PostgreSQL, supporting structured data retrieval.
- **`utils.py`**: Provides utility functions to enhance efficiency and consistency across the SpectraSync pipeline.

### Workflow Overview
1. **Document Loading**: Reads HTML or JSON files, detecting encoding where necessary.
2. **Content Extraction**: Extracts structured content, normalizing punctuation and replacing curly quotes for text consistency.
3. **Data Output**: Saves the processed content in a structured format for direct use in `SpectraSync.py`.

Running this notebook prepares a clean, standardized corpus compatible with the SpectraSync pipeline, optimizing input quality for topic modeling and diachronic analysis.
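
As a rough illustration of the punctuation normalization in step 2, the sketch below applies NFKC normalization and then maps curly quotes to straight quotes with `str.translate`. The helper name `normalize_quotes` and the `CURLY_TO_STRAIGHT` table are hypothetical and exist only for this example; the notebook itself does this work in `DocumentParser.replace_curly_quotes`.

```python
import unicodedata

# Hypothetical translation table for illustration only.
CURLY_TO_STRAIGHT = str.maketrans({
    "\u2018": "'",  # Left single quotation mark
    "\u2019": "'",  # Right single quotation mark
    "\u201C": '"',  # Left double quotation mark
    "\u201D": '"',  # Right double quotation mark
})

def normalize_quotes(text: str) -> str:
    """Apply NFKC normalization, then replace curly quotes with straight ones."""
    return unicodedata.normalize("NFKC", text).translate(CURLY_TO_STRAIGHT)

sample = "\u201CIt\u2019s ready,\u201D she said."
print(normalize_quotes(sample))
```

A translation table handles all four replacements in a single pass, which may be preferable to chained `str.replace` calls on large corpora.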


In [None]:
# Standard Library Imports
import os            # Provides functions for interacting with the operating system, e.g., file handling.
import re            # Supports regular expressions for text manipulation and pattern matching.
import csv           # Facilitates reading from and writing to CSV files.
import json          # Enables reading from and writing to JSON files, often used for structured data.
from time import time  # Allows timing operations for performance measurement.
import logging       # Provides logging functionalities to monitor code execution.
import codecs        # Handles different text encodings, important for text data processing.
import multiprocessing  # Supports parallel processing, enhancing performance on large datasets.
import pprint as pp  # Pretty-prints data structures, useful for debugging and visualization.
from hashlib import md5  # Provides MD5 hashing, useful for generating unique IDs or checking data integrity.

# Encoding and Parsing Imports
import chardet        # Detects character encoding of text files, allowing for accurate reading of various encodings.
import unicodedata    # Handles Unicode character information, useful for identifying and removing non-printable characters.
from bs4 import BeautifulSoup  # Parses HTML content, enabling extraction of specific tags (e.g., <p> tags) for processing.


# NLTK Imports
import nltk                             # Natural Language Toolkit, a suite for text processing.
from nltk.corpus import stopwords       # Provides lists of stop words to remove from text.
from nltk.corpus.reader.api import CorpusReader  # Base class for reading and structuring corpora.
from nltk import sent_tokenize, pos_tag, wordpunct_tokenize  # Tokenizers and POS tagger for sentence processing.
stop_words = stopwords.words('english')  # Initializing English stop words list for filtering out common words.

# Gensim Imports
import gensim                           # Library for topic modeling and word vector creation.
from gensim.models import Word2Vec, ldamulticore  # Word2Vec for word embeddings, ldamulticore for topic modeling.
from gensim.models.phrases import Phrases, Phraser  # Constructs multi-word phrases (e.g., bigrams) from tokenized text.
import gensim.corpora as corpora        # Handles creation of dictionaries and corpora from text data.
from gensim.utils import simple_preprocess  # Preprocesses text into a list of tokens.

# SpaCy Import (specific model)
import en_core_web_lg                   # SpaCy's large English NLP model for advanced text processing.
nlp = en_core_web_lg.load(disable=['parser','ner'])  # Loads model, with parsing and named entity recognition disabled.

# Readability Import
from readability.readability import Unparseable  # Exception handling for parsing errors in HTML.
from readability.readability import Document as Paper  # Extracts readable content from HTML, discarding noise.

# BeautifulSoup and HTML5lib
import bs4                                # BeautifulSoup for parsing HTML and XML documents.
import html5lib                           # Parser for HTML5, used by BeautifulSoup for web scraping.

# Data Processing and Scientific Libraries
import numpy as np                        # Supports efficient numerical operations on large arrays and matrices.
import pandas as pd                       # Data analysis library for handling structured data (e.g., DataFrames).
from sklearn.manifold import TSNE         # Dimensionality reduction for visualizing high-dimensional data.
from matplotlib import pyplot as plt      # Plotting library for creating visualizations.
from tqdm import tqdm                     # Adds progress bars to loops, useful for monitoring lengthy operations.


In [None]:
# Load custom stop words based on document context
with open('config/custom_stopwords.json', 'r') as file:
    custom_stopwords = json.load(file)

# Add CDC MMWR-specific stop words, or any additional terms the user finds appropriate
# This approach enables the user to include context-specific stop words, not limited to MMWR.
# Users can add any terms they consider irrelevant or noise within the custom_stopwords.json file, 
# making this process adaptable to various document types or specific text analysis needs.
stop_words.extend(custom_stopwords.get("cdc_mmwr", []))
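
The cell above expects `config/custom_stopwords.json` to be a JSON object whose keys name a context and whose values are lists of terms. A minimal sketch of that layout follows. The `cdc_mmwr` key comes from the cell above, while the example terms and the `legal_briefs` key are made-up placeholders, and a short list stands in for the NLTK stop words so the sketch runs without NLTK data.

```python
import json

# Assumed file layout: {"<context name>": ["term", ...], ...}
example_config = """
{
  "cdc_mmwr": ["mmwr", "weekly", "report"],
  "legal_briefs": ["plaintiff", "defendant"]
}
"""

custom_stopwords = json.loads(example_config)

# Stand-in for the NLTK stop word list used in the notebook.
stop_words = ["the", "and"]
stop_words.extend(custom_stopwords.get("cdc_mmwr", []))

# A missing key falls back to an empty list, so nothing is added.
stop_words.extend(custom_stopwords.get("not_present", []))

print(stop_words)
```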

In [None]:
DOC_ID = r'.*[\d\w\-.]+\.(html|json)$'  # Regular expression pattern to identify document filenames ending in .html or .json.

TAGS = ['p']  # List of HTML tags to extract content from; 'p' is commonly used to denote paragraphs in HTML.

# Set flag to determine if lemmatization (reducing words to their base form) will be performed during preprocessing.
LEMMATIZATION = True
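
To see which file names the `DOC_ID` pattern accepts, the following sketch runs it against a few hypothetical names with plain `re.match` and no flags. The names in `candidates` are invented for this example.

```python
import re

DOC_ID = r'.*[\d\w\-.]+\.(html|json)$'  # same pattern as the cell above

# Hypothetical file names used only to exercise the pattern.
candidates = [
    "mm7001a1.html",
    "2020/week-12.json",
    "notes.txt",
    "archive.html.bak",
    "REPORT.HTML",
]

matched = [name for name in candidates if re.match(DOC_ID, name)]
print(matched)
```

Without `re.IGNORECASE` the pattern is case-sensitive, so an upper-case extension such as `.HTML` is not matched here.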

In [None]:

class DocumentParser(CorpusReader):
    
    def __init__(self, root, tags=TAGS, fileids=DOC_ID, **kwargs):
        """
        Initializes the DocumentParser with the root directory, specified tags, and file identifiers.

        Parameters:
            root (str): The root directory containing the corpus.
            tags (list): List of HTML tags to extract (e.g., 'p' for paragraphs).
            fileids (str): Regular expression pattern to match file names.
            **kwargs: Additional keyword arguments.
        """
        CorpusReader.__init__(self, root, fileids)
        self.tags = tags

    def resolve(self, fileids=None):
        """
        Resolves the file identifiers, returning them as-is.

        Parameters:
            fileids (str or None): Specific file identifier(s) or None.

        Returns:
            str or None: The resolved file identifiers.
        """
        return fileids 

    def detect_encoding(self, file_path):
        """
        Detects the character encoding of a file.

        Parameters:
            file_path (str): Path to the file whose encoding needs to be detected.

        Returns:
            str: The detected encoding of the file.
        """
        with open(file_path, 'rb') as f:
            raw_data = f.read()
        return chardet.detect(raw_data)['encoding']
    

    def process_content(self, content, non_html_log_file):
        """
        Processes HTML content by extracting text within <p> tags, allowing common punctuation and whitespace,
        and removing control characters except newlines and tabs. Logs any remaining control characters for review.

        Parameters:
            content (str): The raw HTML content to process.
            non_html_log_file (file): File handle to log non-HTML content or paragraphs with control characters.

        Yields:
            str or None: Cleaned text extracted from <p> tags, or None if an error occurs.
        """
        # Normalize Unicode to a standard form (NFKC) to handle variations in character representations
        content = unicodedata.normalize('NFKC', content.replace("\\n", "\n").strip())

        try:
            # Parse the HTML content and find all <p> elements
            soup = BeautifulSoup(content, 'html.parser')
            paragraphs = soup.find_all('p')

            for p in paragraphs:
                # Extract text from each <p> element
                text = p.get_text()

                # Keep tab (U+0009), newline (U+000A), printable ASCII, and U+0080-U+FFFF;
                # this drops other C0 controls and DEL, and also any character above U+FFFF.
                cleaned_text = re.sub(r'[^\x09\x0A\x20-\x7E\x80-\uFFFF]', '', text)
                
                # Identify C0 control characters in the original text (excluding newline and tab) for logging
                control_chars = [
                    (char, f"U+{ord(char):04X}", unicodedata.name(char, "UNKNOWN"))
                    for char in text if ord(char) < 32 and char not in '\n\t'
                ]

                # If control characters are found, log their details for review
                if control_chars:
                    char_details = "; ".join([f"{c} ({code}: {name})" for c, code, name in control_chars])
                    non_html_log_file.write(f"Control characters found in paragraph: {char_details}\n")
                    
                # Yield the cleaned text for further processing
                yield cleaned_text
        except Exception as e:
            # Log any parsing errors and yield None to indicate a failure in processing
            non_html_log_file.write(f"Error processing as HTML: {str(e)}\n")
            yield None


    def docs(self, fileids=None, pattern=None, non_html_log_file=None):
        """
        Iterates over the documents in the corpus, yielding processed content.

        Parameters:
            fileids (str or None): Specific file identifier(s) or None.
            pattern (str or None): Regular expression pattern to filter file names.
            non_html_log_file (file or None): File handle to log non-HTML content.

        Yields:
            str: Processed content from HTML or JSON files.
        """        
        fileids = self.resolve(fileids)
        
        # Compile the regular expression pattern, or use a default if none is provided
        if pattern is not None:
            regex = re.compile(pattern, re.IGNORECASE)
        else:
            regex = re.compile(r'.*([\d\w\-.]+)\.(html|json)$', re.IGNORECASE)
            
        for path, encoding in self.abspaths(fileids, include_encoding=True):
            if regex.search(path):
                encoding = self.detect_encoding(path)
                
                # Check if the file is JSON by extension
                if path.lower().endswith('.json'):
                    with codecs.open(path, 'r', encoding=encoding) as f:
                        try:
                            # Load JSON content
                            data = json.load(f)
                            # Process each HTML snippet in JSON directly for <p> tags
                            if isinstance(data, list):
                                for item in data:
                                    if isinstance(item, str):
                                        # Process each content item with HTML or plain text
                                        for content in self.process_content(item, non_html_log_file):
                                            if content:
                                                yield self.replace_curly_quotes(content)
                            else:
                                print(f"Error: {path} does not contain a list of HTML strings.")
                        except json.JSONDecodeError:
                            print(f"Error: {path} is not a valid JSON file.")
                
                elif path.lower().endswith('.html'):
                    # Process as regular HTML file, using the detected encoding (UTF-8 if detection fails)
                    with open(path, 'r', encoding=encoding or 'utf-8', errors='replace') as f:
                        doc_content = f.read()
                        # Process as HTML or plain text
                        for content in self.process_content(doc_content, non_html_log_file):
                            if content:
                                yield self.replace_curly_quotes(content)
                else:
                    print(f"Unsupported file type: {path}. Only JSON and HTML files are allowed.")
                    continue

    def html(self, fileids=None):
        """
        Iterates over HTML documents, yielding content for each document.

        Parameters:
            fileids (str or None): Specific file identifier(s) or None.

        Yields:
            str: Parsed content from each HTML document.
        """
        for doc in self.docs(fileids):
            try:
                yield doc
            except Exception as e:
                print("Could not parse HTML: {}".format(e))
                continue

    def replace_curly_quotes(self, text):
        """
        Replaces curly quotes with straight quotes in the provided text.

        Parameters:
            text (str): The text to process.

        Returns:
            str: The text with curly quotes replaced by straight quotes.
        """
        quote_replacements = {
            "\u2018": "'",  # Left single quotation mark
            "\u2019": "'",  # Right single quotation mark
            "\u201C": '"',  # Left double quotation mark
            "\u201D": '"',  # Right double quotation mark
        }
        
        for curly_quote, straight_quote in quote_replacements.items():
            text = text.replace(curly_quote, straight_quote)
        
        return text

    def remove_non_printable_chars(self, text):
        """
        Removes non-printable characters from the provided text.

        Parameters:
            text (str): The text to process.

        Returns:
            str: The text with non-printable characters removed.
        """
        non_printable_pattern = re.compile(r'[\x00-\x1F\x7F-\x9F]+')
        cleaned_text = re.sub(non_printable_pattern, '', text)
        return cleaned_text.strip()

    def get_invalid_character_names(self, text):
        """
        Retrieves the names of non-printable characters in the text.

        Parameters:
            text (str): The text to analyze.

        Returns:
            set: A set of names of non-printable characters found in the text.
        """
        char_names = set()
        non_printable_pattern = re.compile(r'[\x00-\x1F\x7F-\x9F]')
        invalid_chars = non_printable_pattern.findall(text)
        for char in invalid_chars:
            try:
                name = unicodedata.name(char)
            except ValueError:
                name = "UNKNOWN CONTROL CHARACTER"
            char_names.add(name)
        return char_names
        
    def validate_paragraph(self, paragraph):
        """
        Validates a paragraph, checking for non-printable characters and whitespace-only content.

        Parameters:
            paragraph (str): The paragraph to validate.

        Returns:
            bool or str: True if valid, otherwise a string explaining the issues.
        """
        reasons = []
        if not paragraph.strip():
            reasons.append("Only whitespace")

        invalid_char_names = self.get_invalid_character_names(paragraph)
        
        if invalid_char_names:
            reason = f"Contains non-printable characters: {', '.join(invalid_char_names)}"
            reasons.append(reason)

        return True if not reasons else ', '.join(reasons)
    
    para_dict = dict()
    def paras(self, parser_type='lxml', fileids=None):
        """
        Extracts paragraphs from HTML content based on specified tags.

        Parameters:
            parser_type (str): Parser type for BeautifulSoup (default is 'lxml').
            fileids (str or None): Specific file identifier(s) or None.

        Yields:
            str: Extracted paragraph text.
        """
        for html in self.html(fileids):
            # Check if html content looks like an HTML string
            if not isinstance(html, str) or "<" not in html:
                print(f"Skipping non-HTML content: {html}")
                continue
            
            soup = BeautifulSoup(html, parser_type)
            
            # Join tags into a CSS selector if `self.tags` is a list
            tag_selector = ",".join(self.tags) if isinstance(self.tags, list) else self.tags
            
            for element in soup.select(tag_selector):
                text = element.text.strip()
                yield text

    sent_dict = dict()
    def sents(self, fileids=None):
        """
        Splits paragraphs into sentences.

        Parameters:
            fileids (str or None): Specific file identifier(s) or None.

        Yields:
            str: Extracted sentence text.
        """
        for paragraph in self.paras(fileids=fileids):
            for sentence in nltk.sent_tokenize(paragraph): 
                yield sentence

    word_dict = dict()
    def words(self, fileids=None): 
        """
        Splits sentences into individual words.

        Parameters:
            fileids (str or None): Specific file identifier(s) or None.

        Yields:
            str: Extracted word token.
        """
        for sentence in self.sents(fileids=fileids):
            for token in nltk.wordpunct_tokenize(sentence):
                yield token

    def generate(self, fileids=None, log_file_path=None, non_html_log_path=None):
        """
        Processes documents, logging invalid paragraphs and returning valid ones.

        Parameters:
            fileids (str or None): Specific file identifier(s) or None.
            log_file_path (str): Path for logging invalid paragraphs.
            non_html_log_path (str): Path for logging non-HTML content.

        Returns:
            tuple: A tuple containing:
                - doc_dict (list): Valid paragraphs, including those that became valid after cleaning.
                - error_dict (list): Paragraphs that are still invalid after cleaning.
                - count (int): Number of paragraphs that became valid only after cleaning.
                - all_paragraph_count (int): Number of paragraphs that were valid without cleaning.
        """
        doc_dict = []
        error_dict = []
        count = 0
        all_paragraph_count = 0 

        # Open two log files: one for invalid paragraphs and one for non-HTML content
        with open(log_file_path, 'a') as invalid_log_file, open(non_html_log_path, 'a') as non_html_log_file:
            for idx, html_content in enumerate(self.docs(fileids=fileids, non_html_log_file=non_html_log_file)):
                html_content = self.replace_curly_quotes(html_content)
                validation_result = self.validate_paragraph(html_content)

                if isinstance(validation_result, bool) and validation_result:  
                    # Valid paragraph
                    all_paragraph_count += 1
                    doc_dict.append(html_content)
                else:
                    # Invalid paragraph; log to the invalid paragraphs file
                    if not isinstance(validation_result, bool):
                        invalid_log_file.write(f"Invalid Paragraph {idx}: {validation_result}\n")
                        cleaned_html_content = self.remove_non_printable_chars(html_content)

                        if isinstance(self.validate_paragraph(cleaned_html_content), bool):
                            count += 1
                            doc_dict.append(cleaned_html_content)
                        else:
                            error_dict.append(cleaned_html_content)

        return doc_dict, error_dict, count, all_paragraph_count
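
To make the validate, clean, and re-validate flow in `generate` concrete, here is a standalone sketch using only the standard library. It assumes the intended character class is the C0 and C1 control ranges (U+0000-U+001F and U+007F-U+009F). The function names `invalid_character_labels` and `strip_non_printable` are illustrative and are not methods of `DocumentParser`.

```python
import re
import unicodedata

# Assumed pattern: C0 controls, DEL, and C1 controls.
NON_PRINTABLE = re.compile(r'[\x00-\x1F\x7F-\x9F]')

def invalid_character_labels(text):
    """Return a sorted list of labels for the control characters found in text."""
    labels = set()
    for char in NON_PRINTABLE.findall(text):
        # Control characters have no Unicode name, so fall back to the code point.
        labels.add(unicodedata.name(char, f"U+{ord(char):04X}"))
    return sorted(labels)

def strip_non_printable(text):
    """Remove control characters and trim surrounding whitespace."""
    return NON_PRINTABLE.sub('', text).strip()

paragraph = "Weekly\x0b report\x00 summary"
print(invalid_character_labels(paragraph))  # labels for the two control characters

cleaned = strip_non_printable(paragraph)
print(cleaned)
print(invalid_character_labels(cleaned))    # empty once the paragraph is clean
```

Note that this character class also covers newline and tab, so a paragraph containing either would be reported as invalid until it is cleaned.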



> ⚠️ **Important:** Verify File Path

In [None]:
#corpus_path = os.path.join("topic-modeling", "data", "docs-to-process", "PROJECT_FOLDER")
corpus_path = "/SpectraSync/data/docs-to-process/"

_corpus = DocumentParser(corpus_path)
# print filenames
_corpus.fileids()

> ⚠️ **Important:** Verify file output location and filenames

In [None]:
# Define generic paths for log files
base_path = "/SpectraSync/data/docs-to-process/"

log_file_path = os.path.join(base_path, "paragraph_error.log")
non_html_log_path = os.path.join(base_path, "non_html_content.log")

# Run the generate function with generic paths
corpus_tuple, errors, count, all_paragraph_count = _corpus.generate(
    log_file_path=log_file_path,
    non_html_log_path=non_html_log_path
)

In [None]:
# Display each error found, indexed for easy reference
if errors:
    print("Errors found during document processing:")
    for i, txt in enumerate(errors, start=1):
        if len(txt) == 0: txt = "NON-PRINTABLE CHARACTER"
        print(f"Error {i}: {txt}")
else:
    print("No errors found during document processing.")

In [None]:
from time import time
import spacy

texts_out = []       # List to store processed text (filtered and lemmatized tokens) for each paragraph
inner_text = []      # Temporary list to hold tokens for the current paragraph

# Frequency distribution to count occurrences of stop words found in the text
stopword_count = nltk.FreqDist()

# Print message indicating whether lemmatization is enabled
pp.pprint(f"Executing POS/LEMMATIZATION({LEMMATIZATION})")

t = time()  # Start timer to measure processing time

# Process each paragraph in corpus_tuple with progress tracking
for paras in tqdm(corpus_tuple):
    doc = nlp(paras)  # Process the paragraph with spaCy NLP pipeline

    for token in doc:
        # Check if token is a content word (noun, adjective, verb, or adverb) and has 5+ characters
        if token.pos_ in ['NOUN', 'ADJ', 'VERB', 'ADV']:
            # Include only words with a minimum length (e.g., 5 characters) for more meaningful content;
            # this threshold can be adjusted as needed by the user.
            if len(token.text) >= 5:
                # Filter out tokens that are stop words (based on both text and lemma forms)
                if token.text.lower() not in stop_words and token.lemma_.lower() not in stop_words:
                    # Append token (text or lemma based on LEMMATIZATION setting) to inner_text
                    inner_text.append(token.text if not LEMMATIZATION else token.lemma_)
                else:
                    # If token is a stop word, increment its count in stopword_count
                    stopword_count[token.text if not LEMMATIZATION else token.lemma_] += 1

    # Append processed tokens of the current paragraph to texts_out if not empty
    if len(inner_text) > 0:
        texts_out.append(inner_text)
    inner_text = []  # Reset inner_text for the next paragraph

# Print the time taken to complete the spaCy filtering and processing
pp.pprint('Time to finish spaCy filter: {} mins'.format(round((time() - t) / 60, 2)))
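
The filtering rules in the loop above can be restated as a small pure function, which may be convenient for unit-testing the logic without loading a spaCy model. This is a sketch under assumptions: `keep_token` is a hypothetical helper, and the `(text, lemma, pos)` tuples are hand-written stand-ins for spaCy's `token.text`, `token.lemma_`, and `token.pos_`.

```python
CONTENT_POS = {"NOUN", "ADJ", "VERB", "ADV"}
MIN_LENGTH = 5  # same threshold as the loop above

def keep_token(text, lemma, pos, stop_words):
    """Return True when a token passes the POS, length, and stop-word checks."""
    if pos not in CONTENT_POS:
        return False
    if len(text) < MIN_LENGTH:
        return False
    return text.lower() not in stop_words and lemma.lower() not in stop_words

# Hand-written tuples standing in for spaCy tokens.
tokens = [
    ("Vaccines", "vaccine", "NOUN"),
    ("were", "be", "AUX"),
    ("widely", "widely", "ADV"),
    ("reported", "report", "VERB"),
    ("cases", "case", "NOUN"),
]
stop_words = {"report"}

kept = [lemma for text, lemma, pos in tokens
        if keep_token(text, lemma, pos, stop_words)]
print(kept)
```

Here "reported" is dropped because its lemma is a stop word, even though the surface form is not.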

In [None]:
# Keep only paragraphs that retain 5 or more tokens after filtering, ensuring more substantial content.
texts_for_processing = [sent for sent in texts_out if len(sent) >= 5]

# Clean up the texts_out variable as it's no longer needed
del texts_out

In [None]:
# For QA: Print the number of retained paragraphs and display each one
print(len(texts_for_processing))  # Output the count of paragraphs meeting the length requirement
for text in texts_for_processing:
    print(text)  # Display each token list for verification

> ⚠️ **Important:** Verify file output location and filename

In [None]:
import json  # Import JSON module for encoding data as JSON strings

def write_tokenized_sentences_to_jsonl(sentences, output_file):
    """
    Writes a list of tokenized sentences to a JSON Lines (.jsonl) file.

    Parameters:
        sentences (list): A list of tokenized sentences, where each sentence is a list of words.
        output_file (str): Path to the output .jsonl file.
    """
    # Open the specified output file in write mode with UTF-8 encoding
    with open(output_file, 'w', encoding='utf-8') as f:
        for sentence in sentences:
            # Convert each sentence to a JSON string and write it to a new line in the file
            json_line = json.dumps(sentence, ensure_ascii=False)
            f.write(json_line + '\n')  # Write each JSON string to a new line

# Call the function to save tokenized sentences to a JSONL file, replacing the path as needed
write_tokenized_sentences_to_jsonl(texts_for_processing, '/SpectraSync/data/processed-docs/data.jsonl')
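
Downstream scripts need to read the JSON Lines file back one record at a time. The sketch below shows one way to do that and checks the round trip against a temporary file. `read_tokenized_sentences_from_jsonl` is a hypothetical helper, not part of the SpectraSync scripts, and the sample data is invented.

```python
import json
import os
import tempfile

def read_tokenized_sentences_from_jsonl(input_file):
    """Yield one token list per non-empty line of a JSON Lines file."""
    with open(input_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)

# Round-trip check using a temporary file rather than the project paths.
sample = [["vaccine", "coverage"], ["naïve", "estimate"]]
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "sample.jsonl")
    with open(path, 'w', encoding='utf-8') as f:
        for sentence in sample:
            f.write(json.dumps(sentence, ensure_ascii=False) + '\n')
    restored = list(read_tokenized_sentences_from_jsonl(path))

print(restored == sample)
```

Because the reader is a generator, it can stream a large corpus without loading the whole file into memory.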

> ⚠️ **Important:** Verify file output location and filename

In [None]:
# Define the file path for saving processed text in JSON format
filename2 = "/SpectraSync/data/processed-docs/data.json"

# Open the specified file in write mode
with open(filename2, 'w', encoding='utf-8') as jsonfile:
    # Write the processed text data to the JSON file with formatting and UTF-8 encoding
    json.dump(texts_for_processing, jsonfile, indent=1, ensure_ascii=False)

In [None]:
# Import Gensim's Phrases for detecting common word pairs (bigrams) and triplets (trigrams)
from gensim.models import Phrases

# Train a bigram detector on the processed text data in `texts_for_processing`.
# min_count=20 ignores words and word pairs seen fewer than 20 times (Gensim's default is 5);
# the scoring threshold is left at its default.
bigram = Phrases(texts_for_processing, min_count=20)

# Initialize a frequency distribution to hold the corpus count of each candidate bigram
bigram_freq = nltk.FreqDist()

# `bigram.vocab` maps every counted token and candidate word pair to its count (str keys in Gensim 4.x).
# Keys containing '_' are candidate pairs; not all of them pass the scoring threshold.
for ngram, freq in bigram.vocab.items():
    if '_' in ngram:  # Identify only candidate bigrams (contains '_')
        bigram_freq[ngram] = freq
        print(ngram)  # Output each candidate bigram for review

# Add identified bigrams to each document in `texts_for_processing`.
# This step includes the bigrams in further analysis or model training as part of the document content.
for idx in range(len(texts_for_processing)):
    for token in bigram[texts_for_processing[idx]]:
        if '_' in token:  # Check if token is a bigram
            texts_for_processing[idx].append(token)  # Append bigram to the current document
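
Whether a candidate pair is actually joined depends on its score, not just its raw count. The sketch below re-implements the formula documented for Gensim's default scorer as a plain function, so the effect of `min_count` is easier to see. It is an illustrative approximation, not the library's code; `default_style_score` is a hypothetical name and the counts are made up.

```python
def default_style_score(count_a, count_b, count_ab, vocab_size, min_count):
    """Score a pair as (count_ab - min_count) * vocab_size / (count_a * count_b)."""
    denominator = count_a * count_b
    if denominator == 0:
        return float("-inf")
    return (count_ab - min_count) * vocab_size / denominator

# Made-up counts for illustration only.
score = default_style_score(count_a=50, count_b=40, count_ab=30,
                            vocab_size=10000, min_count=20)
print(score)         # (30 - 20) * 10000 / (50 * 40)
print(score > 10.0)  # compared against Gensim's default threshold of 10.0

# A pair seen exactly min_count times scores zero and is not joined.
borderline = default_style_score(50, 40, 20, 10000, 20)
print(borderline > 10.0)
```

Raising `min_count` lowers every score, so fewer pairs clear the threshold.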

> ⚠️ **Important:** Verify file output location and filename

In [None]:
# Define the file path for saving processed documents, now including bigrams, in JSON format.
filename3 = "/SpectraSync/data/processed-docs/data-w-bigrams.json"

# Open the specified file in write mode with UTF-8 encoding.
with open(filename3, 'w', encoding='utf-8') as jsonfile:
    # Save the updated text data with bigrams to the JSON file.
    json.dump(texts_for_processing, jsonfile, ensure_ascii=False)

> ⚠️ **Important:** Verify file output location and filename

In [None]:
# Save processed text with bigrams in JSON Lines (JSONL) format for efficient line-by-line reading.
# JSONL format allows each document to be stored on a separate line, making it ideal for large datasets.
write_tokenized_sentences_to_jsonl(
    texts_for_processing, 
    "/SpectraSync/data/processed-docs/data-w-bigrams.jsonl"
)