<a href="https://colab.research.google.com/github/davidelgas/DataSciencePortfolio/blob/main/Language_Models/NLP_Corpus_Development.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1 Project Description






The project leverages user-generated content from a domain-specific online forum as the training corpus. This data is largely unstructured, with minimal metadata available. The following tools were considered to gather the source text for the corpus:


### Web Scraping
- **Tools:** Beautiful Soup, online SaaS products
    - **Pros:**
        - **Direct Access to Targeted Data:** Enables precise extraction of user-generated content from specific sections or threads within the forum.
        - **Efficiency in Data Collection:** Automated scripts can gather large volumes of data in a short amount of time, making it suitable for assembling significant datasets for NLP.
    - **Cons:**
        - **Potential for Incomplete Data:** May miss embedded content or dynamically loaded data, depending on the website’s structure.
        - **Ethical and Legal Considerations:** Scraping data from forums may raise concerns about user privacy and must adhere to the terms of service of the website.
        - **Very Platform Dependent:** Forum-specific solutions produce forum-specific data schemas that must be reverse-engineered for successful text extraction.

### Forum-specific APIs
- **Tools:** Python (`requests` library for API calls and `json` library for handling responses)
    - **Pros:**
        - **Structured and Reliable Data Retrieval:** APIs provide structured data, making it easier to process and integrate into your project.
        - **Efficient and Direct Access:** Directly accessing the forum's data through its API is efficient, bypassing the need for HTML parsing.
        - **Compliance and Ethical Data Use:** Utilizing APIs respects the forum's data use policies and ensures access is in line with user agreements.
    - **Cons:**
        - **Rate Limiting:** APIs often have limitations on the number of requests that can be made in a certain timeframe, which could slow down data collection.
        - **API Changes:** Dependence on the forum's API structure means that changes or deprecation could disrupt your data collection pipeline.
        - **Access Restrictions:** Some data or functionalities might be restricted or require authentication, posing additional challenges for comprehensive data collection.


**Conclusion: I will be using Beautiful Soup to create my corpus.**


# 2 Create Environment

In [None]:
# Access to Google Drive
# Mounts Google Drive at /content/drive so that the corpus CSVs (base_path) and
# the Snowflake credentials file (credentials_path) defined in the settings
# cell are reachable from this Colab runtime.
# Kept in its own cell: mounting here seems to propagate Drive credentials more
# reliably than mounting alongside other setup code.

from google.colab import drive
drive.mount('/content/drive')

In [None]:
#Packages and libraries

!pip install snowflake

import os
import time
import requests
import pandas as pd
import concurrent.futures
import snowflake.connector
import concurrent.futures

from bs4 import BeautifulSoup

# --- Settings ---
# Both locations live under the Git checkout synced to the mounted Google Drive:
#   base_path        -> directory holding the scraped CSVs (ids, posts, corpus)
#   credentials_path -> KEY=VALUE text file read by load_credentials()
base_path, credentials_path = (
    '/content/drive/Othercomputers/My Mac/Git/Language_Models/datasets/e9',
    '/content/drive/Othercomputers/My Mac/Git/credentials/snowflake_credentials.txt',
)


# 3 Data Collection


In [None]:
# --- Utility Functions ---

def load_credentials(path_to_credentials):
    """Load ``KEY=VALUE`` pairs from a credentials file into ``os.environ``.

    Blank lines and ``#`` comment lines are ignored silently. Whitespace around
    keys and values is stripped, so ``USER = alice`` is stored as
    ``USER=alice``. Only the first ``=`` splits, so values may contain ``=``.
    Any other malformed line (no ``=`` or an empty key) is reported and skipped.

    Args:
        path_to_credentials: Path to the credentials text file.

    Raises:
        FileNotFoundError: If the file does not exist.
        EnvironmentError: If USER, PASSWORD, or ACCOUNT is unset or empty in
            ``os.environ`` after loading.
    """
    if not os.path.exists(path_to_credentials):
        raise FileNotFoundError(f"Credentials file not found: {path_to_credentials}")
    with open(path_to_credentials, 'r', encoding='utf-8') as file:
        for line_num, raw_line in enumerate(file, start=1):
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            key, sep, value = line.partition('=')
            key = key.strip()
            # An empty key would make os.environ raise ValueError.
            if not sep or not key:
                print(f"Skipping invalid line {line_num}: {line}")
                continue
            os.environ[key] = value.strip()
    # NOTE(review): USER is also a standard login variable on Unix-like
    # systems, so a file that omits USER may still pass this check using the
    # OS user name — confirm whether that fallback is intended.
    for var in ['USER', 'PASSWORD', 'ACCOUNT']:
        if not os.environ.get(var):
            raise EnvironmentError(f"Missing environment variable: {var}")

