In [14]:
import torch
import clip
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize
from PIL import Image
import os
import pandas as pd
import json

In [15]:
import cv2
import torch
import clip
from PIL import Image

# Select the compute device: the GPU when CUDA is available, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Load the pretrained CLIP ViT-B/32 model together with its matching image
# preprocessing transform, placing the model on the selected device.
model, preprocess = clip.load("ViT-B/32", device=device)


In [16]:
import os

# Show the directory the kernel is currently running in.
cwd = os.getcwd()
print("Current Working Directory:", cwd)


Current Working Directory: d:\Projects\Patient_montioring_system


In [48]:
class CustomDataset(Dataset):
    """Image/caption dataset backed by a CSV annotation file.

    Each CSV row must provide a ``filename`` column (image file name, joined
    onto ``image_dir``) and a ``label`` column (text used as the caption).
    """

    def __init__(self, image_dir, annotation_file, preprocess, limit=70):
        """
        Args:
            image_dir (str): Path to the directory containing images.
            annotation_file (str): Path to the CSV file with image-caption mappings.
            preprocess (callable): Preprocessing function applied to each RGB PIL image.
            limit (int | None): Maximum number of leading CSV rows to keep.
                Pass ``None`` to keep every row.
        """
        self.image_dir = image_dir
        annotations = pd.read_csv(annotation_file)
        if limit is not None:
            # Keep only the first `limit` rows of the annotation file.
            annotations = annotations.head(limit)
        self.annotations = annotations
        self.preprocess = preprocess

    def __len__(self):
        """Return the number of annotation rows kept after applying ``limit``."""
        return len(self.annotations)

    def __getitem__(self, idx):
        """Return the ``(image, text)`` pair for the row at position ``idx``.

        Returns:
            tuple: ``preprocess`` applied to the RGB image, and the first row
            of ``clip.tokenize`` called on the row's caption.

        Raises:
            FileNotFoundError: If the image file referenced by the row does not exist.
        """
        # Get the filename and label from the CSV file
        img_info = self.annotations.iloc[idx]
        img_filename = img_info['filename']
        caption = img_info['label']

        # Construct the full path to the image file
        img_path = os.path.join(self.image_dir, img_filename)

        # Fail early with an explicit message when the image is missing
        if not os.path.exists(img_path):
            raise FileNotFoundError(f"Image file not found: {img_path}")

        # Load and preprocess the image. The context manager closes the
        # underlying file handle right away instead of leaving it open until
        # garbage collection; convert() returns an independent in-memory copy.
        with Image.open(img_path) as img:
            image = self.preprocess(img.convert("RGB"))

        # Tokenize the caption using CLIP's tokenizer (single caption -> take row 0)
        text = clip.tokenize([caption])[0]

        return image, text

In [49]:
# Absolute locations of the CSV annotations and the training image directory.
annotation_file = "d:/Projects/Patient_montioring_system/Train/Training_set.csv"
image_dir = "d:/Projects/Patient_montioring_system/Train/train"

# Only report a missing image directory; execution continues either way.
if not os.path.exists(image_dir):
    print(f"File not found: {image_dir}")

In [50]:
# Build the dataset from the annotated images, using the class's default row limit.
dataset = CustomDataset(
    image_dir=image_dir,
    annotation_file=annotation_file,
    preprocess=preprocess,
)

# Serve shuffled mini-batches of two (image, text) pairs.
dataloader = DataLoader(dataset, shuffle=True, batch_size=2)

In [51]:
# Pull a single batch from the loader to sanity-check its contents and shapes.
first_batch = next(iter(dataloader), None)
if first_batch is not None:  # an empty loader prints nothing
    images, texts = first_batch
    print(images)
    print(texts)
    print("Image batch shape:", images.shape)
    print("Text batch shape:", texts.shape)

