# 1. Import Necessary Libraries

Make sure Microsoft Visual C++ is installed on your PC.

Extracting text from pdf and converting to csv

In [None]:
"""
Before anything else, Read the README.md file
"""

In [None]:
# Standard Library
import csv
import json
import logging
import os
import re
from collections import Counter
from typing import List, Dict, Any, Generator, Tuple, Union, Set

# Third-Party Libraries
import numpy as np
import pandas as pd
import spacy
import fitz
from langdetect import detect, LangDetectException
from tqdm import tqdm
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import plotly.io as pio
from bertopic import BERTopic
from concurrent.futures import ThreadPoolExecutor

# spaCy Stopwords
from spacy.lang.en.stop_words import STOP_WORDS as en_stopwords
from spacy.lang.fr.stop_words import STOP_WORDS as fr_stopwords
from spacy.lang.nl.stop_words import STOP_WORDS as nl_stopwords

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

In [None]:
# Load the project configuration. The encoding is passed explicitly so the JSON
# file is decoded the same way on every platform instead of relying on the
# locale's default encoding.
with open("../configs/config.json", encoding="utf-8") as f:
    config = json.load(f)

# Constants
# Input/output locations, all read from the configuration file.
PDF_DIRECTORY = config["test_pdf_directory"]
CLEANED_CSV = config["test_cleaned_csv"]
TOPIC_CSV = config["test_topics_csv"]
PDF_VISUAL_PATH = config["pdf_visual_path"]

# Column names shared by the extraction and cleaning steps.
COLUMN_FILENAME = "filename"
COLUMN_PAGE = "page"
COLUMN_TEXT = "text"
COLUMN_LANGUAGE = "language"
COLUMN_CLEAN_TEXT = "clean_text"

# 2. Extract text from either a single or multiple research study PDFs. Path is 'studies/papers'


### Loading functions

In [None]:
# Load spaCy models once to avoid repeated initialization
# Keys are the language codes that the rest of the notebook uses for lookup.
NLP_MODELS = {
    'en': spacy.load("en_core_web_lg"),
    'nl': spacy.load("nl_core_news_lg"),
    'fr': spacy.load("fr_core_news_lg")
}

# Character translation table for str.translate: each single character on the
# left is replaced by the ASCII string on the right.
LIGATURES = str.maketrans({
    # Standard Latin ligatures
    'ﬁ': 'fi', 'ﬂ': 'fl', 'ﬃ': 'ffi', 'ﬄ': 'ffl', 'ﬀ': 'ff', 'ﬅ': 'ft', 'ﬆ': 'st',
    
    # Mathematical symbols spelled out in ASCII (not ligatures in the strict sense)
    '≤': '<=', '≥': '>=', '≠': '!=', '±': '+-', '→': '->', '∞': 'oo',
    '∫': 'int', '∑': 'sum', '∏': 'prod', '∇': 'nabla', '∂': 'partial', '√': 'sqrt'
})

# Matches a fixed list of words that may have stray whitespace inside them
# (e.g. "signi ficant"). The pattern also matches the correctly spelled words,
# because every \s* may match zero characters.
OCR_FIXES_PATTERN = re.compile(r'signi\s*ficant|di\s*fferent|e\s*ffective|e\s*ffect|chil\s*dren|e\s*ff\s*ective|con\s*fi\s*dence')

# Typographic punctuation mapped to plain ASCII equivalents
PUNCTUATION_FIXES = {
    '“': '"', '”': '"', '‘': "'", '’': "'",
    '—': '-', '–': '-', '…': '...', '•': '*', '·': '*', '●': '*',
}

# Define normalization fixes for numbers, symbols, and units
NORMALIZATION_FIXES = {
    '–': '-',  # Replace en dash with hyphen (same mapping as in PUNCTUATION_FIXES)
    ' %': '%',  # Remove space before percentage
    '^': '',   # Remove caret
    'kg.': 'kg',  # Remove period after kg
    'C°': '°C',   # Correct temperature symbol
}

