In [None]:
!pip install transformers pillow opencv-python ffmpeg-python openai-whisper pandas

Collecting ffmpeg-python
  Downloading ffmpeg_python-0.2.0-py3-none-any.whl.metadata (1.7 kB)
Collecting openai-whisper
  Downloading openai-whisper-20240930.tar.gz (800 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m800.5/800.5 kB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting tiktoken (from openai-whisper)
  Downloading tiktoken-0.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch->openai-whisper)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch->openai-whisper)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12

# Import Libraries

In [None]:
# Import all the tools we'll use
import os
import re
import tempfile
import whisper
import ffmpeg
import cv2
import pandas as pd
from google.colab import drive
from PIL import Image
from transformers import pipeline

# Load Models

In [None]:
# Load the Whisper "base" checkpoint used for audio transcription.
# NOTE(review): the weights appear to be downloaded on first use (see the
# progress bar in this cell's output), so the first run needs network access.
whisper_model = whisper.load_model("base")

# Load pre-trained Hugging Face pipelines for text and image analysis.
# toxicity_model is applied to the transcript text; nsfw_model is applied to
# individual video frames.
toxicity_model = pipeline("text-classification", model="unitary/toxic-bert")
nsfw_model = pipeline("image-classification", model="Falconsai/nsfw_image_detection")


100%|███████████████████████████████████████| 139M/139M [00:01<00:00, 85.3MiB/s]
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/811 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/174 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

Device set to use cpu


