In [None]:
import copy
import itertools
from math import ceil

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import tqdm
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler

In [None]:
# Load the dataset from the working directory; fall back to the shared
# /datasets location only when the local file does not exist. Any other
# failure (malformed CSV, interrupt, ...) is allowed to surface.
try:
    data = pd.read_csv("6_class.csv")
except FileNotFoundError:
    data = pd.read_csv("/datasets/6_class.csv")

In [None]:
# Preview the first ten rows of the raw dataset.
data.head(10)

In [None]:
# Column dtypes and non-null counts.
data.info()

In [None]:
# Summary statistics of the numeric columns.
data.describe()

In [None]:
# Distribution of the "Temperature (K)" column (the regression target).
fig, ax = plt.subplots()
ax.hist(data["Temperature (K)"])
ax.set_title("Temperature")

In [None]:
# Distribution of the "Luminosity(L/Lo)" column.
fig, ax = plt.subplots()
ax.hist(data["Luminosity(L/Lo)"])
ax.set_title("Light")

In [None]:
# Distribution of the "Radius(R/Ro)" column.
fig, ax = plt.subplots()
ax.hist(data["Radius(R/Ro)"])
ax.set_title("Radius")

In [None]:
# Distribution of the "Absolute magnitude(Mv)" column.
fig, ax = plt.subplots()
ax.hist(data["Absolute magnitude(Mv)"])
ax.set_title("Magnitude")

In [None]:
# Column groups consumed by the one-hot encoding and scaling cells.
# NOTE(review): "Star type" is scaled as a numeric feature; confirm that
# treating it as ordinal rather than categorical is intended.
category_columns = ["Star color"]
numeric_columns = [
    "Luminosity(L/Lo)",
    "Radius(R/Ro)",
    "Absolute magnitude(Mv)",
    "Star type",
]

In [None]:
# Normalise the colour labels: trim surrounding whitespace and lower-case them
# so spelling variants collapse into a single category. The vectorised `.str`
# accessor leaves missing values untouched, whereas a per-element lambda
# raises on them.
data[category_columns[0]] = data[category_columns[0]].str.strip().str.lower()

In [None]:
# List the distinct colour labels that remain after normalisation.
data[category_columns[0]].unique()

In [None]:
# Alias -> canonical spelling for colour labels that denote the same colour.
color = {
    'blue white': 'blue-white',
    'white-yellow': 'yellow-white',
}
# Labels without an alias entry are kept as they are.
data[category_columns[0]] = data[category_columns[0]].map(lambda label: color.get(label, label))

In [None]:
# "Temperature (K)" is the regression target; every other column is a feature.
target = data["Temperature (K)"]
features = data.drop(columns=["Temperature (K)"])

In [None]:
# Shuffled hold-out split: 33% of the rows go to the test set; the fixed
# random_state keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    features,
    target,
    test_size=0.33,
    shuffle=True,
    random_state=12345,
)

In [None]:
# One-hot encode the categorical columns. The encoder is fitted on the training
# split only; with handle_unknown='ignore', categories that appear solely in the
# test split do not raise at transform time.
try:
    # Newer scikit-learn releases name the dense-output switch `sparse_output`.
    ohe = OneHotEncoder(handle_unknown='ignore', drop="first", sparse_output=False)
except TypeError:
    # Older releases only accept the original `sparse` keyword.
    ohe = OneHotEncoder(handle_unknown='ignore', drop="first", sparse=False)
ohe.fit(X_train[category_columns])


def _one_hot_frame(frame):
    """Return the one-hot columns for `frame`, aligned to the index of `frame`."""
    return pd.DataFrame(
        ohe.transform(frame[category_columns]),
        columns=ohe.get_feature_names_out(),
        index=frame.index,
    )


# Replace the raw categorical columns with their encoded counterparts.
X_train_ohe = _one_hot_frame(X_train)
X_train = X_train.drop(category_columns, axis=1).join(X_train_ohe)

X_test_ohe = _one_hot_frame(X_test)
X_test = X_test.drop(category_columns, axis=1).join(X_test_ohe)

In [None]:
# Remove the "Unnamed: 0" column from both splits
# (presumably a row index saved into the CSV - TODO confirm).
X_train = X_train.drop(columns=["Unnamed: 0"])
X_test = X_test.drop(columns=["Unnamed: 0"])

In [None]:
# Inspect the training features after encoding and column removal.
X_train

