<a href="https://colab.research.google.com/github/asmis11/video-content-moderation/blob/main/Phases.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import files
uploaded = files.upload()

Saving tikharm_dataset.zip to tikharm_dataset.zip


In [3]:
import zipfile
import os

base_path = "/content/tikharm_dataset.zip"

with zipfile.ZipFile(base_path, 'r') as zip_ref:
    zip_ref.extractall("/content/tikharm_dataset")

extracted_dir = "/content/tikharm_dataset"
print(os.listdir(extracted_dir))

['tikharm_dataset']


In [4]:
!pip install opencv-python-headless --quiet

import os
import cv2
import numpy as np
import pandas as pd
from glob import glob
from tqdm import tqdm
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split

## **Phase 1** – Frame-Based Classification with MobileNetV2

In [5]:
def extract_frames_from_video(video_path, output_dir, fps=1):
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    frame_rate = int(cap.get(cv2.CAP_PROP_FPS))
    count, saved = 0, 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret: break
        if frame_rate > 0 and count % (frame_rate // fps) == 0:
            cv2.imwrite(os.path.join(output_dir, f"{saved}.jpg"), frame)
            saved += 1
        count += 1
    cap.release()

In [6]:
base_path = "/content/tikharm_dataset/tikharm_dataset/train"
labels_map = {"safe": 0, "adult": 1, "harmful": 2, "suicide": 3}
frame_dataset_dir = "phase1_frames"

X, y = [], []

for label_name, label in labels_map.items():
    video_paths = glob(os.path.join(base_path, label_name, "*.mp4"))
    print(f"Found {len(video_paths)} videos for label: {label_name}") # Print the number of video paths found
    for video_path in tqdm(video_paths[:5]):
        video_id = os.path.splitext(os.path.basename(video_path))[0]
        out_dir = os.path.join(frame_dataset_dir, video_id)
        extract_frames_from_video(video_path, out_dir, fps=1)
        print(f"Extracted frames to: {out_dir}")  # Print the output directory

        for fname in os.listdir(out_dir):
            img = cv2.imread(os.path.join(out_dir, fname))
            if img is not None:
                img = cv2.resize(img, (224, 224))
                img = preprocess_input(img_to_array(img))
                X.append(img)
                y.append(label)

X = np.array(X)
y = np.array(y)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

Found 11 videos for label: safe


  0%|          | 0/5 [00:00<?, ?it/s]

Extracted frames to: phase1_frames/anvattuoinho_7116224760626892075


 20%|██        | 1/5 [00:21<01:24, 21.23s/it]

Extracted frames to: phase1_frames/alinaways_7175873829930061062


 40%|████      | 2/5 [00:24<00:31, 10.62s/it]

Extracted frames to: phase1_frames/anhtun.nta_7246601851410337029


 60%|██████    | 3/5 [00:32<00:18,  9.30s/it]

Extracted frames to: phase1_frames/anhnongdancartoon_7360268444362673415


 80%|████████  | 4/5 [00:43<00:09,  9.93s/it]

Extracted frames to: phase1_frames/vid1


100%|██████████| 5/5 [00:57<00:00, 11.48s/it]


Found 10 videos for label: adult


 20%|██        | 1/5 [00:01<00:07,  1.90s/it]

Extracted frames to: phase1_frames/aldrift455_7199249280387927322


 40%|████      | 2/5 [00:03<00:05,  1.71s/it]

Extracted frames to: phase1_frames/adulto18_7273549708612635910


 60%|██████    | 3/5 [00:04<00:02,  1.47s/it]

Extracted frames to: phase1_frames/badboyschicago_7308839755289234734


 80%|████████  | 4/5 [00:05<00:01,  1.38s/it]

Extracted frames to: phase1_frames/andriaanastasiou_7081329958160715010


100%|██████████| 5/5 [00:08<00:00,  1.60s/it]


Extracted frames to: phase1_frames/am__.wt_7360945387911335176
Found 10 videos for label: harmful


 20%|██        | 1/5 [00:00<00:02,  1.97it/s]

Extracted frames to: phase1_frames/akytube231206_6948939267137670402


 40%|████      | 2/5 [00:01<00:02,  1.10it/s]

Extracted frames to: phase1_frames/0huckleberry_6923105031466061061
Extracted frames to: phase1_frames/_toscanelucas_7229361043674369307


 60%|██████    | 3/5 [00:09<00:07,  3.92s/it]

Extracted frames to: phase1_frames/alexei_pergande_7266857966836387115


100%|██████████| 5/5 [00:21<00:00,  4.24s/it]


Extracted frames to: phase1_frames/_itznate_7361499791978908934
Found 4 videos for label: suicide


 25%|██▌       | 1/4 [00:01<00:03,  1.11s/it]

Extracted frames to: phase1_frames/_nctry_7115945153335954715


 50%|█████     | 2/4 [00:01<00:01,  1.25it/s]

Extracted frames to: phase1_frames/_.tokio._.hotel.__7345207721169390849


 75%|███████▌  | 3/4 [00:02<00:00,  1.06it/s]

Extracted frames to: phase1_frames/_im.all.alone__6865122133383597318


100%|██████████| 4/4 [00:04<00:00,  1.01s/it]

Extracted frames to: phase1_frames/_im.all.alone__6869027407265828101





In [None]:
base = MobileNetV2(include_top=False, input_shape=(224, 224, 3), weights="imagenet")
x = GlobalAveragePooling2D()(base.output)
x = Dense(128, activation='relu')(x)
output = Dense(4, activation='softmax')(x)

model = Model(inputs=base.input, outputs=output)
model.compile(optimizer=Adam(1e-4), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=3, batch_size=8)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224_no_top.h5
[1m9406464/9406464[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Epoch 1/3
[1m56/56[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m161s[0m 2s/step - accuracy: 0.6386 - loss: 0.8913 - val_accuracy: 0.9196 - val_loss: 0.2360
Epoch 2/3
[1m42/56[0m [32m━━━━━━━━━━━━━━━[0m[37m━━━━━[0m [1m21s[0m 2s/step - accuracy: 0.9544 - loss: 0.1512

In [None]:
# Use any video from the dataset
test_video = glob(os.path.join(base_path, "harmful", "*.mp4"))[0]
test_out = "test_video_frames"
extract_frames_from_video(test_video, test_out, fps=1)

predictions = []
for fname in os.listdir(test_out):
    path = os.path.join(test_out, fname)
    img = cv2.imread(path)
    if img is not None:
        img = cv2.resize(img, (224, 224))
        img = preprocess_input(img_to_array(img))
        pred = model.predict(np.expand_dims(img, axis=0))[0]
        predictions.append(pred)

avg_softmax = np.mean(predictions, axis=0)
confidence = round(float(np.max(avg_softmax)), 2)
label_index = int(np.argmax(avg_softmax))
reverse_map = {v: k for k, v in labels_map.items()}
predicted_label = reverse_map[label_index]

print("🎯 Prediction:", predicted_label)
print("✅ Confidence:", confidence)

In [None]:
row = {
    "video_id": os.path.basename(test_video),
    "duration": 1.5,
    "predicted_label": predicted_label,
    "confidence": confidence,
    "notes": "MobileNetV2 frame-based classification"
}

df = pd.DataFrame([row])
csv_path = "phase1_predictions.csv"
if os.path.exists(csv_path):
    df_old = pd.read_csv(csv_path)
    df = pd.concat([df_old, df])
df.to_csv(csv_path, index=False)
print("📁 Saved to:", csv_path)

# **Phase 2** Metadata & Notes-Based Classification with Classical ML


In [None]:
import pandas as pd
import os
import random

base_path = "/content/tikharm_dataset/tikharm_dataset/train"

rows = []
for label in os.listdir(base_path):
    label_path = os.path.join(base_path, label)
    if os.path.isdir(label_path):
        for fname in os.listdir(label_path):
            if fname.endswith(".mp4"):
                rows.append({
                    "video_id": fname,
                    "category": label,
                    "duration": round(random.uniform(3, 60), 2),
                    "notes": f"This is a synthetic {label} video."
                })

meta_df = pd.DataFrame(rows)
meta_df.to_csv("phase2_metadata.csv", index=False)
meta_df.head()


In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
import numpy as np

df = pd.read_csv("phase2_metadata.csv")
label_map = {"safe": 0, "adult": 1, "harmful": 2, "suicide": 3}
df["label"] = df["category"].map(label_map)

# One-hot encode the category
encoder = OneHotEncoder()
cat_encoded = encoder.fit_transform(df[["category"]]).toarray()

# TF-IDF vectorize the notes
vectorizer = TfidfVectorizer()
notes_encoded = vectorizer.fit_transform(df["notes"]).toarray()

# Combine all features
X = np.hstack([df[["duration"]].values, cat_encoded, notes_encoded])
y = df["label"].values

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
model = RandomForestClassifier()
model.fit(X_train, y_train)

In [None]:
# Simulate new metadata row
new_data = pd.DataFrame([{
    "video_id": "test_001.mp4",
    "category": "harmful",
    "duration": 42.5,
    "notes": "This video may contain aggressive behavior and loud sounds."
}])
new_data["label"] = new_data["category"].map(label_map)

cat_new = encoder.transform(new_data[["category"]]).toarray()
notes_new = vectorizer.transform(new_data["notes"]).toarray()
X_new = np.hstack([new_data[["duration"]].values, cat_new, notes_new])

# Predict
pred_probs = model.predict_proba(X_new)[0]
confidence = round(float(np.max(pred_probs)), 2)
pred_label_idx = int(np.argmax(pred_probs))
pred_label = {v: k for k, v in label_map.items()}[pred_label_idx]

print("Predicted label:", pred_label)
print("Confidence:", confidence)

# **Phase 3** Modal-Level Fusion: Visual & Text (BERT)


In [None]:
# Use previously trained MobileNetV2 model from Phase 1
# Reuse extracted frames

predictions = []
frame_dir = "/content/phase1_frames"
for fname in os.listdir(frame_dir):
    img = cv2.imread(os.path.join(frame_dir, fname))
    if img is not None:
        img = cv2.resize(img, (224, 224))
        img = preprocess_input(img_to_array(img))
        pred = model.predict(np.expand_dims(img, axis=0))[0]
        predictions.append(pred)

avg_softmax = np.mean(predictions, axis=0)
confidence = round(np.max(avg_softmax), 2)
label = reverse_map[np.argmax(avg_softmax)]

In [None]:
!pip install transformers --quiet

In [None]:
from transformers import BertTokenizer, BertModel
import torch
from sklearn.linear_model import LogisticRegression

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
bert = BertModel.from_pretrained("bert-base-uncased")

def get_bert_embedding(text):
    tokens = tokenizer(text, return_tensors='pt', truncation=True, padding=True)
    with torch.no_grad():
        outputs = bert(**tokens)
    return outputs.pooler_output[0].numpy()

# Sample train
X_text = [get_bert_embedding("this is a safe video"),
          get_bert_embedding("violence, fighting and abuse")]
y_text = [0, 1]

text_model = LogisticRegression()
text_model.fit(X_text, y_text)

# Predict on a new note
note = "people screaming and hitting each other"
embedding = get_bert_embedding(note).reshape(1, -1)
pred = text_model.predict_proba(embedding)[0]
confidence = round(np.max(pred), 2)
label = "unsafe" if pred[1] > 0.5 else "safe"

In [None]:
def save_to_csv(file, vid, label, confidence, notes):
    df = pd.DataFrame([{
        "video_id": vid,
        "predicted_label": label,
        "confidence": confidence,
        "notes": notes
    }])
    if os.path.exists(file):
        df_existing = pd.read_csv(file)
        df = pd.concat([df_existing, df])
    df.to_csv(file, index=False)

save_to_csv("phase3_visual.csv", "vid_visual", label, confidence, "Object thrown, chaos")
save_to_csv("phase3_text.csv", "vid_text", label, confidence, note)


# **Phase 4** Sequence-Based Classification (CNN + LSTM)


In [None]:
import os
import cv2
import numpy as np
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import LSTM, Dense, TimeDistributed, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam

In [None]:
def extract_frame_sequence(video_path, sequence_length=10):
    cap = cv2.VideoCapture(video_path)
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 25
    frames = []
    count = 0
    while cap.isOpened() and len(frames) < sequence_length:
        ret, frame = cap.read()
        if not ret:
            break
        if count % (fps // 1) == 0:
            img = cv2.resize(frame, (224, 224))
            img = preprocess_input(img_to_array(img))
            frames.append(img)
        count += 1
    cap.release()
    return np.array(frames) if len(frames) == sequence_length else None

In [None]:
labels_map = {"safe": 0, "adult": 1, "harmful": 2, "suicide": 3}
base_path = "/content/tikharm_dataset/tikharm_dataset/train"  # update as needed

X_seqs, y_seqs = [], []

for label_name, label in labels_map.items():
    video_paths = os.listdir(os.path.join(base_path, label_name))
    for fname in tqdm(video_paths[:5]):  # limit to 5 per class for demo
        full_path = os.path.join(base_path, label_name, fname)
        frames = extract_frame_sequence(full_path, sequence_length=10)
        if frames is not None:
            X_seqs.append(frames)
            y_seqs.append(label)

X_seqs = np.array(X_seqs)
y_seqs = np.array(y_seqs)


In [None]:
cnn = MobileNetV2(include_top=False, input_shape=(224, 224, 3), weights="imagenet")
cnn.trainable = False

feature_model = Sequential([
    cnn,
    GlobalAveragePooling2D()
])

In [None]:
X_feat = []
for video in X_seqs:
    seq_feats = []
    for frame in video:
        feat = feature_model.predict(np.expand_dims(frame, axis=0))[0]
        seq_feats.append(feat)
    X_feat.append(seq_feats)

X_feat = np.array(X_feat)

In [None]:
model = Sequential([
    LSTM(64, input_shape=(10, X_feat.shape[2])),
    Dense(4, activation='softmax')
])
model.compile(optimizer=Adam(1e-4), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

X_train, X_test, y_train, y_test = train_test_split(X_feat, y_seqs, test_size=0.2)
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=5, batch_size=2)

In [None]:
test_video = "/content/tikharm_dataset/tikharm_dataset/train/harmful/_paris_tisdale_7146660080317074734.mp4"
test_seq = extract_frame_sequence(test_video)

seq_feat = []
for frame in test_seq:
    feat = feature_model.predict(np.expand_dims(frame, axis=0))[0]
    seq_feat.append(feat)

X_new = np.expand_dims(seq_feat, axis=0)
pred = model.predict(X_new)[0]
confidence = round(float(np.max(pred)), 2)
label_index = int(np.argmax(pred))
reverse_map = {v: k for k, v in labels_map.items()}
predicted_label = reverse_map[label_index]

print("Predicted:", predicted_label)
print("Confidence:", confidence)

# **Phase 5** Spatio-Temporal Modeling using 3D CNN


In [None]:
import os
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, MaxPooling3D, Flatten, Dense
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split

In [None]:
def extract_video_cube(video_path, num_frames=16, size=(112, 112)):
    cap = cv2.VideoCapture(video_path)
    frames = []
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    step = max(total_frames // num_frames, 1)

    for i in range(num_frames):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i * step)
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, size)
        frames.append(frame)

    cap.release()

    if len(frames) < num_frames:
        return None
    cube = np.stack(frames, axis=0)  # shape: (16, H, W, C)
    return cube

In [None]:
base_path = "/content/tikharm_dataset/tikharm_dataset/train"
labels_map = {"safe": 0, "adult": 1, "harmful": 2, "suicide": 3}

X_3d, y_3d = [], []

for label_name, label in labels_map.items():
    for fname in tqdm(os.listdir(os.path.join(base_path, label_name))[:5]):
        path = os.path.join(base_path, label_name, fname)
        cube = extract_video_cube(path)
        if cube is not None:
            X_3d.append(cube)
            y_3d.append(label)

X_3d = np.array(X_3d)
y_3d = np.array(y_3d)
X_3d = X_3d / 255.0  # normalize pixel values

In [None]:
model = Sequential([
    Conv3D(32, (3, 3, 3), activation='relu', input_shape=(16, 112, 112, 3)),
    MaxPooling3D((2, 2, 2)),
    Conv3D(64, (3, 3, 3), activation='relu'),
    MaxPooling3D((2, 2, 2)),
    Flatten(),
    Dense(128, activation='relu'),
    Dense(4, activation='softmax')
])

model.compile(optimizer=Adam(1e-4), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

X_train, X_test, y_train, y_test = train_test_split(X_3d, y_3d, test_size=0.2)
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=5, batch_size=2)

In [None]:
test_path = "/content/tikharm_dataset/train/harmful/harmful_001.mp4"
cube = extract_video_cube(test_path)

if cube is not None:
    pred = model.predict(np.expand_dims(cube / 255.0, axis=0))[0]
    confidence = round(float(np.max(pred)), 2)
    label_index = np.argmax(pred)
    reverse_map = {v: k for k, v in labels_map.items()}
    predicted_label = reverse_map[label_index]

    print("Predicted:", predicted_label)
    print("Confidence:", confidence)
else:
    print("❌ Not enough frames in test video")

# **Phase 6 **– Multimodal Fusion (Audio + Visual + Text)

In [None]:
# Mock predictions (normally read from phase3_*.csv)
audio_pred = {"label": "unsafe", "confidence": 0.81}
visual_pred = {"label": "unsafe", "confidence": 0.72}
text_pred = {"label": "safe",   "confidence": 0.69}

In [None]:
from sklearn.linear_model import LogisticRegression
import numpy as np

# Mock predictions (normally read from phase3_*.csv)
audio_pred = {"label": "unsafe", "confidence": 0.81}
visual_pred = {"label": "unsafe", "confidence": 0.72}
text_pred = {"label": "safe", "confidence": 0.69}

# Safe = 0, Anything else = 1
# Include samples for both safe and unsafe to have 2 classes
X_fusion = np.array([
    [audio_pred["confidence"], visual_pred["confidence"], text_pred["confidence"]],  # unsafe example
    [0.2, 0.3, 0.8]  # Example of a safe prediction - adjust values as needed
])
y_fusion = np.array([1, 0])  # 1 = unsafe, 0 = safe

# Train fusion model
fusion_model = LogisticRegression()
fusion_model.fit(X_fusion, y_fusion)

In [None]:
# Simulate new inputs
new_audio = 0.76
new_visual = 0.84
new_text = 0.70

X_new = np.array([[new_audio, new_visual, new_text]])
pred = fusion_model.predict_proba(X_new)[0]

confidence = round(float(np.max(pred)), 2)
final_label = "unsafe" if pred[1] > 0.5 else "safe"

print("🔁 Final Fusion Label:", final_label)
print("🎯 Confidence:", confidence)

# **Phase 7** – Vision Transformer (ViT)

In [None]:
!pip install transformers --quiet

In [None]:
from transformers import ViTFeatureExtractor, ViTForImageClassification
from PIL import Image
import torch
import numpy as np
import os
import pandas as pd

In [None]:
feature_extractor = ViTFeatureExtractor.from_pretrained("google/vit-base-patch16-224")
model = ViTForImageClassification.from_pretrained("google/vit-base-patch16-224")

In [None]:
import cv2

def extract_test_frames(video_path, out_dir, fps=1):
    os.makedirs(out_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    frame_rate = int(cap.get(cv2.CAP_PROP_FPS))
    count, saved = 0, 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret: break
        if frame_rate > 0 and count % (frame_rate // fps) == 0:
            fname = os.path.join(out_dir, f"{saved}.jpg")
            cv2.imwrite(fname, frame)
            saved += 1
        count += 1
    cap.release()
    return out_dir


In [None]:
video_path = "/content/tikharm_dataset/tikharm_dataset/train/harmful/harmful_001.mp4"
frame_dir = "phase7_frames"
extract_test_frames(video_path, frame_dir, fps=1)

In [None]:
frame_preds = []

for fname in sorted(os.listdir(frame_dir)):
    img_path = os.path.join(frame_dir, fname)
    image = Image.open(img_path).convert("RGB")
    inputs = feature_extractor(images=image, return_tensors="pt")

    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
        probs = torch.softmax(logits, dim=1)[0].numpy()
        frame_preds.append(probs)

avg_probs = np.mean(frame_preds, axis=0)
confidence = round(float(np.max(avg_probs)), 2)
label_index = int(np.argmax(avg_probs))
predicted_label = "unsafe" if label_index % 2 == 1 else "safe"  # Simulate binary safety

print("🔮 ViT Predicted:", predicted_label)
print("🎯 Confidence:", confidence)