In [None]:
# 1. Create Conda Environment
# Creates a dedicated "youtube-analyzer" environment on Python 3.11
# (-y answers the confirmation prompt), then activates it.
# NOTE(review): `conda activate` only affects the shell that runs it; if this
# cell executes in a throwaway subshell the activation presumably will not
# carry over to later cells — confirm how this cell is run.
echo "Creating conda environment..."
conda create -n youtube-analyzer python=3.11 -y
conda activate youtube-analyzer


In [None]:
# 2. Create Project Directory Structure
# Directories are listed explicitly rather than with {a,b,c} brace expansion:
# braces are a bash extension, and under a plain POSIX `sh` they would create
# a single directory literally named "{models,server,...}".
echo "Setting up project structure..."
for dir in models server services tools deployment/firebase output old_scripts; do
    mkdir -p "youtube-demo-mcp-server/$dir"
done
cd youtube-demo-mcp-server

In [None]:
# 3. Initialize Python Package Structure
# An empty __init__.py marks each directory as an importable Python package.
for pkg in models server services tools; do
    touch "$pkg/__init__.py"
done

In [None]:
# 4. Install Core Dependencies
# The version specifier is quoted on purpose: unquoted, the shell parses `>`
# as output redirection, so `pip install mcp>=0.9.0` would install an
# unconstrained `mcp` and write pip's output to a file named "=0.9.0".
echo "Installing dependencies..."
pip install "mcp>=0.9.0"
pip install google-api-python-client
pip install pandas numpy
pip install nltk textblob
pip install matplotlib seaborn plotly
pip install anthropic
pip install fastapi uvicorn
pip install firebase-admin
pip install python-dotenv
pip install pydantic
pip install aiofiles
# isodate is imported by services/youtube_service.py and psutil by
# tools/system_status_tool.py, so both must be installed as well.
pip install isodate psutil
pip install jupyter notebook jupyterlab ipywidgets


In [None]:
# 5. Download NLTK Data
#   punkt         -> sentence tokenizer models (sent_tokenize)
#   punkt_tab     -> tokenizer tables that newer NLTK releases load instead of
#                    punkt; step 4 installs NLTK unpinned, so fetch both
#   vader_lexicon -> SentimentIntensityAnalyzer
#   stopwords     -> nltk.corpus.stopwords
echo "Downloading NLTK data..."
python -c "import nltk; nltk.download('punkt')"
python -c "import nltk; nltk.download('punkt_tab')"
python -c "import nltk; nltk.download('vader_lexicon')"
python -c "import nltk; nltk.download('stopwords')"


In [None]:
# Example: how the NLTK resources downloaded above are used by one of our
# tools, the market analysis tool.
from textblob import TextBlob
from nltk.sentiment import SentimentIntensityAnalyzer
from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize

# Sample input so this cell runs on a fresh kernel (the original referenced
# `description` and `words` without ever defining them).
description = "Learn Python step by step. This tutorial covers the basics."

# Analyze video title sentiment
analyzer = SentimentIntensityAnalyzer()  # Uses vader_lexicon
sentiment = analyzer.polarity_scores("Amazing Python Tutorial!")

# Split description into sentences
sentences = sent_tokenize(description)  # Uses punkt

# Filter out common words
words = description.lower().split()
stop_words = set(stopwords.words('english'))  # Uses stopwords
filtered_words = [word for word in words if word not in stop_words]

In [None]:
# 6. Create Requirements File
# The quoted 'EOF' delimiter disables shell expansion inside the heredoc, so
# "mcp>=0.9.0" is written literally. isodate and psutil are included because
# services/youtube_service.py and tools/system_status_tool.py import them.
cat > requirements.txt << 'EOF'
mcp>=0.9.0
google-api-python-client==2.108.0
pandas==2.1.3
numpy==1.24.3
nltk==3.8.1
textblob==0.17.1
matplotlib==3.7.2
seaborn==0.12.2
plotly==5.17.0
anthropic==0.7.7
fastapi==0.104.1
uvicorn==0.24.0
firebase-admin==6.2.0
python-dotenv==1.0.0
pydantic==2.5.0
aiofiles==23.2.1
isodate==0.6.1
psutil==5.9.6
EOF

In [None]:
# config.py - System Configuration
# Configuration & API Setup

import os
from dotenv import load_dotenv
from pathlib import Path

# Load environment variables
# Must run before the Config class body below, which reads its settings
# with os.getenv at class-definition time.
load_dotenv()

def _env_int(name: str, default: int) -> int:
    """Read an integer setting from the environment.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset, blank, or not a
            valid integer.

    Returns:
        The parsed integer, or ``default``. A malformed entry falls back to
        the default so that it cannot break importing this module.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        return default


class Config:
    """System-wide configuration settings.

    Every value is resolved once, when the class body executes. Secrets come
    from environment variables; the "Optional: Custom Settings" entries of
    the .env template (MAX_VIDEOS_PER_SEARCH, DEFAULT_REGION_CODE,
    DEFAULT_LANGUAGE) override the built-in defaults when present.
    """

    # YouTube API Configuration
    YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY')
    YOUTUBE_API_SERVICE_NAME = 'youtube'
    YOUTUBE_API_VERSION = 'v3'

    # Firebase Configuration
    FIREBASE_PROJECT_ID = os.getenv('FIREBASE_PROJECT_ID')
    FIREBASE_CREDENTIALS_PATH = os.getenv('FIREBASE_CREDENTIALS_PATH', 'deployment/firebase/credentials.json')

    # Claude AI Configuration
    ANTHROPIC_API_KEY = os.getenv('ANTHROPIC_API_KEY')
    CLAUDE_MODEL = 'claude-3-sonnet-20240229'

    # Analysis Parameters (overridable through the environment / .env file)
    MAX_VIDEOS_PER_SEARCH = _env_int('MAX_VIDEOS_PER_SEARCH', 50)
    DEFAULT_REGION_CODE = os.getenv('DEFAULT_REGION_CODE', 'US')
    DEFAULT_LANGUAGE = os.getenv('DEFAULT_LANGUAGE', 'en')

    # Scoring Weights
    SENTIMENT_WEIGHT = 0.3
    ENGAGEMENT_WEIGHT = 0.4
    CREDIBILITY_WEIGHT = 0.3

    # Directory Configuration (relative to the directory containing this file)
    PROJECT_ROOT = Path(__file__).parent
    OUTPUT_DIR = PROJECT_ROOT / 'output'
    MODELS_DIR = PROJECT_ROOT / 'models'

    # MCP Server Configuration
    MCP_SERVER_NAME = 'youtube-intelligence'
    MCP_SERVER_VERSION = '1.0.0'
    MCP_SERVER_PORT = 8000

    # Rate Limiting
    API_RATE_LIMIT = 100  # requests per minute

    @classmethod
    def validate_config(cls):
        """Validate that required configuration is present.

        Returns:
            True when every required setting has a truthy value.

        Raises:
            ValueError: Listing each required setting that is missing or empty.
        """
        required_vars = [
            'YOUTUBE_API_KEY',
            'FIREBASE_PROJECT_ID',
            'ANTHROPIC_API_KEY'
        ]

        # Empty strings count as missing, same as unset variables
        missing_vars = [var for var in required_vars if not getattr(cls, var)]

        if missing_vars:
            raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

        return True

# .env template file
# Text written verbatim to a new .env file by the __main__ block below.
# The values are placeholders and must be replaced with real credentials.
ENV_TEMPLATE = """
# YouTube Intelligence MCP Server - Environment Configuration
# Episode 3: Configuration & API Setup

