In [26]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import os
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms as transforms
import numpy as np
import  matplotlib.pyplot   as plt
import math
import logging
# from torchsummary import summary
from pytorch3d.loss import chamfer_distance
import csv
import time
import argparse
from datetime import datetime

torch.manual_seed(101)
torch.cuda.manual_seed(101)

device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device") 

class args():
    def __init__(self, data_type, use_case):
        self.data_type = data_type
        self.use_case = use_case

args = args("ISAR", "Classification")
print(args.data_type)
print(args.use_case)

Using cuda device
ISAR
Classification


In [27]:
class CustomDataset(Dataset):
    """Custom dataset for loading data files and their corresponding labels.

    Initializes the CustomDataset instance.

    Args:
        npy_dir (str): Directory containing the data files.
        labels_file (str): File path to the file containing labels.
        transform_ISAR (callable, optional): Optional transform to be applied on a sample for ISAR data type.
        transform_npy (callable, optional): Optional transform to be applied on a sample for npy data type.
    """

    def __init__(self, dir, transform_ISAR=None, transform_npy=None):

        self.dir = dir

        if args.data_type == 'npy':
            # List all .npy files in the directory.
            self.file_names = [f for f in os.listdir(dir) if f.endswith('.npy') and f.startswith('sample_')]
            self.transform = transform_npy
        elif args.data_type == 'ISAR':
            # List all .npy files in the directory.
            self.file_names = [f for f in os.listdir(dir) if f.endswith('.png') and f.startswith('sample_')]
            self.transform = transform_ISAR
            
        if args.use_case == "Classification":
            _, _, result = dir.split('/')[-1].partition('_')  # divide en 3 partes: antes, separador, después
            result = f"_{result}"
            # Load labels vector from the labels file.
            self.labels_vector = np.load(dir + '/labels_vector' + result + '.npy')
        elif args.use_case == "Regression":
            self.coords = np.load(dir + "/coords.npy")[:, 1:]
            self.dist_max = self.dist_max_calc()
            
        # Sort files based on the sample id which is the number right after "sample_". For example, "sample_5.npy" will be split into ['sample', '5', '.npy'].
        self.file_names.sort(key=lambda x: int(x.split('_')[1].split('.')[0]))
            
    def __len__(self):
        """Returns the total number of samples in the dataset.

        Returns:
            int: Number of samples.
        """
        return len(self.file_names)
    
    def __getitem__(self, idx):

        # Construct the full file path for the sample.
        file_path = os.path.join(self.dir, self.file_names[idx])
        
        if args.data_type == 'npy':
            # Load the numpy array with shape (n x m x 2).
            data = np.load(file_path)
        
            # Convert the numpy array to a PyTorch tensor.
            data = torch.tensor(data, dtype=torch.float32).to(device)
        elif args.data_type == 'ISAR':
            data = Image.open(file_path).convert("RGB")

            # Aplicar transformaciones si las hay
            if self.transform:
                data = self.transform(data).to(device)
            else:
                data.to(device)

        if args.use_case == "Classification":
            # Load labels vector from the labels file.
            target = torch.tensor(self.labels_vector[idx], dtype=torch.float32).to(device)
        elif args.use_case == "Regression":
            target = torch.tensor(self.coords[idx,:], dtype=torch.float32).to(device)
        
        return data, target
    
    def dist_max_calc(self):
        sample_coords = self.coords[1].reshape(int(self.coords.shape[1]/3),3)
        dist_max = 0.0
        for i in range(len(sample_coords)):
            if sum(sample_coords[i] ** 2) > dist_max:
                dist_max = sum(sample_coords[i] ** 2)
            dist_max = np.sqrt(dist_max)
        return dist_max

