In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

#import os
#for dirname, _, filenames in os.walk('/kaggle/input'):
#    for filename in filenames:
 #       print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from torchvision.ops import FeaturePyramidNetwork 
import torch.nn.functional as F
import timm 
import numpy as np
import os
import glob
from PIL import Image
import matplotlib.pyplot as plt
from collections import OrderedDict
import random

In [None]:
class SpatialAttentionModule(nn.Module):
    """Spatial attention gate built from per-pixel channel statistics.

    The input is reduced along the channel axis twice (mean and max), the two
    single-channel maps are stacked and passed through one bias-free
    convolution, and the sigmoid of that result rescales every channel of the
    input.

    Args:
        kernel_size: Side length of the square convolution kernel. Padding is
            ``kernel_size // 2``, so odd kernel sizes keep the spatial size.
    """

    def __init__(self, kernel_size=7):
        super().__init__()
        same_padding = kernel_size // 2
        self.conv = nn.Conv2d(
            in_channels=2,
            out_channels=1,
            kernel_size=kernel_size,
            padding=same_padding,
            bias=False,
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return ``x`` multiplied by its (N, 1, H, W) attention gate."""
        # Channel-wise descriptors, each keeping a singleton channel axis.
        channel_mean = x.mean(dim=1, keepdim=True)
        channel_max = x.max(dim=1, keepdim=True).values
        pooled = torch.cat((channel_mean, channel_max), dim=1)
        # Gate values lie in (0, 1) and broadcast over the channel axis.
        gate = self.sigmoid(self.conv(pooled))
        return x * gate

class DilatedDecoder(nn.Module):
    """Convolutional head that reduces a feature map to a single channel.

    Three 3x3 convolutions with dilation 2 (padding 2, so spatial size is
    kept), each followed by an in-place ReLU, then a 1x1 convolution producing
    one output channel. No activation is applied after the last layer.

    Args:
        in_channels: Channel count of the incoming feature map.
        decoder_channels: Sequence whose first three entries are the output
            widths of the three dilated convolutions. Entries past the third
            are ignored; fewer than three raise ``IndexError``.
    """

    def __init__(self, in_channels, decoder_channels):
        super().__init__()
        # Explicit indexing keeps the "exactly the first three" contract.
        widths = [
            in_channels,
            decoder_channels[0],
            decoder_channels[1],
            decoder_channels[2],
        ]
        stages = []
        for c_in, c_out in zip(widths[:-1], widths[1:]):
            stages.append(nn.Conv2d(c_in, c_out, kernel_size=3, padding=2, dilation=2))
            stages.append(nn.ReLU(inplace=True))
        # Final 1x1 projection to a single-channel map.
        stages.append(nn.Conv2d(widths[-1], 1, kernel_size=1))
        self.decoder = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through the sequential decoder stack."""
        return self.decoder(x)

class DA_Net(nn.Module):
    """Density-map regressor: timm backbone -> FPN -> spatial attention -> dilated decoder.

    Args:
        backbone_name: Name passed to ``timm.create_model``.
        pretrained: Forwarded to ``timm.create_model`` to request pretrained
            backbone weights.

    The backbone is created with ``features_only=True`` and
    ``out_indices=(1, 2, 3)``; its reported channel counts size the FPN, which
    emits 256 channels per level.
    """

    def __init__(self, backbone_name='efficientnet_b4', pretrained=True):
        super().__init__()
        fpn_width = 256
        self.backbone = timm.create_model(
            backbone_name,
            pretrained=pretrained,
            features_only=True,
            out_indices=(1, 2, 3),
        )
        self.fpn = FeaturePyramidNetwork(
            in_channels_list=self.backbone.feature_info.channels(),
            out_channels=fpn_width,
        )
        self.attention = SpatialAttentionModule()
        self.decoder = DilatedDecoder(in_channels=fpn_width, decoder_channels=[128, 64, 32])

    def forward(self, x):
        """Return a non-negative single-channel density map for image batch ``x``."""
        # The FPN expects an ordered mapping; keys follow the backbone's output order.
        pyramid_in = OrderedDict(
            (f'feat{level}', fmap) for level, fmap in enumerate(self.backbone(x))
        )
        pyramid_out = self.fpn(pyramid_in)
        # Only the first FPN level is decoded (presumably the highest-resolution
        # one, assuming the backbone lists features shallow-to-deep).
        finest = next(iter(pyramid_out.values()))
        gated = self.attention(finest)
        # ReLU clamps the raw decoder output to be non-negative.
        return F.relu(self.decoder(gated))

def get_my_model(model_name='DA_Net', pretrained_backbone=True):
    """Build a model by name.

    Args:
        model_name: Only ``'DA_Net'`` is recognised.
        pretrained_backbone: Forwarded to ``DA_Net`` as ``pretrained``.

    Returns:
        A ``DA_Net`` constructed with the ``efficientnet_b5`` backbone.

    Raises:
        ValueError: If ``model_name`` is not a recognised name.
    """
    if model_name != 'DA_Net':
        raise ValueError(f"Unknown model name: {model_name}")
    return DA_Net(backbone_name='efficientnet_b5', pretrained=pretrained_backbone)

def mse_count_mae_loss_pytorch(y_pred, y_true, count_loss_weight=0.05):
    """Pixel-wise MSE plus a weighted mean absolute error on per-sample sums.

    Args:
        y_pred: Predicted maps; must be 4-D because sums are taken over
            dims (1, 2, 3).
        y_true: Ground-truth maps, same shape as ``y_pred``.
        count_loss_weight: Multiplier applied to the count term.

    Returns:
        Scalar tensor ``mse + count_loss_weight * count_mae``.
    """
    pixel_term = (y_true - y_pred).pow(2).mean()
    # Summing each sample's map gives one total ("count") per batch element.
    per_sample_true = y_true.sum(dim=(1, 2, 3))
    per_sample_pred = y_pred.sum(dim=(1, 2, 3))
    count_term = (per_sample_true - per_sample_pred).abs().mean()
    return pixel_term + count_loss_weight * count_term


In [None]:
class CrowdCountingDataset(Dataset):
    """Pairs ``*.jpg`` images with same-named ``*.npy`` density maps.

    Images found in ``image_dir`` are matched to ``<stem>.npy`` in
    ``label_dir``; images without an existing label file are skipped.

    Args:
        image_dir: Directory searched (non-recursively) for ``*.jpg`` files.
        label_dir: Directory holding the ``.npy`` density maps.
        image_transform: Optional callable applied to the resized PIL image.
            When omitted, ``torchvision.transforms.ToTensor`` is used.
        image_target_size: ``(height, width)`` the image is resized to.
        density_target_size: ``(height, width)`` the density map is resized to.
        enable_horizontal_flip: If true, image and density map are mirrored
            together with probability 0.5.

    Each item is ``(image_tensor, density_tensor)`` where ``density_tensor``
    has a leading channel axis of size 1 and is rescaled so its sum matches
    the sum of the unresized density map.
    """

    def __init__(self, image_dir, label_dir, image_transform=None,
                 image_target_size=(256, 256), density_target_size=(128, 128),
                 enable_horizontal_flip=False):
        self.image_dir = image_dir
        self.label_dir = label_dir

        image_files = []
        label_files = []
        for image_path in sorted(glob.glob(os.path.join(image_dir, "*.jpg"))):
            # Swap only the final extension. str.replace('.jpg', '.npy') would
            # also rewrite a '.jpg' appearing earlier in the file name and
            # silently drop that sample.
            stem, _ = os.path.splitext(os.path.basename(image_path))
            label_path = os.path.join(label_dir, stem + ".npy")
            if os.path.exists(label_path):
                image_files.append(image_path)
                label_files.append(label_path)
        self.image_files = image_files
        self.label_files = label_files

        self.image_transform = image_transform
        self.image_target_size = image_target_size
        self.density_target_size = density_target_size
        self.enable_horizontal_flip = enable_horizontal_flip

    @staticmethod
    def _pil_size(size_hw):
        """Convert a ``(height, width)`` pair to PIL's ``(width, height)`` order."""
        return (size_hw[1], size_hw[0])

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        # The context manager releases the file handle once pixels are decoded.
        with Image.open(self.image_files[idx]) as raw_image:
            image = raw_image.convert("RGB")
        density_np = np.load(self.label_files[idx]).astype(np.float32)
        # Total mass before resizing; restored after interpolation below.
        original_sum = np.sum(density_np)

        if self.enable_horizontal_flip and random.random() < 0.5:
            image = T.functional.hflip(image)
            # .copy() makes the flipped view contiguous (positive strides).
            density_np = np.flip(density_np, axis=1).copy()

        # Both sizes are (height, width); PIL wants (width, height). The image
        # size was previously passed to PIL unswapped, which disagreed with
        # the density map for non-square targets.
        image = image.resize(self._pil_size(self.image_target_size), Image.BILINEAR)
        density_resized_pil = Image.fromarray(density_np).resize(
            self._pil_size(self.density_target_size), Image.BILINEAR
        )
        density_resized_np = np.array(density_resized_pil)

        # Interpolation changes the map's sum; rescale so it matches the original.
        resized_sum = np.sum(density_resized_np)
        if resized_sum > 1e-6:
            density_resized_np = density_resized_np * (original_sum / resized_sum)
        elif original_sum > 1e-6:
            # Near-zero resized mass: epsilon guards the division.
            density_resized_np = density_resized_np * (original_sum / (resized_sum + 1e-9))

        density_resized_np = np.maximum(0, density_resized_np)

        if self.image_transform:
            image_tensor = self.image_transform(image)
        else:
            image_tensor = T.ToTensor()(image)
        # Add the channel axis: (H, W) -> (1, H, W).
        density_tensor = torch.from_numpy(density_resized_np).unsqueeze(0)
        return image_tensor, density_tensor

In [None]:

# Kaggle input directories for the training split: the image folder and the
# folder of per-image density maps. Both are passed to CrowdCountingDataset
# in the training cell.
TRAIN_IMG_DIR = "/kaggle/input/dataset-crowd/processed_data/train/images"
TRAIN_LBL_DIR = "/kaggle/input/dataset-crowd/processed_data/train/density_maps"

In [None]:
# --- Run configuration -------------------------------------------------------
IMAGE_INPUT_SIZE = (256, 256)
DENSITY_MAP_GT_SIZE = (128, 128)
BATCH_SIZE = 8
EPOCHS = 100
LEARNING_RATE = 5e-5
# WEIGHT_DECAY was passed to the optimizer but never defined, so this cell
# raised NameError on a fresh kernel. 1e-2 is AdamW's own default; the value
# originally intended is unknown -- tune as needed.
WEIGHT_DECAY = 1e-2
# Single source of truth for the checkpoint path, so the log message cannot
# drift from the file actually written.
MODEL_SAVE_PATH = 'da_net_model.pth'
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

# --- Data --------------------------------------------------------------------
# Normalisation uses the standard ImageNet channel statistics.
image_transforms = T.Compose([
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
train_dataset = CrowdCountingDataset(
    TRAIN_IMG_DIR, TRAIN_LBL_DIR,
    image_transform=image_transforms,
    image_target_size=IMAGE_INPUT_SIZE,
    density_target_size=DENSITY_MAP_GT_SIZE,
    enable_horizontal_flip=True
)
# Fail early with a readable message instead of an opaque sampler error.
if len(train_dataset) == 0:
    raise RuntimeError(
        f"No image/density-map pairs found in {TRAIN_IMG_DIR} / {TRAIN_LBL_DIR}"
    )
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
    pin_memory=DEVICE.type == 'cuda',
)

# --- Model, optimizer, loss ---------------------------------------------------
model = get_my_model(model_name='DA_Net', pretrained_backbone=True).to(DEVICE)
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
criterion = mse_count_mae_loss_pytorch

# --- Training loop -------------------------------------------------------------
print("Starting training")
for epoch in range(EPOCHS):
    model.train()
    running_loss = 0.0
    for batch_idx, (images, gt_density_maps) in enumerate(train_loader):
        images = images.to(DEVICE)
        gt_density_maps = gt_density_maps.to(DEVICE)
        optimizer.zero_grad()
        pred_density_maps_raw = model(images)
        # Bring the prediction onto the ground-truth grid when the model's
        # output resolution differs from DENSITY_MAP_GT_SIZE.
        if tuple(pred_density_maps_raw.shape[2:]) != DENSITY_MAP_GT_SIZE:
            pred_density_maps = F.interpolate(
                pred_density_maps_raw,
                size=DENSITY_MAP_GT_SIZE,
                mode='bilinear',
                align_corners=False
            )
        else:
            pred_density_maps = pred_density_maps_raw

        loss = criterion(pred_density_maps, gt_density_maps)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if (batch_idx + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{EPOCHS}], Batch [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}")

    epoch_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{EPOCHS}] completed. Average Training Loss: {epoch_loss:.4f}")

print("Training finished.")
torch.save(model.state_dict(), MODEL_SAVE_PATH)
# The message previously named 'da_net_trained.pth', which is not the file written.
print(f"Model saved to {MODEL_SAVE_PATH}")