In [0]:
%pip install google-api-python-client

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# ===== CREATE JOB WIDGETS =====
# Only needed when running this notebook interactively: uncomment once to
# create the text widgets. Scheduled job runs supply these values as job
# parameters instead.
# dbutils.widgets.text("YOUTUBE_API_KEY", "")
# dbutils.widgets.text("S3_BUCKET", "")
# dbutils.widgets.text("CHANNEL_CSV_PATH", "")
# dbutils.widgets.text("SAVE_FORMAT", "csv")
# dbutils.widgets.text("UPLOAD_TO_S3", "true")
# dbutils.widgets.text("AWS_ACCESS_KEY_ID", "")
# dbutils.widgets.text("AWS_SECRET_ACCESS_KEY", "")

# ===== JOB PARAMETERS =====
# Every value is stripped: keys and paths pasted into job parameters often
# carry a stray space or trailing newline, which would make the API key or
# AWS credentials invalid or point the path at a non-existent file.
YOUTUBE_API_KEY = dbutils.widgets.get("YOUTUBE_API_KEY").strip()
S3_BUCKET = dbutils.widgets.get("S3_BUCKET").strip()
CHANNEL_CSV_PATH = dbutils.widgets.get("CHANNEL_CSV_PATH").strip()
# Lower-cased so "CSV" / "Delta" / "Both" match the expected 'csv'/'delta'/'both'.
SAVE_FORMAT = dbutils.widgets.get("SAVE_FORMAT").strip().lower()
# Any value other than "true" (case-insensitive) disables the S3 upload.
UPLOAD_TO_S3 = dbutils.widgets.get("UPLOAD_TO_S3").strip().lower() == "true"
AWS_ACCESS_KEY_ID = dbutils.widgets.get("AWS_ACCESS_KEY_ID").strip()
AWS_SECRET_ACCESS_KEY = dbutils.widgets.get("AWS_SECRET_ACCESS_KEY").strip()


'''
YouTube Analytics Data Ingestion Script - Databricks Version

This script extracts comprehensive data from YouTube channels using the YouTube Data API v3.
Optimized for the Databricks environment; input, output and cache files are kept under the workspace folder configured in DBFS_BASE_PATH.

OPTIMIZATION FEATURES:
- Incremental fetching: Only fetches new videos since last run
- Video ID caching: Stores previously fetched video IDs to avoid duplicates
- Reduced API calls: Skips re-fetching details for already processed videos

Data Extracted:
Channel-level:
    - Channel ID
    - Channel name
    - Subscribers count
    - Total views
    - Video count

Video-level:
    - Video ID
    - Title
    - Published date
    - View count
    - Like count
    - Comment count
    - Duration

Optional (bonus):
    - Tags
    - Category
    - Language
'''

# Import required libraries
from googleapiclient.discovery import build  # Google API client for YouTube API
import pandas as pd  # Data manipulation and CSV export
import isodate  # Parse ISO 8601 duration format (e.g., PT4M13S)
import datetime  # Timestamp generation for file naming
import logging
import json  # For caching metadata


# ===== DATABRICKS CONFIGURATION =====
# Detect whether this code runs on a Databricks cluster. There,
# pyspark.dbutils is importable and a DBUtils handle is built from the active
# Spark session. Anywhere else the import fails, and IS_DATABRICKS = False
# selects the local fallback code paths used elsewhere in this notebook.
# NOTE: the API key, bucket and AWS keys are read from notebook widgets / job
# parameters in the first cell, not from Databricks secret scopes.

try:
    from pyspark.dbutils import DBUtils  # only importable on Databricks runtimes
    dbutils = DBUtils(spark)
except ImportError:
    IS_DATABRICKS = False
    print("Warning: Not running in Databricks environment. Using fallback configuration.")
else:
    IS_DATABRICKS = True


# ===== LOGGING CONFIGURATION =====
# Send INFO-and-above records to the root logger with a timestamped format.
# Per the logging docs, basicConfig does nothing when the root logger already
# has handlers, in which case the cluster's existing configuration is kept.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by every function in this notebook.
logger = logging.getLogger(__name__)


# ===== PIPELINE PATH CONFIGURATION =====
# Root folder for every file the pipeline reads or writes. Despite the DBFS_
# prefix kept on these names, the root is a Workspace path (/Workspace/...),
# which pandas read_csv/to_csv and open() access as an ordinary local path.
DBFS_BASE_PATH = '/Workspace/Users/shamanthkrishna0@gmail.com/youtube-analytics-data-pipeline'

