In [20]:
from googleapiclient.discovery import build
import pandas as pd
import seaborn as sns
import matplotlib.ticker as ticker 
import matplotlib.pyplot as plt
from pathlib import Path
import os
import isodate
import psycopg2
from sqlalchemy import create_engine,text

## Data collection and storage

In [21]:
api_key = Path("YT_API_KEY.txt").read_text() # store YouTube API key in YT_API_KEY.txt
channel_handle = '@alextheanalyst' # YouTube Channel 'Alex The Analyst'
                   
api_service_name = 'youtube'
api_version = 'v3'

# Initialize YouTube Data API client using google api library
youtube = build(api_service_name, api_version, developerKey=api_key)

### Function to request channel information

In [22]:
# request channel information using youtube data api
def request_channel_info(youtube, channel_handle):    
    
    request = youtube.channels().list(
        part="snippet,contentDetails,statistics",
        forHandle=channel_handle        
        )
    
    response = request.execute()
    #return response   
        
    response_data = response['items'][0]
    data = {'channel': response_data['snippet']['title'],
            'description': response_data['snippet']['description'],
            'subscribers': response_data['statistics']['subscriberCount'],
            'views': response_data['statistics']['viewCount'],
            'total_videos': response_data['statistics']['videoCount'],
            'channel_playlist_id': response_data['contentDetails']['relatedPlaylists']['uploads'],
            'channel_id': response_data['id']            
            }              
        
        
    #return the statistic data of Channel 'Alex The Analyst'
    return data

In [23]:
channel_statistics = request_channel_info(youtube,channel_handle)
# print(channel_statistics)

In [67]:
# convert channel data to panda DataFrame
df_channel = pd.DataFrame([channel_statistics])

In [68]:
# Save channel data to a csv file
os.makedirs("data",exist_ok=True)
df_channel.to_csv('data/channel.csv', index=False)

### Funtion to get video IDs

In [26]:
def get_playlist_videos(youtube, channel_playlist_id): 
    
    video_ids = []
    next_page_token = None   
    
    while True: 
        request = youtube.playlistItems().list(
            part='snippet,contentDetails',
            maxResults=50,
            pageToken=next_page_token,
            playlistId=channel_playlist_id
            )
        
        response = request.execute()
        #return response
    
        for item in response['items']:
            video_id = item['contentDetails']['videoId']               
            video_ids.append(video_id)
        
        next_page_token=response.get('nextPageToken')
        #print(next_page_token)
        if not next_page_token:
            break    
           
    return video_ids 
   

In [27]:
channel_playlist_id = channel_data.loc[channel_data['channel']=='Alex The Analyst','channel_playlist_id'].iloc[0]

In [28]:
video_ids = get_playlist_videos(youtube, channel_playlist_id)
#print(video_ids)

### Function to get video details

In [29]:
def get_video_block_details(youtube, video_block_ids):
    """
    get video details with maximal 50 video IDs
    """
    
    request = youtube.videos().list(
        part='snippet,contentDetails,statistics',
        id=','.join(video_block_ids)
        )
    response = request.execute()
    
    return response


In [30]:
def get_all_video_details(youtube,video_ids):
    """
    get details of all videos
    """
    
    video_details = []
    video_len=len(video_ids)
    
    for i in range(0,video_len,50):
        video_block_ids=video_ids[i:i+50]        
        video_details.append(get_video_block_details(youtube, video_block_ids))   

    return video_details   
        

In [31]:
video_details = get_all_video_details(youtube,video_ids)
# print(video_details)

### Function to get video statistics

In [32]:
def get_video_stats(youtube,video_ids):
    """
    get the statistics of provided videos
    """
    
    all_video_stats = []
    video_details = get_all_video_details(youtube,video_ids)
    
    for details in video_details:
        for item in details['items']:                     
            stats_data = {'video_id': item['id'],
                          'title': item['snippet']['title'],
                          'published_at':item['snippet']['publishedAt'],                          
                          'views': item['statistics'].get('viewCount',0),
                          'likes': item['statistics'].get('likeCount',0),                      
                          'comments': item['statistics'].get('commentCount',0),  
                          'tags': item['snippet'].get('tags', []),
                          'duration': item['contentDetails']['duration']        
                         }
            all_video_stats.append(stats_data)
        
    return all_video_stats
        

In [33]:
all_video_stats = get_video_stats(youtube,video_ids)
# print(all_video_stats)

In [69]:
df_video = pd.DataFrame(all_video_stats)
# print(video_data)

In [70]:
# convert duration to seconds and only keep column duration_seconds
df_video['duration_seconds'] = df_video['duration'].apply(lambda x: int(isodate.parse_duration(x).total_seconds()))
video_data = df_video.drop(columns='duration')

### Extract playlist of each video

In [None]:
# Function to get all playlists from a channel

def get_all_playlists(youtube, channel_id):
    """
    Get all playlists from a channel
    """
    playlists = []
    request = youtube.playlists().list(
        part="snippet",
        channelId=channel_id,
        maxResults=50
    )
    response = request.execute()

    for item in response['items']:
        playlists.append({
            'playlist_id': item['id'],
            'playlist_title': item['snippet']['title']
        })

    return playlists


In [46]:
channel_playlists = get_all_playlists(youtube, channel_data['channel_id'][0])

In [47]:
channel_playlists

