Creator: Dhanajit Brahma

Adapted from the original TensorFlow implementation: https://github.com/jsyoon0823/GAIN

Generative Adversarial Imputation Networks (GAIN) Implementation on the Letter and Spam Datasets

Reference: J. Yoon, J. Jordon, M. van der Schaar, "GAIN: Missing Data Imputation using Generative Adversarial Nets," ICML, 2018.

In [None]:
#%% Packages
import os
import random
import numpy as np
import pandas as pd
from math import sqrt

from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.ensemble import HistGradientBoostingRegressor

import torch
import torch.nn as nn
import torch.utils.data
import torch.optim as optim

import time
from tqdm import tqdm
import torch.nn.functional as F

In [None]:
# Wall-clock timestamp taken before any work starts; the final cell subtracts it
# from the current time to report the total runtime.
start_time = time.time()

In [None]:
dataset_file = 'datas/dd2/BostonHousing.csv'  # 'Letter.csv' for the Letter dataset and 'Spam.csv' for the Spam dataset
use_gpu = False  # set it to True to use GPU and False to use CPU
if use_gpu:
    # Select CUDA device 0 as the current device for the rest of the run.
    torch.cuda.set_device(0)

In [None]:
# Load the raw dataset; on_bad_lines='skip' asks pandas to drop malformed rows
# instead of raising.
datas = pd.read_csv(dataset_file, on_bad_lines='skip')

In [None]:
# ---------------- Detect column types ----------------
# Split the column names by dtype: 64-bit numbers are treated as numeric,
# object/category columns as categorical.
numeric_features = list(
    datas.select_dtypes(include=["int64", "float64"]).columns
)
categorical_features = list(
    datas.select_dtypes(include=["object", "category"]).columns
)

# ---------------- Define numeric imputer using HistGradientBoostingRegressor ----------------
# Custom imputer using HistGradientBoostingRegressor for numeric data
class HGBRImputer:
    """Impute missing numeric values column by column with gradient boosting.

    For every column that has missing entries at fit time, one
    ``HistGradientBoostingRegressor`` is trained to predict that column from
    all the other columns, using only the rows where the target is observed.
    Gaps in the *predictor* columns are filled with per-column medians.

    The medians used while training each model are stored and reused at
    transform time, so the models see predictors prepared from the same
    statistics in both phases.

    Attributes:
        models: maps a target column name to its fitted regressor.
        fill_values: maps a target column name to the ``pd.Series`` of
            medians that filled predictor gaps while training its model.
        features: the columns seen by the last ``fit`` call.
    """

    def __init__(self):
        self.models = {}
        self.fill_values = {}
        self.features = None

    # y is accepted (and ignored) to match sklearn's fit signature.
    def fit(self, X, y=None):
        """Train one regressor per column of ``X`` that contains missing values.

        Args:
            X: pandas DataFrame of numeric columns.
            y: ignored; present for sklearn API compatibility.

        Returns:
            self, to allow chaining.
        """
        self.features = X.columns
        # Start from a clean slate so a refit never keeps models or medians
        # learned from an earlier dataset.
        self.models = {}
        self.fill_values = {}

        for col in self.features:
            is_missing = X[col].isnull()
            if not is_missing.any():
                continue

            # Train only on rows where the target column is observed.
            observed = X.loc[~is_missing]
            medians = observed.median()
            X_train = observed.drop(columns=col).fillna(medians)
            y_train = observed[col]

            model = HistGradientBoostingRegressor(
                learning_rate=0.1,
                max_depth=15,
                min_samples_leaf=30,
                l2_regularization=0.1,
                max_bins=255,
                scoring='neg_mean_squared_error',
                max_iter=100,
                max_leaf_nodes=31,
                random_state=42,
                early_stopping=True,
                validation_fraction=0.25,
                n_iter_no_change=10,
            )
            model.fit(X_train, y_train)

            self.models[col] = model
            self.fill_values[col] = medians
        return self

    def transform(self, X):
        """Return a copy of ``X`` with missing entries replaced by predictions.

        Only columns that have a fitted model are imputed; the input frame is
        left unmodified. Columns are processed in the order their models were
        fitted, so values imputed for an earlier column are visible as
        predictors for later ones.

        Args:
            X: pandas DataFrame with the same columns as seen during ``fit``.

        Returns:
            A new DataFrame with the imputed values filled in.
        """
        X_filled = X.copy()
        for col, model in self.models.items():
            missing_idx = X_filled[col].isnull()
            if not missing_idx.any():
                continue

            # Reuse the fit-time medians so prediction inputs are prepared
            # exactly like the training inputs. Fall back to the current
            # frame's medians only if no statistics were stored for this model.
            fill = self.fill_values.get(col)
            if fill is None:
                fill = X_filled.median()

            X_pred = X_filled.loc[missing_idx].drop(columns=col).fillna(fill)
            preds = model.predict(X_pred)
            X_filled.loc[missing_idx, col] = preds
        return X_filled

    def fit_transform(self, X, y=None):
        """Fit on ``X`` and return the imputed copy of ``X``."""
        return self.fit(X, y).transform(X)