def connect_to_snowflake():
    """Open a Snowflake connection from the USER/PASSWORD/ACCOUNT env variables.

    Returns:
        The connection object returned by ``snowflake.connector.connect``.

    Raises:
        ConnectionError: If the connector raises. The original exception is
            chained as ``__cause__`` so the real failure stays visible.
    """
    account = os.environ.get('ACCOUNT')
    try:
        conn = snowflake.connector.connect(
            user=os.environ.get('USER'),
            password=os.environ.get('PASSWORD'),
            account=account,
        )
    except Exception as e:
        raise ConnectionError(f"Failed to connect to Snowflake: {e}") from e
    print(f"Connected to Snowflake account: {account}")
    return conn

def create_db_schema_table(cur):
    """Make sure the corpus database, schema and table exist.

    All DDL uses ``IF NOT EXISTS``, so calling this before every upload is
    harmless. Statements run in order and stop at the first failure. That
    failure is printed rather than raised, so the caller carries on with its
    own queries.

    Args:
        cur: An open Snowflake cursor.
    """
    table_ddl = """
            CREATE TABLE IF NOT EXISTS e9_corpus.e9_corpus_schema.e9_forum_corpus (
                THREAD_ID NUMBER(38,0) PRIMARY KEY,
                THREAD_TITLE STRING,
                THREAD_FIRST_POST STRING,
                THREAD_ALL_POSTS STRING
            )
        """
    statements = (
        "CREATE DATABASE IF NOT EXISTS e9_corpus",
        "USE DATABASE e9_corpus",
        "CREATE SCHEMA IF NOT EXISTS e9_corpus_schema",
        table_ddl,
    )
    try:
        for statement in statements:
            cur.execute(statement)
    except Exception as exc:
        print(f"Error creating database/schema/table: {exc}")
    else:
        print("Database, schema, and table checked/created.")

def fetch_existing_thread_ids(cur):
    """Return the set of THREAD_IDs already stored in the Snowflake corpus table.

    Errors are printed and then re-raised. Snowflake does not enforce
    PRIMARY KEY constraints, so returning an empty set on failure would make
    the caller treat every thread as new and insert duplicate rows.

    Args:
        cur: An open Snowflake cursor.

    Returns:
        Set of thread IDs (first column of each fetched row).
    """
    query = "SELECT THREAD_ID FROM e9_corpus.e9_corpus_schema.e9_forum_corpus"
    try:
        cur.execute(query)
        rows = cur.fetchall()
    except Exception as e:
        print(f"Error fetching existing thread IDs: {e}")
        raise
    return {row[0] for row in rows}

def insert_missing_data(cur, df, existing_thread_ids):
    """Insert only new data into Snowflake, skipping already loaded threads.

    Column names are upper-cased on a copy, so the caller's DataFrame is left
    untouched. Rows whose THREAD_ID is in ``existing_thread_ids`` are dropped,
    as are repeated THREAD_IDs within ``df`` (first occurrence wins).
    Snowflake does not enforce PRIMARY KEY, so this is the only duplicate
    guard. Missing values (NaN/None) are sent as SQL NULL.

    Args:
        cur: An open Snowflake cursor (``executemany`` is used).
        df: DataFrame with thread_id, thread_title, thread_first_post and
            thread_all_posts columns (any letter case).
        existing_thread_ids: IDs already present in the target table.
    """
    if df.empty:
        print("No data to insert.")
        return

    print(f"Original DataFrame has {len(df)} rows.")
    # rename() returns a new frame; assigning df.columns would mutate the caller's.
    df = df.rename(columns=str.upper)

    columns = ['THREAD_ID', 'THREAD_TITLE', 'THREAD_FIRST_POST', 'THREAD_ALL_POSTS']
    new_df = df[~df['THREAD_ID'].isin(existing_thread_ids)]
    new_df = new_df.drop_duplicates(subset='THREAD_ID')
    print(f"{len(new_df)} new threads will be inserted into Snowflake.")

    if new_df.empty:
        print("No new threads to insert.")
        return

    insert_query = """
    INSERT INTO e9_corpus.e9_corpus_schema.e9_forum_corpus
    (THREAD_ID, THREAD_TITLE, THREAD_FIRST_POST, THREAD_ALL_POSTS)
    VALUES (%s, %s, %s, %s)
    """

    # Convert NaN -> None per value. df.where(notnull, None) does not do this
    # for float-dtype columns (e.g. an all-empty text column), where None is
    # coerced straight back to NaN. itertuples yields native Python scalars,
    # so numpy int64 IDs are handed to the connector as plain ints.
    rows_to_insert = [
        tuple(None if pd.isna(value) else value for value in row)
        for row in new_df[columns].itertuples(index=False, name=None)
    ]

    # Batch insert all rows at once
    cur.executemany(insert_query, rows_to_insert)

