### Teste de hipotese

Esse notebook tem por objetivo detectar varios pontos e gerar um descritor otimo que seja resitente a variacoes de transformacoes afins e pequenas transformacoes projetivas, para isso temos:

-- BaseFeatures para extrair informacoes equivariantes (num_channels,dim_first,dim_second,dim_third).

-- SingularPoints lida com escala , e extrai as features consolidadas, em dim_third caracteristicas distintas, orientacao computadas além da lista de pontos.

-- Computa a funcao de perda entre os mapas de orientacao e feature e os pontos que colidiram

In [1]:
# !git clone -b siamese https://github.com/wagner1986/singular-points.git singular_points
# !pip install kornia e2cnn

# !pwd
# %cd /content/singular_points
# !pwd

In [2]:
import torch
from e2cnn import gspaces
from e2cnn import nn as enn    #the equivariant layer we need to build the model
from torch import nn
import numpy as np

In [3]:
from typing import List, Optional

import torch
import torch.nn.functional as F
from torch import nn
from typing_extensions import TypedDict

from kornia.core import Module, Tensor, concatenate
from kornia.filters import SpatialGradient
from kornia.geometry.transform import pyrdown
from kornia.utils.helpers import map_location_to_cpu

from kornia.feature.scale_space_detector import get_default_detector_config, MultiResolutionDetector,Detector_config


class KeyNet_conf(TypedDict):
    num_filters: int
    num_levels: int
    kernel_size: int
    Detector_conf: Detector_config


keynet_default_config: KeyNet_conf = {
    # Key.Net Model
    'num_filters': 8,
    'num_levels': 3,
    'kernel_size': 5,
    # Extraction Parameters
    'Detector_conf': get_default_detector_config(),
}

KeyNet_URL = "https://github.com/axelBarroso/Key.Net-Pytorch/raw/main/model/weights/keynet_pytorch.pth"


class _FeatureExtractor(Module):
    def __init__(self) -> None:
        super().__init__()

        self.hc_block = _HandcraftedBlock()
        self.lb_block = _LearnableBlock()

    def forward(self, x: Tensor) -> Tensor:
        x_hc = self.hc_block(x)
        x_lb = self.lb_block(x_hc)
        return x_lb


class _HandcraftedBlock(Module):
    def __init__(self) -> None:
        super().__init__()
        self.spatial_gradient = SpatialGradient('sobel', 1)

    def forward(self, x: Tensor) -> Tensor:
        sobel = self.spatial_gradient(x)
        dx, dy = sobel[:, :, 0, :, :], sobel[:, :, 1, :, :]

        sobel_dx = self.spatial_gradient(dx)
        dxx, dxy = sobel_dx[:, :, 0, :, :], sobel_dx[:, :, 1, :, :]

        sobel_dy = self.spatial_gradient(dy)
        dyy = sobel_dy[:, :, 1, :, :]

        hc_feats = concatenate([dx, dy, dx**2.0, dy**2.0, dx * dy, dxy, dxy**2.0, dxx, dyy, dxx * dyy], 1)

        return hc_feats


def _KeyNetConvBlock(
    feat_type_in,
    feat_type_out,
    r2_act,
    kernel_size: int = 5,
    stride: int = 1,
    padding: int = 2,
    dilation: int = 1,
) -> nn.Sequential:
    return enn.SequentialModule(
            enn.R2Conv(feat_type_in, feat_type_out, kernel_size=kernel_size, padding=padding, bias=False),
            enn.InnerBatchNorm(feat_type_out),
            enn.ReLU(feat_type_out, inplace=True),
        )


