In [18]:
import os
import scipy.io
import numpy as np
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

def generate_density_map(image_shape, points):
    """
    Генерирует density map из точек (координат голов людей).
    """
    density_map = np.zeros(image_shape, dtype=np.float32)
    h, w = image_shape
    
    if len(points) == 0:
        return density_map
    
    # Преобразуем точки в правильный формат
    try:
        points = np.array(points, dtype=np.float32)
        if points.ndim == 1:
            points = points.reshape(-1, 2)
    except:
        return density_map
    
    # Создаём Gaussian-размытие вокруг каждой точки
    for point in points:
        if len(point) < 2:
            continue
            
        x, y = point[0], point[1]
        x = min(max(0, int(round(x))), w-1)
        y = min(max(0, int(round(y))), h-1)
        
        pt2d = np.zeros((h, w), dtype=np.float32)
        pt2d[y, x] = 1
        density_map += cv2.GaussianBlur(pt2d, (15, 15), sigma=4)
    
    return density_map

class ShanghaiTechDataset(Dataset):
    def __init__(self, root_dir, transform=None, train=True):
        self.root_dir = root_dir
        self.transform = transform
        self.train = train
        self.images = []
        self.density_maps = []
        
        img_dir = os.path.join(root_dir, "images")
        gt_dir = os.path.join(root_dir, "ground-truth")
        
        print(f"Путь к изображениям: {img_dir}")
        print(f"Путь к разметке: {gt_dir}")
        
        if not os.path.exists(img_dir):
            raise FileNotFoundError(f"Папка {img_dir} не найдена!")
        if not os.path.exists(gt_dir):
            raise FileNotFoundError(f"Папка {gt_dir} не найдена!")

        for img_file in os.listdir(img_dir):
            if not img_file.endswith(".jpg"):
                continue
                
            img_path = os.path.join(img_dir, img_file)
            mat_file = f"GT_{img_file.replace('.jpg', '.mat')}"
            mat_path = os.path.join(gt_dir, mat_file)
            
            if not os.path.exists(mat_path):
                print(f"Файл разметки {mat_file} не найден!")
                continue
            
            try:
                # Загрузка .mat файла
                mat = scipy.io.loadmat(mat_path)
                points = mat["image_info"][0][0][0][0][0]  # Основной формат
                
                # Альтернативный формат, если основной пустой
                if points.size == 0:
                    points = mat["image_info"][0][0][1][0][0]
                
                # Загрузка изображения
                img = cv2.imread(img_path)
                if img is None:
                    print(f"Ошибка загрузки: {img_path}")
                    continue
                
                # Генерация density map
                density_map = generate_density_map((img.shape[0], img.shape[1]), points)
                
                self.images.append(img)
                self.density_maps.append(density_map)
                
            except Exception as e:
                print(f"Ошибка при обработке {img_file}: {e}")
                continue
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        img = self.images[idx]
        density = self.density_maps[idx]
        
        if self.transform:
            img = self.transform(img)
        
        img = transforms.functional.to_tensor(img)
        density = torch.from_numpy(density).unsqueeze(0).float()
        
        return img, density

In [25]:
from torchvision.models import vgg16, VGG16_Weights

class CSRNet(nn.Module):
    def __init__(self):
        super(CSRNet, self).__init__()
        # Frontend (VGG-16 без последнего MaxPool)
        vgg = vgg16(weights=VGG16_Weights.IMAGENET1K_V1)
        self.frontend = nn.Sequential(*list(vgg.features.children())[:-1])  # Закрывающая скобка добавлена
        
        # Backend (Dilated Convolutions)
        self.backend = nn.Sequential(
            nn.Conv2d(512, 512, 3, padding=2, dilation=2), nn.ReLU(),
            nn.Conv2d(512, 512, 3, padding=2, dilation=2), nn.ReLU(),
            nn.Conv2d(512, 256, 3, padding=2, dilation=2), nn.ReLU(),
            nn.Conv2d(256, 128, 3, padding=2, dilation=2), nn.ReLU(),
            nn.Conv2d(128, 64, 3, padding=2, dilation=2), nn.ReLU(),
            nn.Conv2d(64, 1, 1)  # Output: density map
        )
    
    def forward(self, x):
        x = self.frontend(x)
        x = self.backend(x)
        return x

