# In [11]: (Jupyter notebook cell marker)
import os
import json
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType
from textblob import TextBlob

# Function to fetch comments for specific videos (once)
def fetch_comments(api_key, video_ids, output_dir):
    """Fetch top-level comments for each video and save them as JSON Lines.

    One request is made per video to the YouTube Data API ``commentThreads``
    endpoint; only the first page (at most ``maxResults`` = 100 threads) is
    retrieved. A video whose request fails (network error or non-200 status)
    is reported on stdout and skipped so the remaining videos are still
    processed.

    Args:
        api_key: YouTube Data API key sent as the ``key`` query parameter.
        video_ids: Iterable of video ID strings to fetch comments for.
        output_dir: Directory in which ``comments.json`` is written. It is
            created if it does not exist yet.

    Returns:
        The path of the written ``comments.json`` file. Each line of the file
        is a JSON object with the keys ``video_id`` and ``comment``.
    """
    base_url = "https://www.googleapis.com/youtube/v3/commentThreads"
    comments = []

    for video_id in video_ids:
        params = {
            "part": "snippet",
            "videoId": video_id,
            "key": api_key,
            "textFormat": "plainText",
            "maxResults": 100,  # Adjust as needed
        }
        try:
            # Without a timeout a stalled connection would block forever.
            response = requests.get(base_url, params=params, timeout=30)
        except requests.RequestException as exc:
            print(f"Error fetching comments for video {video_id}: {exc}")
            continue

        if response.status_code != 200:
            print(f"Error fetching comments for video {video_id}: {response.status_code}")
            continue

        data = response.json()
        for item in data.get("items", []):
            comment = item["snippet"]["topLevelComment"]["snippet"]["textDisplay"]
            comments.append({"video_id": video_id, "comment": comment})

    # Do not rely on the caller having created the directory.
    os.makedirs(output_dir, exist_ok=True)

    # One JSON object per line so the file can be read with spark.read.json.
    file_path = os.path.join(output_dir, "comments.json")
    with open(file_path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(entry) + "\n" for entry in comments)

    return file_path

# Sentiment analysis function using TextBlob
def analyze_sentiment(comment):
    """Classify a comment as "Positive", "Negative" or "Neutral".

    The label is derived from the sign of the TextBlob polarity score of the
    text: above zero is "Positive", below zero is "Negative", exactly zero is
    "Neutral".

    Args:
        comment: The comment text. May be ``None`` or empty (e.g. a null
            column value when used as a Spark UDF).

    Returns:
        One of the strings "Positive", "Negative" or "Neutral". Missing or
        empty input is reported as "Neutral".
    """
    # TextBlob rejects non-string input such as None, which would otherwise
    # fail the whole job when this runs as a UDF over a nullable column.
    if not comment:
        return "Neutral"

    polarity = TextBlob(comment).sentiment.polarity
    if polarity > 0:
        return "Positive"
    if polarity < 0:
        return "Negative"
    return "Neutral"

# Main function to run sentiment analysis and save results
def run_sentiment_analysis():
    """Fetch YouTube comments, label each with a sentiment, and save the result.

    Steps: download comments for the configured video IDs into
    ``comments.json``, load that file into a Spark DataFrame, add a
    ``sentiment`` column through a UDF wrapping ``analyze_sentiment``, write
    the labelled rows as CSV, and print both DataFrames to the console.

    Returns:
        None. Returns early, after printing a message, if the comments file
        does not exist. A failure while writing the CSV is printed and does
        not stop the final console output.
    """
    # *** CONFIGURATION PART - AMEND THIS ***
    # NOTE(review): an API key is committed in source here. Prefer supplying
    # it through the YOUTUBE_API_KEY environment variable, and rotate this key.
    API_KEY = os.environ.get(
        "YOUTUBE_API_KEY", "AIzaSyAG53ZyTyJpRO9M7lz6RxRRG_gjOKs1M_4"
    )  # Replace with your YouTube API key
    VIDEO_IDS = ["xZbcwi7SfZE", "EkyAuG9RSSU", "CWqSzUNzoUc"]  # Replace with your video IDs
    # A single directory is used both for writing and for reading
    # comments.json, so the two steps can never point at different places.
    OUTPUT_DIR = os.path.abspath("comments_stream_dir")
    # Results get their own sub-directory: writing with mode="overwrite" into
    # the directory that holds comments.json would replace the input file
    # that the DataFrame is still reading from.
    RESULTS_DIR = os.path.join(OUTPUT_DIR, "sentiment_results")

    # Ensure the output directory exists
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Fetch comments for the given video IDs
    fetch_comments(API_KEY, VIDEO_IDS, OUTPUT_DIR)

    # Initialize Spark session
    spark = SparkSession.builder.appName("YouTubeCommentsSentimentAnalysis").getOrCreate()

    # Define schema for incoming JSON data
    schema = StructType([
        StructField("video_id", StringType(), True),  # video_id column, string type, nullable
        StructField("comment", StringType(), True),   # comment column, string type, nullable
    ])

    comments_file_path = os.path.join(OUTPUT_DIR, "comments.json")
    if not os.path.exists(comments_file_path):
        print(f"No comments file found at {comments_file_path}")
        return

    df = spark.read.json(comments_file_path, schema=schema)

    # Show the DataFrame (verify the data is read)
    df.show()

    # Define UDF for sentiment analysis
    sentiment_udf = udf(analyze_sentiment, StringType())

    # Apply transformations to perform sentiment analysis on comments
    sentiment_df = (
        df.withColumn("sentiment", sentiment_udf(col("comment")))
        .select("video_id", "comment", "sentiment")
    )

    # Write the results once; a failure is reported but does not abort the run.
    try:
        sentiment_df.write.csv(RESULTS_DIR, header=True, mode="overwrite")
        print(f"Sentiment analysis results saved to: {RESULTS_DIR}")
    except Exception as e:
        print(f"Error writing results to CSV: {e}")

    # Show results in the console
    sentiment_df.show()


# Entry point: the call lives outside the function body so the function does
# not recurse into itself.
if __name__ == "__main__":
    run_sentiment_analysis()

