<a href="https://colab.research.google.com/github/arshiii08/AI-Surveillance-using-DeepPose-estimation/blob/main/Minor_Project_Final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import json
import torch
from PIL import Image
import clip  # OpenAI CLIP package
from tqdm import tqdm  # For progress bar with ETA
from collections import Counter

# -------------------------
# Setup: Paths and Device
# -------------------------
# Folder containing the images to label (update as needed).
image_dir = 'C:/Users/devel/Desktop/DensePose/train2014/train2014'
# JSON file that will receive the pseudo-labels.
output_file = 'train2014_activity_annotated.json'

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

# -------------------------
# Load the CLIP Model
# -------------------------
# "ViT-B/32" is loaded together with the preprocessing callable that
# clip.load returns alongside it.
model, preprocess = clip.load("ViT-B/32", device=device)
model.eval()  # inference only

# -------------------------
# Define Candidate Activity Labels
# -------------------------
# Edit this list to change the activity categories that can be predicted.
activity_labels = [
    "standing",
    "sitting",
    "walking",
    "running",
    "jumping",
    "lying down",
    "bending",
]

# One zero-shot classification prompt per activity label.
text_prompts = []
for label in activity_labels:
    text_prompts.append(f"a photo of a person {label}")
text_tokens = clip.tokenize(text_prompts).to(device)

# Prompts used to decide how many people are in the picture.
# Index 0 ("single person") is the only outcome the labelling loop accepts.
person_count_prompts = [
    "a photo of a single person",
    "a photo of multiple people",
    "a photo with no people",
]
person_count_tokens = clip.tokenize(person_count_prompts).to(device)

# Prompts used to decide whether the activity is recognisable at all.
# Index 0 ("clearly identifiable") is the only outcome the labelling loop accepts.
activity_clarity_prompts = [
    "a photo with a clearly identifiable human activity",
    "a photo with an ambiguous or unclear human activity",
]
activity_clarity_tokens = clip.tokenize(activity_clarity_prompts).to(device)

# -------------------------
# Initialize Annotations Dictionary and Counters
# -------------------------
annotations = {}                   # image filename -> {'pseudo_label': ...}
activity_distribution = Counter()  # accepted images per predicted activity
total_images = 0                   # images that could be opened
valid_images = 0                   # images that passed every filter

# -------------------------
# Get List of Image Files
# -------------------------
# Keep only directory entries whose (case-insensitive) extension is an image type.
_image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp')
image_files = [
    entry for entry in os.listdir(image_dir)
    if entry.lower().endswith(_image_suffixes)
]

# -------------------------
# Loop Over Images to Generate Pseudo-Labels with Progress Bar
# -------------------------


def _normalized_text_features(tokens):
    """Encode tokenized prompts with CLIP and L2-normalise every embedding."""
    with torch.no_grad():
        features = model.encode_text(tokens)
        return features / features.norm(dim=-1, keepdim=True)


# The prompt embeddings do not depend on the image, so they are encoded once
# here instead of re-running the text encoder up to three times per image.
person_count_features = _normalized_text_features(person_count_tokens)
clarity_features = _normalized_text_features(activity_clarity_tokens)
text_features = _normalized_text_features(text_tokens)

for img_name in tqdm(image_files, desc="Processing images", unit="image"):
    img_path = os.path.join(image_dir, img_name)
    try:
        # The context manager releases the file handle as soon as the pixels
        # have been decoded into the converted copy.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")
        total_images += 1
    except Exception as e:
        # Best effort: an unreadable file is reported and skipped, not fatal.
        print(f"\nError opening {img_path}: {e}")
        continue

    # Preprocess the image for CLIP
    image_input = preprocess(image).unsqueeze(0).to(device)

    with torch.no_grad():
        # Compute image features and normalize them
        image_features = model.encode_image(image_input)
        image_features /= image_features.norm(dim=-1, keepdim=True)

        # 1. Check if the image contains a single person
        person_count_similarity = (100.0 * image_features @ person_count_features.T).softmax(dim=-1)
        person_count_idx = person_count_similarity.argmax().item()

        # Skip if not a single person
        if person_count_idx != 0:  # 0 corresponds to "a photo of a single person"
            continue

        # 2. Check if the activity is clearly identifiable
        clarity_similarity = (100.0 * image_features @ clarity_features.T).softmax(dim=-1)
        clarity_idx = clarity_similarity.argmax().item()

        # Skip if activity is ambiguous
        if clarity_idx != 0:  # 0 corresponds to "a photo with a clearly identifiable human activity"
            continue

        # 3. Classify the activity for valid images
        activity_similarity = (100.0 * image_features @ text_features.T).softmax(dim=-1)
        best_idx = activity_similarity.argmax().item()
        predicted_activity = activity_labels[best_idx]
        confidence = float(activity_similarity[0, best_idx].item())

        # Only consider as valid if confidence is above threshold
        if confidence < 0.5:  # You can adjust this threshold
            continue

        # This is a valid image with a clearly identifiable activity
        valid_images += 1
        activity_distribution[predicted_activity] += 1

    # Save the pseudo-label for valid images in the annotations dictionary
    annotations[img_name] = {
        'pseudo_label': predicted_activity,
    }