# Define citation and reference fixes
# NOTE(review): 'et al' is a prefix of 'et al.', so applying these entries one
# after another with str.replace appends a second period to an already correct
# "et al." and also rewrites unrelated text such as "set all".
CITATION_FIXES = {
    'et. al.': 'et al.', 
    'et al': 'et al.'
}

# Combine all replacements into a single dictionary
# Later dictionaries win on duplicate keys; insertion order is punctuation,
# then normalization, then citation fixes.
ALL_REPLACEMENTS = {**PUNCTUATION_FIXES, **NORMALIZATION_FIXES, **CITATION_FIXES}

# Lower-case substrings; a line containing any of them is treated as
# publisher/metadata boilerplate by contains_unwanted_keywords.
UNWANTED_KEYWORDS = frozenset({
    'doi', 'https', 'http', 'journal', 'university', 'copyrighted',
    'taylor & francis', 'elsevier', 'published by', 'received',
    'revised', 'author(s)', 'source:', 'history:', 'keywords',
    'volume', 'downloaded', 'article', 'creative commons use',
    'authors', 'all rights reserved'
})

# Lower-case substrings that is_reference_section looks for. Besides
# reference/acknowledgement markers the set also contains 'method'/'methods'.
REFERENCE_MARKERS = frozenset({'references', 'bibliography', 'acknowledgements', 'method', 'methods'})

# Utility functions
def validate_pdf_path(pdf_path: str) -> bool:
    """Return True when ``pdf_path`` exists and is readable.

    An error is logged and False is returned otherwise.
    """
    accessible = os.path.exists(pdf_path) and os.access(pdf_path, os.R_OK)
    if accessible:
        return True
    logging.error(f"PDF file is not accessible: {pdf_path}")
    return False

def is_heading(line: str) -> bool:
    """Return True for lines that start with 'CHAPTER' or are written entirely in upper case."""
    if line.startswith('CHAPTER'):
        return True
    # str.isupper() is False for strings without any cased character (e.g. "" or "123").
    return line.isupper()

# Compiled once. Matches, at the start of a line: a bracketed number ("[12]"),
# a numbered item ("3."), an asterisk, or the words "Note" / "Table".
# The escapes are single backslashes: inside a raw string a doubled backslash
# would match a literal backslash character instead of a bracket or digit.
_FOOTNOTE_PATTERN = re.compile(r'^(\[\d+\]|\d+\.|[*]|Note|Table)')


def is_footnote(line: str) -> bool:
    """Identify footnote-like lines based on how they start.

    Args:
        line: A single line of extracted text.

    Returns:
        True when the line begins with "[<digits>]", "<digits>.", "*",
        "Note" or "Table"; False otherwise. A line starting with a decimal
        number such as "1.5" also matches the "<digits>." form.
    """
    return _FOOTNOTE_PATTERN.match(line) is not None

def contains_unwanted_keywords(line: str) -> bool:
    """Return True if any entry of UNWANTED_KEYWORDS occurs as a substring of the lower-cased line."""
    haystack = line.lower()
    for keyword in UNWANTED_KEYWORDS:
        if keyword in haystack:
            return True
    return False

def is_reference_section(line: str) -> bool:
    """Return True if any entry of REFERENCE_MARKERS occurs in the lower-cased line.

    The check is a plain substring test, so a marker appearing in the middle
    of a sentence matches as well.
    """
    lowered = line.lower()
    return next((True for marker in REFERENCE_MARKERS if marker in lowered), False)

