In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PowerTransformer, PolynomialFeatures
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split

from torch import nn
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
import pytorch_optimizer as optim1


from pytorch_lightning import LightningModule
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint, LearningRateMonitor
from pytorch_lightning.loggers import TensorBoardLogger
import pytorch_lightning as pl
from torchmetrics.classification import F1Score, MulticlassCohenKappa, Accuracy

In [None]:
# Optional one-time setup:
# pip install kagglehub[pandas-datasets]
import kagglehub
from kagglehub import KaggleDatasetAdapter

# CSV file inside the Kaggle dataset that should be read
file_path = "synthetic_coffee_health_10000.csv"

# Load the latest version of the dataset through the pandas adapter.
# Extra arguments such as sql_query or pandas_kwargs may be supplied as well;
# see the documentation for details:
# https://github.com/Kaggle/kagglehub/blob/main/README.md#kaggledatasetadapterpandas
df = kagglehub.load_dataset(
    KaggleDatasetAdapter.PANDAS,
    "uom190346a/global-coffee-health-dataset",
    file_path,
)

# Index the rows by ID and discard the Health_Issues column.
df = df.set_index('ID').drop(columns=['Health_Issues'])
print("First 5 records:", df.head())

In [None]:
# Print the frame summary produced by DataFrame.info(), then display the frame
# itself as the cell output.
df.info()
df

In [None]:
# Target is Sleep_Quality; the features are every remaining column.
y = df['Sleep_Quality']
X = df.drop(columns=['Sleep_Quality'])

# Hold out 20% of the rows for validation, stratified on the target, with a
# fixed random_state for the split.
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)
X_train

In [None]:
# Feature column labels grouped by dtype: int64/float64 vs. object/category.
num = X.select_dtypes(['int64', 'float64']).columns
cat = X.select_dtypes(['object', 'category']).columns
num, cat

In [None]:
# Numeric columns: median imputation -> Yeo-Johnson power transform -> standard scaling.
num_pipe = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('power', PowerTransformer(method='yeo-johnson')),
    ('scaler', StandardScaler())
])

# Categorical columns: most-frequent imputation -> one-hot encoding.
# Categories not seen during fit are ignored at transform time.
cat_pipe = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])

# Route each column group (selected by dtype from the training frame) to its pipeline.
# sparse_threshold=0 forces a dense ndarray result: OneHotEncoder emits sparse
# output, and with the default threshold the combined result may come back as a
# SciPy sparse matrix (depending on overall density), which torch.tensor(...)
# cannot consume.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', num_pipe, X_train.select_dtypes(include=['int64', 'float64']).columns),
        ('cat', cat_pipe, X_train.select_dtypes(include=['object', 'category']).columns)
    ],
    sparse_threshold=0,
)

In [None]:
# Fit the preprocessor on the training features only, then apply the fitted
# transform to the validation features.
# NOTE(review): both names are rebound to the transformed output, so re-running
# this cell would feed already-transformed data back into the preprocessor;
# re-run the train/validation split first.
X_train = preprocessor.fit_transform(X_train)
X_val = preprocessor.transform(X_val)


In [None]:
# Number of rows per Sleep_Quality label.
df['Sleep_Quality'].value_counts()

In [None]:
# Integer class index assigned to each Sleep_Quality label.
SLEEP_QUALITY_CODES = {'Poor': 0, 'Fair': 1, 'Good': 2, 'Excellent': 3}


def encode_sleep_quality(labels):
    """Convert a Series of Sleep_Quality labels into a NumPy array of class indices.

    Args:
        labels: pandas Series whose values are keys of ``SLEEP_QUALITY_CODES``.

    Returns:
        NumPy array holding the integer code of every label, in input order.

    Raises:
        KeyError: if ``labels`` contains a value (including a missing value)
            that has no entry in ``SLEEP_QUALITY_CODES``.
    """
    # Series.map with a dict silently yields NaN for unknown keys, so check
    # up front to keep failing loudly on unexpected labels.
    unknown = set(labels.unique()) - set(SLEEP_QUALITY_CODES)
    if unknown:
        raise KeyError(f"Unmapped Sleep_Quality labels: {sorted(unknown, key=str)}")
    return labels.map(SLEEP_QUALITY_CODES).to_numpy()


