In [None]:
import torch

# Collect the environment facts first, then print them in one pass.
cuda_ready = torch.cuda.is_available()
environment_report = [
    ("PyTorch version:", torch.__version__),
    ("CUDA available:", cuda_ready),
    ("CUDA device count:", torch.cuda.device_count()),
    # The device index and name are only queried when CUDA is present.
    ("Current CUDA device:", torch.cuda.current_device() if cuda_ready else "No CUDA device"),
    ("GPU name:", torch.cuda.get_device_name(0) if cuda_ready else "No GPU found"),
]
for label, value in environment_report:
    print(label, value)

PyTorch version: 2.5.1+cu124
CUDA available: True
CUDA device count: 1
Current CUDA device: 0
GPU name: NVIDIA A100-SXM4-40GB


In [None]:
# !pip install -U bitsandbytes
# !pip install -U accelerate transformers
# !pip install --upgrade torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

In [None]:
# import zipfile
# import os

# zip_path = '/content/Compliance_model.zip'
# extract_path = ""

# with zipfile.ZipFile(zip_path, 'r') as zip_ref:
#     zip_ref.extractall(extract_path)

# print("Extraction complete!")

### LLaVA 13B model

In [None]:
import os

# Read the Hugging Face access token from the environment instead of writing it into the
# notebook. None (variable not set) means anonymous access.
hf_token = os.environ.get("HF_TOKEN")
# Before preparing the data for this model configuration, read the model documentation
# and note how the inputs are expected to be prepared.

In [None]:
import os

import torch
from transformers import AutoProcessor, LlavaForConditionalGeneration, BitsAndBytesConfig

# Check CUDA availability. The device name is only queried when CUDA is present,
# because torch.cuda.get_device_name raises on a CPU-only machine.
cuda_available = torch.cuda.is_available()
print(f"CUDA available: {cuda_available}")
if cuda_available:
    print(f"Device name: {torch.cuda.get_device_name(0)}")
device = torch.device("cuda" if cuda_available else "cpu")


# Model name
model_name = "llava-hf/llava-1.5-13b-hf"

# SECURITY: access tokens must never be written into the notebook. The token is read
# from the HF_TOKEN environment variable; None means anonymous access.
# NOTE(review): any token that was ever committed with this notebook should be revoked.
auth_token = os.environ.get("HF_TOKEN")

# Quantization settings are passed as a BitsAndBytesConfig object; passing
# load_in_4bit / bnb_* directly to from_pretrained is deprecated.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
    llm_int8_enable_fp32_cpu_offload=True,
)

# Load processor
processor = AutoProcessor.from_pretrained(model_name, token=auth_token)

# Load model; device_map="auto" lets accelerate place the weights.
try:
    model = LlavaForConditionalGeneration.from_pretrained(
        model_name,
        quantization_config=quantization_config,
        token=auth_token,
        device_map="auto",
        torch_dtype=torch.float16,
    )

    # Enable gradient checkpointing for memory efficiency
    model.gradient_checkpointing_enable()

    print("\n✅ Model and processor loaded successfully!")
    print(f"Model loaded on: {device}")

except Exception as e:
    # Best-effort cell: report the failure and leave sentinels that later cells can test.
    print(f"\n❌ Error loading model: {e}")
    model = None
    processor = None

# The processor runs on the CPU and has no device of its own, so nothing is assigned to
# it here; tensors it produces are moved to the model's device where they are used.

CUDA available: True
Device name: NVIDIA A100-SXM4-40GB


processor_config.json:   0%|          | 0.00/173 [00:00<?, ?B/s]

