In [None]:
# Install required packages
!pip install pytube opencv-python pillow pytesseract numpy

# Install tesseract OCR
!apt-get install tesseract-ocr

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  tesseract-ocr-eng tesseract-ocr-osd
The following NEW packages will be installed:
  tesseract-ocr tesseract-ocr-eng tesseract-ocr-osd
0 upgraded, 3 newly installed, 0 to remove and 49 not upgraded.
Need to get 4,816 kB of archives.
After this operation, 15.6 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 tesseract-ocr-eng all 1:4.00~git30-7274cfa-1.1 [1,591 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/universe amd64 tesseract-ocr-osd all 1:4.00~git30-7274cfa-1.1 [2,990 kB]
Get:3 http://archive.ubuntu.com/ubuntu jammy/universe amd64 tesseract-ocr amd64 4.1.1-2.1build1 [236 kB]
Fetched 4,816 kB in 1s (6,065 kB/s)
Selecting previously unselected package tesseract-ocr-eng.
(Reading database ... 123630 files and directories currently installed.)
Preparing to unpack .../tesseract-ocr-

In [None]:
!pip install yt-dlp

Collecting yt-dlp
  Downloading yt_dlp-2024.11.18-py3-none-any.whl.metadata (172 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 172.1/172.1 kB 5.0 MB/s eta 0:00:00
Downloading yt_dlp-2024.11.18-py3-none-any.whl (3.2 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3.2/3.2 MB 50.3 MB/s eta 0:00:00
Installing collected packages: yt-dlp
Successfully installed yt-dlp-2024.11.18


In [None]:
!pip install --upgrade pytube



In [None]:
pip install anthropic

Collecting anthropic
  Downloading anthropic-0.39.0-py3-none-any.whl.metadata (22 kB)
Downloading anthropic-0.39.0-py3-none-any.whl (198 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 198.4/198.4 kB 8.4 MB/s eta 0:00:00
Installing collected packages: anthropic
Successfully installed anthropic-0.39.0


In [None]:
# Without LLM
import asyncio
import json
import os
import re
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Optional, Dict, Any

import cv2
import numpy as np
import pytesseract
import yt_dlp
from PIL import Image

class EnhancedJSONEncoder(json.JSONEncoder):
    """JSON encoder that also accepts NumPy scalars/arrays, tuples and sets."""

    def default(self, obj):
        """Return a JSON-serializable replacement for ``obj``.

        Called by ``json`` only for objects it cannot serialize itself.

        Args:
            obj: The value the standard encoder rejected.

        Returns:
            A built-in ``int``, ``float``, ``bool`` or ``list`` equivalent.

        Raises:
            TypeError: From the base class, for any other unsupported type.
        """
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        # np.bool_ is neither np.integer nor a Python bool, so without this
        # branch a NumPy boolean falls through to the TypeError below.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, (tuple, set)):
            return list(obj)
        return super().default(obj)

@dataclass
class AdDetails:
    """Record describing one advertisement hit found in a video frame."""
    video_id: str
    video_title: str
    timestamp: float
    ad_text: str
    confidence_score: float
    ad_position: tuple
    processed_date: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dict whose ``ad_position`` is a list.

        The tuple is turned into a list so the dict serializes to JSON as-is.
        """
        payload = asdict(self)
        payload.update(ad_position=list(payload['ad_position']))
        return payload

class YouTubeAdScanner:
    def __init__(self, playlist_url: str, output_dir: str = "ad_scan_results"):
        self.playlist_url = playlist_url
        self.output_dir = output_dir
        self.results: List[AdDetails] = []

        # Create output directories
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(os.path.join(output_dir, "frames"), exist_ok=True)
        os.makedirs(os.path.join(output_dir, "temp"), exist_ok=True)

        # Configure yt-dlp options
        self.ydl_opts = {
            'format': 'best[height<=720]',
            'quiet': True,
            'no_warnings': True,
            'extract_flat': True,
        }

    def get_playlist_videos(self):
        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
            try:
                playlist_info = ydl.extract_info(self.playlist_url, download=False)
                if 'entries' in playlist_info:
                    return [(entry['id'], entry['title']) for entry in playlist_info['entries']]
                return []
            except Exception as e:
                print(f"Error extracting playlist info: {str(e)}")
                return []

    def extract_frames(self, video_path: str, num_frames: int = 5) -> List[np.ndarray]:
        frames = []
        cap = cv2.VideoCapture(video_path)

        for _ in range(num_frames):
            ret, frame = cap.read()
            if ret:
                frames.append(frame)
            else:
                break

        cap.release()
        return frames

    def detect_ad_strip(self, frame: np.ndarray) -> Optional[tuple]:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)

        height = frame.shape[0]
        bottom_third = edges[2*height//3:]

        lines = cv2.HoughLinesP(bottom_third, 1, np.pi/180, 100,
                               minLineLength=frame.shape[1]//3, maxLineGap=20)

        if lines is not None:
            max_length = 0
            best_line = None

            for line in lines:
                x1, y1, x2, y2 = line[0]
                if abs(y2 - y1) < 10:
                    length = abs(x2 - x1)
                    if length > max_length:
                        max_length = length
                        best_line = line[0]

            if best_line is not None:
                x1, y1, x2, y2 = best_line
                return (0, y1 + 2*height//3 - 50, frame.shape[1], 50)

        return None

    def preprocess_image_for_ocr(self, image: np.ndarray) -> List[np.ndarray]:
        """Build several variants of ``image`` for the OCR step to try.

        Returns:
            Four images, in this order: the input itself, an Otsu-thresholded
            version, an adaptive-thresholded version (both converted back to
            BGR), and a contrast-enhanced version (CLAHE on the L channel).
        """
        grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Global (Otsu) binarisation; [1] picks the image from (threshold, image).
        otsu_mask = cv2.threshold(
            grayscale, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]

        # Local (Gaussian adaptive) binarisation.
        local_mask = cv2.adaptiveThreshold(
            grayscale, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)

        # Contrast boost: equalise only the lightness channel in LAB space.
        lightness, chroma_a, chroma_b = cv2.split(cv2.cvtColor(image, cv2.COLOR_BGR2LAB))
        equalizer = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
        boosted = cv2.cvtColor(
            cv2.merge((equalizer.apply(lightness), chroma_a, chroma_b)),
            cv2.COLOR_LAB2BGR)

        return [
            image,
            cv2.cvtColor(otsu_mask, cv2.COLOR_GRAY2BGR),
            cv2.cvtColor(local_mask, cv2.COLOR_GRAY2BGR),
            boosted,
        ]

    def extract_ad_text(self, frame: np.ndarray, ad_region: tuple) -> tuple:
        """Extract text from the ad region using multiple preprocessing techniques."""
        x, y, w, h = ad_region
        ad_image = frame[y:y+h, x:x+w]

        # Scale up the image to improve OCR
        scaled_image = cv2.resize(ad_image, None, fx=2, fy=2, interpolation=cv2.INTER_CUBIC)

        # Get preprocessed versions of the image
        preprocessed_images = self.preprocess_image_for_ocr(scaled_image)

        best_text = ""
        best_confidence = 0

        # Custom configuration for Tesseract
        custom_config = r'--oem 3 --psm 6 -c tessedit_char_whitelist="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789 ,.!?-"'

        for img in preprocessed_images:
            # Convert to PIL Image
            pil_image = Image.fromarray(img)

            # Try OCR with different page segmentation modes
            for psm in [6, 7, 3]:  # Single uniform block, single text line, auto
                try:
                    config = f'--oem 3 --psm {psm} {custom_config}'
                    ocr_data = pytesseract.image_to_data(pil_image, config=config,
                                                       output_type=pytesseract.Output.DICT)

                    text_parts = []
                    confidence_sum = 0
                    confidence_count = 0

                    for i in range(len(ocr_data['text'])):
                        conf = int(ocr_data['conf'][i])
                        if conf > 0:
                            text_parts.append(ocr_data['text'][i])
                            confidence_sum += float(conf)
                            confidence_count += 1

                    if confidence_count > 0:
                        text = " ".join(text_parts).strip()
                        avg_confidence = confidence_sum / confidence_count

                        # Keep the result with highest confidence
                        if avg_confidence > best_confidence and len(text) > 3:
                            best_text = text
                            best_confidence = avg_confidence

                except Exception as e:
                    print(f"OCR error with PSM {psm}: {str(e)}")
                    continue

        # Additional post-processing of the text
        if best_text:
            # Remove extra whitespace
            best_text = " ".join(best_text.split())

            # Remove common OCR artifacts
            best_text = re.sub(r'[^\w\s.,!?-]', '', best_text)

            # Save the processed image for debugging
            debug_path = os.path.join(self.output_dir, "frames", f"debug_ocr_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg")
            cv2.imwrite(debug_path, scaled_image)

        return best_text, best_confidence

    async def process_video(self, video_id: str, video_title: str):
        try:
            print(f"Processing video: {video_title} ({video_id})")

            video_url = f"https://www.youtube.com/watch?v={video_id}"
            ydl_opts = {
                'format': 'best[height<=720]',
                'quiet': True,
                'no_warnings': True,
                'outtmpl': os.path.join(self.output_dir, "temp", f"{video_id}.%(ext)s")
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(video_url, download=True)
                video_path = ydl.prepare_filename(info)

            if os.path.exists(video_path):
                frames = self.extract_frames(video_path)

                for frame_idx, frame in enumerate(frames):
                    ad_region = self.detect_ad_strip(frame)

                    if ad_region:
                        frame_path = os.path.join(
                            self.output_dir, "frames",
                            f"{video_id}_frame_{frame_idx}.jpg")
                        cv2.imwrite(frame_path, frame)

                        ad_text, confidence = self.extract_ad_text(frame, ad_region)

                        if ad_text:
                            ad_details = AdDetails(
                                video_id=video_id,
                                video_title=video_title,
                                timestamp=float(frame_idx) / 30.0,  # Convert to float explicitly
                                ad_text=ad_text,
                                confidence_score=float(confidence),  # Convert to float explicitly
                                ad_position=tuple(int(x) for x in ad_region),  # Convert to regular integers
                                processed_date=datetime.now().isoformat()
                            )
                            self.results.append(ad_details)
                            print(f"Found ad in video {video_id} at frame {frame_idx}")
                            print(f"Ad text: {ad_text}")

                os.remove(video_path)

        except Exception as e:
            print(f"Error processing video {video_id}: {str(e)}")

    async def process_playlist(self):
        try:
            print(f"Processing playlist: {self.playlist_url}")
            videos = self.get_playlist_videos()

            if not videos:
                print("No videos found in playlist")
                return

            print(f"Found {len(videos)} videos")

            semaphore = asyncio.Semaphore(2)

            async def process_with_semaphore(video_id, video_title):
                async with semaphore:
                    await self.process_video(video_id, video_title)
                    await asyncio.sleep(1)

            tasks = [process_with_semaphore(vid_id, title) for vid_id, title in videos]
            await asyncio.gather(*tasks)

            self.save_results()

        except Exception as e:
            print(f"Error processing playlist: {str(e)}")
            raise  # Re-raise the exception to see the full traceback

    def save_results(self):
        """Save scanning results to a JSON file."""
        results_file = os.path.join(self.output_dir, "scan_results.json")

        # Convert results to dictionaries with proper type conversion
        results_data = [result.to_dict() for result in self.results]

        with open(results_file, 'w', encoding='utf-8') as f:
            json.dump(results_data, f, indent=2, cls=EnhancedJSONEncoder, ensure_ascii=False)

        print(f"Results saved to {results_file}")
        print(f"Found {len(self.results)} ads across all videos")



In [None]:
# Run the scanner on the playlist below. This uses whichever YouTubeAdScanner
# definition was executed most recently in this kernel; with the default
# output_dir the results are written to ad_scan_results/scan_results.json.
playlist_url = "https://www.youtube.com/playlist?list=PLTycFjvfXg-sYpyfA4a_lcIbMXhVtQrxO"
scanner = YouTubeAdScanner(playlist_url)
await scanner.process_playlist()  # Top-level await: run the coroutine directly in the notebook (Colab)

Processing playlist: https://www.youtube.com/playlist?list=PLTycFjvfXg-sYpyfA4a_lcIbMXhVtQrxO
Found 4 videos
Processing video: PM Modi Hails Mahayuti Landslide Victory In Maharashtra Elections | Ntv (oP-xZpQkOJ4)
Processing video: కార్తిక శనివారం శుభవేళ లింగ రూపంలో సాక్షాత్కారం అయినా శివుడిని దర్శించడం మహాద్భుతం | Lingodbhavam (4FR4MHGMj0o)
Found ad in video 4FR4MHGMj0o at frame 0
Ad text: hE Ty, ry ce ght. BR
Found ad in video 4FR4MHGMj0o at frame 1
Ad text: a mal 7 4 J PY ti I vg ni 7, 7
Found ad in video 4FR4MHGMj0o at frame 2
Ad text: ine id J eae ee
Found ad in video 4FR4MHGMj0o at frame 3
Ad text: 7 wh Pe A a
Found ad in video 4FR4MHGMj0o at frame 4
Ad text: et J t- ey crs
Processing video: కోటి దీపోత్సవ జ్ఞానదీపాన్ని వెలిగించిన తర్వాత మీ ఇష్టదైవాన్ని స్మరించండి.. సకల శుభాలు చేకూరుతాయి..! (sHVqXoWQrZ4)
Found ad in video sHVqXoWQrZ4 at frame 0
Ad text: a ee a ee I al a ee ER ee Or Se a ee. ary We Ak Oe
Found ad in video sHVqXoWQrZ4 at frame 1
Ad text: ae A ee 4 i ri i, sy . Sd a a

In [None]:
# With Vision LLM- Claude Version 1.0 and 2.0
import cv2
import numpy as np
import os
from PIL import Image
from dataclasses import dataclass, asdict
from typing import List, Optional, Dict, Any
import json
from datetime import datetime
import asyncio
import yt_dlp
import anthropic
import base64
import io

# Read the key from the ANTHROPIC_API_KEY environment variable so it never has
# to be saved inside the notebook. The placeholder is only a fallback: replace
# it (or set the variable) before running the cells below.
CLAUDE_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "Your_api_key_here")

@dataclass
class AdDetails:
    """Record describing one advertisement hit and the text extracted from it."""
    video_id: str
    video_title: str
    timestamp: float
    ad_text: str
    confidence_score: float
    ad_position: tuple
    processed_date: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dict, with ``ad_position`` converted to a list."""
        fields = asdict(self)
        position = fields['ad_position']
        fields['ad_position'] = [*position]
        return fields

class YouTubeAdScanner:
    def __init__(self, playlist_url: str, output_dir: str = "ad_scan_results"):
        self.playlist_url = playlist_url
        self.output_dir = output_dir
        self.results: List[AdDetails] = []
        self.client = anthropic.Anthropic(api_key=CLAUDE_API_KEY)

        # Create output directories
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(os.path.join(output_dir, "frames"), exist_ok=True)
        os.makedirs(os.path.join(output_dir, "temp"), exist_ok=True)

        # Configure yt-dlp options
        self.ydl_opts = {
            'format': 'best[height<=720]',
            'quiet': True,
            'no_warnings': True,
            'extract_flat': True,
        }

    def get_playlist_videos(self):
        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
            try:
                playlist_info = ydl.extract_info(self.playlist_url, download=False)
                if 'entries' in playlist_info:
                    return [(entry['id'], entry['title']) for entry in playlist_info['entries']]
                return []
            except Exception as e:
                print(f"Error extracting playlist info: {str(e)}")
                return []

    def extract_frames(self, video_path: str, num_frames: int = 5) -> List[np.ndarray]:
        frames = []
        cap = cv2.VideoCapture(video_path)

        for _ in range(num_frames):
            ret, frame = cap.read()
            if ret:
                frames.append(frame)
            else:
                break

        cap.release()
        return frames

    def detect_ad_strip(self, frame: np.ndarray) -> Optional[tuple]:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)

        height = frame.shape[0]
        bottom_third = edges[2*height//3:]

        lines = cv2.HoughLinesP(bottom_third, 1, np.pi/180, 100,
                               minLineLength=frame.shape[1]//3, maxLineGap=20)

        if lines is not None:
            max_length = 0
            best_line = None

            for line in lines:
                x1, y1, x2, y2 = line[0]
                if abs(y2 - y1) < 10:
                    length = abs(x2 - x1)
                    if length > max_length:
                        max_length = length
                        best_line = line[0]

            if best_line is not None:
                x1, y1, x2, y2 = best_line
                return (0, y1 + 2*height//3 - 50, frame.shape[1], 50)

        return None

    def image_to_base64(self, image: np.ndarray) -> str:
        """Convert an OpenCV image to base64 string."""
        # Convert from BGR to RGB
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Convert to PIL Image
        pil_image = Image.fromarray(image_rgb)

        # Save to bytes buffer
        buffer = io.BytesIO()
        pil_image.save(buffer, format='JPEG')

        # Get base64 string
        base64_string = base64.b64encode(buffer.getvalue()).decode('utf-8')
        return base64_string

    async def extract_text_with_claude(self, frame: np.ndarray, ad_region: tuple) -> tuple:
        """Extract text from image using Claude's vision capabilities."""
        try:
            x, y, w, h = ad_region
            ad_image = frame[y:y+h, x:x+w]

            # Convert image to base64
            image_base64 = self.image_to_base64(ad_image)

            # Create message with Claude
            message = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": "image/jpeg",
                                    "data": image_base64,
                                },
                            },
                            {
                                "type": "text",
                                "text": "What logos and text appears in this image?"
                            }
                        ],
                    }
                ],
            )

            # Extract text from Claude's response
            extracted_text = message.content[0].text.strip()

            # For now, use a fixed confidence score since Claude doesn't provide one
            confidence_score = 0.9 if extracted_text else 0.0

            return extracted_text, confidence_score

        except Exception as e:
            print(f"Error using Claude for text extraction: {str(e)}")
            return "", 0.0

    async def process_video(self, video_id: str, video_title: str):
        try:
            print(f"Processing video: {video_title} ({video_id})")

            video_url = f"https://www.youtube.com/watch?v={video_id}"
            ydl_opts = {
                'format': 'best[height<=720]',
                'quiet': True,
                'no_warnings': True,
                'outtmpl': os.path.join(self.output_dir, "temp", f"{video_id}.%(ext)s")
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(video_url, download=True)
                video_path = ydl.prepare_filename(info)

            if os.path.exists(video_path):
                frames = self.extract_frames(video_path)

                for frame_idx, frame in enumerate(frames):
                    ad_region = self.detect_ad_strip(frame)

                    if ad_region:
                        frame_path = os.path.join(
                            self.output_dir, "frames",
                            f"{video_id}_frame_{frame_idx}.jpg")
                        cv2.imwrite(frame_path, frame)

                        ad_text, confidence = await self.extract_text_with_claude(frame, ad_region)

                        if ad_text:
                            ad_details = AdDetails(
                                video_id=video_id,
                                video_title=video_title,
                                timestamp=float(frame_idx) / 30.0,
                                ad_text=ad_text,
                                confidence_score=float(confidence),
                                ad_position=tuple(int(x) for x in ad_region),
                                processed_date=datetime.now().isoformat()
                            )
                            self.results.append(ad_details)
                            print(f"Found ad in video {video_id} at frame {frame_idx}")
                            print(f"Ad text: {ad_text}")

                os.remove(video_path)

        except Exception as e:
            print(f"Error processing video {video_id}: {str(e)}")

    async def process_playlist(self):
        try:
            print(f"Processing playlist: {self.playlist_url}")
            videos = self.get_playlist_videos()

            if not videos:
                print("No videos found in playlist")
                return

            print(f"Found {len(videos)} videos")

            semaphore = asyncio.Semaphore(2)

            async def process_with_semaphore(video_id, video_title):
                async with semaphore:
                    await self.process_video(video_id, video_title)
                    await asyncio.sleep(1)

            tasks = [process_with_semaphore(vid_id, title) for vid_id, title in videos]
            await asyncio.gather(*tasks)

            self.save_results()

        except Exception as e:
            print(f"Error processing playlist: {str(e)}")
            raise

    def save_results(self):
        results_file = os.path.join(self.output_dir, "scan_results.json")
        results_data = [result.to_dict() for result in self.results]

        with open(results_file, 'w', encoding='utf-8') as f:
            json.dump(results_data, f, indent=2, ensure_ascii=False)

        print(f"Results saved to {results_file}")
        print(f"Found {len(self.results)} ads across all videos")



In [None]:
# Run the scanner on the playlist below. This uses whichever YouTubeAdScanner
# definition was executed most recently in this kernel; with the default
# output_dir the results are written to ad_scan_results/scan_results.json.
playlist_url = "https://www.youtube.com/playlist?list=PLTycFjvfXg-sYpyfA4a_lcIbMXhVtQrxO"
scanner = YouTubeAdScanner(playlist_url)
await scanner.process_playlist()  # Top-level await: run the coroutine directly in the notebook (Colab)

Processing playlist: https://www.youtube.com/playlist?list=PLTycFjvfXg-sYpyfA4a_lcIbMXhVtQrxO
Found 4 videos
Processing video: PM Modi Hails Mahayuti Landslide Victory In Maharashtra Elections | Ntv (oP-xZpQkOJ4)
Found ad in video oP-xZpQkOJ4 at frame 0
Ad text: In this image, there appears to be a "NO" logo in white text against a red background, with what looks like "MAKING" text underneath it. Below that, there's text that reads "DON'T MISS" where "DON'T" appears in white text and "MISS" appears in yellow text, all on a red banner or background.
Found ad in video oP-xZpQkOJ4 at frame 1
Ad text: This appears to be some kind of header or banner with a red background. It contains "NO" with some additional text next to it, and "DON'T MISS" in white and yellow text. The design appears to be part of a website or digital interface, with a clean, bold style typical of web banners or notification bars.
Found ad in video oP-xZpQkOJ4 at frame 2
Ad text: In this image, there appears to be a red

In [None]:
import cv2
import numpy as np
import os
from PIL import Image
from dataclasses import dataclass, asdict
from typing import List, Optional, Dict, Any
import json
from datetime import datetime
import asyncio
import yt_dlp
import anthropic
import base64
import io

# Read the key from the ANTHROPIC_API_KEY environment variable so it never has
# to be saved inside the notebook. The placeholder is only a fallback: replace
# it (or set the variable) before running the cells below.
CLAUDE_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "your_api_key_here")

