# Variational Autoencoder Imputation


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import re
from sklearn.cluster import KMeans
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.metrics import silhouette_score

### Data Cleaning

In [None]:
# Load the survey export and remove columns that should not be modelled.
data = pd.read_csv('/Users/cindywei/Ariadne Labs/ariadneML/Community Health Provider Survey_May 21, 2025_09.21 - Pre-Survey Results .csv')
data = data.drop(columns=[])  # insert irrelevant (non-numeric, ordinal, or categorical) columns here

# Binary observation mask: 1 = value present, 0 = value missing.
# The training loss multiplies the squared error by this mask and the
# imputation step keeps the input wherever the mask is 1, so the mask has to
# flag the *observed* cells. (It was previously built with isnull(), which
# made the model fit only the zero-filled cells and overwrite real answers.)
mask = data.notnull().astype(int)

# Fill missing values temporarily with zeros so the encoders below see no NaN.
data_filled = data.fillna(0)

### Encode Variables

In [None]:
# Encode ordinal variables (e.g. Likert, frequency, effectiveness scales).
ordinal_columns = []  # names of the ordinal columns

# One ordered list of labels per ordinal column, lowest to highest, in the same
# order as ordinal_columns, e.g.
# ["Strongly Disagree", "Disagree", "Neutral", "Agree", "Strongly Agree"]
scales = []

# zip() would silently stop at the shorter list and leave the remaining
# columns as raw labels, which only fails later inside the scaler.
if len(ordinal_columns) != len(scales):
    raise ValueError(
        f"ordinal_columns has {len(ordinal_columns)} entries but scales has "
        f"{len(scales)}; provide exactly one scale per ordinal column"
    )

# Copy so the per-column assignments below write into an independent frame
# rather than into a slice of data_filled.
ordinal_data = data_filled[ordinal_columns].copy()
for col, scale in zip(ordinal_columns, scales):
    codes = pd.Categorical(
        data_filled[col],
        categories=scale,
        ordered=True
    ).codes
    # Code -1 marks a value that is not in the scale. That includes the cells
    # that were missing and zero-filled upstream. Turn those into NaN so they
    # are not standardized as if -1 were a real level below the lowest one.
    ordinal_data[col] = np.where(codes == -1, np.nan, codes)

if ordinal_columns:
    # StandardScaler ignores NaN when fitting and keeps it when transforming.
    ordinal_scaled = StandardScaler().fit_transform(ordinal_data)
    # After standardization 0 is the column mean, so this is a mean fill.
    ordinal_scaled = np.where(np.isnan(ordinal_scaled), 0.0, ordinal_scaled)
else:
    # StandardScaler rejects input with zero feature columns.
    ordinal_scaled = np.empty((len(ordinal_data), 0))

mask_ordinal = mask[ordinal_columns]


In [None]:
# Clean + scale numerical variables.
numerical_columns = []  # insert numerical column names here
# Read from the raw frame, not data_filled: in data_filled the missing cells
# are already 0, so they would parse as the integer 0 and the mean fill further
# down would never apply. Copy so later column assignments do not write into a
# slice of the source frame.
numerical_data = data[numerical_columns].copy()
def extract_first_integer(x):
    """Return the first run of digits found in ``x`` as an int.

    ``x`` is converted with ``str()`` before searching, so numbers and free
    text are both accepted. Only digits are matched: a leading minus sign or a
    decimal part is not included in the result.

    Returns:
        The first digit run as an ``int``, or ``None`` when ``x`` is null
        (per ``pd.isnull``) or contains no digits.
    """
    if pd.isnull(x):
        return None
    found = re.search(r'\d+', str(x))
    if found is None:
        return None
    return int(found.group())
# Parse each numerical column to a number, then mean-fill what could not be parsed.
for col in numerical_columns:
    # extract_first_integer returns None for null or digit-free cells; casting
    # to float turns those into NaN and guarantees a numeric dtype for mean().
    parsed = numerical_data[col].apply(extract_first_integer).astype(float)
    numerical_data[col] = parsed.fillna(parsed.mean())  # fill NaNs with mean

if numerical_columns:
    numerical_scaled = StandardScaler().fit_transform(numerical_data[numerical_columns])
else:
    # StandardScaler rejects input with zero feature columns.
    numerical_scaled = np.empty((len(numerical_data), 0))

numerical_mask_encoded = mask[numerical_columns]
# Sanity check: a non-zero count means some column had no parsable value at
# all, so its mean (and therefore its fill value) was NaN.
print(np.isnan(numerical_scaled).sum())

In [None]:
# Encode nominal categorical variables: every column that was not declared
# ordinal or numerical above.
non_categorical = set(ordinal_columns) | set(numerical_columns)
categorical_columns = [col for col in data_filled.columns if col not in non_categorical]
categorical_encoded = pd.get_dummies(data_filled[categorical_columns], drop_first=False)
# NOTE: missing cells were zero-filled upstream, so an object column that had
# missing values also gets a dummy for the value 0.

# Record which source column each encoded column came from. Encoding one
# column at a time yields the same names as the joint call above.
dummy_source = {}
for col in categorical_columns:
    for encoded_name in pd.get_dummies(data_filled[[col]], drop_first=False).columns:
        dummy_source[encoded_name] = col