chat_template.json:   0%|          | 0.00/701 [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/505 [00:00<?, ?B/s]

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.48, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


tokenizer_config.json:   0%|          | 0.00/1.45k [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/500k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/3.62M [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/41.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/552 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.10k [00:00<?, ?B/s]

The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


model.safetensors.index.json:   0%|          | 0.00/77.2k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/6 [00:00<?, ?it/s]

model-00001-of-00006.safetensors:   0%|          | 0.00/4.96G [00:00<?, ?B/s]

model-00002-of-00006.safetensors:   0%|          | 0.00/4.97G [00:00<?, ?B/s]

model-00003-of-00006.safetensors:   0%|          | 0.00/4.88G [00:00<?, ?B/s]

model-00004-of-00006.safetensors:   0%|          | 0.00/4.93G [00:00<?, ?B/s]

model-00005-of-00006.safetensors:   0%|          | 0.00/4.93G [00:00<?, ?B/s]

model-00006-of-00006.safetensors:   0%|          | 0.00/2.02G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/6 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/141 [00:00<?, ?B/s]


✅ Model and processor loaded successfully!
Model loaded on: cuda


In [None]:
# print("\n🔍 **Model Expected Input Signature** 🔍")
# print(model.forward.__doc__)

In [None]:
import os
import torch
from torch.utils.data import Dataset
from PIL import Image
import torchvision.transforms as transforms

class LLaVADataset(Dataset):
    """Image/caption dataset that builds LLaVA-style training samples.

    Every ``.jpg`` file in ``image_folder`` is one sample. The caption is derived from
    the file name (extension dropped, underscores replaced by spaces), tokenized, and
    followed by one ``<image>`` placeholder token per vision patch.

    Args:
        image_folder: Directory that contains the ``.jpg`` files.
        processor: Object exposing ``tokenizer`` (used for tokenization, the ``<image>``
            token id and the pad token id).
        max_length: Total sequence budget; the text is tokenized with
            ``max(0, max_length - num_patches)`` as its ``max_length``.
        image_size: ``(height, width)`` every image is resized to.
        patch_size: Side of one vision patch; the placeholder count is
            ``(image_size[0] // patch_size) * (image_size[1] // patch_size)``.

    Raises:
        ValueError: If ``image_folder`` contains no ``.jpg`` file.
    """

    def __init__(self, image_folder, processor, max_length=2048, image_size=(336, 336), patch_size=14):
        self.image_folder = image_folder
        self.processor = processor
        self.max_length = max_length
        self.image_size = image_size
        self.patch_size = patch_size
        self.image_token_id = processor.tokenizer.convert_tokens_to_ids("<image>")

        # Gather .jpg files. os.listdir returns entries in arbitrary order, so the list
        # is sorted to keep the index -> file mapping stable between runs.
        self.image_files = sorted(f for f in os.listdir(image_folder) if f.endswith(".jpg"))

        if not self.image_files:
            raise ValueError(f"No images found in {image_folder}. Check dataset path!")

        # NOTE(review): only resize + ToTensor is applied (no mean/std normalization);
        # confirm this matches what the checkpoint's image processor does.
        self.transform = transforms.Compose([
            transforms.Resize(self.image_size),
            transforms.ToTensor(),
        ])

        self.patch_count_h = self.image_size[0] // self.patch_size
        self.patch_count_w = self.image_size[1] // self.patch_size
        self.num_patches = self.patch_count_h * self.patch_count_w

    def __len__(self):
        """Return the number of image files found in the folder."""
        return len(self.image_files)

    def _build_labels(self, input_ids):
        """Return a copy of ``input_ids`` with non-target positions set to -100.

        Padding tokens (when the tokenizer defines a pad id) and ``<image>``
        placeholders are masked so that the loss is computed on caption tokens only.
        """
        labels = input_ids.clone()
        pad_id = self.processor.tokenizer.pad_token_id
        if pad_id is not None:
            labels[labels == pad_id] = -100
        # Placeholders stand in for image features; they are not text to be predicted.
        labels[labels == self.image_token_id] = -100
        return labels

    def __getitem__(self, idx):
        """Build the sample for ``idx``.

        Returns:
            A dict with ``input_ids``, ``attention_mask``, ``pixel_values``,
            ``image_grid_thw`` and ``labels``, or ``None`` when the image is missing or
            processing fails (such samples are filtered out by the collate function).
        """
        image_file = self.image_files[idx]
        image_path = os.path.join(self.image_folder, image_file)

        if not os.path.exists(image_path):
            print(f"⚠️ Missing image: {image_path}")
            return None  # Return None to be filtered later

        try:
            caption_text = os.path.splitext(image_file)[0].replace("_", " ")
            text_prompt = f"Here is an image: {caption_text}\n"
            image = Image.open(image_path).convert("RGB")
            image_tensor = self.transform(image)

            text_inputs = self.processor.tokenizer(
                text_prompt,
                padding="max_length",
                truncation=True,
                max_length=max(0, self.max_length - self.num_patches),
                return_tensors="pt"
            )

            input_ids = text_inputs["input_ids"].squeeze(0).to(torch.int64)
            attention_mask = text_inputs["attention_mask"].squeeze(0).to(torch.int64)

            # Append one <image> token per patch (576 for the default 336/14 grid)
            image_tokens = torch.tensor([self.image_token_id] * self.num_patches, dtype=torch.int64)
            input_ids = torch.cat([input_ids, image_tokens])
            image_attn = torch.ones_like(image_tokens, dtype=torch.int64)
            attention_mask = torch.cat([attention_mask, image_attn])

            image_grid_thw = torch.tensor([1, self.patch_count_h, self.patch_count_w], dtype=torch.int64)

            labels = self._build_labels(input_ids)

            return {
                "input_ids": input_ids,
                "attention_mask": attention_mask,
                "pixel_values": image_tensor.to(torch.float32),
                "image_grid_thw": image_grid_thw,
                "labels": labels,
            }

        except Exception as e:
            print(f"⚠️ Error processing {image_file}: {e}")
            return None  # Return None to be filtered


In [None]:
from torch.nn.utils.rnn import pad_sequence
import torch

def collate_fn(batch):
    """Merge dataset samples into one padded batch.

    ``None`` entries (samples the dataset failed to build) are dropped first.

    Args:
        batch: List of sample dicts (or ``None``) with the keys ``input_ids``,
            ``attention_mask``, ``pixel_values``, ``image_grid_thw`` and ``labels``.

    Returns:
        Dict with the same keys. Sequences are right-padded to the longest one in the
        batch (0 for ``input_ids`` and ``attention_mask``, -100 for ``labels``); image
        tensors are stacked along a new first dimension.

    Raises:
        ValueError: If no sample is left after filtering, or the image tensors cannot
            be stacked.
    """
    batch = [b for b in batch if b is not None]

    if not batch:
        raise ValueError("Empty batch after filtering - check dataset or tokenization errors")

    def _as_sequence(tensor):
        # Drop a leading batch axis of size 1, but leave 1-D tensors alone: squeezing a
        # length-1 sequence would yield a 0-d tensor that pad_sequence cannot handle.
        return tensor.squeeze(0) if tensor.dim() > 1 else tensor

    input_ids = [_as_sequence(item["input_ids"]) for item in batch]
    attention_mask = [_as_sequence(item["attention_mask"]) for item in batch]
    labels = [_as_sequence(item["labels"]) for item in batch]
    pixel_list = [item["pixel_values"] for item in batch]
    grid_list = [item["image_grid_thw"] for item in batch]

    # NOTE(review): input_ids are padded with id 0 rather than the tokenizer's pad id;
    # those positions carry attention_mask 0 and label -100.
    padded_input_ids = pad_sequence(input_ids, batch_first=True, padding_value=0)
    padded_attention_mask = pad_sequence(attention_mask, batch_first=True, padding_value=0)
    padded_labels = pad_sequence(labels, batch_first=True, padding_value=-100)

    try:
        pixel_values = torch.stack(pixel_list)
        image_grid_thw = torch.stack(grid_list)
    except RuntimeError as e:
        # Report the per-sample shapes from the untouched list, not from a partly
        # stacked result.
        raise ValueError(f"Image tensor shape mismatch: {[p.shape for p in pixel_list]}") from e

    return {
        "input_ids": padded_input_ids,
        "attention_mask": padded_attention_mask,
        "pixel_values": pixel_values,
        "image_grid_thw": image_grid_thw,
        "labels": padded_labels
    }


In [None]:
# "image_grid_thw": image_grid_thw,
# image_grid_thw = torch.stack(image_grid_thw)
# image_grid_thw = [item["image_grid_thw"] for item in batch]

In [None]:
import torch
from torch.utils.data import DataLoader, random_split
from transformers import AutoProcessor

# Define dataset path and loader settings
image_folder = "/content/Compliance_model_data"
batch_size = 4
max_length = 512
image_size = (336, 336)
split_seed = 42        # fixes the train/validation split so a re-run gives the same subsets
preview_batches = 3    # number of batches whose shapes are printed below

# Initialize full dataset
full_dataset = LLaVADataset(image_folder, processor, max_length=max_length, image_size=image_size)

# **Split into Train and Validation Sets (80% Train, 20% Validation)**
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, eval_dataset = random_split(
    full_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(split_seed),
)

# Initialize DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn, pin_memory=False)
eval_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn, pin_memory=False)

# Verify batch structure (debugging). Only the first few batches are inspected: walking
# the whole loader loads every image and floods the cell output.
# collate_fn raises on an empty batch instead of returning None, so no None check is needed.
for step, batch in enumerate(train_loader):
    if step >= preview_batches:
        break

    print(f"\n🔹 Training Step {step}/{len(train_loader)}")
    for key, value in batch.items():
        print(f"   {key}: {value.shape}")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
   image_grid_thw: torch.Size([4, 3])
   labels: torch.Size([4, 599])

🔹 Training Step 746/1460
   input_ids: torch.Size([4, 599])
   attention_mask: torch.Size([4, 599])
   pixel_values: torch.Size([4, 3, 336, 336])
   image_grid_thw: torch.Size([4, 3])
   labels: torch.Size([4, 599])

🔹 Training Step 747/1460
   input_ids: torch.Size([4, 597])
   attention_mask: torch.Size([4, 597])
   pixel_values: torch.Size([4, 3, 336, 336])
   image_grid_thw: torch.Size([4, 3])
   labels: torch.Size([4, 597])

🔹 Training Step 748/1460
   input_ids: torch.Size([4, 597])
   attention_mask: torch.Size([4, 597])
   pixel_values: torch.Size([4, 3, 336, 336])
   image_grid_thw: torch.Size([4, 3])
   labels: torch.Size([4, 597])

🔹 Training Step 749/1460
   input_ids: torch.Size([4, 598])
   attention_mask: torch.Size([4, 598])
   pixel_values: torch.Size([4, 3, 336, 336])
   image_grid_thw: torch.Size([4, 3])
   labels: torch.Size([4, 598

In [None]:
# Get a sample from the dataset. The index is clamped so the cell also works on
# datasets with fewer than 100 training items.
sample_index = min(99, len(train_dataset) - 1)
sample = train_dataset[sample_index]

if sample is None:
    # The dataset returns None when an image is missing or fails to process.
    print(f"⚠️ Sample {sample_index} could not be loaded.")
else:
    # Print available keys in the dataset sample
    print("🔍 **Dataset Sample Keys**:", sample.keys())

    # Print the shape and type of each key
    for key, value in sample.items():
        print(f"🔹 {key}: {type(value)}, Shape: {value.shape if isinstance(value, torch.Tensor) else 'N/A'}")

🔍 **Dataset Sample Keys**: dict_keys(['input_ids', 'attention_mask', 'pixel_values', 'image_grid_thw', 'labels'])
🔹 input_ids: <class 'torch.Tensor'>, Shape: torch.Size([595])
🔹 attention_mask: <class 'torch.Tensor'>, Shape: torch.Size([595])
🔹 pixel_values: <class 'torch.Tensor'>, Shape: torch.Size([3, 336, 336])
🔹 image_grid_thw: <class 'torch.Tensor'>, Shape: torch.Size([3])
🔹 labels: <class 'torch.Tensor'>, Shape: torch.Size([595])


In [None]:
from transformers import AutoProcessor, LlavaForConditionalGeneration, BitsAndBytesConfig
from peft import LoraConfig, PeftModel, get_peft_model


# NOTE: this configuration is not passed to anything in this cell; `model` was already
# loaded (and quantized) by an earlier cell. It is kept only as a named reference.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4"
)

# Enable memory optimization
model.gradient_checkpointing_enable()

# NOTE(review): these module names are matched by name; confirm whether they should
# also match same-named projections outside the language model.
lora_config = LoraConfig(
    r=4, #previously its 8
    lora_alpha=16,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM"
)

# Guard against re-running the cell: wrapping an already wrapped model would stack a
# second set of adapters on top of the first.
if not isinstance(model, PeftModel):
    model = get_peft_model(model, lora_config)
    model.enable_input_require_grads()
model.config.use_cache = False  # Important for gradient checkpointing

print("\n✅ Model and processor loaded successfully on GPU with LoRA!")
# Show how many parameters are actually trainable after attaching the adapters.
model.print_trainable_parameters()



✅ Model and processor loaded successfully on GPU with LoRA!


In [None]:
# for name, param in model.named_parameters():
#     if param.requires_grad:
#         print(name)

# for name, param in model.named_parameters():
#     print(f"{name}: requires_grad = {param.requires_grad}")

# Free CUDA memory before training.

In [None]:
import gc
import torch

# Collect unreachable Python objects first so that the tensors they hold are released,
# then return the now-unused cached blocks to the CUDA driver.
gc.collect()
torch.cuda.empty_cache()

80

# Original training code.

In [None]:
import torch
from transformers import (
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback,
)

# Single output location, used for checkpoints during training and for the final export.
save_path = "/content/Fine_tuned_llava_model"

# 1) Define TrainingArguments with Early Stopping
training_args = TrainingArguments(
    output_dir=save_path,
    per_device_train_batch_size=1,  # Adjust based on available GPU memory
    gradient_accumulation_steps=8,
    max_grad_norm=1.0,
    gradient_checkpointing=True,
    logging_strategy="steps",
    # NOTE: a positive max_steps overrides num_train_epochs, so this run stops after
    # 10 optimizer steps. Remove it (or set -1) to train for the full 3 epochs.
    max_steps=10,
    logging_steps=10,  # Logs every 10 steps
    evaluation_strategy="epoch",  # Evaluate at the end of each epoch
    save_strategy="epoch",
    num_train_epochs=3,
    learning_rate=2e-5,
    weight_decay=0.01,
    fp16=True,  # Enables mixed precision for speed
    bf16=False,
    push_to_hub=False,
    logging_dir="/content/logs",
    remove_unused_columns=False,  # the collate function needs every dataset key
    label_names=["labels"],
    dataloader_num_workers=0,
    save_total_limit=1,  # Keeps only the most recent checkpoint (the best one is also retained)
    load_best_model_at_end=True,  # Loads best model after training
    metric_for_best_model="loss"
)

# 2) Create a Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    tokenizer=processor.tokenizer,
    data_collator=collate_fn,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)

# 3) Train model
trainer.train()

# 4) Save all components for reloading
trainer.save_model(save_path)
processor.tokenizer.save_pretrained(save_path)
processor.save_pretrained(save_path)
torch.save(training_args, f"{save_path}/training_args.bin")
# The optimizer lives on the Trainer itself; TrainerState has no `optimizer` attribute.
if trainer.optimizer is not None:
    torch.save(trainer.optimizer.state_dict(), f"{save_path}/optimizer_state.pt")

print(f"Model and necessary components saved at {save_path}")

  trainer = Trainer(


Epoch,Training Loss,Validation Loss
0,76.8654,9.517768


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Model and necessary components saved at /content/Fine_tuned_llava_model


In [None]:
import torch
from PIL import Image
from transformers import LlavaForConditionalGeneration, AutoProcessor

# Define the path where the fine-tuned model is saved
save_path = "/content/Fine_tuned_llava_model"

# Load the fine-tuned LLaVA-13B model in half precision and let accelerate place it.
# Loading with the defaults would materialize the weights in float32 on the CPU first.
# NOTE(review): the training cell saved a PEFT-wrapped model; confirm that this
# directory can be loaded directly with from_pretrained in the installed versions.
model_reloaded = LlavaForConditionalGeneration.from_pretrained(
    save_path,
    torch_dtype=torch.float16,
    device_map="auto",
)
model_reloaded.eval()
processor_reloaded = AutoProcessor.from_pretrained(save_path)
tokenizer_reloaded = processor_reloaded.tokenizer  # Tokenizer is part of the processor

print("Fine-tuned LLaVA-13B model reloaded successfully!")

# ---- Load the Image ----
image_path = "/content/violant_image.jpg"  # Change this to your actual image file path
image = Image.open(image_path).convert("RGB")  # Convert to RGB format
print("Image:",image)


# ---- Process the Image ----
# LLaVA needs a text prompt that contains the <image> placeholder; an image alone gives
# the language model nothing to continue from.
prompt = "USER: <image>\nDescribe this image in detail. ASSISTANT:"
inputs = processor_reloaded(images=image, text=prompt, return_tensors="pt")

# Move the inputs to the device the model was placed on; floating-point tensors are
# cast to the model's half precision, integer tensors keep their dtype.
device = model_reloaded.device
inputs = {
    key: (val.to(device, torch.float16) if torch.is_floating_point(val) else val.to(device))
    for key, val in inputs.items()
}

# ---- Generate Description ----
# max_new_tokens bounds only the generated part; max_length would also count the prompt.
with torch.no_grad():
    generated_ids = model_reloaded.generate(**inputs, max_new_tokens=50)

# Decode the generated output
generated_text = tokenizer_reloaded.decode(generated_ids[0], skip_special_tokens=True)

print("Generated Description:", generated_text)

Loading checkpoint shards:   0%|          | 0/6 [00:00<?, ?it/s]

In [None]:
import torch

def extract_required_image_tokens(model, dataset, device="cuda"):
    """Run one image through ``model.vision_tower`` and count the feature positions.

    Args:
        model: Object exposing a callable ``vision_tower``.
        dataset: Indexable collection; ``dataset[0]`` must be a dict with a
            ``pixel_values`` tensor.
        device: Device the pixel tensor is moved to before the forward pass.

    Returns:
        The number of image feature positions produced for a single image.
        NOTE(review): this counts every position the tower returns; confirm whether the
        language model consumes all of them before using it as a placeholder count.

    Raises:
        ValueError: If the first sample is ``None`` or the tower output has an
            unsupported structure.
    """

    # Get a single sample from the dataset
    sample = dataset[0]
    if sample is None:
        raise ValueError("❌ Dataset sample 0 could not be loaded.")
    pixel_values = sample["pixel_values"].unsqueeze(0).to(device)  # Add batch dimension

    # Ensure no text input is given
    with torch.no_grad():
        image_features = model.vision_tower(pixel_values)

    # Handle the output structure of vision_tower
    if isinstance(image_features, torch.Tensor):
        if image_features.dim() == 3:
            # (batch, positions, hidden): the second axis is the position count.
            num_image_features = image_features.shape[1]
        elif image_features.dim() == 4:
            # Two position axes after the batch axis: multiply them.
            num_image_features = image_features.shape[1] * image_features.shape[2]
        else:
            raise ValueError("❌ Could not extract valid image features. Debug model output.")
    elif hasattr(image_features, "last_hidden_state"):
        num_image_features = image_features.last_hidden_state.shape[1]
    else:
        raise ValueError("❌ Could not extract valid image features. Debug model output.")

    print(f"\n🔍 Extracted Required Image Tokens: {num_image_features}")
    return num_image_features

# Run feature extraction on the dataset built earlier in this notebook. The name
# `dataset` is not defined at notebook scope, so `full_dataset` is used.
num_required_image_tokens = extract_required_image_tokens(model, full_dataset)
print(num_required_image_tokens)

It returns:

🔍 Extracted Required Image Tokens: 577
577


In [None]:
import torch
from transformers import AutoProcessor, LlavaForConditionalGeneration, BitsAndBytesConfig
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from PIL import Image
import os
import torchvision.transforms as transforms
import logging
from tqdm import tqdm

# Set up logging: configure the root logger at INFO level and create the logger
# that the definitions below use for progress and error messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class LLaVADataset(Dataset):
    """Image/caption dataset for LLaVA built from a folder of image files.

    The caption is the file name without extension, underscores replaced by spaces.
    Each sample holds the tokenized caption followed by one ``<image>`` placeholder per
    vision patch, because the model requires as many placeholders as image features.

    Args:
        image_folder: Directory with ``.jpg`` / ``.jpeg`` / ``.png`` files.
        processor: Object exposing ``tokenizer`` and, optionally, ``image_processor``
            (its ``image_mean`` / ``image_std`` are used for normalization if present).
        max_length: Token budget of the caption; the image placeholders are appended
            on top of it.
        image_size: ``(height, width)`` every image is resized to.
        patch_size: Side of one vision patch; the placeholder count is
            ``(image_size[0] // patch_size) * (image_size[1] // patch_size)``.

    Raises:
        ValueError: If the folder contains no supported image file.
    """

    # Fallback normalization statistics (CLIP image preprocessing), used only when the
    # processor does not expose its own values.
    _DEFAULT_MEAN = (0.48145466, 0.4578275, 0.40821073)
    _DEFAULT_STD = (0.26862954, 0.26130258, 0.27577711)

    def __init__(self, image_folder, processor, max_length=512, image_size=(336, 336), patch_size=14):
        self.image_folder = image_folder
        self.processor = processor
        self.max_length = max_length
        self.image_size = image_size
        self.patch_size = patch_size
        self.image_token = processor.tokenizer.convert_tokens_to_ids("<image>")
        self.num_image_tokens = (image_size[0] // patch_size) * (image_size[1] // patch_size)

        # Validate and load image files (sorted: os.listdir order is arbitrary)
        self.image_files = sorted(
            f for f in os.listdir(image_folder)
            if f.lower().endswith(('.jpg', '.jpeg', '.png'))
        )

        if not self.image_files:
            raise ValueError(f"No valid image files found in {image_folder}")

        logger.info(f"Found {len(self.image_files)} valid image files")

        # Normalize with the statistics of the checkpoint's own image processor so the
        # pixel values match what the vision tower was trained on.
        image_processor = getattr(processor, "image_processor", None)
        mean = getattr(image_processor, "image_mean", None) or self._DEFAULT_MEAN
        std = getattr(image_processor, "image_std", None) or self._DEFAULT_STD

        self.transform = transforms.Compose([
            transforms.Resize(image_size, interpolation=transforms.InterpolationMode.BILINEAR),
            transforms.ToTensor(),
            transforms.Normalize(mean=list(mean), std=list(std))
        ])

    def __len__(self):
        """Return the number of image files found in the folder."""
        return len(self.image_files)

    def _append_image_tokens(self, input_ids, attention_mask):
        """Append ``num_image_tokens`` placeholders (attention 1) to a 1-D sequence."""
        placeholders = torch.full((self.num_image_tokens,), self.image_token, dtype=input_ids.dtype)
        placeholder_mask = torch.ones(self.num_image_tokens, dtype=attention_mask.dtype)
        return (
            torch.cat([input_ids, placeholders]),
            torch.cat([attention_mask, placeholder_mask]),
        )

    def __getitem__(self, idx):
        """Build the sample for ``idx``.

        Returns:
            A dict with ``input_ids``, ``attention_mask``, ``pixel_values`` and
            ``image_grid_thw``, or ``None`` if the image cannot be processed (the error
            is logged and the collate function drops the sample).
        """
        image_file = self.image_files[idx]
        image_path = os.path.join(self.image_folder, image_file)

        # Extract caption from the file name
        caption = os.path.splitext(image_file)[0].replace("_", " ").strip()

        try:
            image = Image.open(image_path).convert("RGB")
            image_tensor = self.transform(image)

            # The caption gets the whole max_length budget; placeholders come on top.
            text_inputs = self.processor.tokenizer(
                caption,
                padding="max_length",
                truncation=True,
                max_length=self.max_length,
                return_tensors="pt"
            )

            input_ids = text_inputs["input_ids"].squeeze(0)
            attention_mask = text_inputs["attention_mask"].squeeze(0)

            # One placeholder per patch; a single placeholder makes the model fail
            # with "Image features and image tokens do not match".
            input_ids, attention_mask = self._append_image_tokens(input_ids, attention_mask)

            return {
                "input_ids": input_ids,
                "attention_mask": attention_mask,
                "pixel_values": image_tensor,
                "image_grid_thw": torch.tensor([[1, 1, 1]], dtype=torch.int64)
            }

        except Exception as e:
            logger.error(f"Error processing {image_file}: {str(e)}")
            return None

def create_collate_fn(pad_token_id=0):
    """Build a collate function that pads ``input_ids`` with ``pad_token_id``.

    Args:
        pad_token_id: Id used to right-pad ``input_ids``. ``None`` (a tokenizer without
            a pad token) falls back to 0, since ``pad_sequence`` needs a number.

    Returns:
        A function mapping a list of samples (dicts or ``None``) to a batch dict with
        ``input_ids``, ``attention_mask``, ``pixel_values`` and ``image_grid_thw``, or
        to ``None`` when every sample in the list is ``None``.
    """
    fill_id = 0 if pad_token_id is None else pad_token_id

    def collate_fn(batch):
        # Remove None values from failed samples
        batch = [b for b in batch if b is not None]
        if not batch:
            return None

        # Prepare tensors; padded positions get attention 0.
        input_ids = pad_sequence([item["input_ids"] for item in batch],
                               batch_first=True,
                               padding_value=fill_id)
        attention_mask = pad_sequence([item["attention_mask"] for item in batch],
                                    batch_first=True,
                                    padding_value=0)
        pixel_values = torch.stack([item["pixel_values"] for item in batch])
        image_grid_thw = torch.stack([item["image_grid_thw"] for item in batch])

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "pixel_values": pixel_values,
            "image_grid_thw": image_grid_thw
        }
    return collate_fn

def setup_model_and_processor(model_name, token):
    """Load the processor and a 4-bit quantized LLaVA model.

    Args:
        model_name: Checkpoint identifier passed to ``from_pretrained``.
        token: Access token passed to both ``from_pretrained`` calls.

    Returns:
        ``(model, processor)`` on success. If anything raises while loading, the error
        is logged and ``(None, None)`` is returned instead.
    """
    quant_cfg = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,  # also quantize the quantization constants
        bnb_4bit_quant_type="nf4",       # 4-bit NormalFloat data type
    )
    load_kwargs = {
        "token": token,
        "quantization_config": quant_cfg,
        "device_map": "auto",
        "torch_dtype": torch.float16,
    }

    try:
        loaded_processor = AutoProcessor.from_pretrained(model_name, token=token)
        loaded_model = LlavaForConditionalGeneration.from_pretrained(model_name, **load_kwargs)
        # Trade compute for memory during the backward pass.
        loaded_model.gradient_checkpointing_enable()
    except Exception as err:
        logger.error(f"Error loading model: {str(err)}")
        return None, None

    logger.info("Model and processor loaded successfully")
    return loaded_model, loaded_processor

def main():
    """Fine-tune LLaVA with LoRA adapters on the image folder and save the adapters.

    Steps: load the quantized model, attach LoRA adapters, build the dataset and loader,
    run one pass of optimizer steps, and save the result. A batch that raises is logged
    and skipped. Nothing is saved when no step succeeded, so a failed run cannot leave
    behind a directory that looks like a fine-tuned model.
    """
    # Local import: this cell's import block does not bring peft into scope.
    from peft import LoraConfig, get_peft_model

    # Configuration
    MODEL_NAME = "llava-hf/llava-1.5-13b-hf"
    TOKEN = os.environ.get("HF_TOKEN")  # never hard-code tokens; None = anonymous access
    IMAGE_FOLDER = "/content/content/sample_data"
    OUTPUT_DIR = "/content/llava-finetuned"
    BATCH_SIZE = 4
    MAX_LENGTH = 512
    IMAGE_SIZE = (336, 336)  # Make sure this matches the model's expected size
    LEARNING_RATE = 2e-5

    # Add PEFT/LoRA configuration
    lora_config = LoraConfig(
        r=16,
        lora_alpha=32,
        target_modules=["q_proj", "v_proj"],
        lora_dropout=0.1,
        bias="none",
    )

    # Setup
    model, processor = setup_model_and_processor(MODEL_NAME, TOKEN)
    if model is None or processor is None:
        return

    # The 4-bit base weights cannot be trained directly; only the adapters are.
    model = get_peft_model(model, lora_config)
    model.enable_input_require_grads()  # required with gradient checkpointing
    model.config.use_cache = False

    # Initialize dataset and dataloader
    dataset = LLaVADataset(IMAGE_FOLDER, processor, MAX_LENGTH, IMAGE_SIZE)
    dataloader = DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        collate_fn=create_collate_fn(processor.tokenizer.pad_token_id),
        num_workers=2,
        pin_memory=True
    )

    image_token_id = processor.tokenizer.convert_tokens_to_ids("<image>")
    optimizer = torch.optim.AdamW(
        [p for p in model.parameters() if p.requires_grad], lr=LEARNING_RATE
    )
    target_device = next(model.parameters()).device
    # Only these keys are forwarded; image_grid_thw is dropped before the forward call.
    model_inputs = ("input_ids", "attention_mask", "pixel_values", "labels")
    completed_steps = 0

    model.train()
    for batch in tqdm(dataloader, desc="Training"):
        if batch is None:
            continue

        try:
            # Move batch to device
            batch = {k: v.to(target_device) for k, v in batch.items() if k in model_inputs}

            # Without labels the model returns no loss. Build them from the inputs,
            # ignoring padding and <image> placeholder positions.
            if "labels" not in batch:
                labels = batch["input_ids"].clone()
                labels[batch["attention_mask"] == 0] = -100
                labels[batch["input_ids"] == image_token_id] = -100
                batch["labels"] = labels

            # Forward pass
            outputs = model(**batch)
            loss = outputs.loss

            # Backward pass and optimization
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            completed_steps += 1

            logger.info(f"Loss: {loss.item():.4f}")

        except Exception as e:
            logger.error(f"Error in training loop: {str(e)}")
            # Drop any partial gradients of the failed batch before moving on.
            optimizer.zero_grad()
            continue

    if completed_steps == 0:
        logger.error("No training step succeeded; the model is not saved.")
        return

    model.save_pretrained(OUTPUT_DIR)
    print("model is saved.")


if __name__ == "__main__":
    main()

Loading checkpoint shards:   0%|          | 0/6 [00:00<?, ?it/s]

Training:   0%|          | 0/1825 [00:00<?, ?it/s]ERROR:__main__:Error in training loop: Image features and image tokens do not match: tokens: 4, features 2304
Training:   0%|          | 1/1825 [00:00<06:57,  4.37it/s]ERROR:__main__:Error in training loop: Image features and image tokens do not match: tokens: 4, features 2304
ERROR:__main__:Error in training loop: Image features and image tokens do not match: tokens: 4, features 2304
Training:   0%|          | 3/1825 [00:00<03:18,  9.17it/s]ERROR:__main__:Error in training loop: Image features and image tokens do not match: tokens: 4, features 2304
ERROR:__main__:Error in training loop: Image features and image tokens do not match: tokens: 4, features 2304
Training:   0%|          | 5/1825 [00:00<02:39, 11.43it/s]ERROR:__main__:Error in training loop: Image features and image tokens do not match: tokens: 4, features 2304
ERROR:__main__:Error in training loop: Image features and image tokens do not match: tokens: 4, features 2304
Traini

model is saved.


In [None]:
# Sampling settings for one description:
#   max_length  - upper bound on the total sequence length
#   do_sample   - sample instead of greedy decoding, for more diverse outputs
#   temperature - randomness of the sampling
#   top_p       - nucleus sampling threshold
sampling_options = dict(max_length=1024, do_sample=True, temperature=0.7, top_p=0.9)

# Generation needs no gradients.
with torch.no_grad():
    output = model.generate(**inputs, **sampling_options)

# Turn the first returned sequence back into text and show it.
description = processor.tokenizer.decode(output[0], skip_special_tokens=True)
print("\n📝 Generated Description:", description)



📝 Generated Description:  Describe the scene in detail.


In [None]:
# Load the untouched base checkpoint in half precision as a comparison baseline.
baseline_checkpoint = "llava-hf/llava-1.5-13b-hf"
baseline_load_options = {"torch_dtype": torch.float16, "device_map": "auto"}
pretrained_model = LlavaForConditionalGeneration.from_pretrained(
    baseline_checkpoint, **baseline_load_options
)

# Same inputs as the previous cell, default decoding.
with torch.no_grad():
    output = pretrained_model.generate(**inputs, max_length=1024)

description = processor.tokenizer.decode(output[0], skip_special_tokens=True)
print("\n📝 Pretrained Model Description:", description)

Loading checkpoint shards:   0%|          | 0/6 [00:00<?, ?it/s]




📝 Pretrained Model Description:  Describe the scene in detail.


In [None]:
import json, re
import os

import torch
from PIL import Image
from transformers import AutoProcessor, LlavaForConditionalGeneration, BitsAndBytesConfig

# Configure 4-bit quantization if needed
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
)

