In [2]:
import json
import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
import xgboost as xgb

from optuna.samplers import TPESampler
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer, StandardScaler
from torch.utils.data import DataLoader, TensorDataset

In [None]:
# df = pd.read_pickle('data/perfumes.pkl')
df = pd.read_pickle('encoded_perfumes.pkl')
df.drop(columns=["collection", "top_notes", "middle_notes", "base_notes", "all_notes"], inplace=True)
df.drop(columns=["url", "Duft.number_of_ratings", "Haltbarkeit.number_of_ratings", "Sillage.number_of_ratings", "Flakon.number_of_ratings", "Preis-Leistungs-Verhältnis.number_of_ratings"], inplace=True)

drop_list = []
for i in range(0,11):
    drop_list.append(f'scent.{i}')
    drop_list.append(f'sillage.{i}')
    drop_list.append(f'durability.{i}')
    drop_list.append(f'pricing.{i}')
    drop_list.append(f'bottle.{i}')

df.drop(columns=drop_list, inplace=True)


encoded_dfs = [df.copy()]

for col in ['brand', 'perfumer', 'flakon_designer']:
    df[col] = df[col].apply(lambda x: x if pd.notna(x) else None)
    dummies = pd.get_dummies(df[col], prefix=col, prefix_sep='_', dummy_na=False).astype(int)
    encoded_dfs.append(dummies)

# Concatenate all dataframes horizontally
df_encoded = pd.concat(encoded_dfs, axis=1)
df = df_encoded


numeric_df = df.drop(columns=['name', 'brand', 'year', 'flakon_designer', 'perfumer'])
feature_df = numeric_df.drop(columns=['Duft.rating', 'Haltbarkeit.rating', 'Sillage.rating', 'Flakon.rating', 'Preis-Leistungs-Verhältnis.rating'])

feature_df.fillna(0, inplace=True)

feature_df.info
feature_df.isna().any().any()
df[df.isna().any(axis=1)]

In [None]:
X = feature_df # Numeric features
y = df['Duft.rating']  # Target column

# Drop rows where Duft.rating is NaN
mask = ~y.isna()  # Keep only rows where y is not NaN
X = X[mask]
y = y[mask]

In [None]:
def train(inp, out):
    device = torch.device("mps")  # Use MPS for Mac GPU acceleration

    # Drop NaN values
    mask = ~out.isna()
    X = inp[mask].to_numpy()
    y = out[mask].to_numpy()


    # Convert to PyTorch tensors and move to GPU
    X = torch.tensor(X, dtype=torch.float32, device=device)
    y = torch.tensor(y, dtype=torch.float32, device=device)

    # Normalize data
    X_mean, X_std = X.mean(), X.std()
    X_norm = (X - X_mean) / X_std

    y_mean, y_std = y.mean(), y.std()
    y_norm = (y - y_mean) / y_std

    # Split into train and test sets (80% train, 20% test)
    X_train, X_test, y_train, y_test = train_test_split(X_norm.cpu().numpy(), 
                                                         y_norm.cpu().numpy(), 
                                                         test_size=0.2, 
                                                         random_state=42)

    # Convert train-test sets back to PyTorch tensors and move to GPU
    X_train = torch.tensor(X_train, dtype=torch.float32, device=device)
    X_test = torch.tensor(X_test, dtype=torch.float32, device=device)
    y_train = torch.tensor(y_train, dtype=torch.float32, device=device)
    y_test = torch.tensor(y_test, dtype=torch.float32, device=device)

    # Compute closed-form solution (θ = (X^T * X)^(-1) * X^T * y)
    theta_hat = torch.linalg.pinv(X_train.T @ X_train) @ X_train.T @ y_train

    # Predictions
    y_pred = X_test @ theta_hat

    # Compute Mean Squared Error
    mse = torch.mean((y_test - y_pred) ** 2)
    print(f"Test Set MSE: {mse.item():.4f}")

    # Find top 10 most influential features
    theta_abs = torch.abs(theta_hat)
    top_indices = torch.argsort(theta_abs, descending=True)[:10]

    print("Top 10 most influential features:")
    print(inp.columns[top_indices.cpu().numpy()])  # Convert indices to feature names

In [None]:
train(feature_df, df['Duft.rating'])

In [None]:
train(feature_df, df['Haltbarkeit.rating'])

In [None]:
train(feature_df, df['Sillage.rating'])

In [None]:
train(feature_df, df['Preis-Leistungs-Verhältnis.rating'])

In [None]:
# X = feature_df  # Numeric features
# y = df['Duft.rating']  # Target column