def clean_text(text: str) -> str:
    """Clean text by replacing ligatures, repairing split words, and normalizing symbols.

    Steps, in order:
        1. Translate ligatures and symbols via ``LIGATURES``.
        2. Rejoin the words listed in ``OCR_FIXES_PATTERN`` when they contain
           stray whitespace ("signi ficant" -> "significant").
        3. Apply the literal replacements in ``ALL_REPLACEMENTS`` except the
           citation entries.
        4. Normalize "et al" / "et. al." to "et al." using word boundaries.

    Args:
        text: Raw text of one extracted line.

    Returns:
        The cleaned text.
    """
    # Replace ligatures
    text = text.translate(LIGATURES)

    # Repair OCR-split words by dropping the whitespace inside the match.
    # (Looking the match up in ALL_REPLACEMENTS never hit, because that
    # dictionary holds no OCR entries.)
    text = OCR_FIXES_PATTERN.sub(lambda match: "".join(match.group(0).split()), text)

    # Literal replacements. Citation entries are skipped here: replacing the
    # substring 'et al' after 'et al.' has been produced would yield 'et al..'
    # and would also rewrite unrelated text such as "set all".
    for pattern, replacement in ALL_REPLACEMENTS.items():
        if pattern in CITATION_FIXES:
            continue
        text = text.replace(pattern, replacement)

    # Citation normalization with word boundaries; an existing trailing period
    # is consumed so the result always has exactly one.
    text = re.sub(r'\bet\.?\s+al\b\.?', 'et al.', text)

    return text

def detect_language(text: str) -> str:
    """Return the language reported by ``detect`` for ``text``, or "unknown" if detection raises LangDetectException."""
    try:
        language = detect(text)
    except LangDetectException:
        language = "unknown"
    return language

def _finalize_paragraph(lines: List[str], min_words: int) -> str:
    """Join paragraph lines into one string; return '' if it has fewer than ``min_words`` words."""
    parts: List[str] = []
    for piece in (raw.strip() for raw in lines):
        if not piece:
            continue
        # Separate consecutive lines with a space so the last word of one line
        # is not fused with the first word of the next. A line ending in a
        # hyphen is joined directly, keeping hyphenated line breaks intact.
        if parts and not parts[-1].endswith("-"):
            parts.append(" ")
        parts.append(piece)
    text = "".join(parts)
    return text if len(text.split()) >= min_words else ""


def process_pdf(
    file_path: str,
    filename: str,
    *,
    min_words: int = 10,
    indent_tolerance: float = 10,
) -> Generator[List[Any], None, None]:
    """
    Processes a PDF file and yields cleaned paragraphs along with metadata.

    Lines inside a text block are grouped into a paragraph while the
    x-coordinate of their first span stays within ``indent_tolerance`` of the
    previous kept line. Headings, footnotes, lines with unwanted keywords and
    lines equal to the file's title are skipped. Processing of the whole
    document stops at the first line for which ``is_reference_section`` is
    true; the paragraph being collected at that point is discarded.

    Args:
        file_path (str): Path to the PDF file.
        filename (str): Name of the PDF file (used for metadata and as title).
        min_words (int): Minimum number of words a paragraph needs to be yielded.
        indent_tolerance (float): Maximum x-offset between consecutive lines
            that still counts as the same paragraph.

    Yields:
        list: ``[filename, page_num, full_text, language]`` where ``page_num``
        is the 1-based page number (int) and the other items are strings.

    Any exception raised while reading the PDF is logged and ends the
    generator; it is not propagated to the caller.
    """
    # The file name without extension is used to drop lines that repeat the title
    title = os.path.splitext(filename)[0].lower()

    try:
        # Open the PDF file using PyMuPDF (fitz)
        with fitz.open(file_path) as pdf_document:
            for page_num, page in enumerate(pdf_document, start=1):
                # Structured extraction: blocks -> lines -> spans
                text_dict = page.get_text("dict")

                for block in text_dict.get("blocks", []):
                    # Skip non-text blocks (e.g., images)
                    if block.get("type", 1) != 0:
                        continue

                    paragraph: List[str] = []
                    # x-coordinate of the first span of the previous kept line
                    prev_x = None

                    for line in block.get("lines", []):
                        spans = line.get("spans", [])
                        # Concatenate the spans, replacing semicolons with commas,
                        # then clean the text (ligatures, OCR errors, symbols)
                        line_text = clean_text(
                            "".join(span["text"].replace(';', ',') for span in spans)
                        )

                        # Stop the whole document here. Returning (rather than
                        # breaking out of the line loop only) guarantees that
                        # no later block on this page is emitted.
                        if is_reference_section(line_text):
                            return

                        # Skip unwanted lines (headings, footnotes, boilerplate, title)
                        if (is_heading(line_text) or is_footnote(line_text) or
                                contains_unwanted_keywords(line_text) or
                                line_text.strip().lower() == title):
                            continue

                        first_word_x = spans[0]["bbox"][0] if spans else 0

                        # A significant horizontal shift closes the current paragraph
                        if prev_x is not None and abs(first_word_x - prev_x) >= indent_tolerance:
                            full_text = _finalize_paragraph(paragraph, min_words)
                            if full_text:
                                yield [filename, page_num, full_text, detect_language(full_text)]
                            paragraph = []

                        paragraph.append(line_text)
                        prev_x = first_word_x

                    # Emit whatever remains at the end of the block
                    full_text = _finalize_paragraph(paragraph, min_words)
                    if full_text:
                        yield [filename, page_num, full_text, detect_language(full_text)]

    except Exception as e:
        # Log any errors that occur during PDF processing
        logging.error(f"Failed to process {file_path}: {e}")

