## Netflix Reviews ETL and Sentiment Engine
## Author: Oyefejo Olaseni Ifeoluwa
## Objective: Automate the sync, cleaning and categorization of Netflix App reviews.
## Stack: Python (Pandas), SQL (SQLAlchemy), NLP (BERTopic), LLM (Ollama)

In [1]:
#Import necessary librabries
import pandas as pd
from sqlalchemy import create_engine
import os
from dotenv import load_dotenv
from kaggle.api.kaggle_api_extended import KaggleApi
from pathlib import Path
import re
from tqdm import tqdm
from bertopic import BERTopic
import joblib  
from sentence_transformers import SentenceTransformer
import ollama

In [3]:
# CONFIGURATION and LOGGING
env_path = Path('.') / '.env'
load_dotenv(dotenv_path=env_path, override=True)
DATASET = "ashishkumarak/netflix-reviews-playstore-daily-updated"
LOCAL_PATH = Path("data")
FILE_NAME = "netflix_reviews.csv"
EXISTING_FILE = "categorized_reviews.csv"
MODEL_PATH = "bertopic_netflix_model.pkl"
UNIQUE_COLS = ['review_id', 'datetime']
CATEGORY_LOG = "discovered_categories.txt"
BATCH_SIZE = 20
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
host = os.getenv("DB_HOST")
port = os.getenv("DB_PORT")
database = os.getenv("DB_NAME")
engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}")

In [5]:
# CORE FUNCTIONS

def sync_data():
    """Fetches the latest dataset from Kaggle via API."""
    LOCAL_PATH.mkdir(exist_ok=True)
    api = KaggleApi()
    api.authenticate()
    print("üì• Downloading latest data from Kaggle...")
    api.dataset_download_files(DATASET, path=LOCAL_PATH, unzip=True, force=True)
    
    df = pd.read_csv(LOCAL_PATH / FILE_NAME)
    return df

In [6]:
def clean_data(df):
    """Performs initial schema standardization and data deduplication."""
    df = df.drop(columns=['reviewCreatedVersion'], errors='ignore')

    # 2. Rename Columns
    custom_names = {
        'reviewId': 'review_id',
        'userName': 'user_name',
        'content': 'review',
        'score': 'rating',
        'thumbsUpCount': 'thumbs_up_count',
        'at': 'datetime',
        'appVersion': 'app_version'
    }
    # Apply custom names
    df = df.rename(columns=custom_names)
    
    #3  fill content NaNs and " " with "No comment"
    df['review'] = df['review'].fillna("No comment").str.strip()
    df.drop_duplicates(subset=UNIQUE_COLS, inplace=True)

    return df


In [7]:
def load_existing():
    if os.path.exists(EXISTING_FILE):
        df = pd.read_csv(EXISTING_FILE, dtype={'review_id': str})
        df['datetime'] = pd.to_datetime(df['datetime'])
        return df
    return pd.DataFrame(columns=UNIQUE_COLS + ['review', 'category'])

In [8]:
def get_topic_model():
    """The NLP Engine: Clusters reviews into topics to be labelled using LLM."""
    if os.path.exists(MODEL_PATH):
        return joblib.load(MODEL_PATH)
    
    print("‚ú® Initializing BERTopic Engine...")
    embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
    
    topic_model = BERTopic(
        embedding_model=embedding_model,
        min_topic_size=200,
        verbose=True
    )
    return topic_model

In [9]:
def ask_ollama_for_label(keywords):
    """Uses Local LLM (Phi-3) to generate human-readable category names."""
    client = ollama.Client(host='http://127.0.0.1:11434')
    prompt = f"""
    Analyze these Netflix review keywords: {", ".join(keywords)}
    
    Instruction: Provide a highly specific 2-3 word category name.
    STRICT RULE: Do NOT use the words 'General', 'Feedback', 'Netflix', or 'Review'.
    Example: Instead of 'General Issues', use 'Streaming Quality' or 'Login Errors'.
    
    Return ONLY the category name.No full sentences. No explanations
    """
    try:
        response = client.generate(model='phi3:mini', prompt=prompt, stream=False)
        label = response.get('response', '').strip().title()
        
        return label if label else "Niche Issue"
    except Exception as e:
        print(f"‚ö†Ô∏è Ollama Error: {e}")
        return "Unclassified Issue"

In [10]:
def clean_for_sql(df):
    def strip_symbols(text):
        # Removes leading/trailing symbols but keeps them in the middle
        clean_text = re.sub(r'^[\s\-"\'\.]+|[\s\-"\'\.]+$', '', str(text))
        return clean_text.strip().title()

    if 'category' in df.columns:
        df['category'] = df['category'].apply(strip_symbols)

    return df

