In [58]:
import os
import sqlite3
import json
import random
import time
from datetime import datetime, timezone
import requests
from io import BytesIO
from PIL import Image
from googleapiclient.discovery import build
import google.generativeai as genai
from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound, TranscriptsDisabled

In [59]:
YOUTUBE_API_KEY = "AIzaSyBP9csKRKy6xUrKYn7ovmRcAiNn7pZhW1k"  # Replace with your YouTube Data API Key
GEMINI_API_KEY = "AIzaSyCVFKZOqUpoIz75Oi9fOmgtF42S49GIFD8"    # Replace with your Gemini API Key
DB_PATH = 'beauty_blueprint.db'

# Initialize Gemini AI
genai.configure(api_key=GEMINI_API_KEY)
# Using gemini-2.0-flash for multimodal capabilities and efficiency
GEMINI_MULTIMODAL_MODEL = 'gemini-2.0-flash'
GEMINI_TEXT_MODEL = 'gemini-2.0-flash'

# Search Queries (from your search query list.docx)
SEARCH_QUERIES = [
    'makeup artist'
    # 'beauty blogger', 'makeup guru', 'beauty influencer', 'makeup channel',
    # 'american makeup artist', 'us beauty blogger', 'english speaking makeup artist',
    # 'north american makeup channel', 'natural makeup artist', 'glam makeup channel',
    # 'editorial makeup artist', 'bridal makeup channel', 'everyday makeup tutorial channel',
    # 'clean beauty influencer', 'affordable makeup channel', 'luxury beauty guru',
    # 'skincare and makeup channel', 'pro makeup techniques channel',
    # 'makeup tips and tricks channel', 'south asian makeup artist', 'indian makeup channel',
    # 'desilook makeup', 'east asian makeup tutorial channel', 'korean makeup artist us',
    # 'japanese makeup channel usa', 'black makeup artist', 'african american beauty guru',
    # 'dark skin makeup channel', 'latinx beauty blogger', 'hispanic makeup artist',
    # 'latina beauty channel', 'white makeup artist', 'caucasian beauty blogger',
    # 'american beauty channel'
]


In [60]:
# --- Configuration ---

# Predefined Categories (from your Table Schema.docx)
PREDEFINED_CATEGORIES = {
    "face_shape": ["round_face", "oval_face", "square_face", "heart_face", "long_face", "diamond_face"],
    "eye_shape": ["hooded_eyes", "almond_eyes", "round_eyes", "downturned_eyes", "monolid_eyes",
                  "up-turned_eyes", "deep_set_eyes", "close_set_eyes", "wide_set_eyes"],
    "skin_undertone": ["warm_undertone", "cool_undertone", "neutral_undertone"],
    "skin_type_concerns": ["oily_skin", "dry_skin", "combination_skin", "acne_prone_skin",
                           "sensitive_skin", "mature_skin"],
    "lip_shape": ["fuller_lips", "thin_lips", "heart_shaped_lips", "bow_shaped_lips"],
    "other_features": ["narrow_nose", "wide_nose", "high_cheeks", "low_cheeks"],
    "primary_makeup_style": ["natural", "everyday", "glam", "editorial", "bridal", "dramatic",
                             "minimalist", "soft_glam", "no_makeup_makeup"],
    "target_skill_level": ["beginner", "intermediate", "advanced", "pro_artist"],
    "demographic_focus": ["teen_focused", "adult_focused", "mature_age_focused", "dark_skin_tones",
                          "light_skin_tones", "male_makeup", "non_binary_makeup"],
    "llm_thumbnail_face_shape": ["Round", "Oval", "Square", "Heart", "Long", "Diamond", "unknown"],
    "llm_thumbnail_skin_color": ["fair", "light", "medium", "tan", "dark", "deep", "unknown"],
    "llm_thumbnail_eye_shape": ["Almond", "Round", "Hooded", "Monolid", "Downturned", "Up-turned",
                                 "Deep-set", "Close-set", "Wide-set", "unknown"],
    "llm_thumbnail_lip_shape": ["Fuller", "Thin", "Heart", "Bow", "unknown"],
    "llm_thumbnail_status": ["analyzed", "skipped_multiple_faces", "skipped_no_clear_face",
                             "error_processing_image", "error_llm_response", "pending", "unknown"]
}