# Helper function for parallel processing
def process_single_pdf(file_path: str, filename: str) -> List[List[str]]:
    """Run ``process_pdf`` to completion and return all yielded rows as a list."""
    rows = []
    rows.extend(process_pdf(file_path, filename))
    return rows

## The main function that processes the PDFs

In [None]:
# Main function
# Collect the PDFs case-insensitively (so "paper.PDF" is included) and in
# sorted order, which makes the row order of the CSV reproducible.
pdf_files = sorted(f for f in os.listdir(PDF_DIRECTORY) if f.lower().endswith('.pdf'))

with open(CLEANED_CSV, mode='w', newline='', encoding='utf-8') as csv_file:
    writer = csv.writer(csv_file)
    # Header built from the shared column constants so it stays in sync with
    # the names the cleaning step looks up.
    writer.writerow([COLUMN_FILENAME, COLUMN_PAGE, COLUMN_TEXT, COLUMN_LANGUAGE])

    # Process PDFs in parallel
    with ThreadPoolExecutor() as executor:
        futures = []
        for filename in pdf_files:
            file_path = os.path.join(PDF_DIRECTORY, filename)
            # Inaccessible files are logged by validate_pdf_path and skipped
            if validate_pdf_path(file_path):
                futures.append(executor.submit(process_single_pdf, file_path, filename))

        # Write results in submission order; result() blocks until that file is done
        for future in tqdm(futures, desc="Processing PDFs"):
            writer.writerows(future.result())

logging.info(f"Data successfully exported to {CLEANED_CSV}")

### Showing the cleaned DataFrame

In [None]:
# Load the data
# Read the CSV written by the extraction step and preview its first 30 rows
# (as the last expression of the cell, the preview is the cell output).
df_cleaned = pd.read_csv(CLEANED_CSV)
df_cleaned.head(30)

# 3. Clean it using spaCy with language support for English, Dutch and French.
We clean up the text:
- Remove the names of cities, countries and other geographic entities for a better outcome
- Remove special characters (keep only letters)
- Convert to lower case
- Remove stop words
- Remove short words (three letters or fewer in the code below, e.g. 'a', 'I', 'at')
- Remove very short sentences
- Remove URLs
- Use lemmatization
- Remove duplicate sentences

