In [None]:
# Mount Google Drive so the streaming directory under MyDrive is reachable.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
!pip install --upgrade transformers




In [None]:
!pip install findspark pyspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, udf, regexp_replace, from_json, substring
from pyspark.sql.types import StringType
import re
from pyspark.sql.streaming import DataStreamReader
import json
import torch
from transformers import pipeline
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType
import os
from googleapiclient.discovery import build
import datetime
import html
import unicodedata
import time
import threading

In [None]:
# Module-level shutdown signal: the `stream_comments` loop runs while this is
# False; setting it to True (e.g. via `stop_streaming()`) makes the loop exit.
stop_flag = False

In [None]:
def stop_streaming(thread=None, timeout=None):
    """Signal the background ``stream_comments`` loop to finish.

    Sets the module-level ``stop_flag`` that the producer loop checks before
    each poll. When ``thread`` is given, also waits for it to exit.

    Args:
        thread: Optional ``threading.Thread`` running the producer; joined
            after the flag is set.
        timeout: Seconds to wait in ``join``; ``None`` waits indefinitely.

    Returns:
        True if no thread was given or the thread has exited, False if it is
        still alive after ``timeout`` elapsed.
    """
    global stop_flag
    stop_flag = True
    if thread is None:
        return True
    thread.join(timeout)
    return not thread.is_alive()

In [None]:
def clean_comment(comment):
    """Turn a YouTube ``textDisplay`` value into plain ASCII text.

    ``textDisplay`` is HTML by default: line breaks arrive as ``<br>``, links
    as ``<a>`` tags, and special characters as entities. Tags are removed
    *before* entities are decoded, so user-typed text such as ``&lt;3``
    survives as ``<3`` instead of being mistaken for markup. The text is then
    NFKD-normalized and reduced to ASCII. This drops accents and any character
    without an ASCII decomposition (e.g. emoji).

    Args:
        comment: Comment text as returned by the API; ``None``/empty gives "".

    Returns:
        The cleaned ASCII string.
    """
    if not comment:
        return ""
    # Line breaks become spaces so adjacent lines don't fuse into one word.
    comment = re.sub(r"<br\s*/?>", " ", comment, flags=re.IGNORECASE)
    comment = re.sub(r"<[^>]*>", "", comment)
    comment = html.unescape(comment)
    comment = unicodedata.normalize('NFKD', comment).encode('ascii', 'ignore').decode('ascii')
    return comment

def _sleep_unless_stopped(seconds, poll_interval=0.5):
    """Sleep up to ``seconds``, returning early once the global ``stop_flag`` is set."""
    deadline = time.monotonic() + seconds
    while not stop_flag:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return
        time.sleep(min(poll_interval, remaining))


def _write_batch_atomically(save_dir, records):
    """Write ``records`` as JSON lines into ``save_dir`` and return the final path.

    Data goes to a dot-prefixed temporary name first and is then published
    with ``os.replace``. This means a file-watching reader never sees a
    half-written batch (Spark's file source ignores hidden files and expects
    files to appear atomically). Microsecond timestamps keep file names unique
    even when two batches are written within the same second.
    """
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    file_name = f"batch_{timestamp}.json"
    final_path = os.path.join(save_dir, file_name)
    tmp_path = os.path.join(save_dir, f".{file_name}.tmp")
    with open(tmp_path, "w", encoding='utf-8') as f:
        for record in records:
            json.dump(record, f, ensure_ascii=False)
            f.write("\n")
    os.replace(tmp_path, final_path)
    return final_path