# # Drop rows where Duft.rating is NaN
# mask = ~y.isna()  # Keep only rows where y is not NaN
# X = X[mask]
# y = y[mask]

# X_np = X.to_numpy().astype(np.float32)  # Ensure float32 for PyTorch
# y_np = y.to_numpy().astype(np.float32).reshape(-1, 1)  # Reshape for single output

# # Split into train and test
# X_train, X_test, y_train, y_test = train_test_split(X_np, y_np, test_size=0.2, random_state=42)

# # Convert to PyTorch tensors
# X_train_tensor = torch.tensor(X_train)
# y_train_tensor = torch.tensor(y_train)
# X_test_tensor = torch.tensor(X_test)
# y_test_tensor = torch.tensor(y_test)

# # Create DataLoaders
# batch_size = 16
# train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=batch_size, shuffle=True)
# test_loader = DataLoader(TensorDataset(X_test_tensor, y_test_tensor), batch_size=batch_size, shuffle=False)

# # Define Neural Network
# class NeuralNet(nn.Module):
#     def __init__(self, input_size):
#         super(NeuralNet, self).__init__()
#         self.fc1 = nn.Linear(input_size, 64)  # First layer with 64 neurons
#         self.fc2 = nn.Linear(64, 32)  # Second layer with 32 neurons
#         self.fc3 = nn.Linear(32, 1)  # Output layer
#         self.relu = nn.LeakyReLU(0.01)  # Activation function

#     def forward(self, x):
#         x = self.relu(self.fc1(x))
#         x = self.relu(self.fc2(x))
#         x = self.fc3(x)  # No activation here (Regression)
#         return x

# # Initialize model
# input_dim = X_train.shape[1]
# model = NeuralNet(input_dim)

In [None]:
# # Loss and Optimizer
# criterion = nn.MSELoss()
# optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)  # Adam optimizer

# # Training Loop
# num_epochs = 150  # Adjust epochs if needed
# for epoch in range(num_epochs):
#     model.train()
#     total_loss = 0
#     for X_batch, y_batch in train_loader:
#         optimizer.zero_grad()
#         y_pred = model(X_batch)
#         loss = criterion(y_pred, y_batch)
#         loss.backward()
#         optimizer.step()
#         total_loss += loss.item()
    
#     if (epoch+1) % 10 == 0:  # Print every 10 epochs
#         print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(train_loader):.4f}")

In [None]:
# # Evaluation
# model.eval()
# with torch.no_grad():
#     y_test_pred = model(X_test_tensor)
#     test_loss = criterion(y_test_pred, y_test_tensor).item()
    
# print(f"Test MSE: {test_loss:.4f}")

In [None]:
# from sklearn.metrics import r2_score

# y_test_pred_np = y_test_pred.numpy()
# r2 = r2_score(y_test, y_test_pred_np)
# print(f"R² Score: {r2:.4f}")

In [None]:
# # Prepare data (same as before)
# X = feature_df  # Numeric features
# y = df['Duft.rating']  # Target column

# # Drop rows where Duft.rating is NaN
# mask = ~y.isna()  # Keep only rows where y is not NaN
# X = X[mask]
# y = y[mask]

# X_np = X.to_numpy().astype(np.float32)  # Ensure float32 for PyTorch
# y_np = y.to_numpy().astype(np.float32).reshape(-1, 1)  # Reshape for single output

# # Split into train and test
# X_train, X_test, y_train, y_test = train_test_split(X_np, y_np, test_size=0.2, random_state=42)

# # Convert to PyTorch tensors
# X_train_tensor = torch.tensor(X_train)
# y_train_tensor = torch.tensor(y_train)
# X_test_tensor = torch.tensor(X_test)
# y_test_tensor = torch.tensor(y_test)

# # Dynamic Neural Network class that can adapt to hyperparameters
# class DynamicNeuralNet(nn.Module):
#     def __init__(self, input_size, n_layers, layer_sizes, activation):
#         super(DynamicNeuralNet, self).__init__()
        
#         # Dictionary of available activation functions
#         self.activations = {
#             'relu': nn.ReLU(),
#             'leaky_relu': nn.LeakyReLU(0.01),
#             'tanh': nn.Tanh(),
#             'sigmoid': nn.Sigmoid(),
#             'elu': nn.ELU()
#         }
        
#         # Set activation function
#         self.activation = self.activations[activation]
        
#         # Create network architecture dynamically
#         layers = []
#         prev_size = input_size
        
