In [87]:
#Required Installs

#%pip install --upgrade google-api-python-client -q
#%pip install pymongo cassandra-driver -q

In [88]:
import os
import pyspark
import googleapiclient.discovery
from pyspark.sql import SparkSession
from cassandra.cluster import Cluster
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# MONGO CONFIGURATION
# The URI embeds credentials, so allow it to be supplied from the environment;
# the default keeps the notebook working against the demo stack unchanged.
mongo_uri = os.environ.get(
    "MONGO_URI",
    "mongodb://admin:mongopw@mongo:27017/demo.feedback?authSource=admin",
)

# CASSANDRA CONFIGURATION
cassandra_host = "cassandra"

#NEO4J CONFIGURATION
bolt_url = "bolt://neo4j:7687"

# Connector packages Spark must resolve.
# `spark.jars.packages` is ONE comma-separated setting: calling .config() twice
# with the same key keeps only the last value, which silently drops the first
# connector. Build the full list once instead.
# NOTE(review): the Neo4j writes at the end of this notebook also need the Neo4j
# Spark connector on the classpath - presumably provided by the image; confirm.
spark_packages = ",".join([
    "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    "com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.1.0",
])

# Spark init
spark = SparkSession.builder \
    .master("local") \
    .appName('jupyter-pyspark') \
    .config("spark.mongodb.input.uri", mongo_uri) \
    .config("spark.mongodb.output.uri", mongo_uri) \
    .config("spark.cassandra.connection.host", cassandra_host) \
    .config("spark.jars.packages", spark_packages) \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR")

print("Done")

Done


In [89]:
#YOUTUBE API CALLS AND MONGO

# The API key is a credential and must not live in the notebook: read it from
# the environment so it is never committed or shared with the rendered output.
API_KEY = os.environ.get("YOUTUBE_API_KEY")
if not API_KEY:
    raise RuntimeError(
        "Set the YOUTUBE_API_KEY environment variable before running this notebook."
    )

# Client for the YouTube Data API v3, shared by the helper functions below
youtube = googleapiclient.discovery.build("youtube", "v3", developerKey=API_KEY)

# Use Youtube API to get video title from video id
def get_video_title(video_id):
    """Return the title of a YouTube video, or None when it cannot be fetched.

    Uses the module-level ``youtube`` client to request the video's snippet.

    Args:
        video_id: YouTube video id to look up.

    Returns:
        The title string, or None if the API returned no video for this id
        or the request failed. Failures are printed, never raised, so a bad
        id does not stop the rest of the notebook.
    """
    try:
        response = youtube.videos().list(
            part='snippet',
            id=video_id
        ).execute()

        # An unknown/private id yields an empty item list rather than an error;
        # report that explicitly instead of surfacing an IndexError message.
        items = response.get('items', [])
        if not items:
            print(f"Error fetching video title for {video_id}: no video found for this id")
            return None

        return items[0]['snippet']['title']

    except Exception as e:
        # Deliberate best-effort: log and fall back to None
        print(f"Error fetching video title for {video_id}: {str(e)}")
        return None