# ---------------- Define transformers ----------------

# Numeric columns: model-based imputation followed by min-max scaling.
numeric_steps = [
    ("imputer", HGBRImputer()),
    ("scaler", MinMaxScaler()),
]
numeric_transformer = Pipeline(steps=numeric_steps)

# Categorical columns: fill gaps with the most frequent value, then map each
# category to an ordinal code (categories unseen at fit time are encoded as -1).
categorical_steps = [
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("ordinal", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1)),
]
categorical_transformer = Pipeline(steps=categorical_steps)

# ---------------- Column Transformer ----------------
# Register a transformer only for the column groups that actually exist,
# numeric first and categorical second.
transformers = [
    (label, transformer, columns)
    for label, transformer, columns in (
        ("num", numeric_transformer, numeric_features),
        ("cat", categorical_transformer, categorical_features),
    )
    if columns
]

# sparse_threshold=0 is passed so the combined output is a dense array.
preprocessor = ColumnTransformer(transformers=transformers, sparse_threshold=0)

# ---------------- Full Pipeline ----------------
# Single-step pipeline around the column transformer.
pipeline = Pipeline(steps=[("preprocessor", preprocessor)])

# ---------------- Apply on dataset ----------------
# Fit the preprocessing on the raw frame and transform it in the same pass.
processed = pipeline.fit_transform(datas)

# Re-attach column names: numeric columns first, then categorical, which is
# the order in which the transformers were registered.
all_features = [*numeric_features, *categorical_features]
df_processed = pd.DataFrame(processed, columns=all_features)

print(df_processed.head())


In [None]:
# Persist the preprocessed frame; the GAIN section below reads this file back.
# NOTE(review): DataFrame.to_csv returns None when given a path, so d0 most
# likely holds None — confirm whether the assignment is needed at all.
d0= df_processed.to_csv('datas/BostonHousing_processed.csv', index=False)

In [None]:
#%% System Parameters
# 1. Mini batch size: number of training rows drawn per iteration
mb_size = 128
# 2. Missing rate: probability that an entry is masked out as missing
p_miss = 0.2
# 3. Hint rate: probability that a mask entry is passed through to the hint
p_hint = 0.9
# 4. Loss hyperparameter: weight of the reconstruction MSE term in the generator loss
alpha = 10
# 5. Train rate: fraction of rows used for training (the rest form the test split)
train_rate = 0.8

#%% Data

# Data generation
datas_file = 'datas/BostonHousing_processed.csv'
Data = pd.read_csv(datas_file, on_bad_lines='skip')
# Work in float64 throughout so the in-place normalisation below never has to
# write fractional values into an integer-typed column.
Data = Data.astype(np.float64)

# Parameters: number of rows and number of features
No, Dim = Data.shape

# Hidden state dimensions
H_Dim1 = Dim
H_Dim2 = Dim

# Normalization (0 to 1)
# Min_Val[i] keeps the original minimum of column i and Max_Val[i] the maximum
# *after* subtracting that minimum (i.e. the column range), so the scaling can
# be undone later as x * (Max_Val[i] + 1e-6) + Min_Val[i].
Min_Val = np.zeros(Dim)
Max_Val = np.zeros(Dim)

for i in range(Dim):
    # Index into the arrays: assigning to the bare names would replace the
    # arrays with scalars and lose the per-column statistics.
    Min_Val[i] = np.min(Data.iloc[:, i])
    Data.iloc[:, i] = Data.iloc[:, i] - Min_Val[i]
    Max_Val[i] = np.max(Data.iloc[:, i])
    # The small epsilon guards against division by zero for constant columns.
    Data.iloc[:, i] = Data.iloc[:, i] / (Max_Val[i] + 1e-6)