# Sub-folders under the root.
DBFS_OUTPUT_PATH = '/'.join((DBFS_BASE_PATH, 'Output'))  # timestamped CSV exports
DBFS_LOGS_PATH = '/'.join((DBFS_BASE_PATH, 'Logs'))      # log folder
DBFS_INPUT_PATH = '/'.join((DBFS_BASE_PATH, 'input'))    # channel-ID CSV input
DBFS_CACHE_PATH = '/'.join((DBFS_BASE_PATH, 'cache'))    # per-channel JSON caches for incremental fetching


def setup_dbfs_directories():
    """
    Create the pipeline's output, logs, input and cache directories.

    The directories are created with os.makedirs on exactly the path strings
    that the pipeline later opens through the local file API (open(),
    pandas read_csv/to_csv), so what is created is what gets used. This
    also works for '/dbfs/...' paths, which are exposed through the local
    filesystem as well.

    dbutils.fs.mkdirs is deliberately not used: it resolves a scheme-less
    path such as '/Workspace/...' against DBFS (dbfs:/Workspace/...), which
    is a different location from the one the files are written to.

    Failure handling: on Databricks a directory that cannot be created is
    logged as a warning and skipped (best effort). Outside Databricks the
    OSError propagates.
    """
    import os  # os is not imported at the top of the notebook

    for path in (DBFS_OUTPUT_PATH, DBFS_LOGS_PATH, DBFS_INPUT_PATH, DBFS_CACHE_PATH):
        try:
            os.makedirs(path, exist_ok=True)
        except OSError as e:
            if not IS_DATABRICKS:
                raise
            logger.warning(f"Could not create directory {path}: {e}")
        else:
            logger.info(f"Created/verified directory: {path}")


# ===== CACHING FUNCTIONS FOR INCREMENTAL FETCHING =====
def get_cache_file_path(channel_id):
    """Return the path of the JSON cache file for *channel_id*.

    The file sits directly under DBFS_CACHE_PATH and is named
    ``channel_<channel_id>_cache.json``.
    """
    file_name = 'channel_{}_cache.json'.format(channel_id)
    return '{}/{}'.format(DBFS_CACHE_PATH, file_name)


def load_channel_cache(channel_id):
    """
    Read the incremental-fetch cache for one channel.

    The cache is the JSON document written by save_channel_cache, normally
    holding 'last_fetch_time' and 'video_ids'.

    Parameters:
        channel_id (str): The channel whose cache file should be read.

    Returns:
        dict: The parsed cache. If the file does not exist, or it cannot be
        read or parsed, a fresh ``{'last_fetch_time': None, 'video_ids': []}``
        is returned instead; the reason is logged (info for a missing file,
        warning for any other error).
    """
    path = get_cache_file_path(channel_id)
    try:
        with open(path, 'r') as handle:
            cached = json.load(handle)
            logger.info(f"Loaded cache for channel {channel_id}: {len(cached.get('video_ids', []))} cached videos")
            return cached
    except FileNotFoundError:
        logger.info(f"No cache found for channel {channel_id}, will fetch all videos")
    except Exception as e:
        logger.warning(f"Error loading cache for {channel_id}: {e}")

    # Reached only when the cache could not be loaded.
    return {'last_fetch_time': None, 'video_ids': []}


def save_channel_cache(channel_id, video_ids, fetch_time=None):
    """
    Persist the incremental-fetch cache for one channel as JSON.

    Parameters:
        channel_id (str): The channel ID; selects the file via get_cache_file_path.
        video_ids (iterable of str): Every video ID seen so far for the channel
            (cumulative). Duplicates are removed, and the IDs are written in
            sorted order so that saving the same set twice produces an
            identical file.
        fetch_time (str, optional): UTC timestamp in ISO 8601 form with a 'Z'
            suffix. It is stored as 'last_fetch_time', which the next
            incremental run uses as its publishedAfter filter. Defaults to
            the current UTC time.

    Write errors are logged and swallowed (best effort). A failed write
    leaves the previous cache file, or none, in place.
    """
    cache_file = get_cache_file_path(channel_id)
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    if fetch_time is None:
        # Same string shape as the former utcnow().isoformat() + 'Z', without
        # the naive utcnow() API (deprecated since Python 3.12).
        fetch_time = now_utc.replace(tzinfo=None).isoformat() + 'Z'

    cache_data = {
        'last_fetch_time': fetch_time,
        'video_ids': sorted(set(video_ids)),  # deduplicated, deterministic order
        'updated_at': now_utc.isoformat(),    # UTC, consistent with last_fetch_time
    }

    try:
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(cache_data, f, indent=2)
        logger.info(f"Saved cache for channel {channel_id}: {len(cache_data['video_ids'])} total videos")
    except Exception as e:
        logger.error(f"Error saving cache for {channel_id}: {e}")


