In [5]:
import googleapiclient.discovery
import os
import sqlite3
from datetime import datetime
import csv
import isodate

In [2]:
# find channel ids

# Set up API key
API_KEY = '###############'  # Replace with your API key

# Initialize YouTube API client
youtube = googleapiclient.discovery.build('youtube', 'v3', developerKey=API_KEY)

# Function to get channel ID from handle
def get_channel_id_from_handle(youtube, handle):
    request = youtube.search().list(
        part='snippet',
        q=handle,
        type='channel'
    )
    response = request.execute()
    
    if 'items' in response:
        return response['items'][0]['snippet']['channelId']
    else:
        return None

# Handles of three different youtube banks
handles = ('@KOHOFinancial','@Wise', '@tangerinebank')

channel_ids_dict = {}
for handle in handles:
    channel_id = get_channel_id_from_handle(youtube, handle)
    if channel_id:
        channel_ids_dict.update({handle: channel_id})


print(channel_ids_dict)

{'@KOHOFinancial': 'UC-HnC-dcg5_KYW6pQ7H4JQA', '@Wise': 'UCm4E1b5TjstT8UUIk7mb-MA', '@tangerinebank': 'UCsMDZ4GA5pgLAxjc8TKjFiQ'}


In [3]:
#api returning wrong channel id
channel_ids_dict['@Wise']= 'UCm4E1b5TjstT8UUIk7mb-MA'
print(channel_ids_dict)

{'@KOHOFinancial': 'UC-HnC-dcg5_KYW6pQ7H4JQA', '@Wise': 'UCm4E1b5TjstT8UUIk7mb-MA', '@tangerinebank': 'UCsMDZ4GA5pgLAxjc8TKjFiQ'}


In [6]:
# List of YouTube channel IDs for the three bank channels
channel_ids = list(channel_ids_dict.values())


# Function to get channel statistics
def get_channel_stats(youtube, channel_ids):
    all_data = []
    request = youtube.channels().list(
        part='snippet,contentDetails,statistics',
        id=','.join(channel_ids)
    )
    response = request.execute()
    
    for item in response['items']:
        data = {
            'channel_id': item['id'],
            'title': item['snippet']['title'],
            'subscribers': item['statistics']['subscriberCount'],
            'views': item['statistics']['viewCount'],
            'total_videos': item['statistics']['videoCount'],
            'playlist_id': item['contentDetails']['relatedPlaylists']['uploads']
        }
        all_data.append(data)
    
    return all_data

# Function to get video IDs from a playlist
def get_video_ids(youtube, playlist_id):
    video_ids = []
    request = youtube.playlistItems().list(
        part='contentDetails',
        playlistId=playlist_id,
        maxResults=50
    )
    response = request.execute()
    
    for item in response['items']:
        video_ids.append(item['contentDetails']['videoId'])
    
    return video_ids

# Function to get video details
def get_video_details(youtube, video_ids, channel_stats):
    all_video_stats = []
    
    for i in range(0, len(video_ids), 50):
        request = youtube.videos().list(
            part='snippet,contentDetails,statistics',
            id=','.join(video_ids[i:i+50])
        )
        response = request.execute()
        
        for video in response['items']:
            try:
                # Find the channel_id from channel_stats based on video's channelId
                channel_id = next((ch['channel_id'] for ch in channel_stats if ch['channel_id'] == video['snippet']['channelId']), None)
                
                if channel_id:
                    # Extract duration in ISO 8601 format from contentDetails
                    duration = video['contentDetails']['duration']
                    # Parse duration to get total seconds
                    duration_parsed = isodate.parse_duration(duration)
                    duration_seconds = duration_parsed.total_seconds()
                    
                    video_stats = {
                        'video_id': video['id'],
                        'title': video['snippet']['title'],
                        'published_at': video['snippet']['publishedAt'],
                        'views': video['statistics'].get('viewCount', 0),
                        'likes': video['statistics'].get('likeCount', 0),
                        'comments': video['statistics'].get('commentCount', 0),
                        'channel_id': channel_id,
                        'duration_seconds': duration_seconds
                    }
                    all_video_stats.append(video_stats)
                else:
                    print(f"No matching channel found for video with channelId: {video['snippet']['channelId']}")
            except KeyError as e:
                print(f"KeyError: {e} for video: {video}")
                continue  # Skip this video and proceed with the next
        
    return all_video_stats



# Fetch channel statistics
channel_stats = get_channel_stats(youtube, channel_ids)