#%% Missing introducing
# Per-feature missing probability, stored as a (Dim, 1) column vector.
p_miss_vec = np.ones((Dim, 1)) * p_miss

# Mask matrix: 1 marks an observed entry, 0 an entry treated as missing.
Missing = np.zeros((No, Dim))

for col in range(Dim):
    # One uniform draw per row; the entry stays observed when the draw exceeds
    # this feature's missing probability.
    draws = np.random.uniform(0., 1., size=[len(Data),])
    keep = draws > p_miss_vec[col]
    Missing[:, col] = keep.astype(float)

    
#%% Train Test Division    
   
# Shuffle the row positions once and cut the permutation at the train fraction.
idx = np.random.permutation(No)

Train_No = int(No * train_rate)
Test_No = No - Train_No

train_rows, test_rows = idx[:Train_No], idx[Train_No:]

# Train / Test Features
trainX, testX = Data.iloc[train_rows, :], Data.iloc[test_rows, :]

# Train / Test Missing Indicators (same rows as the features)
trainM, testM = Missing[train_rows, :], Missing[test_rows, :]

#%% Necessary Functions

# 1. Xavier Initialization Definition
def xavier_init(size):
    """Sample an initial weight array of the given shape.

    Values are drawn from a zero-mean normal distribution whose standard
    deviation is ``1 / sqrt(size[0] / 2)``, i.e. it depends only on the first
    entry of ``size``.

    Args:
        size: shape of the array to create; ``size[0]`` is the input dimension.

    Returns:
        A NumPy array of shape ``size``.
    """
    fan_in = size[0]
    stddev = 1. / np.sqrt(fan_in / 2.)
    return np.random.normal(loc=0.0, scale=stddev, size=size)
    
# Hint Vector Generation
def sample_M(m, n, p):
    """Draw a random 0/1 matrix of shape ``(m, n)``.

    Each entry is 1.0 when an independent uniform draw on [0, 1) is greater
    than ``p`` and 0.0 otherwise.

    Args:
        m: number of rows.
        n: number of columns.
        p: threshold compared against each uniform draw.

    Returns:
        A float NumPy array of zeros and ones.
    """
    return (np.random.uniform(0., 1., size=[m, n]) > p).astype(float)
   

### GAIN Architecture
GAIN consists of three components:
- Generator
- Discriminator
- Hint mechanism

In [None]:
#%% 1. Discriminator
# device=None leaves placement to torch's default, exactly like omitting the
# argument; "cuda" is used only when use_gpu is literally True.
_d_device = "cuda" if use_gpu is True else None

# Layer 1 takes Data + Hint as inputs, hence the doubled input width.
D_W1 = torch.tensor(xavier_init([Dim*2, H_Dim1]), requires_grad=True, device=_d_device)
D_b1 = torch.tensor(np.zeros(shape=[H_Dim1]), requires_grad=True, device=_d_device)

D_W2 = torch.tensor(xavier_init([H_Dim1, H_Dim2]), requires_grad=True, device=_d_device)
D_b2 = torch.tensor(np.zeros(shape=[H_Dim2]), requires_grad=True, device=_d_device)

# Output is multi-variate: one value per feature.
D_W3 = torch.tensor(xavier_init([H_Dim2, Dim]), requires_grad=True, device=_d_device)
D_b3 = torch.tensor(np.zeros(shape=[Dim]), requires_grad=True, device=_d_device)

theta_D = [D_W1, D_W2, D_W3, D_b1, D_b2, D_b3]

#%% 2. Generator
# device=None leaves placement to torch's default, exactly like omitting the
# argument; "cuda" is used only when use_gpu is literally True.
_g_device = "cuda" if use_gpu is True else None

# Layer 1 takes Data + Mask as inputs (random noise sits in the missing
# components), hence the doubled input width.
G_W1 = torch.tensor(xavier_init([Dim*2, H_Dim1]), requires_grad=True, device=_g_device)
G_b1 = torch.tensor(np.zeros(shape=[H_Dim1]), requires_grad=True, device=_g_device)

G_W2 = torch.tensor(xavier_init([H_Dim1, H_Dim2]), requires_grad=True, device=_g_device)
G_b2 = torch.tensor(np.zeros(shape=[H_Dim2]), requires_grad=True, device=_g_device)

G_W3 = torch.tensor(xavier_init([H_Dim2, Dim]), requires_grad=True, device=_g_device)
G_b3 = torch.tensor(np.zeros(shape=[Dim]), requires_grad=True, device=_g_device)