def get_api_credentials():
    """
    Return the YouTube API key and S3 bucket name for this run.

    On Databricks the values come from the module-level YOUTUBE_API_KEY and
    S3_BUCKET job parameters, which the first notebook cell reads from
    dbutils widgets.

    Elsewhere they are read from the YOUTUBE_API_KEY and S3_BUCKET
    environment variables. If python-dotenv is installed, a .env file is
    loaded into the environment first.

    Returns:
        tuple: (api_key, s3_bucket). s3_bucket may be an empty string, in
        which case callers skip the S3 upload.

    Raises:
        ValueError: if the job parameters are not defined, or no YouTube API
            key is configured.
    """
    if IS_DATABRICKS:
        try:
            api_key = YOUTUBE_API_KEY
            bucket = S3_BUCKET
        except NameError as e:
            logger.error(f"Error retrieving job parameters: {e}")
            raise ValueError(
                "Failed to retrieve API credentials: the YOUTUBE_API_KEY / S3_BUCKET "
                "job parameters are not defined. Run the job-parameter cell first."
            ) from e
        source = "job parameters"
    else:
        # Local testing: previously this branch returned an unbound local
        # (UnboundLocalError); read the values from the environment instead.
        import os
        try:
            from dotenv import load_dotenv
        except ImportError:
            pass  # python-dotenv is optional; use the plain environment
        else:
            load_dotenv()
        api_key = os.getenv('YOUTUBE_API_KEY', '')
        bucket = os.getenv('S3_BUCKET', '')
        source = "environment variables"

    # Fail fast with a clear message instead of a later API authorization error.
    if not api_key:
        raise ValueError(f"No YouTube API key configured (looked in {source}).")

    logger.info(f"Successfully retrieved credentials from {source}")
    return api_key, bucket


def get_channel_stats(youtube, channel_id):
    """
    Fetch channel-level statistics from the YouTube Data API.

    Calls ``channels.list`` with the snippet and statistics parts for a
    single channel ID.

    Parameters:
        youtube: The YouTube API client object (googleapiclient resource).
        channel_id (str): The unique identifier of the YouTube channel.

    Returns:
        dict: Keys channel_id, channel_name, subscribers, total_views and
        video_count. The counts are passed through as returned by the API,
        with 0 when a statistic is absent (e.g. hidden subscriber counts).
        An empty dict is returned when the API reports no such channel.
    """
    response = youtube.channels().list(
        part='snippet,statistics',
        id=channel_id
    ).execute()

    # The API omits 'items' entirely when nothing matches, so index defensively.
    items = response.get('items') or []
    if not items:
        logger.warning(f"No channel found for ID {channel_id}")
        return {}

    item = items[0]  # one ID was requested, so at most one channel comes back
    statistics = item.get('statistics', {})
    return {
        'channel_id': item['id'],
        'channel_name': item['snippet']['title'],
        'subscribers': statistics.get('subscriberCount', 0),
        'total_views': statistics.get('viewCount', 0),
        'video_count': statistics.get('videoCount', 0),
    }


def get_video_ids(youtube, channel_id, published_after=None):
    """
    Retrieve the IDs of a channel's videos via ``search.list``, newest first.

    Results are always paginated to the end. The previous version stopped
    after the first page (50 results) in incremental mode. Because the
    caller then advances its publishedAfter checkpoint, any videos beyond
    that first page were skipped permanently.

    Parameters:
        youtube: The YouTube API client object (googleapiclient resource).
        channel_id (str): The unique identifier of the YouTube channel.
        published_after (str, optional): RFC 3339 / ISO 8601 timestamp. When
            given, only videos published after it are returned (incremental
            fetch).

    Returns:
        list[str]: Video IDs in the order returned by the API.

    Note:
        Per the YouTube Data API documentation, each search.list call is
        expensive in quota terms. Channel searches restricted to
        type=video are also capped at roughly 500 results in total, so a
        full fetch of a very large channel may be incomplete.
    """
    request_params = {
        'part': 'id',
        'channelId': channel_id,
        'maxResults': 50,      # per-page maximum for search.list
        'type': 'video',
        'order': 'date',       # newest first
    }
    if published_after:
        request_params['publishedAfter'] = published_after
        logger.info(f"Fetching videos published after: {published_after}")

    video_ids = []
    while True:
        response = youtube.search().list(**request_params).execute()
        video_ids.extend(item['id']['videoId'] for item in response.get('items', []))

        next_page_token = response.get('nextPageToken')
        if not next_page_token:
            break
        request_params['pageToken'] = next_page_token

    return video_ids


