<a href="https://colab.research.google.com/github/VintiShukla/YoutubeCreatorAnalytics/blob/main/content_creator_analytics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pandas numpy scikit-learn requests flask joblib beautifulsoup4



In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import requests
import json
import time
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, r2_score
import joblib

In [None]:
class YouTubeDataCollector:
    """Small client for the YouTube Data API v3 endpoints used in this notebook.

    Every public method returns the decoded JSON payload as-is, so an
    API-level failure shows up as a payload without a usable ``items`` entry
    instead of an exception. ``quota_used`` is incremented by one for every
    HTTP request that was sent.
    """

    # Seconds to wait on the API. Without an explicit timeout ``requests``
    # may block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 10

    def __init__(self, api_key):
        """Store the API key and reset the request counter."""
        self.api_key = api_key
        self.base_url = "https://www.googleapis.com/youtube/v3"
        self.quota_used = 0

    def _get(self, endpoint, params):
        """Send one GET request to ``endpoint`` and return the decoded JSON."""
        query = dict(params, key=self.api_key)
        response = requests.get(
            f"{self.base_url}/{endpoint}",
            params=query,
            timeout=self.REQUEST_TIMEOUT,
        )
        # Counted per request so a failure part-way through a multi-request
        # method still leaves an accurate tally.
        self.quota_used += 1
        return response.json()

    def get_trending_videos(self, region_code='US', max_results=50):
        """Return the ``mostPopular`` chart for ``region_code``.

        Args:
            region_code: Region code forwarded as ``regionCode``.
            max_results: Value forwarded as ``maxResults``.

        Returns:
            The decoded JSON response.
        """
        return self._get("videos", {
            'part': 'statistics,snippet,contentDetails',
            'chart': 'mostPopular',
            'regionCode': region_code,
            'maxResults': max_results,
        })

    def get_channel_videos(self, channel_id, max_results=50):
        """Return playlist items from the uploads playlist of ``channel_id``.

        Two requests are made: one to look up the channel's uploads playlist
        and one to list that playlist.

        Returns:
            The decoded ``playlistItems`` response. If the channel lookup
            yields no items (error payload or unknown channel), that lookup
            payload is returned instead so the caller can inspect it, in the
            same way as the other methods of this class.
        """
        channel_info = self._get("channels", {
            'part': 'contentDetails',
            'id': channel_id,
        })

        channels = channel_info.get('items') if isinstance(channel_info, dict) else None
        if not channels:
            return channel_info

        uploads_playlist = channels[0]['contentDetails']['relatedPlaylists']['uploads']

        return self._get("playlistItems", {
            'part': 'snippet',
            'playlistId': uploads_playlist,
            'maxResults': max_results,
        })

    def get_video_details(self, video_ids):
        """Return statistics, snippet and content details for ``video_ids``.

        Args:
            video_ids: An iterable of video id strings, or a single id string.
                The ids are sent in one request (the API accepts up to 50 ids
                per call).

        Returns:
            The decoded JSON response, or ``{'items': []}`` without contacting
            the API when no ids were supplied.
        """
        # A bare string would otherwise be joined character by character.
        if isinstance(video_ids, str):
            video_ids = [video_ids]
        id_list = list(video_ids)

        if not id_list:
            return {'items': []}

        return self._get("videos", {
            'part': 'statistics,snippet,contentDetails',
            'id': ','.join(id_list),
        })