# -------------------------
# Save the Pseudo-Labels to a JSON File
# -------------------------
with open(output_file, 'w') as f:
    json.dump(annotations, f, indent=4)

# -------------------------
# Display Summary Statistics
# -------------------------
summary_lines = (
    f"\nAnnotation complete. Results saved to {output_file}",
    f"Total images processed: {total_images}",
    f"Valid images (single person with clear activity): {valid_images}",
    "\nActivity Distribution:",
    "-" * 40,
)
for summary_line in summary_lines:
    print(summary_line)

# Activities are listed from most to least frequent.
for activity, count in activity_distribution.most_common():
    if valid_images > 0:
        percentage = (count / valid_images) * 100
    else:
        percentage = 0
    print(f"{activity}: {count} images ({percentage:.1f}%)")

In [None]:
import json
import re

# Load the DensePose file
densepose_file = 'C:/Users/devel/Desktop/DensePose/train2014_densepose_annotations.json'
with open(densepose_file, 'r') as f:
    densepose_data = json.load(f)

# Load the activity-annotated file
activity_file = 'C:/Users/devel/Desktop/DensePose/train2014_activity_annotated.json'
with open(activity_file, 'r') as f:
    activity_data = json.load(f)

# Extract numeric image_ids from activity_data keys
# (e.g., 'COCO_train2014_000000401169.jpg' -> 401169).
# The dot is escaped and the pattern is anchored to the end of the name, so
# only a real ".jpg" extension directly after the 12 digits is accepted.
image_id_pattern = re.compile(r'_(\d{12})\.jpg$')

activity_image_ids = set()
unmatched_keys = 0
for key in activity_data:
    match = image_id_pattern.search(key)
    if match:
        activity_image_ids.add(int(match.group(1)))  # int() drops the zero padding
    else:
        unmatched_keys += 1

# Report skipped keys instead of dropping them silently.
if unmatched_keys:
    print(f"Warning: {unmatched_keys} annotation keys did not contain a 12-digit .jpg image id and were skipped.")

# Debugging: Print the extracted image_ids
print(f"First few numeric image_ids: {list(activity_image_ids)[:5]}")

# Extract UV maps from the DensePose annotations for images in the
# activity-annotated file. Annotations lacking any of the three DensePose
# fields are ignored; that check runs first, as before.
required_fields = ('dp_U', 'dp_V', 'dp_I')
uv_maps = [
    {
        'image_id': annotation['image_id'],
        'dp_U': annotation['dp_U'],
        'dp_V': annotation['dp_V'],
        'dp_I': annotation['dp_I'],
    }
    for annotation in densepose_data['annotations']
    if all(field in annotation for field in required_fields)
    and annotation['image_id'] in activity_image_ids
]

# Save the extracted UV maps to a new file
uv_maps_file = 'C:/Users/devel/Desktop/DensePose/train2014_extracted.json'
with open(uv_maps_file, 'w') as f:
    json.dump(uv_maps, f)

print(f"Filtered {len(uv_maps)} UV maps and saved them to {uv_maps_file}.")

In [None]:
import json
import numpy as np
from tqdm import tqdm