def get_video_details(youtube, video_ids):
    """
    Look up metadata and statistics for a list of videos.

    IDs are sent to ``videos.list`` in chunks of 50, requesting the snippet,
    statistics and contentDetails parts.

    Parameters:
        youtube: The YouTube API client object (googleapiclient resource).
        video_ids (list): Video ID strings to look up.

    Returns:
        list[dict]: One row per video returned by the API, with keys:
            video_id, title, published_date (publishedAt as returned),
            view_count / like_count / comment_count (as returned, 0 if absent),
            duration (seconds, from the ISO 8601 duration via total_seconds()),
            tags (comma-joined), category_id (or ''),
            language (defaultAudioLanguage, else defaultLanguage, else '').
        An empty input returns an empty list without calling the API.
    """
    if not video_ids:
        logger.info("No new videos to fetch details for")
        return []

    rows = []
    chunk_size = 50
    for start in range(0, len(video_ids), chunk_size):
        id_list = ','.join(video_ids[start:start + chunk_size])
        response = youtube.videos().list(
            part='snippet,statistics,contentDetails',
            id=id_list,
        ).execute()

        for item in response['items']:
            snippet = item['snippet']
            stats = item['statistics']
            length = isodate.parse_duration(item['contentDetails']['duration'])
            rows.append({
                'video_id': item['id'],
                'title': snippet['title'],
                'published_date': snippet['publishedAt'],
                'view_count': stats.get('viewCount', 0),
                'like_count': stats.get('likeCount', 0),
                'comment_count': stats.get('commentCount', 0),
                'duration': length.total_seconds(),
                'tags': ','.join(snippet.get('tags', [])),
                'category_id': snippet.get('categoryId', ''),
                'language': snippet.get('defaultAudioLanguage', snippet.get('defaultLanguage', '')),
            })

    return rows


def upload_to_s3_from_databricks(local_file_path, bucket):
    """
    Upload one local file to S3 under a date/hour-partitioned key.

    The object key is ``youtube-raw-data/YYYY/MM/DD/HH/<filename>``, built
    from the cluster's local clock at upload time.

    Credentials: on Databricks, the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
    job parameters are used only when BOTH are non-empty. Otherwise boto3's
    default credential chain is used (environment, instance profile / IAM
    role, ...).

    This replaces the old pattern of trying explicit keys and falling back
    in a bare except. boto3.client() accepts empty-string keys without
    error, so that fallback was never reached and uploads failed later at
    signing time.

    Parameters:
        local_file_path (str): Local filesystem path of the file to upload.
        bucket (str): Target S3 bucket name.

    Returns:
        bool: True on success, False on any failure. Errors are logged and
        printed, never raised.
    """
    # Imported separately: if boto3 is missing, the exception classes used in
    # the except clauses below would otherwise be unbound when evaluated.
    try:
        import boto3
        from boto3.exceptions import S3UploadFailedError
        from botocore.exceptions import ClientError
    except ImportError as e:
        logger.error(f"boto3 is not available, cannot upload to S3: {e}")
        print(f"‚úó Error: {e}")
        return False

    try:
        client_kwargs = {}
        if IS_DATABRICKS and AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY:
            client_kwargs = {
                'aws_access_key_id': AWS_ACCESS_KEY_ID,
                'aws_secret_access_key': AWS_SECRET_ACCESS_KEY,
            }
        s3_client = boto3.client('s3', **client_kwargs)

        # Hierarchical key: youtube-raw-data/<year>/<month>/<day>/<hour>/<file>
        filename = local_file_path.split('/')[-1]
        now = datetime.datetime.now()
        s3_key = f"youtube-raw-data/{now.year}/{now.month:02d}/{now.day:02d}/{now.hour:02d}/{filename}"

        s3_client.upload_file(local_file_path, bucket, s3_key)
    except (ClientError, S3UploadFailedError) as e:
        # upload_file wraps S3 API errors in S3UploadFailedError.
        logger.error(f"S3 upload failed: {e}")
        print(f"‚úó Failed to upload {local_file_path}: {e}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error during S3 upload: {e}")
        print(f"‚úó Error: {e}")
        return False

    s3_uri = f"s3://{bucket}/{s3_key}"
    logger.info(f"Successfully uploaded to {s3_uri}")
    print(f"‚úì Uploaded: {s3_uri}")
    return True