In [None]:
# Map language codes to the spaCy stop-word sets imported at the top of the file
STOPWORDS_MAP: Dict[str, Set[str]] = {
    'en': en_stopwords,  # English stop words from spacy.lang.en
    'fr': fr_stopwords,  # French stop words from spacy.lang.fr
    'nl': nl_stopwords,  # Dutch stop words from spacy.lang.nl
}

# Entity labels whose tokens are dropped during cleaning.
# NOTE(review): "EMAIL" and "PHONE" do not look like labels produced by the
# stock spaCy NER models loaded above - confirm whether they ever match.
PERSONAL_ENTITIES: Set[str] = {
    "PERSON", "EMAIL", "PHONE", "GPE", "ORG", "NORP", "FAC", "LOC", "PRODUCT", 
    "EVENT", "WORK_OF_ART", "LAW", "DATE"
}

# Matches every single character that is NOT a word character, whitespace, or
# one of the listed accented letters. Digits and underscores are word
# characters and therefore are not matched by this pattern.
UNWANTED_CHARACTERS_PATTERN = re.compile(
    r"[^\w\s"  # Keep word characters (letters, digits, underscore) and whitespace
    r"ÀÁÂÃÄÅàáâãäå"  # Accented letters listed explicitly
    r"ÈÉÊËèéêë" 
    r"ÌÍÎÏìíîï"
    r"ÒÓÔÕÖòóôõö"
    r"ÙÚÛÜùúûü"
    r"ÇçÑñ"  # Cedilla and tilde letters
    r"]", 
    flags=re.UNICODE
)

# Matches runs of digits
NUMBERS_PATTERN = re.compile(r"\d+")

def preprocess_text(text: str) -> str:
    """
    Preprocesses text by removing unwanted characters and numbers and normalizing whitespace.

    Unwanted characters are replaced by a space rather than deleted, so words
    that were separated only by punctuation ("and/or", "well-known",
    "l'école") stay separate instead of being fused into one token.

    Args:
        text (str): Input text to preprocess.
    Returns:
        str: Preprocessed text with single spaces and no leading/trailing whitespace.
    """
    # Replace unwanted characters with a space to keep word boundaries
    text = UNWANTED_CHARACTERS_PATTERN.sub(" ", text)
    # Remove numbers using regex
    text = NUMBERS_PATTERN.sub("", text)
    # Normalize spaces (collapse any whitespace run into a single space)
    text = re.sub(r"\s+", " ", text)
    # Strip leading/trailing whitespace
    return text.strip()

def clean_text_with_spacy(text: str, lang: str) -> str:
    """
    Cleans text using spaCy: removes personal entities, stopwords, and lemmatizes words.

    Args:
        text (str): Input text to clean.
        lang (str): Language code (e.g., 'en', 'fr', 'nl'). Any value that is
            not a key of NLP_MODELS falls back to English and logs a warning.
    Returns:
        str: Space-separated, lower-cased lemmas of the tokens that were kept.
    """
    if lang not in NLP_MODELS:
        logging.warning(f"Language model for '{lang}' not found. Defaulting to English.")
        lang = 'en'

    # After the fallback above `lang` is always a key of NLP_MODELS, so the
    # model can be indexed directly; no "model missing" branch is needed.
    nlp = NLP_MODELS[lang]
    stopwords = STOPWORDS_MAP.get(lang, set())  # Default to an empty set if none is registered

    # Preprocess text to remove unwanted characters and numbers
    doc = nlp(preprocess_text(text))

    # Token processing: lemmatization, stopword removal, personal entity removal
    tokens = [
        token.lemma_.lower() for token in doc
        if token.lemma_  # Ensure lemma exists
        and token.ent_type_ not in PERSONAL_ENTITIES  # Remove personal entities
        and token.text.lower() not in stopwords  # Remove stopwords
        and not token.is_punct  # Remove punctuation
        and not token.is_space  # Remove spaces
        and len(token.lemma_) > 3  # Keep only lemmas longer than three characters
    ]

    return " ".join(tokens)

