In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import time

# Clean Up Data

In [2]:
data = pd.read_csv("osteoporosis.csv").drop(columns=["Id", "Osteoporosis"])

data["Female"] = data["Gender"].apply(lambda x: 1 if x=="Female" else 0)
data = data.drop(columns=["Gender"])

data["Postmenopausal_hormonal_changes"] = data["Hormonal Changes"].apply(lambda x: 1 if x=="Postmenopausal" else 0)
data = data.drop(columns=["Hormonal Changes"])

data["Family_history_osteoporosis"] = data["Family History"].apply(lambda x: 1 if x=="Yes" else 0)
data = data.drop(columns=["Family History"])

data["Underweight"] = data["Body Weight"].apply(lambda x: 1 if x=="Underweight" else 0)
data = data.drop(columns=["Body Weight"])

data["Low_calcium_intake"] = data["Calcium Intake"].apply(lambda x: 1 if x=="Low" else 0)
data = data.drop(columns=["Calcium Intake"])

data["Insufficient_vitamin_d"] = data["Vitamin D Intake"].apply(lambda x: 1 if x=="Insufficient" else 0)
data = data.drop(columns=["Vitamin D Intake"])

data["Physically_active"] = data["Physical Activity"].apply(lambda x: 1 if x=="Active" else 0)
data = data.drop(columns=["Physical Activity"])

data["Smoke"] = data["Smoking"].apply(lambda x: 1 if x=="Yes" else 0)
data = data.drop(columns=["Smoking"])

data["Moderate_alcohol_consumption"] = data["Alcohol Consumption"].apply(lambda x: 1 if x=="Moderate" else 0)
data = data.drop(columns=["Alcohol Consumption"])

data["Corticosteroids"] = data["Medications"].apply(lambda x: 1 if x=="Corticosteroids" else 0)
data = data.drop(columns=["Medications"])

data["Prior_fractures"] = data["Prior Fractures"].apply(lambda x: 1 if x=="Yes" else 0)
data = data.drop(columns=["Prior Fractures"])

data["Asian"] = data["Race/Ethnicity"].apply(lambda x: 1 if x=="Asian" else 0)
data["Caucasian"] = data["Race/Ethnicity"].apply(lambda x: 1 if x=="Caucasian" else 0)
data["African_american"] = data["Race/Ethnicity"].apply(lambda x: 1 if x=="African American" else 0)
data = data.drop(columns=["Race/Ethnicity"])

data["Rheumatoid_arthritis"] = data["Medical Conditions"].apply(lambda x: 1 if x=="Rheumatoid Arthritis" else 0)
data["Hyperthyroidism"] = data["Medical Conditions"].apply(lambda x: 1 if x=="Hyperthyroidism" else 0)
data["Healthy"] = data["Medical Conditions"].apply(lambda x: 1 if x is np.nan else 0)
data = data.drop(columns=["Medical Conditions"])

# Targets
### Our target outputs for prediction
### Dataframe with features and output values for each target

In [3]:
TARGETS = ["RH_ARTH","HYPER","HEALTHY"]

TARGET_DATA = {"RH_ARTH": data.drop(columns=["Hyperthyroidism", "Healthy"]),
"HYPER": data.drop(columns=["Rheumatoid_arthritis", "Healthy"]),
"HEALTHY": data.drop(columns=["Hyperthyroidism", "Rheumatoid_arthritis"])}

PERSON_INPUT_FEATURES = 15

# Function to get data for a target in tensor format
# Function to train model A with a given target as its output

In [4]:
def get_tensors(out_target):
    # Organize train-test split for this target output
    X = TARGET_DATA[out_target].iloc[:,:-1].values
    y = TARGET_DATA[out_target].iloc[:,-1].values
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Standardize features
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Convert numpy arrays to PyTorch tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
    y_test_tensor = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)
    
    return X_train_tensor, X_test_tensor, y_train_tensor, y_test_tensor