def save_to_delta_table(df, table_name, mode='append'):
    """
    Write a DataFrame to a Delta table via Spark's saveAsTable.

    Parameters:
        df (pandas.DataFrame or pyspark.sql.DataFrame): Data to write. A
            pandas frame is converted with spark.createDataFrame first.
        table_name (str): Name of the target table.
        mode (str): Spark save mode ('overwrite', 'append', 'error', 'ignore').

    Outside Databricks this only logs a warning. Write errors are logged and
    printed, not raised, and the function always returns None.
    """
    if not IS_DATABRICKS:
        logger.warning("Delta table save skipped - not in Databricks environment")
        return

    try:
        spark_df = spark.createDataFrame(df) if isinstance(df, pd.DataFrame) else df
        writer = spark_df.write.format('delta').mode(mode)
        writer.saveAsTable(table_name)

        logger.info(f"Successfully saved to Delta table: {table_name}")
        print(f"‚úì Data saved to Delta table: {table_name}")
    except Exception as e:
        logger.error(f"Failed to save to Delta table: {e}")
        print(f"‚úó Error saving to Delta table: {e}")


def _log_and_print(message):
    """Send *message* to the module logger at INFO level and to notebook stdout."""
    logger.info(message)
    print(message)


def _fetch_channel(channel_id, youtube, incremental=True):
    """
    Fetch fresh channel statistics plus details of not-yet-cached videos.

    This helper never writes the channel cache. The caller decides when it
    is safe to commit it (main commits immediately; run_ingestion_pipeline
    commits only after the data has been saved).

    Parameters:
        channel_id (str): The unique identifier of the YouTube channel.
        youtube: The YouTube API client object.
        incremental (bool): If True and a checkpoint exists, only videos
            published after the cached checkpoint are searched.

    Returns:
        tuple: (channel_stats, video_details, all_video_ids, fetch_time)
            channel_stats (dict): get_channel_stats output, or {} if the
                channel was not found (video_details is then []).
            video_details (list[dict]): new videos, each tagged with the
                channel's channel_id and channel_name.
            all_video_ids (list[str]): previously cached IDs plus the new ones.
            fetch_time (str): UTC ISO 8601 timestamp ('Z' suffix), taken
                BEFORE the video search, to store as the next checkpoint.
    """
    # Checkpoint taken before searching. Anything published while this run is
    # in progress is picked up next time (duplicates are filtered against the
    # cached IDs), instead of falling into a gap between search and save.
    fetch_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + 'Z'

    # STEP 1: channel-level statistics (always fetched fresh)
    _log_and_print(f"Fetching channel statistics for {channel_id}...")
    channel_stats = get_channel_stats(youtube, channel_id)
    if not channel_stats:
        logger.warning(f"Channel {channel_id} was not found by the API; skipping")
        print(f"Channel {channel_id} was not found by the API; skipping")
        return {}, [], [], fetch_time

    _log_and_print(f"Channel: {channel_stats['channel_name']}")
    _log_and_print(f"Subscribers: {channel_stats['subscribers']}")
    _log_and_print(f"Total Views: {channel_stats['total_views']}")
    _log_and_print(f"Video Count: {channel_stats['video_count']}\n")

    # STEP 2: state left by the previous run
    cache_data = load_channel_cache(channel_id)
    cached_video_ids = set(cache_data.get('video_ids', []))
    last_fetch_time = cache_data.get('last_fetch_time')

    # STEP 3: video IDs, incremental (publishedAfter checkpoint) or full
    _log_and_print("Fetching video IDs...")
    if incremental and last_fetch_time:
        print(f"üìä INCREMENTAL MODE: Fetching videos since {last_fetch_time}")
        logger.info(f"Incremental fetch - last run: {last_fetch_time}")
        video_ids = get_video_ids(youtube, channel_id, published_after=last_fetch_time)
    else:
        print("üìä FULL MODE: Fetching all videos")
        logger.info("Full fetch - no previous cache or incremental disabled")
        video_ids = get_video_ids(youtube, channel_id)

    # STEP 4: keep only unseen IDs. dict.fromkeys keeps the API order while
    # dropping IDs repeated across search pages, so no video is fetched twice.
    new_video_ids = [vid for vid in dict.fromkeys(video_ids) if vid not in cached_video_ids]

    _log_and_print(f"Found {len(video_ids)} videos from API")
    _log_and_print(f"Already cached: {len(cached_video_ids)} videos")
    logger.info(f"New videos to fetch: {len(new_video_ids)}")
    print(f"üÜï New videos to fetch details: {len(new_video_ids)}\n")

    # STEP 5: details for the new videos only
    _log_and_print("Fetching video details for new videos...")
    video_details = get_video_details(youtube, new_video_ids)
    _log_and_print(f"Retrieved details for {len(video_details)} new videos\n")

    # STEP 6: tag every row with its channel
    for video in video_details:
        video['channel_id'] = channel_stats['channel_id']
        video['channel_name'] = channel_stats['channel_name']

    if cached_video_ids:
        # videos.list takes up to 50 IDs per call, so skipping n cached videos
        # saves ceil(n / 50) calls.
        saved_calls = (len(cached_video_ids) + 49) // 50
        print(f"üí∞ API SAVINGS: Skipped ~{saved_calls} API calls by using cache!")

    all_video_ids = list(cached_video_ids.union(new_video_ids))
    return channel_stats, video_details, all_video_ids, fetch_time