In [61]:
# --- Database Functions ---
def create_channels_table(db_path=DB_PATH):
    """
    Creates an SQLite database and the 'channels' table within it.
    """
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        create_table_sql = """
        CREATE TABLE IF NOT EXISTS channels_new (
            channel_id                      TEXT PRIMARY KEY,
            channel_name                    TEXT NOT NULL,
            channel_url                     TEXT,
            channel_description             TEXT,
            published_at                    TEXT,
            profile_picture_url             TEXT,
            subscriber_count                INTEGER,
            video_count                     INTEGER,
            total_view_count                INTEGER,
            country                         TEXT,
            default_language                TEXT, -- New column for channel's default language
            keywords                        TEXT, -- New column for channel keywords (JSON array string)
            topic_ids                       TEXT,
            last_content_upload_at          TEXT,
            last_updated                    TEXT,
            features_aligned_with           TEXT,
            primary_makeup_style            TEXT,
            target_skill_level              TEXT,
            demographic_focus               TEXT,
            llm_thumbnail_face_shape        TEXT,
            llm_thumbnail_skin_color        TEXT,
            llm_thumbnail_eye_shape         TEXT,
            llm_thumbnail_lip_shape         TEXT,
            llm_thumbnail_status            TEXT,
            llm_thumbnail_analysis_date     TEXT
        );
        """
        cursor.execute(create_table_sql)
        conn.commit()
        print(f"Table 'channels' created successfully in {db_path} or already exists.")
    except sqlite3.Error as e:
        print(f"Database error: {e}")
    finally:
        if conn:
            conn.close()

In [62]:
def insert_channel_data(channel_data, db_path=DB_PATH):
    """
    Inserts or updates channel data into the 'channels' table.
    """
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        columns = ', '.join(channel_data.keys())
        placeholders = ', '.join(['?' for _ in channel_data.keys()])
        update_placeholders = ', '.join([f"{key} = ?" for key in channel_data.keys() if key != 'channel_id'])

        # Using INSERT OR REPLACE to handle both inserts and updates based on channel_id
        # This will replace an existing row if channel_id matches.
        insert_sql = f"""
        INSERT OR REPLACE INTO channels ({columns}) VALUES ({placeholders})
        """
        # For simplicity, if we only need to update specific fields, a more targeted UPDATE
        # statement might be better, but INSERT OR REPLACE is fine for full record upserts.

        cursor.execute(insert_sql, tuple(channel_data.values()))
        conn.commit()
        # print(f"Channel '{channel_data.get('channel_name', 'Unknown')}' inserted/updated.")
    except sqlite3.Error as e:
        print(f"Error inserting/updating channel data: {e}")
    finally:
        if conn:
            conn.close()


In [63]:
# --- YouTube API Functions ---
def get_youtube_service():
    """Builds and returns a YouTube Data API service object."""
    return build('youtube', 'v3', developerKey=YOUTUBE_API_KEY)

def search_youtube_channels(youtube, query, region_code='US', max_results=20):
    """Searches for YouTube channels based on a query."""
    channels = []
    try:
        search_response = youtube.search().list(
            q=query,
            type='channel',
            part='id,snippet',
            maxResults=max_results,
            regionCode=region_code
        ).execute()

        for item in search_response.get('items', []):
            channel_id = item['id']['channelId']
            channel_name = item['snippet']['title']
            channel_description = item['snippet']['description']
            published_at = item['snippet']['publishedAt']
            profile_picture_url = item['snippet']['thumbnails']['high']['url']
            country = item['snippet'].get('country') # Country might not always be present
            default_language = item['snippet'].get('defaultLanguage') # Added default language
            print("channel_id :",channel_id)
            # --- MODIFIED LOGIC HERE ---
            # Include channels where country is explicitly 'US' or if country is not specified
            # (as regionCode='US' search implies a US bias, even if country field is missing)
            if country == 'US' or country is None:
                channels.append({
                    'channel_id': channel_id,
                    'channel_name': channel_name,
                    'channel_description': channel_description,
                    'published_at': published_at,
                    'profile_picture_url': profile_picture_url,
                    'default_language': default_language,
                    'country': country, # Keep the actual country value (could be None)
                    'uploads_playlist_id': None # Placeholder, will be fetched later
                })
    except Exception as e:
        print(f"Error searching channels for query '{query}': {e}")
    return channels


