Imports

In [None]:
!pip install anthropic
!pip install ffmpeg-python
!pip install av
!pip install scenedetect

In [None]:
import anthropic
from json import loads
import time
import json
import os
import cv2
import torch
import random
import ffmpeg
import warnings
import numpy as np
import pandas as pd
from PIL import Image
import soundfile as sf
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.signal import resample
import typing_extensions as typing
from scenedetect import open_video,  VideoStreamCv2, SceneManager
from scenedetect.detectors import ContentDetector
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

In [None]:
# Extracting all required ids 

df = pd.read_csv('/kaggle/input/youtube-data/ground_labels_new.csv') 

df['Primary Label'] = df['Primary Label'].str.lower() 
df = df[df['Primary Label'].isin(['appropriate', 'inappropriate'])]


transcriptions_df = pd.read_csv('/kaggle/input/youtube-data/eng-complete-transcriptions.csv')
df = pd.merge(df, transcriptions_df, on='Video Id', how='inner') 

df

In [None]:
# Extracting video ids and primary labels

video_ids = list(df['Video Id'])
primary_labels = list(df['Primary Label'])
all_transcriptions = list(df['Transcription'])

In [None]:
# Extracting data from transcripts
transcriptions = []
lengths = []

for (i, id_) in enumerate(video_ids):
    transcriptions.append(all_transcriptions[i].split("chunks")[0])
    lengths.append(len(all_transcriptions[i].split("chunks")[0]))

In [None]:
available_ids = os.listdir('/kaggle/input/youtube-data/Ads/Ads') 
len(available_ids) 

In [None]:
available_ad_ids = [] 

for i in range(len(video_ids)): 
    if video_ids[i] in available_ids: 
        #if video_ids[i] not in ids_covered: 
        available_ad_ids.append(video_ids[i]) 
    else: 
        print(i, video_ids[i]) 

In [None]:
len(available_ad_ids)

**Setting up Model** 

In [None]:
os.environ["ANTHROPIC_API_KEY"] = "" 

client = anthropic.Anthropic() 

**Prompt** 

In [None]:
prompt =  """"A video can be considered inappropriate for children if it contains physical violence (cartoonish, realistic, or gory), interpersonal violence (bullying, pranks, meanness, belittling, controlling behavior, talking down to others, or manipulation), self-harm or suicide (depictions of harm inflicted on oneself or suicidal thoughts/tendencies), extreme stunts (life-endangering, high-risk activities/challenges that require adult supervision), dangerous products or services (like paintball, airsoft, fireworks, weapons, hunting equipment, graffiti products), scary content (horror, suspense, loud violence, zombies, skeletons, masks, scary clowns, blood, dangerous fire, car crashes, medical procedures or other scary visuals), sexual content (innuendos, sexual behavior, nudity, suggestiveness), intimacy (romantic conduct, couples expressing love, sensual contact, dating, relationships or any expression of love or loss for a significant other,  lyrics about or mentions of love or romance, heartbreak), inappropriate clothing (significant skin exposure or sheer clothing outside of appropriate contexts such as sunbathing, swinmwear,etc), substance use (mention, depictions, or use of alcohol, drugs, tobacco), gambling (betting, gambling, casinos, online or real world casino games, online trading), offensive language (from mild to severe with words such as “stupid” or “idiot”), stereotypes (racial, ethnic, gender, body shaming), crude humor (inappropriate humor, innuendos), weapons (guns, swords, knives, etc but toy guns such as nerf guns are fine), unsuitable media (ads for violent or adult-themed video games, casino games, games with romance or battle or first person-shooter games), fight sports (boxing, wrestling, martial arts), financial content (difference, rolling spot forex, financial spread betting and synonymous products but banking ads are fine), health and beauty services (cosmetic procedures, weight loss, tanning, piercings, tattoos, cosmetics and makeup (no matter their essential benefits) and other products related to external personal care focused on body image but ads on fashion products and essential hygiene products like sunscreen, shampoo, toothpaste, etc. are fine, however, products like sanitary care and hair removal is inappropriate), ads or sponsored content for consumable food or drink products (regardless of nutritional content), where the product is consumed by a person or shown with branding, packaging, or promotional intent are inappropriate; general content like cooking shows, recipes, or scenes where food is simply present or being eaten without promotion are acceptable, pet/animal food products and healthcare, healthcare or medical topics (reproductive health, mental health, substance abuse, eating disorders, pharmaceuticals, health insurance, any nutritional or dietary supplements), sensitive topics (politics, religion, controversial social issues, astrology, occult, paranormal, religious sermons), privacy or safety risks (services or competitions requiring personal information, social networks for strangers, spy services, or video game skin trading), or any explicit depictions or discussions of death or dying. Consider that you are an expert content moderator. Based on the image grids provided containing frames from key scenes from within the video, is the video advertisement inappropriate for children on this definition? If it is, label it as “inappropriate.” If it is not, label it as “appropriate.” Justify your conclusion. Also, mention what language is being spoken if there is any. 
Respond in **JSON format only**. Do **not** include any commentary.

Return only this format (no preamble):

{
  "label": "appropriate" or "inappropriate",
  "justification": "brief reason",
  "language": "language spoken in the video"
}"""