def main(channel_id, youtube, incremental=True):
    """
    Extract one channel's statistics and new videos, then update its cache.

    Steps:
    1. Fetch channel-level statistics (always refreshed).
    2. Load the channel cache (last checkpoint + known video IDs).
    3. Search for video IDs: only those published after the checkpoint in
       incremental mode, or all of them otherwise.
    4. Fetch details only for IDs not already cached.
    5. Save the cache with all known IDs and a checkpoint taken before the search.

    Parameters:
        channel_id (str): The unique identifier of the YouTube channel.
        youtube: The YouTube API client object.
        incremental (bool): If True, only fetch videos newer than the last run.

    Returns:
        tuple: (channel_stats, video_details). If the channel is not found,
        this is ({}, []) and the cache is left untouched.

    Note:
        The cache is committed before the caller persists the returned data.
        run_ingestion_pipeline therefore defers the commit until after saving.
    """
    channel_stats, video_details, all_video_ids, fetch_time = _fetch_channel(
        channel_id, youtube, incremental=incremental
    )
    if channel_stats:
        save_channel_cache(channel_id, all_video_ids, fetch_time)
    return channel_stats, video_details


def _coerce_int_columns(df, columns):
    """Convert the listed columns of *df* in place to int64 (unparseable/missing -> 0); absent columns are skipped."""
    for col in columns:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype("int64")