class CNNModule(nn.Module):
    """Convolutional module to extract features from a 2-channel input.

    This network processes an input tensor with shape (batch_size, 2, height, width)
    and returns a flattened feature vector.
    """
    # La red LSTM podria usarse para relacionar columnas o filas del npy entre sí

    def __init__(self):
        super(CNNModule, self).__init__()
        
        # Capas convolucionales
        if args.data_type == 'npy':
            self.conv1 = nn.LazyConv2d(96, kernel_size=3, stride=2, padding=1)
            self.pool1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
            self.conv2 = nn.LazyConv2d(256, kernel_size=5, padding=2)
            self.pool2 = nn.MaxPool2d(kernel_size=(3, 1), stride=(2, 1), padding=(1, 0))
            self.conv3 = nn.LazyConv2d(384, kernel_size=3, padding=1)
            self.conv4 = nn.LazyConv2d(384, kernel_size=3, padding=1)
            self.conv5 = nn.LazyConv2d(256, kernel_size=3, padding=1)
            self.pool3 = nn.MaxPool2d(kernel_size=(2, 1), stride=(2, 1))
            self.ReLU = nn.ReLU()
            self.Flatten = nn.Flatten()

        elif args.data_type == 'ISAR':
            self.conv1 = nn.LazyConv2d(out_channels=96, kernel_size=22, stride=2, padding=1)
            self.pool1 = nn.MaxPool2d(kernel_size=3, stride=2)
            self.conv2 = nn.LazyConv2d(256, kernel_size=5, padding=2)
            self.pool2 = nn.MaxPool2d(kernel_size=3, stride=2)
            self.conv3 = nn.LazyConv2d(384, kernel_size=3, padding=1)
            self.conv4 = nn.LazyConv2d(384, kernel_size=3, padding=1)
            self.conv5 = nn.LazyConv2d(256, kernel_size=3, padding=1)
            self.pool3 = nn.MaxPool2d(kernel_size=3, stride=2)
            self.ReLU = nn.ReLU()
            self.Flatten = nn.Flatten()

    def forward(self, x):
        x = self.conv1(x)
        x = self.ReLU(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.ReLU(x)
        x = self.pool2(x)
        x = self.conv3(x)
        x = self.ReLU(x)
        x = self.conv4(x)
        x = self.ReLU(x)
        x = self.conv5(x)
        x = self.ReLU(x)
        x = self.pool3(x)
        x = self.Flatten(x)
        return x
    
class GeometryPredictor(nn.Module):

    def __init__(self, num_labels=None, coords_width=None):

        super(GeometryPredictor, self).__init__()
        
        self.cnn = CNNModule()

        # Fully connected layers
        self.fc1 = nn.LazyLinear(4096)  # Adjust size if needed.
        self.fc2 = nn.LazyLinear(4096)
        
        if args.use_case == "Classification":
            self.fc3 = nn.LazyLinear(num_labels)  # Number of outputs equal to num_labels.
        if args.use_case == "Regression":
            self.fc3 = nn.LazyLinear(coords_width)  # 3*N salidas, una por coordenada (x, y, z) de cada uno de los N vertices

    def forward(self, x):

        # Process the input through the CNN to extract features.
        features = self.cnn(x)

        # Pass the features through fully connected layers.
        x = F.relu(self.fc1(features))
        x = F.relu(self.fc2(x))
        output = self.fc3(x)

        return output

In [28]:
# Definir las transformaciones (por ejemplo, redimensionar las imágenes y normalizarlas)
transform_I = transforms.Compose([
    transforms.Resize((128, 128)),                                      # Cambiar según el tamaño de las imágenes
    transforms.ToTensor(),                                              # Convertir las imágenes a tensores
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])     # Normalización
])

# Crear el dataset personalizado
dataset = CustomDataset(dir = "/home/newfasant2/N101/N101-IA/Datasets/Reorganized/Classification_500_0_64_f_64_d", transform_ISAR=transform_I)

if args.use_case == "Classification":
    # Inicializar modelo
    model = GeometryPredictor(num_labels=len(np.unique(dataset.labels_vector))).to(device)

# Dividir el dataset en entrenamiento y validación
train_size = int(0.7 * len(dataset))  # 70% para entrenamiento
val_size = int(0.15 * len(dataset))  # 15% para validación
test_size = len(dataset) - train_size - val_size  # 15% para test

generator = torch.Generator().manual_seed(101)

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size], generator=generator)

# Crear DataLoaders para iterar sobre los datasets
train_batch_size = 32
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=train_batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=len(val_dataset), shuffle=False)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)


In [29]:
# Cargar un modelo
model.load_state_dict(torch.load("/home/newfasant2/N101/N101-IA/CNN/Models/Classification_ISAR_500samples_250ep_32bs.pth"))
model.eval()

  model.load_state_dict(torch.load("/home/newfasant2/N101/N101-IA/CNN/Models/Classification_ISAR_500samples_250ep_32bs.pth"))


