# LLM-powered Social Listening System - Pipeline Runner

This notebook runs the complete data pipeline in Google Colab with GPU support.

## Steps:
1. Install dependencies
2. Set up AWS credentials
3. Create project structure
4. Fetch data from Athena
5. Run sentiment analysis (Stage 1)
6. Run aspect analysis (Stage 2)
7. Generate themes parquet (Stage 3)
8. Download results


## Step 1: Setup and Install Dependencies


In [None]:
# Optional: attach Google Drive at /content/drive so results can be copied
# somewhere that outlives this Colab session. The pipeline cells in this
# notebook write to the local data/ directory, not to Drive.
from google.colab import drive

drive.mount(mountpoint='/content/drive')


In [None]:
# Install required packages
!pip install -q boto3 pandas pyathena pyarrow python-dotenv transformers sentence-transformers scikit-learn torch torchvision torchaudio


In [None]:
# Check GPU availability and report the PyTorch / CUDA versions in use.
import torch

cuda_ok = torch.cuda.is_available()
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {cuda_ok}")
if cuda_ok:
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"CUDA Version: {torch.version.cuda}")
else:
    print("⚠️ No GPU detected. Enable GPU: Runtime > Change runtime type > GPU")


## Step 2: Set Up AWS Credentials


In [None]:
# Set up AWS credentials and Athena configuration via environment variables.
# Later cells read everything from os.environ, so this cell must run first.
import os

# ⚠️ SECURITY: Use Colab Secrets or environment variables - DO NOT hardcode credentials!
# Option 1: Use Colab Secrets (Recommended)
# from google.colab import userdata
# os.environ['AWS_ACCESS_KEY_ID'] = userdata.get('AWS_ACCESS_KEY_ID')
# os.environ['AWS_SECRET_ACCESS_KEY'] = userdata.get('AWS_SECRET_ACCESS_KEY')
# os.environ['AWS_SESSION_TOKEN'] = userdata.get('AWS_SESSION_TOKEN')  # only for temporary credentials

# Option 2: Set manually (ONLY for local testing, remove before committing)
# os.environ['AWS_ACCESS_KEY_ID'] = 'YOUR_ACCESS_KEY_HERE'
# os.environ['AWS_SECRET_ACCESS_KEY'] = 'YOUR_SECRET_KEY_HERE'

# Required AWS configuration
os.environ['AWS_REGION'] = 'us-east-1'
# A session token is only needed for temporary credentials. setdefault keeps a
# token that was already provided instead of overwriting it with ''; the Athena
# connection cell treats an empty token as "no token".
os.environ.setdefault('AWS_SESSION_TOKEN', '')

# Athena configuration
os.environ['ATHENA_SCHEMA'] = 'cs668_capstone'
os.environ['ATHENA_WORKGROUP'] = 'primary'
os.environ['ATHENA_STAGING_DIR'] = 's3://capstone-transformed-twitterdata-cs668/query_results/'
# Pull up to 15,000 distinct rows. LIMIT without ORDER BY does not guarantee
# the same subset on every run.
os.environ['ATHENA_SQL'] = 'SELECT DISTINCT * FROM tweet_clean LIMIT 15000'