def augment_uv_map(uv_map, max_angle=30.0, pivot=(0.5, 0.5), rng=None):
    """Return a copy of ``uv_map`` whose (dp_U, dp_V) points are randomly rotated.

    A single angle is drawn uniformly from ``[-max_angle, max_angle]`` degrees
    and applied to every point. The rotation pivots around ``pivot`` (by default
    the centre of the unit square) rather than around the (0, 0) corner; a
    corner pivot pushes a large share of the points outside ``[0, 1]``, where
    the final clipping collapses them onto the border.

    Args:
        uv_map: Mapping with the keys ``'image_id'``, ``'dp_U'``, ``'dp_V'``
            and ``'dp_I'``; ``dp_U`` and ``dp_V`` are equally long sequences
            of numbers.
        max_angle: Largest rotation magnitude in degrees.
        pivot: ``(u, v)`` point that stays fixed under the rotation.
        rng: Optional object exposing ``uniform(low, high)`` (for example
            ``numpy.random.default_rng(seed)``). When omitted, the global
            ``numpy.random`` state is used.

    Returns:
        A new dict with the same four keys. ``dp_U`` and ``dp_V`` are lists of
        floats clipped to ``[0, 1]``; ``dp_I`` is a shallow copy of the input
        list; ``image_id`` is passed through unchanged.

    NOTE(review): in the DensePose-COCO format dp_U/dp_V are presumably surface
    (texture) coordinates rather than image positions -- confirm that a planar
    rotation of them is the intended augmentation.
    """
    sampler = np.random if rng is None else rng
    theta = np.radians(sampler.uniform(-max_angle, max_angle))
    cos_theta = np.cos(theta)
    sin_theta = np.sin(theta)

    pivot_u, pivot_v = pivot
    # Shift so that the pivot becomes the origin of the rotation.
    u = np.asarray(uv_map['dp_U'], dtype=float) - pivot_u
    v = np.asarray(uv_map['dp_V'], dtype=float) - pivot_v

    # Standard 2-D rotation, shifted back and clamped to the unit square.
    u_rotated = np.clip(cos_theta * u - sin_theta * v + pivot_u, 0.0, 1.0)
    v_rotated = np.clip(sin_theta * u + cos_theta * v + pivot_v, 0.0, 1.0)

    return {
        'image_id': uv_map['image_id'],
        'dp_U': u_rotated.tolist(),
        'dp_V': v_rotated.tolist(),
        # Copy so that the augmented record does not alias the input list.
        'dp_I': list(uv_map['dp_I']),
    }

# Hardcoded file paths
input_file = 'C:/Users/devel/Desktop/DensePose/train2014_extracted.json'
output_file = 'C:/Users/devel/Desktop/DensePose/train2014_extracted_augmented.json'

# Read every extracted UV map into memory.
print(f"Loading input file: {input_file}")
with open(input_file, 'r') as f:
    uv_maps = json.load(f)

# Augment each record in turn; tqdm reports progress while iterating.
print("Augmenting UV maps...")
augmented_uv_maps = [
    augment_uv_map(record)
    for record in tqdm(uv_maps, desc="Processing", unit=" map")
]

# Write the augmented records next to the input file.
print(f"\nSaving augmented data to: {output_file}")
with open(output_file, 'w') as f:
    json.dump(augmented_uv_maps, f)

print(f"Completed! Processed {len(augmented_uv_maps)} UV maps.")

In [None]:
import os
import json
import cv2
import numpy as np
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random\_split
from torchvision import transforms, models
from tqdm import tqdm

########################################
# Dataset Definition with Optional DensePose Mask
########################################