[{'playlist_id': 'PLUaB-1hjhk8FEghrGL0WO0Nc9jzG6J8s-',
  'playlist_title': 'Data Fundamentals for Beginners'},
 {'playlist_id': 'PLUaB-1hjhk8G0dEE2QuFw-qiMX6cRH-7t',
  'playlist_title': 'AWS Fundamentals for Analysts'},
 {'playlist_id': 'PLUaB-1hjhk8HYjVA_8gvl2StVvVGr4yoK',
  'playlist_title': 'Azure Fundamentals for Analysts'},
 {'playlist_id': 'PLUaB-1hjhk8GuvN7Yym7iNJps0y4DehhN',
  'playlist_title': 'Learn Shiny for Python'},
 {'playlist_id': 'PLUaB-1hjhk8GbuwvxkmPeKFDUUTkxP9M1',
  'playlist_title': 'All Things Analyst Builder'},
 {'playlist_id': 'PLUaB-1hjhk8G5zci4HA8E21x2BJS3jzNm',
  'playlist_title': 'Intermediate MySQL Tutorial Series'},
 {'playlist_id': 'PLUaB-1hjhk8GjfgvWlreA6BvTvazz8RHG',
  'playlist_title': 'Advanced MySQL Tutorial Series'},
 {'playlist_id': 'PLUaB-1hjhk8Fm_xpyeN4292wgE542Jy17',
  'playlist_title': 'SQL Technical Interview Prep'},
 {'playlist_id': 'PLUaB-1hjhk8Fq6RBY-3MQ5MCXB5qxb8VA',
  'playlist_title': 'MySQL Basics Tutorial Series'},
 {'playlist_id': 'PLU

In [None]:
# Function to get a mapping of video IDs to playlist titles

def get_playlist_video_map(youtube, channel_playlists):
    
    playlist_video_map = {}
    for pl in channel_playlists:
        pl_id = pl['playlist_id']
        pl_title = pl['playlist_title']
        next_page_token = None
        while True:
            request = youtube.playlistItems().list(
                part="snippet",
                playlistId=pl_id,
                maxResults=50,
                pageToken=next_page_token
            )
            response = request.execute()
            for item in response['items']:
                video_id = item['snippet']['resourceId']['videoId']
                playlist_video_map[video_id] = pl_title
            next_page_token = response.get('nextPageToken')
            if not next_page_token:
                break
        
    return playlist_video_map

In [65]:
playlist_video_map = get_playlist_video_map(youtube, channel_playlists)

In [66]:
print(playlist_video_map)

{'WpX2F2BS3Qc': 'Data Fundamentals for Beginners', 'cd_jj0IRmaA': 'Data Fundamentals for Beginners', 'OcG0B4ny1cE': 'Data Fundamentals for Beginners', 'hXpt8zj_ajc': 'Data Fundamentals for Beginners', 'ItZlTixh6Bs': 'Data Fundamentals for Beginners', 'PEWMgtu-1e4': 'Data Fundamentals for Beginners', 'YSjbGET6R1A': 'Data Analyst Bootcamp', 'hgaVi4sOHkM': 'Data Analyst Bootcamp', 'K2GfrERtliU': 'Data Analyst Bootcamp', 'AycpRKyRagE': 'Data Analyst Bootcamp', 'rxyLC247h6E': 'Data Analyst Bootcamp', 'ZYps6TmBkWk': 'Data Analyst Bootcamp', 'sEImMaovc1Q': 'Data Analyst Bootcamp', '9ur0OpMADuM': 'Data Analyst Bootcamp', 'sge9qTf8GdY': 'Data Analyst Bootcamp', 'vDVcXXfc9e8': 'Data Analyst Bootcamp', 'zv1nfZTYpio': 'Learn Shiny for Python', 'rn0dSsYXhIE': 'Learn Shiny for Python', 'siHou7lObbo': 'Learn Shiny for Python', 'Y-4ri9QjsTI': 'All Things Analyst Builder', '_R6K1aNtKpE': 'All Things Analyst Builder', '5tKmTdvEAVk': 'All Things Analyst Builder', 'KZKizyBS9YM': 'Data Analyst Bootcamp', '

In [71]:
df_video['playlist_title'] = df_video['video_id'].map(playlist_video_map)

In [72]:
# save video data into video.csv file
df_video.to_csv('data/video.csv', index=False)

### Load data into PostgreSQL Database youtube_analysis

In [None]:

# load database credentials from a credential file

def load_db_credentials(filepath):
    credentials = {}
    with open(filepath,'r') as file:
        lines = file.readlines()
        for line in lines:
            key, value = line.strip().split('=',1)
            credentials[key.strip()] = value.strip()
            
    return credentials

# Load database credentials
creds = load_db_credentials('db_credentials.txt')    
        

# Build connection string using credentials
conn_str = f"postgresql://{creds['user']}:{creds['password']}@{creds['host']}:{creds['port']}/{creds['dbname']}"

# Create SQLAlchemy engine
engine = create_engine(conn_str)

try:
    with engine.connect() as connection:
        result = connection.execute(text("SELECT 1"))
        print("✅ Connection successful! Now loading data into PostgreSQL...")
        
        # Load CSV to DataFrame
        df = pd.read_csv("data/video.csv")

        # Write DataFrame to PostgreSQL
        df.to_sql("raw_videos", engine, if_exists="append", index=False)
        print("✅ Data successfully loaded into 'raw_videos' table.")
         
except Exception as e:
    print("❌ Connection or data load failed:", e)     
      

❌ Connection or data load failed: (psycopg2.OperationalError) connection to server at "localhost" (::1), port 5432 failed: Connection refused (0x0000274D/10061)
	Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused (0x0000274D/10061)
	Is the server running on that host and accepting TCP/IP connections?

(Background on this error at: https://sqlalche.me/e/20/e3q8)
