In [None]:
import os
import random
import numpy as np
import matplotlib.pyplot as plt
import PIL
import pandas as pd
from PIL import Image
import torch
from torch import nn,optim
from torch.nn import functional as F
from torchvision import transforms as T
from torch.utils.data import DataLoader, Dataset, random_split
from typing import Type, Union, List, Optional, Any
import copy
from sklearn.metrics import r2_score
from sklearn.metrics import mean_squared_error
from tqdm import tqdm  

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
val_path = 'DATABASE/Validación (RESNET 3)/Perfiles rotados/'
val_vft_path = 'DATABASE/Validación (RESNET 3)/vft values/vft_values.csv'

In [None]:
class CustomDataset(Dataset):
    def __init__(self, data, vft, transform=None):
        '''
        data : train data path
        vft : train vft path (csv file)
        '''
        self.train_data = data
        self.train_vft = pd.read_csv(vft)
        self.transform = transform
        
        self.profiles = self.train_vft.iloc[:, 0]
        self.velocities = self.train_vft.iloc[:, 1]

    def __len__(self):
        return len(self.train_vft)
    
    def __getitem__(self, index):
        profile = self.profiles[index]
        velocity = self.velocities[index]
        img = Image.open(os.path.join(self.train_data, profile))
        
        if self.transform is not None:
            img = self.transform(img)
        
        return (img, velocity, profile)

In [None]:
transform = T.ToTensor()

In [None]:
val_dataset = CustomDataset(val_path,val_vft_path,transform)
VAL_SIZE = len(val_dataset)
print(VAL_SIZE)

In [None]:
val_loader = DataLoader(val_dataset, batch_size=16, shuffle= True)

In [None]:
images, vft_values, names= next(iter(val_loader))
print(vft_values[1], names[1])

In [None]:
class Bottleneck(nn.Module):
    expansion = 4
    
    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        if self.downsample is not None:
            identity = self.downsample(x)
        out += identity
        out = self.relu(out)
        return out

# Definir la arquitectura de ResNet con bloque Bottleneck
class ResNet(nn.Module):
    def __init__(self, block: Type[Bottleneck], layers: List[int], num_classes=1):
        super(ResNet, self).__init__()
        self.inplanes = 32
        self.conv1 = nn.Conv2d(1, self.inplanes, kernel_size=5, stride=2, padding=2, bias=False) 
        self.bn1 = nn.BatchNorm2d(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 32, layers[0])
        self.layer2 = self._make_layer(block, 64, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 128, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 256, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256 * block.expansion, num_classes)

    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * block.expansion),
            )
        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))
            #layers.append(nn.Dropout(p=0.3))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

## Cargar modelo

In [None]:
model_path = 'Redes/resnet7.pth'
resnet4= ResNet(block=Bottleneck, layers=[2, 2, 2, 2], num_classes=1)
resnet4.load_state_dict(torch.load(model_path))

In [None]:
def visualize_batch2(images, vft_values, values, names,image_path):
    batch_size = len(images)
    num_rows = 4
    num_cols = 4
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(10, 10))
    for i in range(num_rows):
        for j in range(num_cols):
            index = i * num_cols + j
            if index >= batch_size:
                break
            image = images[index,0]
            vft_value = vft_values[index].item()
            value = values[index] 
            name = names[index]
            axes[i][j].imshow(image, cmap='gray')
            axes[i][j].set_title(f'vsini: {vft_value} [km/s]', fontsize=6)  
            axes[i][j].axis('off')
            axes[i][j].text(0.45, 0.98, f'vsini real: {value} [km/s]', color='red', transform=axes[i][j].transAxes, ha='center',fontsize=6)
            axes[i][j].text(0.5, 1.15, f'{name}', color='black', transform=axes[i][j].transAxes, ha='center',fontsize=6)
    plt.tight_layout()
    plt.savefig(image_path)
    plt.show()

In [None]:
from sklearn.metrics import r2_score
from sklearn.metrics import mean_squared_error
from tqdm import tqdm  