# A dedicated helper is used instead of binding a lambda to the name `map`,
# which would shadow the builtin for every later cell.
y_train = encode_sleep_quality(y_train)
y_val = encode_sleep_quality(y_val)
y_train, y_val

# Dataset and Dataloader

In [None]:
# Features become float32 tensors, class indices become int64 (long) tensors.
X_train, X_val = (
    torch.tensor(features, dtype=torch.float32) for features in (X_train, X_val)
)
y_train, y_val = (
    torch.tensor(labels, dtype=torch.long) for labels in (y_train, y_val)
)

In [None]:
# Pair each feature tensor with its label tensor.
train_set, val_set = TensorDataset(X_train, y_train), TensorDataset(X_val, y_val)

# Mini-batches of 32; only the training loader shuffles.
train_loader = DataLoader(dataset=train_set, shuffle=True, batch_size=32)
val_loader = DataLoader(dataset=val_set, batch_size=32)

In [None]:
# Sanity check: pull a single batch from the training loader and print the
# shapes of its feature and target tensors.
feature, target = next(iter(train_loader))
print(feature.shape, target.shape)

# Neural network (NN)

In [None]:
class NN(nn.Module):
    """Fully connected classifier: input_size -> 128 -> 64 -> 32 -> 16 -> 8 -> num_classes.

    The first layer is Linear + Mish. Each following hidden layer is
    Linear + BatchNorm1d + Mish + Dropout(0.2). The last layer is a plain
    Linear that outputs one raw score per class.

    Args:
        input_size: Number of features in each input row.
        num_classes: Number of output scores produced per row.
    """

    def __init__(self, input_size, num_classes) -> None:
        super().__init__()

        hidden_widths = (128, 64, 32, 16, 8)

        # Input projection: no batch norm or dropout on the first layer.
        layers = [nn.Linear(input_size, hidden_widths[0]), nn.Mish()]

        # Remaining hidden layers all share the same four-module pattern.
        for fan_in, fan_out in zip(hidden_widths, hidden_widths[1:]):
            layers += [
                nn.Linear(fan_in, fan_out),
                nn.BatchNorm1d(fan_out),
                nn.Mish(),
                nn.Dropout(0.2),
            ]

        # Output head.
        layers.append(nn.Linear(hidden_widths[-1], num_classes))

        self.fc = nn.Sequential(*layers)

    def forward(self, X):
        """Run ``X`` through the stacked layers and return the class scores."""
        return self.fc(X)