# Fetch video details for each channel
for channel in channel_stats:
    print(f"Fetching videos for channel: {channel['title']}")
    playlist_id = channel['playlist_id']
    video_ids = get_video_ids(youtube, playlist_id)
    video_details = get_video_details(youtube, video_ids, channel_stats)
    
    # Print or save the video details
    for video in video_details:
        print(video)

# Print channel statistics
for channel in channel_stats:
    print(channel)


Fetching videos for channel: Wise
{'video_id': '1lPHoPkeccU', 'title': "Why These EU Countries Don't Use the Euro Currency? 🤔💶 #WiseMoneyStories", 'published_at': '2024-07-10T12:58:26Z', 'views': '1669', 'likes': '34', 'comments': '7', 'channel_id': 'UCm4E1b5TjstT8UUIk7mb-MA', 'duration_seconds': 45.0}
{'video_id': 'CtA8s7HOflA', 'title': 'Wise Travel Card: REAL TEST & Review on a Norway Road Trip 🚗', 'published_at': '2024-07-05T16:46:43Z', 'views': '1370', 'likes': '68', 'comments': '29', 'channel_id': 'UCm4E1b5TjstT8UUIk7mb-MA', 'duration_seconds': 276.0}
{'video_id': '449OLZ9mwsQ', 'title': 'What It Really Costs to Be a Musician... 💸  #musicgear #musicianlife #wisecard', 'published_at': '2024-07-01T13:00:04Z', 'views': '464', 'likes': '16', 'comments': '2', 'channel_id': 'UCm4E1b5TjstT8UUIk7mb-MA', 'duration_seconds': 12.0}
{'video_id': 'EtIFZBeiVYU', 'title': "How to say 'Pardon me' in 6 Languages 🗣️🌏", 'published_at': '2024-06-29T12:00:20Z', 'views': '3067', 'likes': '50', 'commen

In [19]:
# Function to create channels table
def create_channels_table(conn):
    sql_create_channels_table = """
    CREATE TABLE IF NOT EXISTS channels (
        id TEXT PRIMARY KEY,
        title TEXT,
        subscribers INTEGER,
        views INTEGER,
        total_videos INTEGER,
        playlist_id TEXT
    );
    """
    try:
        cursor = conn.cursor()
        cursor.execute(sql_create_channels_table)
        conn.commit()
        print("Channels table created successfully.")
    except sqlite3.Error as e:
        print(f"Error creating channels table: {e}")

# Function to create videos table
def create_videos_table(conn):
    sql_create_videos_table = """
    CREATE TABLE IF NOT EXISTS videos (
        id TEXT PRIMARY KEY,
        title TEXT,
        published_date TEXT,
        views INTEGER,
        likes INTEGER,
        comments INTEGER,
        channel_id TEXT,
        duration_seconds INTEGER,
        FOREIGN KEY (channel_id) REFERENCES channels(channel_id)
    );
    """
    try:
        cursor = conn.cursor()
        cursor.execute(sql_create_videos_table)
        conn.commit()
        print("Videos table created successfully.")
    except sqlite3.Error as e:
        print(f"Error creating videos table: {e}")

# Main function to execute database operations
def main():
    try:
        # Connect to SQLite database (create it if it doesn't exist)
        conn = sqlite3.connect('youtube_data.db')
        print("Connected to SQLite database.")
        
        # Create channels and videos tables
        create_channels_table(conn)
        create_videos_table(conn)
        
    except sqlite3.Error as e:
        print(f"SQLite error: {e}")
    finally:
        if conn:
            conn.close()
            print("SQLite connection closed.")

if __name__ == "__main__":
    main()

Connected to SQLite database.
Channels table created successfully.
Videos table created successfully.
SQLite connection closed.


In [8]:
def fetch_youtube_data(channel_ids):
    channel_stats = get_channel_stats(youtube, channel_ids)
    
    all_video_details = []
    for channel in channel_stats:
        print(f"Fetching videos for channel: {channel['title']}")
        playlist_id = channel['playlist_id']
        video_ids = get_video_ids(youtube, playlist_id)
        video_details = get_video_details(youtube, video_ids, channel_stats)
        all_video_details.extend(video_details)
    
    return channel_stats, all_video_details

In [20]:
# Function to insert data into SQL tables
def insert_data_into_sql(channel_stats, video_details):
    # Connect to SQLite database (or any other SQL database)
    conn = sqlite3.connect('youtube_data.db')
    cursor = conn.cursor()
    
    try:
        # Insert channel data into 'channels' table
        for channel in channel_stats:
            cursor.execute("""
                INSERT INTO channels (id, title, subscribers, views, total_videos, playlist_id)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                channel['channel_id'],
                channel['title'],
                channel['subscribers'],
                channel['views'],
                channel['total_videos'],
                channel['playlist_id']
            ))
        
        # Insert video data into 'videos' table
        for video in video_details:
            # Convert published_at datetime to date
            published_date = datetime.strptime(video['published_at'], '%Y-%m-%dT%H:%M:%SZ').date()
            
            cursor.execute("""
                INSERT INTO videos (id, title, published_date, views, likes, comments, channel_id, duration_seconds)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                video['video_id'],
                video['title'],
                published_date,
                video['views'],
                video['likes'],
                video['comments'],
                video['channel_id'],
                video['duration_seconds']
            ))
        
        # Commit changes and close connection
        conn.commit()
        print("Data inserted successfully into SQL tables.")
    
    except sqlite3.Error as e:
        print(f"Error inserting data: {e}")
    
    finally:
        conn.close()