# API call to Youtube to get 100 of the most recent comments and related data from a video
def get_youtube_comment_data(video_id, max_results=100):
    """Fetch up to ``max_results`` top-level comment threads for a video.

    Pages through ``commentThreads.list`` using the ``nextPageToken`` returned
    by each response, and stops when enough comments were collected or the API
    reports no further page.

    Args:
        video_id: YouTube video id whose comments are fetched.
        max_results: Maximum number of comments to return (default 100).

    Returns:
        A list of dicts with the keys ``_id``, ``video_id``, ``video_title``,
        ``author_display_name``, ``text_original``, ``like_count``,
        ``repliesCount`` and ``datetime_posted``. If a request fails, the error
        is printed and the comments collected so far are returned.
    """
    comment_list = []
    next_page_token = None  # None requests the first page

    try:
        video_title = get_video_title(video_id)

        while len(comment_list) < max_results:
            request = youtube.commentThreads().list(
                part="snippet,replies",
                # Ask only for what is still missing, capped at 100 per request
                maxResults=min(100, max_results - len(comment_list)),
                textFormat="plainText",
                videoId=video_id,
                pageToken=next_page_token,
                prettyPrint=True
            )

            # Send request
            response = request.execute()
            items = response.get("items", [])

            # Extract the relevant fields of every comment thread on this page
            comment_list.extend(
                {
                    "_id": item["id"],
                    "video_id": item["snippet"]["videoId"],
                    "video_title": video_title,
                    "author_display_name": item["snippet"]["topLevelComment"]["snippet"]["authorDisplayName"],
                    "text_original": item["snippet"]["topLevelComment"]["snippet"]["textOriginal"],
                    "like_count": item["snippet"]["topLevelComment"]["snippet"]["likeCount"],
                    "repliesCount": item["snippet"]["totalReplyCount"],
                    "datetime_posted": item["snippet"]["topLevelComment"]["snippet"]["publishedAt"],
                }
                for item in items
            )

            # The page token lives on the response, not on the comments.
            # Stop on the last page (no token) or an empty page; otherwise the
            # loop would refetch the same page or never terminate.
            next_page_token = response.get("nextPageToken")
            if not items or not next_page_token:
                break

    except Exception as e:
        print(f"An error occurred: {str(e)}")

    # Guard against a page returning more items than requested
    return comment_list[:max_results]



# Youtube IDs of videos we want to get comments from
video_list = ["gir8BEqAutk", "mvVBuG4IOW4", "lUvBk4owRNU"]


def process_video(video_id):
    """Fetch the comments of one video and append them to MongoDB.

    Writes to the ``youtube_comments.video_comments`` collection through the
    Spark Mongo connector. Nothing is written when no comments were fetched,
    because Spark cannot infer a schema from an empty list.

    Args:
        video_id: YouTube video id to fetch and store.

    Returns:
        None.
    """
    comments = get_youtube_comment_data(video_id)
    if not comments:
        print(f"No comments fetched for {video_id}; nothing written.")
        return None

    return spark.createDataFrame(comments) \
        .write.format("mongo") \
        .mode("append") \
        .option("replaceDocument", "false") \
        .option("database", "youtube_comments") \
        .option("collection", "video_comments") \
        .save()


# Apply function to each video_id (map instead of a for loop, as in the original design)
list(map(process_video, video_list))

print("Done")

Done


In [90]:
# Connection settings for reading the stored comments back from MongoDB
mongo_read_options = {
    "uri": "mongodb://admin:mongopw@mongo:27017/",
    "database": "youtube_comments",
    "collection": "video_comments",
    "authSource": "admin",
}

# Load the whole video_comments collection as a Spark DataFrame
df = spark.read.format("mongo").options(**mongo_read_options).load()

# Report how many comment documents the collection holds
total_comments = df.count()
print(f"{total_comments} comments accross {len(video_list)} videos.")

543 comments accross 3 videos.


# CASSANDRA <br>
Unlike the Mongo database, this Cassandra database is supposed to be ephemeral. <br>
The setup below uses `CREATE ... IF NOT EXISTS`, so the keyspace and table are reused if they already exist and this notebook can be run multiple times.

In [99]:
# CQL statements
# Keyspace for this notebook, single replica
create_keyspace_cassandra_sql = """
    CREATE KEYSPACE IF NOT EXISTS youtube_comments
    WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
    """

# One row per comment: partitioned by video title, clustered by posting time.
# NOTE(review): rows that share video_title and datetime_posted map to the same
# primary key and would overwrite each other - confirm this is acceptable.
create_table_cassandra_sql = """
    CREATE TABLE IF NOT EXISTS youtube_comments.video_comments
    (
        id text,
        author_display_name text,
        datetime_posted timestamp,
        like_count bigint,
        repliescount bigint,
        text_original text,
        video_id text,
        video_title text,
        PRIMARY KEY (video_title, datetime_posted)
    );
    """

# Statements in the order they have to run: keyspace, USE, then table
setup_statements = (
    create_keyspace_cassandra_sql,
    "USE youtube_comments;",
    create_table_cassandra_sql,
)

# Cassandra connection setup; the cluster is shut down when the block exits
with Cluster([cassandra_host]) as cluster:
    session = cluster.connect()

    for statement in setup_statements:
        session.execute(statement)