def standard_FF(out_target, model, thr=0.8, INFO=False, SAVE=False):

    X_train_tensor, X_test_tensor, y_train_tensor, y_test_tensor = get_tensors(out_target)

    loss_f = nn.BCELoss()  # Binary Cross-Entropy Loss
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # Adam optimizer

    # Train
    start = time.perf_counter()
    num_epochs = 1000
    for epoch in range(num_epochs):
        # Forward pass
        outputs = model(X_train_tensor)
        loss = loss_f(outputs, y_train_tensor)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    end = time.perf_counter()
    train_time = end - start
    
    if (INFO):

        # Evaluate the model
        with torch.no_grad():
            model.eval()
            predicted = model(X_test_tensor)

        pred = pd.DataFrame(predicted).applymap(lambda x: 1 if x > 0.8 else 0)
        target_y = pd.DataFrame(y_test_tensor.detach().numpy())
        acc = np.mean(pred == target_y)
        name = str(type(model)).split(".")[-1].split("\'")[0]

        print(f"Predicting {out_target} with {name}:")
        print(f"Training Time: {train_time}s")
        print(f"Test Accuracy: {acc}")
        print()
        
    if (SAVE):
        torch.save(model.state_dict(), "modelA_"+out_target)

# Model A
### With a breakpoint in the forward pass

