file path: .../vocab/src/scripts/youtube-recommendation-generator.ipynb

In [3]:
import os
import sys
import json
import logging
import frontmatter
from typing import Dict, List, Set
from googleapiclient.discovery import build

# Logging: configure the root logger at INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Upper bound on how many articles are picked up in a single run.
BATCH_SIZE = 5

# Anchor directory for every relative path below. A script has ``__file__``;
# a Jupyter kernel does not, so the working directory is used there instead.
if "__file__" in globals():
    current_dir = os.path.dirname(os.path.abspath(__file__))
else:
    current_dir = os.getcwd()

# Project locations, all derived from the anchor directory (two levels up).
ROOT_DIR = os.path.abspath(os.path.join(current_dir, "../.."))
ARTICLES_PATH = os.path.join(ROOT_DIR, "src/content/articles")
OUTPUT_PATH = os.path.join(ROOT_DIR, "src/data/youtube-video-list.json")

# Emit the resolved locations so a wrong working directory is easy to spot.
logger.info(f"Current directory: {current_dir}")
logger.info(f"Root directory: {ROOT_DIR}")
logger.info(f"Articles path: {ARTICLES_PATH}")
logger.info(f"Output path: {OUTPUT_PATH}")

def setup_youtube_client(api_key: str):
    """Create a YouTube Data API v3 client authenticated with ``api_key``.

    Any exception raised while building the client is logged and then
    re-raised unchanged, so the caller decides how to react.
    """
    try:
        client = build('youtube', 'v3', developerKey=api_key)
    except Exception as e:
        logger.error(f"Failed to initialize YouTube client: {e}")
        raise
    else:
        return client

def search_videos(youtube_client, concept: str, max_results: int = 3) -> List[Dict]:
    """Search YouTube for videos about ``concept`` in an AI/ML context.

    Args:
        youtube_client: Object exposing ``search().list(...).execute()``,
            such as the client returned by ``setup_youtube_client``.
        concept: Topic to look up; it is embedded in the search query.
        max_results: Number of results requested from the API (default 3).

    Returns:
        A list of ``{'title': ..., 'link': ...}`` dicts, one per usable
        result. An empty list is returned both when nothing matched and
        when the request failed (the failure is logged), so callers cannot
        tell those two cases apart.
    """
    try:
        # Build search query with AI/ML context
        search_query = f"{concept} in AI or machine learning"
        response = youtube_client.search().list(
            q=search_query,
            part='snippet',
            maxResults=max_results,
            type='video',
            order='relevance',
        ).execute()
        items = response.get('items') or []
    except Exception as e:
        logger.error(f"Failed to search videos for {concept}: {e}")
        return []

    videos = []
    for item in items:
        # Parse each item on its own so that one malformed entry does not
        # throw away the usable results that came back with it.
        try:
            title = item['snippet']['title']
            video_id = item['id']['videoId']
        except (KeyError, TypeError):
            logger.warning(f"Skipping malformed search result for {concept}")
            continue
        videos.append({
            'title': title,
            'link': f"https://www.youtube.com/watch?v={video_id}",
        })

    return videos

def load_existing_data() -> tuple[List[Dict], Set[str]]:
    """Load previously saved recommendations and the slugs they cover.

    Returns:
        ``(entries, slugs)`` where ``entries`` is the list parsed from
        ``OUTPUT_PATH`` and ``slugs`` is the set of ``'slug'`` values found
        in it. ``([], set())`` is returned when the file does not exist,
        cannot be read or parsed, or does not contain a JSON list; read and
        parse problems are logged.
    """
    try:
        if not os.path.exists(OUTPUT_PATH):
            return [], set()
        with open(OUTPUT_PATH, 'r', encoding='utf-8') as f:
            existing_data = json.load(f)
    except Exception as e:
        logger.error(f"Error loading existing data: {e}")
        return [], set()

    if not isinstance(existing_data, list):
        logger.error(
            f"Error loading existing data: expected a JSON list in {OUTPUT_PATH}"
        )
        return [], set()

    # Collect slugs entry by entry. A single entry without a usable slug must
    # not make the whole dataset look empty: the caller appends new results
    # to what is returned here and writes it back, so returning [] would
    # erase everything already stored.
    processed_slugs = set()
    unusable = 0
    for item in existing_data:
        try:
            processed_slugs.add(item['slug'])
        except (KeyError, TypeError):
            unusable += 1
    if unusable:
        logger.warning(f"Ignored {unusable} existing entries without a usable slug")

    return existing_data, processed_slugs

