BERTopic topic modelling: trains topics on a subset of the data, then classifies the remaining review texts into those topics.

 - uses a dedicated, stratified training sample so that niche topics are preserved

 - uses a customized stop-word list

 - requires a minimum of 50 documents per cluster

 - filters terms to produce more meaningful topics

In [None]:
# Excluding the training data: reviews used to train the model are not written to the results table


import pandas as pd
from sqlalchemy import create_engine, inspect, text
from bertopic import BERTopic
from sentence_transformers import SentenceTransformer
from dotenv import dotenv_values
from tqdm import tqdm # Import tqdm for progress bars

# Import specific BERTopic sub-models for fine-tuning
# from umap import UMAP # UMAP is implicitly used by BERTopic, no direct import needed unless customizing.
from hdbscan import HDBSCAN
from sklearn.feature_extraction.text import CountVectorizer
# from bertopic.representation import KeyBERTInspired, ZeroShotClassification, TextGeneration # Uncomment if you want to use advanced representation models

# --- Configuration and Setup ---

# Load environment variables from the .env file into a dict. Keys declared without a
# value map to None, so every lookup below tolerates missing/None entries.
config = dotenv_values()

# PostgreSQL database connection details.
# `.get` (rather than `[...]`) lets a missing key reach the explicit validation below,
# which names every required variable, instead of failing early with a bare KeyError.
pg_user = config.get("POSTGRES_USER")
pg_host = config.get("POSTGRES_HOST")
pg_port = config.get("POSTGRES_PORT")
pg_db = config.get("POSTGRES_DB")
pg_pass = config.get("POSTGRES_PASS")
# A missing or empty POSTGRES_SCHEMA falls back to 'public'; an empty value would
# otherwise end up as an invalid quoted identifier such as "" or "None" in the SQL.
pg_schema = config.get("POSTGRES_SCHEMA") or "public"

# Validate environment variables
if not all([pg_user, pg_host, pg_port, pg_db, pg_pass]):
    raise ValueError("Missing one or more required PostgreSQL environment variables (POSTGRES_USER, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB, POSTGRES_PASS).")
# Warn only when the schema was actually defaulted, not when 'public' was set explicitly.
if not config.get("POSTGRES_SCHEMA"):
    print("WARNING: POSTGRES_SCHEMA not found in .env, defaulting to 'public'.")

# Constants for file paths, table names, and batch processing
CHUNK_SIZE = 10_000  # Number of rows classified and upserted per batch
INPUT_DB_TABLE = "review_2019"  # Table containing source review data (all data to be analyzed)
OUTPUT_DB_TABLE = "bertopic_analysis_results_update_3"  # Table to store topic modeling results
MODEL_DIR = "bertopic_model_update_3"  # Location the trained BERTopic model is saved to
TOPIC_JSON_PATH = "topic_keywords_update_3.json"  # Path to save topic summary information

# Source of the training data. Accepted values:
#   None        -> random sample of TRAINING_SAMPLE_SIZE reviews from the (text-deduplicated) input data
#   "DB_TABLE"  -> every row of TRAINING_DB_TABLE_NAME in pg_schema
#   list        -> explicit list of review_ids taken from the input data
#   other str   -> path to a CSV file with a 'review_id' column
TRAINING_SAMPLE_SOURCE = "DB_TABLE"
TRAINING_DB_TABLE_NAME = "training_sample"  # <--- Specify your table name here (e.g., "review_training_subset")
# Sample size for the random-sampling mode (TRAINING_SAMPLE_SOURCE = None). The main
# script references this name in that mode, so it must exist.
# NOTE(review): placeholder value — tune to the dataset size.
TRAINING_SAMPLE_SIZE = 100_000

# --- Database Utility Functions ---