In [20]:
class ShanghaiTechDataset(Dataset):
    def __init__(self, root_dir, transform=None, train=True):
        self.root_dir = root_dir
        self.transform = transform
        self.train = train
        self.images = []
        self.density_maps = []
        
        # Исправленные пути - УБРАЛ ДУБЛИРОВАНИЕ train_data/test_data
        img_dir = os.path.join(root_dir, "images")
        gt_dir = os.path.join(root_dir, "ground-truth")
        
        print(f"Проверка пути к изображениям: {img_dir}")  # Для отладки
        print(f"Проверка пути к разметке: {gt_dir}")      # Для отладки
        
        # Проверка существования путей
        if not os.path.exists(img_dir):
            raise FileNotFoundError(f"Папка с изображениями не найдена: {img_dir}")
        if not os.path.exists(gt_dir):
            raise FileNotFoundError(f"Папка с разметкой не найдена: {gt_dir}")

        for img_file in os.listdir(img_dir):
            if img_file.endswith(".jpg"):
                img_path = os.path.join(img_dir, img_file)
                mat_file = f"GT_{img_file.replace('.jpg', '.mat')}"
                mat_path = os.path.join(gt_dir, mat_file)
                
                if not os.path.exists(mat_path):
                    print(f"Предупреждение: файл разметки {mat_file} не найден")
                    continue
                
                # Загрузка .mat-файла
                mat = scipy.io.loadmat(mat_path)
                points = mat["image_info"][0][0][0][0][0]  # Координаты людей
                
                # Генерация density map
                img = cv2.imread(img_path)
                if img is None:
                    print(f"Предупреждение: не удалось загрузить изображение {img_path}")
                    continue
                    
                density_map = generate_density_map((img.shape[0], img.shape[1]), points)
                
                self.images.append(img)
                self.density_maps.append(density_map)
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        img = self.images[idx]
        density = self.density_maps[idx]
        
        if self.transform:
            img = self.transform(img)
        
        # Нормализация
        img = transforms.functional.to_tensor(img)
        density = torch.from_numpy(density).unsqueeze(0).float()
        
        return img, density

In [21]:
def generate_density_map(image_shape, points):
    """
    Генерирует density map из точек (координат голов людей).
    Теперь с правильным вызовом GaussianBlur.
    """
    density_map = np.zeros(image_shape, dtype=np.float32)
    h, w = image_shape
    
    if len(points) == 0:
        return density_map
    
    # Преобразуем точки в правильный формат
    try:
        points = np.array(points, dtype=np.float32)
        if points.ndim == 1:
            points = points.reshape(-1, 2)
    except:
        return density_map
    
    # Создаём Gaussian-размытие вокруг каждой точки
    for point in points:
        if len(point) < 2:
            continue
            
        x, y = point[0], point[1]
        x = min(max(0, int(round(x))), w-1)
        y = min(max(0, int(round(y))), h-1)
        
        pt2d = np.zeros((h, w), dtype=np.float32)
        pt2d[y, x] = 1
        
        # Исправленный вызов GaussianBlur:
        # Теперь явно указываем sigmaX (sigmaY будет равен sigmaX, если не указан)
        density_map += cv2.GaussianBlur(pt2d, (15, 15), sigmaX=4)
    
    return density_map


class ShanghaiTechDataset(Dataset):
    def __init__(self, root_dir, transform=None, train=True):
        self.root_dir = root_dir
        self.transform = transform
        self.train = train
        self.images = []
        self.density_maps = []
        
        img_dir = os.path.join(root_dir, "images")
        gt_dir = os.path.join(root_dir, "ground-truth")
        
        print(f"Проверка пути к изображениям: {img_dir}")
        print(f"Проверка пути к разметке: {gt_dir}")
        
        if not os.path.exists(img_dir):
            raise FileNotFoundError(f"Папка с изображениями не найдена: {img_dir}")
        if not os.path.exists(gt_dir):
            raise FileNotFoundError(f"Папка с разметкой не найдена: {gt_dir}")

        for img_file in os.listdir(img_dir):
            if img_file.endswith(".jpg"):
                img_path = os.path.join(img_dir, img_file)
                mat_file = f"GT_{img_file.replace('.jpg', '.mat')}"
                mat_path = os.path.join(gt_dir, mat_file)
                
                if not os.path.exists(mat_path):
                    print(f"Предупреждение: файл разметки {mat_file} не найден")
                    continue
                
                try:
                    # Загрузка .mat-файла
                    mat = scipy.io.loadmat(mat_path)
                    points = mat["image_info"][0][0][0][0][0]  # Координаты людей
                    
                    # Альтернативный вариант чтения точек
                    if points.size == 0:
                        points = mat["image_info"][0][0][1][0][0]
                    
                    # Загрузка изображения
                    img = cv2.imread(img_path)
                    if img is None:
                        print(f"Предупреждение: не удалось загрузить изображение {img_path}")
                        continue
                        
                    # Генерация density map с исправленной функцией
                    density_map = generate_density_map((img.shape[0], img.shape[1]), points)
                    
                    self.images.append(img)
                    self.density_maps.append(density_map)
                    
                except Exception as e:
                    print(f"Ошибка при обработке {img_file}: {str(e)}")
                    continue
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        img = self.images[idx]
        density = self.density_maps[idx]
        
        if self.transform:
            img = self.transform(img)
        
        img = transforms.functional.to_tensor(img)
        density = torch.from_numpy(density).unsqueeze(0).float()
        
        return img, density