# SECURITY: the access token is read from the HF_TOKEN environment variable and is never
# written into the notebook. None means anonymous access.
# NOTE(review): any token that was ever committed with this notebook should be revoked.
auth_token = os.environ.get("HF_TOKEN")

# Load base LLaVA model (load only once)
model_name = "llava-hf/llava-1.5-13b-hf"
processor = AutoProcessor.from_pretrained(
    model_name,
    token=auth_token
)

print("Loading base LLaVA model...")
model = LlavaForConditionalGeneration.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map="auto",
    quantization_config=bnb_config,
    token=auth_token,
)
# device_map="auto" has already placed the quantized weights, so no .to("cuda") call is
# made. Assigning the result keeps the notebook from printing the whole module tree.
model = model.eval()


Loading base LLaVA model...


Loading checkpoint shards:   0%|          | 0/6 [00:00<?, ?it/s]

LlavaForConditionalGeneration(
  (vision_tower): CLIPVisionModel(
    (vision_model): CLIPVisionTransformer(
      (embeddings): CLIPVisionEmbeddings(
        (patch_embedding): Conv2d(3, 1024, kernel_size=(14, 14), stride=(14, 14), bias=False)
        (position_embedding): Embedding(577, 1024)
      )
      (pre_layrnorm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
      (encoder): CLIPEncoder(
        (layers): ModuleList(
          (0-23): 24 x CLIPEncoderLayer(
            (self_attn): CLIPSdpaAttention(
              (k_proj): Linear4bit(in_features=1024, out_features=1024, bias=True)
              (v_proj): Linear4bit(in_features=1024, out_features=1024, bias=True)
              (q_proj): Linear4bit(in_features=1024, out_features=1024, bias=True)
              (out_proj): Linear4bit(in_features=1024, out_features=1024, bias=True)
            )
            (layer_norm1): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
            (mlp): CLIPMLP(
              (a

In [None]:
'''
graph TD
    A[Input Image] --> B(LLaVA Visual Analysis)
    B --> C[Textual Description]
    C --> D{Keyword Extraction}
    D --> E[Base Score Calculation]
    C --> F[Contextual Understanding]
    F --> G[Context Score]
    E --> H[Score Aggregator]
    G --> H
    H --> I((Final Compliance Score))
    I --> J[Moderation Action]

'''

'\ngraph TD\n    A[Input Image] --> B(LLaVA Visual Analysis)\n    B --> C[Textual Description]\n    C --> D{Keyword Extraction}\n    D --> E[Base Score Calculation]\n    C --> F[Contextual Understanding]\n    F --> G[Context Score]\n    E --> H[Score Aggregator]\n    G --> H\n    H --> I((Final Compliance Score))\n    I --> J[Moderation Action]\n\n'

In [None]:
from scipy.special import expit as sigmoid

def calculate_compliance_score(description, category_rules):
    """Calculate dynamic compliance score using sigmoid thresholding.

    The base score is the sum, over every keyword found in ``description`` and every
    category, of ``weight * count``. It is scaled by ``sigmoid(5 * (context - 0.65))``,
    multiplied by 100, truncated to an int and clamped to the range 0..100.

    Args:
        description: Text passed to ``analyze_keywords`` and ``llava_context_analysis``.
        category_rules: Mapping of category name to a dict holding the keyword weights
            under ``'weights'`` or, as produced by the rules loader in this notebook,
            under ``'tags'``.

    Returns:
        An int between 0 and 100.
    """
    keyword_counts = analyze_keywords(description)

    # Accept both key spellings so the output of the rules loader can be passed in as is.
    weight_tables = []
    for category in category_rules.values():
        table = category.get('weights')
        if table is None:
            table = category.get('tags', {})
        weight_tables.append(table)

    base_score = sum(
        table.get(word, 0) * count
        for word, count in keyword_counts.items()
        for table in weight_tables
    )

    context_score = llava_context_analysis(description)  # 0-1 scale
    adjusted_score = base_score * sigmoid(5*(context_score - 0.65))  # α=5, τ=0.65
    return min(100, max(0, int(adjusted_score * 100)))

In [None]:
def load_compliance_context(json_path):
    """Load compliance rules and normalize the tag weights of each subcategory.

    NOTE: a later cell defines another function with this same name; once that cell has
    run, this definition is replaced.

    Args:
        json_path: Path of a JSON file with a ``subcategories`` list. Each entry has a
            ``name``, a ``tags`` list of ``{"name", "weight"}`` objects and an optional
            ``threshold``.

    Returns:
        Dict mapping subcategory name to ``{'tags': {tag name: weight / total weight},
        'threshold': value or 0.5}``. If the weights of a subcategory sum to zero, its
        tags all get weight 0.0 instead of raising ZeroDivisionError.
    """
    import json  # local import: this cell has no import of its own

    with open(json_path, encoding="utf-8") as f:
        data = json.load(f)

    categories = {}
    for cat in data['subcategories']:
        tags = cat['tags']
        total_weight = sum(tag['weight'] for tag in tags)
        if total_weight:
            normalized = {tag['name']: tag['weight']/total_weight for tag in tags}
        else:
            normalized = {tag['name']: 0.0 for tag in tags}
        categories[cat['name']] = {
            'tags': normalized,
            'threshold': cat.get('threshold', 0.5)
        }

    return categories

In [None]:
import json

def load_compliance_context(json_path):
    """Summarize a compliance-rules JSON file as one line of text.

    Args:
        json_path: Path of a JSON file with an optional top-level ``name`` and an
            optional ``subcategories`` list. Each subcategory may have a ``name`` and
            ``tags``; a tag may be a plain string or an object with a ``name`` field.

    Returns:
        ``"<name>:"`` followed by one ``"<category> (e.g., <up to 3 tags>)"`` part per
        subcategory, all joined with ``" | "``. With no subcategories only ``"<name>:"``
        is returned.
    """
    # Open and load the JSON file from the provided path
    with open(json_path, 'r', encoding="utf-8") as f:
        context = json.load(f)

    # Extract the top-level name (e.g., "Content Restrictions")
    top_name = context.get("name", "Compliance Guidelines")

    def _tag_label(tag):
        # Tags may be plain strings or {"name": ..., "weight": ...} objects.
        if isinstance(tag, dict):
            return str(tag.get("name", ""))
        return str(tag)

    # Process each subcategory, listing its name and a sample of its tags
    subcategories = context.get("subcategories", [])
    summary_parts = [f"{top_name}:"]

    for subcat in subcategories:
        cat_name = subcat.get("name", "Unnamed Category")
        # Include a few sample tags (first 3) for context; adjust as needed
        tags = list(subcat.get("tags") or [])
        sample_tags = ", ".join(_tag_label(tag) for tag in tags[:3])
        summary_parts.append(f"{cat_name} (e.g., {sample_tags})")

    # Join the parts with a separator
    return " | ".join(summary_parts)

# Example usage: build the one-line rules summary and show it under a heading.
compliance_context = load_compliance_context("/content/compliance_rules.json")
print("Compliance Context Summary:", compliance_context, sep="\n")

# Define the function to generate the description
def generate_image_description(image, compliance_context):
    """Ask the vision-language model to describe an image and rate its policy risk.

    Uses the notebook-level ``processor`` and ``model`` globals. The prompt
    asks for a detailed description, a policy check against
    ``compliance_context``, and a final line of the form
    ``Compliance Score: XX/100``.

    Args:
        image: Image accepted by ``processor`` (the example below passes an RGB
            PIL image).
        compliance_context: Guideline text inserted into the prompt.

    Returns:
        str: The decoded first sequence returned by ``model.generate``. As the
        captured notebook output shows, this text begins with the prompt itself
        followed by the model's response.

    Note:
        Sampling is enabled (``do_sample=True``, ``temperature=0.7``), so
        repeated calls can return different text.
    """
    prompt = f"""### Instruction:
You are a vision+text assistant.
1) Describe the image in detail: objects, arrangement, color, usage, background details.
2) Check for potential policy concerns (sensitive content, hate/violence, illegal, graphic) based on these guidelines: {compliance_context}
3) Important: End your response with a separate line exactly like: "Compliance Score: XX/100"
   (where XX is a number from 0 to 100, with 0 meaning entirely safe and 100 extremely unsafe).

### Input:
<image>

### Response:
"""
    # Send the tensors to wherever the model actually lives instead of assuming "cuda".
    inputs = processor(
        images=image,
        text=prompt,
        return_tensors="pt"
    ).to(model.device)

    with torch.no_grad():
        generated_ids = model.generate(
            **inputs,
            max_new_tokens=512,
            do_sample=True,
            temperature=0.7
        )
    output = processor.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    return output

# Post-processing: extract the compliance score from the generated description
def extract_compliance_score(text):
    """Return the integer from the first ``Compliance Score: N/100`` in ``text``.

    ``N`` must be one to three digits; optional whitespace may follow the
    colon. Returns ``None`` when no such marker is present.
    """
    found = re.search(r'Compliance Score:\s*(\d{1,3})/100', text)
    return int(found.group(1)) if found else None

# Example run on a single image.
# NOTE(review): `Image` (PIL) and `re` are not imported in this cell — confirm an
# earlier cell has already brought them into scope.
image_path = "/content/violant_image.jpg"
image = Image.open(image_path).convert("RGB")

description = generate_image_description(image, compliance_context)
print("\nGenerated Description:\n", description)

# Pull the numeric score back out of the free-text answer, if the model emitted one.
score = extract_compliance_score(description)
if score is None:
    print("\nCompliance Score not found in the generated output.")
else:
    print(f"\nExtracted Compliance Score: {score}/100")

Compliance Context Summary:
Content Compliance Framework:

Generated Description:
 ### Instruction:
You are a vision+text assistant.
1) Describe the image in detail: objects, arrangement, color, usage, background details.
2) Check for potential policy concerns (sensitive content, hate/violence, illegal, graphic) based on these guidelines: Content Compliance Framework:
3) Important: End your response with a separate line exactly like: "Compliance Score: XX/100"
   (where XX is a number from 0 to 100, with 0 meaning entirely safe and 100 extremely unsafe).

### Input:
 

### Response:

In the image, a man is running down a street during a protest, with the smoke and tear gas filling the air. Several other people are scattered throughout the scene, some of them appearing to be protesters, and others seemingly observers. The man in the red and blue clothing appears to be in a hurry, possibly trying to escape the chaos happening around him.

The presence of tear gas and smoke indicates that

In [None]:
# %%time
# !pip install -q transformers accelerate torchvision

In [None]:
import torch
from transformers import LlavaForConditionalGeneration, AutoProcessor, BitsAndBytesConfig

def load_models():
    """Load the 4-bit quantized LLaVA-1.5 13B model and its processor.

    The Hugging Face access token is read from the ``HF_TOKEN`` environment
    variable rather than being embedded in the notebook; when it is unset the
    hub is accessed anonymously.

    Returns:
        tuple: ``(model, processor, device)`` where ``device`` is the string
        ``"cuda"`` when CUDA is available and ``"cpu"`` otherwise.
    """
    import os  # local import: this cell's import line does not include os

    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Configure 4-bit quantization
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,
    )

    model_name = "llava-hf/llava-1.5-13b-hf"
    # Never commit credentials: take the token from the environment (None -> anonymous).
    hf_token = os.environ.get("HF_TOKEN")
    processor = AutoProcessor.from_pretrained(model_name, token=hf_token)

    print("Loading base LLaVA model...")
    model = LlavaForConditionalGeneration.from_pretrained(
        model_name,
        torch_dtype=torch.float16,
        device_map="auto",
        quantization_config=bnb_config,
        token=hf_token,
    )
    model.eval()
    # No model.to(device) here: device_map="auto" has already placed the weights,
    # and moving a bitsandbytes-quantized, accelerate-dispatched model with .to()
    # is rejected or warned about depending on the installed library versions.

    return model, processor, device

# Run once per session: loading is slow (the captured output shows 6 checkpoint shards),
# and later cells reuse `model` and `processor` as notebook-level globals.
model, processor, device = load_models()

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.48, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


Loading base LLaVA model...


Loading checkpoint shards:   0%|          | 0/6 [00:00<?, ?it/s]

In [None]:
# Content Moderation System Implementation
import json
import re
from PIL import Image
from scipy.special import expit as sigmoid
from collections import defaultdict
from functools import lru_cache
import torch

class ContentModerator:
    """Score an image for policy risk from a model-generated description.

    The image is described by a vision-language ``model``/``processor`` pair,
    the description is matched against weighted keyword tags loaded from a
    JSON rules file, and each category's matched weight is scaled by a sigmoid
    of a 0-1 "context score".
    """

    def __init__(self, model, processor, rules_path="/content/compliance_rules.json"):
        """Initialize Content Moderator with model, processor, and compliance rules.

        Args:
            model: Generation model exposing ``generate`` and ``device``.
            processor: Matching processor (callable, with a ``tokenizer``).
            rules_path: Path to the JSON rules file read by ``_load_rules``.
        """
        self.model = model
        self.processor = processor
        print(f"🔄 Loading compliance rules from: {rules_path} (Type: {type(rules_path)})")  # Debug
        self.categories = self._load_rules(rules_path)
        self.alpha = 3.0  # Reduced alpha to soften sigmoid scaling
        self.base_threshold = 0.65
        print(f"✅ Initialization model and processor (Type: {type(model)})")  # Debug
        print(f"✅ Initialization Complete (Alpha: {self.alpha}, Base Threshold: {self.base_threshold})")  # Debug

    def _load_rules(self, json_path):
        """Load compliance categories and normalize weights.

        Expects ``data["categories"]`` to be a list of objects with ``name``
        and ``tags`` (each tag has ``name`` and ``weight``); ``threshold`` and
        ``severity_scale`` are optional and default to 0.5 and 1.0.

        Returns:
            dict mapping category name to ``{"tags", "threshold", "severity_scale"}``
            where ``tags`` maps tag name to its weight divided by the category total.
        """
        with open(json_path, encoding="utf-8") as f:
            data = json.load(f)

        print(f"📂 JSON Data Loaded (Type: {type(data)})")  # Debug
        print(f"🛠 Categories Found: {len(data['categories'])} (Type: {type(data['categories'])})")  # Debug

        categories = {}
        for cat in data["categories"]:
            print(f"➡ Processing Category: {cat['name']} (Type: {type(cat)})")  # Debug
            total = sum(t["weight"] for t in cat["tags"])  # Sum of all tag weights

            print(f"📊 Total Tag Weight for {cat['name']}: {total} (Type: {type(total)})")  # Debug

            categories[cat["name"]] = {
                # Guard the division so all-zero weights do not raise ZeroDivisionError.
                "tags": {t["name"]: (t["weight"] / total if total else 0.0) for t in cat["tags"]},
                "threshold": cat.get("threshold", 0.5),
                "severity_scale": cat.get("severity_scale", 1.0)  # Apply severity scaling
            }

            print(f"✔ Normalized Weights for {cat['name']}: {categories[cat['name']]['tags']} (Type: {type(categories[cat['name']]['tags'])})")  # Debug

        print(f"🔍 Final Categories Structure: {categories} (Type: {type(categories)})")  # Debug
        return categories

    # Cached per (instance, text); a cache hit skips the debug prints below.
    @lru_cache(maxsize=1000)
    def _analyze_text(self, text):
        """Analyze text for keyword matches and compute severity scores.

        Each tag is matched case-insensitively as a whole word, with an
        optional trailing ``s``. A tag contributes its weight once per
        category regardless of how many times it occurs.

        Returns:
            dict mapping category name to the summed weight of matched tags;
            categories with no match are omitted.
        """
        print(f"📝 Text to Analyze: {text} (Type: {type(text)})")  # Debug
        text = text.lower()
        matches = defaultdict(float)

        print(f"📊 Available Categories: {list(self.categories.keys())} (Type: {type(self.categories)})")  # Debug

        for cat, data in self.categories.items():
            print(f"🔎 Checking Category: {cat} (Type: {type(cat)})")  # Debug
            for kw, wt in data["tags"].items():
                if re.search(rf'\b{re.escape(kw)}s?\b', text):
                    print(f"✅ Matched Keyword: {kw} (Type: {type(kw)}) in Category: {cat}")  # Debug
                    matches[cat] += wt
                    print(f"📈 Updated Match Score for {cat}: {matches[cat]} (Type: {type(matches[cat])})")  # Debug

        print(f"📌 Final Match Scores: {matches} (Type: {type(matches)})")  # Debug
        return dict(matches)

    def _get_context_score(self, description):
        """Extract compliance score from LLaVA model output or estimate based on keyword density.

        Returns:
            float in [0, 1] when a ``[Score: N/100]`` marker is found (N/100);
            otherwise an estimate clamped to [0.5, 1.0] computed as twice the
            number of matched categories divided by the word count.
        """
        print(f"📜 Model Output Description: {description} (Type: {type(description)})")  # Debug
        match = re.search(r'\[Score:\s*(\d+)/100\]', description)

        if match:
            score = int(match.group(1)) / 100
            print(f"🎯 Extracted Context Score: {score} (Type: {type(score)})")  # Debug
            return score

        # If no explicit score, estimate based on keyword density
        print("⚠️ No explicit score found in description. Estimating context score...")
        word_count = len(description.split())
        # len() of the match dict counts matched categories, not individual keywords.
        keyword_density = len(self._analyze_text(description)) / max(word_count, 1)
        estimated_score = min(1.0, max(0.5, keyword_density * 2))
        print(f"📊 Estimated Context Score: {estimated_score}")
        return estimated_score

    def analyze(self, image_path):
        """Analyze an image for compliance scoring.

        Args:
            image_path: Path of the image file to open.

        Returns:
            dict with ``description`` (model-generated text, without the
            prompt), ``scores`` (category -> int 0-100), ``context_score``
            (int 0-100) and ``overall`` (max category score, 0 if none).

        Note:
            Generation samples (``do_sample=True``), so results can vary
            between calls on the same image.
        """
        print(f"🖼️ Processing Image: {image_path} (Type: {type(image_path)})")  # Debug
        image = Image.open(image_path).convert("RGB")

        print(f"✅ Image Loaded Successfully (Mode: {image.mode}, Size: {image.size})")  # Debug

        prompt = """ <image> Describe this image in detail. Include:
- Objects present
- Actions being performed
- Clothing, expressions, and gestures
- Background and scene description
- Any elements that might relate to compliance rules """

        print(f"📝 Generated Prompt: {prompt} (Type: {type(prompt)})")  # Debug

        # Use the instance's own model/processor (not notebook globals, which a
        # later cell rebinds) and follow the model's device instead of assuming "cuda".
        inputs = self.processor(
            images=image,
            text=prompt,
            return_tensors="pt"
        ).to(self.model.device)

        with torch.no_grad():
            generated_ids = self.model.generate(
                **inputs,
                max_new_tokens=512,
                do_sample=True,
                temperature=0.7
            )

        # generate() returns prompt tokens followed by new tokens. Decode only the
        # new ones, otherwise prompt words (e.g. "Actions") are matched as rule tags.
        prompt_length = inputs["input_ids"].shape[1]
        desc = self.processor.tokenizer.batch_decode(
            generated_ids[:, prompt_length:], skip_special_tokens=True
        )[0]

        print(f"📜 Generated Description: {desc} (Type: {type(desc)})")  # Debug

        # Calculate compliance scores
        ctx_score = self._get_context_score(desc)
        matches = self._analyze_text(desc.lower())

        print(f"📊 Context Score: {ctx_score} (Type: {type(ctx_score)})")  # Debug
        print(f"📌 Matched Keywords & Scores: {matches} (Type: {type(matches)})")  # Debug

        final_scores = {}
        for cat, score in matches.items():
            severity_scale = self.categories[cat]["severity_scale"]
            print(f"🔍 Category: {cat}, Score: {score}, Severity Scale: {severity_scale}")  # Debug
            # Keyword weight, scaled by severity and by how far the context score
            # sits above/below base_threshold (sigmoid steepness = alpha).
            adj = (score * severity_scale) * sigmoid(self.alpha * (ctx_score - self.base_threshold))
            print(f"📊 Adjusted Score for adj {cat}: {adj} (Type: {type(adj)}) {min(100, int(adj * 1000))}")  # Debug
            final_scores[cat] = min(100, int(adj * 1000))
            print(f"📈 Adjusted Score for vah {cat}: {final_scores[cat]} (Type: {type(final_scores[cat])})")  # Debug

        print(f"📊 Final Scores: {final_scores} (Type: {type(final_scores)})")  # Debug
        overall_score = max(final_scores.values(), default=0)
        print(f"🚨 Final Compliance Scores: {final_scores} (Type: {type(final_scores)})")  # Debug
        print(f"🏆 Overall Risk Score: {overall_score} (Type: {type(overall_score)})")  # Debug

        return {
            "description": desc,
            "scores": final_scores,
            "context_score": int(ctx_score * 100),
            "overall": overall_score
        }

In [None]:
# Cell 3: usage example — build the moderator once per session, then score one image.
moderator = ContentModerator(model, processor)

results = moderator.analyze("/content/HalfNaked_900.jpg")
# Alternative sample inputs (uncomment one to try it):
# results = moderator.analyze("/content/adult_image.jpg")
# results = moderator.analyze("/content/boobs_papa.jpg")
# results = moderator.analyze("/content/javhd-157.jpg")

# Raw result dict first, then a readable breakdown.
print("results : ", results)
print("🖼️ Image Analysis:", results["description"], sep="\n")
print("\n🔍 Compliance Scores:")
for cat, score in results["scores"].items():
    print(f"- {cat}: {score}/100")
print(f"\n🚨 Overall Risk: {results['overall']}/100 (Context: {results['context_score']}/100)")

🔄 Loading compliance rules from: /content/compliance_rules.json (Type: <class 'str'>)
📂 JSON Data Loaded (Type: <class 'dict'>)
🛠 Categories Found: 2 (Type: <class 'list'>)
➡ Processing Category: Sensitive Content (Type: <class 'dict'>)
📊 Total Tag Weight for Sensitive Content: 53.83 (Type: <class 'float'>)
✔ Normalized Weights for Sensitive Content: {'actions': 0.009288500835965075, 'acts': 0.009288500835965075, 'adult': 0.009288500835965075, 'alluring': 0.009288500835965075, 'analysis': 0.009288500835965075, 'anatomical': 0.009288500835965075, 'area': 0.009288500835965075, 'attitudes': 0.009288500835965075, 'bare': 0.009288500835965075, 'beach': 0.009288500835965075, 'body': 0.009288500835965075, 'boobs': 0.009288500835965075, 'connection': 0.009288500835965075, 'content': 0.009288500835965075, 'copulation': 0.009288500835965075, 'crude': 0.009288500835965075, 'cues': 0.009288500835965075, 'cultural': 0.009288500835965075, 'culturally': 0.009288500835965075, 'depictions': 0.009288500

In [None]:
# pip install scikit-learn numpy pandas nltk spacy sentence-transformers

In [None]:
import re
import json
import torch
import spacy
import numpy as np
import pandas as pd
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.preprocessing import LabelEncoder
from sentence_transformers import SentenceTransformer, util
from typing import Dict, List

# Download necessary NLTK data (tokenizer model for word_tokenize, and the stop-word lists)
nltk.download("punkt")
nltk.download("stopwords")

# Load pre-trained models
nlp = spacy.load("en_core_web_sm")  # Named Entity Recognition (NER), used by check_context
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")  # Semantic Similarity Model, used by is_contextually_safe

# Load transformer-based classifier for hate speech detection
# NOTE(review): `model` is also the name of the LLaVA model loaded in an earlier cell;
# this assignment rebinds it, so any earlier code that reads the global `model`
# will see this classifier after this cell runs — consider a distinct name.
MODEL_NAME = "facebook/roberta-hate-speech-dynabench-r4-target"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)

# Define Compliance Categories
# (descriptive labels per category; none of the functions defined in this cell read this dict)
CATEGORIES = {
    "adult_content": ["explicit", "nudity", "sexual", "pornographic"],
    "harsh_language": ["profanity", "swearing", "abuse"],
    "child_abuse": ["minor exploitation", "grooming"],
    "hate_speech": ["racism", "sexism", "homophobia"],
    "sensitive_info": ["leaked data", "PII exposure"]
}

# Define Rule-Based Filtering
# One whole-word regex per category; apply_rule_based_filter searches them case-insensitively.
# The sensitive_info pattern matches digit groups shaped 3-2-4, 16 consecutive digits,
# or 4-4-4-4 — presumably SSN- and card-number-like strings; confirm intent.
RULES = {
    "adult_content": r"\b(nude|porn|erotic|sex|lust|intimate|strip|orgasm|fetish)\b",
    "harsh_language": r"\b(fuck|shit|bitch|bastard|cunt|asshole|dickhead)\b",
    "child_abuse": r"\b(minor|underage|child porn|kid exploitation|pedo|grooming)\b",
    "hate_speech": r"\b(nazi|white power|kkk|lynch|terrorist|homophobic|slur)\b",
    "sensitive_info": r"\b(\d{3}-\d{2}-\d{4}|\d{16}|\d{4}-\d{4}-\d{4}-\d{4})\b"
}

# Dynamic Allow-List (initialized with known safe terms)
# Compared by exact string equality in check_context and by embedding similarity in is_contextually_safe.
dynamic_allow_list = set(["Bare Minerals", "Assume", "Bass Guitar" ])

# Stop Words for Filtering (English list from NLTK; used by extract_key_terms)
stop_words = set(stopwords.words("english"))

# Debugging Print Statement: marks the start of the trace output printed by the functions below
print("\n🔍 DEBUGGING TRACE STARTED\n")

# Function to Apply Rule-Based Filtering
def apply_rule_based_filter(text: str) -> List[str]:
    """Return the RULES categories whose regex matches ``text`` (case-insensitive).

    Categories are reported in RULES iteration order; the result is also
    printed as part of the debugging trace.
    """
    flagged_categories = [
        name
        for name, regex in RULES.items()
        if re.search(regex, text, re.IGNORECASE)
    ]

    print(f"📌 Rule-Based Filtering: Found {flagged_categories} in text: {text}")
    return flagged_categories

# Function to Perform Text Classification using a Transformer Model
def classify_with_transformer(text: str) -> str:
    """Classify ``text`` with the sequence-classification model loaded in this cell.

    Args:
        text: Input text; it is truncated by the tokenizer if too long.

    Returns:
        ``"Safe"`` when the arg-max logit index is 0, ``"Hate Speech"`` when it
        is 1. Assumes the model's label order is (not-hate, hate) — TODO
        confirm against ``model.config.id2label``.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    # Inference only: skip autograd bookkeeping, as the generation code elsewhere
    # in this notebook already does.
    with torch.no_grad():
        outputs = model(**inputs)
    prediction = torch.argmax(outputs.logits, dim=-1).item()

    classification = ["Safe", "Hate Speech"][prediction]
    print(f"📌 Transformer Model Classification: {classification} for text: {text}")
    return classification

# Function to Extract Key Terms After Stop Word Removal
def extract_key_terms(text: str) -> List[str]:
    """Tokenize ``text``, lowercase every token and drop those found in ``stop_words``.

    The surviving tokens are printed for the debugging trace and returned in
    their original order.
    """
    lowered_tokens = (token.lower() for token in word_tokenize(text))
    key_terms = [token for token in lowered_tokens if token not in stop_words]
    print(f"📌 Key Terms Extracted (Stop words removed): {key_terms}")
    return key_terms

# Function to Check Context Using Named Entity Recognition (NER)
def check_context(text: str) -> bool:
    """Report whether any named entity spaCy finds in ``text`` is on the allow-list.

    Entities are compared to ``dynamic_allow_list`` by exact string equality.
    The detected entities, and the first allow-listed one if any, are printed
    for the debugging trace.
    """
    named_entities = [span.text for span in nlp(text).ents]

    print(f"📌 Named Entities Detected: {named_entities}")

    # First entity (in document order) that is allow-listed, or None.
    entity = next(
        (candidate for candidate in named_entities if candidate in dynamic_allow_list),
        None,
    )
    if entity is None:
        return False

    print(f"✅ Context Safe: {entity} is in allow-list.")
    return True

# Function to Calculate Semantic Similarity
def is_contextually_safe(text: str, threshold: float = 0.75) -> bool:
    """Check whether ``text`` embeds close to any allow-listed term.

    The whole text is encoded once and compared by cosine similarity with each
    entry of ``dynamic_allow_list``; the function returns True at the first
    entry whose similarity is strictly greater than ``threshold``, and False
    if none is. Every comparison is printed for the debugging trace.
    """
    text_vector = embedding_model.encode(text, convert_to_tensor=True)

    for safe_term in dynamic_allow_list:
        term_vector = embedding_model.encode(safe_term, convert_to_tensor=True)
        similarity_score = util.pytorch_cos_sim(text_vector, term_vector).item()
        print(f"📌 Semantic Similarity Check: {text} vs {safe_term} -> Score: {similarity_score}")
        if not (similarity_score > threshold):
            continue
        print(f"✅ Semantic Context Safe: Similar to {safe_term}")
        return True

    return False

# Compliance Check Function
def compliance_check(text: str) -> Dict[str, object]:
    """Runs a multi-layered compliance check on the given text.

    Steps, in order:
      1. Regex rules (``apply_rule_based_filter``).
      2. Transformer classification — only when at least one rule fired.
      3. Key-term extraction (run for its trace output; the terms are not
         used in the decision).
      4. Allow-list check on named entities (``check_context``).
      5. Embedding similarity to allow-list terms (``is_contextually_safe``).

    The text is a "Violation" when a rule fired (or the classifier said
    non-safe) and neither context check cleared it; otherwise "Safe".

    Args:
        text: Text to check.

    Returns:
        dict with keys ``text``, ``rule_based_flags``, ``ml_classification``,
        ``context_safe``, ``semantic_safe``, ``final_decision`` and
        ``explanation``.
    """
    print(f"\n🔹 Checking Compliance for: {text}\n")

    result = {
        "text": text,
        "rule_based_flags": [],
        "ml_classification": "Safe",
        "context_safe": False,
        "semantic_safe": False,
        "final_decision": "Safe",
        "explanation": ""
    }

    # Apply Rule-Based Filtering
    result["rule_based_flags"] = apply_rule_based_filter(text)

    # Apply Transformer Model (skipped entirely when no rule matched)
    if len(result["rule_based_flags"]) > 0:
        result["ml_classification"] = classify_with_transformer(text)

    # Extract Key Terms — called only for the trace line it prints.
    extract_key_terms(text)

    # Check Context with NER
    result["context_safe"] = check_context(text)

    # Check Semantic Similarity
    result["semantic_safe"] = is_contextually_safe(text)

    # Final Decision
    if result["ml_classification"] != "Safe" or len(result["rule_based_flags"]) > 0:
        if result["context_safe"] or result["semantic_safe"]:
            result["final_decision"] = "Safe"
            result["explanation"] = "Content contains flagged words but appears in a non-violating context."
        else:
            result["final_decision"] = "Violation"
            result["explanation"] = f"Content classified as: {result['rule_based_flags']}."

    print(f"🔹 Final Decision: {result['final_decision']} - {result['explanation']}\n")
    return result

# Test Cases
# Sample descriptions: explicit content, two benign uses of the word "Naked"
# (a beverage name), and abusive language.
test_cases = [
    "In the image, a man and a woman are lying on a bed and engaging in a lustful act. The woman is straddling the man while getting intimate with him. The scene is quite provocative, as both individuals appear to be enjoying their time together. The woman appears to be fully naked, and the man's intentions are clear as they both participate in their erotic pursuits.",
    "Three bottles of Naked beverages are featured in the image.",
    "The image shows three bottles of a green beverage - Naked Half Naked - in three different colors.",
    "I wanna fuck you up dog ass hole mother fuckr bitch I will fuck you up."
]

# Run Compliance Checks
# NOTE(review): `results` is also the name of the ContentModerator output dict in an
# earlier cell; this rebinds it to a list of per-text result dicts.
results = [compliance_check(text) for text in test_cases]

# Display Results (one row per test case, one column per result key)
df = pd.DataFrame(results)
# NOTE(review): mid-cell third-party import — consider moving it to the import block at the top of the cell.
import ace_tools_open as tools
tools.display_dataframe_to_user(name="Debugging Compliance Model", dataframe=df)

print("\n✅ DEBUGGING TRACE COMPLETED\n")


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.



🔍 DEBUGGING TRACE STARTED


🔹 Checking Compliance for: In the image, a man and a woman are lying on a bed and engaging in a lustful act. The woman is straddling the man while getting intimate with him. The scene is quite provocative, as both individuals appear to be enjoying their time together. The woman appears to be fully naked, and the man's intentions are clear as they both participate in their erotic pursuits.

📌 Rule-Based Filtering: Found ['adult_content'] in text: In the image, a man and a woman are lying on a bed and engaging in a lustful act. The woman is straddling the man while getting intimate with him. The scene is quite provocative, as both individuals appear to be enjoying their time together. The woman appears to be fully naked, and the man's intentions are clear as they both participate in their erotic pursuits.
📌 Transformer Model Classification: Safe for text: In the image, a man and a woman are lying on a bed and engaging in a lustful act. The woman is straddling

text,rule_based_flags,ml_classification,context_safe,semantic_safe,final_decision,explanation
Loading ITables v2.2.5 from the internet... (need help?),,,,,,



✅ DEBUGGING TRACE COMPLETED