config.json:   0%|          | 0.00/724 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/343M [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/325 [00:00<?, ?B/s]

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
Device set to use cpu


# Define the Unsafe-Word and Profanity Sets Used for Scoring

In [None]:
# Keyword sets used to flag inappropriate content. Matching is done by
# count_words on lower-cased whole words, so entries must be lower-case.
# UNSAFE_WORDS is counted in each video's title + description.
UNSAFE_WORDS = {"kill", "gun", "nude", "fight", "blood", "death", "suicide", "drugs", "sex", "murder", "bikini", "naked", "nudity", "panty", "underwear"}
# PROFANITY is counted in the audio transcript.
PROFANITY = {"shit", "fuck", "damn", "bitch", "asshole", "bastard", "crap", "dick", "piss", "hell"}


# Audio Extraction & Transcription

In [None]:
# Convert video to audio (.wav) using ffmpeg
def extract_audio(video_path):
    """Extract a video's audio track into a temporary 16 kHz mono WAV file.

    ffmpeg is started with an argument list instead of a shell command
    string, so paths containing quotes, spaces or shell metacharacters are
    handed to ffmpeg verbatim and are never interpreted by a shell.

    Args:
        video_path: Path of the input video file.

    Returns:
        Path of the temporary ``.wav`` file. The file is created before ffmpeg
        runs, so it exists (possibly empty) even when extraction fails. The
        caller is responsible for deleting it.
    """
    import subprocess  # local import: only this function needs it

    # Reserve a unique output path; ffmpeg overwrites the empty file (-y).
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        audio_path = tmp.name

    command = [
        "ffmpeg", "-y", "-i", str(video_path),
        "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
        audio_path,
    ]
    try:
        completed = subprocess.run(
            command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE
        )
    except OSError as e:
        # e.g. the ffmpeg executable is not installed / not on PATH.
        print(f"ffmpeg could not be started: {e}")
        return audio_path

    if completed.returncode != 0:
        # Stay best-effort (a video without an audio track is not fatal for
        # the pipeline) but make the failure visible instead of ignoring it.
        stderr_lines = (completed.stderr or b"").decode(errors="replace").strip().splitlines()
        reason = stderr_lines[-1] if stderr_lines else "no error output"
        print(f"ffmpeg failed for {video_path} (exit code {completed.returncode}): {reason}")

    return audio_path

# Transcribe the audio using Whisper
def transcribe_audio(audio_path):
    """Transcribe a whole audio file with the module-level Whisper model.

    Uses ``whisper_model.transcribe`` so the complete recording is
    transcribed. The earlier ``pad_or_trim`` + single ``decode`` approach only
    looked at the first 30 seconds of audio and dropped everything after it.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The stripped transcript text, or ``""`` when the file is missing,
        empty (e.g. audio extraction failed) or Whisper raises an error.
    """
    # An empty file is what a failed extraction leaves behind; skip it
    # instead of sending it through the audio decoder.
    if not os.path.exists(audio_path) or os.path.getsize(audio_path) == 0:
        return ""
    try:
        result = whisper_model.transcribe(audio_path)
        return result["text"].strip()
    except Exception as e:
        # Best effort: a failed transcription should not abort the pipeline.
        print(f"Whisper error: {e}")
        return ""


# Extract Video Frames

In [None]:
# Capture frames at regular intervals from a video
def extract_frames(video_path, interval_seconds=1):
    """Sample frames from a video at a fixed time interval.

    Args:
        video_path: Path of the video file to read.
        interval_seconds: Time between two sampled frames, in seconds.

    Returns:
        A list of RGB ``PIL.Image`` objects, one per sampled frame. The list
        is empty when the video cannot be opened or contains no frames.
    """
    fallback_fps = 30.0  # used only when the container reports no usable FPS

    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Some files report 0 / NaN for FPS; without this guard the sampling
        # step below becomes 0 and the modulo raises ZeroDivisionError.
        if not (0 < fps < float("inf")):
            fps = fallback_fps

        # A step of at least 1 also covers interval_seconds < 1 / fps.
        interval = max(1, int(fps * interval_seconds))

        frame_num = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_num % interval == 0:
                # Frames are converted from BGR to RGB before being wrapped
                # as PIL images.
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(Image.fromarray(rgb))
            frame_num += 1
    finally:
        # Release the capture even if decoding/conversion raised.
        cap.release()

    return frames


# Feature Extraction

In [None]:
# Check how toxic the text is
def get_toxicity_score(text):
    """Return the toxicity score of ``text`` from the module-level classifier.

    All label scores are requested (``top_k=None``) and the highest score among
    labels whose name contains "toxic" is returned. Looking only at the single
    top-ranked label returned 0.0 whenever another label (for example an
    insult/obscenity label) happened to rank first.

    Args:
        text: Text to score (the video transcript).

    Returns:
        A float score, or 0.0 for empty text or when the model reports no
        label containing "toxic".
    """
    if not text:
        return 0.0

    # Let the tokenizer truncate to the model's maximum length instead of
    # cutting the string at 512 characters, which discards usable text.
    results = toxicity_model(text, top_k=None, truncation=True)

    # Depending on the transformers version the per-label list may be wrapped
    # in an outer list (one entry per input); unwrap it if so.
    if results and isinstance(results[0], list):
        results = results[0]

    toxic_scores = [
        entry["score"] for entry in results if "toxic" in entry["label"].lower()
    ]
    return max(toxic_scores, default=0.0)

# Check if any of the frames look NSFW
def get_nsfw_score(frames):
    """Return the highest NSFW score found among the given frames.

    For each frame only the first prediction returned by ``nsfw_model`` is
    considered, and it counts only when its label is one of the flagged
    labels below.

    Args:
        frames: Sequence of images accepted by ``nsfw_model``.

    Returns:
        1.0 when ``frames`` is empty; otherwise the largest flagged score,
        or 0.0 when no frame's first prediction carries a flagged label.
    """
    flagged_labels = {"nsfw", "porn", "sexy", "nude"}

    # No frames at all is reported with the maximum score.
    if not frames:
        return 1.0

    first_predictions = (nsfw_model(image)[0] for image in frames)
    flagged_scores = [
        prediction["score"]
        for prediction in first_predictions
        if prediction["label"].lower() in flagged_labels
    ]
    # The leading 0.0 is the baseline used when nothing was flagged.
    return max([0.0] + flagged_scores)

# Count how many "bad" words appear in the text
def count_words(text, word_set):
    """Count the word occurrences in ``text`` that belong to ``word_set``.

    The text is lower-cased and split into ``\\w+`` tokens; every token that
    is a member of ``word_set`` adds one to the count (repeats included).
    """
    hits = 0
    for token in re.findall(r'\w+', text.lower()):
        if token in word_set:
            hits += 1
    return hits


# Run the Full Pipeline on One Video

In [None]:
# Run the full analysis for a video
def process_video(video_path, title="", description="",
                  toxicity_threshold=0.5, nsfw_threshold=0.5):
    """Extract safety features for one video and derive a rule-based label.

    Args:
        video_path: Path of the video file.
        title: Video title; non-string values (e.g. NaN from an empty CSV
            cell) are treated as an empty string.
        description: Video description; handled like ``title``.
        toxicity_threshold: Toxicity scores at or above this value make the
            video "unsafe".
        nsfw_threshold: NSFW scores at or above this value make the video
            "unsafe".

    Returns:
        A dict with the keys ``video_path``, ``transcript``,
        ``toxicity_score``, ``nsfw_score``, ``violence_score``,
        ``profanity_count``, ``unsafe_words_count`` and ``safety_score``
        (the string "safe" or "unsafe").
    """
    print(f"Processing {video_path}...")

    # Empty CSV cells arrive from pandas as NaN (a float); concatenating that
    # with a string would raise TypeError and drop the whole video.
    title = title if isinstance(title, str) else ""
    description = description if isinstance(description, str) else ""

    audio_path = extract_audio(video_path)
    try:
        transcript = transcribe_audio(audio_path)
    finally:
        # Always remove the temporary WAV, even if transcription raised.
        if os.path.exists(audio_path):
            os.remove(audio_path)

    frames = extract_frames(video_path)

    # Build the feature dictionary
    features = {
        "video_path": video_path,
        "transcript": transcript,
        "toxicity_score": get_toxicity_score(transcript),
        "nsfw_score": get_nsfw_score(frames),
        "violence_score": count_words(transcript, {"kill", "gun", "fight", "blood", "murder"}),
        "profanity_count": count_words(transcript, PROFANITY),
        "unsafe_words_count": count_words(title + " " + description, UNSAFE_WORDS),
    }

    # Label as "safe" or "unsafe". Model scores are compared against
    # thresholds: a classifier score is practically never exactly 0 for
    # non-empty input, so an equality test would label nearly every video
    # with speech as "unsafe". The word counts are integers and must be 0.
    is_safe = (
        features["toxicity_score"] < toxicity_threshold
        and features["nsfw_score"] < nsfw_threshold
        and features["violence_score"] == 0
        and features["profanity_count"] == 0
    )
    features["safety_score"] = "safe" if is_safe else "unsafe"

    return features


#  Define Video Information, process videos, and extract features

In [None]:
# Mount Google Drive at /content/drive so the CSV and video paths used by the
# cells below can be read from and written to Drive.
drive.mount('/content/drive')

# Load CSV containing multiple shorts info
def load_video_info_csv(csv_path):
    """Read the Shorts metadata CSV at ``csv_path`` and return it as a DataFrame."""
    video_info = pd.read_csv(csv_path)
    return video_info

# Process all videos and extract features
def process_all_videos(df):
    """Run ``process_video`` on every row of ``df`` and collect the results.

    Each row must provide a ``file`` entry; ``title`` and ``description`` are
    optional and default to ``""``. A video whose processing raises is
    reported on stdout and left out of the result.

    Returns:
        A DataFrame with one row per successfully processed video.
    """
    collected = []

    for _, record in df.iterrows():
        path = record["file"]
        video_title = record.get("title", "")
        video_description = record.get("description", "")

        try:
            collected.append(process_video(path, video_title, video_description))
        except Exception as e:
            print(f"Error processing video {path}: {e}")

    return pd.DataFrame(collected)

# Main execution: read the Shorts metadata CSV from Drive, then run the full
# feature-extraction pipeline on every listed video. The CSV must contain a
# "file" column; "title" and "description" columns are optional.
csv_path = "/content/drive/MyDrive/shorts/downloaded_shorts.csv"  # Update this if your path changes
video_info_df = load_video_info_csv(csv_path)
features_df = process_all_videos(video_info_df)




{'file': 'shorts/ggCMks51QSk.mp4', 'title': 'British Wife #shorts #funny', 'description': 'shortsvideo shortsfunny'}


# Save the Extracted Features to a CSV File

In [None]:
# Define path inside Google Drive
drive_path = "/content/drive/MyDrive/extracted_features_YTshorts.csv"

# function to save the features into the google drive folder
def save_features_to_csv(features, filename=drive_path):
    """Append video features to the CSV at ``filename``, creating it if needed.

    Args:
        features: The new rows: a DataFrame, a single feature dict, or an
            iterable of feature dicts.
        filename: Destination CSV path (defaults to ``drive_path``).
    """
    # Normalise the input to a DataFrame. The body previously referred to an
    # undefined name instead of the ``features`` parameter.
    if isinstance(features, pd.DataFrame):
        new_data = features
    elif isinstance(features, dict):
        new_data = pd.DataFrame([features])
    else:
        new_data = pd.DataFrame(list(features))

    # If a non-empty file exists, append; otherwise, start a new file.
    # (A zero-byte file cannot be parsed by read_csv, so treat it as absent.)
    if os.path.exists(filename) and os.path.getsize(filename) > 0:
        existing_df = pd.read_csv(filename)
        df = pd.concat([existing_df, new_data], ignore_index=True)
    else:
        df = new_data  # First entry

    # Save updated DataFrame back to CSV
    df.to_csv(filename, index=False)
    print(f"Features saved to {filename}")


# Save all features to CSV in Drive. Rows are appended to any existing file,
# so re-running this cell adds the same videos again.
save_features_to_csv(features_df, drive_path)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Features saved to /content/drive/MyDrive/shorts_safety.csv


# Train the Random Forest Classifier

In [None]:
#train a Random Forest model on labeled Shorts data
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Load dataset
df = pd.read_csv(drive_path)

# Define input features and target label
X = df[["toxicity_score", "nsfw_score", "profanity_count", "violence_score", "unsafe_words_count"]]
y = df["safety_score"].map({"safe": 0, "unsafe": 1})  # Convert to binary labels

rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)