def final_clean_csv(input_csv: str):
    """
    Add a spaCy-cleaned text column to a CSV file and write the file back in place.

    Errors (missing file, missing columns, anything else) are logged and not raised.

    Args:
        input_csv (str): Path to the CSV file to read and overwrite.
    """
    def _clean_row(row):
        # The text is coerced to str; the language value is passed through as read
        return clean_text_with_spacy(str(row[COLUMN_TEXT]), row[COLUMN_LANGUAGE])

    try:
        frame = pd.read_csv(input_csv)

        # Both input columns must be present
        has_text = COLUMN_TEXT in frame
        has_language = COLUMN_LANGUAGE in frame
        if not (has_text and has_language):
            raise KeyError(f"CSV must contain '{COLUMN_TEXT}' and '{COLUMN_LANGUAGE}' columns")

        # Clean the rows chunk by chunk
        chunk_size = 1000  # Adjust based on memory constraints
        results = []
        start = 0
        total = len(frame)
        while start < total:
            chunk = frame.iloc[start:start + chunk_size]
            results.extend(chunk.apply(_clean_row, axis=1))
            start += chunk_size

        # Attach the cleaned text and overwrite the input file
        frame[COLUMN_CLEAN_TEXT] = results
        frame.to_csv(input_csv, index=False)
        logging.info(f"Cleaned data added to new column in {input_csv}")

    except FileNotFoundError:
        logging.error(f"File not found: {input_csv}")
    except KeyError as e:
        logging.error(f"Missing required column in CSV: {e}")
    except Exception as e:
        logging.error(f"Unexpected error processing file: {e}")

# Run the cleaning process
# final_clean_csv reads CLEANED_CSV, adds the clean-text column and writes the
# result back to the same path.
final_clean_csv(CLEANED_CSV)

In [None]:
# Display the first 30 rows of the cleaned DataFrame
# The CSV is re-read from disk so df_final reflects what the cleaning step wrote.
df_final = pd.read_csv(CLEANED_CSV)
df_final.head(30)

In [None]:
# Check for non-string values to avoid errors before applying topic modeling
print(df_final['clean_text'].isnull().sum())  # Count NaN values
# Print the rows whose clean_text value is not a Python str (NaN is a float)
print(df_final[df_final['clean_text'].apply(lambda x: not isinstance(x, str))])  # Find non-string values

In [None]:
# Replace NaN values with empty strings to avoid errors
# Presumably the NaNs come from rows whose cleaned text was empty when the CSV
# was written - confirm against the output of the check above.
df_final['clean_text'] = df_final['clean_text'].fillna('')

In [None]:
# Re-run the check after fillna: the NaN count should now be 0 and the
# non-string selection should be empty.
print(df_final['clean_text'].isnull().sum())  # Count NaN values
print(df_final[df_final['clean_text'].apply(lambda x: not isinstance(x, str))])  # Find non-string values


# 4. Initialize and fit BERTopic
The good thing with BERTopic is that it does most of the work automatically (meaning I do not need to bore you to death with details about how it works behind the scenes).

We need to do 3 things
1. Initialize BERTopic model
2. 'Fit' the model -> this means: run the model, as you would run a simple linear regression
3. Look at the topics via `topic_model.get_topic_info()`

To get started, let's just use the default settings.

In [None]:
# Number of distinct source files represented in the cleaned data
unique_filenames_count = df_final['filename'].nunique()
print(unique_filenames_count)

In [None]:
# Initialize BERTopic model
# calculate_probabilities=True is needed because later cells use the
# per-document `probabilities` array (argmax and visualize_distribution).
# NOTE(review): min_topic_size=5 and nr_topics=10 are fixed here; confirm they
# suit the size of the corpus.
topic_model = BERTopic(calculate_probabilities=True, min_topic_size=5, nr_topics=10)