def stream_comments(api_key, video_id, save_dir="streaming_data", batch_size=2, sleep_time=5, error_sleep=30):
    """Fetch a video's top-level comments page by page and save them as JSON-lines files.

    Each API page is requested with ``order="time"``. Its not-yet-seen comments
    are cleaned with ``clean_comment`` and written to
    ``<save_dir>/batch_<timestamp>.json``, one ``{"comment": ...}`` object per
    line. The loop follows ``nextPageToken`` and stops when there is no further
    page or the global ``stop_flag`` is set. It does not keep polling for new
    comments after the last page.

    Args:
        api_key: YouTube Data API v3 key.
        video_id: ID of the video whose comment threads are fetched.
        save_dir: Directory the batch files are written into (created if needed).
        batch_size: Comments requested per API call, i.e. the maximum per file.
            Clamped to the 1..100 range accepted by ``maxResults``.
        sleep_time: Seconds to wait between successful requests.
        error_sleep: Seconds to wait before retrying after an exception.

    Raises:
        ValueError: If ``api_key``, ``video_id`` or ``save_dir`` is empty.
    """
    if not api_key or not video_id:
        raise ValueError("api_key and video_id must be non-empty")
    if not save_dir:
        raise ValueError("save_dir must be a non-empty directory path")

    youtube = build('youtube', 'v3', developerKey=api_key)
    os.makedirs(save_dir, exist_ok=True)
    # Request exactly one batch per call. Previously a hard-coded page size plus
    # an early `break` could silently drop fetched comments, because the next
    # page token skipped past them.
    page_size = max(1, min(int(batch_size), 100))
    seen_comment_ids = set()
    next_page_token = None  # None requests the first page

    while not stop_flag:
        try:
            response = youtube.commentThreads().list(
                part="snippet",
                videoId=video_id,
                maxResults=page_size,
                order="time",
                pageToken=next_page_token,
            ).execute()

            new_comments = []
            for item in response.get('items', []):
                comment_id = item['id']
                if comment_id in seen_comment_ids:
                    continue
                seen_comment_ids.add(comment_id)
                text = item['snippet']['topLevelComment']['snippet']['textDisplay']
                new_comments.append({"comment": clean_comment(text)})

            if new_comments:
                file_path = _write_batch_atomically(save_dir, new_comments)
                print(f"Saved {len(new_comments)} comments to {file_path}")

            next_page_token = response.get('nextPageToken')
            if not next_page_token:
                print("No more comments to fetch.")
                break

            _sleep_unless_stopped(sleep_time)
        except Exception as e:
            # Retry the same page after a pause; the pause is interruptible so a
            # stop request (and thread.join) is not held up by the back-off.
            print(f"Error during streaming: {e}")
            _sleep_unless_stopped(error_sleep)

In [None]:
def get_sentiment(comment):
    """Classify one comment with the global ``classifier`` pipeline.

    Wrapped as a Spark UDF over the ``comment`` column, where it receives a
    ``str`` or ``None``. It can also be called directly with a ``Row`` that has
    a ``comment`` field.

    Args:
        comment: A ``str``, a ``Row`` with a ``comment`` attribute, or None.

    Returns:
        The top label produced by ``classifier`` (for the SST-2 model loaded in
        this notebook, "POSITIVE" or "NEGATIVE"), or "UNKNOWN" when the input
        is missing or of an unsupported type.
    """
    if isinstance(comment, Row):
        comment_text = comment.comment
    elif isinstance(comment, str):
        comment_text = comment
    else:
        return "UNKNOWN"

    if comment_text is None:
        return "UNKNOWN"

    # Cheap pre-cut to bound tokenization work. Characters are not tokens, so
    # this alone cannot guarantee the model's maximum sequence length (e.g.
    # 512 punctuation marks plus [CLS]/[SEP] exceed it); `truncation=True`
    # makes the tokenizer enforce the limit.
    max_length = 512
    comment_text = comment_text[:max_length]

    result = classifier(comment_text, truncation=True)
    return result[0]['label']

In [None]:
# Spark session (reused if one already exists) shared by the stream, UDF and SQL queries.
spark = (
    SparkSession.builder
    .appName("StreamingComments")
    .getOrCreate()
)