theta_G = [G_W1, G_W2, G_W3, G_b1, G_b2, G_b3]

## GAIN Functions

In [None]:
#%% 1. Generator
def generator(new_x,m):
    """Run the generator network on a batch.

    ``new_x`` and ``m`` are concatenated along dim 1 and passed through two
    ReLU layers and a sigmoid output layer, using the module-level weights
    ``G_W1..G_W3`` and biases ``G_b1..G_b3``.

    Args:
        new_x: data tensor (noise is expected in the missing components).
        m: mask tensor with the same leading dimension as ``new_x``.

    Returns:
        Tensor of sigmoid outputs, i.e. values in [0, 1].
    """
    stacked = torch.cat([new_x, m], dim=1)  # Mask + Data Concatenate
    hidden1 = F.relu(stacked @ G_W1 + G_b1)
    hidden2 = F.relu(hidden1 @ G_W2 + G_b2)
    return torch.sigmoid(hidden2 @ G_W3 + G_b3)  # [0,1] normalized Output

#%% 2. Discriminator
def discriminator(new_x, h):
    """Run the discriminator network on a batch.

    ``new_x`` and ``h`` are concatenated along dim 1 and passed through two
    ReLU layers and a sigmoid output layer, using the module-level weights
    ``D_W1..D_W3`` and biases ``D_b1..D_b3``.

    Args:
        new_x: data tensor to be judged.
        h: hint tensor with the same leading dimension as ``new_x``.

    Returns:
        Tensor of sigmoid outputs, i.e. per-entry probabilities in [0, 1].
    """
    stacked = torch.cat([new_x, h], dim=1)  # Hint + Data Concatenate
    hidden1 = F.relu(stacked @ D_W1 + D_b1)
    hidden2 = F.relu(hidden1 @ D_W2 + D_b2)
    logits = hidden2 @ D_W3 + D_b3
    return torch.sigmoid(logits)  # [0,1] Probability Output

#%% 3. Other functions
# Random sample generator for Z
def sample_Z(m, n):
    """Return an ``(m, n)`` array of uniform noise drawn from [0, 0.01)."""
    noise_shape = [m, n]
    return np.random.uniform(low=0., high=0.01, size=noise_shape)

# Mini-batch generation
def sample_idx(m, n):
    """Return the first ``n`` entries of a random permutation of ``range(m)``.

    When ``n`` exceeds ``m`` the result simply contains all ``m`` indices.
    """
    return np.random.permutation(m)[:n]

## GAIN Losses

In [None]:
def discriminator_loss(M, New_X, H):
    """Cross-entropy loss of the discriminator against the true mask.

    The generator output replaces ``New_X`` wherever ``M`` is 0, the
    discriminator scores the completed batch given the hint ``H``, and the
    scores are compared with ``M`` entry by entry.

    Args:
        M: mask tensor (1 = observed, 0 = missing).
        New_X: data tensor with noise in the missing components.
        H: hint tensor.

    Returns:
        Scalar loss tensor.
    """
    # Generator output, merged with the observed entries of the input
    generated = generator(New_X, M)
    completed = New_X * M + generated * (1-M)

    # Discriminator
    d_prob = discriminator(completed, H)

    #%% Loss
    # The 1e-8 offset keeps both logarithms finite at probabilities of 0 or 1.
    log_observed = torch.log(d_prob + 1e-8)
    log_missing = torch.log(1. - d_prob + 1e-8)
    return -torch.mean(M * log_observed + (1-M) * log_missing)

def generator_loss(X, M, New_X, H):
    """Generator loss plus the train/test reconstruction errors.

    Args:
        X: ground-truth data tensor.
        M: mask tensor (1 = observed, 0 = missing).
        New_X: data tensor with noise in the missing components.
        H: hint tensor.

    Returns:
        Tuple ``(G_loss, MSE_train_loss, MSE_test_loss)``:
        the adversarial term plus ``alpha`` times the MSE on observed entries,
        that MSE on observed entries, and the MSE on missing entries measured
        against ``X``.
    """
    #%% Structure
    # Generator output, merged with the observed entries of the input
    generated = generator(New_X, M)
    completed = New_X * M + generated * (1-M)

    # Discriminator
    d_prob = discriminator(completed, H)

    #%% Loss
    # Adversarial term: only the missing entries contribute.
    adversarial = -torch.mean((1-M) * torch.log(d_prob + 1e-8))
    # Reconstruction error on observed entries; dividing by mean(M)
    # normalises by the share of observed entries.
    observed_sq_err = (M * New_X - M * generated)**2
    MSE_train_loss = torch.mean(observed_sq_err) / torch.mean(M)

    G_loss = adversarial + alpha * MSE_train_loss

    #%% MSE Performance metric
    missing_sq_err = ((1-M) * X - (1-M)*generated)**2
    MSE_test_loss = torch.mean(missing_sq_err) / torch.mean(1-M)
    return G_loss, MSE_train_loss, MSE_test_loss
    