class ActivityDensePoseDataset(Dataset):
    """Dataset pairing each activity-labelled image with its DensePose annotations.

    Every item is a dict with three keys:
      * ``"image"``: the (optionally transformed) RGB image; with
        ``use_densepose_mask=True`` a one-channel DensePose mask is concatenated
        to it along dimension 0.
      * ``"label"``: the integer class from ``label_mapping``, or ``-1`` when the
        stored pseudo-label is missing or unknown. Callers must handle ``-1``
        before using the value as a class index.
      * ``"densepose"``: the list of DensePose annotation dicts recorded for the
        image (empty when there are none).
    """

    def __init__(self, images_dir, activity_annotation_path, densepose_annotation_path, transform=None, use_densepose_mask=False):
        """
        Args:
            images_dir (str): Directory containing the images.
            activity_annotation_path (str): Path to train2014_activity_annotated.json.
            densepose_annotation_path (str): Path to train2014_extracted_augmented.json.
            transform (callable, optional): Optional transform to apply to images.
            use_densepose_mask (bool): When True, generates a DensePose mask to stack with image.
        """
        self.images_dir = images_dir
        self.transform = transform
        self.use_densepose_mask = use_densepose_mask

        # Load activity annotations (mapping: image filename -> { "pseudo_label": ... } )
        with open(activity_annotation_path, 'r') as f:
            self.activity_annotations = json.load(f)

        # Load DensePose annotations (list of dictionaries)
        with open(densepose_annotation_path, 'r') as f:
            densepose_list = json.load(f)

        # Build mapping: image filename -> list of DensePose annotations.
        self.densepose_annotations = {}
        for ann in densepose_list:
            # If annotation is stored as a JSON string, decode it first.
            if isinstance(ann, str):
                try:
                    ann = json.loads(ann.strip())
                except ValueError:  # json.JSONDecodeError is a ValueError
                    continue
            # Entries without a usable integer image_id are skipped.
            try:
                image_id = int(ann["image_id"])
            except (KeyError, IndexError, TypeError, ValueError, OverflowError):
                continue
            # Format image filename (zero-padded to 12 digits).
            filename = f"COCO_train2014_{image_id:012d}.jpg"
            self.densepose_annotations.setdefault(filename, []).append(ann)

        # List of image filenames from the activity annotations.
        self.image_files = list(self.activity_annotations.keys())

        # Mapping of activity text to integer label.
        self.label_mapping = {
            "standing": 0,
            "sitting": 1,
            "lying down": 2,
            "bending": 3,
            "jumping": 4,
            "walking": 5,
            "running": 6
        }

    def __len__(self):
        """Return the number of images listed in the activity annotations."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load image ``idx`` and return its image/label/densepose dict."""
        image_file = self.image_files[idx]
        image_path = os.path.join(self.images_dir, image_file)
        # Close the file handle right after decoding; a long training run would
        # otherwise keep handles open until garbage collection.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
        if self.transform:
            image = self.transform(image)

        activity_info = self.activity_annotations[image_file]
        pseudo_label = activity_info.get("pseudo_label", "").lower()
        label = self.label_mapping.get(pseudo_label, -1)
        densepose_data = self.densepose_annotations.get(image_file, [])

        if self.use_densepose_mask:
            # The mask is concatenated as a tensor channel, so the transform
            # must already have produced a (C, H, W) tensor.
            if not torch.is_tensor(image):
                raise TypeError(
                    "use_densepose_mask=True requires a transform that returns a "
                    "(C, H, W) tensor (e.g. one ending in ToTensor())"
                )
            mask = generate_densepose_mask(densepose_data, output_size=(image.shape[1], image.shape[2]))
            # Concatenate the mask (1 channel) with the image (3 channels) to form a 4-channel tensor.
            image = torch.cat([image, mask], dim=0)
        return {"image": image, "label": label, "densepose": densepose_data}

########################################
# DensePose Mask Generator Function
########################################

def generate_densepose_mask(densepose_data, output_size=(224, 224)):
    """Generate a single-channel mask from DensePose annotations.

    Every (dp_U, dp_V) pair is treated as a normalized (x, y) position, scaled
    to the mask size, truncated to an integer pixel and clamped to the mask
    bounds; that pixel is set to 1.0. The mask is then smoothed with a 7x7
    Gaussian blur.

    Args:
        densepose_data: Iterable of annotation dicts. Missing ``'dp_U'`` or
            ``'dp_V'`` keys count as empty lists; if the two lists differ in
            length the extra entries of the longer one are ignored.
        output_size: ``(height, width)`` of the mask.

    Returns:
        A float32 tensor of shape ``(1, height, width)``.

    NOTE(review): dp_U/dp_V are used here as image-plane positions; in the
    DensePose-COCO format they are presumably surface coordinates (with
    dp_x/dp_y holding image positions) -- confirm this is intended.
    """
    height, width = output_size[0], output_size[1]
    mask = np.zeros(output_size, dtype=np.float32)

    for ann in densepose_data:
        dp_U = ann.get('dp_U', [])
        dp_V = ann.get('dp_V', [])
        n_points = min(len(dp_U), len(dp_V))  # same pairing as zip()
        if n_points == 0:
            continue

        u = np.asarray(dp_U[:n_points], dtype=np.float64)
        v = np.asarray(dp_V[:n_points], dtype=np.float64)

        # Map normalized coordinates [0,1] to pixel coordinates: truncate toward
        # zero like int(), then clamp so that u == 1.0 lands on the last column.
        xs = np.clip(np.trunc(u * width), 0, width - 1).astype(np.intp)
        ys = np.clip(np.trunc(v * height), 0, height - 1).astype(np.intp)
        mask[ys, xs] = 1.0

    mask = cv2.GaussianBlur(mask, (7, 7), 0)
    mask = torch.from_numpy(mask).unsqueeze(0)  # Shape: (1, H, W)
    return mask