In [None]:
def start_data_collection(api_key="YOUR_API_KEY_HERE",
                          regions=('US', 'GB', 'CA', 'AU', 'IN'),
                          per_region=20):
    """Check the API connection, then gather trending videos per region.

    Args:
        api_key: YouTube Data API key. The default is a placeholder and must
            be replaced with a real key.
        regions: Region codes whose trending charts are collected.
        per_region: Number of videos requested for each region.

    Returns:
        A list with the raw video items of all regions, or ``None`` when the
        initial connection test did not return any video.
    """
    collector = YouTubeDataCollector(api_key)

    print("üöÄ Starting data collection...")

    # Test API connection
    trending = collector.get_trending_videos(max_results=5)
    sample = trending.get('items') if isinstance(trending, dict) else None
    if not sample:
        # Covers both an error payload and an empty ``items`` list, which
        # would otherwise fail on the ``[0]`` lookup below.
        print("‚ùå API error:", trending)
        return None
    print("‚úÖ API working! First video:", sample[0]['snippet']['title'])

    # Collect initial dataset
    print("üìä Collecting trending videos...")
    all_videos = []

    for position, region in enumerate(regions):
        if position:
            time.sleep(1)  # Be respectful to API; pause only between requests
        videos = collector.get_trending_videos(region_code=region, max_results=per_region)
        if 'items' in videos:
            all_videos.extend(videos['items'])

    print(f"üìà Collected {len(all_videos)} videos")
    print(f"üìä Quota used: {collector.quota_used}/10000")

    return all_videos

In [None]:
class FeatureEngineer:
    """Turn raw YouTube video items into a tabular feature set."""

    # Column order of the frame returned by ``extract_features``. Passing it
    # explicitly keeps the schema stable even when no videos are supplied.
    COLUMNS = [
        # Input features
        'title_length', 'description_length', 'tags_count',
        'duration_seconds', 'publish_hour', 'publish_day', 'has_thumbnail',
        # Target variables
        'views', 'likes', 'comments', 'engagement_rate',
        # Metadata
        'video_id', 'channel_id', 'title',
    ]

    # ISO 8601 duration: optional weeks/days before ``T``, then optional
    # hours, minutes and seconds.
    _DURATION_PATTERN = (
        r'P(?:(\d+)W)?(?:(\d+)D)?'
        r'(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?'
    )

    def __init__(self):
        self.features = []

    def extract_features(self, video_data):
        """Convert raw YouTube data into ML features.

        Args:
            video_data: Iterable of video items, each carrying ``id``,
                ``snippet``, ``statistics`` and ``contentDetails``.

        Returns:
            A ``pandas.DataFrame`` with one row per video and the columns
            listed in ``COLUMNS``. An empty input yields an empty frame that
            still has those columns.
        """
        rows = []

        for video in video_data:
            snippet = video['snippet']
            stats = video['statistics']
            content = video['contentDetails']

            publish_time = pd.to_datetime(snippet['publishedAt'])

            # Counts that are absent from the payload are treated as 0.
            views = int(stats.get('viewCount', 0))
            likes = int(stats.get('likeCount', 0))
            comments = int(stats.get('commentCount', 0))

            rows.append({
                'title_length': len(snippet['title']),
                'description_length': len(snippet.get('description', '')),
                'tags_count': len(snippet.get('tags', [])),
                'duration_seconds': self.parse_duration(content['duration']),
                'publish_hour': publish_time.hour,
                'publish_day': publish_time.weekday(),
                # True when a 'maxres' entry is present among the thumbnails.
                'has_thumbnail': 'maxres' in snippet.get('thumbnails', {}),

                'views': views,
                'likes': likes,
                'comments': comments,
                # Percentage of views that led to a like or comment;
                # max(..., 1) avoids dividing by zero for unseen videos.
                'engagement_rate': (likes + comments) / max(views, 1) * 100,

                'video_id': video['id'],
                'channel_id': snippet['channelId'],
                'title': snippet['title'],
            })

        return pd.DataFrame(rows, columns=self.COLUMNS)

    def parse_duration(self, duration_str):
        """Convert an ISO 8601 duration such as ``PT4M13S`` to seconds.

        Durations with a day or week component (for example ``P1DT2H``) are
        supported as well. Strings that do not start with a recognisable
        duration return 0.
        """
        import re

        match = re.match(self._DURATION_PATTERN, duration_str)
        if not match:
            return 0

        weeks, days, hours, minutes, seconds = (
            int(part or 0) for part in match.groups()
        )

        return (
            weeks * 604800
            + days * 86400
            + hours * 3600
            + minutes * 60
            + seconds
        )