def test_loss(X, M, New_X):
    """Evaluate the generator on the missing entries of a batch.

    Args:
        X: ground-truth data tensor.
        M: mask tensor (1 = observed, 0 = missing).
        New_X: data tensor with noise in the missing components.

    Returns:
        Tuple ``(MSE_test_loss, G_sample)``: the mean squared error over the
        missing entries and the raw generator output for the whole batch.
    """
    #%% Structure
    generated = generator(New_X, M)

    #%% MSE Performance metric
    missing_sq_err = ((1-M) * X - (1-M)*generated)**2
    mse_missing = torch.mean(missing_sq_err) / torch.mean(1-M)
    return mse_missing, generated

## Optimizers

In [None]:
# One Adam optimizer per network, each created with default hyperparameters and
# given only its own parameter list, so a step on one network never updates
# the other's weights.
optimizer_D = torch.optim.Adam(params=theta_D)
optimizer_G = torch.optim.Adam(params=theta_G)

## Training

In [None]:
#%% Start Iterations
# Convert the training frame to NumPy once: indexing an array per step is much
# cheaper than DataFrame.iloc, and all batch arithmetic then stays in NumPy.
train_values = trainX.to_numpy()
train_device = 'cuda' if use_gpu else 'cpu'

for it in tqdm(range(5000)):

    #%% Inputs
    mb_idx = sample_idx(Train_No, mb_size)
    # sample_idx returns fewer than mb_size rows when the training split is
    # smaller than a mini batch, so size the noise and hints from the actual
    # batch instead of from mb_size.
    batch_rows = len(mb_idx)

    X_np = train_values[mb_idx, :]
    Z_np = sample_Z(batch_rows, Dim)
    M_np = trainM[mb_idx, :]
    H_np = M_np * sample_M(batch_rows, Dim, 1-p_hint)

    New_X_np = M_np * X_np + (1-M_np) * Z_np  # Missing Data Introduce

    # Build each tensor exactly once, directly on the target device.
    # Wrapping an existing tensor in torch.tensor() again would copy it a
    # second time and trigger a UserWarning on every iteration.
    X_mb = torch.tensor(X_np, device=train_device)
    M_mb = torch.tensor(M_np, device=train_device)
    H_mb = torch.tensor(H_np, device=train_device)
    New_X_mb = torch.tensor(New_X_np, device=train_device)

    # Discriminator step
    optimizer_D.zero_grad()
    D_loss_curr = discriminator_loss(M=M_mb, New_X=New_X_mb, H=H_mb)
    D_loss_curr.backward()
    optimizer_D.step()

    # Generator step; zero_grad also clears the generator gradients that the
    # discriminator's backward pass accumulated.
    optimizer_G.zero_grad()
    G_loss_curr, MSE_train_loss_curr, MSE_test_loss_curr = generator_loss(X=X_mb, M=M_mb, New_X=New_X_mb, H=H_mb)
    G_loss_curr.backward()
    optimizer_G.step()

    #%% Intermediate Losses
    if it % 100 == 0:
        print('Iter: {}'.format(it))
        print('Train_loss: {:.4}'.format(np.sqrt(MSE_train_loss_curr.item())))
        print('Test_loss: {:.4}'.format(np.sqrt(MSE_test_loss_curr.item())))
        print()

## Testing

In [None]:
# Build the test batch: observed entries keep their value, missing entries are
# replaced by small uniform noise.
Z_mb = sample_Z(Test_No, Dim)
test_values = testX.to_numpy()

New_X_np = testM * test_values + (1-testM) * Z_mb  # Missing Data Introduce

# Create every tensor exactly once on the target device. The earlier CPU path
# called torch.clone() with no tensor argument, which raises TypeError.
test_device = 'cuda' if use_gpu else 'cpu'
X_mb = torch.tensor(test_values, device=test_device)
M_mb = torch.tensor(testM, device=test_device)
New_X_mb = torch.tensor(New_X_np, device=test_device)

