Importing necessary libraries

In [11]:
import functools
import os
import re

import nltk
import numpy as np
import pandas as pd
import psycopg2
import requests
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from psycopg2.extras import execute_values
from sklearn.feature_extraction.text import TfidfVectorizer

# Fetch the NLTK data the cleaning step relies on: the stopword list and the
# WordNet-related corpora used by WordNetLemmatizer. quiet=True suppresses the
# download progress output.
for _nltk_resource in ('stopwords', 'wordnet', 'omw-1.4'):
    nltk.download(_nltk_resource, quiet=True)
del _nltk_resource  # keep the notebook namespace identical to the unrolled calls

# Configuration
API_URL = "http://localhost:8000/api/retrieve/cmtsep/en"
DB_CONFIG = {
    "host": "localhost",
    "database": "data_pipeline",
    "user": "djangouser",
    "password": "django123"
}

Fetching Data from Django API

In [12]:
def fetch_english_comments(api_url=None, timeout=30):
    """Fetch English comments from the Django API and keep only unprocessed ones.

    Args:
        api_url: Endpoint to query. Defaults to the notebook-level ``API_URL``.
        timeout: Seconds to wait for the server. ``requests`` has no default
            timeout, so without one a stalled server would hang the cell forever.

    Returns:
        An independent DataFrame of comments whose ``cleaned_text`` is missing
        or empty (all rows if the column is absent). Returns ``None`` in these
        cases:
        - the request or JSON parsing fails;
        - the API returns no rows;
        - every row has already been processed.
    """
    print("[*] Contacting Django API...")
    try:
        response = requests.get(api_url or API_URL, timeout=timeout)
        response.raise_for_status()
        df = pd.DataFrame(response.json())
    except Exception as e:
        # Best-effort: report and let downstream cells skip via `df_raw is None`.
        print(f"[X] API Error: {e}")
        return None

    if df.empty:
        print("[!] No English comments found.")
        return None

    # We only want to process rows where cleaned_text is currently empty/None.
    # This handles the "prior pre-processed data" requirement.
    if 'cleaned_text' in df.columns:
        pending = df['cleaned_text'].isna() | (df['cleaned_text'] == "")
        # .copy() detaches the result from `df`. The cleaning step adds a column
        # to it, which would raise SettingWithCopyWarning on a filtered view.
        unprocessed = df[pending].copy()
    else:
        unprocessed = df

    print(f"[+] Found {len(unprocessed)} new comments to process.")
    if unprocessed.empty:
        # Same contract as the "no comments" case: downstream cells skip
        # instead of failing on an empty corpus in TfidfVectorizer.
        return None
    return unprocessed

df_raw = fetch_english_comments()

[*] Contacting Django API...
[+] Found 5 new comments to process.


Cleaning & Normalization

In [13]:
@functools.lru_cache(maxsize=1)
def _default_nlp_resources():
    """Load NLTK's English stopword set and a WordNet lemmatizer once, then reuse them."""
    return frozenset(stopwords.words('english')), WordNetLemmatizer()


def clean_text(text, stop_words=None, lemmatizer=None):
    """Normalize one comment into a space-separated string of lemmatized tokens.

    Pipeline:
    1. Strip URLs and HTML tags.
    2. Drop non-alphabetic characters and lowercase.
    3. Remove stopwords and tokens of 2 characters or fewer.
    4. Lemmatize what remains.

    Args:
        text: Raw comment. Non-string values (None, or NaN from pandas) yield "".
        stop_words: Optional collection of lowercase words to drop. Defaults to
            NLTK's English stopword list.
        lemmatizer: Optional object with a ``lemmatize(word)`` method. Defaults
            to NLTK's ``WordNetLemmatizer``.

    Returns:
        The cleaned text, or "" if no token survives.
    """
    # pandas marks missing values as NaN (a truthy float) or None. Passing
    # either to re.sub would raise TypeError.
    if not isinstance(text, str) or not text:
        return ""

    # 1. Replace URLs, HTML tags and non-alphabetic characters with a space
    #    instead of deleting them, so "see<br>this" or "end.Next" do not fuse
    #    into a single token. Only ASCII letters are kept (English comments).
    text = re.sub(r'http\S+|www\S+|<.*?>', ' ', text)
    text = re.sub(r'[^a-zA-Z\s]', ' ', text)

    # 2. Tokenize and lowercase
    words = text.lower().split()

    # 3. Remove stopwords and lemmatize. The cached NLTK defaults are loaded
    #    only for the pieces the caller did not supply.
    if stop_words is None or lemmatizer is None:
        default_stop_words, default_lemmatizer = _default_nlp_resources()
        if stop_words is None:
            stop_words = default_stop_words
        if lemmatizer is None:
            lemmatizer = default_lemmatizer

    # Keep words longer than 2 characters
    cleaned = [lemmatizer.lemmatize(w) for w in words if w not in stop_words and len(w) > 2]

    return " ".join(cleaned)