prompt 

In [None]:
import base64
import httpx

# Function to encode the image

def encode_image(image_path):
    with open(image_path, "rb") as image_file: 
        image_media_type = "image/png" 
        image_data = base64.b64encode(image_file.read()).decode('utf-8') 
        return image_media_type, image_data 

In [None]:
def classify_video_with_images(text_input, audio_transcription, image_paths):
    
    image_contents = [
        {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": img_media_type,
                "data": img_data,
            }
        }
        for image_path in image_paths
        for img_media_type, img_data in [encode_image(image_path)]
    ]
    
    response = client.messages.create(
        model="claude-sonnet-4-20250514",  # <- Updated model name
        system="You are a content classification assistant that evaluates image frames and audio transcriptions from videos according to strict guidelines and gives response in JSON format.",
        max_tokens=1024,
        temperature=0.0,
        messages=[
            {
                "role": "user",
                "content": [
                    *image_contents,
                    {"type": "text", "text": text_input},
                    {"type": "text", "text": audio_transcription}
                ],
            }
        ]
    )

    return response

**Extracting Images from Videos** 

In [None]:
def detect_scenes(video_path, threshold = 30):
    """Detect scenes in a video and return scene start and end frames."""
    scene_list = []
    while len(scene_list) < 6 and threshold > 0:
        threshold //= 2
    
        video = open_video(video_path)
        scene_manager = SceneManager()
        scene_manager.add_detector(ContentDetector(threshold=threshold))
    
        scene_manager.detect_scenes(video)
        scene_list = scene_manager.get_scene_list()
    
    return scene_list


def get_top_n_longest_scenes(scene_list, n):
    '''Return the top n longest scenes with start and end frame indices.'''
    scene_durations = [(start, end - start) for start, end in scene_list]
    scene_durations.sort(key=lambda x: x[1], reverse=True)

    # Top n longest scenes with start and end frame indices
    longest_scenes = [(start, start + duration) for start, duration in scene_durations[:n]]
    return longest_scenes


def sort_scenes_by_frame(scenes_list):
    '''Sort scenes by their start frame number.'''
    sorted_scenes = sorted(scenes_list, key=lambda scene: scene[0].get_frames())
    return sorted_scenes