In [92]:
# Table to read through the Spark Cassandra connector. Defined here so the cell
# runs on a fresh kernel (the name was otherwise never assigned in this notebook).
cassandra_options = {
    "keyspace": "youtube_comments",
    "table": "video_comments",
}

# NOTE(review): no visible cell loads rows into youtube_comments.video_comments;
# on a freshly created table the queries below return no rows - confirm where
# the data is written.
cassandra_comments = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(**cassandra_options) \
    .load()

# Expose the table to Spark SQL
cassandra_comments.createOrReplaceTempView("blank_space_comments")

# Comments on the Blank Space video, newest first
blank_space_query_1 = """
    SELECT 
        video_title,
        datetime_posted,
        text_original
    FROM blank_space_comments
    WHERE video_title = "Taylor Swift - Blank Space (Taylor's Version) (Lyric Video)"
    ORDER BY datetime_posted DESC;
    """

# Comments on the Blank Space video, most liked first
blank_space_query_2 = """
    SELECT 
        video_title,
        like_count,
        text_original
    FROM blank_space_comments
    WHERE video_title = "Taylor Swift - Blank Space (Taylor's Version) (Lyric Video)"
    ORDER BY like_count DESC;
    """

blank_space_result_1 = spark.sql(blank_space_query_1)

blank_space_result_2 = spark.sql(blank_space_query_2)

# The plan shows the video_title filter pushed down to Cassandra
# ("Cassandra Filters"), while the ORDER BY runs in Spark (the Sort node).
blank_space_result_1.explain()