In [64]:
def get_channel_statistics_and_topics(youtube, channel_ids):
    """Fetches statistics, topic details, and branding settings for given channel IDs."""
    if not channel_ids:
        return {}

    details = {}
    # YouTube API allows up to 50 IDs per request for channels.list
    id_chunks = [channel_ids[i:i + 50] for i in range(0, len(channel_ids), 50)]

    for chunk in id_chunks:
        try:
            channels_response = youtube.channels().list(
                part='statistics,topicDetails,contentDetails,brandingSettings', # Added brandingSettings
                id=','.join(chunk)
            ).execute()

            for item in channels_response.get('items', []):
                channel_id = item['id']
                stats = item.get('statistics', {})
                topic_details = item.get('topicDetails', {})
                content_details = item.get('contentDetails', {})
                branding_settings = item.get('brandingSettings', {}).get('channel', {}) # Get channel branding settings

                details[channel_id] = {
                    'subscriber_count': int(stats.get('subscriberCount', 0)),
                    'video_count': int(stats.get('videoCount', 0)),
                    'total_view_count': int(stats.get('viewCount', 0)),
                    'topic_ids': topic_details.get('topicIds', []),
                    'uploads_playlist_id': content_details.get('relatedPlaylists', {}).get('uploads'),
                    'keywords': branding_settings.get('keywords', []) # Added keywords
                }
        except Exception as e:
            print(f"Error fetching details for channel IDs {chunk}: {e}")
    return details


In [65]:
def get_last_video_upload_timestamp(youtube, uploads_playlist_id):
    """Fetches the timestamp of the most recent video in an uploads playlist."""
    if not uploads_playlist_id:
        return None
    try:
        playlist_items_response = youtube.playlistItems().list(
            part='snippet',
            playlistId=uploads_playlist_id,
            maxResults=1, # We only need the latest one
            fields='items/snippet/publishedAt'
        ).execute()

        if playlist_items_response.get('items'):
            # The API returns items in reverse chronological order by default for playlists
            # so the first item is the most recent.
            return playlist_items_response['items'][0]['snippet']['publishedAt']
    except Exception as e:
        # print(f"Error fetching last upload timestamp for playlist {uploads_playlist_id}: {e}")
        pass # Gracefully handle cases where playlist might be empty or restricted
    return None

In [66]:
# --- Gemini AI Functions ---
def get_image_as_base64(image_url):
    """Downloads an image and returns its Base64 encoded string."""
    try:
        response = requests.get(image_url, timeout=10)
        response.raise_for_status() # Raise an exception for HTTP errors (4xx or 5xx)

        img_data = BytesIO(response.content)
        img = Image.open(img_data)

        # Convert to RGB if not already (some images might be RGBA, P, etc.)
        if img.mode != 'RGB':
            img = img.convert('RGB')

        # Resize for faster processing and to avoid exceeding token limits if images are huge
        # Max dimension 512-1024px is usually good for LLMs
        max_dim = 512
        if max(img.size) > max_dim:
            img.thumbnail((max_dim, max_dim), Image.LANCZOS) # LANCZOS is a high-quality downsampling filter

        buffered = BytesIO()
        img.save(buffered, format="JPEG") # Use JPEG for smaller size
        import base64 # Import base64 here to avoid circular dependency with PIL
        return base64.b64encode(buffered.getvalue()).decode('utf-8')
    except requests.exceptions.RequestException as e:
        print(f"Error downloading image {image_url}: {e}")
        return None
    except Exception as e:
        print(f"Error processing image {image_url}: {e}")
        return None