# Verify credentials are set AND non-empty (an empty string is not a usable key).
if all(os.environ.get(key) for key in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')):
    print("✅ AWS credentials configured")
else:
    print("⚠️  Warning: AWS credentials not set. Please configure them above.")


## Step 3: Create Project Structure


In [None]:
# Create the local data/ directory that every pipeline stage reads from and writes to.
import os

os.makedirs('data', exist_ok=True)
print("✅ Directory structure created")


## Step 4: Fetch Data from AWS Athena


In [None]:
# Fetch the raw tweet table from Athena using the environment set up in Step 2.
import os
from contextlib import closing

import pandas as pd
from pyathena import connect

athena_sql = os.getenv('ATHENA_SQL')
if not athena_sql:
    raise ValueError("ATHENA_SQL is not set; run the AWS credentials cell (Step 2) first.")

# closing() releases the Athena connection even if the query raises.
with closing(connect(
    s3_staging_dir=os.getenv('ATHENA_STAGING_DIR'),
    region_name=os.getenv('AWS_REGION'),
    work_group=os.getenv('ATHENA_WORKGROUP'),
    schema_name=os.getenv('ATHENA_SCHEMA'),
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    # An empty token string means "no session token".
    aws_session_token=os.getenv('AWS_SESSION_TOKEN') or None,
)) as conn:
    print(f"📊 Executing query: {athena_sql}")
    # pandas may warn that it only officially supports SQLAlchemy connectables;
    # the query still runs over the DB-API connection.
    df = pd.read_sql(athena_sql, conn)

print(f"✅ Retrieved {len(df):,} rows, {len(df.columns)} columns")
print(f"\nColumns: {', '.join(df.columns[:10])}{'...' if len(df.columns) > 10 else ''}")
print("\nFirst few rows:")
df.head()


In [None]:
# Save Stage 0: the raw Athena result, the input to every later stage.
df.to_parquet('data/tweets_stage0_raw.parquet', index=False)
print(f"✅ Saved Stage 0: data/tweets_stage0_raw.parquet ({len(df):,} rows)")


## Step 5: Stage 1 - Sentiment Analysis (GPU Accelerated)


In [None]:
# Stage 1: sentiment analysis with a Twitter-tuned RoBERTa model.
# The cell reloads Stage 0 from disk and imports everything it uses, so it can be
# re-run on a fresh kernel without re-running earlier cells.
import time

import pandas as pd
import torch
from transformers import pipeline

# Load Stage 0 data
df = pd.read_parquet('data/tweets_stage0_raw.parquet')
print(f"📊 Loaded {len(df):,} rows")

# transformers pipeline device convention: 0 = first CUDA device, -1 = CPU.
device = 0 if torch.cuda.is_available() else -1
print(f"Using device: {'GPU' if device == 0 else 'CPU'}")

# Prefer cleaned text, then raw text; the third column is a last-resort guess.
text_col = 'clean_tweet' if 'clean_tweet' in df.columns else ('text' if 'text' in df.columns else df.columns[2])
print(f"Using text column: {text_col}")

# top_k=None returns a score for every label; it replaces the deprecated
# return_all_scores=True.
sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model="cardiffnlp/twitter-roberta-base-sentiment-latest",
    device=device,
    top_k=None,
)

# Missing text becomes '' instead of the literal strings 'nan' / 'None'.
texts = df[text_col].fillna('').astype(str).tolist()
batch_size = 32 if device == 0 else 8

labels = []
scores = []
t0 = time.time()

for i in range(0, len(texts), batch_size):
    if i % (batch_size * 50) == 0:
        print(f"Progress: {i}/{len(texts)} ({100*i/len(texts):.1f}%) | Elapsed: {time.time()-t0:.1f}s")
    batch = texts[i:i + batch_size]
    results = sentiment_pipeline(batch, truncation=True, max_length=256)
    # Each result is a list of {label, score} dicts; keep the highest-scoring label.
    for row in results:
        best = max(row, key=lambda x: x['score'])
        labels.append(best['label'].lower())
        scores.append(best['score'])

assert len(labels) == len(df), "sentiment output does not align with input rows"
df['sentiment_label'] = labels
df['sentiment_score'] = scores

print("\n✅ Sentiment analysis complete!")
print("\nSentiment distribution:")
print(df['sentiment_label'].value_counts())

# Save Stage 1
df.to_parquet('data/tweets_stage1_sentiment.parquet', index=False)
print("\n✅ Saved Stage 1: data/tweets_stage1_sentiment.parquet")


In [None]:
# Step 6: Stage 2 - zero-shot aspect scoring with BART-MNLI.
# The cell reloads Stage 1 from disk, so it imports everything it uses itself.
# Previously `time`, `torch` and `pd` leaked in from earlier cells, which made a
# fresh-kernel resume fail with NameError.
import time

import numpy as np
import pandas as pd
import torch
from transformers import pipeline

# Load Stage 1 data
df = pd.read_parquet('data/tweets_stage1_sentiment.parquet')
print(f"📊 Loaded {len(df):,} rows")

aspects = ["pricing", "delivery", "returns", "staff", "app/ux"]
# Column-safe aspect keys ('app/ux' -> 'app_ux') and the score columns built from them.
aspect_keys = [a.replace("/", "_") for a in aspects]
aspect_cols = [f'aspect_{k}' for k in aspect_keys]
# A tweet gets a dominant aspect only if its best aspect score reaches this value.
DOMINANT_MIN_SCORE = 0.5

# transformers pipeline device convention: 0 = first CUDA device, -1 = CPU.
device = 0 if torch.cuda.is_available() else -1

aspect_pipeline = pipeline(
    "zero-shot-classification",
    model="facebook/bart-large-mnli",
    device=device,
)

text_col = 'clean_tweet' if 'clean_tweet' in df.columns else ('text' if 'text' in df.columns else df.columns[2])
# Missing text becomes '' instead of the literal strings 'nan' / 'None'.
texts = df[text_col].fillna('').astype(str).tolist()
batch_size = 24 if device == 0 else 8

scores_per_aspect = {a: [] for a in aspects}
t0 = time.time()