In [None]:
# Binary sentiment model used by `get_sentiment`.
classifier = pipeline(
    task="sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/629 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Device set to use cpu


In [None]:
# Sanity-check the pipeline on a few hand-written comments before streaming.
comments = ["This is a great video!", "I hate this", "Amazing content", "Terrible"]
predictions = classifier(comments)
for text, pred in zip(comments, predictions):
    label, score = pred['label'], pred['score']
    print(f"Comment: {text} => Sentiment: {label}, Score: {score}")

Comment: This is a great video! => Sentiment: POSITIVE, Score: 0.9998794794082642
Comment: I hate this => Sentiment: NEGATIVE, Score: 0.9996224641799927
Comment: Amazing content => Sentiment: POSITIVE, Score: 0.9998816251754761
Comment: Terrible => Sentiment: NEGATIVE, Score: 0.999752938747406


In [None]:
# Each JSON line written by `stream_comments` carries one nullable string field.
schema = StructType([StructField("comment", StringType(), True)])

In [None]:
# Directory read by the Spark file stream below; `stream_comments` must write
# its batch files here (pass this as its `save_dir`).
input_path = "/content/drive/MyDrive/ds200/streaming_data/"

In [None]:
# Incrementally pick up JSON-lines files as they appear in `input_path`.
streaming_data_raw = spark.readStream.format("json").schema(schema).load(input_path)

In [None]:
# Debug sink: mirror the raw comments into the in-memory table `comments`.
raw_comments_writer = (
    streaming_data_raw.writeStream
    .format("memory")
    .queryName("comments")
    .outputMode("append")
)
query = raw_comments_writer.trigger(processingTime='5 second').start()

In [None]:
# `.show()` prints the table itself and returns None; wrapping it in
# `display()` only added a stray "None" to the output.
spark.sql("SELECT * FROM comments").show(truncate=False)

+-------+
|comment|
+-------+
+-------+



None

In [None]:
# Column-wise wrapper around `get_sentiment`; yields the label as a string.
get_sentiment_udf = udf(f=get_sentiment, returnType=StringType())

In [None]:
# Producer configuration. Credentials come from the environment rather than
# being typed into the notebook, where they would be saved and shared.
stop_flag = False
api_key = os.environ.get("YOUTUBE_API_KEY", "")  # Your YouTube Data API key
video_id = os.environ.get("YOUTUBE_VIDEO_ID", "")  # Your YouTube video ID

# Spark watches `input_path`, so the producer must write into that directory
# (an empty string here would also make os.makedirs fail).
save_dir = input_path  # Directory to save the streamed comments

stream_thread = threading.Thread(
    target=stream_comments,
    args=(api_key, video_id, save_dir),
    name="youtube-comment-stream",
    daemon=True
)

In [None]:
# Start the YouTube producer thread, then run the sentiment query over the
# files it writes, printing the memory-sink table every 5 seconds until the
# cell is interrupted.
stream_thread.start()

streaming_data_with_sentiment = (
    streaming_data_raw
    .withColumn("sentiment", get_sentiment_udf("comment"))
    .writeStream
    .queryName("__comments_with_sentiment")
    .format("memory")
    .outputMode("append")
    .trigger(processingTime='1 seconds')
    .start()
)

try:
    while True:
        time.sleep(5)
        df = spark.sql("SELECT * FROM __comments_with_sentiment")
        df.show(n=500)
        print(df.count())
except KeyboardInterrupt:
    print("Stopped by user")
finally:
    # Runs on interrupt *and* on any other error, so neither the Spark query
    # nor the producer thread is left running in the background.
    print("Stopping the streaming query...")
    streaming_data_with_sentiment.stop()
    print("Query stopped.")
    print("Stopping the streaming thread...")
    stop_flag = True
    # The producer may be mid-sleep or mid-request; bound the wait so the
    # notebook cannot hang here.
    stream_thread.join(timeout=60)
    if stream_thread.is_alive():
        print("Streaming thread still running; it will exit after its current wait.")
    else:
        print("Streaming thread stopped.")

Saved 2 comments to /content/drive/MyDrive/ds200/streaming_data/batch_20250429_082400.json
+-------+---------+
|comment|sentiment|
+-------+---------+
+-------+---------+

Saved 2 comments to /content/drive/MyDrive/ds200/streaming_data/batch_20250429_082405.json
0
Saved 2 comments to /content/drive/MyDrive/ds200/streaming_data/batch_20250429_082410.json
+-------+---------+
|comment|sentiment|
+-------+---------+
+-------+---------+

0
Saved 2 comments to /content/drive/MyDrive/ds200/streaming_data/batch_20250429_082415.json
+-------+---------+
|comment|sentiment|
+-------+---------+
+-------+---------+

0
Saved 2 comments to /content/drive/MyDrive/ds200/streaming_data/batch_20250429_082421.json
+-------+---------+
|comment|sentiment|
+-------+---------+
+-------+---------+

0
Saved 2 comments to /content/drive/MyDrive/ds200/streaming_data/batch_20250429_082426.json
+--------------------+---------+
|             comment|sentiment|
+--------------------+---------+
|SUBSCRIBE NOW I S...| 

In [None]:
# Stop every active streaming query in this session. That includes the
# sentiment query and also the `comments` debug sink, which was otherwise
# never stopped. Also tell the producer thread to exit.
for active_query in spark.streams.active:
    active_query.stop()
stop_flag = True