# YouTube Data API
YOUTUBE_API_KEY=your_youtube_api_key_here

# Firebase Configuration
FIREBASE_PROJECT_ID=your_firebase_project_id
FIREBASE_CREDENTIALS_PATH=deployment/firebase/credentials.json

# Claude AI API
ANTHROPIC_API_KEY=your_anthropic_api_key_here

# Optional: Custom Settings
MAX_VIDEOS_PER_SEARCH=50
DEFAULT_REGION_CODE=US
DEFAULT_LANGUAGE=en
"""

if __name__ == "__main__":
    # Bootstrap a .env template on first run; an existing file is never overwritten
    env_file = Path('.env')
    if env_file.exists():
        print("Configuration file already exists.")
    else:
        env_file.write_text(ENV_TEMPLATE)
        print("Created .env template file. Please fill in your API keys.")

    # Report whether every required setting is present
    try:
        Config.validate_config()
    except ValueError as e:
        print(f"Configuration error: {e}")
        print("Please check your .env file and add missing API keys.")
    else:
        print("Configuration validated successfully!")

In [None]:
# models/youtube_models.py - YouTube Data Structures
# Data Models & Architecture
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum


class VideoCategory(str, Enum):
    """YouTube video categories

    Mixing in ``str`` makes each member compare equal to its lowercase
    string value. OTHER is the default category used by ``VideoData``.
    """
    EDUCATION = "education"
    ENTERTAINMENT = "entertainment"
    TECHNOLOGY = "technology"
    GAMING = "gaming"
    MUSIC = "music"
    NEWS = "news"
    SPORTS = "sports"
    OTHER = "other"  # fallback when no specific category applies


class VideoStats(BaseModel):
    """Raw counters for a single video, plus a derived engagement metric."""
    view_count: int = 0
    like_count: int = 0
    comment_count: int = 0
    duration_seconds: int = 0

    @property
    def engagement_rate(self) -> float:
        """Interactions (likes + comments) per view; 0.0 when there are no views."""
        interactions = self.like_count + self.comment_count
        return interactions / self.view_count if self.view_count != 0 else 0.0


class VideoData(BaseModel):
    """Complete video data structure

    Combines descriptive metadata, a nested ``VideoStats`` record, and
    optional analysis scores that stay ``None`` until they are filled in.
    """
    video_id: str
    title: str
    description: str
    channel_id: str
    channel_title: str
    published_at: datetime
    duration: str  # kept as a string; the sample at the bottom of this file uses "PT15M30S"
    category: VideoCategory = VideoCategory.OTHER
    stats: VideoStats
    tags: List[str] = []
    thumbnail_url: Optional[str] = None

    # Analysis fields
    sentiment_score: Optional[float] = None
    credibility_score: Optional[float] = None
    trending_score: Optional[float] = None

    class Config:
        # Encode datetime values as ISO-8601 strings in JSON output.
        # NOTE(review): a nested Config with json_encoders looks like the
        # pydantic v1 style, while requirements.txt pins pydantic 2.5.0 —
        # confirm whether this should become `model_config`.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }


class ChannelData(BaseModel):
    """YouTube channel information

    Identity and description are required; the counters default to 0 and
    the remaining fields default to ``None``.
    """
    channel_id: str
    title: str
    description: str
    subscriber_count: int = 0
    video_count: int = 0
    view_count: int = 0
    created_at: Optional[datetime] = None
    thumbnail_url: Optional[str] = None

    # Analysis fields
    authority_score: Optional[float] = None
    consistency_score: Optional[float] = None


class SearchQuery(BaseModel):
    """YouTube search query parameters

    Only ``query`` is required; every other field has a default.
    """
    query: str
    max_results: int = 25
    region_code: str = "US"
    language: str = "en"
    # Optional publication-date window; None means no bound on that side
    published_after: Optional[datetime] = None
    published_before: Optional[datetime] = None
    order: str = "relevance"  # relevance, date, rating, viewCount


class SearchResult(BaseModel):
    """Outcome of one search: the originating query, the videos it matched,
    the reported total, and the time the result object was created."""
    query: SearchQuery
    videos: List[VideoData]
    total_results: int
    search_time: datetime = Field(default_factory=datetime.now)

    @property
    def average_engagement(self) -> float:
        """Mean engagement rate over ``videos``; 0.0 when the list is empty."""
        video_count = len(self.videos)
        if video_count == 0:
            return 0.0
        rates = [video.stats.engagement_rate for video in self.videos]
        return sum(rates) / video_count


class MarketTrend(BaseModel):
    """Market trend analysis data

    One record per keyword; ``analysis_date`` defaults to the time the
    record is created.
    """
    keyword: str
    trend_score: float
    growth_rate: float
    competition_level: str  # low, medium, high
    related_topics: List[str] = []
    analysis_date: datetime = Field(default_factory=datetime.now)


class MarketAnalysis(BaseModel):
    """Complete market analysis results

    Bundles the per-keyword trends and top-performing videos for a query,
    with free-form insights and a text summary that default to empty.
    """
    query: str
    trends: List[MarketTrend]
    top_performers: List[VideoData]
    market_insights: Dict[str, Any] = {}
    analysis_summary: str = ""
    created_at: datetime = Field(default_factory=datetime.now)


# Example usage and validation
if __name__ == "__main__":
    # Build a sample video to exercise the models end to end
    sample_video = VideoData(
        video_id="sample123",
        title="How to Build YouTube Intelligence AI",
        description="A comprehensive tutorial on building AI systems",
        channel_id="channel123",
        channel_title="Tech Tutorials",
        published_at=datetime.now(),
        duration="PT15M30S",
        category=VideoCategory.EDUCATION,
        stats=VideoStats(
            view_count=10000,
            like_count=500,
            comment_count=100,
            duration_seconds=930
        ),
        tags=["AI", "YouTube", "Tutorial", "Programming"]
    )

    print("Sample Video Data:")
    print(f"Title: {sample_video.title}")
    print(f"Engagement Rate: {sample_video.stats.engagement_rate:.2%}")
    # model_dump_json is the pydantic v2 serializer; the v1-era
    # `.json(indent=2)` rejects `indent` under the pinned pydantic 2.x.
    print(f"JSON: {sample_video.model_dump_json(indent=2)}")

    # Reaching this point means construction and serialization both succeeded
    print("\nVideo data model validation successful!")


In [None]:
# Video 4 - MCP Inspector

In [None]:
# server/__main__.py - MCP Inspector Entry Point

import asyncio
import sys
from pathlib import Path

# Add the project root (the parent of this server/ package) to the Python
# path. This has to happen BEFORE importing mcp_server, otherwise the import
# below fails whenever the project root is not already importable.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from mcp_server import main  # noqa: E402  (must follow the sys.path setup)


if __name__ == "__main__":
    asyncio.run(main())


In [None]:
# mcp_server.py - Main MCP Server Implementation
# MCP Server Implementation

import asyncio
import logging
from typing import Any, Dict, List, Optional

# Correct imports for MCP Server
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp import ServerCapabilities, ToolsCapability
from mcp.types import (
    CallToolResult,
    ListToolsResult,
    TextContent,
    Tool,
)

from config import Config
from tools.video_search_tool import VideoSearchTool
from tools.market_analysis_tool import MarketAnalysisTool
from tools.system_status_tool import SystemStatusTool

# Configure logging
# INFO level, with timestamp, logger name, level and message on each line.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Create the MCP server
server = Server(Config.MCP_SERVER_NAME, Config.MCP_SERVER_VERSION)

# Initialize tools
# Keys are the tool names clients pass to call_tool. The instances are built
# at import time, i.e. before main() calls Config.validate_config().
# NOTE(review): VideoSearchTool() constructs a YouTubeService, which raises
# ValueError when no YouTube API key is configured — so importing this module
# presumably fails in that case before validation can report it; confirm.
youtube_tools = {
    'search_videos': VideoSearchTool(),
    'analyze_market': MarketAnalysisTool(),
    'system_status': SystemStatusTool()
}

logger.info(f"Initialized {len(youtube_tools)} MCP tools")


@server.list_tools()
async def list_tools() -> List[Tool]:
    """Advertise every registered tool to MCP clients.

    Returns:
        One ``Tool`` descriptor per entry in ``youtube_tools``, or an empty
        list if building the descriptors fails.
    """
    try:
        available = [
            Tool(
                name=registered.name,
                description=registered.description,
                inputSchema=registered.inputSchema
            )
            for registered in youtube_tools.values()
        ]
        logger.info(f"Listed {len(available)} available tools")
        return available

    except Exception as e:
        # Degrade to "no tools" rather than failing the listing request
        logger.error(f"Error listing tools: {e}")
        return []


@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
    """Dispatch an MCP tool call to the matching registered tool.

    Args:
        name: Key of the tool in ``youtube_tools``.
        arguments: Arguments forwarded unchanged to the tool's ``call``.

    Returns:
        A single-element list holding the formatted result, or a textual
        error description. Errors are reported as text, never raised.
    """
    try:
        logger.info(f"Tool call: {name} with arguments: {arguments}")

        if name in youtube_tools:
            outcome = await youtube_tools[name].call(arguments)

            # Tools signal failure through a falsy/missing 'success' flag
            if outcome.get('success', False):
                reply = format_tool_response(name, outcome)
            else:
                reply = f"Tool execution failed: {outcome.get('message', 'Unknown error')}"

            return [TextContent(type="text", text=reply)]

        # Unknown tool: tell the caller which names are valid
        error_msg = f"Tool '{name}' not found. Available tools: {list(youtube_tools.keys())}"
        logger.error(error_msg)
        return [TextContent(
            type="text",
            text=f"Error: {error_msg}"
        )]

    except Exception as e:
        error_msg = f"Error executing tool '{name}': {str(e)}"
        logger.error(error_msg)
        return [TextContent(type="text", text=error_msg)]


def format_tool_response(tool_name: str, result: Dict[str, Any]) -> str:
    """Render a successful tool result as display text.

    Args:
        tool_name: Name of the tool that produced ``result``.
        result: The tool's result dictionary.

    Returns:
        Output of the tool-specific formatter, a generic dump for tool names
        without one, or a fallback message if formatting raises.
    """
    try:
        if tool_name == 'search_videos':
            return format_video_search_response(result)
        if tool_name == 'analyze_market':
            return format_market_analysis_response(result)
        if tool_name == 'system_status':
            return format_system_status_response(result)

        # No dedicated formatter for this tool: dump the raw result
        return f"Results from {tool_name}:\n{str(result)}"

    except Exception as e:
        logger.error(f"Error formatting response: {e}")
        return f"Tool executed successfully but response formatting failed: {str(result)}"


def format_video_search_response(result: Dict[str, Any]) -> str:
    """Render a video search result as a Markdown-style text report.

    Args:
        result: Search result dictionary. Header fields fall back to
            placeholders when absent; each entry of ``videos`` must carry
            every key used below.

    Returns:
        A header followed by one section per video, limited to the first
        ten videos, joined with newlines.
    """
    lines = [
        "🔍 **YouTube Video Search Results**",
        f"Query: {result.get('query', 'Unknown')}",
        f"Found: {result.get('videos_found', 0)} videos",
        f"Average Engagement: {result.get('average_engagement', 'N/A')}",
        "",
    ]

    shown = result.get('videos', [])[:10]  # Show top 10
    for rank, entry in enumerate(shown, start=1):
        lines.extend([
            f"{rank}. **{entry['title']}**",
            f"   Channel: {entry['channel']}",
            f"   Views: {entry['views']:,} | Likes: {entry['likes']:,} | Comments: {entry['comments']:,}",
            f"   Engagement: {entry['engagement_rate']} | Published: {entry['published']}",
            f"   Duration: {entry['duration']} | Category: {entry['category']}",
            f"   URL: {entry['url']}",
            "",
        ])

    return "\n".join(lines)


def format_market_analysis_response(result: Dict[str, Any]) -> str:
    """Render a market analysis result as a Markdown-style text report.

    Args:
        result: Analysis result dictionary. Summary fields and the nested
            ``analysis`` / ``market_overview`` dictionaries fall back to
            defaults when absent.

    Returns:
        Summary and market overview, followed by the top-videos,
        top-channels and insights sections when those lists are non-empty.
    """
    lines = [
        "📊 **Market Analysis Results**",
        f"Topic: {result.get('topic', 'Unknown')}",
        f"Videos Analyzed: {result.get('videos_analyzed', 0)}",
        f"Timeframe: {result.get('timeframe_days', 0)} days",
        "",
    ]

    analysis = result.get('analysis', {})

    # Market Overview
    overview = analysis.get('market_overview', {})
    lines.extend([
        "**📈 Market Overview**",
        f"• Total Views: {overview.get('total_views', 0):,}",
        f"• Average Views: {overview.get('average_views', 0):,}",
        f"• Average Engagement: {overview.get('average_engagement_rate', 'N/A')}",
        f"• Unique Creators: {overview.get('unique_creators', 0)}",
        f"• Competition Level: {overview.get('competition_level', 'Unknown')}",
        f"• Market Sentiment: {overview.get('market_sentiment', 'N/A')}",
        "",
    ])

    # Top Videos (first five)
    best_videos = analysis.get('top_performing_videos', [])
    if best_videos:
        lines.append("**🏆 Top Performing Videos**")
        for rank, clip in enumerate(best_videos[:5], start=1):
            lines.append(
                f"{rank}. {clip['title']} - {clip['views']:,} views ({clip['channel']})")
        lines.append("")

    # Top Channels (first five)
    best_channels = analysis.get('top_channels', [])
    if best_channels:
        lines.append("**🌟 Top Channels**")
        for rank, creator in enumerate(best_channels[:5], start=1):
            lines.append(
                f"{rank}. {creator['channel']} - {creator['total_views']:,} total views ({creator['video_count']} videos)")
        lines.append("")

    # Insights
    findings = analysis.get('insights', [])
    if findings:
        lines.append("**💡 Key Insights**")
        lines.extend(f"• {finding}" for finding in findings)

    return "\n".join(lines)


def format_system_status_response(result: Dict[str, Any]) -> str:
    """Render a system status result as a Markdown-style text report.

    Args:
        result: Status result dictionary; the report is built from its
            ``status`` entry, with placeholders for anything missing.

    Returns:
        Header and configuration summary, plus a metrics section when
        ``system_metrics`` is non-empty and carries no ``error`` key.
    """
    status = result.get('status', {})
    config = status.get('configuration', {})

    def configured_label(flag_key: str) -> str:
        # Truthy flag -> check mark, anything else -> cross
        return '✅ Configured' if config.get(flag_key) else '❌ Not Configured'

    lines = [
        f"⚡ **{status.get('server_name', 'YouTube Intelligence')} System Status**",
        f"Version: {status.get('version', 'Unknown')}",
        f"Status: {status.get('status', 'Unknown')}",
        f"Timestamp: {status.get('timestamp', 'Unknown')}",
        "",
        # Configuration
        "**🔧 Configuration**",
        f"• YouTube API: {configured_label('youtube_api_configured')}",
        f"• Firebase: {configured_label('firebase_configured')}",
        f"• Claude API: {configured_label('claude_api_configured')}",
        f"• Max Videos Per Search: {config.get('max_videos_per_search', 'Unknown')}",
        "",
    ]

    # System Metrics (if available)
    metrics = status.get('system_metrics', {})
    if metrics and 'error' not in metrics:
        lines.extend([
            "**📊 System Metrics**",
            f"• CPU Usage: {metrics.get('cpu_percent', 'Unknown')}%",
            f"• Memory Usage: {metrics.get('memory_percent', 'Unknown')}%",
            f"• Disk Usage: {metrics.get('disk_usage_percent', 'Unknown')}%",
        ])

    return "\n".join(lines)


async def main():
    """Validate configuration, then serve MCP requests over stdio.

    Raises:
        Exception: Any startup or runtime failure is logged and re-raised.
            ``KeyboardInterrupt`` is logged as a requested shutdown and
            swallowed.
    """
    try:
        logger.info("Starting YouTube Intelligence MCP Server")

        # Fail fast when a required setting is missing
        Config.validate_config()
        logger.info("Configuration validated successfully")

        # Announce tool support under the configured server name and version
        init_options = InitializationOptions(
            server_name=Config.MCP_SERVER_NAME,
            server_version=Config.MCP_SERVER_VERSION,
            capabilities=ServerCapabilities(tools=ToolsCapability())
        )

        # Serve over the stdio transport until the streams close
        async with stdio_server() as streams:
            incoming, outgoing = streams
            await server.run(incoming, outgoing, init_options)

    except KeyboardInterrupt:
        logger.info("Server shutdown requested")
    except Exception as e:
        logger.error(f"Server failed to start: {e}")
        raise


if __name__ == "__main__":
    asyncio.run(main())


In [None]:
# tools/system_status_tool.py - System Status Tool

import logging
import os
import psutil
from typing import Any, Dict
from datetime import datetime
from mcp import Tool

from config import Config

logger = logging.getLogger(__name__)


class SystemStatusTool(Tool):
    """MCP tool for system status and health monitoring"""

    def __init__(self):
        """Register the tool's name, description and input schema."""
        super().__init__(
            name="system_status",
            description="Get YouTube Intelligence MCP Server system status and health metrics",
            inputSchema={
                "type": "object",
                "properties": {
                    "include_detailed": {
                        "type": "boolean",
                        "description": "Include detailed system metrics",
                        "default": False
                    }
                }
            }
        )

    async def call(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Get system status.

        Args:
            arguments: Tool arguments; ``include_detailed`` (default False)
                adds host metrics and API status to the report.

        Returns:
            ``{"success": True, "status": {...}}`` on success, otherwise
            ``{"success": False, "error": ..., "message": ...}``.
        """
        try:
            include_detailed = arguments.get("include_detailed", False)

            # Basic system info
            status = {
                "server_name": Config.MCP_SERVER_NAME,
                "version": Config.MCP_SERVER_VERSION,
                "timestamp": datetime.now().isoformat(),
                "status": "healthy",
                "uptime_seconds": self._get_uptime(),
                "configuration": {
                    "youtube_api_configured": bool(Config.YOUTUBE_API_KEY),
                    "firebase_configured": bool(Config.FIREBASE_PROJECT_ID),
                    "claude_api_configured": bool(Config.ANTHROPIC_API_KEY),
                    "output_directory": str(Config.OUTPUT_DIR),
                    "max_videos_per_search": Config.MAX_VIDEOS_PER_SEARCH
                }
            }

            # Add detailed metrics if requested
            if include_detailed:
                status["system_metrics"] = self._get_system_metrics()
                status["api_status"] = self._check_api_status()

            return {
                "success": True,
                "status": status
            }

        except Exception as e:
            logger.error(f"System status error: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": "Failed to get system status"
            }

    def _get_uptime(self) -> float:
        """Get system uptime in seconds.

        psutil.boot_time() is the boot *timestamp* (seconds since the epoch),
        so the uptime is the difference between now and that timestamp.

        Returns:
            Seconds since the host booted, or 0.0 if it cannot be determined.
        """
        try:
            elapsed = datetime.now().timestamp() - psutil.boot_time()
            # Guard against a negative value after a clock adjustment
            return max(0.0, elapsed)
        except Exception as e:
            logger.warning(f"Unable to determine system uptime: {e}")
            return 0.0

    def _get_system_metrics(self) -> Dict[str, Any]:
        """Get detailed system metrics.

        Returns:
            CPU, memory and disk usage percentages plus process and network
            connection counts, or ``{"error": ...}`` if any probe fails.
        """
        try:
            return {
                # interval=1 blocks for one second to sample CPU usage
                "cpu_percent": psutil.cpu_percent(interval=1),
                "memory_percent": psutil.virtual_memory().percent,
                "disk_usage_percent": psutil.disk_usage('/').percent,
                "process_count": len(psutil.pids()),
                "network_connections": len(psutil.net_connections())
            }
        except Exception as e:
            logger.error(f"Error getting system metrics: {e}")
            return {"error": "Unable to retrieve system metrics"}

    def _check_api_status(self) -> Dict[str, str]:
        """Check API configuration status.

        Reports only whether each credential is set; no API is contacted.
        """
        return {
            "youtube_api": "configured" if Config.YOUTUBE_API_KEY else "not_configured",
            "firebase": "configured" if Config.FIREBASE_PROJECT_ID else "not_configured",
            "claude_api": "configured" if Config.ANTHROPIC_API_KEY else "not_configured"
        }


In [None]:
# Launch the MCP Inspector via npx, with `python mcp_server.py` as the server command.
# NOTE(review): DANGEROUSLY_OMIT_AUTH=true presumably turns off the
# Inspector's authentication — keep this to local development and confirm
# against the Inspector documentation.
DANGEROUSLY_OMIT_AUTH=true npx @modelcontextprotocol/inspector python mcp_server.py

In [None]:
# Video 5 - YouTube Service Integration

In [None]:
# services/youtube_service.py - YouTube API Integration
# YouTube Service Integration

import logging
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import isodate

from config import Config
from models.youtube_models import VideoData, ChannelData, SearchQuery, SearchResult, VideoStats, VideoCategory

logger = logging.getLogger(__name__)


class YouTubeService:
    """YouTube Data API service for video and channel operations"""

    def __init__(self, api_key: Optional[str] = None):
        """Build the API client.

        Args:
            api_key: YouTube Data API key; falls back to
                ``Config.YOUTUBE_API_KEY`` when omitted.

        Raises:
            ValueError: If no API key is available from either source.
        """
        self.api_key = api_key or Config.YOUTUBE_API_KEY
        if not self.api_key:
            raise ValueError("YouTube API key is required")

        self.youtube = build(
            Config.YOUTUBE_API_SERVICE_NAME,
            Config.YOUTUBE_API_VERSION,
            developerKey=self.api_key
        )

        logger.info("YouTube service initialized successfully")

    @staticmethod
    def _to_rfc3339(moment: datetime) -> str:
        """Format a datetime as an RFC 3339 UTC timestamp for the API.

        The publishedAfter/publishedBefore filters need a timestamp with an
        explicit timezone; ``datetime.isoformat()`` on a naive value has none.
        Naive datetimes are interpreted as local time by ``astimezone``.
        """
        # Local import: the module header only imports datetime and timedelta
        from datetime import timezone

        return moment.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    def search_videos(self, query: SearchQuery) -> SearchResult:
        """Search for videos using YouTube Data API.

        Args:
            query: Search text, result limit, locale, ordering and optional
                publication-date window.

        Returns:
            The matched videos with full details, plus the total result
            count reported by the API.

        Raises:
            HttpError: If the search request is rejected by the API.
            Exception: Any other failure, re-raised after logging.
        """
        try:
            logger.info(f"Searching videos for query: {query.query}")

            # Build search parameters
            search_params = {
                'part': 'snippet',
                'q': query.query,
                'type': 'video',
                'maxResults': min(query.max_results, 50),  # API limit
                'regionCode': query.region_code,
                'relevanceLanguage': query.language,
                'order': query.order
            }

            # Add date filters if specified (RFC 3339, explicit UTC)
            if query.published_after:
                search_params['publishedAfter'] = self._to_rfc3339(
                    query.published_after)
            if query.published_before:
                search_params['publishedBefore'] = self._to_rfc3339(
                    query.published_before)

            # Execute search
            search_response = self.youtube.search().list(**search_params).execute()

            # Extract video IDs for detailed information; skip any item
            # that does not carry a video ID instead of failing the search
            video_ids = [
                item['id']['videoId']
                for item in search_response.get('items', [])
                if item.get('id', {}).get('videoId')
            ]

            # Get detailed video information
            videos = self._get_video_details(video_ids)

            # Create search result
            result = SearchResult(
                query=query,
                videos=videos,
                total_results=search_response.get(
                    'pageInfo', {}).get('totalResults', 0)
            )

            logger.info(f"Found {len(videos)} videos for query: {query.query}")
            return result

        except HttpError as e:
            logger.error(f"YouTube API error: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error in video search: {e}")
            raise

    def _get_video_details(self, video_ids: List[str]) -> List[VideoData]:
        """Get detailed information for specific video IDs.

        Returns:
            Parsed videos for every ID that could be fetched; an empty list
            when ``video_ids`` is empty or fetching fails.
        """
        if not video_ids:
            return []

        try:
            # YouTube API allows max 50 IDs per request
            videos = []
            for i in range(0, len(video_ids), 50):
                batch_ids = video_ids[i:i+50]
                batch_videos = self._fetch_video_batch(batch_ids)
                videos.extend(batch_videos)

            return videos

        except Exception as e:
            logger.error(f"Error getting video details: {e}")
            return []

    def _fetch_video_batch(self, video_ids: List[str]) -> List[VideoData]:
        """Fetch a batch of video details.

        Returns:
            Parsed videos from one ``videos().list`` call; items that fail
            to parse are dropped, and any request error yields ``[]``.
        """
        try:
            video_response = self.youtube.videos().list(
                part='snippet,statistics,contentDetails',
                id=','.join(video_ids)
            ).execute()

            videos = []
            for item in video_response.get('items', []):
                video = self._parse_video_item(item)
                if video:
                    videos.append(video)

            return videos

        except Exception as e:
            logger.error(f"Error fetching video batch: {e}")
            return []

    def _parse_video_item(self, item: Dict[str, Any]) -> Optional[VideoData]:
        """Parse YouTube API video item into VideoData model.

        Returns:
            The populated model, or None if the item is missing required
            fields or cannot be converted (the error is logged).
        """
        try:
            snippet = item['snippet']
            statistics = item.get('statistics', {})
            content_details = item.get('contentDetails', {})

            # Parse duration (ISO 8601, e.g. "PT15M30S") into whole seconds
            duration_str = content_details.get('duration', 'PT0S')
            duration_seconds = int(isodate.parse_duration(
                duration_str).total_seconds())

            # Create video stats; missing counters default to 0
            stats = VideoStats(
                view_count=int(statistics.get('viewCount', 0)),
                like_count=int(statistics.get('likeCount', 0)),
                comment_count=int(statistics.get('commentCount', 0)),
                duration_seconds=duration_seconds
            )

            # Determine category
            category = self._categorize_video(snippet.get(
                'title', ''), snippet.get('description', ''))

            # Parse published date ('Z' suffix rewritten as an explicit offset)
            published_at = datetime.fromisoformat(
                snippet['publishedAt'].replace('Z', '+00:00'))

            # Create video data
            video = VideoData(
                video_id=item['id'],
                title=snippet['title'],
                description=snippet.get('description', ''),
                channel_id=snippet['channelId'],
                channel_title=snippet['channelTitle'],
                published_at=published_at,
                duration=duration_str,
                category=category,
                stats=stats,
                tags=snippet.get('tags', []),
                thumbnail_url=snippet.get(
                    'thumbnails', {}).get('high', {}).get('url')
            )

            return video

        except Exception as e:
            logger.error(f"Error parsing video item: {e}")
            return None

    def _categorize_video(self, title: str, description: str) -> VideoCategory:
        """Categorize video based on title and description.

        Simple keyword heuristic over the lowercased title and description;
        the first matching rule wins and OTHER is the fallback.
        """
        # Local import: `re` is not imported at module level
        import re

        content = (title + ' ' + description).lower()

        def mentions(*keywords: str) -> bool:
            # Substring match, so 'tech' also matches 'technology'
            return any(word in content for word in keywords)

        if mentions('tutorial', 'learn', 'education', 'course'):
            return VideoCategory.EDUCATION
        # 'ai' must match as a whole word: as a plain substring it also
        # matches "said", "train", "again", ... and mislabels unrelated videos
        if mentions('tech', 'programming', 'software') or re.search(r'\bai\b', content):
            return VideoCategory.TECHNOLOGY
        if mentions('game', 'gaming', 'play'):
            return VideoCategory.GAMING
        if mentions('music', 'song', 'album'):
            return VideoCategory.MUSIC
        if mentions('news', 'breaking', 'report'):
            return VideoCategory.NEWS
        if mentions('sport', 'football', 'basketball'):
            return VideoCategory.SPORTS
        return VideoCategory.OTHER

    def get_channel_info(self, channel_id: str) -> Optional[ChannelData]:
        """Get detailed channel information.

        Returns:
            The channel's data, or None when the channel is not found or
            the request fails (failures are logged).
        """
        try:
            channel_response = self.youtube.channels().list(
                part='snippet,statistics',
                id=channel_id
            ).execute()

            # A missing or empty 'items' entry means no such channel
            items = channel_response.get('items')
            if not items:
                return None

            item = items[0]
            snippet = item['snippet']
            statistics = item.get('statistics', {})

            # Parse creation date
            created_at = None
            if 'publishedAt' in snippet:
                created_at = datetime.fromisoformat(
                    snippet['publishedAt'].replace('Z', '+00:00'))

            channel = ChannelData(
                channel_id=channel_id,
                title=snippet['title'],
                description=snippet.get('description', ''),
                subscriber_count=int(statistics.get('subscriberCount', 0)),
                video_count=int(statistics.get('videoCount', 0)),
                view_count=int(statistics.get('viewCount', 0)),
                created_at=created_at,
                thumbnail_url=snippet.get(
                    'thumbnails', {}).get('high', {}).get('url')
            )

            return channel

        except Exception as e:
            logger.error(f"Error getting channel info: {e}")
            return None

    def get_trending_videos(self, region_code: str = 'US', max_results: int = 25) -> List[VideoData]:
        """Get trending videos for a specific region.

        Args:
            region_code: Region whose "most popular" chart is requested.
            max_results: Requested number of videos, capped at 50.

        Returns:
            Parsed videos from the chart, or an empty list on failure.
        """
        try:
            videos_response = self.youtube.videos().list(
                part='snippet,statistics,contentDetails',
                chart='mostPopular',
                regionCode=region_code,
                maxResults=min(max_results, 50)
            ).execute()

            videos = []
            for item in videos_response.get('items', []):
                video = self._parse_video_item(item)
                if video:
                    videos.append(video)

            return videos

        except Exception as e:
            logger.error(f"Error getting trending videos: {e}")
            return []


# Example usage and testing
if __name__ == "__main__":
    # Manual smoke test: runs a live search, so it needs a valid
    # YouTube API key and network access.
    try:
        service = YouTubeService()

        sample_query = SearchQuery(
            query="python tutorial",
            max_results=5,
            order="viewCount"
        )
        found = service.search_videos(sample_query)

        print(f"Search Results: {len(found.videos)} videos found")
        for item in found.videos:
            print(f"- {item.title} ({item.stats.view_count:,} views)")

        print("\nYouTube service test completed successfully!")

    except Exception as e:
        print(f"YouTube service test failed: {e}")
        print("Please check your YouTube API key in the .env file")


In [None]:
# tools/video_search_tool.py - MCP Video Search Tool

import logging
from typing import Any, Dict, List
from datetime import datetime
from mcp import Tool

from services.youtube_service import YouTubeService
from models.youtube_models import SearchQuery

logger = logging.getLogger(__name__)


class VideoSearchTool(Tool):
    """MCP tool for searching YouTube videos.

    Registers under the tool name ``search_videos`` and delegates the actual
    search to ``YouTubeService.search_videos``, returning a plain ``dict``
    summary of the result set.
    """

    def __init__(self):
        """Declare the tool name, description and JSON input schema, then create the YouTube service."""
        super().__init__(
            name="search_videos",
            description="Search YouTube videos with detailed analysis and insights",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query for YouTube videos"
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Maximum number of results to return (default: 10)",
                        "default": 10
                    },
                    "order": {
                        "type": "string",
                        "description": "Sort order: relevance, date, rating, viewCount",
                        "default": "relevance"
                    },
                    "region_code": {
                        "type": "string",
                        "description": "Region code for localized results (default: US)",
                        "default": "US"
                    }
                },
                "required": ["query"]
            }
        )
        self.youtube_service = YouTubeService()

    async def call(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute video search.

        Args:
            arguments: Tool arguments. ``query`` is the search text (defaults
                to an empty string when absent). Optional keys:
                ``max_results`` (default 10, clamped to 1..50), ``order``
                (default ``"relevance"``) and ``region_code`` (default ``"US"``).

        Returns:
            On success, a dict with ``success=True``, the echoed ``query``,
            ``total_results``, ``videos_found``, a percent-formatted
            ``average_engagement`` and a ``videos`` list with one summary dict
            per video. If anything raises, a dict with ``success=False``,
            ``error`` (the exception text) and ``message``.
        """
        try:
            # Extract parameters
            query_text = arguments.get("query", "")

            # An explicit null falls back to the default, and the value is
            # clamped on both sides: previously only the upper bound was
            # enforced, so 0 or negative counts reached the service unchanged.
            # int() failures are reported through the except branch below.
            requested = arguments.get("max_results")
            max_results = 10 if requested is None else int(requested)
            max_results = max(1, min(max_results, 50))

            order = arguments.get("order", "relevance")
            region_code = arguments.get("region_code", "US")

            logger.info(f"Searching videos: {query_text}")

            # Create search query
            search_query = SearchQuery(
                query=query_text,
                max_results=max_results,
                order=order,
                region_code=region_code
            )

            # Execute search
            results = self.youtube_service.search_videos(search_query)

            # Format results for MCP response
            return {
                "success": True,
                "query": query_text,
                "total_results": results.total_results,
                "videos_found": len(results.videos),
                "average_engagement": f"{results.average_engagement:.2%}",
                "videos": [
                    {
                        "title": video.title,
                        "channel": video.channel_title,
                        "views": video.stats.view_count,
                        "likes": video.stats.like_count,
                        "comments": video.stats.comment_count,
                        "engagement_rate": f"{video.stats.engagement_rate:.2%}",
                        "published": video.published_at.strftime("%Y-%m-%d"),
                        "duration": video.duration,
                        "category": video.category.value,
                        "url": f"https://youtube.com/watch?v={video.video_id}"
                    }
                    for video in results.videos
                ]
            }

        except Exception as e:
            # Best-effort tool: report the failure to the caller instead of raising
            logger.error(f"Video search error: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": "Failed to search videos"
            }


======= Video #6: The Market Tool =======

In [None]:
# NOTE: Before continuing, uncomment the tool we will be working with in mcp_server.py.

In [None]:

Real Estate Topics (Punta Cana)

{
  "topic": "Punta Cana real estate market",
  "timeframe_days": 30,
  "sample_size": 50
}

{
  "topic": "Punta Cana property investment",
  "timeframe_days": 60,
  "sample_size": 40
}

{
  "topic": "Buying a vacation home in Punta Cana",
  "timeframe_days": 90,
  "sample_size": 50
}

{
  "topic": "Punta Cana beachfront condos for sale",
  "timeframe_days": 30,
  "sample_size": 25
}
{
  "topic": "Punta Cana real estate 2025 predictions",
  "timeframe_days": 180,
  "sample_size": 20
}


Investment Topics (Punta Cana Business & Finance)

{
  "topic": "Investing in Punta Cana",
  "timeframe_days": 30,
  "sample_size": 50
}

{
  "topic": "Punta Cana rental property investment",
  "timeframe_days": 60,
  "sample_size": 30
}

{
  "topic": "Punta Cana Airbnb income potential",
  "timeframe_days": 45,
  "sample_size": 35
}

{
  "topic": "Punta Cana business opportunities",
  "timeframe_days": 90,
  "sample_size": 40
}

{
  "topic": "Punta Cana luxury resort investments",
  "timeframe_days": 180,
  "sample_size": 25
}





In [None]:
# tools/market_analysis_tool.py - Market Analysis Tool

import logging
from typing import Any, Dict, List
from datetime import datetime, timedelta
from mcp import Tool
import pandas as pd
from textblob import TextBlob

from services.youtube_service import YouTubeService
from models.youtube_models import SearchQuery

logger = logging.getLogger(__name__)


class MarketAnalysisTool(Tool):
    """MCP tool for YouTube market analysis and trends.

    Registers under the tool name ``analyze_market``. It searches recent
    videos for a topic through ``YouTubeService.search_videos`` and summarises
    views, engagement, channel concentration and title sentiment.
    """

    def __init__(self):
        """Declare the tool name, description and JSON input schema, then create the YouTube service."""
        super().__init__(
            name="analyze_market",
            description="Analyze YouTube market trends and competition for specific topics",
            inputSchema={
                "type": "object",
                "properties": {
                    "topic": {
                        "type": "string",
                        "description": "Topic or niche to analyze"
                    },
                    "timeframe_days": {
                        "type": "integer",
                        "description": "Analyze videos from last N days (default: 30)",
                        "default": 30
                    },
                    "sample_size": {
                        "type": "integer",
                        "description": "Number of videos to analyze (default: 50)",
                        "default": 50
                    }
                },
                "required": ["topic"]
            }
        )
        self.youtube_service = YouTubeService()

    async def call(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute market analysis.

        Args:
            arguments: Tool arguments. ``topic`` is the search text (defaults
                to an empty string when absent). Optional keys:
                ``timeframe_days`` (default 30, minimum 1) and ``sample_size``
                (default 50, clamped to 1..100).

        Returns:
            On success, a dict with ``success=True``, ``topic``, the effective
            ``timeframe_days``, ``videos_analyzed`` and the ``analysis`` dict
            built by ``_analyze_market_data``. When the search returns no
            videos, a dict with ``success=False`` and a ``message``. If
            anything raises, a dict with ``success=False``, ``error`` and
            ``message``.
        """
        try:
            topic = arguments.get("topic", "")

            # Explicit nulls fall back to the defaults, and both numbers get a
            # lower bound: a window of zero or negative days places the cutoff
            # at or after "now", and a sample size below 1 is meaningless.
            # int() failures are reported through the except branch below.
            raw_days = arguments.get("timeframe_days")
            timeframe_days = 30 if raw_days is None else max(1, int(raw_days))

            raw_size = arguments.get("sample_size")
            sample_size = 50 if raw_size is None else int(raw_size)
            sample_size = max(1, min(sample_size, 100))

            logger.info(f"Analyzing market for topic: {topic}")

            # Search for recent videos
            # NOTE(review): this cutoff is a naive local-time datetime, kept
            # as in the original; confirm how the service serialises it.
            published_after = datetime.now() - timedelta(days=timeframe_days)
            search_query = SearchQuery(
                query=topic,
                max_results=sample_size,
                published_after=published_after,
                order="viewCount"
            )

            results = self.youtube_service.search_videos(search_query)

            if not results.videos:
                return {
                    "success": False,
                    "message": f"No videos found for topic: {topic}"
                }

            # Analyze market metrics
            analysis = self._analyze_market_data(results.videos, topic)

            return {
                "success": True,
                "topic": topic,
                "timeframe_days": timeframe_days,
                "videos_analyzed": len(results.videos),
                "analysis": analysis
            }

        except Exception as e:
            # Best-effort tool: report the failure to the caller instead of raising
            logger.error(f"Market analysis error: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": "Failed to analyze market"
            }

    def _analyze_market_data(self, videos, topic):
        """Analyze market data and generate insights.

        Args:
            videos: Non-empty iterable of video objects exposing ``title``,
                ``channel_title``, ``published_at`` and a ``stats`` object
                with ``view_count``, ``like_count``, ``comment_count``,
                ``engagement_rate`` and ``duration_seconds``.
            topic: Topic that was searched; passed through to
                ``_generate_market_insights``.

        Returns:
            A dict with ``market_overview``, ``top_performing_videos`` (up to
            five by views), ``top_channels`` (up to five by total views) and
            ``insights``.
        """
        # Convert to DataFrame for analysis
        df = pd.DataFrame([
            {
                'title': video.title,
                'views': video.stats.view_count,
                'likes': video.stats.like_count,
                'comments': video.stats.comment_count,
                'engagement_rate': video.stats.engagement_rate,
                'channel': video.channel_title,
                'published': video.published_at,
                'duration_seconds': video.stats.duration_seconds
            }
            for video in videos
        ])

        # Calculate market metrics
        total_views = df['views'].sum()
        avg_views = df['views'].mean()
        avg_engagement = df['engagement_rate'].mean()

        # Find top performers
        top_videos = df.nlargest(5, 'views')[
            ['title', 'views', 'channel']].to_dict('records')

        # Channel analysis. The aggregate is deliberately NOT rounded here:
        # engagement_rate is formatted with ':.2%' below, so rounding it to
        # two decimals first would collapse e.g. 0.0449 into "4.00%".
        # View figures are truncated with int() when the output is built.
        channel_performance = df.groupby('channel').agg({
            'views': ['sum', 'mean', 'count'],
            'engagement_rate': 'mean'
        })

        top_channels = channel_performance.nlargest(5, ('views', 'sum'))

        # Sentiment analysis on titles
        sentiments = [
            TextBlob(title).sentiment.polarity for title in df['title']]
        avg_sentiment = sum(sentiments) / len(sentiments) if sentiments else 0

        # Competition analysis: more than 30 distinct channels is "High",
        # more than 15 is "Medium", anything else is "Low".
        unique_channels = df['channel'].nunique()
        if unique_channels > 30:
            competition_level = "High"
        elif unique_channels > 15:
            competition_level = "Medium"
        else:
            competition_level = "Low"

        return {
            "market_overview": {
                "total_views": int(total_views),
                "average_views": int(avg_views),
                "average_engagement_rate": f"{avg_engagement:.2%}",
                "unique_creators": unique_channels,
                "competition_level": competition_level,
                "market_sentiment": f"{avg_sentiment:.2f}"
            },
            "top_performing_videos": top_videos,
            "top_channels": [
                {
                    "channel": idx,
                    "total_views": int(row[('views', 'sum')]),
                    "avg_views": int(row[('views', 'mean')]),
                    "video_count": int(row[('views', 'count')]),
                    "avg_engagement": f"{row[('engagement_rate', 'mean')]:.2%}"
                }
                for idx, row in top_channels.iterrows()
            ],
            "insights": self._generate_market_insights(df, topic, competition_level)
        }

    def _generate_market_insights(self, df, topic, competition_level):
        """Generate actionable market insights.

        Args:
            df: DataFrame built by ``_analyze_market_data``; the columns read
                here are ``views``, ``engagement_rate``, ``duration_seconds``
                and ``published``.
            topic: Topic being analysed (currently unused by the rules below).
            competition_level: ``"Low"``, ``"Medium"`` or ``"High"``.

        Returns:
            A list of human-readable insight strings; may be empty.
        """
        insights = []

        # View distribution insight
        views_std = df['views'].std()
        views_mean = df['views'].mean()
        if views_std > views_mean:
            insights.append(
                "High variance in video performance - opportunity for viral content")

        # Engagement insight: videos whose engagement_rate exceeds 0.05
        high_engagement = df[df['engagement_rate'] > 0.05]
        if len(high_engagement) > 0:
            avg_duration = high_engagement['duration_seconds'].mean()
            # The mean is NaN when no duration is available; int(NaN) would raise
            if pd.notna(avg_duration):
                insights.append(
                    f"High-engagement videos average {int(avg_duration/60)} minutes duration")

        # Competition insight
        if competition_level == "Low":
            insights.append(
                "Low competition - good opportunity to establish authority")
        elif competition_level == "High":
            insights.append(
                "High competition - focus on unique angle or underserved subtopics")

        # Upload timing insight. Normalise to UTC on both sides so the
        # comparison works whether 'published' holds tz-aware or naive
        # timestamps. A tz-aware column compared with a naive datetime.now()
        # raises TypeError. Naive values are interpreted as UTC.
        published_utc = pd.to_datetime(df['published'], utc=True)
        one_week_ago = pd.Timestamp.now(tz='UTC') - pd.Timedelta(days=7)
        recent_videos = df[published_utc > one_week_ago]
        if len(recent_videos) > len(df) * 0.3:
            insights.append(
                "High recent activity - trending topic with immediate opportunity")

        return insights

In [None]:
# pyproject.toml
# Packaging metadata for the YouTube Intelligence MCP server, built with setuptools.
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"

# Distribution metadata
[project]
name = "youtube-intelligence-mcp"
version = "1.0.0"
description = "YouTube Intelligence MCP Server for AI-powered video analysis"
authors = [{ name = "Francesco Piscani's YouTube Intelligence AI Platform Team" }]
license = { text = "MIT" }
readme = "README.md"
requires-python = ">=3.9"
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.9",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
]
# Runtime dependencies. Every entry is pinned to an exact version except mcp,
# which only declares a minimum version.
dependencies = [
    "mcp>=0.9.0",
    "google-api-python-client==2.108.0",
    "pandas==2.1.3",
    "numpy==1.24.3",
    "nltk==3.8.1",
    "textblob==0.17.1",
    "anthropic==0.7.7",
    "firebase-admin==6.2.0",
    "python-dotenv==1.0.0",
    "pydantic==2.5.0",
    "aiofiles==23.2.1",
    "psutil==5.9.6",
]

# Console script: the `youtube-intelligence-mcp` command resolves to main() in server/__main__.py
[project.scripts]
youtube-intelligence-mcp = "server.__main__:main"

# Package discovery: include every package found, except tests and notebook checkpoint directories
[tool.setuptools.packages.find]
include = ["*"]
exclude = ["tests*", "*.ipynb_checkpoints*"]
