In [1]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import numpy as np
from scipy.stats import ks_2samp
from sklearn.metrics import mean_absolute_error, r2_score, median_absolute_error, mean_absolute_percentage_error

In [2]:
# Neural network architecture
class HaloToGalaxyModel(nn.Module):
    """Fully connected network: two ReLU hidden layers and a linear output layer.

    Args:
        input_size: Number of features per input sample.
        output_size: Number of values produced per sample.
        hidden_dim: Width of each of the two hidden layers.
    """

    def __init__(self, input_size=4, output_size=1, hidden_dim=64):
        super().__init__()
        # The attribute names fc1/fc2/fc3 are the state_dict keys, so they
        # have to stay stable for previously saved checkpoints to load.
        self.fc1 = nn.Linear(input_size, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_size)

    def forward(self, x):
        """Return the output of the last linear layer (no final activation)."""
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        return self.fc3(hidden)

# Load features and targets from a CSV file
def load_data_from_csv(file_path):
    """Read ``file_path`` with pandas and split it into features and two targets.

    Columns are selected by position: columns 5 through 9 form the feature
    matrix, column 13 is the first target and column 15 the second.

    Returns:
        Tuple ``(X, y1, y2)`` of NumPy arrays.
    """
    frame = pd.read_csv(file_path)
    features = frame.iloc[:, 5:10].values
    # Target columns noted in earlier experiments: 12 = mass, 13 = color,
    # 14 = radius, 15 = sSFR (12:16 selects all four at once).
    color = frame.iloc[:, 13].values  # color
    ssfr = frame.iloc[:, 15].values  # sSFR
    return features, color, ssfr


class customLossYan(nn.Module):
    """Pinball (quantile) loss, summed over output columns and averaged over samples.

    Column ``i`` of both tensors is scored against ``quantiles[i]``.
    """

    def __init__(self, quantiles):
        super().__init__()
        self.quantiles = quantiles

    def forward(self, y_true, y_pred):
        """Return the scalar loss. Note the argument order: ``(y_true, y_pred)``."""
        per_quantile = []
        for col, tau in enumerate(self.quantiles):
            residual = y_true[:, col] - y_pred[:, col]
            # Pinball loss: tau * r where r >= 0, (tau - 1) * r otherwise.
            per_quantile.append(torch.max((tau - 1) * residual, tau * residual))
        # Stack to (n_quantiles, batch), add the columns up per sample, then average.
        per_sample = torch.stack(per_quantile).sum(dim=0)
        return torch.mean(per_sample)


def quantile_loss(y_true, y_pred, quantiles):
    """NumPy pinball loss: summed over columns, averaged over samples.

    Column ``i`` of ``y_true`` and ``y_pred`` is scored against ``quantiles[i]``.
    """
    per_quantile = []
    for col, tau in enumerate(quantiles):
        residual = y_true[:, col] - y_pred[:, col]
        # tau * r where r >= 0, (tau - 1) * r otherwise.
        per_quantile.append(np.maximum((tau - 1) * residual, tau * residual))
    per_sample = np.stack(per_quantile).sum(axis=0)
    return np.mean(per_sample)

def coverage_probability(y_true, y_pred, quantiles):
    """Mean empirical coverage of the predicted quantiles.

    For every column ``i`` (one per entry of ``quantiles``) this computes the
    fraction of samples whose true value lies at or below the predicted value
    in that column, then averages those fractions over all columns.

    For a well-calibrated quantile model the per-column fraction should be
    close to ``quantiles[i]``, so the returned average should be close to
    ``mean(quantiles)``.

    Args:
        y_true: Array of shape (n_samples, n_columns) with observed values.
        y_pred: Array of the same shape with predicted quantile values.
        quantiles: Sequence with one entry per column; only its length is used.

    Returns:
        The average coverage as a NumPy float in [0, 1].
    """
    # A two-sided test `y_true <= y_pred <= y_true` is only true on exact
    # equality, which makes the metric ~0 for continuous data; the one-sided
    # comparison is the coverage of a predicted quantile.
    coverage = [
        np.mean(y_true[:, col] <= y_pred[:, col])
        for col in range(len(quantiles))
    ]
    return np.mean(coverage)

In [3]:
# Load the feature matrix and the two targets (color, sSFR) from the dataset CSV
file_path = 'datasetcompleto.csv'  
X, y1,y2 = load_data_from_csv(file_path)