def get_database_connection():
    """
    Establish and return a SQLAlchemy engine for PostgreSQL.

    The connection URL is built from the module-level ``pg_*`` settings with
    ``sqlalchemy.engine.URL.create``. This escapes credentials correctly, so a
    password containing URL-reserved characters (``@``, ``:``, ``/``, ``%``, ...)
    no longer corrupts the connection string, as plain f-string
    interpolation would.

    Returns:
        sqlalchemy.engine.base.Engine: A SQLAlchemy engine object with
        ``pool_pre_ping`` enabled, so pooled connections are tested on checkout.
    """
    # Local import: URL is only needed here and the file does not import it at top level.
    from sqlalchemy.engine import URL

    url = URL.create(
        drivername="postgresql+psycopg2",
        username=pg_user,
        password=pg_pass,
        host=pg_host,
        port=int(pg_port),  # .env values are strings; URL.create expects an integer port
        database=pg_db,
    )
    return create_engine(url, pool_pre_ping=True)

def create_results_table(engine, schema_name, table_name):
    """
    Create the PostgreSQL table that stores BERTopic results, unless it already exists.

    Schema and table names are double-quoted in the DDL. This matches how
    ``load_reviews_from_db`` and ``write_batch_to_db`` reference tables.
    PostgreSQL case-folds unquoted identifiers, so mixing quoted and unquoted
    styles would create one table and write to another whenever a name contains
    upper-case characters.

    Args:
        engine (sqlalchemy.engine.base.Engine): The SQLAlchemy engine.
        schema_name (str): The name of the database schema.
        table_name (str): The name of the table to create.
    """
    inspector = inspect(engine)
    if inspector.has_table(table_name, schema=schema_name):
        print(f"Table '{schema_name}.{table_name}' already exists. Skipping creation.")
        return

    # review_id is the primary key because write_batch_to_db upserts with
    # ON CONFLICT (review_id).
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS "{schema_name}"."{table_name}" (
        business_id VARCHAR(255),
        review_id VARCHAR(255) PRIMARY KEY,
        text TEXT,
        topic INTEGER,
        probability NUMERIC(5, 4)
    );
    """
    # engine.begin() commits when the block exits normally and rolls back on error.
    with engine.begin() as connection:
        connection.execute(text(create_table_sql))
    print(f"Successfully created table '{schema_name}.{table_name}' for results.")

def load_reviews_from_db(engine, schema, table):
    """
    Read the review columns of an entire table into memory.

    Only ``review_id``, ``text`` and ``business_id`` are selected, in that order.
    No filtering or ordering is applied. Schema and table names are wrapped in
    double quotes, so PostgreSQL matches them case-sensitively.

    Args:
        engine (sqlalchemy.engine.base.Engine): Connectable passed straight to
            ``pandas.read_sql_query``.
        schema (str): The schema that holds the table.
        table (str): The table to read.

    Returns:
        pandas.DataFrame: One row per table row with columns review_id, text, business_id.
    """
    selected_columns = ", ".join(("review_id", "text", "business_id"))
    qualified_source = '"{}"."{}"'.format(schema, table)
    sql = f"SELECT {selected_columns} FROM {qualified_source}"
    return pd.read_sql_query(sql, engine)

def save_topic_info(model, path):
    """
    Write the model's topic overview table to *path* as pretty-printed JSON.

    The DataFrame returned by ``model.get_topic_info()`` is serialized as a JSON
    array with one object per row (``orient="records"``), indented by two spaces.

    Args:
        model (bertopic.BERTopic): The trained BERTopic model.
        path (str): Destination file for the JSON output.
    """
    json_options = {"orient": "records", "indent": 2}
    model.get_topic_info().to_json(path, **json_options)

def write_batch_to_db(engine, df, table, schema):
    """
    Upsert a batch of topic results into ``schema.table``.

    Rows are sent with ``psycopg2.extras.execute_values`` for efficient bulk
    writes. A row whose ``review_id`` already exists is overwritten
    (``ON CONFLICT (review_id) DO UPDATE``), so re-processing reviews does not
    raise primary-key errors.

    Args:
        engine (sqlalchemy.engine.base.Engine): The SQLAlchemy engine.
        df (pandas.DataFrame): The batch to write. Must contain the columns
            'business_id', 'review_id', 'text', 'topic' and 'probability'.
        table (str): The name of the target database table.
        schema (str): The name of the database schema.
    """
    if df.empty:
        return

    # psycopg2 is not imported at module level, so import the helper where it is
    # used; the previous code referenced `psycopg2.extras` without any import.
    from psycopg2.extras import execute_values

    # Tuple order must match the column list of the INSERT below:
    # (business_id, review_id, text, topic, probability).
    # topic/probability are converted to plain int/float because psycopg2 cannot
    # adapt NumPy integer scalars.
    data_to_update = [
        (business_id, review_id, review_text, int(topic), float(probability))
        for business_id, review_id, review_text, topic, probability in zip(
            df["business_id"], df["review_id"], df["text"], df["topic"], df["probability"]
        )
    ]

    # Bulk upsert (INSERT, or UPDATE on conflict with an existing review_id).
    insert_update_sql = f"""
        INSERT INTO "{schema}"."{table}" (business_id, review_id, text, topic, probability)
        VALUES %s
        ON CONFLICT (review_id) DO UPDATE SET
            business_id = EXCLUDED.business_id,
            text = EXCLUDED.text,
            topic = EXCLUDED.topic,
            probability = EXCLUDED.probability;
    """

    # engine.begin() commits the underlying transaction when the block exits
    # normally (rolls back on error). The raw psycopg2 cursor is closed by its own
    # context manager instead of being leaked.
    with engine.begin() as conn:
        with conn.connection.cursor() as cursor:
            execute_values(
                cursor,
                insert_update_sql,
                data_to_update,
                template="(%s, %s, %s, %s, %s)",  # (business_id, review_id, text, topic, probability)
                page_size=CHUNK_SIZE,  # Rows per statement sent by psycopg2
            )
    print(f"Bulk inserted/updated {len(df)} rows with topic results into {table}.")


# --- Main Script Execution ---

def _select_training_sample(engine, df_full):
    """
    Return the training DataFrame chosen according to ``TRAINING_SAMPLE_SOURCE``.

    Supported values of ``TRAINING_SAMPLE_SOURCE``:
      * ``None``: random sample of ``TRAINING_SAMPLE_SIZE`` rows of *df_full*
        (``random_state=42``), or all of *df_full* if it is smaller;
      * ``"DB_TABLE"``: every row of ``TRAINING_DB_TABLE_NAME`` in ``pg_schema``,
        de-duplicated by text;
      * a list of review_ids: the matching rows of *df_full*;
      * any other string: path to a CSV with a ``review_id`` column; the
        matching rows of *df_full* are used.

    Raises:
        ValueError: invalid configuration, load failure, or no training rows found.
        FileNotFoundError: the CSV path does not exist.
    """
    if TRAINING_SAMPLE_SOURCE is None:
        print(f"Sampling {TRAINING_SAMPLE_SIZE} reviews randomly for training...")
        if len(df_full) < TRAINING_SAMPLE_SIZE:
            print(f"WARNING: Full dataset ({len(df_full)}) is smaller than requested sample size ({TRAINING_SAMPLE_SIZE}). Using full dataset for training.")
            df_sample = df_full.copy()
        else:
            df_sample = df_full.sample(n=TRAINING_SAMPLE_SIZE, random_state=42).copy()

    elif TRAINING_SAMPLE_SOURCE == "DB_TABLE":
        if not TRAINING_DB_TABLE_NAME:
            raise ValueError("TRAINING_DB_TABLE_NAME must be defined in constants when TRAINING_SAMPLE_SOURCE is 'DB_TABLE'.")
        print(f"Loading training data from database table '{pg_schema}.{TRAINING_DB_TABLE_NAME}'...")
        # Only the DB call is guarded, so the validation errors below are not
        # re-wrapped; the original cause is chained for debugging.
        try:
            df_sample = load_reviews_from_db(engine, pg_schema, TRAINING_DB_TABLE_NAME)
        except Exception as e:
            raise ValueError(f"Error loading training sample from database table '{TRAINING_DB_TABLE_NAME}': {e}") from e
        if df_sample.empty:
            raise ValueError(f"No data found in training table '{pg_schema}.{TRAINING_DB_TABLE_NAME}'.")
        # The external training table may contain its own duplicate texts; de-duplicate
        # the same way df_full is. (Column presence is guaranteed: load_reviews_from_db
        # selects review_id/text/business_id explicitly, so the query itself fails otherwise.)
        original_sample_size = len(df_sample)
        df_sample = df_sample.drop_duplicates(subset='text', keep='first').copy()
        if len(df_sample) < original_sample_size:
            print(f"INFO: Removed {original_sample_size - len(df_sample)} duplicate texts from training sample loaded from DB table.")

    elif isinstance(TRAINING_SAMPLE_SOURCE, list):
        print("Using provided list of review_ids for training data...")
        df_sample = df_full[df_full["review_id"].isin(TRAINING_SAMPLE_SOURCE)].copy()
        if len(df_sample) != len(TRAINING_SAMPLE_SOURCE):
            print(f"WARNING: Not all provided review_ids ({len(TRAINING_SAMPLE_SOURCE)}) were found in the full dataset. Found {len(df_sample)}.")

    elif isinstance(TRAINING_SAMPLE_SOURCE, str):
        print(f"Loading training data from '{TRAINING_SAMPLE_SOURCE}' (CSV file)...")
        try:
            df_source_ids = pd.read_csv(TRAINING_SAMPLE_SOURCE)
        except FileNotFoundError as e:
            raise FileNotFoundError(f"Training sample file not found at '{TRAINING_SAMPLE_SOURCE}'") from e
        except Exception as e:
            raise ValueError(f"Error loading training sample from CSV: {e}") from e
        if "review_id" not in df_source_ids.columns:
            raise ValueError(f"CSV sample file '{TRAINING_SAMPLE_SOURCE}' must contain a 'review_id' column.")

        # Filter df_full to get the actual texts for the specified review_ids
        df_sample = df_full[df_full["review_id"].isin(df_source_ids["review_id"])].copy()
        if len(df_sample) != len(df_source_ids):
            print(f"WARNING: Not all review_ids from '{TRAINING_SAMPLE_SOURCE}' ({len(df_source_ids)}) were found in the (deduplicated) database data. Using {len(df_sample)} IDs found.")

    else:
        raise ValueError("Invalid type for TRAINING_SAMPLE_SOURCE. Must be None, 'DB_TABLE', a list of review_ids, or a file path string.")

    if df_sample.empty:
        raise ValueError("No training data could be loaded. Please check your TRAINING_SAMPLE_SOURCE configuration and data.")
    return df_sample


def _build_topic_model():
    """Build the unfitted BERTopic model with this script's customized sub-models."""
    # Load embedding model.
    # NOTE(review): the e5 model card recommends "query: "/"passage: " input prefixes;
    # texts are currently embedded without one — confirm this is intended.
    embedding_model = SentenceTransformer("intfloat/e5-large-v2")

    # 1. Custom CountVectorizer for topic keywords.
    # - `stop_words`: custom list of very common words that do not help distinguish topics.
    # - `min_df=10`: drops terms with a document frequency below 10.
    #   NOTE(review): BERTopic fits this vectorizer on per-topic concatenated documents
    #   when computing c-TF-IDF, so this likely counts *topics*, not individual reviews.
    #   A high value may then drop keywords distinctive to niche topics — confirm
    #   against the BERTopic docs.
    # - `ngram_range=(1, 2)`: unigrams and bigrams are both candidate keywords.
    custom_stop_words = [
        "a", "an", "the", "and", "or", "but",
        "if", "while", "is", "am", "are", "was", "were", "be", "been", "being",
        "do", "does", "did", "doing",
        "have", "has", "had", "having",
        "i", "me", "my", "myself",
        "we", "our", "ours", "ourselves",
        "you", "your", "yours", "yourself", "yourselves",
        "he", "him", "his", "himself",
        "she", "her", "hers", "herself",
        "it", "its", "itself",
        "they", "them", "their", "theirs", "themselves",
        "this", "that", "these", "those",
        "to", "in", "of", "for", "on", "at", "with",
        "as", "such", "too",
        "can", "will", "would", "should", "could",
        "just", "only", "also", "so", "than", "then", "there", "here",
        "what", "which", "who", "whom", "whose",
        "when", "where", "why", "how",
    ]
    vectorizer_model = CountVectorizer(stop_words=custom_stop_words, min_df=10, ngram_range=(1, 2))

    # 2. Custom HDBSCAN clustering.
    # - `min_cluster_size=50`: minimum topic size; larger values give fewer, more robust topics.
    # - `prediction_data=True`: needed so `.transform()` can assign unseen documents to topics.
    hdbscan_model = HDBSCAN(min_cluster_size=50, prediction_data=True)

    # 3. BERTopic with the customized sub-models.
    return BERTopic(
        embedding_model=embedding_model,
        vectorizer_model=vectorizer_model,
        hdbscan_model=hdbscan_model,
        nr_topics="auto",  # Merge similar topics automatically (an int fixes the count instead).
        language="english",
        verbose=True,
    )