@dataclass
class AdDetails:
    """Record describing one advertisement hit as structured company/offer data."""
    video_id: str
    video_title: str
    timestamp: float
    company_name: str
    offers: List[str]
    disclaimer_text: str
    ad_position: tuple
    processed_date: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dict, with ``ad_position`` converted to a list."""
        fields = asdict(self)
        fields.update(ad_position=list(fields['ad_position']))
        return fields

class YouTubeAdScanner:
    def __init__(self, playlist_url: str, output_dir: str = "ad_scan_results"):
        self.playlist_url = playlist_url
        self.output_dir = output_dir
        self.results: List[AdDetails] = []
        self.client = anthropic.Anthropic(api_key=CLAUDE_API_KEY)

        # Create output directories
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(os.path.join(output_dir, "frames"), exist_ok=True)
        os.makedirs(os.path.join(output_dir, "temp"), exist_ok=True)

        # Configure yt-dlp options
        self.ydl_opts = {
            'format': 'best[height<=720]',
            'quiet': True,
            'no_warnings': True,
            'extract_flat': True,
        }

    def get_playlist_videos(self):
        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
            try:
                playlist_info = ydl.extract_info(self.playlist_url, download=False)
                if 'entries' in playlist_info:
                    return [(entry['id'], entry['title']) for entry in playlist_info['entries']]
                return []
            except Exception as e:
                print(f"Error extracting playlist info: {str(e)}")
                return []

    def extract_frames(self, video_path: str, num_frames: int = 5) -> List[np.ndarray]:
        frames = []
        cap = cv2.VideoCapture(video_path)

        for _ in range(num_frames):
            ret, frame = cap.read()
            if ret:
                frames.append(frame)
            else:
                break

        cap.release()
        return frames

    def detect_ad_strip(self, frame: np.ndarray) -> Optional[tuple]:
        """Extract the bottom advertisement strip from the frame."""
        height, width = frame.shape[:2]

        # Define the bottom region (approximately last 10% of the frame)
        bottom_start = int(height * 0.9)

        # Return coordinates for bottom strip
        return (0, bottom_start, width, height - bottom_start)

    def image_to_base64(self, image: np.ndarray) -> str:
        """Convert an OpenCV image to base64 string."""
        # Convert from BGR to RGB
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Convert to PIL Image
        pil_image = Image.fromarray(image_rgb)

        # Save to bytes buffer
        buffer = io.BytesIO()
        pil_image.save(buffer, format='JPEG')

        # Get base64 string
        base64_string = base64.b64encode(buffer.getvalue()).decode('utf-8')
        return base64_string

    async def extract_text_with_claude(self, frame: np.ndarray, ad_region: tuple) -> Dict[str, Any]:
        """Analyze advertisement using Claude's vision capabilities."""
        try:
            x, y, w, h = ad_region
            ad_image = frame[y:y+h, x:x+w]

            # Convert image to base64
            image_base64 = self.image_to_base64(ad_image)

            # Create message with Claude
            message = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": "image/jpeg",
                                    "data": image_base64,
                                },
                            },
                            {
                                "type": "text",
                                "text": "Look at the bottom banner/advertisement strip of this image. Tell me all the promotional offers, deals, or advertisement text you can see. Format your response as a JSON with these fields: company_name, offers (array of strings), and any_disclaimer_text. Only analyze the bottom advertisement strip."
                            }
                        ],
                    }
                ],
            )

            # Extract JSON from Claude's response
            response_text = message.content[0].text.strip()

            try:
                # Try to parse JSON from the response
                ad_info = json.loads(response_text)
            except json.JSONDecodeError:
                # If JSON parsing fails, create a basic structure
                print(f"Failed to parse JSON from Claude's response. Raw response: {response_text}")
                ad_info = {
                    "company_name": "",
                    "offers": [],
                    "any_disclaimer_text": ""
                }

            return ad_info

        except Exception as e:
            print(f"Error using Claude for text extraction: {str(e)}")
            return {
                "company_name": "",
                "offers": [],
                "any_disclaimer_text": ""
            }

    async def process_video(self, video_id: str, video_title: str):
        try:
            print(f"Processing video: {video_title} ({video_id})")

            video_url = f"https://www.youtube.com/watch?v={video_id}"
            ydl_opts = {
                'format': 'best[height<=720]',
                'quiet': True,
                'no_warnings': True,
                'outtmpl': os.path.join(self.output_dir, "temp", f"{video_id}.%(ext)s")
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(video_url, download=True)
                video_path = ydl.prepare_filename(info)

            if os.path.exists(video_path):
                frames = self.extract_frames(video_path)

                for frame_idx, frame in enumerate(frames):
                    ad_region = self.detect_ad_strip(frame)

                    if ad_region:
                        frame_path = os.path.join(
                            self.output_dir, "frames",
                            f"{video_id}_frame_{frame_idx}.jpg")
                        cv2.imwrite(frame_path, frame)

                        ad_info = await self.extract_text_with_claude(frame, ad_region)

                        if ad_info and (ad_info.get("company_name") or ad_info.get("offers")):
                            ad_details = AdDetails(
                                video_id=video_id,
                                video_title=video_title,
                                timestamp=float(frame_idx) / 30.0,
                                company_name=ad_info.get("company_name", ""),
                                offers=ad_info.get("offers", []),
                                disclaimer_text=ad_info.get("any_disclaimer_text", ""),
                                ad_position=tuple(int(x) for x in ad_region),
                                processed_date=datetime.now().isoformat()
                            )
                            self.results.append(ad_details)
                            print(f"Found ad in video {video_id} at frame {frame_idx}")
                            print(f"Company: {ad_info.get('company_name', '')}")
                            print(f"Offers: {ad_info.get('offers', [])}")

                os.remove(video_path)

        except Exception as e:
            print(f"Error processing video {video_id}: {str(e)}")
            raise

    async def process_playlist(self):
        try:
            print(f"Processing playlist: {self.playlist_url}")
            videos = self.get_playlist_videos()

            if not videos:
                print("No videos found in playlist")
                return

            print(f"Found {len(videos)} videos")

            semaphore = asyncio.Semaphore(2)

            async def process_with_semaphore(video_id, video_title):
                async with semaphore:
                    await self.process_video(video_id, video_title)
                    await asyncio.sleep(1)

            tasks = [process_with_semaphore(vid_id, title) for vid_id, title in videos]
            await asyncio.gather(*tasks)

            self.save_results()

        except Exception as e:
            print(f"Error processing playlist: {str(e)}")
            raise

    def save_results(self):
        results_file = os.path.join(self.output_dir, "scan_results.json")
        results_data = [result.to_dict() for result in self.results]

        with open(results_file, 'w', encoding='utf-8') as f:
            json.dump(results_data, f, indent=2, ensure_ascii=False)

        print(f"Results saved to {results_file}")
        print(f"Found {len(self.results)} ads across all videos")



In [None]:
# Run the scanner on the playlist below. This uses whichever YouTubeAdScanner
# definition was executed most recently in this kernel; with the default
# output_dir the results are written to ad_scan_results/scan_results.json.
playlist_url = "https://www.youtube.com/playlist?list=PLTycFjvfXg-sYpyfA4a_lcIbMXhVtQrxO"
scanner = YouTubeAdScanner(playlist_url)
await scanner.process_playlist()  # Top-level await: run the coroutine directly in the notebook (Colab)

Processing playlist: https://www.youtube.com/playlist?list=PLTycFjvfXg-sYpyfA4a_lcIbMXhVtQrxO
Found 4 videos
Processing video: PM Modi Hails Mahayuti Landslide Victory In Maharashtra Elections | Ntv (oP-xZpQkOJ4)
Failed to parse JSON from Claude's response. Raw response: {
  "company_name": "Komanecvim",
  "offers": [
    "GET FREE 10 YEARS",
    "0-040-20000000" // appears to be some form of contact number/code
  ],
  "any_disclaimer_text": "No visible disclaimers in the banner strip"
}