In [5]:
# Architecture A
class netA(nn.Module):
    def __init__(self, input_size):
        super(netA, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(64,32)
        self.fc3 = nn.Linear(32,1)
    
    def forward_half1(self, x):
        x = torch.sigmoid(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        return x
    
    def forward_half2(self, x):
        x = torch.sigmoid(self.fc3(x))
        return x
    
    def forward(self, x, path='all'):
        if path=='all':
            x = self.forward_half1(x)
            x = self.forward_half2(x)
        elif path=='half1':
            x = self.forward_half1(x)
        elif path=='half2':
            x = self.forward_half2(x)
        else:
            raise NotImplementedError
        return x

# Train model A for all 3 targets
### Also save the model that was trained on Rheumatoid Arthritis as the target

In [6]:
modelA = netA(input_size=PERSON_INPUT_FEATURES)

for t in TARGETS:
    standard_FF(t, modelA, INFO=True, SAVE=True)

Predicting RH_ARTH with netA:
Training Time: 2.5633572000078857s
Test Accuracy: 0.6454081632653061

Predicting HYPER with netA:
Training Time: 2.68255849997513s
Test Accuracy: 0.6964285714285714

Predicting HEALTHY with netA:
Training Time: 2.5926266000024043s
Test Accuracy: 0.6224489795918368



# Load a model trained on a given target
### As seen in the second cell below, loading in the model we saved is the same as the model that was trained above on Rheumatoid Arthritis

In [7]:
def load_model(out_target):

    m = netA(input_size=PERSON_INPUT_FEATURES)
    m.load_state_dict(torch.load("modelA_"+out_target))
    
    return m

In [8]:
test_target = "RH_ARTH"
test_loaded_model = load_model(test_target)

X_train_tensor, X_test_tensor, y_train_tensor, y_test_tensor = get_tensors(test_target)

with torch.no_grad():
    test_loaded_model.eval()
    predicted = test_loaded_model(X_test_tensor)
        
pred = pd.DataFrame(predicted).applymap(lambda x: 1 if x > 0.8 else 0)
target_y = pd.DataFrame(y_test_tensor.detach().numpy())
acc = np.mean(pred == target_y)
name = str(type(test_loaded_model)).split(".")[-1].split("\'")[0]

print(f"Predicting {test_target} with {name}:")
print(f"Test Accuracy: {acc}")
print()

Predicting RH_ARTH with netA:
Test Accuracy: 0.6454081632653061



## Now modify the training process with latent features as input

In [9]:
def latent_FF(out_target, model, latent_features, thr=0.8, INFO=False):

    # Organize train-test split for this target output
    X = latent_features
    y = TARGET_TENSORS[out_target]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    loss_f = nn.BCELoss()  # Binary Cross-Entropy Loss
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # Adam optimizer

    # Train
    start = time.perf_counter()
    num_epochs = 1000
    for epoch in range(num_epochs):
        # Forward pass
        outputs = model(X_train)
        loss = loss_f(outputs, y_train)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    end = time.perf_counter()
    train_time = end - start
    
    if (INFO):

        # Evaluate the model
        with torch.no_grad():
            model.eval()
            predicted = model(X_test)

        pred = pd.DataFrame(predicted).applymap(lambda x: 1 if x > 0.8 else 0)
        target_y = pd.DataFrame(y_test.detach().numpy())
        acc = np.mean(pred == target_y)
        name = str(type(model)).split(".")[-1].split("\'")[0]

        print(f"Predicting {out_target} with {name}:")
        print(f"Training Time: {train_time}s")
        print(f"Test Accuracy: {acc}")
        print()

#### Convert all input data into a tensor format

In [10]:
X = data.iloc[:,:-3].values

# Standardize features
scaler = StandardScaler()
X = scaler.fit_transform(X)

# Convert numpy arrays to PyTorch tensors
X_tensor = torch.tensor(X, dtype=torch.float32)

#### Convert output data to tensors

In [11]:
y = data.iloc[:,-3:].to_numpy()

TARGET_TENSORS = {
    "RH_ARTH": torch.tensor(y[:,0], dtype=torch.float32).view(-1, 1),
    "HYPER": torch.tensor(y[:,0], dtype=torch.float32).view(-1, 1),
    "HEALTHY": torch.tensor(y[:,0], dtype=torch.float32).view(-1, 1)
}

## Load a given target's trained model (Architecture A)
#### Create latent features out of the input data

In [12]:
def get_latent_feats(target):
    
    target_model = load_model(target)
    with torch.no_grad():
        target_model.eval()
        latent_features = target_model(X_tensor, path='half1')
    return latent_features   

In [13]:
# Architecture B
class netB(nn.Module):
    def __init__(self, input_size):
        super(netB, self).__init__()
        self.fc1 = nn.Linear(input_size, 1)
    
    def forward(self, x):
        x = torch.sigmoid(self.fc1(x))
        return x

## Train architecture B on the latent features

In [14]:
LATENT_FEATURES = 32

latent_features = get_latent_feats("RH_ARTH")

for t in TARGETS:
    modelB = netB(input_size=LATENT_FEATURES)
    latent_FF(t, modelB, latent_features, INFO=True)

Predicting RH_ARTH with netB:
Training Time: 0.7738179000443779s
Test Accuracy: 0.6326530612244898

Predicting HYPER with netB:
Training Time: 0.7857378000044264s
Test Accuracy: 0.6275510204081632

Predicting HEALTHY with netB:
Training Time: 0.7776324999867938s
Test Accuracy: 0.6301020408163265



In [15]:
LATENT_FEATURES = 32

latent_features = get_latent_feats("HYPER")

for t in TARGETS:
    modelB = netB(input_size=LATENT_FEATURES)
    latent_FF(t, modelB, latent_features, INFO=True)

Predicting RH_ARTH with netB:
Training Time: 0.7930209999904037s
Test Accuracy: 0.6428571428571429

Predicting HYPER with netB:
Training Time: 0.7866203999728896s
Test Accuracy: 0.6428571428571429

Predicting HEALTHY with netB:
Training Time: 0.7766406000009738s
Test Accuracy: 0.6428571428571429



In [16]:
LATENT_FEATURES = 32

latent_features = get_latent_feats("HEALTHY")

for t in TARGETS:
    modelB = netB(input_size=LATENT_FEATURES)
    latent_FF(t, modelB, latent_features, INFO=True)

Predicting RH_ARTH with netB:
Training Time: 0.8013962999684736s
Test Accuracy: 0.6428571428571429

Predicting HYPER with netB:
Training Time: 0.8160816000308841s
Test Accuracy: 0.6428571428571429

Predicting HEALTHY with netB:
Training Time: 0.801421600044705s
Test Accuracy: 0.6428571428571429