def _classify_in_batches(engine, topic_model, df_remaining):
    """Assign topics to *df_remaining* in CHUNK_SIZE batches and upsert each batch into OUTPUT_DB_TABLE."""
    result_columns = ["business_id", "review_id", "text", "topic", "probability"]
    with tqdm(total=len(df_remaining), desc="Processing Remaining Reviews", unit="reviews") as pbar_remaining:
        for start in range(0, len(df_remaining), CHUNK_SIZE):
            # .copy() so adding result columns does not trigger SettingWithCopyWarning.
            batch = df_remaining.iloc[start:start + CHUNK_SIZE].copy()

            # transform() (not fit_transform) assigns unseen documents to the trained topics.
            # NOTE(review): assumes transform returns one probability per document
            # (calculate_probabilities is left at its default), matching the single
            # `probability` column — confirm if that setting changes.
            topics_batch, probs_batch = topic_model.transform(batch["text"].tolist())
            batch["topic"] = topics_batch
            batch["probability"] = probs_batch

            write_batch_to_db(engine, batch[result_columns], OUTPUT_DB_TABLE, pg_schema)

            pbar_remaining.update(len(batch))
            print(f"Processed and updated {len(batch)} rows.")


if __name__ == "__main__":
    engine = get_database_connection()
    try:
        create_results_table(engine, pg_schema, OUTPUT_DB_TABLE)

        print("Loading all review data from DB...")
        df_full = load_reviews_from_db(engine, pg_schema, INPUT_DB_TABLE)

        # Drop exact duplicate texts (keeping the first occurrence) so identical
        # reviews are modelled and classified only once.
        print(f"Initial dataset size: {len(df_full)} rows.")
        df_full = df_full.drop_duplicates(subset='text', keep='first').copy()
        print(f"Dataset size after dropping exact text duplicates: {len(df_full)} rows.")

        df_sample = _select_training_sample(engine, df_full)
        sampled_texts = df_sample["text"].tolist()
        print(f"Training BERTopic model on {len(sampled_texts)} reviews.")

        print("Training BERTopic model with customized settings...")
        topic_model = _build_topic_model()
        # Only the fitted model is needed; the training-set assignments are not
        # written to the results table (see the exclusion below).
        topic_model.fit(sampled_texts)
        print(f"BERTopic model training complete. Found {len(topic_model.get_topics())} topics.")

        # Save the trained model and topic information
        print(f"Saving BERTopic model to {MODEL_DIR}...")
        topic_model.save(MODEL_DIR)
        print(f"Saving topic summary to {TOPIC_JSON_PATH}...")
        save_topic_info(topic_model, TOPIC_JSON_PATH)

        # Exclude the training reviews: only reviews *not* used for training are
        # classified and written to the results table.
        print("Filtering out training data from the full dataset for batch processing...")
        df_remaining = df_full[~df_full["review_id"].isin(df_sample["review_id"])].copy()
        print(f"Remaining reviews to process with topic model: {len(df_remaining)} rows.")

        _classify_in_batches(engine, topic_model, df_remaining)

        print("Finished topic modeling and database update for all non-training reviews.")
    finally:
        # Release pooled connections even if a step above raised.
        engine.dispose()