In [67]:
def analyze_thumbnail_with_gemini(image_base64):
    """
    Analyzes a thumbnail image with Gemini AI to infer facial attributes.
    Returns a dictionary of inferred attributes and status.
    """
    model = genai.GenerativeModel(GEMINI_MULTIMODAL_MODEL)
    response_data = {
        "llm_thumbnail_face_shape": "unknown",
        "llm_thumbnail_skin_color": "unknown",
        "llm_thumbnail_eye_shape": "unknown",
        "llm_thumbnail_lip_shape": "unknown",
        "llm_thumbnail_status": "unknown"
    }

    if not image_base64:
        response_data["llm_thumbnail_status"] = "error_processing_image"
        return response_data


In [68]:
def analyze_thumbnail_with_gemini(image_base64):
    """
    Analyzes a thumbnail image with Gemini AI to infer facial attributes.
    Returns a dictionary of inferred attributes and status.
    """
    model = genai.GenerativeModel(GEMINI_MULTIMODAL_MODEL)
    response_data = {
        "llm_thumbnail_face_shape": "unknown",
        "llm_thumbnail_skin_color": "unknown",
        "llm_thumbnail_eye_shape": "unknown",
        "llm_thumbnail_lip_shape": "unknown",
        "llm_thumbnail_status": "unknown"
    }

    if not image_base64:
        response_data["llm_thumbnail_status"] = "error_processing_image"
        return response_data

    # Prompt with explicit instructions for image analysis and error handling
    prompt_text = f"""
    Analyze the human face in this image.
    1. Determine if there is exactly one clear, straight-facing human face visible. If not, state "skipped_multiple_faces" if multiple faces, or "skipped_no_clear_face" if no clear straight face is visible. In these skipped cases, return 'unknown' for all attribute values.
    2. If a clear, straight-facing face is visible, infer the following attributes based *only* on the image:
        - Face Shape (choose from: {', '.join(PREDEFINED_CATEGORIES['llm_thumbnail_face_shape'][:-1])})
        - Skin Color (choose from: {', '.join(PREDEFINED_CATEGORIES['llm_thumbnail_skin_color'][:-1])})
        - Eye Shape (choose from: {', '.join(PREDEFINED_CATEGORIES['llm_thumbnail_eye_shape'][:-1])})
        - Lip Shape (choose from: {', '.join(PREDEFINED_CATEGORIES['llm_thumbnail_lip_shape'][:-1])})
    3. For any attribute that cannot be confidently determined even if a clear face is present, state 'unknown'.
    4. Return the result as a JSON object with keys: 'face_shape', 'skin_color', 'eye_shape', 'lip_shape', and 'status'.
    Example: {{ "face_shape": "Oval", "skin_color": "Medium", "eye_shape": "Almond", "lip_shape": "Fuller", "status": "analyzed" }}
    Example (skipped): {{ "face_shape": "unknown", "skin_color": "unknown", "eye_shape": "unknown", "lip_shape": "unknown", "status": "skipped_multiple_faces" }}
    """

    image_parts = {
        "mimeType": "image/jpeg", # Assuming JPEG output from get_image_as_base64
        "data": image_base64
    }

    try:
        response = model.generate_content([prompt_text, image_parts])
        # print(f"Gemini raw response: {response.text}") # For debugging

        # Parse the JSON response
        if response and response.text:
            json_response = json.loads(response.text.strip())
            response_data["llm_thumbnail_face_shape"] = json_response.get("face_shape", "unknown")
            response_data["llm_thumbnail_skin_color"] = json_response.get("skin_color", "unknown")
            response_data["llm_thumbnail_eye_shape"] = json_response.get("eye_shape", "unknown")
            response_data["llm_thumbnail_lip_shape"] = json_response.get("lip_shape", "unknown")
            response_data["llm_thumbnail_status"] = json_response.get("status", "unknown")

            # Validate against predefined categories (optional, LLM should adhere with good prompt)
            if response_data["llm_thumbnail_face_shape"] not in PREDEFINED_CATEGORIES['llm_thumbnail_face_shape']:
                response_data["llm_thumbnail_face_shape"] = "unknown"
            if response_data["llm_thumbnail_skin_color"] not in PREDEFINED_CATEGORIES['llm_thumbnail_skin_color']:
                response_data["llm_thumbnail_skin_color"] = "unknown"
            if response_data["llm_thumbnail_eye_shape"] not in PREDEFINED_CATEGORIES['llm_thumbnail_eye_shape']:
                response_data["llm_thumbnail_eye_shape"] = "unknown"
            if response_data["llm_thumbnail_lip_shape"] not in PREDEFINED_CATEGORIES['llm_thumbnail_lip_shape']:
                response_data["llm_thumbnail_lip_shape"] = "unknown"
            if response_data["llm_thumbnail_status"] not in PREDEFINED_CATEGORIES['llm_thumbnail_status']:
                response_data["llm_thumbnail_status"] = "error_llm_response" # If status itself is invalid

            if response_data["llm_thumbnail_status"] == "analyzed" and \
               "unknown" in [response_data["llm_thumbnail_face_shape"],
                             response_data["llm_thumbnail_skin_color"],
                             response_data["llm_thumbnail_eye_shape"],
                             response_data["llm_thumbnail_lip_shape"]]:
                # If it said "analyzed" but returned unknown for all/most attributes,
                # it's likely not a good analysis. Revert status if necessary.
                # This is a heuristic.
                pass # Can add more sophisticated checks here if needed

        else:
            response_data["llm_thumbnail_status"] = "error_llm_response"

    except (json.JSONDecodeError, AttributeError) as e:
        print(f"Error parsing Gemini response for thumbnail: {e}. Raw text: {response.text if response else 'No response text'}")
        response_data["llm_thumbnail_status"] = "error_llm_response"
    except Exception as e:
        print(f"Gemini thumbnail analysis failed: {e}")
        response_data["llm_thumbnail_status"] = "error_llm_response"

    return response_data