########################################
# Custom Collate Function
########################################

def custom_collate(batch):
    """Merge a list of dataset items into one batch dict.

    Images are stacked along a new leading dimension and labels become a single
    tensor. The per-item DensePose lists are kept as a plain Python list because
    they are not tensors.
    """
    image_tensors = []
    label_values = []
    densepose_lists = []
    for sample in batch:
        image_tensors.append(sample["image"])
        label_values.append(sample["label"])
        densepose_lists.append(sample["densepose"])

    return {
        "image": torch.stack(image_tensors, dim=0),
        "label": torch.tensor(label_values),
        "densepose": densepose_lists,
    }

########################################
# Phase 1: Training with Raw Images (3 channels)
########################################

def train_raw_model(dataset, num_epochs=5, batch_size=16, learning_rate=1e-3, val_split=0.2, num_classes=7):
    """Train a pretrained ResNet-50 classifier on the images of ``dataset``.

    The dataset is split randomly into training and validation subsets. The
    network's final fully connected layer is replaced by a ``num_classes``-way
    linear layer, and all parameters are optimised with Adam and cross-entropy.
    After each epoch the mean training loss and the validation accuracy are
    printed.

    Args:
        dataset: Dataset whose items are dicts accepted by ``custom_collate``.
        num_epochs: Number of passes over the training subset.
        batch_size: Batch size for both loaders.
        learning_rate: Adam learning rate.
        val_split: Fraction of the dataset held out for validation.
        num_classes: Size of the new classification layer.

    Returns:
        The trained model, located on the device used for training.

    Raises:
        ValueError: If ``dataset`` is empty.
    """
    # Split dataset into training and validation sets.
    dataset_size = len(dataset)
    if dataset_size == 0:
        raise ValueError("train_raw_model() received an empty dataset")
    val_size = int(val_split * dataset_size)
    train_size = dataset_size - val_size
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, collate_fn=custom_collate)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0, collate_fn=custom_collate)

    # Load pretrained ResNet-50 using the updated API.
    model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
    model.fc = nn.Linear(model.fc.in_features, num_classes)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1} Training", leave=False):
            images = batch["image"].to(device)
            labels = batch["label"].to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # Weight by batch size so that the epoch figure is a per-sample mean.
            running_loss += loss.item() * images.size(0)

        epoch_loss = running_loss / train_size if train_size else float("nan")

        model.eval()
        correct = 0
        total = 0
        # Validation needs no gradients; skipping the autograd graph saves
        # memory and time.
        with torch.no_grad():
            for batch in tqdm(val_loader, desc=f"Epoch {epoch+1} Validating", leave=False):
                images = batch["image"].to(device)
                labels = batch["label"].to(device)
                outputs = model(images)
                _, preds = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (preds == labels).sum().item()
        # A tiny dataset or val_split=0 leaves the validation split empty.
        val_acc = correct / total if total else float("nan")

        print(f"Epoch {epoch+1}/{num_epochs}: Train Loss = {epoch_loss:.4f}, Val Acc = {val_acc:.4f}")

    return model

########################################
# Phase 2: Transfer Learning using DensePose Data
########################################

def modify_model_for_densepose(model, inplace=False):
    """Return a model whose ``conv1`` layer accepts 4-channel input.

    The first three input channels of the new layer reuse the existing weights.
    The fourth channel is initialised with the mean of those three, and an
    existing bias is copied over.

    By default the function works on a deep copy, so the model passed in keeps
    its 3-channel ``conv1`` and its weights. Previously the caller's model was
    rewritten in place, which made the "raw" and the "DensePose" model one and
    the same object.

    Args:
        model: Module with a ``conv1`` attribute that is an ``nn.Conv2d`` taking
            3 input channels.
        inplace: When True, modify ``model`` itself instead of a copy.

    Returns:
        The 4-channel model (a copy unless ``inplace`` is True).
    """
    import copy  # local import keeps this block independent of the file header

    if not inplace:
        model = copy.deepcopy(model)

    old_conv = model.conv1
    new_conv = nn.Conv2d(
        4,
        old_conv.out_channels,
        kernel_size=old_conv.kernel_size,
        stride=old_conv.stride,
        padding=old_conv.padding,
        dilation=old_conv.dilation,
        bias=old_conv.bias is not None,
        padding_mode=old_conv.padding_mode,
    )
    # Keep the new layer on the same device and dtype as the one it replaces.
    new_conv = new_conv.to(device=old_conv.weight.device, dtype=old_conv.weight.dtype)

    with torch.no_grad():
        new_conv.weight[:, :3] = old_conv.weight
        new_conv.weight[:, 3] = old_conv.weight.mean(dim=1)
        if old_conv.bias is not None:
            # Without this the bias would silently restart from random values.
            new_conv.bias.copy_(old_conv.bias)

    model.conv1 = new_conv
    return model