GeometryPredictor(
  (cnn): CNNModule(
    (conv1): LazyConv2d(0, 96, kernel_size=(22, 22), stride=(2, 2), padding=(1, 1))
    (pool1): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (conv2): LazyConv2d(0, 256, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (pool2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (conv3): LazyConv2d(0, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (conv4): LazyConv2d(0, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (conv5): LazyConv2d(0, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (pool3): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (ReLU): ReLU()
    (Flatten): Flatten(start_dim=1, end_dim=-1)
  )
  (fc1): LazyLinear(in_features=0, out_features=4096, bias=True)
  (fc2): LazyLinear(in_features=0, out_features=4096, bias=True)
  (fc3): LazyLinear(in_features=0, out_features=2, bias=True)
)

In [90]:
# %matplotlib widget
# Importante incluir esta linea al comienzo de la celda donde se quiere visualizar el resultado

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=len(train_dataset), shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=len(val_dataset), shuffle=False)

random_ch = np.random.randint(0, len(train_dataset))
# random_ch = 111
print("choice:", random_ch)

criterion_CEL = nn.CrossEntropyLoss()

with torch.no_grad():
    # for npy, target_label in test_loader:
            
    #     output_logits = model(npy)
    #     pred_probs = F.softmax(output_logits, dim=1)
    #     loss = criterion_CEL(output_logits, target_label.long())
        
    #     test_loss = loss.item()
    # for npy, target_label in val_loader:
            
    #     output_logits = model(npy)
    #     pred_probs = F.softmax(output_logits, dim=1)
    #     loss = criterion_CEL(output_logits, target_label.long())
        
    #     test_loss = loss.item()
    # print("test_loss", test_loss)
    # train_results = []
    # for data, target_label in train_loader:
    #     output_logits = model(data)
    #     pred_probs = F.softmax(output_logits, dim=1)
    print("target: ", target_label[random_ch])
    print("output_logits: ", output_logits[random_ch])
    print("predicted_probs: ", pred_probs[random_ch])
    print("Loss_torch: ", criterion_CEL(output_logits[random_ch], target_label[random_ch].long()))
    def cel_own(logits, label):
        loss_own = -np.log((np.exp(logits[label]))/(sum(np.exp(logits))))
        return loss_own
    print("Loss own: ", cel_own(output_logits[random_ch].cpu(), target_label[random_ch].long().cpu()))
    # for i in range(len(target_label)):
    #     train_results.append((int(target_label[i].item()), torch.argmax(pred_probs[i]).item()))
    # train_success = train_results[train_results[0]!=train_results[1]]
    # train_perc = (1- train_success/len(train_results))*100



choice: 36
target:  tensor(1., device='cuda:0')
output_logits:  tensor([ 1.6384, -1.6513], device='cuda:0')
predicted_probs:  tensor([0.9641, 0.0359], device='cuda:0')
Loss_torch:  tensor(3.3263, device='cuda:0')
Loss own:  tensor(3.3263)


  loss_own = -np.log((np.exp(logits[label]))/(sum(np.exp(logits))))


In [49]:
with torch.no_grad():
    train_results = []
    for data, target_label in train_loader:
        output_logits = model(data)
        pred_probs = F.softmax(output_logits, dim=1)
        # print("target: ", target_label)
        # print("predicted: ", pred_probs)
    for i in range(len(target_label)):
        train_results.append((int(target_label[i].item()), torch.argmax(pred_probs[i]).item()))
    train_success = train_results[train_results[0]!=train_results[1]]
    train_perc = (1 - len(train_success)/len(train_results))*100

    test_results = []
    for data, target_label in test_loader:
        output_logits = model(data)
        pred_probs = F.softmax(output_logits, dim=1)
        # print("target: ", target_label)
        # print("predicted: ", pred_probs)
    for i in range(len(target_label)):
        test_results.append((int(target_label[i].item()), torch.argmax(pred_probs[i]).item()))
    test_success = test_results[test_results[0]!=test_results[1]]
    test_perc = (1 - len(test_success)/len(test_results))*100

    val_results = []
    for data, target_label in test_loader:
        output_logits = model(data)
        pred_probs = F.softmax(output_logits, dim=1)
        # print("target: ", target_label)
        # print("predicted: ", pred_probs)
    for i in range(len(target_label)):
        val_results.append((int(target_label[i].item()), torch.argmax(pred_probs[i]).item()))
    val_success = val_results[val_results[0]!=val_results[1]]
    val_perc = (1 - len(val_success)/len(val_results))*100

In [50]:
print("Train percentage of success: ", train_perc, "%")
print("Test percentage of success: ", test_perc, "%")
print("Validation percentage of success: ", val_perc, "%")

Train percentage of success:  99.42857142857143 %
Test percentage of success:  97.33333333333334 %
Validation percentage of success:  97.33333333333334 %


In [45]:
target_label.shape

results = []

for i in range(len(target_label)):
    results.append((int(target_label[i].item()), torch.argmax(pred_probs[i]).item()))

for pair in results:
    if pair[0] != pair[1]:
        print(pair)

test_success = len(results[results[0]!=results[1]])
print("test_success:", 1- test_success/len(results))




(0, 1)
(0, 1)
(1, 0)
(1, 0)
(1, 0)
(1, 0)
(1, 0)
(1, 0)
(1, 0)
(1, 0)
(1, 0)
(1, 0)
(1, 0)
(0, 1)
(1, 0)
test_success: 0.9942857142857143


In [13]:
import numpy as np
import matplotlib.pyplot as plt
a = np.load("/home/newfasant2/N101/N101-IA/Datasets/Raw/Samples_64_f_64_d/Avenger-716_UAV_50000_fixed_meters/sample_116_85.857452_P_isar.npy")
print(a.shape)

# plt.imshow(a, cmap="gray")
# plt.xticks([])
# plt.yticks([])

plt.imsave("/home/newfasant2/N101/N101-IA/Datasets/avenger.png", arr=a, cmap="gray", format="png")

(64, 64)