# Build df_clean as a new frame instead of writing a column into df_raw:
# - re-running this cell gives the same result;
# - df_raw keeps the data exactly as fetched.
# df_clean is reset first so a skipped run cannot leave a stale result from an
# earlier execution for the later cells to pick up.
df_clean = None
if df_raw is not None:
    print("[*] Running Cleaning Pipeline...")
    df_clean = (
        df_raw
        .assign(cleaned_text=df_raw['comment'].apply(clean_text))
        # Drop rows that result in empty strings after cleaning
        .loc[lambda d: d['cleaned_text'] != ""]
        .copy()
    )
    print(f"[+] Cleaning complete. {len(df_clean)} rows ready.")

[*] Running Cleaning Pipeline...
[+] Cleaning complete. 5 rows ready.


Vectorization

In [14]:
# Reset the outputs so a skipped or failed run cannot leave stale vectors from
# an earlier execution.
tfidf_matrix = None
feature_names = None

# df_clean is only created when df_raw is not None, so test df_raw first.
if df_raw is not None and not df_clean.empty:
    print("[*] Generating TF-IDF Vectors...")
    # TfidfVectorizer settings:
    # - max_df=0.95 drops terms present in more than 95% of comments.
    # - min_df=2 drops terms that appear in only one comment.
    # - stop_words='english' is sklearn's own list, applied on top of the NLTK
    #   stopword filtering done during cleaning.
    vectorizer = TfidfVectorizer(max_df=0.95, min_df=2, stop_words='english')
    try:
        tfidf_matrix = vectorizer.fit_transform(df_clean['cleaned_text'])
    except ValueError as e:
        # sklearn raises ValueError when the corpus is too small for these
        # bounds (e.g. fewer than 3 documents, where 0.95 * n_docs < min_df)
        # or when pruning leaves no terms.
        print(f"[X] Vectorization failed: {e}")
    else:
        # Capture feature names (words) for later topic modeling
        feature_names = vectorizer.get_feature_names_out()
        print(f"[+] Vectorization successful. Vocabulary size: {len(feature_names)}")

[*] Generating TF-IDF Vectors...
[+] Vectorization successful. Vocabulary size: 2


Save Results Back to Postgres

In [15]:
# STEP 5: SAVE TO DATABASE
def update_database(df, db_config=None):
    """Write each row's ``cleaned_text`` back to ``airflow.processed_comments``.

    Rows are matched on ``processed_comments.comment_id = df['id']``, and ids
    with no matching row are simply not updated. Errors are printed and the
    transaction is rolled back rather than raised, so the notebook keeps running.

    Args:
        df: DataFrame with at least ``id`` and ``cleaned_text`` columns. ``None``
            or an empty frame is a no-op.
        db_config: Optional keyword arguments for ``psycopg2.connect``. When
            omitted, the connection settings this save step has always used
            are applied.
    """
    if df is None or df.empty:
        print("[!] No data to save.")
        return

    if db_config is None:
        # Using 127.0.0.1 to avoid Windows localhost issues.
        # TODO(security): these credentials are hard-coded and differ from
        # DB_CONFIG in the configuration cell. Move them to environment
        # variables or a secrets store.
        db_config = {
            "host": "127.0.0.1",
            "database": "data_pipeline",
            "user": "admin",
            "password": "admin",
        }

    conn = None
    cur = None

    print("[*] Connecting to database to save results...")
    try:
        conn = psycopg2.connect(**db_config)
        cur = conn.cursor()

        # Prepare (id, cleaned_text) tuples for the VALUES list
        data_to_update = list(df[['id', 'cleaned_text']].itertuples(index=False, name=None))

        # This query updates the existing rows created by Airflow.
        # RETURNING plus fetch=True collects the ids that actually matched,
        # across every page execute_values sends, so the success message
        # reports real updates rather than rows submitted.
        update_query = """
            UPDATE airflow.processed_comments 
            SET cleaned_text = data.new_cleaned_text,
                updated_at = CURRENT_TIMESTAMP
            FROM (VALUES %s) AS data (cid, new_cleaned_text)
            WHERE airflow.processed_comments.comment_id = data.cid
            RETURNING airflow.processed_comments.comment_id;
        """

        updated_rows = execute_values(cur, update_query, data_to_update, fetch=True)

        conn.commit()
        print(f"[!] Successfully updated {len(updated_rows)} of {len(data_to_update)} rows in 'processed_comments'.")

    except Exception as e:
        print(f"[X] Database Error: {e}")
        if conn is not None:
            conn.rollback()
    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
            print("[*] Database connection closed.")

# Persist the cleaned comments only when the cleaning cell has produced a frame.
if globals().get('df_clean') is not None:
    update_database(df_clean)
else:
    print("[X] df_clean not found. Make sure you ran the cleaning cell first!")

[*] Connecting to database to save results...
[!] Successfully updated 5 rows in 'processed_comments'.
[*] Database connection closed.