In [4]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [5]:
# Features: float32 tensor on the selected device
X = torch.tensor(X, dtype=torch.float32).to(device)

# Each target becomes a column vector that is then copied into three identical
# columns, so every predicted column has a matching target column.
y1 = torch.tensor(y1, dtype=torch.float32).unsqueeze(1).repeat(1, 3)
y2 = torch.tensor(y2, dtype=torch.float32).unsqueeze(1).repeat(1, 3)

# Concatenate both targets along the second dimension (3 + 3 = 6 columns)
y = torch.cat([y1, y2], dim=1)

In [6]:
# Inspect the combined target tensor (three color columns, then three sSFR columns)
print(y)

tensor([[  1.1002,   1.1002,   1.1002, -13.6158, -13.6158, -13.6158],
        [  1.0834,   1.0834,   1.0834, -12.5470, -12.5470, -12.5470],
        [  0.9506,   0.9506,   0.9506, -11.6729, -11.6729, -11.6729],
        ...,
        [  1.0424,   1.0424,   1.0424, -13.4485, -13.4485, -13.4485],
        [  1.0193,   1.0193,   1.0193, -13.0122, -13.0122, -13.0122],
        [  0.9831,   0.9831,   0.9831, -13.6909, -13.6909, -13.6909]])


In [7]:
# Hold out 15% of the samples for testing, then 15% of the remainder for validation
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.15, random_state=69)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.15, random_state=69)

train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
val_dataset = torch.utils.data.TensorDataset(X_val, y_val)
test_dataset = torch.utils.data.TensorDataset(X_test, y_test)

# Only the training data is shuffled; validation and test keep a fixed order
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=16, shuffle=False)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=16, shuffle=False)


hidden_dim = 100  # width of the hidden layers
num_epochs = 1000
early_stop_patience = 20
best_val_loss = float('inf')
epochs_no_improve = 0

# One quantile per output column: the first three score the first target,
# the last three score the second target.
quantiles = [0.25, 0.5, 0.75, 0.25, 0.5, 0.75]
#quantiles = np.linspace(0, 1, 50)[1:-1]  # exclude 0 and 1


modelfile = 'colorysSFR.pth'
model = HaloToGalaxyModel(X.shape[1], len(quantiles), hidden_dim).to(device)
# Resume from an existing checkpoint when there is one. On a fresh run the file
# does not exist yet, so fall back to the randomly initialised weights instead
# of failing. map_location lets a checkpoint saved on GPU load on a CPU-only host.
try:
    model.load_state_dict(torch.load(modelfile, map_location=device))
except FileNotFoundError:
    print(f'No checkpoint found at {modelfile}; training from scratch.')

criterion = customLossYan(quantiles)
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)

In [8]:
# Inspect the held-out test targets produced by the split
print(y_test)

tensor([[  0.6808,   0.6808,   0.6808, -10.0063, -10.0063, -10.0063],
        [  0.5005,   0.5005,   0.5005,  -9.5698,  -9.5698,  -9.5698],
        [  0.6943,   0.6943,   0.6943,  -9.7549,  -9.7549,  -9.7549],
        ...,
        [  1.1777,   1.1777,   1.1777, -14.2123, -14.2123, -14.2123],
        [  0.8678,   0.8678,   0.8678,  -9.6318,  -9.6318,  -9.6318],
        [  1.1048,   1.1048,   1.1048, -14.1196, -14.1196, -14.1196]])


In [9]:
for epoch in range(num_epochs):
    # --- training pass ---
    model.train()
    train_loss = 0.0
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        # The criterion takes (y_true, y_pred), in that order.
        loss = criterion(targets, outputs)
        loss.backward()
        optimizer.step()
        # Weight the batch-mean loss by the batch size so the epoch value is a
        # per-sample mean even when the last batch is smaller.
        train_loss += loss.item() * inputs.size(0)

    train_loss /= len(train_loader.dataset)

    # --- validation pass (no gradients needed) ---
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(targets, outputs)
            val_loss += loss.item() * inputs.size(0)

    val_loss /= len(val_loader.dataset)

    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

    if val_loss < best_val_loss:
        # New best model: reset the patience counter and checkpoint the weights.
        best_val_loss = val_loss
        epochs_no_improve = 0
        torch.save(model.state_dict(), modelfile)
    else:
        epochs_no_improve += 1
        # ">=" rather than "==": if this cell is re-run after the counter has
        # already passed the patience value, early stopping must still trigger.
        if epochs_no_improve >= early_stop_patience:
            print('Early stopping!')
            break