class _LearnableBlock(nn.Sequential):
    def __init__(self, in_channels: int = 10, out_channels: int = 8, group_size=8) -> None:
        super().__init__()
        r2_act = gspaces.Rot2dOnR2(N=group_size)
            
        feat_type_in = enn.FieldType(r2_act, in_channels * [r2_act.trivial_repr])
        self.in_type = feat_type_in
        feat_type_out = enn.FieldType(r2_act, out_channels * [r2_act.regular_repr])  
        self.block0 = _KeyNetConvBlock(feat_type_in, feat_type_out, r2_act)
        
        feat_type_out = enn.FieldType(r2_act, out_channels * [r2_act.regular_repr])
        self.block1 = _KeyNetConvBlock(self.block0.out_type, feat_type_out, r2_act)

        feat_type_out = enn.FieldType(r2_act, out_channels * [r2_act.regular_repr])
        self.block2 = _KeyNetConvBlock(self.block1.out_type, feat_type_out, r2_act)
        self.gpool = enn.GroupPooling(self.block2.out_type)

    def forward(self, x: Tensor) -> Tensor:
        x = enn.GeometricTensor(x, self.in_type)  
        x = self.block0(x)
        x = self.block1(x)
        x = self.block2(x)
        x = self.gpool(x)
        return x.tensor

class KeyNet(Module):
    def __init__(self, pretrained: bool = False, keynet_conf: KeyNet_conf = keynet_default_config) -> None:
        super().__init__()

        num_filters = keynet_conf['num_filters']
        self.num_levels = keynet_conf['num_levels']
        kernel_size = keynet_conf['kernel_size']
        padding = kernel_size // 2

        self.feature_extractor = _FeatureExtractor()
        self.last_conv = nn.Sequential(
            nn.Conv2d(
                in_channels=num_filters * self.num_levels, out_channels=1, kernel_size=kernel_size, padding=padding
            ),
            nn.ReLU(inplace=True),
        )
        if pretrained:
            pretrained_dict = torch.hub.load_state_dict_from_url(KeyNet_URL, map_location=map_location_to_cpu)
            self.load_state_dict(pretrained_dict['state_dict'], strict=True)
        self.eval()

    def forward(self, x: Tensor) -> Tensor:
        shape_im = x.shape
        feats: List[Tensor] = [self.feature_extractor(x)]
        for i in range(1, self.num_levels):
            x = pyrdown(x, factor=1.2)
            feats_i = self.feature_extractor(x)
            feats_i = F.interpolate(feats_i, size=(shape_im[2], shape_im[3]), mode='bilinear')
            feats.append(feats_i)
        scores = self.last_conv(concatenate(feats, 1))
        return scores


class KeyNetDetector(MultiResolutionDetector):
    def __init__(
        self,
        pretrained: bool = False,
        num_features: int = 2048,
        keynet_conf: KeyNet_conf = keynet_default_config,
        ori_module: Optional[Module] = None,
        aff_module: Optional[Module] = None,
    ) -> None:
        model = KeyNet(pretrained, keynet_conf)
        super().__init__(model, num_features, keynet_conf['Detector_conf'], ori_module, aff_module)


In [10]:
# detector = KeyNetDetector()
# # Gera uma imagem de exemplo em escala de cinza
# image = torch.rand(1, 1, 128, 128)  # Imagem de exemplo 128x128 pixels em escala de cinza

# # Faz a inferência com o modelo
# with torch.no_grad():
#     lafs, resps = detector(image)
#     print(lafs.shape, resps.shape)
    

model = KeyNet(pretrained=False,keynet_conf=keynet_default_config)

RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:0 and cpu! (when checking argument for argument mat2 in method wrapper_CUDA_bmm)

In [5]:
import torch

def criar_mascara(size_batch,dimensao_janela, tamanho_borda):
    num_channels = 1
    mascara = torch.zeros((size_batch,num_channels, dimensao_janela, dimensao_janela), dtype=torch.uint8)
    mascara[..., tamanho_borda:-tamanho_borda, tamanho_borda:-tamanho_borda] = 1
    return mascara.to(torch.float32)

def my_similarity(a, b):
    a_norm = torch.nn.functional.normalize(a.view(a.size(0), -1), dim=-1)
    b_norm = torch.nn.functional.normalize(b.view(b.size(0), -1), dim=-1)
    return torch.cdist(a_norm, b_norm, p=2)