In [22]:
# Пути к данным
train_data_path = os.path.join("data", "part_A", "train_data")
test_data_path = os.path.join("data", "part_A", "test_data")

# Трансформации для изображений
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((256, 256)),  # Приводим все изображения к одному размеру
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Нормализация для VGG
])

# Создаем датасеты
train_dataset = ShanghaiTechDataset(train_data_path, transform, train=True)
test_dataset = ShanghaiTechDataset(test_data_path, transform, train=False)

# Создаем DataLoader'ы
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

Проверка пути к изображениям: data\part_A\train_data\images
Проверка пути к разметке: data\part_A\train_data\ground-truth
Проверка пути к изображениям: data\part_A\test_data\images
Проверка пути к разметке: data\part_A\test_data\ground-truth


In [27]:
# Инициализация модели
print(torch.cuda.is_available())
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CSRNet().to(device)

# Функция потерь и оптимизатор
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

False
Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to C:\Users\Egor/.cache\torch\hub\checkpoints\vgg16-397923af.pth


1.9%


KeyboardInterrupt: 

In [None]:
num_epochs = 50

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    for i, (images, targets) in enumerate(train_loader):
        images = images.to(device)
        targets = targets.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, targets)
        
        # Backward pass и оптимизация
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    # Выводим статистику
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader):.4f}")

In [None]:
def evaluate_model(model, test_loader):
    model.eval()
    total_mae = 0.0
    total_mse = 0.0
    
    with torch.no_grad():
        for images, targets in test_loader:
            images = images.to(device)
            targets = targets.to(device)
            
            outputs = model(images)
            
            # Подсчет людей
            pred_count = torch.sum(outputs).item()
            true_count = torch.sum(targets).item()
            
            # Вычисление метрик
            total_mae += abs(pred_count - true_count)
            total_mse += (pred_count - true_count)**2
    
    mae = total_mae / len(test_loader)
    mse = (total_mse / len(test_loader))**0.5
    
    print(f"\nTest Results:")
    print(f"MAE: {mae:.2f}")
    print(f"MSE: {mse:.2f}")
    return mae, mse

# Запуск оценки
mae, mse = evaluate_model(model, test_loader)

In [None]:
import matplotlib.pyplot as plt

def visualize_results(model, test_loader, num_samples=3):
    model.eval()
    with torch.no_grad():
        for i, (images, targets) in enumerate(test_loader):
            if i >= num_samples:
                break
                
            images = images.to(device)
            outputs = model(images)
            
            # Денормализация изображения
            img = images[0].cpu().permute(1, 2, 0).numpy()
            img = (img * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])) * 255
            img = np.clip(img, 0, 255).astype(np.uint8)
            
            # Получаем карты плотности
            pred_density = outputs[0][0].cpu().numpy()
            true_density = targets[0][0].cpu().numpy()
            
            # Подсчет людей
            pred_count = np.sum(pred_density)
            true_count = np.sum(true_density)
            
            # Визуализация
            plt.figure(figsize=(15, 5))
            
            plt.subplot(1, 3, 1)
            plt.imshow(img)
            plt.title("Original Image")
            plt.axis('off')
            
            plt.subplot(1, 3, 2)
            plt.imshow(true_density, cmap='jet')
            plt.title(f"True Density (Count: {true_count:.1f})")
            plt.axis('off')
            
            plt.subplot(1, 3, 3)
            plt.imshow(pred_density, cmap='jet')
            plt.title(f"Predicted Density (Count: {pred_count:.1f})")
            plt.axis('off')
            
            plt.show()

# Запуск визуализации
visualize_results(model, test_loader)

In [None]:
torch.save(model.state_dict(), "csrnet_model.pth")

In [None]:
loaded_model = CSRNet().to(device)
loaded_model.load_state_dict(torch.load("csrnet_model.pth"))
loaded_model.eval()