if len(df) < 2:
    # train_test_split raises ValueError when the training share would be
    # empty, which is the case for a single-row dataset. Fit on what is
    # available so later cells still have a model, and skip the evaluation.
    print(f"Only {len(df)} labeled sample(s) in {drive_path}; "
          "training on all data and skipping evaluation.")
    rf_classifier.fit(X, y)
else:
    # Split into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train Random Forest model
    rf_classifier.fit(X_train, y_train)

    # Evaluate the model
    y_pred = rf_classifier.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Model Accuracy: {accuracy:.2f}")


ValueError: With n_samples=1, test_size=0.2 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.

# Use the Classifier for New Videos

In [None]:
#apply the trained model to classify new Shorts.
def classify_video(features):
    """Use the trained model to classify a video as 'safe' or 'unsafe'.

    Args:
        features: Mapping of feature name to value for one video. Extra keys
            (such as the ``video_path`` / ``transcript`` entries produced by
            ``process_video``) are ignored.

    Returns:
        "safe" when the model predicts class 0, otherwise "unsafe".

    Raises:
        KeyError: If a feature the model was trained on is missing.
    """
    X_new = pd.DataFrame([features])  # Convert to DataFrame

    # Keep only the columns the model was fitted on, in the fitted order.
    # Passing extra or reordered columns makes predict() fail or misread them.
    # feature_names_in_ is set by scikit-learn when fit() receives a DataFrame.
    trained_columns = getattr(rf_classifier, "feature_names_in_", None)
    if trained_columns is not None:
        X_new = X_new[list(trained_columns)]

    prediction = rf_classifier.predict(X_new)[0]  # Predict
    return "safe" if prediction == 0 else "unsafe"
# Example usage: feature values for one video, keyed by the column names the
# model was trained on. Replace these with the features of a real video
# (e.g. the dict returned by process_video) to classify it.
video_features = {
    "toxicity_score": 0.3,
    "nsfw_score": 0.2,
    "profanity_count": 1,
    "violence_score": 0,
    "unsafe_words_count": 2,
}
classification_result = classify_video(video_features)
print("Video classification:", classification_result)


# Final Results

In [None]:
print("\n Video Processing Complete!")
print("\nSafety Classification:")
# features_df is the DataFrame built by process_all_videos; its rule-based
# label lives in the "safety_score" column.
print(features_df["safety_score"])