# Crie métodos para calcular a perda
def loss_fn(map_anch, map_pos, margin=0.9):
    similarities = my_similarity(map_anch, map_pos)
    # Calcular a média da diagonal principal (âncoras vs. seus respectivos positivos)
    mean_diagonal = torch.mean(torch.diagonal(similarities))
    # Calcular a média dos outros elementos (âncoras vs. seus negativos correspondentes)
    mean_other = torch.mean(similarities[~torch.eye(similarities.shape[0], dtype=torch.bool)])
    losses = torch.relu(mean_diagonal - mean_other + margin)  # pos - neg + margin
    return losses,mean_diagonal,mean_other


def extract_feat_in_batch(model, batch_img):
    repo_features = torch.tensor([], dtype=torch.float).to(batch_img.device)
    for image in batch_img:
        feature = model(image[None])  # Adicione 'None' ou 'unsqueeze(0)' se necessário
        repo_features = torch.cat([repo_features, feature], dim=0)  # Coloque 'feature' dentro de uma lista []
    return repo_features
                
import gc
from tqdm import tqdm
def train_one_epoch(model, train_loader, loss_map, optimizer=None, device='cpu', transformations=None,is_training=True):
    model.train(is_training) # Set model to training mode
    total_loss = 0.
    desc="Train " if is_training else "Test "
    t = tqdm(train_loader, desc=desc)
    batch_i = 0
    loss_maps = 0.
    for batch_image, labels in t:
        batch_image = batch_image.to(device)
        mask = criar_mascara(batch_image.shape[0],batch_image.shape[-1],20).to(device)
        features_key_summary = extract_feat_in_batch(model,batch_image*mask)

        batch_t,mask_t,features_key_summary_t = transformations(batch_image,mask,features_key_summary)# transformar orientacoes e pontos
        features_key_summary_t2 = extract_feat_in_batch(model,batch_image*mask)# prever os pontos da imagem transformada

        
        loss,mean_diagonal,mean_other = loss_map(features_key_summary_t,features_key_summary_t2,margin = 0.6)

        if is_training:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        total_loss += loss.item()
        t.set_description("{} Loss: {:.5f} Mean AP {:.5f} Mean AN {:.5f}".format(desc,loss,mean_diagonal,mean_other))
        del features_key_summary
        del batch_t, mask_t, features_key_summary_t
        del features_key_summary_t2
        gc.collect()
        torch.cuda.empty_cache()
        batch_i += 1
    return total_loss/batch_i



In [6]:
from teste_util import *
import teste_util as TS

# Fixar a semente do Torch para operações específicas
fixed_seed()

# leitura dos dados
trainloader,testloader =read_dataload_flower(120,'./data/datasets')
#gerar variacao de transformacoes pespectivas e fotometrica
iterator=iter(trainloader)
img,labels = next(iterator)
params_lists =AugmentationParamsGenerator(6,img.shape)

In [7]:

from torch import optim
from torch.optim.lr_scheduler import ExponentialLR

transforms = kornia.augmentation.AugmentationSequential(
    kornia.augmentation.RandomAffine(degrees=360, translate=(0.2, 0.2), scale=(0.95, 1.05), shear=10,p=0.8),
    kornia.augmentation.RandomPerspective(0.2, p=0.7),
    kornia.augmentation.RandomBoxBlur((4,4),p=0.5),
    # kornia.augmentation.RandomEqualize(p=0.3),
    data_keys=["input","input","input"],
    same_on_batch=True,
    # random_apply=10,
)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

gc.collect()
torch.cuda.empty_cache()
epochs=3
i_epoch = 0
loss = 0
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
scheduler = ExponentialLR(optimizer, gamma=0.75)

import torch

