<a href="https://colab.research.google.com/github/MengOonLee/Deep_learning/blob/master/PyTorch/Transformer/TabTransformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%bash
pip install -qU torch lightning

In [1]:
import pandas as pd
import torch
import lightning as L

class TabularDataset(torch.utils.data.Dataset):
    def __init__(self, X_num, X_cat, y):
        self.X_num = torch.tensor(data=X_num, dtype=torch.float32)
        self.X_cat = torch.tensor(data=X_cat, dtype=torch.long)
        self.y = torch.tensor(data=y, dtype=torch.float32)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X_num[idx], self.X_cat[idx], self.y[idx]

class CensusDataModule(L.LightningDataModule):
    def __init__(self, train_url, test_url, batch_size=256):
        super().__init__()
        self.train_url = train_url
        self.test_url = test_url
        self.batch_size = batch_size

    def _load_data(self, df):
        df = df.copy().drop(columns=['fnlwgt', 'education_num']).dropna()
        df['income_bracket'] = df['income_bracket'].astype(str)\
            .str.replace('.', '').str.strip()
        y = df['income_bracket'].map({'<=50K': 0, '>50K': 1}).astype('float32')

        NUMERIC_FEATURES = ['capital_gain', 'capital_loss', 'hours_per_week']
        X_num = df[NUMERIC_FEATURES].astype('float32')
        X_cat = df.drop(columns=NUMERIC_FEATURES + ['income_bracket'])\
            .astype(str).apply(lambda s: s.str.strip())
        return X_num.values, X_cat, y.values

    def _build_category_maps(self, train_cat):
        maps = {}
        for col in train_cat.columns:
            unique_vals = sorted(train_cat[col].unique())
            maps[col] = {val: i+1 for i, val in enumerate(unique_vals)}
            maps[col]['__UNK__'] = 0
        return maps

    def _apply_category_maps(self, X_cat, maps):
        X_cat = X_cat.copy().apply(
            lambda c: c.map(maps[c.name]).fillna(0).astype(int))
        return X_cat.values

    def setup(self, stage=None):
        CSV_HEADERS = ['age', 'workclass', 'fnlwgt', 'education', 'education_num',
            'marital_status', 'occupation', 'relationship', 'race', 'gender',
            'capital_gain', 'capital_loss', 'hours_per_week', 'native_country',
            'income_bracket']
        df_train = pd.read_csv(self.train_url, header=None,
            names=CSV_HEADERS)
        df_test = pd.read_csv(self.test_url, header=0,
            names=CSV_HEADERS)

        X_num_train, X_cat_train, y_train = self._load_data(df=df_train)
        X_num_test, X_cat_test, y_test = self._load_data(df=df_test)

        self.cat_maps = self._build_category_maps(train_cat=X_cat_train)
        X_cat_train = self._apply_category_maps(X_cat=X_cat_train,
            maps=self.cat_maps)
        X_cat_test = self._apply_category_maps(X_cat=X_cat_test,
            maps=self.cat_maps)

        self.mean = X_num_train.mean(axis=0)
        self.std = X_num_train.std(axis=0)
        self.num_features = X_num_train.shape[1]
        self.cat_cardinalities = [len(self.cat_maps[c]) for c in self.cat_maps.keys()]

        self.ds_train = TabularDataset(X_num=X_num_train, X_cat=X_cat_train, y=y_train)
        ds_test = TabularDataset(X_num=X_num_test, X_cat=X_cat_test, y=y_test)
        self.ds_val, self.ds_test = torch.utils.data.random_split(dataset=ds_test,
            lengths=[0.5, 0.5], generator=torch.Generator().manual_seed(42))

    def train_dataloader(self):
        return torch.utils.data.DataLoader(dataset=self.ds_train,
            batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return torch.utils.data.DataLoader(dataset=self.ds_val,
            batch_size=self.batch_size)

    def test_dataloader(self):
        return torch.utils.data.DataLoader(dataset=self.ds_test,
            batch_size=self.batch_size)

In [11]:
import torch
import lightning as L

class Normalization(torch.nn.Module):
    def __init__(self, mean, std):
        super().__init__()
        self.register_buffer('mean', torch.tensor(data=mean, dtype=torch.float32))
        self.register_buffer('std', torch.tensor(data=std, dtype=torch.float32))

    def forward(self, x):
        return (x - self.mean) / self.std

class ClassifyModel(L.LightningModule):
    def __init__(self, num_features, cat_cardinalities, mean, std):
        super().__init__()

        self.normalizer = Normalization(mean=mean, std=std)

        self.emb_layers = torch.nn.ModuleList(modules=[
            torch.nn.Embedding(num_embeddings=c+1, embedding_dim=min(8, (c+1)//2))
            for c in cat_cardinalities
        ])
        emb_out_dim = sum(e.embedding_dim for e in self.emb_layers)

        self.fc = torch.nn.Sequential(
            torch.nn.Linear(in_features=num_features + emb_out_dim, out_features=128),
            torch.nn.ReLU(),
            torch.nn.Dropout(p=0.3),
            torch.nn.Linear(in_features=128, out_features=64),
            torch.nn.ReLU(),
            torch.nn.Dropout(p=0.3),
            torch.nn.Linear(in_features=64, out_features=1),
            torch.nn.Sigmoid()
        )

        self.criterion = torch.nn.BCELoss()

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=0.001)

    def forward(self, x_num, x_cat):
        x_num = self.normalizer(x_num)
        x_emb = [emb(x_cat[:, i]) for i, emb in enumerate(self.emb_layers)]
        x_emb = torch.cat(tensors=x_emb, dim=1)
        x = torch.cat(tensors=[x_num, x_emb], dim=1)
        return self.fc(x)

    def training_step(self, batch, batch_idx):
        x_num, x_cat, y = batch
        preds = self(x_num=x_num, x_cat=x_cat).squeeze()
        loss = self.criterion(preds, y)
        self.log('train_loss', loss)
        return loss

In [12]:
import torch
import lightning as L

if __name__ == '__main__':
    train_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data'
    test_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test'
    datamodule = CensusDataModule(train_url=train_url, test_url=test_url)
    datamodule.setup()

    model = ClassifyModel(
        num_features=datamodule.num_features,
        cat_cardinalities=datamodule.cat_cardinalities,
        mean=datamodule.mean,
        std=datamodule.std
    )

    trainer = L.Trainer(max_epochs=3, accelerator='auto', devices=1)
    trainer.fit(model=model, datamodule=datamodule)

INFO: 💡 Tip: For seamless cloud uploads and versioning, try installing [litmodels](https://pypi.org/project/litmodels/) to enable LitModelCheckpoint, which syncs automatically with the Lightning model registry.
INFO:lightning.pytorch.utilities.rank_zero:💡 Tip: For seamless cloud uploads and versioning, try installing [litmodels](https://pypi.org/project/litmodels/) to enable LitModelCheckpoint, which syncs automatically with the Lightning model registry.
INFO: GPU available: False, used: False
INFO:lightning.pytorch.utilities.rank_zero:GPU available: False, used: False
INFO: TPU available: False, using: 0 TPU cores
INFO:lightning.pytorch.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO: HPU available: False, using: 0 HPUs
INFO:lightning.pytorch.utilities.rank_zero:HPU available: False, using: 0 HPUs
/usr/local/lib/python3.12/dist-packages/lightning/pytorch/trainer/configuration_validator.py:68: You passed in a `val_dataloader` but have no `validation_step`. Skipping va

Training: |          | 0/? [00:00<?, ?it/s]

INFO: `Trainer.fit` stopped: `max_epochs=3` reached.
INFO:lightning.pytorch.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=3` reached.