In [None]:
# Standardise the numeric columns; the statistics come from the training split
# only and are then applied to both splits.
scaler = StandardScaler().fit(X_train[numeric_columns])
X_train[numeric_columns] = scaler.transform(X_train[numeric_columns])
X_test[numeric_columns] = scaler.transform(X_test[numeric_columns])

In [None]:
# Pick the compute device and seed every random number generator the notebook
# uses. NumPy is seeded as well because the training loops shuffle the
# training rows with np.random.permutation.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
seed = 12345
torch.manual_seed(seed)
np.random.seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

In [None]:
# Move the prepared splits onto the chosen device as tensors. The targets are
# cast to float64 explicitly. Note that the DataFrame/Series names are rebound
# to tensors here.
X_train = torch.tensor(X_train.to_numpy(), device=device)
X_test = torch.tensor(X_test.to_numpy(), device=device)
y_train = torch.tensor(y_train.to_numpy(), dtype=torch.float64, device=device)
y_test = torch.tensor(y_test.to_numpy(), dtype=torch.float64, device=device)

In [None]:
# Network width at both ends: one input per feature column, one regression output.
input_neruons = X_train.size(1)
output_neruons = 1

In [None]:
class Baseline(nn.Module):
    """Three-layer fully connected regressor with LeakyReLU(0.2) activations.

    Layout: Linear -> LeakyReLU -> Linear -> LeakyReLU -> Linear.
    """

    def __init__(self, input_size, hidden_size1, hidden_size2, output_size):
        """Create the layers.

        Args:
            input_size: number of input features.
            hidden_size1: width of the first hidden layer.
            hidden_size2: width of the second hidden layer.
            output_size: number of outputs.
        """
        super().__init__()
        negative_slope = 0.2
        self.fc1 = nn.Linear(input_size, hidden_size1)
        self.relu1 = nn.LeakyReLU(negative_slope)
        self.fc2 = nn.Linear(hidden_size1, hidden_size2)
        self.relu2 = nn.LeakyReLU(negative_slope)
        self.fc3 = nn.Linear(hidden_size2, output_size)

    def forward(self, x):
        """Run `x` through the network and return the raw (unactivated) output."""
        hidden = self.relu1(self.fc1(x))
        hidden = self.relu2(self.fc2(hidden))
        return self.fc3(hidden)

In [None]:
# Baseline network: both hidden layers are half as wide as the input.
base_model = Baseline(
    input_neruons,
    input_neruons // 2,
    input_neruons // 2,
    output_neruons,
).to(device=device, dtype=torch.float64)

In [None]:
# Mean-squared-error objective, optimised with Adam at a learning rate of 1e-3.
loss = nn.MSELoss().to(device=device)
optimizer = torch.optim.Adam(base_model.parameters(), lr=1e-3)

In [None]:
# Mini-batch training of the baseline model. The test-set RMSE is recorded
# every 10th epoch and after the final epoch.
batch_size = 10
num_epochs = 1000
num_batches = ceil(len(X_train) / batch_size)

# Despite its name, `accs` holds RMSE values (lower is better), not accuracies.
accs = []
epoch_ = []

for epoch in tqdm.tqdm(range(num_epochs)):
    # Evaluation below switches to eval mode; switch back before training.
    base_model.train()
    # Visit the training rows in a fresh random order each epoch.
    order = np.random.permutation(len(X_train))
    for batch_idx in range(num_batches):
        start_index = batch_idx * batch_size
        batch_indexes = order[start_index:start_index + batch_size]
        X_batch = X_train[batch_indexes]
        y_batch = y_train[batch_indexes]

        optimizer.zero_grad()
        preds = base_model(X_batch).flatten()
        loss_value = loss(preds, y_batch)
        loss_value.backward()
        optimizer.step()

    if epoch % 10 == 0 or epoch == num_epochs - 1:
        base_model.eval()
        # No gradients are needed for scoring; skip building the autograd graph.
        with torch.no_grad():
            test_preds = base_model(X_test)
        # RMSE as sqrt(MSE): independent of the `squared` keyword, which newer
        # scikit-learn releases no longer accept.
        accuracy = np.sqrt(mean_squared_error(y_test.cpu().numpy(), test_preds.cpu().numpy()))
        accs.append(accuracy)
        epoch_.append(epoch)