== Physical Plan ==
*(1) Sort [datetime_posted#1830 DESC NULLS LAST], true, 0
+- *(1) Project [video_title#1829, datetime_posted#1830, text_original#1835]
   +- BatchScan[video_title#1829, datetime_posted#1830, text_original#1835] Cassandra Scan: youtube_comments.video_comments
 - Cassandra Filters: [["video_title" = ?, Taylor Swift - Blank Space (Taylor's Version) (Lyric Video)]]
 - Requested Columns: [video_title,datetime_posted,text_original]




### Show most recent comments on the Blank Space video

In [93]:
# Limit in Spark first so only the 5 newest rows are collected to the driver
blank_space_result_1.limit(5).toPandas()

Unnamed: 0,video_title,datetime_posted,text_original
0,Taylor Swift - Blank Space (Taylor's Version) ...,2023-11-29 16:50:20,I still need a new mv
1,Taylor Swift - Blank Space (Taylor's Version) ...,2023-11-29 14:48:50,This is a great video
2,Taylor Swift - Blank Space (Taylor's Version) ...,2023-11-29 05:16:26,"What's up, people, Stabat's back"
3,Taylor Swift - Blank Space (Taylor's Version) ...,2023-11-29 01:06:34,I can’t get this song out of my head it’s so a...
4,Taylor Swift - Blank Space (Taylor's Version) ...,2023-11-28 20:59:49,...👍🏾🙏👍🏾✨✨✨BLANK SPACE... WOW ...✨✨✨👍🏾🙏👍🏾...


### Show Top liked comments on the Blank Space video

In [94]:
# Limit in Spark first so only the 5 most-liked rows are collected to the driver
blank_space_result_2.limit(5).toPandas()

Unnamed: 0,video_title,like_count,text_original
0,Taylor Swift - Blank Space (Taylor's Version) ...,8,"A VOZ DESSA MULHER É VICIANTE, AS MÚSICAS SÃO!..."
1,Taylor Swift - Blank Space (Taylor's Version) ...,4,Top show gostei linda voz linda ❤😮😊
2,Taylor Swift - Blank Space (Taylor's Version) ...,3,"Haha she changed the "" Starbucks lovers"" 😂❤"
3,Taylor Swift - Blank Space (Taylor's Version) ...,3,I love the spareness of this version. Taylor's...
4,Taylor Swift - Blank Space (Taylor's Version) ...,3,🙌🏽🙌🏽🙌🏽🙌🏽🙌🏽🙌🏽✨✨✨✨


# neo4j Data Preparation <br>
At the time of writing, the full dataset contains 541 rows. This is too many to display legibly in Neo4j.  <br>
This next section filters the full Spark DataFrame, `df`, to only include authors who have commented on at least 2 of the 3 videos. 

In [95]:
#NEO4J

# Distinct (author, video) pairs: one row per video an author has commented on
distinct_authors_videos = df.select('author_display_name', 'video_title').distinct()

# Number of different videos each author has commented on.
# A plain group-and-count states the intent directly and avoids the per-author
# sort that a window with row_number() requires.
author_video_counts = distinct_authors_videos \
    .groupBy('author_display_name') \
    .agg(F.count(F.lit(1)).alias('video_count'))

# Authors who commented on at least 2 videos
filtered_authors = author_video_counts \
    .filter(F.col('video_count') >= 2) \
    .select('author_display_name')

# Keep every comment written by those authors
df_2vid_comment_authors = df.join(filtered_authors, 'author_display_name', 'inner') \
    .select('author_display_name', 'video_title', 'text_original')

# Show the result (already reduced to the selected authors, so small enough to collect)
df_2vid_comment_authors.toPandas()

                                                                                

Unnamed: 0,author_display_name,video_title,text_original
0,@bangtanstarsv7,Taylor Swift - Shake It Off (Taylor's Version)...,Revivió la música
1,@bangtanstarsv7,Taylor Swift - Bad Blood (Taylor's Version) (L...,BAD BLOOOD
2,Anthony Courteaux,Taylor Swift - Blank Space (Taylor's Version) ...,Ok my love Queen 👑 king 👑
3,Anthony Courteaux,Taylor Swift - Blank Space (Taylor's Version) ...,Biggest terriost in the United States
4,Anthony Courteaux,Taylor Swift - Blank Space (Taylor's Version) ...,You know what i want my love Queen 👑 king 👑💖
5,Anthony Courteaux,Taylor Swift - Blank Space (Taylor's Version) ...,Me and the Queen 👑 king 👑 Jesus ❤️ want Elon Musk
6,Anthony Courteaux,Taylor Swift - Shake It Off (Taylor's Version)...,Ok I quit playing around my love Queen 👑 king ...
7,ender,Taylor Swift - Shake It Off (Taylor's Version)...,"HER LAUGH AT ""i go on too many dates"" 😭💖"
8,ender,Taylor Swift - Bad Blood (Taylor's Version) (L...,she's so emo i love it
9,flyinpengwin,Taylor Swift - Blank Space (Taylor's Version) ...,She’s my idol I love her so much like if u agr...


In [96]:
# Columns needed to build the graph: videos, authors and the comment text
df_n4j = df_2vid_comment_authors.select("video_title", "author_display_name", "text_original")


# Cypher statement (Neo4j), run once per DataFrame row exposed as `event`:
# create the Author node if it does not exist yet
cql_query = """
MERGE (df_n4j:Author {author: event.author_display_name})
"""

# MERGE only needs each author once, so send the distinct authors instead of
# one row per comment; the resulting nodes are the same.
df_n4j.select("author_display_name").distinct() \
    .write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", bolt_url) \
    .option("query", cql_query) \
    .save()

                                                                                

In [97]:
# Cypher statement (Neo4j), run once per DataFrame row exposed as `event`:
# create the Video node if it does not exist yet
cql_query = """
MERGE (df_n4j:Video{title:event.video_title})
"""
# MERGE only needs each title once, so send the distinct titles instead of
# one row per comment; the resulting nodes are the same.
df_n4j.select("video_title").distinct() \
    .write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", bolt_url) \
    .option("query", cql_query) \
    .save()

                                                                                

In [98]:
# Cypher statement (Neo4j), run once per comment row exposed as `event`:
# link the existing Author and Video nodes with a COMMENTS_ON relationship
# that carries the comment text
cql_query = """
match (v:Video{title:event.video_title}), (a:Author {author: event.author_display_name})
MERGE (a)-[:COMMENTS_ON {comment: event.text_original}]->(v)
"""

# Same writer settings as the node writes, passed as keyword options
relationship_writer = df_n4j.write.format("org.neo4j.spark.DataSource").mode("Overwrite")
relationship_writer.options(url=bolt_url, query=cql_query).save()

                                                                                