# Every encoded column inherits the mask of its source column. The mask frame
# holds integers, which get_dummies leaves unexpanded, so one-hot encoding the
# mask and reindexing it to the dummy names would set the mask to 0 for every
# expanded column.
categorical_mask_encoded = mask[
    [dummy_source[encoded_name] for encoded_name in categorical_encoded.columns]
].copy()
categorical_mask_encoded.columns = categorical_encoded.columns


### Prepare Final Inputs

In [None]:
# Feature matrix: ordinal, one-hot categorical and numerical features placed
# side by side, in that order.
feature_blocks = (ordinal_scaled, categorical_encoded.values, numerical_scaled)
X = np.concatenate(feature_blocks, axis=1)

print(ordinal_scaled.shape, categorical_encoded.shape, numerical_scaled.shape)

# Mask matrix: the three mask frames in the same order as the features, so that
# M lines up with X column for column.
mask_blocks = (
    mask_ordinal.values,
    categorical_mask_encoded.values,
    numerical_mask_encoded.values,
)
M = np.concatenate(mask_blocks, axis=1)

print(mask_ordinal.shape, categorical_mask_encoded.shape, numerical_mask_encoded.shape)


### Define VAE

In [None]:
class VAE(nn.Module):
    """Small fully connected variational autoencoder.

    The encoder is a single 64-unit ReLU layer feeding two linear heads (latent
    mean and latent log-variance). The decoder maps a latent vector through a
    64-unit ReLU layer back to ``input_dim`` outputs, with no output activation.
    """

    def __init__(self, input_dim, latent_dim=10):
        """Create the encoder, the two latent heads and the decoder.

        Args:
            input_dim: Number of features in each input row.
            latent_dim: Size of the latent vector.
        """
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU()
        )
        self.z_mean = nn.Linear(64, latent_dim)
        self.z_logvar = nn.Linear(64, latent_dim)
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, input_dim)
        )

    def reparameterize(self, mu, logvar):
        """Draw ``z = mu + eps * std`` with ``eps`` sampled from a standard normal.

        Sampling this way keeps ``z`` differentiable with respect to ``mu`` and
        ``logvar``.
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        """Encode ``x``, sample a latent vector and decode it.

        Returns:
            Tuple ``(x_recon, mu, logvar)``.
        """
        h = self.encoder(x)
        mu, logvar = self.z_mean(h), self.z_logvar(h)
        z = self.reparameterize(mu, logvar)
        x_recon = self.decoder(z)
        return x_recon, mu, logvar


def loss_function(recon_x, x, mu, logvar, M_tensor):
    """Masked mean squared reconstruction error plus KL divergence.

    Args:
        recon_x: Reconstruction produced by the model.
        x: Model input, same shape as ``recon_x``.
        mu: Latent means.
        logvar: Latent log-variances.
        M_tensor: Weight mask with the same shape as ``x``. Entries equal to 1
            count toward the reconstruction error, entries equal to 0 are
            ignored. NOTE(review): the mask must therefore be 1 for observed
            cells; confirm against how the mask is built upstream.

    Returns:
        Scalar tensor: squared error averaged over the masked-in entries, plus
        the KL term summed over latent dimensions and averaged over rows.
    """
    squared_error = ((recon_x - x) ** 2) * M_tensor
    # Normalize by the mask that was passed in rather than a module-level
    # array. The clamp keeps an all-zero mask from dividing by zero (a binary
    # mask sums to 0 or to at least 1, so nothing else changes).
    recon_loss = squared_error.sum() / M_tensor.sum().clamp(min=1.0)
    kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) / x.size(0)
    return recon_loss + kl_loss


# Training runs at module level (not inside the class body) so that VAE already
# exists when it is instantiated and so that vae, X_tensor and M_tensor are
# globals for the following cells.
# np.asarray coerces explicitly: X may come out as object dtype if the
# one-hot frame mixes boolean and numeric columns, which torch.tensor rejects.
X_tensor = torch.tensor(np.asarray(X, dtype=np.float32), dtype=torch.float32)
M_tensor = torch.tensor(np.asarray(M, dtype=np.float32), dtype=torch.float32)

vae = VAE(input_dim=X.shape[1])
optimizer = torch.optim.Adam(vae.parameters(), lr=1e-3)
vae.train()
for epoch in range(100):
    # Full-batch update: the whole data set is one batch.
    optimizer.zero_grad()
    recon_batch, mu, logvar = vae(X_tensor)
    loss = loss_function(recon_batch, X_tensor, mu, logvar, M_tensor)
    loss.backward()
    optimizer.step()

    if epoch % 10 == 0:
        print(f'Epoch {epoch}, Loss: {loss.item()}')
        

In [None]:
# Imputation: keep the input wherever the mask is 1 and take the VAE
# reconstruction wherever the mask is 0.
vae.eval()
with torch.no_grad():
    recon_batch, _, _ = vae(X_tensor)
    kept_part = X_tensor * M_tensor
    filled_part = recon_batch * (1 - M_tensor)
    x_imputed = kept_part + filled_part
    X_imputed = x_imputed.numpy()

# Save imputed data. Column order follows the feature matrix: ordinal, one-hot
# categorical, numerical. Values are written in the encoded feature space of
# the model input; no inverse transform is applied here.
column_names = [
    *ordinal_data.columns,
    *categorical_encoded.columns,
    *numerical_data.columns,
]

imputed_data = pd.DataFrame(X_imputed, columns=column_names)
imputed_data.to_csv('imputed_data.csv', index=False)
print("Imputation completed and saved to 'imputed_data.csv'.")
print("Imputation completed and saved to 'imputed_data.csv'.")