def get_num_grids(video_path):
    '''Get number of grids to be created'''
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    duration = total_frames / fps

    # Calculate number of grids based on the duration
    duration = round(duration, 2)
    if ((duration // 60) + 1) <= 5:
        return int(((duration // 60) + 1))
    else:
        return 5
        

def extract_k_frames_from_scene(video_path, scene, k):
    '''Extract k frames evenly spaced from each scene.'''
    # Extract frame numbers from scene start and end
    start_frame = scene[0].get_frames() + 1
    end_frame = scene[1].get_frames() - 1

    # Create k equally spaced frame indices within the scene's range
    frame_indices = np.linspace(start_frame, end_frame, k, dtype=int)
    
    cap = cv2.VideoCapture(video_path)
    frames = []

    # Extract frames from calculated indices
    for frame_no in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no)
        ret, frame = cap.read()
        if ret:
            frames.append(frame)
    
    cap.release()
    return frames


def create_image_grid(frames, grid_size=(1000, 1000)):
    '''Arrange 6 frames into a 3x2 grid and resize to the specified grid size.'''
    # Ensure all frames have the same size for concatenation
    frames = [cv2.resize(frame, (640, 360)) for frame in frames]  # Resize to a common size like 640x360
    rows = [np.concatenate(frames[i:i+2], axis=1) for i in range(0, 6, 2)]
    image_grid = np.concatenate(rows, axis=0)
    
    return np.array(Image.fromarray(image_grid).resize(grid_size))

In [None]:
def get_images(video_path, n=6):
    ''' 1. Detect scenes
        2. Get k; where k = num_grids
        3. Get the 6k longest scenes
        4. Sort scenes wrt frame numbers
        5. Extract 1 frame per 6k scene
        6. Create k image grids of 6 frames each
     '''
    scene_list = detect_scenes(video_path)
    k = get_num_grids(video_path)
    longest_scenes = get_top_n_longest_scenes(scene_list, n*k)
    scenes = sort_scenes_by_frame(longest_scenes)

    frames = []
    for scene in scenes:
        frames.extend(extract_k_frames_from_scene(video_path, scene, 1))

    grids = []
    for i in range(k):
        start_idx = i * n
        end_idx = start_idx + n
        grid_frames = frames[start_idx:end_idx]
        grid = create_image_grid(grid_frames, grid_size=(1000, 1000))
        grids.append(grid)

    return grids

In [None]:
ids = []
labels = []
responses = []
predicted_labels = [] 
languages = [] 
remaining = []
failed_json_logs = []  # To store tuples of (video_id, raw_response)

img_dir = '/kaggle/working/Images'
if not os.path.exists(img_dir):
    os.makedirs(img_dir)

for i in range(0, len(video_ids)): 
    
    if video_ids[i] in available_ad_ids:

        contents_of_ad = os.listdir('/kaggle/input/youtube-data/Ads/Ads/' + video_ids[i]) 
        contents_of_ad.remove('audio.mp3') 
        video_path = '/kaggle/input/youtube-data/Ads/Ads/' + video_ids[i] + '/' + contents_of_ad[0] 

        try: 
            print(i, video_path)

            # Extract multiple images representative of the video
            images = get_images(video_path) 

            # Save each image returned by extract_images_of_frames
            image_paths = []
            for idx, img in enumerate(images):
                # Convert NumPy array to PIL image
                image = Image.fromarray(img)
                
                # Save the image to a file 
                image_name = f"{video_ids[i]}_{idx + 1}.png"
                image_path = os.path.join(img_dir, image_name)
                image.save(image_path)
                image_paths.append(image_path)
                print(image_path)
                
            # Display the images
            fig, axes = plt.subplots(1, len(images), figsize=(10, 3))
            if len(images) == 1:
                axes.imshow(images[0])
                axes.axis('off')
            else:
                for ax, img in zip(axes, images):
                    ax.imshow(img)
                    ax.axis('off')

            plt.tight_layout()
            plt.show()

            audio_transcription = transcriptions[i]

            # Make inference
            print("Making inference for i", i)
            classification_response = classify_video_with_images(prompt, audio_transcription, image_paths)
            
            # Extract text from response
            raw_text = classification_response.content[0].text.strip()
            print("Raw response text:", repr(raw_text))

            # Try direct parsing first
            try:
                if raw_text.startswith("```json"):
                    raw_text = raw_text.replace("```json", "").replace("```", "").strip()
                parsed = json.loads(raw_text)
            except json.JSONDecodeError as e:
                print(f"Direct JSON parsing failed: {e}")
                print("Attempting regex-based extraction...")

                # Fallback: extract JSON block using regex
                json_match = re.search(r'\{[\s\S]*\}', raw_text)
                if json_match:
                    json_str = json_match.group(0)
                    try:
                        parsed = json.loads(json_str)
                        raw_text = json_str  # Save cleaned version
                    except json.JSONDecodeError as e2:
                        print(f"Regex-based JSON parsing error: {e2}")
                        print(f"Extracted JSON:\n{json_str}")
                        remaining.append(video_ids[i])
                        failed_json_logs.append((video_ids[i], raw_text))
                        continue
                else:
                    print("No JSON found in response.")
                    remaining.append(video_ids[i])
                    failed_json_logs.append((video_ids[i], raw_text))
                    continue

            pred_temp = parsed.get('label') or parsed.get('classification')
            lang_temp = parsed.get('language')

            temp_id = video_ids[i]
            temp_label = primary_labels[i]

            print(f"Id: {temp_id}. Primary Label: {temp_label}\nParsed Response: {parsed}")

            ids.append(temp_id) 
            labels.append(temp_label) 
            responses.append(raw_text) 
            predicted_labels.append(pred_temp) 
            languages.append(lang_temp)
            
        except: 
            print('failed for ', i) 
            remaining.append(video_ids[i])

        time.sleep(20)

In [None]:
# Save failed JSON logs to CSV
import pandas as pd

if failed_json_logs:
    df_failed = pd.DataFrame(failed_json_logs, columns=["video_id", "raw_response"])
    df_failed.to_csv("/kaggle/working/failed_json_responses.csv", index=False)
    print("Saved failed JSON responses to failed_json_responses.csv")

In [None]:
results_dir = '/kaggle/working/results'
if not os.path.exists(results_dir):
    os.makedirs(results_dir)

In [None]:
# At the end, print remaining videos 

remaining_df = pd.DataFrame({'Video Id': remaining})

# Save to CSV
remaining_df.to_csv('/kaggle/working/results/claude-ads-remaining-3.csv', index=False)

print("Remaining videos saved to remaining.csv")
print("Remaining videos with errors:", remaining) 

In [None]:
new_df = pd.DataFrame({
    'Video Id': ids,
    'Primary Label': labels,
    'Predicted Label': predicted_labels,
    'Response': responses, 
    'Languages': languages 
})

new_df.head() 

In [None]:
new_df.to_csv('/kaggle/working/results/claude-ads-3.csv', index=False)

In [None]:
# Changing to binary lists 

predictions = [1 if pred == 'inappropriate' else 0 for pred in predicted_labels] 
ground_truths = [1 if label == 'inappropriate' else 0 for label in labels] 

In [None]:
# Obtaining classification report 

report = classification_report(ground_truths, predictions) 
print(report) 

In [None]:
cm = confusion_matrix(ground_truths, predictions)

plt.figure(figsize=(6,4))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Appropriate', 'Inapproriate'], yticklabels=['Appropriate', 'Inapproriate'])
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()