In [None]:
class EngagementPredictor:
    """Random-forest regressor that predicts ``engagement_rate`` for a video."""

    # Input columns used for training, in the order the model sees them.
    FEATURE_COLUMNS = [
        'title_length', 'description_length', 'tags_count',
        'duration_seconds', 'publish_hour', 'publish_day', 'has_thumbnail',
    ]
    TARGET_COLUMN = 'engagement_rate'

    def __init__(self):
        self.model = None
        self.feature_columns = None

    def prepare_data(self, df):
        """Prepare data for training.

        Rows whose ``views`` exceed the 99th percentile are dropped as
        outliers, the feature columns are selected and missing feature values
        are replaced by 0.

        Returns:
            ``(X, y)``: the feature frame and the ``engagement_rate`` series.
        """
        # ``<=`` rather than ``<``: a strict comparison discards every row
        # whenever the cutoff equals the largest view count (a single row, or
        # several rows tied at the top).
        cutoff = df['views'].quantile(0.99)
        kept = df[df['views'] <= cutoff]

        feature_cols = list(self.FEATURE_COLUMNS)

        X = kept[feature_cols].fillna(0)
        y = kept[self.TARGET_COLUMN]

        self.feature_columns = feature_cols
        return X, y

    def train_model(self, X, y):
        """Train engagement prediction model.

        Fits a ``RandomForestRegressor`` on 80% of the data, evaluates it on
        the remaining 20% and prints the metrics and feature importances.

        Returns:
            A dict with keys ``mae``, ``r2`` and ``feature_importance``.
        """
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        self.model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.model.fit(X_train, y_train)

        y_pred = self.model.predict(X_test)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)

        print(f"üìä Model Performance:")
        print(f"   MAE: {mae:.4f}")
        print(f"   R¬≤ Score: {r2:.4f}")

        # ``prepare_data`` normally records the column names; fall back to the
        # training input so the method also works when it was not called.
        if self.feature_columns is None:
            importances = self.model.feature_importances_
            self.feature_columns = list(
                getattr(X, 'columns', range(len(importances)))
            )

        importance_df = pd.DataFrame({
            'feature': self.feature_columns,
            'importance': self.model.feature_importances_
        }).sort_values('importance', ascending=False)

        print(f"üéØ Top Features:")
        print(importance_df.head())

        return {'mae': mae, 'r2': r2, 'feature_importance': importance_df}

    def predict_engagement(self, video_features):
        """Predict engagement for new video.

        Args:
            video_features: Either a sequence of feature values in the order
                of ``feature_columns``, or a dict / ``pandas.Series`` keyed by
                feature name.

        Returns:
            The predicted engagement rate for that single video.

        Raises:
            ValueError: If the model is not trained, or a named input lacks
                one of the expected features.
        """
        if self.model is None:
            raise ValueError("Model not trained yet!")

        if isinstance(video_features, (dict, pd.Series)):
            if self.feature_columns is None:
                raise ValueError("Feature columns are unknown; cannot order named features.")
            missing = [c for c in self.feature_columns if c not in video_features]
            if missing:
                raise ValueError(f"Missing feature values: {missing}")
            row = [video_features[c] for c in self.feature_columns]
        else:
            row = list(video_features)

        # A model fitted on a DataFrame remembers its column names; hand it a
        # matching frame so the input is checked against those names.
        fitted_names = getattr(self.model, 'feature_names_in_', None)
        if fitted_names is not None and len(row) == len(fitted_names):
            sample = pd.DataFrame([row], columns=list(fitted_names))
        else:
            sample = [row]

        prediction = self.model.predict(sample)
        return prediction[0]

    def save_model(self, filepath):
        """Save trained model together with its feature column names."""
        joblib.dump({
            'model': self.model,
            'feature_columns': self.feature_columns
        }, filepath)
        print(f"üíæ Model saved to {filepath}")