#         # Create hidden layers
#         for i in range(n_layers):
#             layers.append(nn.Linear(prev_size, layer_sizes))
#             layers.append(self.activation)
#             prev_size = layer_sizes
        
#         # Output layer
#         layers.append(nn.Linear(prev_size, 1))
        
#         # Sequential container for all layers
#         self.network = nn.Sequential(*layers)
    
#     def forward(self, x):
#         return self.network(x)

# # Objective function for Optuna
# def objective(trial):
#     # Hyperparameters to tune
#     n_layers = trial.suggest_int('n_layers', 1, 5)  # Number of hidden layers
#     layer_size = trial.suggest_int('layer_size', 16, 256)  # Size of each layer
#     activation = trial.suggest_categorical('activation', ['relu', 'leaky_relu', 'tanh', 'elu'])
    
#     # Optimizer selection
#     optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'AdamW', 'SGD', 'RMSprop'])
#     lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)  # Learning rate on log scale
    
#     # Other hyperparameters
#     batch_size = trial.suggest_categorical('batch_size', [8, 16, 32, 64])
#     epochs = trial.suggest_int('epochs', 50, 300)
    
#     # Create model using the hyperparameters
#     input_dim = X_train.shape[1]
#     model = DynamicNeuralNet(input_dim, n_layers, layer_size, activation)
    
#     # Select optimizer
#     if optimizer_name == 'Adam':
#         optimizer = optim.Adam(model.parameters(), lr=lr)
#     elif optimizer_name == 'AdamW':
#         weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True)
#         optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
#     elif optimizer_name == 'SGD':
#         momentum = trial.suggest_float('momentum', 0.0, 0.99)
#         optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)
#     else:  # RMSprop
#         optimizer = optim.RMSprop(model.parameters(), lr=lr)
    
#     # Loss function
#     criterion = nn.MSELoss()
    
#     # Create DataLoaders
#     train_loader = DataLoader(
#         TensorDataset(X_train_tensor, y_train_tensor), 
#         batch_size=batch_size, 
#         shuffle=True
#     )
    
#     # Training loop
#     for epoch in range(epochs):
#         model.train()
#         total_loss = 0
#         for X_batch, y_batch in train_loader:
#             optimizer.zero_grad()
#             y_pred = model(X_batch)
#             loss = criterion(y_pred, y_batch)
#             loss.backward()
#             optimizer.step()
#             total_loss += loss.item()
        
#         # Optuna has a built-in pruning mechanism to stop unpromising trials
#         if epoch % 10 == 0:
#             # Evaluate on validation set for pruning
#             model.eval()
#             with torch.no_grad():
#                 y_val_pred = model(X_test_tensor)
#                 val_loss = criterion(y_val_pred, y_test_tensor).item()
            
#             trial.report(val_loss, epoch)
            
#             # Handle pruning based on the intermediate result
#             if trial.should_prune():
#                 raise optuna.exceptions.TrialPruned()
    
#     # Final evaluation
#     model.eval()
#     with torch.no_grad():
#         y_test_pred = model(X_test_tensor)
#         test_loss = criterion(y_test_pred, y_test_tensor).item()
#         r2 = r2_score(y_test.flatten(), y_test_pred.numpy().flatten())
    
#     # We want to maximize R², so return negative loss or return R² directly
#     return r2  # Optuna minimizes by default, but since we return R², it will maximize it

# # Create a study object and optimize the objective function
# study = optuna.create_study(
#     direction='maximize',  # We want to maximize R²
#     sampler=TPESampler(),  # Use Tree-structured Parzen Estimator for Bayesian optimization
#     pruner=optuna.pruners.MedianPruner()  # Pruning mechanism
# )
# study.optimize(objective, n_trials=50)  # Adjust number of trials based on your computation budget

# # Print the best hyperparameters and score
# print("Best trial:")
# trial = study.best_trial
# print(f"  Value (R²): {trial.value:.4f}")
# print("  Params: ")
# for key, value in trial.params.items():
#     print(f"    {key}: {value}")

# # Train the final model with the best hyperparameters
# best_params = study.best_params

# # Create final model
# input_dim = X_train.shape[1]
# final_model = DynamicNeuralNet(
#     input_dim, 
#     best_params['n_layers'], 
#     best_params['layer_size'], 
#     best_params['activation']
# )