def upload_corpus_to_snowflake(base_path: str, credentials_path: str, filename: str):
    """Upload a local corpus CSV to Snowflake, inserting only threads not yet loaded.

    Args:
        base_path: Directory containing the corpus CSV.
        credentials_path: Path to the ``KEY=VALUE`` file read by ``load_credentials``.
        filename: Name of the corpus CSV inside ``base_path``.

    Raises:
        FileNotFoundError: If the corpus CSV does not exist.
        Exception: Any error from the create/fetch/insert/count steps is
            printed, a rollback is attempted, and the original error is re-raised.
    """
    from contextlib import closing  # local import keeps this cell self-contained

    file_path = os.path.join(base_path, filename)
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Corpus file not found: {file_path}")

    forum_corpus_df = pd.read_csv(file_path)
    print(f"Loaded {len(forum_corpus_df)} rows from {file_path} to upload.")

    load_credentials(credentials_path)
    # closing() closes the connection even when conn.cursor() itself fails,
    # and closes the cursor before the connection.
    with closing(connect_to_snowflake()) as conn, closing(conn.cursor()) as cur:
        try:
            create_db_schema_table(cur)
            existing_thread_ids = fetch_existing_thread_ids(cur)
            print(f"Snowflake already has {len(existing_thread_ids)} threads.")
            insert_missing_data(cur, forum_corpus_df, existing_thread_ids)
            conn.commit()
            print(f"Data from {filename} committed successfully.")

            cur.execute("SELECT COUNT(*) FROM e9_corpus.e9_corpus_schema.e9_forum_corpus")
            final_count = cur.fetchone()[0]
            print(f"Total threads now in Snowflake: {final_count}")
        except Exception as e:
            print(f"Error during upload: {e}")
            try:
                conn.rollback()
            except Exception as rollback_error:
                # A failed rollback must not replace the original error.
                print(f"Rollback failed: {rollback_error}")
            raise  # re-raises the upload error, not the rollback error


In [None]:
# Inspect the decorated-threads CSV written by fetch_full_thread_data.
# The path is derived from base_path (settings cell) rather than hard-coded,
# so changing the data directory in one place keeps this cell in sync.
df_e9_corpus = pd.read_csv(os.path.join(base_path, 'e9_forum_threads_decorated.csv'))
df_e9_corpus.info()

# 4 Data Storage

In [None]:
def create_urls(base_path: str, filename: str = 'e9_forum_thread_ids.csv', threads: int = 1):
    """Allocate the next ``threads`` sequential thread IDs and append them to a CSV log.

    The log at ``base_path/filename`` records every ID handed out so far. New
    IDs continue from the largest recorded ID, or start at 1 when the log is
    missing, empty, or header-only.

    Args:
        base_path: Directory holding the ID log.
        filename: Name of the ID log CSV (single ``thread_id`` column).
        threads: How many new IDs to allocate.

    Returns:
        DataFrame with a single ``thread_id`` column holding only the new IDs.
        It is empty when ``threads <= 0``, and the log is then left untouched.
    """
    file_path = os.path.join(base_path, filename)
    # A zero-byte file counts as "no log". It must still get a header, or the
    # next read would treat the first ID as the column name.
    has_log = os.path.exists(file_path) and os.path.getsize(file_path) > 0

    last_thread_id = 0
    if has_log:
        recorded = pd.read_csv(file_path)['thread_id'].dropna()
        if not recorded.empty:
            # max() rather than the last row: robust to out-of-order edits.
            last_thread_id = int(recorded.max())
    if last_thread_id:
        print(f"Existing thread_ids found. Last thread_id: {last_thread_id}")
    else:
        print(f"No existing thread_ids. Starting from {last_thread_id}")

    if threads <= 0:
        print("No new thread_ids requested.")
        return pd.DataFrame({'thread_id': pd.Series([], dtype='int64')})

    new_thread_ids = pd.DataFrame(
        {'thread_id': range(last_thread_id + 1, last_thread_id + threads + 1)}
    )
    new_thread_ids.to_csv(file_path, mode='a', header=not has_log, index=False)

    print(f"Added {threads} new thread_ids. Ending at {last_thread_id + threads}")
    return new_thread_ids