In [69]:
def get_video_transcripts(video_id):
    """Fetches transcripts for a given video ID."""
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        # Prioritize manually created transcripts, then auto-generated English
        transcript = None
        for t in transcript_list:
            if t.is_generated == False and t.language_code == 'en':
                transcript = t
                break
        if not transcript: # If no manual English, try auto-generated English
            for t in transcript_list:
                if t.is_generated == True and t.language_code == 'en':
                    transcript = t
                    break
        if not transcript: # If still no English, take first available
            transcript = transcript_list[0]

        full_transcript = " ".join([item['text'] for item in transcript.fetch()])
        return full_transcript
    except TranscriptsDisabled:
        # print(f"Transcripts disabled for video {video_id}")
        return None
    except NoTranscriptFound:
        # print(f"No transcript found for video {video_id}")
        return None
    except Exception as e:
        # print(f"Error fetching transcript for video {video_id}: {e}")
        return None

# --- Gemini AI Function (retained from previous version) ---
def analyze_content_with_gemini(channel_description, video_transcripts_sample):
    """
    Analyzes channel content (description + sample transcripts) with Gemini AI
    to infer content-related attributes using predefined categories.
    Returns a dictionary of inferred attributes.
    """
    model = genai.GenerativeModel(GEMINI_TEXT_MODEL)
    combined_text = f"Channel Description: {channel_description}\n\nRecent Video Content:\n{video_transcripts_sample}"

    # Basic truncation if combined_text is too long for prompt (safety)
    max_text_length = 200000 # Roughly 100k tokens to be safe
    if len(combined_text) > max_text_length:
        combined_text = combined_text[:max_text_length] + " [...truncated for length...]"

    prompt_text = f"""
    Analyze the following text content from a YouTube channel (description and recent video transcripts).
    Infer the channel's primary focus based on the categories provided. Select ALL applicable tags for each category.
    If a category is not applicable or cannot be confidently determined, return an empty array for that category.

    Predefined Categories:
    - features_aligned_with (choose from: {', '.join(PREDEFINED_CATEGORIES['face_shape'] + PREDEFINED_CATEGORIES['eye_shape'] + PREDEFINED_CATEGORIES['skin_undertone'] + PREDEFINED_CATEGORIES['skin_type_concerns'] + PREDEFINED_CATEGORIES['lip_shape'] + PREDEFINED_CATEGORIES['other_features'])})
    - primary_makeup_style (choose from: {', '.join(PREDEFINED_CATEGORIES['primary_makeup_style'])})
    - target_skill_level (choose from: {', '.join(PREDEFINED_CATEGORIES['target_skill_level'])})
    - demographic_focus (choose from: {', '.join(PREDEFINED_CATEGORIES['demographic_focus'])})

    Return the result as a JSON object. Ensure all keys from the predefined categories list are present, even if their value is an empty array.
    Example: {{
      "features_aligned_with": ["round_face", "oily_skin"],
      "primary_makeup_style": ["glam"],
      "target_skill_level": ["beginner"],
      "demographic_focus": []
    }}

    Text Content:
    {combined_text}
    """

    content_attributes = {
        "features_aligned_with": [],
        "primary_makeup_style": [],
        "target_skill_level": [],
        "demographic_focus": []
    }

    try:
        response = model.generate_content(prompt_text)
        # print(f"Raw LLM Response: {response.text}") # Uncomment for raw LLM output debugging

        if response and response.text:
            # --- DEBUG FIX: Strip Markdown code block if present ---
            clean_text = response.text.strip()
            if clean_text.startswith('```json') and clean_text.endswith('```'):
                clean_text = clean_text[len('```json'):-len('```')].strip()

            json_response = json.loads(clean_text)

            # Extract and validate categories.
            for key in content_attributes.keys():
                extracted_tags = json_response.get(key, [])
                if not isinstance(extracted_tags, list):
                    extracted_tags = [] # Ensure it's a list

                valid_tags = []
                # Compile allowed tags for validation
                allowed_tags = []
                if key == "features_aligned_with":
                    allowed_tags = (PREDEFINED_CATEGORIES['face_shape'] + PREDEFINED_CATEGORIES['eye_shape'] +
                                    PREDEFINED_CATEGORIES['skin_undertone'] + PREDEFINED_CATEGORIES['skin_type_concerns'] +
                                    PREDEFINED_CATEGORIES['lip_shape'] + PREDEFINED_CATEGORIES['other_features'])
                else:
                    allowed_tags = PREDEFINED_CATEGORIES.get(key, [])

                for tag in extracted_tags:
                    if tag in allowed_tags:
                        valid_tags.append(tag)

                content_attributes[key] = valid_tags

    except (json.JSONDecodeError, AttributeError) as e:
        print(f"Error parsing Gemini response: {e}. Raw text: {response.text if response else 'No response text'}")
    except Exception as e:
        print(f"Gemini content analysis failed: {e}")

    return content_attributes