Note: There appears to be potentially another text/number sequence starting with "0-040-" on the left side of the banner, but it's partially cut off and not fully legible in the image.
Found ad in video oP-xZpQkOJ4 at frame 1
Company: Kamani Evm Factory Outlet
Offers: ['GET FREE 10 YEARS', 'Less Expense More Saving']
Found ad in video oP-xZpQkOJ4 at frame 2
Company: Koman Evolution Factory Outlet
Offers: ['Get Free 10 Years Service', 'Less Expense More Saving']
Failed to parse JSON from Claude's respo

In [None]:
!pip install pandas matplotlib seaborn plotly



In [None]:
import json
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
from collections import Counter
import os
from typing import List, Dict, Any
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

class AdReportGenerator:
    """Build an HTML report (summary tables plus Plotly charts) from ad-scan JSON.

    The JSON file must contain a list of ad records. The report reads these keys
    from every record: ``video_id``, ``company_name``, ``offers`` (normally a list
    of strings; ``None`` or a single string are tolerated) and ``processed_date``
    (anything ``pandas.to_datetime`` can parse).

    Typical use is ``AdReportGenerator(path).generate_report()``, which writes
    ``ad_analysis_report.html`` and the chart files it embeds into ``output_dir``.
    """

    # Columns the report reads; used to give an empty scan a usable (empty) schema.
    REQUIRED_COLUMNS = ('video_id', 'company_name', 'offers', 'processed_date')

    # px.timeline needs a start and an end for every bar; the end is derived by
    # adding this fixed number of minutes to ``processed_date``.
    TIMELINE_AD_MINUTES = 15

    # Calendar order for heatmap rows (crosstab alone would sort them alphabetically).
    WEEKDAY_ORDER = ('Monday', 'Tuesday', 'Wednesday', 'Thursday',
                     'Friday', 'Saturday', 'Sunday')

    def __init__(self, json_path: str, output_dir: str = "ad_reports"):
        """Remember the input/output locations and create ``output_dir`` if needed.

        Args:
            json_path: Path of the JSON file holding the list of ad records.
            output_dir: Directory that receives the report and chart HTML files.
        """
        self.json_path = json_path
        self.output_dir = output_dir
        self.data = None        # raw parsed JSON, set by load_data()
        self.df = None          # one row per ad record, set by load_data()
        self.df_offers = None   # one row per (ad, offer) pair, set by load_data()

        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

    @staticmethod
    def _is_missing_offer(offer) -> bool:
        """Return True for the offer values load_data() drops: None, NaN and ''."""
        if offer is None or offer == '':
            return True
        return isinstance(offer, float) and pd.isna(offer)

    @classmethod
    def _clean_offers(cls, value) -> List[Any]:
        """Return the usable offers of one ad record as a list.

        Mirrors how ``DataFrame.explode`` treats the ``offers`` cell: a list/tuple
        contributes each element, any other value counts as a single offer.
        Missing entries (None, NaN, '') are dropped.
        """
        candidates = list(value) if isinstance(value, (list, tuple)) else [value]
        return [offer for offer in candidates if not cls._is_missing_offer(offer)]

    def load_data(self):
        """Load the JSON file into ``self.data``, ``self.df`` and ``self.df_offers``.

        Raises:
            OSError: If the JSON file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        with open(self.json_path, 'r', encoding='utf-8') as f:
            self.data = json.load(f)

        # Convert to DataFrame
        self.df = pd.DataFrame(self.data)

        # A scan that found nothing yields a frame without columns; give it the
        # expected (empty) columns so the report degrades gracefully instead of
        # failing with a KeyError below.
        if self.df.empty:
            self.df = pd.DataFrame(columns=list(self.REQUIRED_COLUMNS))

        # Convert processed_date to datetime
        self.df['processed_date'] = pd.to_datetime(self.df['processed_date'])

        # Explode the offers array into separate rows, then drop missing/empty offers
        exploded = self.df.explode('offers').reset_index(drop=True)
        has_offer = exploded['offers'].notna() & (exploded['offers'] != '')
        self.df_offers = exploded[has_offer]

    def generate_summary_stats(self) -> Dict[str, Any]:
        """Return headline numbers for the report.

        Null company names and null dates are ignored. ``most_common_company``
        and the ``date_range`` entries fall back to ``'N/A'`` when no usable
        value exists.
        """
        ad_count = len(self.df)
        companies = self.df['company_name']
        # mode() skips nulls, so it is empty when every company name is missing.
        company_modes = companies.mode()
        # NaT cannot be formatted with strftime, so exclude it up front.
        dates = self.df['processed_date'].dropna()

        summary = {
            'total_ads_detected': ad_count,
            'unique_companies': companies.nunique(),
            'total_videos_analyzed': self.df['video_id'].nunique(),
            'unique_offers': self.df_offers['offers'].nunique(),
            'most_common_company': company_modes.iloc[0] if not company_modes.empty else 'N/A',
            'avg_offers_per_ad': len(self.df_offers) / ad_count if ad_count > 0 else 0,
            'date_range': {
                'start': dates.min().strftime('%Y-%m-%d') if not dates.empty else 'N/A',
                'end': dates.max().strftime('%Y-%m-%d') if not dates.empty else 'N/A'
            }
        }
        return summary

    def analyze_offers(self) -> Dict[str, Any]:
        """Analyze offers and their patterns.

        Returns a dict with:
            most_common_offers: the ten most frequent offers mapped to their counts.
            offers_by_company: company name -> list of its offers (duplicates kept).
            avg_offers_per_company: mean number of offers per company.

        Offers are filtered the same way as in ``self.df_offers`` (None, NaN and ''
        are ignored), so a record without offers no longer raises ``TypeError``.
        """
        if self.df_offers.empty:
            return {
                'most_common_offers': {},
                'offers_by_company': {},
                'avg_offers_per_company': 0
            }

        # Group the original (un-exploded) frame so that companies whose ads carry
        # no offers still appear, with an empty list.
        offers_by_company = {
            company: [offer for value in values for offer in self._clean_offers(value)]
            for company, values in self.df.groupby('company_name')['offers']
        }
        total_offers = sum(len(offers) for offers in offers_by_company.values())

        offer_analysis = {
            'most_common_offers': self.df_offers['offers'].value_counts().head(10).to_dict(),
            'offers_by_company': offers_by_company,
            'avg_offers_per_company': total_offers / len(offers_by_company) if offers_by_company else 0
        }
        return offer_analysis

    def generate_visualizations(self):
        """Write the Plotly charts as standalone HTML files into ``output_dir``.

        Files written: ``company_frequency.html``, ``offer_distribution.html``
        (only when offers exist), ``ad_timeline.html`` and ``ad_heatmap.html``.
        Adds the helper columns ``end_date``, ``hour`` and ``day`` to ``self.df``.
        """

        # Only generate visualizations if we have data
        if self.df.empty:
            print("No data available for visualizations")
            return

        # 1. Companies and their ad frequencies
        company_counts = self.df['company_name'].value_counts().reset_index()
        company_counts.columns = ['company', 'count']
        fig1 = px.bar(company_counts,
                     x='company',
                     y='count',
                     title='Ad Frequency by Company',
                     labels={'company': 'Company', 'count': 'Number of Ads'},
                     color='company')
        fig1.write_html(os.path.join(self.output_dir, 'company_frequency.html'))

        # 2. Offer types distribution (only if we have offers data)
        if not self.df_offers.empty:
            fig2 = px.pie(self.df_offers,
                         names='offers',
                         title='Distribution of Offer Types')
            fig2.write_html(os.path.join(self.output_dir, 'offer_distribution.html'))

        # 3. Timeline of ads
        self.df['end_date'] = self.df['processed_date'] + pd.Timedelta(minutes=self.TIMELINE_AD_MINUTES)
        fig3 = px.timeline(self.df,
                          x_start='processed_date',
                          x_end='end_date',
                          y='company_name',
                          color='company_name',
                          title='Timeline of Ad Appearances')
        fig3.write_html(os.path.join(self.output_dir, 'ad_timeline.html'))

        # 4. Heatmap of ads by day and hour
        self.df['hour'] = self.df['processed_date'].dt.hour
        self.df['day'] = self.df['processed_date'].dt.day_name()

        # crosstab already fills missing combinations with 0; put the days that
        # occur into calendar order instead of alphabetical order.
        heatmap_data = pd.crosstab(self.df['day'], self.df['hour'])
        ordered_days = [day for day in self.WEEKDAY_ORDER if day in heatmap_data.index]
        heatmap_data = heatmap_data.reindex(ordered_days)

        fig4 = px.imshow(heatmap_data,
                        title='Ad Frequency Heatmap by Day and Hour',
                        labels=dict(x='Hour of Day', y='Day of Week', color='Number of Ads'))
        fig4.write_html(os.path.join(self.output_dir, 'ad_heatmap.html'))

    def generate_html_report(self):
        """Write ``ad_analysis_report.html`` into ``output_dir``.

        The report contains the summary statistics, the offer and company tables
        (when offers exist) and iframes pointing at the chart files produced by
        ``generate_visualizations``. Company and offer text comes from the scanned
        videos, so it is HTML-escaped before being embedded.
        """
        # Local import: only this method needs it, and it keeps the class
        # independent of the notebook's import cell.
        from html import escape

        summary_stats = self.generate_summary_stats()
        offer_analysis = self.analyze_offers()

        html_content = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Ad Campaign Analysis Report</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 40px; }}
                .container {{ max-width: 1200px; margin: 0 auto; }}
                .section {{ margin-bottom: 40px; }}
                .stat-box {{
                    background: #f5f5f5;
                    padding: 20px;
                    border-radius: 5px;
                    margin-bottom: 20px;
                }}
                .flex-container {{
                    display: flex;
                    flex-wrap: wrap;
                    gap: 20px;
                }}
                .stat-item {{
                    flex: 1;
                    min-width: 200px;
                    background: white;
                    padding: 15px;
                    border-radius: 5px;
                    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
                }}
                table {{
                    width: 100%;
                    border-collapse: collapse;
                    margin-top: 20px;
                }}
                th, td {{
                    padding: 12px;
                    border: 1px solid #ddd;
                    text-align: left;
                }}
                th {{ background: #f5f5f5; }}
                h1, h2 {{ color: #333; }}
                .chart-container {{ margin: 20px 0; height: 600px; }}
                iframe {{ border: none; width: 100%; height: 100%; }}
            </style>
        </head>
        <body>
            <div class="container">
                <h1>Ad Campaign Analysis Report</h1>
                <p>Report generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>

                <div class="section">
                    <h2>Summary Statistics</h2>
                    <div class="flex-container">
                        <div class="stat-item">
                            <h3>Total Ads Detected</h3>
                            <p>{summary_stats['total_ads_detected']}</p>
                        </div>
                        <div class="stat-item">
                            <h3>Unique Companies</h3>
                            <p>{summary_stats['unique_companies']}</p>
                        </div>
                        <div class="stat-item">
                            <h3>Videos Analyzed</h3>
                            <p>{summary_stats['total_videos_analyzed']}</p>
                        </div>
                        <div class="stat-item">
                            <h3>Unique Offers</h3>
                            <p>{summary_stats['unique_offers']}</p>
                        </div>
                    </div>
                </div>
        """

        # Only add offers section if we have offer data
        if offer_analysis['most_common_offers']:
            offer_rows = ''.join(
                f"<tr><td>{escape(str(offer))}</td><td>{count}</td></tr>"
                for offer, count in offer_analysis['most_common_offers'].items()
            )
            html_content += f"""
                <div class="section">
                    <h2>Most Common Offers</h2>
                    <table>
                        <tr>
                            <th>Offer</th>
                            <th>Frequency</th>
                        </tr>
                        {offer_rows}
                    </table>
                </div>
            """

        # Only add company analysis if we have company data
        if offer_analysis['offers_by_company']:
            company_rows = ''.join(
                f"<tr><td>{escape(str(company))}</td><td>{len(set(offers))}</td></tr>"
                for company, offers in offer_analysis['offers_by_company'].items()
            )
            html_content += f"""
                <div class="section">
                    <h2>Company Analysis</h2>
                    <table>
                        <tr>
                            <th>Company</th>
                            <th>Unique Offers</th>
                        </tr>
                        {company_rows}
                    </table>
                </div>
            """

        # Add visualizations
        html_content += """
                <div class="section">
                    <h2>Visualizations</h2>
        """

        if self.df.empty:
            # generate_visualizations() writes no chart files in this case, so
            # embedding them would only produce broken iframes.
            html_content += """
                    <p>No data available for visualizations</p>
            """
        else:
            chart_files = ['company_frequency.html']
            # The offer distribution chart only exists when there is offer data
            if not self.df_offers.empty:
                chart_files.append('offer_distribution.html')
            chart_files += ['ad_timeline.html', 'ad_heatmap.html']

            for chart_file in chart_files:
                html_content += f"""
                    <div class="chart-container">
                        <iframe src="{chart_file}"></iframe>
                    </div>
                """

        html_content += """
                </div>
            </div>
        </body>
        </html>
        """

        with open(os.path.join(self.output_dir, 'ad_analysis_report.html'), 'w', encoding='utf-8') as f:
            f.write(html_content)

    def generate_report(self):
        """Generate the complete report: load the data, write charts, write the HTML page."""
        print("Loading data...")
        self.load_data()

        print("Generating visualizations...")
        self.generate_visualizations()

        print("Generating HTML report...")
        self.generate_html_report()

        print(f"Report generated successfully in {self.output_dir}")

# Usage example: build the HTML report from a saved scan-results file.
# In a notebook kernel __name__ is "__main__", so this runs when the cell is executed.
if __name__ == "__main__":
    # Path of the scan-results JSON to report on (absolute Colab path; adjust as needed).
    # NOTE(review): YouTubeAdScanner.save_results() writes "scan_results.json";
    # this file name differs, presumably a renamed copy - confirm it exists.
    json_path = "/content/ad_scan_results/scan_results_withClaude3.json"

    # Create report generator (output goes to the default "ad_reports" directory)
    report_generator = AdReportGenerator(json_path)

    # Load the data, write the charts and write ad_analysis_report.html
    report_generator.generate_report()

Loading data...
Generating visualizations...
Generating HTML report...
Report generated successfully in ad_reports