images_val, vft_values_val, names = next(iter(val_loader))
images_val = images_val.to(device, dtype=torch.float32)
resnet4 = resnet4.to(device)
resnet4.eval()

with torch.no_grad():
    scores = resnet4(images_val)
images_val = images_val.cpu()
scores = scores.cpu()
true_values = [float(value.item()) for value in vft_values_val] 

pbar = tqdm(val_loader, total=len(val_loader))
for images_val, vft_values_val, _ in pbar:
    images_val = images_val.to(device, dtype=torch.float32)
    with torch.no_grad():
        batch_scores = resnet4(images_val)
    images_val = images_val.cpu()
    batch_scores = batch_scores.cpu()
    batch_values = [float(value.item()) for value in vft_values_val] 
    true_values.extend(batch_values)
    scores = torch.cat((scores, batch_scores))

r2 = r2_score(true_values, scores)
rmse = mean_squared_error(true_values, scores)
errors = [error.item() for tensor_error in [(true - score) for true, score in zip(true_values, scores)] for error in tensor_error]
pbar.close()

In [None]:
plt.figure(figsize=(12, 6))

plt.subplot(1, 2, 1)
plt.scatter(true_values, scores, color='blue')
plt.title(f'Comparación entre Valores Reales y Predichos, \n$R^2$ = {r2:.4f} , RMSE = {rmse:.4f}')
plt.xlabel('Valores Reales')
plt.ylabel('Valores Predichos')
m, b = np.polyfit(true_values, scores, 1)
plt.plot(true_values, m*true_values + b, color='red')

plt.subplot(1, 2, 2)
plt.scatter(true_values, errors, color='blue')
plt.title(f'Dispersión del Error en datos de Entrenamiento')
plt.xlabel('Velocidad [$km/s$]')
plt.ylabel('Error Absoluto')

plt.tight_layout()
image_path = os.path.join("Imagenes/Articulo/Error/", "r6_ent.png")
#plt.savefig(image_path)
plt.show()

In [None]:
mean_error = np.mean(errors)
std_deviation = np.std(errors)

plt.hist(errors, bins=20, color='blue', edgecolor='black')
plt.axvline(mean_error, color='red', linestyle='dashed', linewidth=1, label=f'Media = {mean_error:.2f}')
plt.axvline(mean_error + std_deviation, color='green', linestyle='dashed', linewidth=1, label=f'Desviación Estándar = {std_deviation:.2f}')
plt.axvline(mean_error - std_deviation, color='green', linestyle='dashed', linewidth=1)
plt.xlabel('Error Absoluto')
plt.ylabel('Frecuencia')
plt.title('Histograma de Errores de Predicción en datos de Entrenamiento')
plt.legend()
image_path = os.path.join("Imagenes/Articulo/Error/", "Histogram_entr6.png")
#plt.savefig(image_path)
plt.show()

1) grafico de dispersion del error, para ambos set de datos(calibracion y validacion),calcular error y luego histograma (20 bars),se debe saber valor medio del error(gaussiano), y desviacion estandar. Grafico error vs velocidad (si esta bien, centro en 0).
2) Para datos de entrenamiento, grafico con error(y) vs velocidad (x).
3) Pasar graficos a latex (.dat), 1 columna valor real, 2 columna valor predicho (1 ; 2) : Exportar FFT datos nuevos(solo imagen: eje x frec, eje y magnitud), tomar 1 modelo, y calcular 3 velocidades diferentes.
4) Probar red en modelos reales (adecuar espectro en mismo formato )

resnet4 = resnet4.to(device='cuda')
resnet4.eval()
ruta_carpeta = 'Modelos reales/Ajuste gaussiano/'
nombre_archivo = 'HD41117.jpg'
ruta_completa = os.path.join(ruta_carpeta, nombre_archivo)
imagen = Image.open(ruta_completa)

imagen = transform(imagen)
imagen = imagen.to(device)
resnet4 = resnet4.to(device)
with torch.no_grad():
    scores = resnet4(imagen.unsqueeze(0)) 
imagen = imagen.cpu()
scores = scores.cpu()
scores.item()