In [70]:
# --- Main Logic ---
def run_channel_ingestion():
    create_channels_table()
    youtube = get_youtube_service()

    all_channels_data = {} # Use a dictionary to avoid duplicates from different search queries
    processed_channel_ids = set() # Keep track of IDs already processed for Gemini analysis

    shuffled_queries = random.sample(SEARCH_QUERIES, len(SEARCH_QUERIES)) # Shuffle queries

    for query in shuffled_queries:
        print(f"\nSearching for channels with query: '{query}'...")
        # Get more results per query to increase chances of finding US channels
        found_channels_for_query = search_youtube_channels(youtube, query, max_results=50)
        print("found_channels_for_query", found_channels_for_query)
        # Filter for 'US' country and collect unique channel IDs for batch details
        us_channel_ids_for_batch = []
        for channel in found_channels_for_query:
            if (channel.get('country') == 'US' or channel.get('country') is None) and channel['channel_id'] not in all_channels_data:
                all_channels_data[channel['channel_id']] = {
                    'channel_id': channel['channel_id'],
                    'channel_name': channel['channel_name'],
                    'channel_url': f"https://www.youtube.com/channel/{channel['channel_id']}",
                    'channel_description': channel['channel_description'],
                    'published_at': channel['published_at'],
                    'profile_picture_url': channel['profile_picture_url'],
                    'country': channel['country'],
                    'default_language': channel['default_language'], # Initialize new field
                    'keywords': json.dumps([]), # Initialize new field, will be populated later
                    # Initialize other fields to default/None
                    'subscriber_count': 0,
                    'video_count': 0,
                    'total_view_count': 0,
                    'topic_ids': json.dumps([]),
                    'last_content_upload_at': None,
                    'last_updated': datetime.now(timezone.utc).isoformat(),
                    'features_aligned_with': json.dumps([]),
                    'primary_makeup_style': json.dumps([]),
                    'target_skill_level': json.dumps([]),
                    'demographic_focus': json.dumps([]),
                    'llm_thumbnail_face_shape': "unknown",
                    'llm_thumbnail_skin_color': "unknown",
                    'llm_thumbnail_eye_shape': "unknown",
                    'llm_thumbnail_lip_shape': "unknown",
                    'llm_thumbnail_status': "pending",
                    'llm_thumbnail_analysis_date': None
                }
                us_channel_ids_for_batch.append(channel['channel_id'])

        print(f"Found {len(us_channel_ids_for_batch)} new US channels from query '{query}'.")

        # Batch fetch statistics, topics, and uploads playlist IDs for new US channels
        if us_channel_ids_for_batch:
            print(f"Fetching statistics, topic details, and branding settings for {len(us_channel_ids_for_batch)} channels...")
            batch_details = get_channel_statistics_and_topics(youtube, us_channel_ids_for_batch)
            for channel_id, details in batch_details.items():
                if channel_id in all_channels_data:
                    all_channels_data[channel_id].update({
                        'subscriber_count': details['subscriber_count'],
                        'video_count': details['video_count'],
                        'total_view_count': details['total_view_count'],
                        'topic_ids': json.dumps(details['topic_ids']), # Store as JSON string
                        'keywords': json.dumps(details['keywords'].split(',')) if isinstance(details['keywords'], str) else json.dumps(details['keywords']) # Store as JSON array, handle comma-separated string if needed
                    })
                    # Get last content upload timestamp
                    last_upload = get_last_video_upload_timestamp(youtube, details['uploads_playlist_id'])
                    all_channels_data[channel_id]['last_content_upload_at'] = last_upload
            time.sleep(0.1) # Small delay to respect API limits

    print(f"\n--- Total unique US channels found: {len(all_channels_data)} ---")

    # Process each unique US channel with Gemini AI
    channels_to_process = list(all_channels_data.values())
    random.shuffle(channels_to_process) # Shuffle again for processing order

    for i, channel in enumerate(channels_to_process):
        channel_id = channel['channel_id']
        channel_name = channel['channel_name']

        if channel_id in processed_channel_ids:
            continue # Skip if already fully processed in a previous iteration

        print(f"\nProcessing channel {i+1}/{len(channels_to_process)}: '{channel_name}' ({channel_id})")

        # --- Gemini Thumbnail Analysis ---
        thumbnail_url = channel['profile_picture_url']
        image_base64 = get_image_as_base64(thumbnail_url)
        if image_base64:
            thumbnail_llm_attrs = analyze_thumbnail_with_gemini(image_base64)
            channel['llm_thumbnail_face_shape'] = thumbnail_llm_attrs['llm_thumbnail_face_shape']
            channel['llm_thumbnail_skin_color'] = thumbnail_llm_attrs['llm_thumbnail_skin_color']
            channel['llm_thumbnail_eye_shape'] = thumbnail_llm_attrs['llm_thumbnail_eye_shape']
            channel['llm_thumbnail_lip_shape'] = thumbnail_llm_attrs['llm_thumbnail_lip_shape']
            channel['llm_thumbnail_status'] = thumbnail_llm_attrs['llm_thumbnail_status']
            channel['llm_thumbnail_analysis_date'] = datetime.now(timezone.utc).isoformat()
            time.sleep(1) # Small delay for Gemini Vision API

        # --- Gemini Content (Transcript) Analysis ---
        # Get some recent video IDs for transcript analysis
        video_ids_for_transcripts = []
        try:
            # Fetch most recent videos for this channel's uploads playlist
            # Assuming all_channels_data[channel_id]['uploads_playlist_id'] was populated earlier
            uploads_playlist_id = all_channels_data[channel_id].get('uploads_playlist_id')
            if uploads_playlist_id:
                playlist_items_response = youtube.playlistItems().list(
                    part='snippet',
                    playlistId=uploads_playlist_id,
                    maxResults=5, # Get up to 5 most recent videos for content analysis
                    fields='items/snippet/resourceId/videoId'
                ).execute()
                video_ids_for_transcripts = [
                    item['snippet']['resourceId']['videoId']
                    for item in playlist_items_response.get('items', [])
                ]
        except Exception as e:
            print(f"Could not fetch recent video IDs for channel {channel_id}: {e}")

        transcripts_sample_text = []
        for vid_id in video_ids_for_transcripts:
            transcript = get_video_transcripts(vid_id)
            if transcript:
                transcripts_sample_text.append(transcript)
            time.sleep(0.5) # Delay between transcript fetches

        combined_content_for_llm = channel['channel_description'] + "\n\n" + "\n\n".join(transcripts_sample_text)

        if combined_content_for_llm.strip(): # Only analyze if there's actual content
            content_llm_attrs = analyze_content_with_gemini(
                channel['channel_description'], "\n\n".join(transcripts_sample_text)
            )
            channel['features_aligned_with'] = json.dumps(content_llm_attrs['features_aligned_with'])
            channel['primary_makeup_style'] = json.dumps(content_llm_attrs['primary_makeup_style'])
            channel['target_skill_level'] = json.dumps(content_llm_attrs['target_skill_level'])
            channel['demographic_focus'] = json.dumps(content_llm_attrs['demographic_focus'])
            time.sleep(1) # Small delay for Gemini Text API
        else:
            print(f"No sufficient content (description/transcripts) for LLM analysis for channel {channel_id}.")

        # Update last_updated timestamp before inserting/updating
        channel['last_updated'] = datetime.now(timezone.utc).isoformat()

        # Insert/update into DB
        insert_channel_data(channel)
        processed_channel_ids.add(channel_id) # Mark as fully processed

        time.sleep(0.2) # Small delay between channel processing for overall API limits

    print("\n--- Channel data ingestion complete! ---")