def run_ingestion_pipeline(channel_csv_path=None, save_format='csv', upload_to_s3=True, incremental=True):
    """
    Execute the complete YouTube data ingestion pipeline.

    This is the notebook's entry point. Every channel listed in the input
    CSV's 'channel_id' column is processed. The collected data is saved as
    CSV and/or Delta, and the CSVs are optionally uploaded to S3.

    Incremental checkpoints (the per-channel caches) are committed only
    after the data has been saved. A failure or cancellation earlier in the
    run therefore leaves every cache unchanged, and the next run fetches the
    same videos again instead of silently skipping them.

    Parameters:
        channel_csv_path (str): Path to the CSV of channel IDs. None or ''
            means the default file under DBFS_INPUT_PATH.
        save_format (str): 'csv', 'delta' or 'both' (case-insensitive;
            None means 'csv').
        upload_to_s3 (bool): Whether to upload the CSV files to S3.
        incremental (bool): If True, only fetch videos newer than the last
            run. If False, fetch all videos (full refresh).

    Returns:
        tuple: (channel_df, videos_df), or (None, None) if the channel CSV
        is missing.

    Raises:
        ValueError: if save_format is not one of the accepted values, or
            if no API key is configured (from get_api_credentials).
    """
    # Validate before spending any API quota. An unrecognised format would
    # otherwise persist nothing while the caches still advance.
    save_format = (save_format or 'csv').strip().lower()
    if save_format not in ('csv', 'delta', 'both'):
        raise ValueError(f"save_format must be 'csv', 'delta' or 'both', got {save_format!r}")

    setup_dbfs_directories()
    api_key, s3_bucket = get_api_credentials()
    youtube = build('youtube', 'v3', developerKey=api_key)

    if not channel_csv_path:
        channel_csv_path = f'{DBFS_INPUT_PATH}/top10channelid.csv'

    logger.info(f"Loading channel IDs from: {channel_csv_path}")
    print(f"Loading channel IDs from: {channel_csv_path}")

    print("\n" + "="*60)
    if incremental:
        print("üöÄ RUNNING IN INCREMENTAL MODE (Optimized)")
        print("   Only new videos since last run will be fetched")
    else:
        print("üîÑ RUNNING IN FULL REFRESH MODE")
        print("   All videos will be fetched (higher API usage)")
    print("="*60 + "\n")

    try:
        channelid_data = pd.read_csv(channel_csv_path)
    except FileNotFoundError:
        logger.error(f"Channel ID file not found: {channel_csv_path}")
        print(f"‚úó Error: Channel ID file not found at {channel_csv_path}")
        print(f"Please upload your 'top10channelid.csv' file to {DBFS_INPUT_PATH}/")
        return None, None

    all_channel_stats = []
    all_video_details = []
    # (channel_id, all_video_ids, fetch_time) per channel, committed after saving.
    pending_cache_updates = []

    for raw_channel_id in channelid_data['channel_id']:
        if pd.isna(raw_channel_id) or not str(raw_channel_id).strip():
            logger.warning("Skipping row with an empty channel_id")
            continue
        channel_id = str(raw_channel_id).strip()

        logger.info(f"Processing channel ID: {channel_id}")
        print(f"\n{'='*50}")
        print(f"Processing channel ID: {channel_id}")
        print(f"{'='*50}")

        channel_stats, video_details, all_video_ids, fetch_time = _fetch_channel(
            channel_id, youtube, incremental=incremental
        )
        if not channel_stats:
            continue  # unknown channel: nothing to save, cache left untouched

        all_channel_stats.append(channel_stats)
        all_video_details.extend(video_details)
        pending_cache_updates.append((channel_id, all_video_ids, fetch_time))

    total_new_videos = len(all_video_details)

    print("\nCreating final DataFrames...")
    logger.info("Creating final DataFrames...")
    channel_df = pd.DataFrame(all_channel_stats)
    videos_df = pd.DataFrame(all_video_details)

    # Counts come through from the API unconverted (with 0 as the default), so
    # normalise them to int64 before the frames are handed to Spark. Empty
    # frames have no columns and are left alone.
    _coerce_int_columns(videos_df, ["view_count", "like_count", "comment_count", "duration"])
    _coerce_int_columns(channel_df, ["subscribers", "total_views", "video_count"])

    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')

    # CSV export (written even when there are no new videos).
    if save_format in ('csv', 'both'):
        channel_filename = f'{DBFS_OUTPUT_PATH}/channel_stats_{timestamp}.csv'
        videos_filename = f'{DBFS_OUTPUT_PATH}/video_details_{timestamp}.csv'

        channel_df.to_csv(channel_filename, index=False)
        videos_df.to_csv(videos_filename, index=False)

        print(f"\nCSV files saved to:")
        print(f"  - {channel_filename}")
        print(f"  - {videos_filename}")
        logger.info(f"CSV files saved to {DBFS_OUTPUT_PATH}")

        if upload_to_s3 and s3_bucket:
            print(f"\nUploading files to S3 bucket: {s3_bucket}...")
            logger.info(f"Starting S3 upload to bucket: {s3_bucket}")

            success_channel = upload_to_s3_from_databricks(channel_filename, s3_bucket)
            success_videos = upload_to_s3_from_databricks(videos_filename, s3_bucket)

            if success_channel and success_videos:
                print("\n‚úì All files successfully uploaded to S3!")
                logger.info("All files successfully uploaded to S3")
            else:
                print("\n‚úó Some files failed to upload to S3. Check logs for details.")
                logger.warning("Some S3 uploads failed")
        elif upload_to_s3 and not s3_bucket:
            print("\n‚ö† S3 bucket not configured. Skipping S3 upload.")
            logger.warning("S3 bucket not configured, skipping S3 upload")

    # NOTE: every run writes new Delta tables named after the run timestamp.
    # Appending to fixed table names would be the usual Delta pattern; the
    # naming is kept for compatibility with existing consumers.
    if save_format in ('delta', 'both'):
        save_to_delta_table(channel_df, f'youtube_channel_stats_{timestamp}', mode='overwrite')
        if len(videos_df) > 0:
            save_to_delta_table(videos_df, f'youtube_video_details_{timestamp}', mode='overwrite')
        else:
            print("‚ÑπÔ∏è No new videos to save to Delta table")

    # Commit the incremental checkpoints only now that the data is written.
    # save_to_delta_table logs Delta errors rather than raising, so those do
    # not block this step; CSV write errors do.
    for channel_id, video_ids, fetch_time in pending_cache_updates:
        save_channel_cache(channel_id, video_ids, fetch_time)

    print("\n" + "="*60)
    print("‚úì Pipeline execution completed successfully!")
    print("="*60)
    print(f"üìä SUMMARY:")
    print(f"   - Channels processed: {len(channel_df)}")
    print(f"   - New videos fetched: {total_new_videos}")
    if incremental:
        print(f"   - Mode: INCREMENTAL (optimized)")
    else:
        print(f"   - Mode: FULL REFRESH")
    print("="*60)

    logger.info("Pipeline execution completed")

    return channel_df, videos_df


