In [None]:
import os
from typing import List
import pandas as pd
import numpy as np

from googleapiclient.discovery import build
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound

from transformers import pipeline
from sklearn.metrics import classification_report
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score



In [None]:
# Read the YouTube Data API key from the environment and fail fast when it is
# missing or empty, so later cells never run with an unusable client.
YOUTUBE_API_KEY = os.environ.get("YOUTUBE_API_KEY")
if YOUTUBE_API_KEY is None or YOUTUBE_API_KEY == "":
    raise ValueError("Set YOUTUBE_API_KEY env variable before running.")

# Shared YouTube Data API v3 service object used by the search/comment helpers.
youtube = build(serviceName="youtube", version="v3", developerKey=YOUTUBE_API_KEY)


In [None]:
# PART 1: SEARCH VIDEOS 
def search_youtube(query: str, max_results: int = 50, client=None) -> List[str]:
    """Search YouTube for videos matching ``query`` and return their video IDs.

    Pages through ``search().list`` (at most 50 results per request) until
    ``max_results`` unique IDs have been collected, the API returns an empty
    page, or there is no ``nextPageToken``.

    Args:
        query: Search string sent as the ``q`` parameter.
        max_results: Upper bound on the number of IDs returned. Values <= 0
            return an empty list without issuing any request.
        client: Optional service object exposing ``search().list(...).execute()``.
            Defaults to the module-level ``youtube`` client; mainly useful for
            tests or for supplying a differently configured client.

    Returns:
        Up to ``max_results`` distinct video IDs in the order the API returned
        them.
    """
    service = youtube if client is None else client
    video_ids: List[str] = []
    seen = set()
    next_page_token = None
    while len(video_ids) < max_results:
        response = service.search().list(
            q=query,
            part="id",
            maxResults=min(50, max_results - len(video_ids)),
            pageToken=next_page_token,
            type="video",
        ).execute()

        items = response.get("items", [])
        if not items:
            # An empty page that still carries a nextPageToken would otherwise
            # loop forever (and spend API quota on every iteration).
            break

        for item in items:
            video_id = item.get("id", {}).get("videoId")
            # Skip malformed entries and IDs already collected from earlier pages.
            if video_id and video_id not in seen:
                seen.add(video_id)
                video_ids.append(video_id)

        next_page_token = response.get("nextPageToken")
        if not next_page_token:
            break
    return video_ids[:max_results]


In [None]:
# PART 2: GET TRANSCRIPTS AND COMMENTS
def get_transcript(video_id: str) -> str:
    """Return the transcript of ``video_id`` as a single space-joined string.

    An empty string is returned when ``TranscriptsDisabled`` or
    ``NoTranscriptFound`` is raised (silently), and also when any other
    exception occurs while fetching or joining the transcript (after printing
    the error).
    """
    text = ""
    try:
        segments = YouTubeTranscriptApi.get_transcript(video_id)
        text = " ".join(segment['text'] for segment in segments)
    except (TranscriptsDisabled, NoTranscriptFound):
        # No transcript available for this video: keep the empty default.
        pass
    except Exception as e:
        print(f"Error fetching transcript for {video_id}: {e}")
    return text

def get_comments(video_id: str, max_comments: int = 100, client=None) -> List[str]:
    """Fetch up to ``max_comments`` top-level comments for a video.

    Pages through ``commentThreads().list`` (at most 100 threads per request)
    with ``textFormat="plainText"`` and collects each thread's top-level
    ``textDisplay``.

    Collection is best-effort, matching ``get_transcript``: if a request fails
    (for example an HTTP error for a video whose comments are disabled) the
    error is printed and whatever was gathered so far is returned, so one bad
    video does not abort a whole dataset build.

    Args:
        video_id: ID of the video whose comments are fetched.
        max_comments: Upper bound on the number of comments returned. Values
            <= 0 return an empty list without issuing any request.
        client: Optional service object exposing
            ``commentThreads().list(...).execute()``. Defaults to the
            module-level ``youtube`` client; mainly useful for tests.

    Returns:
        A list of comment strings, possibly empty.
    """
    comments: List[str] = []
    next_page_token = None
    try:
        service = youtube if client is None else client
        while len(comments) < max_comments:
            response = service.commentThreads().list(
                part="snippet",
                videoId=video_id,
                maxResults=min(100, max_comments - len(comments)),
                pageToken=next_page_token,
                textFormat="plainText",
            ).execute()

            items = response.get("items", [])
            if not items:
                # Avoid looping forever on an empty page that still has a token.
                break

            comments.extend(
                item["snippet"]["topLevelComment"]["snippet"]["textDisplay"]
                for item in items
            )

            next_page_token = response.get("nextPageToken")
            if not next_page_token:
                break
    except Exception as e:
        # Deliberately broad, like get_transcript: keep partial results.
        print(f"Error fetching comments for {video_id}: {e}")
    return comments[:max_comments]



