"""
This script is designed to download data from YouTube using the Google API and store it in a MySQL database.
"""

In [None]:
# Standard library imports
import os
import re
import time
from datetime import datetime, timedelta, timezone

# Third-party imports
from dateutil import parser
from dotenv import load_dotenv
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import mysql.connector
from mysql.connector import Error

# Load environment variables from .env
load_dotenv()

# Copy `.env.example` to `.env` and update it with your YouTube API key and database connection details:

In [2]:
# Create the YouTube Data API v3 service object. The key is read from the
# YOUTUBE_API_KEY environment variable and handed straight to the client, so
# it is never kept in a notebook variable.
youtube = build(
    'youtube',
    'v3',
    developerKey=os.getenv('YOUTUBE_API_KEY'),
)

In [None]:
# Function to fetch details of one or more YouTube channels.

# Names of the channels whose details will be looked up.
channels = [
    "LinusTechTips",
    "techlinked",
    "GameLinked",
    "ShortCircuit",
    "techquickie",
    "LMGClips",
    "macaddress",
]

def get_channel_details(query):
    """
    Retrieves the channel ID, name, and handle for a given YouTube channel query.

    Three lookups are tried in order until one of them yields a channel:
      1. a channel search for ``query``,
      2. a channel search for ``@query``,
      3. a ``channels().list`` lookup with ``forUsername=query``.

    The function uses the module-level ``youtube`` service object.

    Args:
        query: Text identifying the channel (name, handle without ``@``, or
            username).

    Returns:
        A ``(channel_id, channel_name, channel_handle)`` tuple, where
        ``channel_handle`` is the channel's ``customUrl`` (``''`` if absent),
        or ``(None, None, None)`` when no lookup finds the channel.
    """
    # List of search strategies (built lazily so a failure in one request
    # does not prevent the next one from being tried).
    search_strategies = [
        lambda: youtube.search().list(part="snippet", q=query, type="channel", maxResults=1),
        lambda: youtube.search().list(part="snippet", q=f"@{query}", type="channel", maxResults=1),
        lambda: youtube.channels().list(part="snippet", forUsername=query)
    ]

    for strategy in search_strategies:
        try:
            response = strategy().execute()
            items = response.get('items') or []
            if not items:
                continue
            item = items[0]

            # Search results carry the ID as an object ({'channelId': ...}),
            # while channel resources carry it as a plain string. Calling
            # .get() on the string form raised AttributeError, which made the
            # forUsername strategy fail every time.
            raw_id = item.get('id')
            if isinstance(raw_id, dict):
                channel_id = raw_id.get('channelId')
            else:
                channel_id = raw_id
            if not channel_id:
                channel_id = item['snippet']['channelId']
            channel_name = item['snippet']['title']

            # Fetch channel details to get the handle
            channel_response = youtube.channels().list(
                part="snippet",
                id=channel_id
            ).execute()

            channel_items = channel_response.get('items') or []
            if channel_items:
                channel_handle = channel_items[0]['snippet'].get('customUrl', '')
                return channel_id, channel_name, channel_handle
        except Exception as e:
            # Best effort: report the failure and fall through to the next strategy.
            print(f"Error in search strategy: {str(e)}")

    print(f"Could not find the channel: {query}")
    return None, None, None

# Create a list to store channel details; channels that could not be
# resolved (no ID or no name) are left out.
channels_data = []
for channel in channels:
    channel_id, channel_name, channel_handle = get_channel_details(channel)
    if channel_id and channel_name:
        channels_data.append([channel_id, channel_name, channel_handle])

# Print the collected channel data
for channel_data in channels_data:
    print(channel_data)

# Cleaning up unnecessary variables.
# A bare `del` raises NameError for loop variables that were never bound
# (empty `channels`, or no channel found), so each name is removed only if it
# exists.
for _name in ('channels', 'channel', 'channel_id', 'channel_name', 'channel_handle', 'channel_data'):
    globals().pop(_name, None)
del _name

In [4]:
# If the channel data is not available, use the following code to add manually

# channels_data = [["UC...Channel ID", "Channel Name", "@Channel Handle"],
#                  ["UC...Channel ID", "Channel Name", "@Channel Handle"]
#                  ]