def fine_tune_with_densepose(dataset, base_model, num_epochs=5, batch_size=16, learning_rate=1e-4, val_split=0.2):
    """Fine-tune ``base_model`` on 4-channel (RGB + DensePose mask) inputs.

    The dataset is split randomly into training and validation subsets.
    ``base_model`` is passed through ``modify_model_for_densepose`` so that its
    first convolution accepts four channels, then optimised with Adam and
    cross-entropy. After each epoch the mean training loss and the validation
    accuracy are printed.

    Args:
        dataset: Dataset whose items are dicts accepted by ``custom_collate``
            and whose images have four channels.
        base_model: Model to start from.
        num_epochs: Number of passes over the training subset.
        batch_size: Batch size for both loaders.
        learning_rate: Adam learning rate.
        val_split: Fraction of the dataset held out for validation.

    Returns:
        The fine-tuned 4-channel model, located on the training device.

    Raises:
        ValueError: If ``dataset`` is empty.
    """
    dataset_size = len(dataset)
    if dataset_size == 0:
        raise ValueError("fine_tune_with_densepose() received an empty dataset")
    val_size = int(val_split * dataset_size)
    train_size = dataset_size - val_size
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, collate_fn=custom_collate)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0, collate_fn=custom_collate)

    # Modify the base model to accept 4-channel input.
    model = modify_model_for_densepose(base_model)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1} DensePose Training", leave=False):
            images = batch["image"].to(device)  # 4-channel input now.
            labels = batch["label"].to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # Weight by batch size so that the epoch figure is a per-sample mean.
            running_loss += loss.item() * images.size(0)
        epoch_loss = running_loss / train_size if train_size else float("nan")

        model.eval()
        correct = 0
        total = 0
        # Validation needs no gradients; skipping the autograd graph saves
        # memory and time.
        with torch.no_grad():
            for batch in tqdm(val_loader, desc=f"Epoch {epoch+1} DensePose Validating", leave=False):
                images = batch["image"].to(device)
                labels = batch["label"].to(device)
                outputs = model(images)
                _, preds = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (preds == labels).sum().item()
        # A tiny dataset or val_split=0 leaves the validation split empty.
        val_acc = correct / total if total else float("nan")
        print(f"(DensePose FT) Epoch {epoch+1}/{num_epochs}: Train Loss = {epoch_loss:.4f}, Val Acc = {val_acc:.4f}")

    return model

########################################
# Main Execution
########################################

if __name__ == "__main__":
    # Update your paths.
    images_directory = "C:/Users/devel/Desktop/DensePose/train2014/train2014"
    activity_json = "C:/Users/devel/Desktop/DensePose/train2014_activity_annotated.json"
    densepose_json = "C:/Users/devel/Desktop/DensePose/train2014_extracted_augmented.json"

    # Define a transformation (resize to 224x224 and convert to tensor).
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ])

    # --- Phase 1: Train with Raw Images ---
    print("=== Phase 1: Training on Raw Images ===")
    raw_dataset = ActivityDensePoseDataset(images_directory, activity_json, densepose_json,
                                           transform=transform, use_densepose_mask=False)
    raw_model = train_raw_model(raw_dataset, num_epochs=5, batch_size=16, learning_rate=1e-3)

    # Save the Phase 1 weights before Phase 2 starts. Fine-tuning is handed
    # this very model object, so saving afterwards could store the fine-tuned
    # 4-channel weights under the "raw" name.
    torch.save(raw_model.state_dict(), "raw_model.pth")

    # --- Phase 2: Fine-tune with DensePose Data ---
    print("=== Phase 2: Fine-tuning with DensePose Data ===")
    densepose_dataset = ActivityDensePoseDataset(images_directory, activity_json, densepose_json,
                                                 transform=transform, use_densepose_mask=True)
    densepose_model = fine_tune_with_densepose(densepose_dataset, base_model=raw_model,
                                               num_epochs=5, batch_size=16, learning_rate=1e-4)

    # Save the fine-tuned 4-channel model.
    torch.save(densepose_model.state_dict(), "densepose_model.pth")