# Fit the model with preprocessed text sentences
# `topics` holds one topic id per document, `probabilities` one row per document.
topics, probabilities = topic_model.fit_transform(df_final['clean_text'])

# View and inspect topics
topic_model.get_topic_info()

# 5. Visualize Topics

In [None]:
# Visualize the topics
# The returned figure is the cell output.
topic_model.visualize_topics()

In [None]:
# Bar chart view of the fitted topics (default arguments)
topic_model.visualize_barchart()

# 6. Visualize Topic Hierarchy

In [None]:
# Build the topic hierarchy from the same documents the model was fitted on,
# then plot it. `hierarchical_topics` is reused by the topic-tree cell.
hierarchical_topics = topic_model.hierarchical_topics(df_final['clean_text'])
topic_model.visualize_hierarchy(hierarchical_topics=hierarchical_topics)

In [None]:
# Text rendering of the hierarchy computed in the previous cell
tree = topic_model.get_topic_tree(hierarchical_topics)
print(tree)

# 7. Visualize documents

In [None]:
# Plot the documents, passing the same texts that were used for fitting
topic_model.visualize_documents(df_final['clean_text'])

In [None]:
# Add topics and probabilities to the original DataFrame
# Each document gets the topic whose probability is highest in its row.
# NOTE(review): this assumes column i of `probabilities` corresponds to topic
# id i (outlier topic -1 not included) - confirm for the BERTopic version used.
df_final["topic_number"] = np.argmax(probabilities, axis=1)

# Also extract the topic names and assign them to the DataFrame.
# The lookup is keyed by the 'Topic' column (the topic id) rather than by the
# row position of get_topic_info(): when the outlier topic -1 is listed, row
# positions are shifted by one relative to the topic ids.
info = topic_model.get_topic_info()
topic_names = info.set_index('Topic')['Representation']

df_final['topic_name'] = df_final['topic_number'].map(topic_names)

# Save to a new CSV file
df_final.to_csv(TOPIC_CSV, index=False)

In [None]:
# Preview the first 30 rows, now including the topic columns
df_final.head(30)

In [None]:
# Topic probability distribution of a single document: the row at index 5 of
# `probabilities`. The index is hard-coded and requires at least six documents.
topic_model.visualize_distribution(probabilities[5])

# 8. Topics per full article

In [None]:
# Calculate the count of times each topic is chosen within each article
# Result: one row per filename, one column per topic_number; combinations that
# never occur are filled with 0.
article_topic_counts = df_final.groupby('filename')['topic_number'].value_counts().unstack(fill_value=0)

# Rename columns to 'Topic X'
article_topic_counts.columns = [f'Topic {i}' for i in article_topic_counts.columns]

# Display the table
print(article_topic_counts)

# Plot the distribution for each article as one stacked bar per file
article_topic_counts.plot(kind='bar', stacked=True, figsize=(15, 7))
plt.title('Topic Distribution per Article (Count)')
plt.xlabel('Article')
plt.ylabel('Count')
# Anchor the legend outside the axes so it does not cover the bars
plt.legend(title='Topics', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.show()

In [None]:
# Calculate the proportion of times each topic is chosen within each article
# Same table as the count version, but normalize=True turns the counts into
# fractions within each filename group.
article_topic_proportions = df_final.groupby('filename')['topic_number'].value_counts(normalize=True).unstack(fill_value=0)

# Rename columns to 'Topic X'
article_topic_proportions.columns = [f'Topic {i}' for i in article_topic_proportions.columns]

# Display the table
print(article_topic_proportions)

# Plot the distribution for each article as one stacked bar per file
article_topic_proportions.plot(kind='bar', stacked=True, figsize=(15, 7))
plt.title('Topic Distribution per Article (Proportion)')
plt.xlabel('Article')
plt.ylabel('Proportion')
# Anchor the legend outside the axes so it does not cover the bars
plt.legend(title='Topics', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.show()

# 9. Save the visuals to a pdf file.