In [4]:
# Adding four playlist-ID columns (videos, lives, shorts and exclusives) to the channel data.
#
# Each playlist ID is the channel ID with its first two characters replaced by
# a four-letter prefix. Every row grows from
#   [channel_id, channel_name, channel_handle]
# to
#   [channel_id, channel_name, channel_handle,
#    playlist_videos, playlist_lives, playlist_shorts, playlist_exclusives]
#
# The comprehension keeps its loop variables local, so nothing has to be
# deleted afterwards (the previous `del` raised NameError whenever
# `channels_data` was empty and the loop body never ran).
channels_data = [
    [
        channel_id,
        channel_name,
        channel_handle,
        "UULF" + channel_id[2:],  # playlist_videos
        "UULV" + channel_id[2:],  # playlist_lives
        "UUSH" + channel_id[2:],  # playlist_shorts
        "UUMF" + channel_id[2:],  # playlist_exclusives
    ]
    for channel_id, channel_name, channel_handle in channels_data
]

In [None]:
# Function to fetch videos data from a YouTube playlist.
# During the process, the function also fetches the video statistics and live streaming details.
# It's normal to skip some playlists, as not all channels have playlists of live videos, shorts and exclusives.
# The data downloaded includes the video ID, title, published date, likes, duration, views, comments, tags, and live streaming details.

def get_videos_from_playlist(youtube, playlist_id, channel_handle, live_flag, short_flag, exclusive_flag):
    """
    Download one row of metadata for every video in a playlist.

    The playlist is read page by page (50 items per page). For each page the
    video IDs are sent to ``videos().list`` to obtain title, publish date,
    statistics, duration, tags and live-streaming times. Optional values that
    are missing from the response are stored as the string ``'N/A'``.

    Args:
        youtube: YouTube Data API service object.
        playlist_id: ID of the playlist to read.
        channel_handle: Stored as the first column of every row.
        live_flag: Copied unchanged into every row.
        short_flag: Copied unchanged into every row.
        exclusive_flag: Copied unchanged into every row.

    Returns:
        ``(rows, quota_cost, found)``.
        ``rows`` holds one 16-item list per video: [channel_handle, video_id,
        title, published_at, likes, duration, views, comments, live_flag,
        short_flag, exclusive_flag, actual_start_time, actual_end_time,
        scheduled_start_time, scheduled_end_time, tags].
        ``quota_cost`` is incremented by one after each API call that succeeds.
        ``found`` is False only when the API answers 404 for the playlist;
        every other outcome, including errors that cut the download short,
        reports True together with the rows gathered so far.
    """
    detail_fields = (
        "items(id,snippet(title,publishedAt,tags),"
        "statistics(likeCount,viewCount,commentCount),"
        "contentDetails/duration,"
        "liveStreamingDetails(actualStartTime,actualEndTime,scheduledStartTime,scheduledEndTime))"
    )

    rows = []
    quota_cost = 0
    attempts_left = 5  # Retry budget for transient (500/503) errors
    page_token = None

    while True:
        try:
            page = youtube.playlistItems().list(
                part="contentDetails",
                playlistId=playlist_id,
                maxResults=50,
                pageToken=page_token,
            ).execute()
            quota_cost += 1  # playlistItems().list call

            entries = page.get('items')
            if not entries:
                break

            details = youtube.videos().list(
                part="snippet,statistics,contentDetails,liveStreamingDetails",
                id=','.join(entry['contentDetails']['videoId'] for entry in entries),
                fields=detail_fields,
            ).execute()
            quota_cost += 1  # videos().list call

            for video in details['items']:
                video_id = video['id']
                snippet = video['snippet']
                title, published_at = snippet['title'], snippet['publishedAt']
                tags = ','.join(snippet.get('tags', [])) or 'N/A'
                stats = video['statistics']
                likes, views, comments = (
                    stats.get(key, 'N/A')
                    for key in ('likeCount', 'viewCount', 'commentCount')
                )
                duration = video['contentDetails']['duration']
                live = video.get('liveStreamingDetails', {})
                live_times = [
                    live.get(key, 'N/A')
                    for key in ('actualStartTime', 'actualEndTime',
                                'scheduledStartTime', 'scheduledEndTime')
                ]
                rows.append([
                    channel_handle, video_id, title, published_at, likes, duration,
                    views, comments, live_flag, short_flag, exclusive_flag,
                    *live_times, tags,
                ])

            page_token = page.get('nextPageToken')
            if not page_token or quota_cost >= 10000:
                break

            attempts_left = 5  # A full page succeeded: restore the retry budget
        except HttpError as e:
            status = e.resp.status
            if status == 404:
                print(f"Playlist {playlist_id} not found or unavailable, skipping...")
                return rows, quota_cost, False
            print(f"An HTTP error {status} occurred:\n{e.content.decode()}")
            if status not in (500, 503) or attempts_left <= 0:
                return rows, quota_cost, True  # Non-retryable, or retries exhausted
            time.sleep(5)  # Wait, then repeat the same page
            attempts_left -= 1
        except Exception as e:
            print(f"An error occurred: {str(e)}")
            return rows, quota_cost, True

    return rows, quota_cost, True