In [None]:
# PART 3: BUILD DATASET
def build_dataset(query: str, n_videos: int = 50) -> pd.DataFrame:
    """Collect transcripts and comments for videos matching ``query``.

    Args:
        query: Search string passed to ``search_youtube``; also stored in the
            ``source`` column of every row.
        n_videos: Maximum number of videos to request from the search.

    Returns:
        A DataFrame with one row per video found and the columns
        ``video_id``, ``transcript``, ``comments`` (up to 100 comments joined
        with single spaces) and ``source``. The columns are present even when
        the search finds nothing, so downstream column lookups do not fail on
        an empty result.
    """
    columns = ["video_id", "transcript", "comments", "source"]
    video_ids = search_youtube(query, n_videos)
    # The search may return fewer videos than requested; report progress
    # against what was actually found.
    total = len(video_ids)
    records = []
    for i, vid in enumerate(video_ids, 1):
        print(f"[{i}/{total}] Fetching video ID: {vid}")
        transcript = get_transcript(vid)
        comments = get_comments(vid, max_comments=100)
        records.append({
            "video_id": vid,
            "transcript": transcript,
            "comments": " ".join(comments),
            "source": query,
        })
    return pd.DataFrame(records, columns=columns)

In [None]:
# PART 4: SENTIMENT ANALYSIS
# Name the checkpoint explicitly instead of relying on the task default, so the
# model cannot change silently between library versions and the "no model was
# supplied" warning is avoided. This is the checkpoint the "sentiment-analysis"
# task resolves to by default; it emits POSITIVE / NEGATIVE labels.
sentiment_model = pipeline(
    "sentiment-analysis",
    model="distilbert/distilbert-base-uncased-finetuned-sst-2-english",
)

def analyze_sentiment(text: str, max_chars: int = 512, model=None) -> str:
    """Classify ``text`` as ``"POSITIVE"``, ``"NEGATIVE"`` or ``"NEUTRAL"``.

    Args:
        text: Text to classify. Non-string values (e.g. ``None`` or NaN from a
            DataFrame) and blank strings are treated as having no sentiment.
        max_chars: Number of leading characters passed to the model; longer
            texts are truncated for performance.
        model: Optional callable taking a string and returning a sequence
            whose first element is a mapping with a ``'label'`` key. Defaults
            to the module-level ``sentiment_model`` pipeline.

    Returns:
        ``"POSITIVE"`` or ``"NEGATIVE"`` when the model's label (compared
        case-insensitively) is one of those; ``"NEUTRAL"`` for any other
        label, for empty/missing input, and when the model call raises (the
        error is printed).
    """
    # Guard before calling .strip(): missing values are not strings.
    if not isinstance(text, str) or not text.strip():
        return "NEUTRAL"
    try:
        classifier = sentiment_model if model is None else model
        result = classifier(text[:max_chars])[0]
        label = str(result['label']).upper()
    except Exception as e:
        print(f"Sentiment error: {e}")
        return "NEUTRAL"
    # Any label outside the two known classes is reported as NEUTRAL.
    return label if label in ("POSITIVE", "NEGATIVE") else "NEUTRAL"

def analyze_dataset(df: pd.DataFrame) -> pd.DataFrame:
    """Add sentiment labels for the transcript and comment text of each row.

    Applies ``analyze_sentiment`` to the ``transcript`` column and then to the
    ``comments`` column, storing the results in ``transcript_sentiment`` and
    ``comment_sentiment``. The frame is modified in place and also returned.
    """
    stages = (
        ("Analyzing transcript sentiment...", 'transcript', 'transcript_sentiment'),
        ("Analyzing comment sentiment...", 'comments', 'comment_sentiment'),
    )
    for message, source_col, target_col in stages:
        print(message)
        df[target_col] = df[source_col].apply(analyze_sentiment)
    return df


