<a href="https://colab.research.google.com/github/abhishek-1406/SafeScanTool/blob/main/Models/clip%26ocr.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

TRAINING CLIP MODEL

In [None]:
import sys
sys.path.append('/content/model')


In [None]:
pip install torch torchvision transformers pandas scikit-learn tqdm


Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [None]:
from transformers import CLIPModel
import torch.nn as nn
import torch

class CLIP_MLP(nn.Module):
    """CLIP encoder followed by a small MLP head for two-way classification.

    The pretrained ``openai/clip-vit-base-patch32`` model yields a text
    embedding and an image embedding; they are concatenated and passed to
    the classifier, which emits two logits per example.
    """

    def __init__(self):
        super().__init__()
        self.clip = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        # The head consumes the text and image embeddings side by side (512 + 512).
        head_layers = (
            nn.Linear(512 + 512, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 2),  # 2 classes: hateful, non-hateful
        )
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, input_ids, attention_mask, pixel_values):
        """Return the classifier logits for a batch of tokenised text and images."""
        # CLIP runs without gradient tracking, so gradients reach only the head.
        with torch.no_grad():
            clip_out = self.clip(
                input_ids=input_ids,
                attention_mask=attention_mask,
                pixel_values=pixel_values,
            )
        fused = torch.cat((clip_out.text_embeds, clip_out.image_embeds), dim=1)
        return self.classifier(fused)


In [None]:
import os
import json
import torch
from torch.utils.data import Dataset
from PIL import Image
from transformers import CLIPProcessor

class HateSpeechDataset(Dataset):
    """Image + text dataset backed by a JSONL annotation file.

    Every non-blank line of the JSONL file is parsed as a JSON object. Only
    the basename of its ``img`` entry is used, resolved against
    ``image_root``. Records whose image file does not exist and lines that
    cannot be parsed are skipped with a printed message. ``text`` and
    ``label`` are read lazily in ``__getitem__``.

    Args:
        jsonl_path: Path of the JSONL annotation file.
        image_root: Directory that contains the image files.
    """

    def __init__(self, jsonl_path, image_root):
        self.image_root = image_root
        self.processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
        self.data = self._load_records(jsonl_path, image_root)

    @staticmethod
    def _load_records(jsonl_path, image_root):
        """Return the parsed JSONL records whose image file exists under ``image_root``."""
        records = []
        # Explicit UTF-8 so decoding does not depend on the platform default.
        with open(jsonl_path, 'r', encoding='utf-8') as f:
            for line_no, raw_line in enumerate(f, start=1):
                line = raw_line.strip()
                if not line:
                    # Blank lines (e.g. a trailing newline) are not errors.
                    continue
                try:
                    item = json.loads(line)
                    img_path = os.path.join(image_root, os.path.basename(item['img']))
                except Exception as e:
                    # Best effort: covers malformed JSON as well as a missing
                    # or non-string 'img' entry.
                    print(f"[ERROR] Skipping line {line_no}: {e}")
                    continue
                if os.path.exists(img_path):
                    records.append(item)
                else:
                    print(f"[WARNING] Missing image: {img_path}")
        return records

    def __len__(self):
        """Number of usable records."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the processed tensors and label for record ``idx``.

        Returns:
            dict with ``input_ids``, ``attention_mask`` and ``pixel_values``
            (the processor outputs with their leading batch axis squeezed
            away) plus ``label`` as a ``torch.long`` tensor.
        """
        item = self.data[idx]
        image_path = os.path.join(self.image_root, os.path.basename(item['img']))
        # The context manager closes the file as soon as the RGB copy exists.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
        text = item["text"]
        label = int(item["label"])

        # Text is padded/truncated to exactly 77 tokens.
        inputs = self.processor(
            text=[text],
            images=image,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=77
        )

        return {
            'input_ids': inputs['input_ids'].squeeze(0),
            'attention_mask': inputs['attention_mask'].squeeze(0),
            'pixel_values': inputs['pixel_values'].squeeze(0),
            'label': torch.tensor(label, dtype=torch.long)
        }



In [None]:
from torch.utils.data import DataLoader
from transformers import CLIPProcessor

processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