# Function to fetch statistics of a YouTube channel
def get_channel_stats(youtube, channel_id):
    """
    Fetch the subscriber count and total view count of a YouTube channel.

    Args:
        youtube: YouTube Data API service object.
        channel_id: ID of the channel to query.

    Returns:
        ``(subscribers, total_views, quota_cost)``. Each statistic is the
        value returned by the API, or ``'N/A'`` when it is unavailable (HTTP
        error, no channel in the response, or the field is missing).
        ``quota_cost`` is always 1, the cost counted for the single
        ``channels().list`` call.
    """
    try:
        response = youtube.channels().list(
            part="statistics",
            id=channel_id,
            fields="items(statistics(subscriberCount,viewCount))"
        ).execute()
    except HttpError as e:
        print(f"An HTTP error {e.resp.status} occurred: {e.content.decode()}")
        return 'N/A', 'N/A', 1  # Return 1 as the quota cost for the error

    # A response without items (e.g. unknown channel ID) previously raised
    # KeyError/IndexError here and aborted the whole run.
    items = response.get('items') or []
    if not items:
        print(f"No statistics returned for channel {channel_id}")
        return 'N/A', 'N/A', 1

    stats = items[0].get('statistics', {})
    subscribers = stats.get('subscriberCount', 'N/A')
    total_views = stats.get('viewCount', 'N/A')
    return subscribers, total_views, 1

def process_channels(youtube, channels_data):
    """
    Download channel statistics and playlist videos for every channel.

    Args:
        youtube: YouTube Data API service object.
        channels_data: Rows of [channel_id, channel_name, channel_handle,
            playlist_videos, playlist_lives, playlist_shorts,
            playlist_exclusives].

    Returns:
        ``(total_video_data, channel_stats_data, total_quota_cost, update_date)``:
        the video rows of all playlists that were found, one
        [channel_id, channel_name, channel_handle, subscribers, total_views]
        row per channel, the summed quota cost, and the current UTC time
        formatted as ``'%Y-%m-%d %H:%M:%S'``.
    """
    all_videos = []
    stats_rows = []
    quota_used = 0
    missing_playlists = set()

    for (channel_id, channel_name, channel_handle,
         playlist_videos, playlist_lives, playlist_shorts, playlist_exclusives) in channels_data:

        # Channel-level statistics
        subscribers, total_views, cost = get_channel_stats(youtube, channel_id)
        stats_rows.append([channel_id, channel_name, channel_handle, subscribers, total_views])
        quota_used += cost

        # (playlist, live_flag, short_flag, exclusive_flag) for each playlist type
        playlist_plan = (
            (playlist_videos, 0, 0, 0),
            (playlist_lives, 1, 0, 0),
            (playlist_shorts, 0, 1, 0),
            (playlist_exclusives, 0, 0, 1),
        )
        for playlist_id, live_flag, short_flag, exclusive_flag in playlist_plan:
            if playlist_id in missing_playlists:
                continue

            videos, cost, found = get_videos_from_playlist(
                youtube, playlist_id, channel_handle, live_flag, short_flag, exclusive_flag
            )
            if not found:
                # Its rows and quota cost are not counted.
                missing_playlists.add(playlist_id)
                continue

            all_videos.extend(videos)
            quota_used += cost
            print(f"Fetched {len(videos)} videos from playlist {playlist_id} ({channel_name})")

    # Timestamp (UTC) that identifies this update
    update_date = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

    return all_videos, stats_rows, quota_used, update_date