In [None]:
# PART 5: EVALUATION
def evaluate_sentiment(df: pd.DataFrame, col_pred: str, col_true: str):
    """Print a classification report comparing two label columns of ``df``.

    Args:
        df: Frame holding both label columns.
        col_pred: Name of the column treated as predictions.
        col_true: Name of the column treated as reference labels.

    The report is produced by ``classification_report`` with
    ``zero_division=0``; nothing is returned.
    """
    print("\nClassification Report:")
    y_true, y_pred = df[col_true], df[col_pred]
    report = classification_report(y_true, y_pred, zero_division=0)
    print(report)



In [None]:
# PART 6: CLUSTERING TO IDENTIFY TOPICS
def cluster_texts(df: pd.DataFrame, text_column: str = "comments", n_clusters: int = 5):
    """Cluster the texts in ``text_column`` with TF-IDF features and KMeans.

    Missing values in ``text_column`` are treated as empty strings. The
    assigned cluster index is written to a new ``cluster`` column; ``df`` is
    modified in place and also returned.

    Args:
        df: Frame containing ``text_column``.
        text_column: Name of the column holding the texts to cluster.
        n_clusters: Requested number of clusters. Reduced to the number of
            rows when the frame has fewer rows than that, since KMeans cannot
            fit more clusters than samples.

    Returns:
        ``df`` with the added ``cluster`` column.

    Notes:
        The silhouette score is only printed when it is defined, i.e. when the
        number of distinct clusters is between 2 and ``n_samples - 1``.
        ``TfidfVectorizer`` still raises ``ValueError`` if the texts yield an
        empty vocabulary (e.g. every text is empty or only stop words).
    """
    texts = df[text_column].fillna("")
    n_samples = len(texts)
    if n_samples == 0:
        # Nothing to vectorize; keep the output schema consistent.
        print("No texts to cluster.")
        df['cluster'] = np.array([], dtype=int)
        return df

    vectorizer = TfidfVectorizer(stop_words='english', max_features=1000)
    X = vectorizer.fit_transform(texts)

    # KMeans raises when asked for more clusters than there are samples.
    k = min(n_clusters, n_samples)
    if k != n_clusters:
        print(f"Requested {n_clusters} clusters for {n_samples} texts; using {k}.")

    print("Clustering texts with KMeans...")
    # n_init is set explicitly so results do not depend on the installed
    # scikit-learn version's default.
    kmeans = KMeans(n_clusters=k, n_init=10, random_state=42)
    clusters = kmeans.fit_predict(X)
    df['cluster'] = clusters

    n_labels = len(np.unique(clusters))
    if 2 <= n_labels <= n_samples - 1:
        score = silhouette_score(X, clusters)
        print(f"Silhouette Score: {score:.3f}")
    else:
        print("Silhouette Score: n/a (requires between 2 and n_samples - 1 distinct clusters)")
    return df



In [None]:
# PART 7: MAIN EXECUTION
if __name__ == "__main__":
    # Run configuration: the search term and how many videos to request.
    QUERY = "car advertisement"
    N_VIDEOS = 50

    # Step 1: Build dataset from YouTube (one row per video found, holding its
    # transcript and its comments joined into a single string).
    df = build_dataset(QUERY, N_VIDEOS)

    # Step 2: Analyze sentiment on transcripts and comments; adds the
    # 'transcript_sentiment' and 'comment_sentiment' columns.
    df = analyze_dataset(df)

    # Step 3: Evaluate comment sentiment using transcript sentiment as proxy "ground truth"
    # NOTE(review): both columns are predictions from the same sentiment model,
    # so this report measures agreement between transcript and comment
    # sentiment rather than accuracy against human labels.
    evaluate_sentiment(df, col_pred='comment_sentiment', col_true='transcript_sentiment')

    # Step 4: Cluster comments to find dominant topics; adds a 'cluster' column.
    df = cluster_texts(df, text_column="comments", n_clusters=5)

    # Step 5: Save results as CSV (relative path, index column omitted).
    output_file = "youtube_car_ads_sentiment.csv"
    df.to_csv(output_file, index=False)
    print(f"\nSaved results to {output_file}")