In [11]:
def main():
    # 1. Sync and Clean the Raw batch
    df_raw = sync_data()
    df_raw = clean_data(df_raw)

    # 2. Standardize Raw Kaggle Data (YYYY-MM-DD)
    df_raw['datetime'] = pd.to_datetime(df_raw['datetime']).dt.strftime('%Y-%m-%d %H:%M')

    # 2. Compare against existing data
    df_existing = load_existing()
    
    if not df_existing.empty:
        # Standardize your CSV data (DD/MM/YYYY -> YYYY-MM-DD)
        # We use dayfirst=True because your CSV stores it as 16/01/2026
        df_existing['datetime'] = pd.to_datetime(
            df_existing['datetime'], 
            dayfirst=True, 
            errors='coerce'
        ).dt.strftime('%Y-%m-%d %H:%M')
        
        # We perform a 'left merge' on both unique identifiers
        # This aligns df_raw against df_existing based on ID and Timestamp
        df_merge = pd.merge(
            df_raw, 
            df_existing[UNIQUE_COLS], 
            on=UNIQUE_COLS, 
            how='left', 
            indicator=True
        )
        
        df_to_process = df_merge[df_merge['_merge'] == 'left_only'].drop(columns=['_merge']).copy()
    else:
        df_to_process = df_raw.copy()
        
    # ADD THIS LINE FOR TESTING:
    #df_to_process = df_to_process.sample(1000)  # Just take 1030 sample rows
    #print(f"Test mode: Processing only {len(df_to_process)} rows.")

    if df_to_process.empty:
        print("‚úÖ Everything is up to date.")
        return

    print(f"üöÄ Processing {len(df_to_process)} reviews")
    topic_model = get_topic_model()

    # Process texts
    docs = df_to_process['review'].tolist()

    if os.path.exists(MODEL_PATH):
        # Apply pre-existing categories to new rows
        topics, _ = topic_model.transform(docs)
    else:
        # First-time run: Fit the model to create categories
        topics, _ = topic_model.fit_transform(docs)
        # Only reduce if -1 exists in topics
        if -1 in topics:
            print("üßπ Reducing outliers to ensure 100% categorization...")
            topics = topic_model.reduce_outliers(docs, topics, strategy="embeddings")
        else:
            print("‚ú® No outliers found in this sample. Skipping reduction.")
        joblib.dump(topic_model, MODEL_PATH)

    # label the topics
    topic_info = topic_model.get_topic_info()
    custom_labels = {}

    print("üè∑Ô∏è Asking Ollama to name the discovered topics...")
    # 4. CONSISTENT LABELING
    if not hasattr(topic_model, "custom_labels_") or not topic_model.custom_labels_:
        topic_model.custom_labels_ = {} 
        topic_info = topic_model.get_topic_info()

        print("üè∑Ô∏è Generating INITIAL labels with Ollama...")
        for _, row in tqdm(topic_info.iterrows(), total=len(topic_info), desc="Labeling"):
            topic_num = row['Topic']
            if topic_num == -1:
                topic_model.custom_labels_[topic_num] = "Outliers/Miscellaneous"
            else:
                # Extract keywords for this specific topic
                keywords = [word for word, _ in topic_model.get_topic(topic_num)]
                # Call Ollama to turn keywords into a readable name
                topic_model.custom_labels_[topic_num] = ask_ollama_for_label(keywords)
        
        # Re-save model with the new labels included
        joblib.dump(topic_model, MODEL_PATH)

    # 5. Map the topics to the final Category column
    df_to_process['category'] = [topic_model.custom_labels_.get(t, "Miscellaneous") for t in topics]

    #6. rid category colomn of symbols and empty spaces
    if not df_to_process.empty:
        df_to_process['datetime'] = pd.to_datetime(df_to_process['datetime'], format='mixed', errors='coerce')
        df_to_process['unique_ref_id'] = (df_to_process['review_id'].astype(str) + "_" + df_to_process['datetime'].dt.strftime('%Y%m%d%H%M'))
        df_to_process = df_to_process.drop_duplicates(subset=['unique_ref_id'], keep='first')
        
        # Apply cleaning
        df_to_process = clean_for_sql(df_to_process)

    # 7. Save and Append
    file_exists = os.path.isfile(EXISTING_FILE)
    df_to_process.to_csv(EXISTING_FILE, mode='a', header=not file_exists, index=False)
    print(f"‚úÖ Success! Data saved to {EXISTING_FILE}")

    # 7. Send to SQL
    df_to_process.to_sql(
    name="data",
    con=engine,
    if_exists="append",   # or "replace"
    index=False
)
    print(f"Data successfully loaded into database '{database}'")

In [13]:
main()

üì• Downloading latest data from Kaggle...
Dataset URL: https://www.kaggle.com/datasets/ashishkumarak/netflix-reviews-playstore-daily-updated
üöÄ Processing 122 reviews


Batches:   0%|          | 0/4 [00:00<?, ?it/s]

üè∑Ô∏è Asking Ollama to name the discovered topics...
‚úÖ Success! Data saved to categorized_reviews.csv
Data successfully loaded into database 'netflix_reviews'