def collate_fn(batch):
    """Combine per-example dicts into a single batch dict.

    Token ids and attention masks are padded to the longest sequence in the
    batch (token ids with the tokenizer's pad id, masks with 0); images and
    labels are stacked. Relies on the module-level ``processor``.
    """
    # Gather each field across the batch, one field at a time.
    columns = {
        field: [example[field] for example in batch]
        for field in ('input_ids', 'attention_mask', 'pixel_values', 'label')
    }

    pad = torch.nn.utils.rnn.pad_sequence
    return {
        'input_ids': pad(
            columns['input_ids'],
            batch_first=True,
            padding_value=processor.tokenizer.pad_token_id,
        ),
        'attention_mask': pad(columns['attention_mask'], batch_first=True, padding_value=0),
        'pixel_values': torch.stack(columns['pixel_values']),
        'label': torch.stack(columns['label']),
    }



Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/592 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/389 [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

In [None]:
from torch.utils.data import DataLoader

# Build the training set from the JSONL annotations and image folder on Google
# Drive. NOTE(review): these paths need Drive to be mounted, but the
# `drive.mount` cell appears further down in this notebook — confirm run order.
train_dataset = HateSpeechDataset(
    "/content/drive/MyDrive/data/train.jsonl",
    "/content/drive/MyDrive/data/img"
)

# Shuffled mini-batches of 16 examples, assembled by `collate_fn`.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)



In [None]:
from transformers import CLIPModel
import torch.nn as nn
import torch

class CLIP_MLP(nn.Module):
    """Pretrained CLIP encoder with a two-logit MLP classifier on top.

    Text and image embeddings from ``openai/clip-vit-base-patch32`` are
    concatenated and classified by a Linear-ReLU-Dropout-Linear head.
    """

    def __init__(self):
        super().__init__()
        self.clip = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        layers = [
            nn.Linear(512 + 512, 256),  # text + image embeddings in
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 2),  # two logits out
        ]
        self.classifier = nn.Sequential(*layers)

    def forward(self, input_ids, attention_mask, pixel_values):
        """Encode the batch with CLIP (no gradients) and return the head's logits."""
        clip_kwargs = dict(
            input_ids=input_ids,
            attention_mask=attention_mask,
            pixel_values=pixel_values,
        )
        with torch.no_grad():
            encoded = self.clip(**clip_kwargs)
        joint = torch.cat((encoded.text_embeds, encoded.image_embeds), dim=1)
        return self.classifier(joint)


In [None]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Rebuild the architecture on the selected device, then restore saved weights
# from Drive; `map_location=device` remaps the stored tensors onto that device.
# NOTE(review): no training loop is visible in these cells — presumably the
# checkpoint was produced elsewhere; confirm its provenance.
model = CLIP_MLP().to(device)
model.load_state_dict(torch.load("/content/drive/MyDrive/clip_mlp_epoch5.pt", map_location=device))
model.eval()  # inference mode: disables the classifier's Dropout