for i in range(0, len(texts), batch_size):
    if i % (batch_size * 20) == 0:
        print(f"Progress: {i}/{len(texts)} ({100*i/len(texts):.1f}%) | Elapsed: {time.time()-t0:.1f}s")
    batch = texts[i:i + batch_size]
    # multi_label=True scores each aspect independently, so scores need not sum to 1.
    preds = aspect_pipeline(batch, candidate_labels=aspects, multi_label=True, truncation=True)
    # Defensive: a single-item call may come back as a bare dict rather than a list.
    if isinstance(preds, dict):
        preds = [preds]
    for p in preds:
        label_to_score = dict(zip(p['labels'], p['scores']))
        for a in aspects:
            scores_per_aspect[a].append(float(label_to_score.get(a, 0.0)))

for a, col in zip(aspects, aspect_cols):
    df[col] = scores_per_aspect[a]

score_matrix = df[aspect_cols].to_numpy()
best_idx = score_matrix.argmax(axis=1)
best_score = score_matrix.max(axis=1)
df['aspect_dominant'] = np.where(best_score >= DOMINANT_MIN_SCORE, np.array(aspect_keys)[best_idx], 'none')

print("\n✅ Aspect analysis complete!")
print("\nAspect distribution:")
print(df['aspect_dominant'].value_counts())

# Save Stage 2
df.to_parquet('data/tweets_stage2_aspects.parquet', index=False)
print("\n✅ Saved Stage 2: data/tweets_stage2_aspects.parquet")


In [None]:
# Step 7: Stage 3 - unsupervised theme discovery (TF-IDF + k-means).
# The cell reloads Stage 2 from disk and imports everything it uses.
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import normalize

# Load Stage 2 data
df = pd.read_parquet('data/tweets_stage2_aspects.parquet')
print(f"📊 Loaded {len(df):,} rows")

text_col = 'clean_tweet' if 'clean_tweet' in df.columns else ('text' if 'text' in df.columns else df.columns[2])
# Missing text becomes '' (an all-zero TF-IDF row). With plain astype(str), every
# missing row would share a 'nan' token and be pulled into the same cluster.
texts = df[text_col].fillna('').astype(str).tolist()

# Create embeddings (TF-IDF)
print("Creating embeddings...")
vectorizer = TfidfVectorizer(max_features=3000, stop_words='english', ngram_range=(1, 2))
embeddings = vectorizer.fit_transform(texts).astype('float32')
# TfidfVectorizer already L2-normalizes rows by default. Re-normalizing after the
# float32 cast keeps rows unit length, so squared Euclidean distance between rows
# is a monotone function of their cosine similarity.
embeddings = normalize(embeddings)

# Clustering
print("Clustering...")
n_clusters = 6
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=3)
df['theme'] = kmeans.fit_predict(embeddings)

print("\n✅ Theme clustering complete!")
print("\nTheme distribution:")
print(df['theme'].value_counts().sort_index())

# Highest-weight terms in each centroid give the numeric theme ids a readable meaning.
terms = vectorizer.get_feature_names_out()
top_term_idx = np.argsort(kmeans.cluster_centers_, axis=1)[:, ::-1][:, :8]
print("\nTop terms per theme:")
for theme_id, idx in enumerate(top_term_idx):
    print(f"  {theme_id}: {', '.join(terms[idx])}")

# Save Stage 3 - this is the file the server reads.
df.to_parquet('data/tweets_stage3_themes.parquet', index=False)
print("\n✅ Saved Stage 3: data/tweets_stage3_themes.parquet")
print("\n🎉 This is the file your server needs!")


## Step 8: Download Results


In [None]:
# Send the stage parquet files to the browser for download.
# Missing files are skipped and reported, so the final message reflects what happened.
import os

from google.colab import files

# Download the important parquet files
files_to_download = [
    'data/tweets_stage3_themes.parquet',  # Most important - for server
    'data/tweets_stage2_aspects.parquet',
    'data/tweets_stage1_sentiment.parquet',
    'data/tweets_stage0_raw.parquet',
]

missing = []
for path in files_to_download:
    if os.path.exists(path):
        print(f"📥 Downloading {path}...")
        files.download(path)
    else:
        print(f"⚠️  {path} not found")
        missing.append(path)

if missing:
    print(f"\n⚠️  {len(missing)} of {len(files_to_download)} files were not found and were not downloaded:")
    for path in missing:
        print(f"   - {path}")
else:
    print("\n✅ All files downloaded!")

print("\n📋 Next steps:")
print("   1. Upload tweets_stage3_themes.parquet to your server's data/ directory")
print("   2. Restart your server")
print("   3. Test the /api/themes/:id/tweets endpoint")