In [None]:
class PLNN(LightningModule):
    """LightningModule that trains ``NN`` with cross-entropy loss and logs metrics.

    Logged keys are ``{train,val}_loss``, ``{train,val}_f1``, ``{train,val}_acc``
    and ``{train,val}_kappa``.

    Args:
        input_size: Number of input features per sample, forwarded to ``NN``.
        num_classes: Number of target classes (default 4).
        learning_rate: Learning rate handed to the AdamW optimizer.
    """

    def __init__(self, input_size, num_classes=4, learning_rate=1e-3):
        super().__init__()
        self.save_hyperparameters()
        self.model = NN(input_size=input_size, num_classes=num_classes)
        self.criterion = nn.CrossEntropyLoss()

        # Validation metrics. The attribute names (including the `accurasy`
        # spelling) are kept as they were so existing references keep working.
        self.kappa = MulticlassCohenKappa(num_classes=num_classes, weights='quadratic')
        self.f1 = F1Score(task='multiclass', num_classes=num_classes, average='weighted')
        self.accurasy = Accuracy(task='multiclass', num_classes=num_classes)

        # Training metrics use their own instances so accumulated state is
        # never shared between the training and validation loops.
        self.train_kappa = MulticlassCohenKappa(num_classes=num_classes, weights='quadratic')
        self.train_f1 = F1Score(task='multiclass', num_classes=num_classes, average='weighted')
        self.train_accuracy = Accuracy(task='multiclass', num_classes=num_classes)

    def forward(self, x):
        """Return the wrapped network's class scores for ``x``."""
        return self.model(x)

    def _shared_step(self, batch, stage, accuracy, f1, kappa):
        """Compute the loss for one batch, update the given metrics and log everything.

        Args:
            batch: ``(inputs, labels)`` pair produced by the dataloader.
            stage: Prefix for the logged keys (``'train'`` or ``'val'``).
            accuracy, f1, kappa: Metric objects belonging to that stage.

        Returns:
            The cross-entropy loss for the batch.
        """
        inputs, labels = batch
        outputs = self(inputs)
        loss = self.criterion(outputs, labels)

        # Each metric is updated with ITS OWN object; previously F1 and kappa
        # were both computed with the accuracy metric, so every logged
        # `*_f1` / `*_kappa` value was really accuracy.
        accuracy(outputs, labels)
        f1(outputs, labels)
        kappa(outputs, labels)

        # Logging the Metric objects (rather than per-batch tensors) lets
        # Lightning compute the epoch value from the accumulated state and
        # reset the metric at the end of each epoch.
        self.log(f'{stage}_loss', loss, on_epoch=True, prog_bar=True)
        self.log(f'{stage}_f1', f1, on_epoch=True, prog_bar=True)
        self.log(f'{stage}_acc', accuracy, on_epoch=True, prog_bar=True)
        self.log(f'{stage}_kappa', kappa, on_epoch=True, prog_bar=True)
        return loss

    def training_step(self, batch, batch_idx):
        """Run one training batch and return its loss for backpropagation."""
        return self._shared_step(
            batch, 'train', self.train_accuracy, self.train_f1, self.train_kappa
        )

    def validation_step(self, batch, batch_idx):
        """Run one validation batch; metrics are logged, nothing is returned."""
        self._shared_step(batch, 'val', self.accurasy, self.f1, self.kappa)

    def configure_optimizers(self):
        """Build the AdamW optimizer using the stored ``learning_rate`` hyperparameter."""
        optimizer = optim1.AdamW(self.parameters(), lr=self.hparams.learning_rate)
        return optimizer

# Config

In [None]:
# One GPU when CUDA is available; otherwise CPU with the device count left to 'auto'.
accelerator_type, devices_to_use = (
    ('gpu', 1) if torch.cuda.is_available() else ('cpu', 'auto')
)

# Keep only the single best checkpoint, ranked by the logged `val_f1` (higher is better).
checkpoint_callback = ModelCheckpoint(
    dirpath='checkpoints_SleepQuality/',
    filename='bodyP-{epoch:02d}-{val_f1:.2f}',
    monitor='val_f1',
    mode='max',
    save_top_k=1,
)

# Early stopping watches the same `val_f1` value with patience=10.
early_stopping = EarlyStopping(monitor='val_f1', mode='max', patience=10)

# Record the learning rate once per epoch.
lr_monitor_callback = LearningRateMonitor(logging_interval='epoch')

trainer1 = pl.Trainer(
    accelerator=accelerator_type,
    devices=devices_to_use,
    max_epochs=300,
    deterministic=True,
    log_every_n_steps=10,
    logger=TensorBoardLogger("tb_logs_SleepQuality", name="simple_model_experiment"),
    callbacks=[checkpoint_callback, early_stopping, lr_monitor_callback],
)

# Train

In [None]:
# Seed the Python, NumPy and PyTorch RNGs before the model is built so that
# weight initialisation and batch shuffling are repeatable across runs;
# `deterministic=True` on the trainer does not set a seed by itself.
pl.seed_everything(42)

# The input width is taken from the second dimension of the training feature tensor.
model = PLNN(input_size=X_train.shape[1], num_classes=4, learning_rate=1e-3)
trainer1.fit(model, train_loader, val_loader)

# Evaluate

In [None]:
# Run the validation loop on val_loader, asking the trainer to load the
# checkpoint it resolves for ckpt_path='best' (presumably the one tracked by
# checkpoint_callback — confirm against the Lightning version in use).
trainer1.validate(model, val_loader, ckpt_path='best')