In [None]:
# Install required packages
# !pip install python-dotenv
# !pip install numpy
# !pip install pandas
# !pip install google-api-python-client
# !pip install mysql-connector-python
# !pip install sqlalchemy

In [None]:
# Import required packages
from dotenv import load_dotenv
import os
import numpy as np
import pandas as pd
from googleapiclient.discovery import build
import mysql.connector
from sqlalchemy import create_engine

In [None]:
# Load environment variables from .env file
load_dotenv()

# Get YouTube API key from .env 
youtube_api_key = os.getenv("youtube_api_key")
# Get MySQL username from .env
mysql_user = os.getenv("mysql_user")
# Get MySQL password from .env
mysql_password = os.getenv("mysql_password")

In [None]:
# Build the YouTube service object
youtube = build("youtube", "v3", developerKey=youtube_api_key)

In [None]:
# Extract YouTube channel data

# Select channels
channel_names = ["AlexTheAnalyst", "LukeBarousse", "Thuvu5"]

# Initialize an empty list to store dictionaries for each channel
channels_ls = []

# Initialize an empty list to store uploads playlist IDs of all channels
uploads_playlist_ids = []

# Loop through each channel
for channel_name in channel_names:
    # Get channel data using the YouTube Channels API
    # Note: Uses 1 out of 10.000 units from the daily usage limit 
    channel_data = youtube.channels().list(part="statistics,snippet,contentDetails", forHandle=channel_name).execute()  

    # Extract channel data in dictionary format
    channel_dict = {
        "channel_id": channel_data["items"][0]["id"],
        "channel_name": channel_data["items"][0]["snippet"]["title"],
        "views": int(channel_data["items"][0]["statistics"]["viewCount"]),
        "videos": int(channel_data["items"][0]["statistics"]["videoCount"]),
        "subscribers": int(channel_data["items"][0]["statistics"]["subscriberCount"])
    }
    
    # Append channel data in dictionary format to the list
    channels_ls.append(channel_dict)
    
    # Append uploads playlist ID to the list 
    uploads_playlist_ids.append(channel_data["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"])

# Convert list of dictionaries to pandas DataFrame
channel_df = pd.DataFrame(channels_ls) 
channel_df

In [None]:
# Extract video data

# Initialize an empty list to store dictionaries for each video
videos_ls = []

# Loop through each playlist
for uploads_playlist_id in uploads_playlist_ids:
    # Initialize next_page_token to None
    next_page_token = None

    # Loop through each video in the playlist
    while True:
        # Get playlist data using the YouTube PlaylistItems API 
        # Note: Each loop uses 1 out of 10.000 units from the daily usage limit (1 unit for 50 videos)
        playlist_data = youtube.playlistItems().list(
            part="snippet", 
            playlistId=uploads_playlist_id, 
            maxResults=50,
            pageToken=next_page_token
        ).execute()

        # Initialize an empty list to store video IDs
        video_ids = []

        # Extract video IDs from the playlist data
        video_ids += [video_data["snippet"]["resourceId"]["videoId"] for video_data in playlist_data["items"]]

        # Get video data using the YouTube Videos API 
        # Note: Uses 1 out of 10.000 units from the daily usage limit (1 unit for 50 videos)
        video_data = youtube.videos().list(part="statistics,snippet,contentDetails", id=video_ids).execute()    

        # Loop through each video 
        for video in video_data["items"]:
            # Extract video data in dictionary format
            video_dict = {
                "video_id": video["id"],
                "channel_id": video["snippet"]["channelId"],
                "video_title": video["snippet"]["title"],
                "video_description": video["snippet"]["description"],
                "published_at": video["snippet"]["publishedAt"],
                "video_duration": video["contentDetails"]["duration"],
                "views": video["statistics"]["viewCount"],
                "likes": video["statistics"]["likeCount"],
                "comments": video["statistics"]["commentCount"],
            }

            try:
                # Try to get thumbnail in maximum resolution
                video_dict["thumbnail_url"] = video["snippet"]["thumbnails"]["maxres"]["url"]
            except KeyError:
                # If maxres is not available, get default resolution
                video_dict["thumbnail_url"] = video["snippet"]["thumbnails"]["default"]["url"]

            # Append video data in dictionary format to the list
            videos_ls.append(video_dict)

        # Get the next page token
        next_page_token = playlist_data.get("nextPageToken")

        # Exit the loop if there are no more pages
        if next_page_token is None:
            break
        
# Convert list of dictionaries to pandas DataFrame
videos_df = pd.DataFrame(videos_ls)    
videos_df

In [None]:
# Extract comments data

# Initialize an empty list to store comments
comments_ls = []

# Loop through each video
for video_id in videos_df["video_id"].values:
    # Initialize next_page_token to None
    next_page_token = None

    # Loop through data batches of 100 comments 
    while True:
        try:
            # Get data from 100 comments using the YouTube CommentThreads API 
            # Note: Each loop uses 1 out of 10.000 units from the daily usage limit (1 unit for 100 comments)
            comments_data = youtube.commentThreads().list(
                part="snippet", 
                videoId=video_id, 
                maxResults=100,
                pageToken=next_page_token
            ).execute()
        # Handle error if e.g. comments for the video are disabled
        except Exception as e:
            print(f"Failed to get comments for video {video_id}.")

        # Loop through each comment
        for comment in comments_data["items"]:
            # Extract comment data in dictionary format
            comment_dict = {
                "comment_id": comment["snippet"]["topLevelComment"]["id"],
                "video_id": comment["snippet"]["topLevelComment"]["snippet"]["videoId"],
                "channel_id": comment["snippet"]["topLevelComment"]["snippet"]["channelId"],
                "comment_text": comment["snippet"]["topLevelComment"]["snippet"]["textOriginal"]
            }
            # Append comment data dictionary to the list
            comments_ls.append(comment_dict)

        # Get the next page token
        next_page_token = playlist_data.get("nextPageToken")

        # Exit the loop if there are no more pages
        if next_page_token is None: 
            break
        
# Convert list of dictionaries to pandas DataFrame
comments_df = pd.DataFrame(comments_ls)    
comments_df 

In [None]:
# Load data from Pandas DataFrames into MySQL tables

# Connect to MySQL database
connection = mysql.connector.connect(
    host = "localhost",
    user = mysql_user,
    password = mysql_password,
    database = "youtube_analytics"
)

# Create a cursor object for interacting with the database
cursor = connection.cursor()

try:
    # Create a SQLAlchemy engine for interacting with the MySQL database
    engine = create_engine(f"mysql+mysqlconnector://{mysql_user}:{mysql_password}@localhost/youtube_analytics")
    
    # Load the YouTube channels DataFrame into the MySQL channels table
    try:
        channel_df.to_sql("channels", con=engine, if_exists="replace", index=False)
        print("Channels data successfully loaded into MySQL database.")
    except Exception as e:
        print("Error loading channels data:", e)
    
    # Load the YouTube videos DataFrame into the MySQL videos table
    try:
        videos_df.to_sql("videos", con=engine, if_exists="replace", index=False)
        print("Videos data successfully loaded into MySQL database.")
    except Exception as e:
        print("Error loading videos data:", e)
    
    # Load the YouTube comments DataFrame into the MySQL comments table
    try:
        comments_df.to_sql("comments", con=engine, if_exists="replace", index=False)
        print("Comments data successfully loaded into MySQL database.")
    except Exception as e:
        print("Error loading comments data:", e)
    
except Exception as e:
    # Print error if exception occurs when connecting to the database 
    print("Error connecting to MySQL database:", e)

finally:
    # Close the cursor and connection to free up resources
    cursor.close()
    connection.close()