def train_with_early_stopping(model, trainloader, testloader, criterion_d, optimizer, scheduler, device, transformations, epochs=100, patience=20):
    best_loss = float('inf')
    best_model = None
    epochs_without_improvement = 0
    
    for epoch in range(epochs):
        # Atualizar a taxa de aprendizado
        if (epoch % 10 == 0) and (epoch != 0):
            scheduler.step()

        running_loss = train_one_epoch(model, trainloader, loss_map=criterion_d,  optimizer=optimizer, device=device, transformations=transformations, is_training=True)
        
        with torch.no_grad():
            loss_test = train_one_epoch(model, testloader, loss_map=criterion_d,  optimizer=None, device=device, transformations=transformations, is_training=False)

        # Verificar se a perda melhorou
        if loss_test < best_loss:
            best_loss = loss_test
            epochs_without_improvement = 0
            best_model = model.state_dict()
        else:
            epochs_without_improvement += 1

        # Verificar a condição de parada
        if epochs_without_improvement == patience:
            print(f"No improvement in loss for {epochs_without_improvement} epochs. Training stopped.")
            break

        print(f"Epoch [{epoch}/{epochs}] - Running Loss: {running_loss:.4f}, Test Loss: {loss_test:.4f}, Initial LR: {optimizer.param_groups[0]['initial_lr']:.6f}, Current LR: {optimizer.param_groups[0]['lr']:.6f}, Epochs without Improvement: {epochs_without_improvement}")
    
    # Carregar a melhor configuração do modelo
    model.load_state_dict(best_model)
    print(f'Epoch: {epoch}, Best Loss: {best_loss:.4f}')

train_with_early_stopping(model.to(device), trainloader, testloader, loss_fn, optimizer, scheduler, device, transforms, epochs=epochs, patience=50)
# save_model(model,path_siamese)

AugmentationSequential(
  (RandomAffine_0): RandomAffine(degrees=360, translate=(0.2, 0.2), scale=(0.95, 1.05), shear=10, p=0.8, p_batch=1.0, same_on_batch=True, resample=bilinear, padding_mode=zeros, align_corners=False)
  (RandomPerspective_1): RandomPerspective(distortion_scale=0.2, p=0.7, p_batch=1.0, same_on_batch=True, align_corners=False, resample=bilinear)
  (RandomBoxBlur_2): RandomBoxBlur(p=0.5, p_batch=1.0, same_on_batch=True, kernel_size=(4, 4), border_type=reflect, normalized=True)
)


Train  Loss: 0.59988 Mean AP 1.20886 Mean AN 1.20897: 100%|██████████| 17/17 [01:34<00:00,  5.58s/it]
Test  Loss: 0.57500 Mean AP 1.22576 Mean AN 1.25076: 100%|██████████| 17/17 [00:17<00:00,  1.02s/it]


Epoch [0/3] - Running Loss: 0.5555, Test Loss: 0.3919, Initial LR: 0.001000, Current LR: 0.001000, Epochs without Improvement: 0


Train  Loss: 0.51889 Mean AP 1.16270 Mean AN 1.24381: 100%|██████████| 17/17 [01:33<00:00,  5.51s/it]
Test  Loss: 0.55040 Mean AP 1.30822 Mean AN 1.35782: 100%|██████████| 17/17 [00:17<00:00,  1.02s/it]


Epoch [1/3] - Running Loss: 0.4887, Test Loss: 0.5044, Initial LR: 0.001000, Current LR: 0.001000, Epochs without Improvement: 1


Train  Loss: 0.12246 Mean AP 0.52837 Mean AN 1.00590: 100%|██████████| 17/17 [01:34<00:00,  5.54s/it]
Test  Loss: 0.56468 Mean AP 1.20675 Mean AN 1.24207: 100%|██████████| 17/17 [00:17<00:00,  1.03s/it]

Epoch [2/3] - Running Loss: 0.5015, Test Loss: 0.4912, Initial LR: 0.001000, Current LR: 0.001000, Epochs without Improvement: 2
Epoch: 2, Best Loss: 0.3919