def clear_cache(channel_id=None):
    """
    Delete the incremental-fetch cache for one channel or for all channels.

    Use this before a full refresh. Cache files are written with the local
    file API (open()), so they are removed the same way.

    The previous "clear all" branch listed the cache folder with
    dbutils.fs.ls. That call resolves a scheme-less path such as
    '/Workspace/...' against DBFS, which is a different location, so it
    reported success without deleting anything.

    Parameters:
        channel_id (str): Channel whose cache should be cleared. If None
            (or empty), every ``*_cache.json`` file under DBFS_CACHE_PATH is
            removed.

    Errors are printed and logged, not raised.
    """
    import glob
    import os

    if channel_id:
        cache_file = get_cache_file_path(channel_id)
        try:
            os.remove(cache_file)
        except FileNotFoundError:
            print(f"‚ÑπÔ∏è No cache found for channel: {channel_id}")
        except Exception as e:
            logger.error(f"Error clearing cache for {channel_id}: {e}")
            print(f"‚úó Error clearing cache: {e}")
        else:
            print(f"‚úì Cleared cache for channel: {channel_id}")
            logger.info(f"Cleared cache for channel: {channel_id}")
        return

    try:
        # glob.escape: the folder name must be matched literally, only the
        # file-name part is a pattern.
        pattern = os.path.join(glob.escape(DBFS_CACHE_PATH), '*_cache.json')
        removed = 0
        for cache_file in glob.glob(pattern):
            os.remove(cache_file)
            removed += 1
        print("‚úì Cleared all channel caches")
        logger.info(f"Cleared {removed} channel cache file(s)")
    except Exception as e:
        logger.error(f"Error clearing caches: {e}")
        print(f"‚úó Error clearing caches: {e}")


# ===== DATABRICKS NOTEBOOK EXECUTION =====
# Scheduled runs are driven by the job parameters read in the first cell:
#   CHANNEL_CSV_PATH - empty means the default input CSV location
#   SAVE_FORMAT      - 'csv', 'delta' or 'both'; empty means 'both'
#   UPLOAD_TO_S3     - "true" enables the S3 upload of the CSV files
# Previously these parameters were read but ignored, and the call below
# hard-coded its arguments.
#
# Manual examples:
#
# INCREMENTAL MODE (recommended for daily runs - saves API quota):
# channel_df, videos_df = run_ingestion_pipeline(
#     save_format='both',
#     upload_to_s3=True,
#     incremental=True  # Only fetch new videos
# )
#
# FULL REFRESH MODE (use when you need all data):
# channel_df, videos_df = run_ingestion_pipeline(
#     save_format='both',
#     upload_to_s3=True,
#     incremental=False  # Fetch all videos
# )
#
# TO CLEAR CACHE (before full refresh):
# clear_cache()  # Clear all caches
# clear_cache('UCxxxxxxx')  # Clear specific channel

channel_df, videos_df = run_ingestion_pipeline(
    channel_csv_path=CHANNEL_CSV_PATH.strip() or None,
    save_format=(SAVE_FORMAT.strip() or 'both').lower(),
    upload_to_s3=UPLOAD_TO_S3,
    incremental=True,  # Set to False for full refresh
)
# display(channel_df)
# display(videos_df)

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:139)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:139)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:136)
	at scala.collection.immutable.Range.foreach(Range.scala:192)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:721)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:441)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:441)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can