# Main execution: download statistics and videos for every channel in
# `channels_data` and print a short summary.
if __name__ == "__main__":
    # Assuming youtube object and channels_data are already defined
    total_video_data, channel_stats_data, total_quota_cost, update_date = process_channels(youtube, channels_data)

    # Here you can process the data as needed, e.g., save to CSV, database, etc.
    print(f"Data updated at: {update_date}")
    print(f"Total channels processed: {len(channel_stats_data)}")
    print(f"Total videos fetched: {len(total_video_data)}")
    print(f"Total quota cost: {total_quota_cost}")


# Cleaning up unnecessary variables
# NOTE: this statement is outside the `__main__` guard above, so it raises
# NameError if the guarded block did not run.
del total_quota_cost

In [6]:
# Formatting and manipulating data for database insertion

# Function to format duration in HH:MM:SS
def format_duration(duration_str):
    """
    Convert an ISO 8601 duration such as ``PT1H2M3S`` to ``H:MM:SS``.

    Durations with a day component (``P1DT2H3M4S``) are supported; the days
    are folded into the hour field (``26:03:04``) so the result never takes
    the ``'1 day, 2:03:04'`` form.

    Args:
        duration_str: ISO 8601 duration string, or a falsy value / ``'N/A'``
            when no duration is available.

    Returns:
        The duration as ``'H:MM:SS'`` (hours are not zero-padded, so a zero
        duration is ``'0:00:00'``), or ``None`` when the input is missing or
        is not a recognised duration.
    """
    if not duration_str or duration_str == 'N/A':
        return None

    # P[nD][T[nH][nM][nS]] -- the whole string must match, so trailing
    # garbage is rejected instead of being silently ignored.
    match = re.fullmatch(
        r'P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?',
        duration_str,
    )
    if not match:
        return None

    days, hours, minutes, seconds = (int(group or 0) for group in match.groups())
    total_seconds = int(
        timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds).total_seconds()
    )
    total_minutes, secs = divmod(total_seconds, 60)
    total_hours, mins = divmod(total_minutes, 60)
    return f"{total_hours}:{mins:02d}:{secs:02d}"

# Function to convert date to MySQL format
def convert_date(date_str):
    """
    Turn a date value into a naive ``datetime`` suitable for MySQL.

    Args:
        date_str: An ISO 8601 date string, a ``datetime`` instance, or a
            falsy value / ``'N/A'`` when no date is available.

    Returns:
        The date as a ``datetime`` with its timezone information removed (the
        clock time itself is not shifted), or ``None`` when the value is
        missing or cannot be parsed.
    """
    # Missing values first; a datetime instance is never falsy nor 'N/A'.
    if not date_str or date_str == 'N/A':
        return None

    if isinstance(date_str, datetime):
        return date_str.replace(tzinfo=None)

    try:
        parsed = parser.isoparse(date_str)
    except ValueError:
        print(f"Error converting date: {date_str}")
        return None
    return parsed.replace(tzinfo=None)

# Function to safely convert to integer
def safe_int_convert(value):
    """
    Convert a value to ``int`` without raising.

    Args:
        value: Value to convert; ``None`` and ``'N/A'`` mean "no value".

    Returns:
        The integer value, or ``None`` when the value is missing or cannot be
        converted (a message is printed in the latter case).
    """
    if value == 'N/A' or value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        # int() raises TypeError (not ValueError) for unsupported types such
        # as lists or dicts; a "safe" converter must absorb both.
        print(f"Error converting to int: {value}")
        return None

# Function to replace 'N/A' with None
def replace_na(value):
    """Return ``None`` for the placeholder ``'N/A'``; any other value is returned unchanged."""
    if value == 'N/A':
        return None
    return value

# Process video data
def process_video_data(video):
    """
    Normalise one video row in place for database insertion.

    Columns 3 and 11-14 are converted with ``convert_date``, columns 4, 6 and
    7 with ``safe_int_convert``, column 5 with ``format_duration`` and column
    15 with ``replace_na``.

    Args:
        video: Mutable row with at least 16 columns.

    Returns:
        The same ``video`` list, after modification.
    """
    # (converter, column indexes), applied in this order
    conversions = (
        (convert_date, (3, 11, 12, 13, 14)),
        (safe_int_convert, (4, 6, 7)),
        (format_duration, (5,)),
        (replace_na, (15,)),
    )
    for convert, positions in conversions:
        for position in positions:
            video[position] = convert(video[position])

    return video