tensor([[[[ 1.6822,  1.6676,  1.6676,  ...,  1.7406,  1.7406,  1.7552],
          [ 1.6822,  1.6676,  1.6676,  ...,  1.7406,  1.7406,  1.7552],
          [ 1.6822,  1.6676,  1.6676,  ...,  1.7406,  1.7406,  1.7552],
          ...,
          [ 1.9303,  1.9303,  1.9303,  ...,  1.9303,  1.9303,  1.9303],
          [ 1.9303,  1.9303,  1.9303,  ...,  1.9303,  1.9303,  1.9303],
          [ 1.9303,  1.9303,  1.9303,  ...,  1.9303,  1.9303,  1.9303]],

         [[ 1.8198,  1.8047,  1.8047,  ...,  1.8798,  1.8798,  1.8948],
          [ 1.8198,  1.8047,  1.8047,  ...,  1.8798,  1.8798,  1.8948],
          [ 1.8198,  1.8047,  1.8047,  ...,  1.8798,  1.8798,  1.8948],
          ...,
          [ 2.0749,  2.0749,  2.0749,  ...,  2.0749,  2.0749,  2.0749],
          [ 2.0749,  2.0749,  2.0749,  ...,  2.0749,  2.0749,  2.0749],
          [ 2.0749,  2.0749,  2.0749,  ...,  2.0749,  2.0749,  2.0749]],

         [[ 1.9042,  1.8899,  1.8899,  ...,  1.9610,  1.9610,  1.9753],
          [ 1.9042,  1.8899,  

In [52]:
# clip.load() keeps the weights in half precision when the model is placed on
# CUDA. Updating fp16 parameters directly with Adam is numerically unstable
# (the loss tends to turn into NaN), so cast the model to float32 before the
# optimizer is built. On CPU the weights are already float32 and this is a no-op.
model.float()

# Adam over every CLIP parameter with a small fine-tuning learning rate.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

# Cross-entropy criterion applied to the image/text similarity logits.
loss_fn = torch.nn.CrossEntropyLoss()

In [53]:
num_epochs = 10

for epoch in range(num_epochs):
    model.train()  # training mode is (re)enabled at the start of every epoch
    total_loss = 0

    for images, texts in dataloader:
        images = images.to(device)
        texts = texts.to(device)

        # Drop gradients left over from the previous optimisation step.
        optimizer.zero_grad()

        # The model returns image->text and text->image similarity logits.
        logits_per_image, logits_per_text = model(images, texts)

        # Sample i of the batch is treated as the only positive for row i,
        # so the targets are simply 0..batch_size-1.
        # NOTE(review): if two samples in a batch carry the same caption they
        # are still treated as negatives of each other — confirm this is acceptable.
        ground_truth = torch.arange(images.size(0), dtype=torch.long, device=device)

        # Symmetric loss: average of the image-side and text-side cross-entropy.
        image_loss = loss_fn(logits_per_image, ground_truth)
        text_loss = loss_fn(logits_per_text, ground_truth)
        loss = (image_loss + text_loss) / 2

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Report the mean batch loss for this epoch.
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(dataloader):.4f}")

# Write the fine-tuned weights to disk.
torch.save(model.state_dict(), "fine_tuned_clip_vit_b32.pth")

Epoch [1/10], Loss: 0.6164
Epoch [2/10], Loss: 0.2355
Epoch [3/10], Loss: 0.0786
Epoch [4/10], Loss: 0.0778
Epoch [5/10], Loss: 0.0496
Epoch [6/10], Loss: 0.0712
Epoch [7/10], Loss: 0.1858
Epoch [8/10], Loss: 0.2821
Epoch [9/10], Loss: 0.1037
Epoch [10/10], Loss: 0.2618


In [56]:
# Restore the fine-tuned weights. map_location keeps the load working when the
# checkpoint was written on a different device than the one in use now.
model.load_state_dict(torch.load("fine_tuned_clip_vit_b32.pth", map_location=device))
model.eval()

text_prompts = ["using_laptop", "hugging","drinking","texting"]
text_tokens = clip.tokenize(text_prompts).to(device)

# The prompts never change, so encode them once instead of once per frame.
# Features are L2-normalised so the dot product below is a cosine similarity.
with torch.no_grad():
    text_features = model.encode_text(text_tokens)
    text_features = text_features / text_features.norm(dim=-1, keepdim=True)
    # Learned temperature that CLIP applies to similarities before the softmax.
    logit_scale = model.logit_scale.exp()

# Start webcam capture
cap = cv2.VideoCapture(0)

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            # No frame available (camera missing or stream ended): stop.
            break

        # Preprocess the frame for CLIP model input (OpenCV delivers BGR, PIL expects RGB)
        pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        image_input = preprocess(pil_image).unsqueeze(0).to(device)

        with torch.no_grad():
            # Encode and normalise the image features
            image_features = model.encode_image(image_input)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)

            # Scaled cosine similarities -> probabilities over the prompts.
            # Softmax over raw, unnormalised dot products would not give
            # meaningful probabilities. The softmax runs in float32 so it
            # stays stable if the features are half precision.
            logits_per_image = logit_scale * image_features @ text_features.T
            probs = logits_per_image.float().softmax(dim=-1).cpu().numpy()[0]

        # Display predictions on the video feed, one line per prompt
        for i, prompt in enumerate(text_prompts):
            cv2.putText(frame, f"{prompt}: {probs[i]:.2f}", (10, 30 + i * 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

        cv2.imshow("CLIP Webcam", frame)

        # Press 'q' to quit
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the window, even if a frame fails
    # mid-loop or the cell is interrupted.
    cap.release()
    cv2.destroyAllWindows()