CLIP_MLP(
  (clip): CLIPModel(
    (text_model): CLIPTextTransformer(
      (embeddings): CLIPTextEmbeddings(
        (token_embedding): Embedding(49408, 512)
        (position_embedding): Embedding(77, 512)
      )
      (encoder): CLIPEncoder(
        (layers): ModuleList(
          (0-11): 12 x CLIPEncoderLayer(
            (self_attn): CLIPAttention(
              (k_proj): Linear(in_features=512, out_features=512, bias=True)
              (v_proj): Linear(in_features=512, out_features=512, bias=True)
              (q_proj): Linear(in_features=512, out_features=512, bias=True)
              (out_proj): Linear(in_features=512, out_features=512, bias=True)
            )
            (layer_norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
            (mlp): CLIPMLP(
              (activation_fn): QuickGELUActivation()
              (fc1): Linear(in_features=512, out_features=2048, bias=True)
              (fc2): Linear(in_features=2048, out_features=512, bias=True)
       

In [None]:
import sys
sys.path.append('/content/model')


OCR

In [None]:
!pip install pyspellchecker

Collecting pyspellchecker
  Downloading pyspellchecker-0.8.3-py3-none-any.whl.metadata (9.5 kB)
Downloading pyspellchecker-0.8.3-py3-none-any.whl (7.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.2/7.2 MB[0m [31m52.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyspellchecker
Successfully installed pyspellchecker-0.8.3


Checking through text input

In [None]:
from transformers import CLIPProcessor

# Sanity check: classify one image using a hand-typed caption instead of OCR.
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

text_input = "therapy dogs are now being trained to console rape victims of muslims"
image_path = "/content/drive/MyDrive/data/img/23158.png"

image = Image.open(image_path).convert("RGB")
# Same preprocessing as the dataset: text padded/truncated to 77 tokens.
inputs = processor(
    text=[text_input],
    images=image,
    return_tensors="pt",
    truncation=True,
    padding="max_length",
    max_length=77
)
inputs = {k: v.to(device) for k, v in inputs.items()}

with torch.no_grad():
    output = model(**inputs)
    predicted_class = torch.argmax(output, dim=1).item()

# Class index -> name. This mapping was inverted here ({1: "non-hateful",
# 0: "hateful"}); every other cell in this notebook uses 0 = non-hateful and
# 1 = hateful, so this cell reported the opposite of the model's prediction.
label_map = {0: "non-hateful", 1: "hateful"}
print("Prediction:", label_map[predicted_class])

Checking without text input (text acquired through OCR)

In [None]:
!pip install pyspellchecker

In [None]:
!pip install easyocr

Collecting easyocr
  Downloading easyocr-1.7.2-py3-none-any.whl.metadata (10 kB)
Collecting python-bidi (from easyocr)
  Downloading python_bidi-0.6.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Collecting pyclipper (from easyocr)
  Downloading pyclipper-1.3.0.post6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.0 kB)
Collecting ninja (from easyocr)
  Downloading ninja-1.11.1.4-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl.metadata (5.0 kB)
Downloading easyocr-1.7.2-py3-none-any.whl (2.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.9/2.9 MB[0m [31m28.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ninja-1.11.1.4-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (422 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m422.8/422.8 kB[0m [31m37.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyclipper-1.3.0.post6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (

In [None]:
import cv2
import numpy as np
from spellchecker import SpellChecker
import easyocr
import re

# Preprocessing Functions
def preprocess_image(image_path):
    """Load an image from disk and return it converted to grayscale.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The grayscale image produced by ``cv2.cvtColor``.

    Raises:
        ValueError: If OpenCV cannot read or decode the file.
    """
    img = cv2.imread(image_path)
    # cv2.imread reports failure by returning None instead of raising, which
    # would otherwise surface as an opaque error inside cvtColor.
    if img is None:
        raise ValueError(f"Could not read image: {image_path}")

    return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

# Extract text using EasyOCR
_OCR_READER = None  # shared EasyOCR reader, created on first use


def _get_ocr_reader():
    """Return the shared English EasyOCR reader, building it only once."""
    global _OCR_READER
    if _OCR_READER is None:
        _OCR_READER = easyocr.Reader(['en'])
    return _OCR_READER


def extract_text_with_easyocr(image_path):
    """Run EasyOCR on the grayscale version of an image and return its text.

    The reader is reused across calls instead of being rebuilt for every
    image.

    Args:
        image_path: Path of the image file to read.

    Returns:
        All recognised text fragments joined with single spaces.
    """
    preprocessed_image = preprocess_image(image_path)

    # detail=0 requests plain text results, which are joined below.
    result = _get_ocr_reader().readtext(preprocessed_image, detail=0)
    return " ".join(result)

def correct_spelling(text):
    """Spell-correct each whitespace-separated word of ``text``.

    A word is kept unchanged when the spell checker returns ``None`` for it.

    Returns:
        The corrected words joined with single spaces.
    """
    spell = SpellChecker()
    corrected_words = []
    for word in text.split():
        # Look the word up once; the original queried the checker twice per word.
        suggestion = spell.correction(word)
        corrected_words.append(suggestion if suggestion is not None else word)
    return " ".join(corrected_words)

def remove_special_characters(text):
    """Drop every character that is not an ASCII letter, a digit, or whitespace."""
    disallowed = re.compile(r'[^A-Za-z0-9\s]')
    return disallowed.sub('', text)

In [None]:
# Run the OCR pipeline on one sample image:
# read text -> spell-correct -> strip special characters.
image_path = "/content/drive/MyDrive/data/img/23158.png"
ocr_text = extract_text_with_easyocr(image_path)
corrected_text = correct_spelling(ocr_text)
cleaned_text = remove_special_characters(corrected_text)

print("OCR Text:", cleaned_text)



Progress: |██████████████████████████████████████████████████| 100.0% Complete



Progress: |--------------------------------------------------| 0.0% CompleteProgress: |--------------------------------------------------| 0.1% CompleteProgress: |--------------------------------------------------| 0.1% CompleteProgress: |--------------------------------------------------| 0.2% CompleteProgress: |--------------------------------------------------| 0.2% CompleteProgress: |--------------------------------------------------| 0.3% CompleteProgress: |--------------------------------------------------| 0.4% CompleteProgress: |--------------------------------------------------| 0.4% CompleteProgress: |--------------------------------------------------| 0.5% CompleteProgress: |--------------------------------------------------| 0.5% CompleteProgress: |--------------------------------------------------| 0.6% CompleteProgress: |--------------------------------------------------| 0.6% CompleteProgress: |--------------------------------------------------| 0.7% Complet

In [None]:
# Classify the same image, this time pairing it with the OCR-derived text
# (`cleaned_text` and `image_path` come from the previous cell).
image = Image.open(image_path).convert("RGB")
# Text is padded/truncated to 77 tokens, matching the dataset preprocessing.
inputs = processor(
    text=[cleaned_text],
    images=image,
    return_tensors="pt",
    truncation=True,
    padding="max_length",
    max_length=77
)
# Move every tensor to the device the model lives on.
inputs = {k: v.to(device) for k, v in inputs.items()}

with torch.no_grad():
    output = model(**inputs)
    predicted_class = torch.argmax(output, dim=1).item()

# Class index -> human-readable name.
label_map = {0: "non-hateful", 1: "hateful"}
print("Prediction:", label_map[predicted_class])

Prediction: hateful


IMAGE API

Ocr_Flask

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from flask import Flask, request, jsonify
import os
import cv2
import torch
from flask_cors import CORS
from pyngrok import ngrok
import torch.nn as nn
import re
import easyocr
from spellchecker import SpellChecker
from transformers import CLIPModel, CLIPProcessor
from PIL import Image
from clarifai.client.model import Model
from io import BytesIO
import asyncio

# Install clarifai if it's not available
try:
    from clarifai.client.model import Model
except ImportError:
    !pip install clarifai
    from clarifai.client.model import Model


# ================================
# 1. INITIALIZE FLASK
# ================================
# Create the Flask application and enable cross-origin requests for it.
app = Flask(__name__)
CORS(app)

# Open an ngrok tunnel to local port 5005 and print its public URL.
public_url = ngrok.connect(5005)
print(f"🚀 Ngrok Tunnel URL: {public_url}")

# ================================
# 2. LOAD MODELS
# ================================

# Path 2: Clarifai Setup
# The Personal Access Token is read from the environment so that it is never
# stored in the notebook. Set CLARIFAI_PAT before running this cell.
# SECURITY: the token that used to be hardcoded here has been published with
# the notebook and should be revoked and replaced.
pat = os.environ["CLARIFAI_PAT"]
user_id = "clarifai"
app_id = "main"
model_id = "general-image-recognition"
clarifai_model = Model(user_id=user_id, app_id=app_id, model_id=model_id, pat=pat, is_async=False )

# Path 1: CLIP + MLP Model Setup
class CLIP_MLP(nn.Module):
    """CLIP backbone plus MLP head used by the image API (Path 1).

    The head takes the concatenated text and image embeddings and produces
    two logits per example.
    """

    def __init__(self):
        super().__init__()
        self.clip = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        head = (
            nn.Linear(512 + 512, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 2),
        )
        self.classifier = nn.Sequential(*head)

    def forward(self, input_ids, attention_mask, pixel_values):
        """Return logits for a batch; the CLIP pass runs under ``torch.no_grad``."""
        with torch.no_grad():
            clip_result = self.clip(
                input_ids=input_ids,
                attention_mask=attention_mask,
                pixel_values=pixel_values,
            )
        text_vec = clip_result.text_embeds
        image_vec = clip_result.image_embeds
        return self.classifier(torch.cat((text_vec, image_vec), dim=1))

# Pick the compute device, rebuild the model on it and restore the saved weights.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
clip_model = CLIP_MLP().to(device)
clip_model.load_state_dict(torch.load("/content/drive/MyDrive/clip_mlp_epoch5.pt", map_location=device))
clip_model.eval()  # inference mode: disables the classifier's Dropout

# Processor that tokenises text and prepares images for the same CLIP checkpoint.
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# ================================
# 3. HELPER FUNCTIONS
# ================================

# --- OCR Helpers ---
def preprocess_image(image_path):
    """Read an image file and return it as a grayscale array.

    Raises:
        ValueError: If OpenCV cannot read or decode the file (for example
            when an upload is not an image).
    """
    img = cv2.imread(image_path)
    # cv2.imread returns None on failure rather than raising.
    if img is None:
        raise ValueError(f"Could not read image: {image_path}")
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return gray

_OCR_READER = None  # shared EasyOCR reader, created on first use


def _get_ocr_reader():
    """Return the shared English EasyOCR reader, building it only once."""
    global _OCR_READER
    if _OCR_READER is None:
        _OCR_READER = easyocr.Reader(['en'])
    return _OCR_READER


def extract_text_with_easyocr(image_path):
    """Return the text EasyOCR recognises in the grayscale image, space-joined.

    The reader is cached: this function runs more than once per request, and
    rebuilding the reader each time repeats its model loading.
    """
    preprocessed_image = preprocess_image(image_path)
    result = _get_ocr_reader().readtext(preprocessed_image, detail=0)
    return " ".join(result)

def correct_spelling(text):
    """Spell-correct each whitespace-separated word of ``text``.

    A word is kept unchanged when the spell checker returns nothing usable
    (``None`` or an empty string) for it.

    Returns:
        The corrected words joined with single spaces.
    """
    spell = SpellChecker()
    corrected_words = []
    for word in text.split():
        # Query the checker once per word instead of twice.
        suggestion = spell.correction(word)
        corrected_words.append(suggestion if suggestion else word)
    return " ".join(corrected_words)

def remove_special_characters(text):
    """Keep only ASCII letters, digits and whitespace from ``text``."""
    pattern = r'[^A-Za-z0-9\s]'
    stripped = re.sub(pattern, '', text)
    return stripped

def image_has_text(image_path):
    """Report whether OCR finds a meaningful amount of text in the image.

    Returns:
        ``(has_text, ocr_text)``, where ``has_text`` is True when the OCR
        output, stripped of surrounding whitespace, is longer than 5
        characters.
    """
    ocr_text = extract_text_with_easyocr(image_path)
    stripped_length = len(ocr_text.strip())
    has_text = stripped_length > 5
    return has_text, ocr_text



def classify_with_clarifai(image_path):
    """Label an image from the concept names returned by the Clarifai model.

    Returns:
        ``"hateful"`` if any returned concept name contains one of the
        flagged keywords, ``"non-hateful"`` otherwise, or a
        ``"Clarifai Error: ..."`` string if anything raises.
    """
    try:
        with open(image_path, 'rb') as image_file:
            image_bytes = image_file.read()

        # Install a fresh event loop for the current thread before the call.
        fresh_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(fresh_loop)

        prediction = clarifai_model.predict_by_bytes(image_bytes)
        flagged_terms = ("hate", "violence", "racism", "abuse", "explicit", "discrimination", "offensive")

        concepts = prediction.outputs[0].data.concepts if prediction.outputs else ()
        for concept in concepts:
            concept_name = concept.name.lower()
            # Substring match, so e.g. "hate" also matches longer concept names.
            if any(term in concept_name for term in flagged_terms):
                return "hateful"
        return "non-hateful"
    except Exception as e:
        return f"Clarifai Error: {str(e)}"


# --- CLIP + OCR Classification ---
def classify_with_clip(image_path):
    """Classify an image with the CLIP + MLP model using its OCR text.

    Returns:
        ``"hateful"`` or ``"non-hateful"``.
    """
    # OCR -> spell-correct -> strip special characters.
    cleaned_text = remove_special_characters(
        correct_spelling(extract_text_with_easyocr(image_path))
    )

    rgb_image = Image.open(image_path).convert("RGB")
    encoded = processor(
        text=[cleaned_text],
        images=rgb_image,
        return_tensors="pt",
        truncation=True,
        padding="max_length",
        max_length=77,
    )
    batch = {name: tensor.to(device) for name, tensor in encoded.items()}

    with torch.no_grad():
        logits = clip_model(**batch)
        predicted_class = torch.argmax(logits, dim=1).item()

    class_names = {0: "non-hateful", 1: "hateful"}
    return class_names[predicted_class]

# ================================
# 4. FLASK ROUTE
# ================================
@app.route('/predict_image', methods=['POST'])
def predict_image():
    """Classify an uploaded image as hateful or non-hateful.

    Expects a multipart POST with an ``image`` file. Images in which OCR
    finds text go through CLIP + OCR (Path 1); the rest go to Clarifai
    (Path 2).

    Returns:
        JSON ``{"method": ..., "prediction": ...}``, or a 400 JSON error
        when no image was sent.
    """
    import tempfile  # local import: only this handler needs it

    if 'image' not in request.files:
        return jsonify({"error": "No image provided"}), 400

    image = request.files['image']

    # The client-supplied filename is untrusted: using it as a path allows
    # directory traversal ("../../x") and collisions between concurrent
    # uploads. Only a plain alphanumeric extension is carried over onto a
    # uniquely named temp file.
    extension = os.path.splitext(image.filename or "")[1]
    if not re.fullmatch(r'\.[A-Za-z0-9]{1,10}', extension):
        extension = ""
    fd, save_path = tempfile.mkstemp(prefix="temp_", suffix=extension)
    os.close(fd)

    try:
        image.save(save_path)

        # Check if image has text
        has_text, _ = image_has_text(save_path)
        if has_text:
            prediction = classify_with_clip(save_path)
            method = "Path 1 (CLIP + OCR)"
        else:
            prediction = classify_with_clarifai(save_path)
            method = "Path 2 (Clarifai)"

        return jsonify({"method": method, "prediction": prediction})
    finally:
        # Clean up the temp file even when classification raises.
        if os.path.exists(save_path):
            os.remove(save_path)

# ================================
# 5. RUN APP
# ================================
# Start Flask's built-in server on port 5005, the port the ngrok tunnel targets.
if __name__ == '__main__':
    app.run(port=5005)

Working on image-api flask

In [None]:
!pip install pyspellchecker

Collecting pyspellchecker
  Downloading pyspellchecker-0.8.3-py3-none-any.whl.metadata (9.5 kB)
Downloading pyspellchecker-0.8.3-py3-none-any.whl (7.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.2/7.2 MB[0m [31m36.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyspellchecker
Successfully installed pyspellchecker-0.8.3


In [None]:
!pip install flask_cors

Collecting flask_cors
  Downloading flask_cors-6.0.1-py3-none-any.whl.metadata (5.3 kB)
Downloading flask_cors-6.0.1-py3-none-any.whl (13 kB)
Installing collected packages: flask_cors
Successfully installed flask_cors-6.0.1


In [None]:
!pip install pyngrok


Collecting pyngrok
  Downloading pyngrok-7.2.12-py3-none-any.whl.metadata (9.4 kB)
Downloading pyngrok-7.2.12-py3-none-any.whl (26 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.2.12


In [None]:
# Register the ngrok auth token without embedding it in the notebook.
# Set NGROK_AUTHTOKEN in the environment before running this cell.
# SECURITY: the token that used to be pasted on this line has been published
# with the notebook and should be revoked and replaced.
import os
from pyngrok import ngrok

ngrok.set_auth_token(os.environ["NGROK_AUTHTOKEN"])

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [None]:
!pip install easyocr

Collecting easyocr
  Downloading easyocr-1.7.2-py3-none-any.whl.metadata (10 kB)
Collecting python-bidi (from easyocr)
  Downloading python_bidi-0.6.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Collecting pyclipper (from easyocr)
  Downloading pyclipper-1.3.0.post6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.0 kB)
Collecting ninja (from easyocr)
  Downloading ninja-1.11.1.4-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl.metadata (5.0 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch->easyocr)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch->easyocr)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch->easyocr)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (

In [None]:
!pip install clarifai

Collecting clarifai
  Downloading clarifai-11.6.5-py3-none-any.whl.metadata (22 kB)
Collecting clarifai-grpc>=11.6.4 (from clarifai)
  Downloading clarifai_grpc-11.6.5-py3-none-any.whl.metadata (4.4 kB)
Collecting clarifai-protocol>=0.0.25 (from clarifai)
  Downloading clarifai_protocol-0.0.25-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (14 kB)
Collecting schema==0.7.5 (from clarifai)
  Downloading schema-0.7.5-py2.py3-none-any.whl.metadata (34 kB)
Collecting uv==0.7.12 (from clarifai)
  Downloading uv-0.7.12-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Collecting psutil==7.0.0 (from clarifai)
  Downloading psutil-7.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting contextlib2>=0.5.5 (from schema==0.7.5->clarifai)
  Downloading contextlib2-21.6.0-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading clarifai-11.6.5-py3-none-any.whl (27

In [None]:
import requests
import base64

import os

# The Google Cloud API key is read from the environment so that it is never
# stored in the notebook. Set GOOGLE_API_KEY before running this cell.
# SECURITY: the key that used to be hardcoded here has been published with
# the notebook and should be revoked and replaced.
API_KEY = os.environ["GOOGLE_API_KEY"]

def detect_meme_text(image_path):
    """Extract the text of an image with Google Vision ``TEXT_DETECTION``.

    The detected text (or a diagnostic message) is printed, and the text is
    also returned so later cells can use it.

    Args:
        image_path: Path of the image file to send.

    Returns:
        The detected text, or ``""`` when the request fails or the response
        contains no text annotation.
    """
    # Read the image file as binary and base64-encode it for the JSON body
    with open(image_path, "rb") as image_file:
        image_content = base64.b64encode(image_file.read()).decode('UTF-8')

    # Prepare the API request
    request_body = {
        "requests": [{
            "image": {
                "content": image_content
            },
            "features": [{
                "type": "TEXT_DETECTION"
            }]
        }]
    }

    # Send request to Google Vision API
    response = requests.post(
        f"https://vision.googleapis.com/v1/images:annotate?key={API_KEY}",
        json=request_body
    )

    # Process the response
    if response.status_code != 200:
        print(f"Error: {response.status_code}, {response.text}")
        return ""

    result = response.json()
    try:
        detected_text = result['responses'][0]['fullTextAnnotation']['text']
    except (KeyError, IndexError):
        # No annotation (KeyError) or an empty `responses` list (IndexError).
        print("No text found in the image.")
        return ""

    print("Extracted Text:")
    print(detected_text)
    return detected_text

# Usage - replace 'meme.jpg' with your image path.
# The result is kept in `detected_text` because a later cell reads that name;
# previously it existed only inside the function, so that cell could not run.
detected_text = detect_meme_text("/content/drive/MyDrive/data/img/74132.png")


Extracted Text:
fuck allah, fuck
muslims and fuck
islam, and if you don't
like what i say then fuck


In [None]:
# Image classified by the next cell.
# NOTE(review): the Vision OCR cell above read 74132.png, not this file —
# confirm that pairing that text with this image is intended.
image_path = "/content/drive/MyDrive/data/img/53976.png"

In [None]:
from PIL import Image
from transformers import CLIPProcessor

# Classify `image_path` together with OCR text from Google Vision.
# This cell reads a module-level `detected_text`, plus `model` and `device`
# from earlier cells, so those must exist in the kernel before it runs.
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

image = Image.open(image_path).convert("RGB")
# Text is padded/truncated to 77 tokens, matching the dataset preprocessing.
inputs = processor(
    text=[detected_text],
    images=image,
    return_tensors="pt",
    truncation=True,
    padding="max_length",
    max_length=77
)
# Move every tensor to the device the model lives on.
inputs = {k: v.to(device) for k, v in inputs.items()}

with torch.no_grad():
    output = model(**inputs)
    predicted_class = torch.argmax(output, dim=1).item()

# Class index -> human-readable name.
label_map = {0: "non-hateful", 1: "hateful"}
print("Prediction:", label_map[predicted_class])

FINAL FLASK PATH

In [None]:
from flask import Flask, request, jsonify
import os
import cv2
import torch
from flask_cors import CORS
from pyngrok import ngrok
import torch.nn as nn
import re
import requests
import base64
from transformers import CLIPModel, CLIPProcessor
from PIL import Image
from clarifai.client.model import Model
import asyncio
import threading

# ================================
# 1. INITIALIZE FLASK
# ================================
# Create the Flask application and enable cross-origin requests for it.
app = Flask(__name__)
CORS(app)

# Open an ngrok tunnel to local port 5005 and print its public URL.
public_url = ngrok.connect(5005)
print(f"🚀 Ngrok Tunnel URL: {public_url}")

# ================================
# 2. API KEYS & GLOBAL VARIABLES
# ================================
GOOGLE_API_KEY = "AIzaSyAThUa2Mi97AKr3ASw9OBWXHDRsrUBiP18"
CLARIFAI_PAT = "2ea8be29d848446b996b5ebd476bd4e6"

# ================================
# 3. CLIP + MLP MODEL SETUP (Path 1)
# ================================
class CLIP_MLP(nn.Module):
    """CLIP backbone with a two-logit MLP head (Path 1 classifier).

    Text and image embeddings are concatenated along the feature axis and
    passed through Linear -> ReLU -> Dropout -> Linear.
    """

    def __init__(self):
        super().__init__()
        self.clip = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        stages = [
            nn.Linear(512 + 512, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 2),
        ]
        self.classifier = nn.Sequential(*stages)

    def forward(self, input_ids, attention_mask, pixel_values):
        """Return logits for the batch; CLIP is evaluated without gradients."""
        with torch.no_grad():
            embeddings = self.clip(
                input_ids=input_ids,
                attention_mask=attention_mask,
                pixel_values=pixel_values,
            )
        features = torch.cat((embeddings.text_embeds, embeddings.image_embeds), dim=1)
        logits = self.classifier(features)
        return logits

# Pick the compute device, rebuild the model on it and restore the saved weights.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
clip_model = CLIP_MLP().to(device)
clip_model.load_state_dict(torch.load("/content/drive/MyDrive/clip_mlp_epoch5.pt", map_location=device))
clip_model.eval()  # inference mode: disables the classifier's Dropout

# Processor that tokenises text and prepares images for the same CLIP checkpoint.
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# ================================
# 4. GOOGLE VISION OCR (for Path 1)
# ================================
def detect_meme_text(image_path):
    """Extract text using Google Vision OCR.

    Args:
        image_path: Path of the image file to send.

    Returns:
        The detected text with surrounding whitespace removed, or ``""``
        when the request fails or the response carries no text annotation.
    """
    with open(image_path, "rb") as image_file:
        image_content = base64.b64encode(image_file.read()).decode('UTF-8')

    request_body = {
        "requests": [{
            "image": {"content": image_content},
            "features": [{"type": "TEXT_DETECTION"}]
        }]
    }

    response = requests.post(
        f"https://vision.googleapis.com/v1/images:annotate?key={GOOGLE_API_KEY}",
        json=request_body
    )

    if response.status_code != 200:
        # Still best effort, but no longer silent: an API failure makes the
        # caller treat the image as text-free, so leave a trace.
        print(f"Google Vision OCR error: {response.status_code} - {response.text}")
        return ""

    result = response.json()
    try:
        detected_text = result['responses'][0]['fullTextAnnotation']['text']
    except (KeyError, IndexError):
        # No annotation (KeyError) or an empty `responses` list (IndexError).
        return ""
    return detected_text.strip()

# Clean text for CLIP
def remove_special_characters(text):
    """Return ``text`` with everything except ASCII letters, digits and whitespace removed."""
    cleaned = re.sub(r'[^A-Za-z0-9\s]', '', text)
    return cleaned

# ================================
# 5. GOOGLE SAFE SEARCH + CLARIFAI (Path 2)
# ================================
def google_safe_search(image_path):
    """Check an image with Google Vision Safe Search.

    Only the ``violence`` and ``racy`` ratings of the annotation are read.

    Args:
        image_path: Path of the image file to send.

    Returns:
        True when either rating is ``LIKELY`` or ``VERY_LIKELY``. False
        otherwise, including when the request fails or the response has no
        ``safeSearchAnnotation``.
    """
    with open(image_path, "rb") as image_file:
        image_content = base64.b64encode(image_file.read()).decode('UTF-8')

    request_body = {
        "requests": [{
            "image": {"content": image_content},
            "features": [{"type": "SAFE_SEARCH_DETECTION"}]
        }]
    }

    response = requests.post(
        f"https://vision.googleapis.com/v1/images:annotate?key={GOOGLE_API_KEY}",
        json=request_body
    )

    if response.status_code != 200:
        print(f"Google Safe Search error: {response.status_code} - {response.text}")
        return False

    responses = response.json().get('responses') or [{}]
    safe_search = responses[0].get('safeSearchAnnotation')
    if safe_search is None:
        # A 200 reply can still lack the annotation; indexing it directly
        # raised KeyError. Log and report "not flagged", as for HTTP errors.
        print(f"Google Safe Search returned no annotation: {responses[0]}")
        return False

    flagged_levels = ("LIKELY", "VERY_LIKELY")
    violence_likelihood = safe_search.get("violence", "VERY_UNLIKELY")
    racy_likelihood = safe_search.get("racy", "VERY_UNLIKELY")
    return violence_likelihood in flagged_levels or racy_likelihood in flagged_levels

def run_in_thread_with_loop(func, *args):
    """Run the coroutine function ``func(*args)`` on a worker thread with its own event loop.

    The calling thread blocks until the worker finishes.

    Args:
        func: Coroutine function to call.
        *args: Positional arguments forwarded to ``func``.

    Returns:
        Whatever the coroutine returns.

    Raises:
        Exception: Any exception raised while creating the loop or running
            the coroutine is re-raised in the calling thread.
    """
    outcome = {}

    def target():
        # Create the loop outside the try/finally that closes it; otherwise a
        # failure here would leave `loop` unbound when `finally` runs.
        try:
            loop = asyncio.new_event_loop()
        except Exception as e:
            outcome['error'] = e
            return
        try:
            asyncio.set_event_loop(loop)
            outcome['value'] = loop.run_until_complete(func(*args))
        except Exception as e:
            outcome['error'] = e
        finally:
            loop.close()

    thread = threading.Thread(target=target)
    thread.start()
    thread.join()

    if 'error' in outcome:
        raise outcome['error']
    return outcome.get('value')

async def async_clarifai_hate_symbol(image_path):
    """Coroutine wrapper around Clarifai's hate-symbol-detection model.

    Returns True when the first output of the prediction reports at least
    one region. Nothing is awaited in this body, so the Clarifai call runs
    to completion inside the coroutine.
    """
    with open(image_path, 'rb') as image_file:
        payload = image_file.read()

    detector = Model(
        url="https://clarifai.com/clarifai/main/models/hate-symbol-detection",
        pat=CLARIFAI_PAT,
    )
    prediction = detector.predict_by_bytes(payload)
    return bool(prediction.outputs[0].data.regions)

def clarifai_hate_symbol(image_path):
    """Return True if Clarifai's hate-symbol model finds a region in the image.

    The async helper is run on a separate thread with its own event loop.
    Any failure is printed and reported as False.
    """
    try:
        detected = run_in_thread_with_loop(async_clarifai_hate_symbol, image_path)
    except Exception as e:
        print(f"Clarifai error: {e}")
        # A failed Clarifai call is treated as "no hate symbol found".
        detected = False
    return detected

# Alternative approach: Use synchronous Clarifai client
def clarifai_hate_symbol_sync(image_path):
    """Query Clarifai's hate-symbol model over plain HTTP.

    Returns:
        True when the first output lists at least one region. False when it
        lists none, when the API answers with a non-200 status, or when
        anything raises (the problem is printed in the last two cases).
    """
    try:
        import requests

        with open(image_path, 'rb') as image_file:
            encoded_image = base64.b64encode(image_file.read()).decode('utf-8')

        # Talk to the REST endpoint directly instead of going through the client library.
        response = requests.post(
            "https://api.clarifai.com/v2/models/hate-symbol-detection/outputs",
            headers={
                "Authorization": f"Key {CLARIFAI_PAT}",
                "Content-Type": "application/json",
            },
            json={"inputs": [{"data": {"image": {"base64": encoded_image}}}]},
        )

        if response.status_code != 200:
            print(f"Clarifai API error: {response.status_code} - {response.text}")
            return False

        first_output = response.json().get('outputs', [{}])[0]
        regions = first_output.get('data', {}).get('regions', [])
        return bool(regions)

    except Exception as e:
        print(f"Clarifai sync error: {e}")
        return False

# ================================
# 6. CLASSIFICATION FUNCTIONS
# ================================
def classify_with_clip(image_path):
    """Path 1: classify with CLIP + MLP using text from Google Vision OCR.

    When the cleaned OCR text is empty, the placeholder ``"neutral"`` is
    used as the text input.

    Returns:
        ``"hateful"`` or ``"non-hateful"``.
    """
    cleaned_text = remove_special_characters(detect_meme_text(image_path))
    prompt = cleaned_text or "neutral"

    rgb_image = Image.open(image_path).convert("RGB")
    encoded = processor(
        text=[prompt],
        images=rgb_image,
        return_tensors="pt",
        truncation=True,
        padding="max_length",
        max_length=77,
    )
    batch = {name: tensor.to(device) for name, tensor in encoded.items()}

    with torch.no_grad():
        logits = clip_model(**batch)
        predicted_class = torch.argmax(logits, dim=1).item()

    class_names = {0: "non-hateful", 1: "hateful"}
    return class_names[predicted_class]

def classify_with_google_clarifai(image_path):
    """Path 2: combine Google Safe Search with Clarifai hate-symbol detection.

    Returns ``"hateful"`` when either check flags the image, otherwise
    ``"non-hateful"``.
    """
    # Google is consulted first; the synchronous Clarifai check (used to avoid
    # event loop issues) only runs when Google does not flag the image.
    flagged = google_safe_search(image_path) or clarifai_hate_symbol_sync(image_path)
    return "hateful" if flagged else "non-hateful"

# ================================
# 7. FLASK ROUTE
# ================================
@app.route('/predict_image', methods=['POST'])
def predict_image():
    """Classify an uploaded image as hateful or non-hateful.

    Expects a multipart POST with an ``image`` file. When Google Vision OCR
    finds more than 5 characters of text the image goes through CLIP
    (Path 1); otherwise Google Safe Search + Clarifai are used (Path 2).

    Returns:
        JSON ``{"method": ..., "prediction": ...}`` on success, a 400 JSON
        error when no image was sent, or a 500 JSON error when prediction
        raises.
    """
    import tempfile  # local import: only this handler needs it

    if 'image' not in request.files:
        return jsonify({"error": "No image provided"}), 400

    image = request.files['image']

    # The client-supplied filename is untrusted: using it as a path allows
    # directory traversal ("../../x") and collisions between concurrent
    # uploads. Only a plain alphanumeric extension is carried over onto a
    # uniquely named temp file.
    extension = os.path.splitext(image.filename or "")[1]
    if not re.fullmatch(r'\.[A-Za-z0-9]{1,10}', extension):
        extension = ""
    fd, save_path = tempfile.mkstemp(prefix="temp_", suffix=extension)
    os.close(fd)

    try:
        image.save(save_path)

        # Path 1 or Path 2 Decision
        ocr_text = detect_meme_text(save_path)
        if len(ocr_text.strip()) > 5:
            prediction = classify_with_clip(save_path)
            method = "Path 1 (CLIP + Google Vision OCR)"
        else:
            prediction = classify_with_google_clarifai(save_path)
            method = "Path 2 (Google Safe Search + Clarifai)"

        return jsonify({"method": method, "prediction": prediction})

    except Exception as e:
        print(f"Error in prediction: {e}")
        return jsonify({"error": str(e)}), 500

    finally:
        # Clean up the temporary file
        if os.path.exists(save_path):
            os.remove(save_path)

# ================================
# 8. RUN APP
# ================================
# Start Flask's built-in server on port 5005, the port the ngrok tunnel targets.
if __name__ == '__main__':
    app.run(port=5005)