# # Setup best optimizer
# if best_params['optimizer'] == 'Adam':
#     final_optimizer = optim.Adam(final_model.parameters(), lr=best_params['lr'])
# elif best_params['optimizer'] == 'AdamW':
#     final_optimizer = optim.AdamW(
#         final_model.parameters(), 
#         lr=best_params['lr'], 
#         weight_decay=best_params['weight_decay']
#     )
# elif best_params['optimizer'] == 'SGD':
#     final_optimizer = optim.SGD(
#         final_model.parameters(), 
#         lr=best_params['lr'], 
#         momentum=best_params['momentum']
#     )
# else:  # RMSprop
#     final_optimizer = optim.RMSprop(final_model.parameters(), lr=best_params['lr'])

# # Loss function
# criterion = nn.MSELoss()

# # Create DataLoader with best batch size
# train_loader = DataLoader(
#     TensorDataset(X_train_tensor, y_train_tensor), 
#     batch_size=best_params['batch_size'], 
#     shuffle=True
# )
# test_loader = DataLoader(
#     TensorDataset(X_test_tensor, y_test_tensor), 
#     batch_size=best_params['batch_size'], 
#     shuffle=False
# )

# # Train the final model
# for epoch in range(best_params['epochs']):
#     final_model.train()
#     total_loss = 0
#     for X_batch, y_batch in train_loader:
#         final_optimizer.zero_grad()
#         y_pred = final_model(X_batch)
#         loss = criterion(y_pred, y_batch)
#         loss.backward()
#         final_optimizer.step()
#         total_loss += loss.item()
    
#     if (epoch+1) % 10 == 0:
#         print(f"Epoch {epoch+1}/{best_params['epochs']}, Loss: {total_loss / len(train_loader):.4f}")

# # Final evaluation
# final_model.eval()
# with torch.no_grad():
#     y_test_pred = final_model(X_test_tensor)
#     test_loss = criterion(y_test_pred, y_test_tensor).item()
    
# print(f"Test MSE: {test_loss:.4f}")

# # Calculate and print R² score
# y_test_pred_np = y_test_pred.numpy()
# r2 = r2_score(y_test, y_test_pred_np)
# print(f"Final R² Score: {r2:.4f}")



# # Save the best model
# torch.save(final_model.state_dict(), 'best_duft_rating_model.pth')

In [None]:
# Plot optimization history
plt.figure(figsize=(10, 6))
plt.subplot(1, 2, 1)
optuna.visualization.matplotlib.plot_optimization_history(study)
plt.subplot(1, 2, 2)
optuna.visualization.matplotlib.plot_param_importances(study)
plt.tight_layout()
plt.show()

### Decision Trees

In [None]:
# Load feature dataframe
X = feature_df  # Numeric features
y = df['Duft.rating']  # Target column

# Drop rows where Duft.rating is NaN
mask = ~y.isna()  # Keep only rows where y is not NaN
X = X[mask]
y = y[mask]

# Convert to numpy arrays
X_np = X.to_numpy().astype(np.float16)
y_np = y.to_numpy().astype(np.float16)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X_np, y_np, test_size=0.2, random_state=42)

# Convert to XGBoost DMatrix format (for efficiency)
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

# Optuna objective function for XGBoost
def objective(trial):
    params = {
        "objective": "reg:squarederror",
        "eval_metric": "rmse",
        "booster": "gbtree",
        "tree_method": "hist",  # Faster training
        "n_jobs": -1,
        "max_depth": trial.suggest_int("max_depth", 3, 15),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
        "n_estimators": trial.suggest_int("n_estimators", 50, 500),
        "subsample": trial.suggest_float("subsample", 0.5, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
        "lambda": trial.suggest_float("lambda", 1e-8, 10.0, log=True),  # L2 regularization
        "alpha": trial.suggest_float("alpha", 1e-8, 10.0, log=True),   # L1 regularization
    }
    
    # Train XGBoost model
    model = xgb.train(params, dtrain, num_boost_round=params["n_estimators"])
    
    # Predict and evaluate
    y_pred = model.predict(dtest)
    r2 = r2_score(y_test, y_pred)
    
    return r2  # Optuna will maximize R²

# Run Optuna optimization
study = optuna.create_study(direction="maximize", sampler=TPESampler())
study.optimize(objective, n_trials=50)

# Print best hyperparameters
print("Best trial:")
best_params = study.best_trial.params
print(f"  Best R² Score: {study.best_trial.value:.4f}")
print("  Best Hyperparameters:", best_params)

# Train final model with best parameters
final_model = xgb.train(best_params, dtrain, num_boost_round=best_params["n_estimators"])

# Evaluate final model
y_pred_final = final_model.predict(dtest)
final_r2 = r2_score(y_test, y_pred_final)

print(f"Final Test R² Score: {final_r2:.4f}")

# Save the model
final_model.save_model("best_xgboost_duft_rating.json")