In [71]:
if __name__ == "__main__":
    run_channel_ingestion()

Table 'channels' created successfully in beauty_blueprint.db or already exists.

Searching for channels with query: 'makeup artist'...
channel_id : UCDLSt_KFcxWP-hsqAMuh0vg
channel_id : UCz1HJ03TVULAEgYm8UgSHKg
channel_id : UCru67MQuuq-hbbidUXT6CYA
channel_id : UC_WmI554ltRf2hwzTqhWy4Q
channel_id : UColpMuiLe9_vtmZPHFK-WvQ
channel_id : UCQhBchX4GuCRr9OUc-arXVw
channel_id : UCEcT5Ttsnl3kBMqyHmojobQ
channel_id : UCCydBRF6RjrT58FV6KO-wwg
channel_id : UCcFoijpjsNhVMQS33X5_kJA
channel_id : UCHZBvhmGhnpJF4Q3YZGPz8g
channel_id : UCp-hC1SfCbGjzbdY7A9ptFA
channel_id : UCwUG82Z94P_kuiq0e5pSxBg
channel_id : UCh57xkB9URnPOOCZifWG0Lw
channel_id : UCFjCz-eA1FsQG77i9tJ--vg
channel_id : UCRi278WPXTFmmcf8taOg_vA
channel_id : UCwLk1tcZV1UGWOcwup7Wn7g
channel_id : UCs2dbihiQTm6gJHBo-fE2Xw
channel_id : UCHQO9xPSROEHYHUk_T82l6w
channel_id : UCe39X6QuOsZl8irjfe_1c8A
channel_id : UCMXav6XYQsteXkeblvLKMVw
channel_id : UCaUqvfjmwcGepG9nCOcS9zw
channel_id : UC2GUcyD6KYjmU_ofQtPTSSA
channel_id : UCmcN9KeZhK2TmeR

KeyboardInterrupt: 

In [73]:
import numpy as np
process_1 = np.random.random(size=10**7)
process_2 = np.random.random(size=10**7)
overlap_percentage = np.mean(np.abs(process_1 - process_2) <= 0.20)
annual_cost = overlap_percentage * 365 * 1000

In [74]:
annual_cost

131406.97149999999