# Main processing: normalise every row, then drop the rows whose duration is
# missing or zero.
# NOTE: this cell is not idempotent. On a second run the durations are already
# in H:MM:SS form, format_duration() returns None for them, and every row is
# filtered out.
total_video_data = [process_video_data(video) for video in total_video_data]

# Filter videos with zero duration
total_video_data = [video for video in total_video_data if video[5] not in [None, '0:00:00']]

# (The former re-check for None / '0:00:00' durations was removed: it could
# never fire right after the filter above.)

# Check for remaining N/A values
for video in total_video_data:
    if 'N/A' in video:
        print(f"Videos with 'N/A': {video}")

# Cleaning up unnecessary variables.
# `video` is never bound when there are no rows, so a bare `del video` raised
# NameError for an empty download.
globals().pop('video', None)

In [None]:
# Database connection pool.
# Host, user, password and database name come from environment variables
# (DB_HOST, DB_USER, DB_PASSWORD, DB_DATABASE); the port is set here.
connection_pool = mysql.connector.pooling.MySQLConnectionPool(
    pool_name="mypool",
    pool_size=5,
    host=os.getenv('DB_HOST'),
    port=3306,  # Replace with your port
    user=os.getenv('DB_USER'),
    password=os.getenv('DB_PASSWORD'),
    database=os.getenv('DB_DATABASE'),
)

def execute_batch(cursor, query, data):
    """
    Run ``query`` for every parameter row in ``data`` via ``cursor.executemany``.

    Args:
        cursor: Database cursor used to run the statement.
        query: SQL statement with placeholders.
        data: Sequence of parameter rows.

    Raises:
        mysql.connector.Error: Re-raised after the error has been printed, so
            the caller can handle it.
    """
    try:
        cursor.executemany(query, data)
    except mysql.connector.Error as db_error:
        print(f"Error when executing batch: {db_error}")
        raise

# Import everything that was downloaded in a single transaction: videos, tags,
# video/tag links, channel statistics and the update timestamp. Any database
# error rolls the whole transaction back.

# Bound before the try block so the except/finally clauses can always test
# them, even when get_connection() itself fails.
connection = None
cursor = None

try:
    connection = connection_pool.get_connection()
    cursor = connection.cursor()

    # Start a transaction
    connection.start_transaction()

    # Prepare queries
    video_query = """
    INSERT INTO videosdata (Channel_Handle, Video_ID, Video_Title, Published_At, Likes, Duration, Views, Comments, Live, Short, Exclusive, Live_Start_Time, Live_End_Time, Scheduled_Start_Time)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
        Channel_Handle = VALUES(Channel_Handle),
        Video_Title = VALUES(Video_Title),
        Published_At = VALUES(Published_At),
        Likes = VALUES(Likes),
        Duration = VALUES(Duration),
        Views = VALUES(Views),
        Comments = VALUES(Comments),
        Live = VALUES(Live),
        Short = VALUES(Short),
        Exclusive = VALUES(Exclusive),
        Live_Start_Time = VALUES(Live_Start_Time),
        Live_End_Time = VALUES(Live_End_Time),
        Scheduled_Start_Time = VALUES(Scheduled_Start_Time);
    """

    # Prepare data for batch insertion: the first 14 columns of every row
    # (scheduled end time and tags are not stored in videosdata).
    video_data = [tuple(v[:14]) for v in total_video_data]

    # Perform batch insert/update for videos
    execute_batch(cursor, video_query, video_data)

    # Process tags
    tag_query = "INSERT IGNORE INTO tags (Tag_Name) VALUES (%s)"
    video_tag_query = "INSERT IGNORE INTO video_tags (Video_ID, Tag_ID) VALUES (%s, %s)"

    # (Video_ID, [tag, ...]) for every video whose tag column is not empty/None;
    # blank tags are dropped.
    video_tag_lists = [
        (video[1], [tag.strip() for tag in video[-1].split(',') if tag.strip()])
        for video in total_video_data
        if video[-1]
    ]

    # Distinct tags as one-element parameter tuples
    tag_data = {(tag,) for _, tags in video_tag_lists for tag in tags}

    # Insert tags in batch
    execute_batch(cursor, tag_query, list(tag_data))

    # Get IDs for all tags in a single query. Skipped when there are no tags:
    # "IN ()" is a SQL syntax error that used to roll back the whole import.
    tag_id_map = {}
    if tag_data:
        placeholders = ','.join(['%s'] * len(tag_data))
        cursor.execute(
            f"SELECT Tag_Name, Tag_ID FROM tags WHERE Tag_Name IN ({placeholders})",
            [tag[0] for tag in tag_data],
        )
        tag_id_map = dict(cursor.fetchall())

    # Prepare data for sending to video_tags
    video_tag_data = [
        (video_id, tag_id_map[tag])
        for video_id, tags in video_tag_lists
        for tag in tags
        if tag in tag_id_map
    ]

    # Insert data into video_tags in batch
    execute_batch(cursor, video_tag_query, video_tag_data)

    # Update channel_stats in batch
    channel_query = """
    INSERT INTO channel_stats (Channel_ID, Channel_Name, Channel_Handle, Subscribers, Total_Views)
    VALUES (%s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
        Channel_Name = VALUES(Channel_Name),
        Channel_Handle = VALUES(Channel_Handle),
        Subscribers = VALUES(Subscribers),
        Total_Views = VALUES(Total_Views);
    """
    execute_batch(cursor, channel_query, channel_stats_data)

    # Update log
    cursor.execute("""
    INSERT INTO update_log (id, Update_Date) 
    VALUES (1, %s) 
    ON DUPLICATE KEY UPDATE Update_Date = VALUES(Update_Date)
    """, (update_date,))

    # Transaction commit
    connection.commit()

    print("Data imported/updated successfully!")