def fetch_full_thread_data(df, base_path: str, posts_filename: str = 'e9_forum_posts.csv', decorated_filename: str = 'e9_forum_threads_decorated.csv', request_timeout: float = 30.0, delay_seconds: float = 1.0):
    """Scrape page 1 of each not-yet-stored forum thread and append results to local CSVs.

    Threads whose ``thread_id`` already appears in either output CSV are
    skipped. For every other thread, the title, the first post body, and each
    post on page 1 (timestamp + text) are extracted. Only ``?page=1`` is
    requested, so posts on later pages of long threads are not collected.

    Args:
        df: DataFrame with a ``thread_id`` column of candidate IDs.
        base_path: Directory holding both output CSVs.
        posts_filename: Posts CSV (thread_id, post_timestamp, post_raw).
        decorated_filename: Thread CSV (thread_id, thread_title, thread_first_post).
        request_timeout: Seconds before an HTTP request is abandoned.
        delay_seconds: Pause after every request attempt, successful or not,
            to throttle load on the forum.
    """
    posts_file = os.path.join(base_path, posts_filename)
    decorated_file = os.path.join(base_path, decorated_filename)

    existing_posts = (
        pd.read_csv(posts_file) if os.path.exists(posts_file)
        else pd.DataFrame(columns=['thread_id', 'post_timestamp', 'post_raw'])
    )
    existing_decorated = (
        pd.read_csv(decorated_file) if os.path.exists(decorated_file)
        else pd.DataFrame(columns=['thread_id', 'thread_title', 'thread_first_post'])
    )

    existing_thread_ids = set(existing_posts['thread_id']) | set(existing_decorated['thread_id'])
    new_threads = df[~df['thread_id'].isin(existing_thread_ids)]

    post_data = []
    decorated_data = []

    for thread_id in new_threads['thread_id']:
        thread_url = f"https://e9coupe.com/forum/threads/{thread_id}/?page=1"
        try:
            print(f"Fetching thread {thread_id}...")
            # Without a timeout a stalled connection blocks the run forever.
            response = requests.get(thread_url, timeout=request_timeout)
            if response.status_code != 200:
                print(f"Error {response.status_code} fetching {thread_url}")
                continue

            soup = BeautifulSoup(response.text, 'html.parser')
            articles = soup.find_all('article', class_='message--post')
            if not articles:
                print(f"No posts found for thread {thread_id}. Skipping.")
                continue

            title_element = soup.find('title')
            thread_title = title_element.get_text().split('|')[0].strip() if title_element else "No Title"

            # separator=' ' keeps words from adjacent tags/<br> apart; with
            # strip=True alone they are concatenated ("line one" + "line two"
            # -> "line oneline two").
            first_post_element = soup.find('article', class_='message-body')
            first_post = first_post_element.get_text(separator=' ', strip=True) if first_post_element else "No content"

            thread_posts = []
            for article in articles:
                timestamp_element = article.find('time')
                content_element = article.find('div', class_='bbWrapper')
                thread_posts.append({
                    'thread_id': thread_id,
                    'post_timestamp': timestamp_element.get('datetime', "N/A") if timestamp_element else "N/A",
                    'post_raw': content_element.get_text(separator=' ', strip=True) if content_element else "No content"
                })

            # Record the thread only after all its posts parsed, so an error
            # cannot leave a decorated row with a partial set of posts.
            decorated_data.append({
                'thread_id': thread_id,
                'thread_title': thread_title,
                'thread_first_post': first_post
            })
            post_data.extend(thread_posts)

        except Exception as e:
            print(f"Error fetching thread {thread_id}: {e}")
        finally:
            # Runs on success, `continue`, and errors alike, so consecutive
            # 404s are throttled too.
            time.sleep(delay_seconds)

    if post_data:
        new_posts_df = pd.DataFrame(post_data)
        combined_posts = pd.concat([existing_posts, new_posts_df], ignore_index=True)
        combined_posts.to_csv(posts_file, index=False)
        print(f"Saved {len(new_posts_df)} new posts. Total posts: {len(combined_posts)}")

    if decorated_data:
        new_decorated_df = pd.DataFrame(decorated_data)
        combined_decorated = pd.concat([existing_decorated, new_decorated_df], ignore_index=True)
        combined_decorated.to_csv(decorated_file, index=False)
        print(f"Saved {len(new_decorated_df)} new decorated threads. Total threads: {len(combined_decorated)}")