def get_unprocessed_files(processed_slugs: Set[str]) -> List[str]:
    """Return paths of up to ``BATCH_SIZE`` markdown articles still to process.

    Args:
        processed_slugs: Slugs that already have saved recommendations.

    Returns:
        Paths of ``.md`` files in ``ARTICLES_PATH``, in sorted filename order,
        whose front matter has both a slug and a title and whose slug is not
        in ``processed_slugs``. At most ``BATCH_SIZE`` paths are returned.
        An empty list is returned when the directory is missing or cannot
        be scanned (the problem is logged).
    """
    try:
        if not os.path.exists(ARTICLES_PATH):
            logger.error(f"Articles directory not found: {ARTICLES_PATH}")
            return []

        # Sorted so that successive runs walk the articles in the same order.
        all_files = sorted(f for f in os.listdir(ARTICLES_PATH) if f.endswith('.md'))

        unprocessed_files = []
        for file in all_files:
            file_path = os.path.join(ARTICLES_PATH, file)
            try:
                post = frontmatter.load(file_path)
                slug = post.get('slug')

                # process_files() drops articles lacking a slug or title
                # without recording them. If they were selected here they
                # would take a batch slot on every run and, once BATCH_SIZE
                # of them sort first, block all later articles forever.
                if not slug or not post.get('title'):
                    logger.warning(
                        f"Skipping {file}: missing slug or title in front matter"
                    )
                    continue

                if slug not in processed_slugs:
                    unprocessed_files.append(file_path)
                    if len(unprocessed_files) >= BATCH_SIZE:
                        break
            except Exception as e:
                logger.warning(f"Error reading file {file}: {e}")
                continue

        return unprocessed_files
    except Exception as e:
        logger.error(f"Error scanning directory: {e}")
        return []

def process_files(youtube_client, files: List[str]) -> List[Dict]:
    """Build one recommendation entry per markdown article in ``files``.

    Each file's front matter supplies ``title`` and ``slug``; the title is
    used as the search concept. Articles missing either field are skipped,
    and a failure on one file is logged without stopping the others.

    Returns:
        A list of ``{'slug', 'title', 'recommendations'}`` dicts in input order.
    """
    collected = []

    for file_path in files:
        try:
            article = frontmatter.load(file_path)
            title, slug = article.get('title', ''), article.get('slug', '')

            # Only articles carrying both fields produce an entry.
            if title and slug:
                collected.append({
                    'slug': slug,
                    'title': title,
                    'recommendations': search_videos(youtube_client, title),
                })
                logger.info(f"Processed {slug}")
        except Exception as e:
            logger.error(f"Error processing file {file_path}: {e}")

    return collected

def _write_json_atomically(path: str, data) -> None:
    """Serialize ``data`` to ``path`` so a partial file is never left there."""
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        # Swap the finished file in; the previous content stays in place
        # until this single step succeeds.
        os.replace(tmp_path, path)
    except Exception:
        # Best-effort cleanup of the temporary file; the original error is
        # what matters to the caller, so a cleanup failure is ignored.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def save_results(all_results: List[Dict]) -> None:
    """Save results to ``OUTPUT_PATH``, falling back to a local backup file.

    The data is first written to a temporary sibling file and then swapped
    into place, so a failure while serializing leaves the previously saved
    results untouched instead of a truncated file. If the primary location
    cannot be written, ``youtube-video-list.json`` in the current working
    directory is tried. Errors are logged, never raised.

    Args:
        all_results: Complete list of entries to persist.
    """
    try:
        output_dir = os.path.dirname(OUTPUT_PATH)
        if output_dir:  # a bare filename has no directory to create
            os.makedirs(output_dir, exist_ok=True)
        _write_json_atomically(OUTPUT_PATH, all_results)
        logger.info(f"Results saved to {OUTPUT_PATH}")
    except Exception as e:
        logger.error(f"Error saving results: {e}")
        # Try backup location
        backup_path = 'youtube-video-list.json'
        try:
            _write_json_atomically(backup_path, all_results)
            logger.info(f"Results saved to backup location: {backup_path}")
        except Exception as backup_error:
            logger.error(f"Error saving to backup location: {backup_error}")

def main():
    """Run one batch: load prior results, process new articles, save all.

    Returns early (after logging) when the API key cannot be imported from
    ``config`` or when there are no unprocessed articles. Any other error
    is logged and re-raised.
    """
    try:
        # The API key lives in a separate config module.
        try:
            from config import GOOGLECLOUD_API_KEY
        except ImportError:
            logger.error("Could not import GOOGLECLOUD_API_KEY from config.py")
            return

        youtube_client = setup_youtube_client(GOOGLECLOUD_API_KEY)

        # Previously saved entries and the slugs they already cover.
        existing_data, processed_slugs = load_existing_data()

        pending = get_unprocessed_files(processed_slugs)
        if pending:
            # Append the new batch to the stored entries and persist the lot.
            save_results(existing_data + process_files(youtube_client, pending))
        else:
            logger.info("No new files to process")

    except Exception as e:
        logger.error(f"An error occurred in main execution: {e}")
        raise

# Entry point: run the pipeline only when this module is executed directly,
# not when it is imported by other code.
if __name__ == "__main__":
    main()

NameError: name '__file__' is not defined