except mysql.connector.Error as error:
    if connection is not None:
        connection.rollback()
    print(f"Error inserting/updating data: {error}")

finally:
    if cursor is not None:
        cursor.close()
    if connection is not None:
        connection.close()

# Cleaning up unnecessary variables.
# Names are removed only if they exist: after an early failure, or with no
# videos/tags, several of them are never bound and a bare `del` would raise
# NameError.
for _name in (
    'connection', 'cursor', 'connection_pool', 'video_data', 'video_query',
    'tag_data', 'video_tag_data', 'tag_query', 'video_tag_query', 'tag_id_map',
    'channel_query', 'video_tag_lists', 'placeholders',
    'tag', 'tags', 'video', 'video_id',
):
    globals().pop(_name, None)
del _name

In [None]:
# Attempted fix: revised version of the import cell above that inserts tags one at a time.

# Database connection pool.
# Host, user, password and database name come from environment variables
# (DB_HOST, DB_USER, DB_PASSWORD, DB_DATABASE); the port is set here.
connection_pool = mysql.connector.pooling.MySQLConnectionPool(
    pool_name="mypool",
    pool_size=5,
    host=os.getenv('DB_HOST'),
    port=3306,  # Replace with your port
    user=os.getenv('DB_USER'),
    password=os.getenv('DB_PASSWORD'),
    database=os.getenv('DB_DATABASE'),
)

def execute_batch(cursor, query, data):
    """
    Run ``query`` for every parameter row in ``data`` via ``cursor.executemany``.

    Args:
        cursor: Database cursor used to run the statement.
        query: SQL statement with placeholders.
        data: Sequence of parameter rows.

    Raises:
        mysql.connector.Error: Re-raised after the error has been printed, so
            the caller can handle it.
    """
    try:
        cursor.executemany(query, data)
    except mysql.connector.Error as db_error:
        print(f"Error when executing batch: {db_error}")
        raise

# Import everything that was downloaded in a single transaction: videos, tags,
# video/tag links, channel statistics and the update timestamp. In this
# version tags are inserted one at a time and their IDs are read back from
# cursor.lastrowid. Any database error rolls the whole transaction back.

# Bound before the try block so the except/finally clauses can always test
# them, even when get_connection() itself fails.
connection = None
cursor = None