# Evaluation only: no gradients are needed, and the returned Sample can be
# converted to NumPy without an explicit detach.
with torch.no_grad():
    MSE_final, Sample = test_loss(X=X_mb, M=M_mb, New_X=New_X_mb)

print('Final Test RMSE: ' + str(np.sqrt(MSE_final.item())))

In [None]:
from sklearn.metrics import r2_score
import torch
import numpy as np

# Assumes X_mb (ground truth) and New_X_mb (noise-filled generator input) are
# torch tensors and M_mb is the mask (1 = observed, 0 = missing).

# as_tensor is a no-op when M_mb is already a tensor on that device and
# converts it when it is still a NumPy array.
mask_t = torch.as_tensor(M_mb, device=X_mb.device)

with torch.no_grad():
    # The imputations are the generator OUTPUT (Sample). New_X_mb only holds
    # the random noise that was fed in at the missing positions, so scoring it
    # would compare noise with the ground truth.
    MSE_final, Sample = test_loss(X=X_mb, M=mask_t, New_X=New_X_mb)
    missing = mask_t == 0
    imputed_values = Sample[missing].cpu().numpy()
    true_values = X_mb[missing].cpu().numpy()

# RMSE over the missing entries, as computed by test_loss
rmse = np.sqrt(MSE_final.item())

# MAE over the missing entries; the values are NumPy arrays at this point, so
# use NumPy rather than torch for the arithmetic.
if len(true_values) > 0:
    mae = float(np.mean(np.abs(imputed_values - true_values)))
else:
    mae = float('nan')

# Calculate R2 score
if len(true_values) > 1:  # Ensure more than 1 sample for R2
    r2 = r2_score(true_values, imputed_values)
else:
    r2 = float('nan')  # Not defined for fewer than 2 samples

print(f'Final Test RMSE: {rmse:.6f}')
print(f'Final Test MAE: {mae:.6f}')
print(f'Final Test R2 Score: {r2:.6f}')


In [None]:
# test_loss returns the MSE over the missing entries and the generator output
# (Sample) for the whole test batch.

# as_tensor is a no-op when M_mb is already a tensor on that device and
# converts it when it is still a NumPy array.
mask_t = torch.as_tensor(M_mb, device=X_mb.device)

with torch.no_grad():
    MSE_final, Sample = test_loss(X=X_mb, M=mask_t, New_X=New_X_mb)

    # Calculate RMSE
    rmse = np.sqrt(MSE_final.item())

    # Calculate MAE manually over missing entries (mask == 0).
    # The imputations come from Sample; New_X_mb only contains the input noise
    # at those positions.
    missing = mask_t == 0
    imputed_values = Sample[missing]
    true_values = X_mb[missing]
    mae = torch.mean(torch.abs(imputed_values - true_values)).item()

    # Calculate Accuracy:
    # For continuous values, accuracy is not a standard metric. It is only
    # reported for non-floating (categorical / discretised) data, where it is
    # Accuracy = (Number of correct imputations) / (Total missing entries).
    # Check for ANY floating dtype: the data here is float64, which a
    # comparison against torch.float32 alone would misclassify as categorical.
    if torch.is_floating_point(X_mb):
        accuracy = None  # Accuracy generally not defined
    else:
        # For categorical case: count exact matches
        correct = (imputed_values == true_values).sum().item()
        total_missing = imputed_values.numel()
        accuracy = correct / total_missing if total_missing > 0 else None

print(f'Final Test RMSE: {rmse:.6f}')
print(f'Final Test MSE: {MSE_final.item():.6f}')
print(f'Final Test MAE: {mae:.6f}')
if accuracy is not None:
    print(f'Final Test Accuracy: {accuracy:.6f}')
else:
    print("Accuracy metric not applicable for continuous data.")


In [None]:
# Keep the observed entries and take the generator output for the missing ones.
imputed_data = M_mb * X_mb + (1-M_mb) * Sample
print("Imputed test data:")

# Drop the autograd graph, move to host memory when running on GPU, then print
# the values as a NumPy array.
imputed_array = imputed_data.detach()
if use_gpu is True:
    imputed_array = imputed_array.cpu()
print(imputed_array.numpy())

In [None]:
# Total wall-clock time since start_time was recorded in the first cell.
elapsed_seconds = time.time() - start_time
print("--- %s seconds ---" % elapsed_seconds)