In [None]:
# Test RMSE of the baseline model against the epoch at which it was measured.
fig, ax = plt.subplots()
ax.plot(epoch_, accs)
ax.set_title("RMSELoss")

In [None]:
# Baseline predictions for the test set as a flat NumPy array
# (column 0 of the model's two-dimensional output).
preds = base_model(X_test).detach().cpu().numpy()[:, 0]

In [None]:
# Compare true and predicted temperatures for the first half of the test rows.
categories = list(range(len(preds) // 2))
true_values = y_test.cpu().numpy()[:len(categories)]
predictions = preds[:len(categories)]

x = np.arange(len(categories))

plt.figure(figsize=(20, 20))
# The narrower, semi-transparent prediction bars are drawn over the true values.
plt.bar(x, true_values, width=0.4, label='True Values', color='blue')
plt.bar(x, predictions, width=0.35, label='Predictions', color='red', alpha=0.8)
plt.legend()
plt.show()

In [None]:
# Final baseline RMSE on the test set, computed as sqrt(MSE) so the cell does
# not depend on the `squared` keyword that newer scikit-learn releases dropped.
print(f"RMSE = {np.sqrt(mean_squared_error(y_test.to('cpu').numpy(), preds)):0.2f}")

In [None]:
def custom_nn(params: dict, X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test):
    """Build, train and score a fully connected regression network.

    The hidden widths are `middle_layers` repeated `n_combinated_layers` times.
    A dropout layer is inserted before every `drop_out_every_layer`-th linear
    layer (never before the first one). The network is trained with Adam
    (lr=1e-3) on an MSE loss and scored on the test tensors every 10th epoch
    and after the last epoch.

    NOTE(review): the best epoch is selected on the same test tensors that are
    used for reporting, so the returned RMSE is an optimistic estimate.

    Args:
        params: hyper-parameters. Recognised keys (default in parentheses):
            "middle_layers" (required): iterable of hidden-layer widths.
            "n_combinated_layers" (5): how many times `middle_layers` is repeated.
            "input_neurons" (X_train.shape[1]), "output_neurons" (1).
            "function_activation" (nn.LeakyReLU(0.2)): module placed after
                every hidden linear layer.
            "drop_out_every_layer" (3), "drop_out_part" (0.25).
            "num_epochs" (1000), "batch_size" (10).
            "device" (the device of X_train).
            "rmse_" (10**100): RMSE a checkpoint must beat to be kept.
        X_train, X_test, y_train, y_test: tensors to train and score on; the
            defaults are the notebook-level tensors at definition time.

    Returns:
        Tuple `(net, optimizer, best_rmse)`. `net` and `optimizer` carry the
        state of the best-scoring evaluation and `net` is in eval mode. If no
        evaluation beat `rmse_`, the tuple is `(None, None, rmse_)`.

    Raises:
        ValueError: if `params` has no "middle_layers" entry.
    """
    input_neurons = params.get("input_neurons", X_train.shape[1])
    output_neurons = params.get("output_neurons", 1)
    n_combinated_layers = params.get("n_combinated_layers", 5)
    function_activation = params.get("function_activation", nn.LeakyReLU(0.2))
    drop_out_every_layer = params.get("drop_out_every_layer", 3)
    drop_out_part = params.get("drop_out_part", 0.25)
    num_epochs = params.get("num_epochs", 1000)
    batch_size = params.get("batch_size", 10)
    # Default to wherever the data lives, so CPU-only machines work as well.
    device = params.get("device", X_train.device)
    rmse_ = params.get("rmse_", 10**100)
    middle_layers = params.get("middle_layers")
    if middle_layers is None:
        raise ValueError('params must contain "middle_layers" (an iterable of hidden-layer widths)')

    # Layer widths: input, the repeated hidden pattern, output.
    n_neruons = [input_neurons, *(list(middle_layers) * n_combinated_layers), output_neurons]

    net_layers = []
    for i in range(1, len(n_neruons) - 1):
        # Dropout precedes every drop_out_every_layer-th linear layer, except the first.
        if i != 1 and i % drop_out_every_layer == 0:
            net_layers.append(nn.Dropout(drop_out_part))
        net_layers.append(nn.Linear(n_neruons[i - 1], n_neruons[i]))
        net_layers.append(function_activation)
    # Output layer without an activation.
    net_layers.append(nn.Linear(n_neruons[-2], n_neruons[-1]))

    net = nn.Sequential(*net_layers).to(dtype=torch.float64, device=device)

    optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)
    loss = nn.MSELoss().to(device=device)

    num_batches = ceil(len(X_train) / batch_size)

    best_rmse = rmse_
    best_net_state = None
    best_optimizer_state = None

    for epoch in tqdm.tqdm(range(num_epochs)):
        # Re-enable dropout: the evaluation step below leaves the net in eval mode.
        net.train()
        # Fresh shuffle per epoch; do not rely on a notebook-level `order`.
        order = np.random.permutation(len(X_train))
        for batch_idx in range(num_batches):
            start_index = batch_idx * batch_size
            batch_indexes = order[start_index:start_index + batch_size]
            X_batch = X_train[batch_indexes]
            y_batch = y_train[batch_indexes]

            optimizer.zero_grad()
            preds = net(X_batch).flatten()
            loss_value = loss(preds, y_batch)
            loss_value.backward()
            optimizer.step()

        # Score once per selected epoch (after all batches), not once per batch.
        if epoch % 10 == 0 or epoch == num_epochs - 1:
            net.eval()
            with torch.no_grad():
                test_preds = net(X_test).flatten()
            # sqrt(MSE) avoids the `squared` keyword dropped by newer scikit-learn.
            rmse = np.sqrt(mean_squared_error(y_test.cpu().numpy(), test_preds.cpu().numpy()))

            if rmse < best_rmse:
                best_rmse = rmse
                # Snapshot the state: keeping a reference to `net` would silently
                # track later (possibly worse) weights.
                best_net_state = copy.deepcopy(net.state_dict())
                best_optimizer_state = copy.deepcopy(optimizer.state_dict())

    if best_net_state is None:
        return None, None, best_rmse

    # Rewind the network and optimizer to the best-scoring checkpoint.
    net.load_state_dict(best_net_state)
    optimizer.load_state_dict(best_optimizer_state)
    net.eval()
    return net, optimizer, best_rmse

In [None]:
def cross_validate(params):
    """Yield every combination of a hyper-parameter grid, one dict at a time.

    Despite its name, this generator performs no cross-validation; it only
    enumerates the Cartesian product of the candidate values.

    Args:
        params: mapping from parameter name to an iterable of candidate values.

    Yields:
        dict mapping each parameter name to one chosen value. Combinations
        follow itertools.product order (the last parameter varies fastest).
    """
    names = list(params)
    candidate_lists = (params[name] for name in names)
    for chosen in itertools.product(*candidate_lists):
        yield dict(zip(names, chosen))

In [None]:
# Hyper-parameter grid: every combination of these candidate values is tried.
params = {
    # Hidden-width patterns, expressed relative to the number of input features.
    "middle_layers": [
        [input_neruons // 2, input_neruons // 2, input_neruons],
        [input_neruons // 3, input_neruons // 3, input_neruons // 2],
    ],
    # How many times a pattern is repeated to form the hidden stack.
    "n_combinated_layers": [1, 5, 10],
    "num_epochs": [1000, 2000, 3000],
    "batch_size": [20, 30, 40],
}

In [None]:
# Number of grid combinations: the product of the candidate-list lengths.
iterations = 1

for i in params.keys():
    iterations = iterations * len(params[i])

In [None]:
# Rebind `params` from the grid dict to a generator of concrete combinations.
# NOTE(review): after this cell `params` is no longer a dict, so the cells that
# read the grid (and this cell itself) cannot be re-run without first
# re-running the cell that defines the grid.
params = cross_validate(params)

In [None]:
# Trackers for the grid search; the huge initial RMSE lets the first result win.
best_rmse = 10 ** 100
best_params = best_model = best_optimizer = None

In [None]:
# Train one network per grid combination and keep the best-scoring one.
# The generator is iterated directly instead of calling next() a separately
# computed number of times, so an exhausted generator (e.g. when this cell is
# re-run) ends the loop instead of raising StopIteration.
for param in params:
    net, optimizer, rmse = custom_nn(param)

    if rmse < best_rmse:
        best_rmse = rmse
        best_params = param
        best_model = net
        best_optimizer = optimizer
        print(best_rmse)

In [None]:
# Show the hyper-parameter combination with the lowest recorded RMSE.
best_params

In [None]:
# Layer widths of the final network, starting with the input layer. The input
# width is taken from the data (number of feature columns) rather than
# hard-coded, so the first linear layer always matches X_train.
n_neruons = [input_neruons]

In [None]:
# Ten repetitions of the 7-7-14 hidden pattern, followed by the output width.
n_neruons.extend([7, 7, 14] * 10)
n_neruons.append(output_neruons)

In [None]:
# Modules of the final network, collected in forward order.
net_layers = []

In [None]:
# Hidden stack: Linear + LeakyReLU(0.2) per layer, with Dropout(0.25) placed
# before every third linear layer (never before the first one).
for i in range(1, len(n_neruons) - 1):
    if i != 1 and i % 3 == 0:
        net_layers.append(nn.Dropout(0.25))
    net_layers.extend([nn.Linear(n_neruons[i - 1], n_neruons[i]), nn.LeakyReLU(0.2)])

# Output layer, no activation.
net_layers.append(nn.Linear(n_neruons[-2], n_neruons[-1]))

In [None]:
# Assemble the final network and move it to the target device in float64.
net = nn.Sequential(*net_layers)
net = net.to(device=device, dtype=torch.float64)

In [None]:
# Mean-squared-error objective, optimised with Adam at a learning rate of 1e-3.
loss = nn.MSELoss().to(device=device)
optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)

In [None]:
# Mini-batch training of the final network. The test-set RMSE is recorded
# every 10th epoch and after the final epoch, and the best-scoring weights are
# kept as a snapshot.
batch_size = 40
num_epochs = 3000
num_batches = ceil(len(X_train) / batch_size)

# Despite its name, `accs` holds RMSE values (lower is better), not accuracies.
accs = []
epoch_ = []

best_rmse = 10**100
best_optimizer = None
best_net = None

for epoch in tqdm.tqdm(range(num_epochs)):
    # Re-enable dropout: the evaluation step below leaves the net in eval mode.
    net.train()
    # Fresh shuffle per epoch instead of reusing an `order` left by an earlier cell.
    order = np.random.permutation(len(X_train))
    for batch_idx in range(num_batches):
        start_index = batch_idx * batch_size
        batch_indexes = order[start_index:start_index + batch_size]
        X_batch = X_train[batch_indexes]
        y_batch = y_train[batch_indexes]

        optimizer.zero_grad()
        preds = net(X_batch).flatten()
        loss_value = loss(preds, y_batch)
        loss_value.backward()
        optimizer.step()

    if epoch % 10 == 0 or epoch == num_epochs - 1:
        net.eval()
        # No gradients are needed for scoring; skip building the autograd graph.
        with torch.no_grad():
            test_preds = net(X_test)
        # sqrt(MSE) avoids the `squared` keyword dropped by newer scikit-learn.
        rmse = np.sqrt(mean_squared_error(y_test.cpu().numpy(), test_preds.cpu().numpy()))
        accs.append(rmse)
        epoch_.append(epoch)

        if rmse < best_rmse:
            best_rmse = rmse
            # Independent copy of the weights: a plain reference to `net` would
            # keep changing as training continues.
            best_net = copy.deepcopy(net)
            # Still a reference to the live optimizer; only the model is snapshotted.
            best_optimizer = optimizer

In [None]:
# Test RMSE of the final network against the epoch at which it was measured.
fig, ax = plt.subplots()
ax.plot(epoch_, accs)
ax.set_title("RMSELoss")

In [None]:
# Final-network predictions for the test set as a flat NumPy array
# (column 0 of the model's two-dimensional output).
preds = net(X_test).detach().cpu().numpy()[:, 0]

In [None]:
# Compare true and predicted temperatures for the first half of the test rows.
categories = list(range(len(preds) // 2))
true_values = y_test.cpu().numpy()[:len(categories)]
predictions = preds[:len(categories)]

x = np.arange(len(categories))

plt.figure(figsize=(20, 20))
# The narrower, semi-transparent prediction bars are drawn over the true values.
plt.bar(x, true_values, width=0.4, label='True Values', color='blue')
plt.bar(x, predictions, width=0.35, label='Predictions', color='red', alpha=0.8)
plt.legend()
plt.show()

In [None]:
# Final-network RMSE on the test set, computed as sqrt(MSE) so the cell does
# not depend on the `squared` keyword that newer scikit-learn releases dropped.
print(f"RMSE = {np.sqrt(mean_squared_error(y_test.to('cpu').numpy(), preds)):0.2f}")