try:
    connection = connection_pool.get_connection()
    cursor = connection.cursor()

    # Start a transaction
    connection.start_transaction()

    # Prepare queries
    video_query = """
    INSERT INTO videosdata (Channel_Handle, Video_ID, Video_Title, Published_At, Likes, Duration, Views, Comments, Live, Short, Exclusive, Live_Start_Time, Live_End_Time, Scheduled_Start_Time)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
        Channel_Handle = VALUES(Channel_Handle),
        Video_Title = VALUES(Video_Title),
        Published_At = VALUES(Published_At),
        Likes = VALUES(Likes),
        Duration = VALUES(Duration),
        Views = VALUES(Views),
        Comments = VALUES(Comments),
        Live = VALUES(Live),
        Short = VALUES(Short),
        Exclusive = VALUES(Exclusive),
        Live_Start_Time = VALUES(Live_Start_Time),
        Live_End_Time = VALUES(Live_End_Time),
        Scheduled_Start_Time = VALUES(Scheduled_Start_Time);
    """

    # Prepare data for batch insertion: the first 14 columns of every row
    # (scheduled end time and tags are not stored in videosdata).
    video_data = [tuple(v[:14]) for v in total_video_data]

    # Perform batch insert/update for videos
    execute_batch(cursor, video_query, video_data)

    # Process tags
    tag_query = "INSERT INTO tags (Tag_Name) VALUES (%s) ON DUPLICATE KEY UPDATE Tag_ID = LAST_INSERT_ID(Tag_ID)"
    video_tag_query = "INSERT IGNORE INTO video_tags (Video_ID, Tag_ID) VALUES (%s, %s)"

    # (Video_ID, [tag, ...]) for every video whose tag column is not empty/None;
    # blank tags are dropped.
    video_tag_lists = [
        (video[1], [tag.strip() for tag in video[-1].split(',') if tag.strip()])
        for video in total_video_data
        if video[-1]
    ]

    # Distinct tag names
    tag_data = {tag for _, tags in video_tag_lists for tag in tags}

    # Insert tags one by one to handle potential truncation; the query makes
    # lastrowid report the ID of an already existing tag as well.
    tag_id_map = {}
    for tag in tag_data:
        cursor.execute(tag_query, (tag,))
        tag_id_map[tag] = cursor.lastrowid

    # Prepare data for sending to video_tags
    video_tag_data = [
        (video_id, tag_id_map[tag])
        for video_id, tags in video_tag_lists
        for tag in tags
        if tag in tag_id_map
    ]

    # Insert data into video_tags in batch
    execute_batch(cursor, video_tag_query, video_tag_data)

    # Update channel_stats in batch
    channel_query = """
    INSERT INTO channel_stats (Channel_ID, Channel_Name, Channel_Handle, Subscribers, Total_Views)
    VALUES (%s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
        Channel_Name = VALUES(Channel_Name),
        Channel_Handle = VALUES(Channel_Handle),
        Subscribers = VALUES(Subscribers),
        Total_Views = VALUES(Total_Views);
    """
    execute_batch(cursor, channel_query, channel_stats_data)

    # Update log
    cursor.execute("""
    INSERT INTO update_log (id, Update_Date) 
    VALUES (1, %s) 
    ON DUPLICATE KEY UPDATE Update_Date = VALUES(Update_Date)
    """, (update_date,))

    # Transaction commit
    connection.commit()

    print("Data imported/updated successfully!")

except mysql.connector.Error as error:
    if connection is not None:
        connection.rollback()
    print(f"Error inserting/updating data: {error}")

finally:
    if cursor is not None:
        cursor.close()
    if connection is not None:
        connection.close()

# Cleaning up unnecessary variables.
# Names are removed only if they exist: after an early failure, or with no
# videos/tags, several of them are never bound and a bare `del` would raise
# NameError.
for _name in (
    'connection', 'cursor', 'connection_pool', 'video_data', 'video_query',
    'tag_data', 'video_tag_data', 'tag_query', 'video_tag_query', 'tag_id_map',
    'channel_query', 'video_tag_lists', 'tag_id',
    'tag', 'tags', 'video', 'video_id',
):
    globals().pop(_name, None)
del _name

In [None]:
# This code snippet is used to remove videos that have been removed from YouTube channels from the database
# It also removes orphaned tags from the 'tags' table

# Database Connection Settings
# Keyword arguments for mysql.connector.connect(), read from the environment.
DB_CONFIG = dict(
    host=os.getenv('DB_HOST'),
    user=os.getenv('DB_USER'),
    password=os.getenv('DB_PASSWORD'),
    database=os.getenv('DB_DATABASE'),
)