def create_forum_corpus(base_path: str, posts_filename: str = 'e9_forum_posts.csv', decorated_filename: str = 'e9_forum_threads_decorated.csv', corpus_filename: str = 'e9_forum_corpus.csv'):
    """Build the thread-level corpus by joining thread metadata with all post text.

    Reads every stored post and decorated thread, not just the latest batch.
    Each thread's non-empty posts are concatenated with spaces, in file order,
    into ``thread_all_posts``. Only threads present in both files are kept.
    The result is written to ``base_path/corpus_filename``.

    Returns:
        DataFrame with columns thread_id, thread_title, thread_first_post,
        thread_all_posts.
    """
    posts_file = os.path.join(base_path, posts_filename)
    decorated_file = os.path.join(base_path, decorated_filename)
    corpus_file = os.path.join(base_path, corpus_filename)

    # Read post text verbatim. With default NA parsing, empty posts become NaN
    # and end up as the literal "nan" in the corpus, and real posts such as
    # "NA" or "null" are discarded.
    posts_df = pd.read_csv(posts_file, dtype={'post_raw': str}, keep_default_na=False)
    decorated_df = pd.read_csv(decorated_file)

    aggregated = (
        posts_df.groupby('thread_id')['post_raw']
        .agg(lambda posts: ' '.join(post for post in posts if post))
        .reset_index()
        .rename(columns={'post_raw': 'thread_all_posts'})
    )

    decorated_df['thread_id'] = decorated_df['thread_id'].astype('int64')
    aggregated['thread_id'] = aggregated['thread_id'].astype('int64')

    # The inner merge already drops threads missing from either side.
    forum_corpus = pd.merge(decorated_df, aggregated, on='thread_id', how='inner')
    forum_corpus.to_csv(corpus_file, index=False)
    print(f"Saved corpus with {len(forum_corpus)} threads to {corpus_file}")

    return forum_corpus

def update_local_corpus(base_path: str, threads_to_add: int = 5, corpus_filename: str = 'e9_forum_corpus.csv'):
    """Run one local scrape-and-rebuild cycle.

    Steps:
      1. allocate ``threads_to_add`` new thread IDs (``create_urls``);
      2. scrape those IDs into the posts/decorated CSVs (``fetch_full_thread_data``);
      3. rebuild the corpus CSV named ``corpus_filename`` (``create_forum_corpus``).

    Returns:
        The corpus DataFrame produced by ``create_forum_corpus``.
    """
    print("\n=== Starting Local Forum Corpus Update ===\n")

    id_frame = create_urls(base_path, threads=threads_to_add)
    fetch_full_thread_data(id_frame, base_path)
    corpus = create_forum_corpus(base_path, corpus_filename=corpus_filename)

    print("\n=== Local Forum Corpus Update Complete ===\n")
    return corpus


# 5 Orchestration

In [None]:
# ====== START BATCH SCRAPE + BACKGROUND UPLOAD ======
# Scraping runs in the foreground. Each finished batch is uploaded in the
# background while the next batch is being scraped.

num_batches = 10
threads_per_batch = 10
# Uploads must run one at a time. Each batch CSV is rebuilt from *all* posts
# scraped so far, and upload_corpus_to_snowflake de-duplicates by first
# reading the IDs already in Snowflake. Two concurrent uploads can both miss
# the same threads and insert them twice, because Snowflake does not enforce
# PRIMARY KEY.
max_workers_upload = 1


def handle_upload_result(fut, batch_filename):
    """Report a failed background upload of ``batch_filename``."""
    try:
        fut.result()
    except Exception as e:
        print(f"UPLOAD FAILED for {batch_filename}: {e}")


# Leaving the `with` block waits for all queued uploads (shutdown(wait=True)),
# even if a scrape raises part-way through the loop.
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers_upload) as executor:
    for batch_num in range(num_batches):
        print(f"\n=== Starting batch {batch_num + 1} ===\n")

        batch_filename = f"e9_forum_corpus_batch_{batch_num + 1}.csv"
        forum_corpus_df = update_local_corpus(base_path, threads_to_add=threads_per_batch, corpus_filename=batch_filename)

        future = executor.submit(upload_corpus_to_snowflake, base_path, credentials_path, batch_filename)
        # Bind this batch's filename now. A closure over the loop variable
        # would report whichever batch was current when the upload finished.
        future.add_done_callback(lambda fut, name=batch_filename: handle_upload_result(fut, name))

print("\n=== All scraping and uploads complete ===\n")