Epoch [1/1000], Loss: 1.4706, Val Loss: 1.1937
Epoch [2/1000], Loss: 1.1933, Val Loss: 1.1748
Epoch [3/1000], Loss: 1.1764, Val Loss: 1.1567
Epoch [4/1000], Loss: 1.1569, Val Loss: 1.1398
Epoch [5/1000], Loss: 1.1383, Val Loss: 1.1188
Epoch [6/1000], Loss: 1.1211, Val Loss: 1.1040
Epoch [7/1000], Loss: 1.1060, Val Loss: 1.1189
Epoch [8/1000], Loss: 1.0926, Val Loss: 1.0795
Epoch [9/1000], Loss: 1.0802, Val Loss: 1.0707
Epoch [10/1000], Loss: 1.0704, Val Loss: 1.0551
Epoch [11/1000], Loss: 1.0609, Val Loss: 1.0497
Epoch [12/1000], Loss: 1.0527, Val Loss: 1.0340
Epoch [13/1000], Loss: 1.0462, Val Loss: 1.0406
Epoch [14/1000], Loss: 1.0393, Val Loss: 1.0271
Epoch [15/1000], Loss: 1.0321, Val Loss: 1.0264
Epoch [16/1000], Loss: 1.0263, Val Loss: 1.0214
Epoch [17/1000], Loss: 1.0188, Val Loss: 0.9990
Epoch [18/1000], Loss: 1.0125, Val Loss: 0.9998
Epoch [19/1000], Loss: 1.0047, Val Loss: 1.0043
Epoch [20/1000], Loss: 0.9992, Val Loss: 0.9836
Epoch [21/1000], Loss: 0.9917, Val Loss: 0.9735
E

KeyboardInterrupt: 

In [None]:
# Restore the best checkpoint written by the training loop.
# map_location lets a checkpoint saved on GPU load on a CPU-only host.
model.load_state_dict(torch.load(modelfile, map_location=device))

model.eval()
test_loss = 0.0
y_test_all = []
outputs_all = []

with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        loss = criterion(targets, outputs)
        # Batch-size weighting keeps the final value a per-sample mean.
        test_loss += loss.item() * inputs.size(0)
        y_test_all.append(targets.cpu().numpy())
        outputs_all.append(outputs.cpu().numpy())

test_loss /= len(test_loader.dataset)
y_test_all = np.concatenate(y_test_all)
outputs_all = np.concatenate(outputs_all)

# NOTE: the point metrics below are computed over every output column,
# including the 0.25 and 0.75 quantile columns, not only the medians.
mae = mean_absolute_error(y_test_all, outputs_all)
# The criterion is a quantile (pinball) loss, not an MSE, so the RMSE has to be
# computed from the squared errors rather than from sqrt(test_loss).
rmse = np.sqrt(np.mean((y_test_all - outputs_all) ** 2))
r2 = r2_score(y_test_all, outputs_all)
median_ae = median_absolute_error(y_test_all, outputs_all)
mape = mean_absolute_percentage_error(y_test_all, outputs_all)
q_loss = quantile_loss(y_test_all, outputs_all, quantiles)
coverage = coverage_probability(y_test_all, outputs_all, quantiles)
#ks_statistic, p_value = ks_test_metric(torch.tensor(y_test_all), torch.tensor(outputs_all))

print(f'Test Loss (quantile): {test_loss:.4f}')
print(f'MAE: {mae:.4f}')
print(f'RMSE: {rmse:.4f}')
print(f'R²: {r2:.4f}')
print(f'Median Absolute Error: {median_ae:.4f}')
print(f'MAPE: {mape:.4f}')
print(f'Quantile Loss: {q_loss:.4f}')
print(f'Coverage Probability: {coverage:.4f}')
#print(f'KS Statistic: {ks_statistic:.4f}, P-value: {p_value:.4f}')


In [None]:
# Inspect the feature vector of a single sample (row 9906 of X)
print(X[9906])

In [None]:
# Run the model on one sample (row 9906 of X). Inference does not need
# gradients, so disable autograd to avoid building a computation graph.
model.eval()
with torch.no_grad():
    experimento = model(X[9906])

In [None]:
# Display the model output for the selected sample (one value per entry of `quantiles`)
experimento