# Function to clean the YouTube database
def clean_youtube_database():
    """
    Remove videos that are no longer in the downloaded data, then orphan tags.

    Every ``Video_ID`` stored in ``videosdata`` that is absent from the
    module-level ``total_video_data`` is deleted, first from ``video_tags``
    and then from ``videosdata``. Afterwards all rows of ``tags`` that no
    ``video_tags`` row references are deleted.

    Video removal is skipped when ``total_video_data`` holds no videos:
    comparing against an empty download would mark every stored video for
    deletion.

    Database errors are printed and the pending changes rolled back; nothing
    is raised to the caller for ``mysql.connector.Error``.
    """
    # Bound before the try block so the except/finally clauses can test them
    # even when connect() fails.
    connection = None
    cursor = None

    try:
        # Establish connection to the database
        connection = mysql.connector.connect(**DB_CONFIG)
        cursor = connection.cursor()

        print("Starting database cleanup process...")

        # 1. Get all Video_IDs of latest data downloaded from YouTube
        current_video_ids = set(get_current_youtube_video_ids(total_video_data))

        # 2. Get all Video_IDs in the database
        cursor.execute("SELECT Video_ID FROM videosdata")
        db_video_ids = {row[0] for row in cursor.fetchall()}

        # 3. Find videos to remove
        videos_to_remove = db_video_ids - current_video_ids

        if not current_video_ids:
            print("No downloaded video data available; skipping video removal.")
        elif videos_to_remove:
            print(f"Found {len(videos_to_remove)} videos to remove.")
            placeholders = ','.join(['%s'] * len(videos_to_remove))
            removal_params = tuple(videos_to_remove)

            # 4. Remove entries from video_tags table
            cursor.execute(f"DELETE FROM video_tags WHERE Video_ID IN ({placeholders})", removal_params)
            print(f"Removed {cursor.rowcount} entries from the video_tags table.")

            # 5. Now remove videos from the videosdata table
            cursor.execute(f"DELETE FROM videosdata WHERE Video_ID IN ({placeholders})", removal_params)
            print(f"Removed {cursor.rowcount} entries from the videosdata table.")

            # 6. Commit the changes
            connection.commit()
            print("Changes committed to the database.")
        else:
            print("No videos to remove.")

        # 7. Cleaning up orphan tags
        cursor.execute("SELECT t.Tag_ID, t.Tag_Name FROM tags t LEFT JOIN video_tags vt ON t.Tag_ID = vt.Tag_ID WHERE vt.Tag_ID IS NULL")
        orphan_tags = cursor.fetchall()
        print(f"Orphan tags found: {orphan_tags}")

        cursor.execute("""
            DELETE t FROM tags t
            LEFT JOIN video_tags vt ON t.Tag_ID = vt.Tag_ID
            WHERE vt.Tag_ID IS NULL
        """)
        print(f"Removed {cursor.rowcount} orphan tags.")

        connection.commit()
        print("Cleaning process completed successfully.")

    except Error as e:
        print(f"Error during database cleanup: {e}")
        if connection is not None and connection.is_connected():
            connection.rollback()
            print("Changes rolled back due to error.")

    finally:
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()
            print("Database connection closed.")

# Function to get current YouTube Video IDs
def get_current_youtube_video_ids(file):
    """
    Collect the second column of every row.

    Args:
        file: Iterable of rows (despite the name, not a file object); the
            caller in this file passes the downloaded video rows, whose
            second column is the video ID.

    Returns:
        A list with ``row[1]`` of every row, in the original order.
    """
    video_ids = []
    for row in file:
        video_ids.append(row[1])
    return video_ids

# Perform cleaning: remove videos missing from the latest download and any
# orphan tags (runs only when executed as a script/notebook).
if __name__ == "__main__":
    clean_youtube_database()

# Cleaning up unnecessary variables
# (clean_youtube_database() reads DB_CONFIG when called, so it cannot be
# called again after this line without redefining DB_CONFIG.)
del DB_CONFIG

"""\
YouTube Data Collector\
Author: Davi Prata\
GitHub: https://github.com/RockManRK\
Email: rockmanrk@hotmail.com\
Date: September 20, 2024\
\
Description:\
This script collects data from YouTube channels using the YouTube API and stores it in a MySQL database. It retrieves video statistics such as views, likes, comments, and more, and formats the data for SQL compatibility.\
"""