# Main function to orchestrate the process
def main():
    # List of YouTube channel IDs for the three bank channels

    # List of YouTube channel IDs
    channel_ids = list(channel_ids_dict.values())

    # Fetch data from YouTube API
    channel_stats, video_details = fetch_youtube_data(channel_ids)

    # Insert data into SQL tables
    insert_data_into_sql(channel_stats, video_details)


if __name__ == "__main__":
    main()

Fetching videos for channel: KOHO
Fetching videos for channel: Wise
Fetching videos for channel: Tangerine
Data inserted successfully into SQL tables.


In [22]:
# Function to execute SQL queries and fetch results
def execute_query(query):
    conn = sqlite3.connect('youtube_data.db')
    cursor = conn.cursor()
    
    try:
        cursor.execute(query)
        results = cursor.fetchall()
        return results
    except sqlite3.Error as e:
        print(f"Error executing query: {e}")
    finally:
        conn.close()

# Query 1: Count of Videos per Channel
query1 = """
    SELECT
        c.title AS channel_title,
        COUNT(v.id) AS video_count
    FROM
        channels c
    LEFT JOIN
        videos v ON c.id = v.channel_id
    GROUP BY
        c.title
    ORDER BY
        video_count DESC;
"""

# Query 2: Average Views per Video
query2 = """
    SELECT
        c.title AS channel_title,
        AVG(v.views) AS avg_views_per_video
    FROM
        channels c
    LEFT JOIN
        videos v ON c.id = v.channel_id
    GROUP BY
        c.title
    ORDER BY
        avg_views_per_video DESC;
"""

# Query 3: Views to Comment Ratio
query3 = """
    SELECT
        c.title AS channel_title,
        SUM(v.views) AS total_views,
        SUM(v.comments) AS total_comments,
        CASE
            WHEN SUM(v.comments) > 0 THEN SUM(v.views) / SUM(v.comments)
            ELSE 0
        END AS views_to_comment_ratio
    FROM
        channels c
    LEFT JOIN
        videos v ON c.id = v.channel_id
    GROUP BY
        c.title
    ORDER BY
        views_to_comment_ratio DESC;
"""

# Execute and print results for Query 1
print("Query 1: Count of Videos per Channel")
results1 = execute_query(query1)
for row in results1:
    print(row)

# Execute and print results for Query 2
print("\nQuery 2: Average Views per Video")
results2 = execute_query(query2)
for row in results2:
    print(row)

# Execute and print results for Query 3
print("\nQuery 3: Views to Comment Ratio")
results3 = execute_query(query3)
for row in results3:
    print(row)

Query 1: Count of Videos per Channel
('Wise', 50)
('Tangerine', 50)
('KOHO', 28)

Query 2: Average Views per Video
('KOHO', 413382.6785714286)
('Tangerine', 9022.06)
('Wise', 4824.58)

Query 3: Views to Comment Ratio
('KOHO', 11574715, 361, 32062)
('Tangerine', 451103, 89, 5068)
('Wise', 241229, 1481, 162)


In [24]:
# Function to fetch data from SQLite and export to CSV
def export_to_csv(table_name, file_name):
    conn = sqlite3.connect('youtube_data.db')
    cursor = conn.cursor()
    
    # Fetch data from the table
    cursor.execute(f"SELECT * FROM {table_name};")
    data = cursor.fetchall()
    
    # Export data to CSV
    with open(file_name, 'w', newline='', encoding='utf-8') as csv_file:
        csv_writer = csv.writer(csv_file)
        # Write header (column names)
        csv_writer.writerow([description[0] for description in cursor.description])
        # Write data rows
        csv_writer.writerows(data)
    
    conn.close()

# Export channels table to CSV
export_to_csv('channels', 'channels.csv')

# Export videos table to CSV
export_to_csv('videos', 'videos.csv')

print("Export to CSV complete.")

Export to CSV complete.
