In [1]:

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, roc_curve
import matplotlib.pyplot as plt
import seaborn as sns
import random
import joblib

ModuleNotFoundError: No module named 'torch'

In [None]:
# Fix the seed of every random number generator used in this notebook so that
# repeated runs produce the same numbers.
seed = 42
for seed_fn in (torch.manual_seed, np.random.seed, random.seed):
    seed_fn(seed)

# GPU-only settings: seed the CUDA generators and ask cuDNN for deterministic
# kernels (autotuning via `benchmark` is switched off).
if torch.cuda.is_available():
    for cuda_seed_fn in (torch.cuda.manual_seed, torch.cuda.manual_seed_all):
        cuda_seed_fn(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [None]:
# Load the vital-signs CSV (path is relative to the working directory) and
# preview the first rows.
df = pd.read_csv("human_vital_signs_dataset_2024.csv")
df.head()

In [None]:
# Column dtypes, non-null counts and memory usage.
df.info()

In [None]:
# Summary statistics of the numeric columns.
df.describe()

In [None]:
# Number of rows in the raw dataset.
df.shape[0]


In [None]:
# Missing-value count per column.
df.isna().sum()

In [None]:
# Column names as loaded (several contain spaces and units).
df.columns

In [None]:
# Pie chart of how many rows carry each 'Risk Category' value.
df['Risk Category'].value_counts().plot.pie()

In [None]:
# Distinct 'Risk Category' values present in the data.
df['Risk Category'].unique()

In [None]:
# Row count per 'Risk Category' value.
df['Risk Category'].value_counts()

In [None]:
# Number of missing values in the regression target. A single count is shown
# instead of a per-row boolean Series, which is too long to read.
df['Respiratory Rate'].isna().sum()

In [None]:
# Look at the first rows again before any columns are dropped.
df.head()

In [None]:
# Remove exact duplicate rows (compared on all columns, identifiers included).
df = df.drop_duplicates()
# Drop 'Patient ID' and 'Timestamp' as they are not predictive features.
# 'Risk Category' is dropped as well, so it is not used as a model input.
# NOTE(review): presumably because it is a label column rather than a vital
# sign -- confirm.
df = df.drop(columns=['Patient ID', 'Timestamp', 'Risk Category'])
df.head(10)

In [None]:
# Recompute the derived features from their raw inputs and compare them with
# the columns shipped in the dataset. The recomputed values are kept in a
# separate frame, so `df` is never modified and the cell can be re-run safely.
systolic = df['Systolic Blood Pressure']
diastolic = df['Diastolic Blood Pressure']
recomputed = {
    'Derived_Pulse_Pressure': ('Pulse Pressure Check', systolic - diastolic),
    'Derived_BMI': ('BMI Check', df['Weight (kg)'] / (df['Height (m)'] ** 2)),
    'Derived_MAP': ('MAP Check', (systolic + 2 * diastolic) / 3),
}

# Side-by-side view: provided column followed by its recomputed counterpart.
comparison = pd.DataFrame(index=df.index)
for derived_col, (check_col, values) in recomputed.items():
    comparison[derived_col] = df[derived_col]
    comparison[check_col] = values

print("Derived feature validation:")
print(comparison.head())

# Largest absolute disagreement per feature over ALL rows, not just the
# preview above; values near zero mean the provided columns can be trusted.
max_abs_diff = pd.Series({
    derived_col: (comparison[derived_col] - comparison[check_col]).abs().max()
    for derived_col, (check_col, _) in recomputed.items()
})
print("Max absolute difference (provided vs. recomputed):")
print(max_abs_diff)

In [None]:
# Preview the frame after the derived-feature check columns were removed.
df.head()

In [None]:
# One-hot encode 'Gender', dropping the first level, then store the resulting
# 'Gender_Male' indicator as an integer column.
gender_encoded = pd.get_dummies(df, columns=['Gender'], drop_first=True)
df = gender_encoded.astype({'Gender_Male': int})


In [None]:
# Split features and target
X = df.drop('Respiratory Rate', axis=1).values  # Exclude Respiratory Rate
y = df['Respiratory Rate'].values  # Target

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=seed)

# Standardise every feature to zero mean / unit variance before it reaches the
# network. The scaler is fitted on the training rows only, so no statistics of
# the test split leak into training; the test rows reuse the same transform.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Convert to tensors; targets are reshaped to (n_samples, 1) to match the
# single-value output of the regression model.
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)

# Create datasets (the DataLoaders are built in a later cell)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

print("Training set shape:", X_train.shape)
print("Test set shape:", X_test.shape)

In [None]:
import torch
import torch.nn as nn

class TransformerEncoder(nn.Module):
    """Stack of pre-norm transformer blocks.

    Every block applies LayerNorm followed by self-attention, then LayerNorm
    followed by a feed-forward network; the input of each half is added back
    to its output (residual connection).

    Args:
        dim: Width of the token embeddings flowing through the stack.
        depth: Number of blocks.
        heads: Number of attention heads in every block.
        mlp_dim: Hidden width of each block's feed-forward network.
        dropout: Dropout probability passed to the attention and
            feed-forward sub-modules.
    """

    def __init__(self, dim, depth, heads, mlp_dim, dropout=0.1):
        super().__init__()
        # Sub-modules are created in the order norm, attention, norm, MLP so
        # that parameter names remain ``layers.<block>.<0..3>``.
        self.layers = nn.ModuleList([
            nn.ModuleList([
                nn.LayerNorm(dim),
                MultiHeadAttention(dim, heads, dropout),
                nn.LayerNorm(dim),
                FeedForward(dim, mlp_dim, dropout),
            ])
            for _ in range(depth)
        ])

    def forward(self, x):
        """Run ``x`` through every block in order and return the result."""
        for block in self.layers:
            pre_attn_norm, attention, pre_mlp_norm, mlp = block
            x = attention(pre_attn_norm(x)) + x
            x = mlp(pre_mlp_norm(x)) + x
        return x

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Args:
        dim: Embedding width of the input tokens; must be divisible by
            ``heads`` so it can be split evenly across the heads.
        heads: Number of attention heads (positive).
        dropout: Dropout probability applied to the attention weights and to
            the output projection.

    Raises:
        ValueError: If ``heads`` is not positive or does not divide ``dim``.
    """

    def __init__(self, dim, heads=8, dropout=0.1):
        super().__init__()
        # Fail early with a clear message; otherwise the mismatch only shows
        # up as an opaque reshape error inside forward().
        if heads <= 0 or dim % heads != 0:
            raise ValueError(
                f"dim ({dim}) must be divisible by a positive number of heads ({heads})"
            )
        self.heads = heads
        # Scores are scaled by 1/sqrt(head_dim).
        self.scale = (dim // heads) ** -0.5
        # One projection produces queries, keys and values at once.
        self.qkv = nn.Linear(dim, dim * 3)
        self.attn_drop = nn.Dropout(dropout)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(dropout)

    def forward(self, x):
        """Attend over the tokens of ``x``.

        Args:
            x: Tensor of shape (batch, tokens, dim).

        Returns:
            Tensor of the same shape as ``x``.
        """
        B, N, C = x.shape
        head_dim = C // self.heads
        # (B, N, 3*C) -> (3, B, heads, N, head_dim)
        qkv = self.qkv(x).reshape(B, N, 3, self.heads, head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]
        # (B, heads, N, N) attention weights over the token axis.
        attn = (q @ k.transpose(-2, -1)) * self.scale
        attn = attn.softmax(dim=-1)
        attn = self.attn_drop(attn)
        # Merge the heads back into a single embedding per token.
        x = (attn @ v).transpose(1, 2).reshape(B, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        return x

class FeedForward(nn.Module):
    """Two-layer MLP applied to the last dimension of its input.

    Expands from ``dim`` to ``hidden_dim``, applies GELU, and projects back to
    ``dim``; dropout follows the activation and the final projection.

    Args:
        dim: Input and output width.
        hidden_dim: Width of the intermediate layer.
        dropout: Dropout probability used at both dropout positions.
    """

    def __init__(self, dim, hidden_dim, dropout=0.1):
        super().__init__()
        # Positions are fixed (0: expand, 3: project) so parameter names
        # remain ``net.0.*`` and ``net.3.*``.
        stages = [
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        """Return the MLP output; the last dimension stays ``dim`` wide."""
        out = self.net(x)
        return out

class TabularViTUnetRegression(nn.Module):
    """Two-branch regressor for rows of numeric tabular features.

    Transformer branch: every scalar feature is embedded as one token, a
    learnable class token is prepended, positional embeddings are added and
    the sequence is passed through a ``TransformerEncoder``; the class-token
    output summarises the row.
    MLP branch: the raw feature vector is mapped to a 128-wide representation.
    Both representations are concatenated and a linear head produces one
    value per row.

    Args:
        num_features: Number of input features per row.
        embed_dim: Token embedding width of the transformer branch.
        depth: Number of transformer blocks.
        heads: Attention heads per block.
        mlp_dim: Hidden width of the transformer feed-forward networks.
        dropout: Dropout probability for the token dropout and the encoder.
    """

    def __init__(self, num_features, embed_dim=768, depth=12, heads=12, mlp_dim=3072, dropout=0.1):
        super().__init__()
        # --- Transformer branch -------------------------------------------
        # The same Linear(1 -> embed_dim) embeds every feature value; the
        # positional embedding has one slot per feature plus the class token.
        self.vit_embeddings = nn.Linear(1, embed_dim)
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        self.pos_embedding = nn.Parameter(torch.zeros(1, num_features + 1, embed_dim))
        self.dropout = nn.Dropout(dropout)
        self.transformer = TransformerEncoder(embed_dim, depth, heads, mlp_dim, dropout)

        # --- MLP branch -----------------------------------------------------
        mlp_layers = [
            nn.Linear(num_features, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
        ]
        self.mlp = nn.Sequential(*mlp_layers)

        # Regression head over [class-token summary | MLP representation].
        self.head = nn.Linear(embed_dim + 128, 1)

    def forward(self, x):
        """Predict one value per row.

        Args:
            x: Tensor of shape (batch, num_features).

        Returns:
            Tensor of shape (batch, 1).
        """
        batch_size, _ = x.shape

        # (batch, features) -> (batch, features, 1) -> (batch, features, embed_dim)
        tokens = self.vit_embeddings(x.unsqueeze(-1))
        cls = self.cls_token.expand(batch_size, -1, -1)
        tokens = torch.cat((cls, tokens), dim=1) + self.pos_embedding
        encoded = self.transformer(self.dropout(tokens))
        cls_summary = encoded[:, 0]  # output at the class-token position

        tabular_repr = self.mlp(x)
        return self.head(torch.cat((cls_summary, tabular_repr), dim=1))

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import random

# Re-seed all RNGs right before the loaders are built, so shuffling (and
# anything created after this cell) starts from a known state.
seed = 42
for reseed in (torch.manual_seed, np.random.seed, random.seed):
    reseed(seed)


def seed_worker(worker_id):
    """Seed NumPy and `random` from the current PyTorch initial seed.

    Passed as ``worker_init_fn`` to the loaders below. No ``num_workers`` is
    given there, so with PyTorch's default the hook is presumably never
    invoked -- it only matters once worker processes are enabled.
    """
    derived_seed = torch.initial_seed() % 2**32
    np.random.seed(derived_seed)
    random.seed(derived_seed)


# Settings shared by both loaders.
loader_options = dict(batch_size=32, worker_init_fn=seed_worker)

# Training batches are reshuffled each epoch; a dedicated seeded generator
# drives the shuffle order.
train_loader = DataLoader(
    train_dataset,
    shuffle=True,
    generator=torch.Generator().manual_seed(seed),
    **loader_options,
)

# Test batches keep their original order.
test_loader = DataLoader(
    test_dataset,
    shuffle=False,
    generator=torch.Generator().manual_seed(seed),
    **loader_options,
)

In [None]:
# Sanity check: how many mini-batches make up one training epoch.
print(f"Number of batches in train_loader: {len(train_loader)}")

In [None]:
# Sample counts of the training and test datasets.
print(len(train_dataset))
print(len(test_dataset))

In [None]:
import torch.optim as optim

# Pick the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Build the network for as many inputs as the training matrix has columns and
# move it to the chosen device (the last line also displays the architecture).
num_features = X_train.shape[1]
model = TabularViTUnetRegression(num_features)
model.to(device)


In [None]:
import torch.optim as optim


# Adam over all model parameters, minimising mean squared error (regression).
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)
criterion = nn.MSELoss()

In [None]:
# Train for a fixed number of epochs and report the per-sample training loss.
num_epochs = 10
num_train_samples = len(train_loader.dataset)
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)  # (batch_size, 1)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # `loss` is assumed to be the batch mean (the criterion's default
        # reduction). Weighting it by the batch size keeps a short final
        # batch from counting as much as a full one in the epoch average.
        running_loss += loss.item() * inputs.size(0)

    train_loss = running_loss / num_train_samples
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}")

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score


def evaluate_regression(model, loader, criterion, device):
    """Run `model` over `loader` without gradients and collect its outputs.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        loader: Iterable yielding ``(inputs, targets)`` tensor batches.
        criterion: Loss returning the batch mean (assumed: default reduction).
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(loss, preds, targets)``: the per-sample average loss and two flat
        NumPy arrays with all predictions and all targets.

    Raises:
        ValueError: If the loader yields no batches.
    """
    model.eval()
    weighted_loss = 0.0
    pred_chunks, target_chunks = [], []
    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            # Weight by batch size so a short final batch is not over-counted.
            weighted_loss += criterion(outputs, targets).item() * inputs.size(0)
            pred_chunks.append(outputs.cpu().numpy().ravel())
            target_chunks.append(targets.cpu().numpy().ravel())
    if not pred_chunks:
        raise ValueError("loader yielded no batches; nothing to evaluate")
    preds = np.concatenate(pred_chunks)
    actuals = np.concatenate(target_chunks)
    return weighted_loss / len(preds), preds, actuals


# Evaluate on training set
train_loss, train_preds, train_targets = evaluate_regression(model, train_loader, criterion, device)
train_mse = mean_squared_error(train_targets, train_preds)
train_mae = mean_absolute_error(train_targets, train_preds)
train_r2 = r2_score(train_targets, train_preds)

# Evaluate on test set
test_loss, test_preds, test_targets = evaluate_regression(model, test_loader, criterion, device)
test_mse = mean_squared_error(test_targets, test_preds)
test_mae = mean_absolute_error(test_targets, test_preds)
test_r2 = r2_score(test_targets, test_preds)

# Print metrics (the MSE shown is computed over all samples at once)
print("Final Model Performance:")
print(f"Train MSE Loss: {train_mse:.4f}, Train MAE: {train_mae:.4f}, Train R²: {train_r2:.4f}")
print(f"Test MSE Loss: {test_mse:.4f}, Test MAE: {test_mae:.4f}, Test R²: {test_r2:.4f}")

In [None]:
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.inspection import permutation_importance
from sklearn.linear_model import LinearRegression  # For permutation importance

# Put the trained network on the available device and switch to inference mode.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
model.eval()

# Run the test set once, keeping each batch's predictions and targets as flat
# NumPy vectors.
batch_results = []
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        batch_results.append((outputs.cpu().numpy().flatten(),
                              targets.cpu().numpy().flatten()))

# Stitch the batches together into one array each.
test_preds = np.array([value for batch_preds, _ in batch_results for value in batch_preds])
test_targets = np.array([value for _, batch_targets in batch_results for value in batch_targets])

# Regression metrics on the held-out data.
test_mse, test_mae, test_r2 = (
    metric(test_targets, test_preds)
    for metric in (mean_squared_error, mean_absolute_error, r2_score)
)
print("Test Metrics:")
print(f"MSE: {test_mse:.4f}, MAE: {test_mae:.4f}, R²: {test_r2:.4f}")

# Feature names, taken from `df` in column order with the target removed.
# The feature matrix X was built from this same frame by dropping
# 'Respiratory Rate', so the labels line up with its columns without having
# to maintain a hand-written list.
# NOTE(review): assumes `df` has not been modified since X was created.
feature_columns = [col for col in df.columns if col != 'Respiratory Rate']

# 1. Predicted vs. Actual Scatter Plot
# The dashed red diagonal marks perfect predictions.
fig, ax = plt.subplots(figsize=(8, 6))
ax.scatter(test_targets, test_preds, alpha=0.5, color='blue')
target_span = [test_targets.min(), test_targets.max()]
ax.plot(target_span, target_span, 'r--', lw=2)
ax.set_xlabel('Actual Respiratory Rate')
ax.set_ylabel('Predicted Respiratory Rate')
ax.set_title('Predicted vs. Actual Respiratory Rate')
plt.show()

# 2. Error Distribution Histogram
errors = test_preds - test_targets
fig, ax = plt.subplots(figsize=(8, 6))
sns.histplot(errors, bins=30, kde=True, color='purple', ax=ax)
ax.set_xlabel('Prediction Error (Predicted - Actual)')
ax.set_ylabel('Frequency')
ax.set_title('Distribution of Prediction Errors')
ax.axvline(0, color='red', linestyle='--')
plt.show()

# 3. Feature vs. Respiratory Rate Scatter Plots (select key features)
key_features = ['Heart Rate', 'Age', 'Derived_BMI', 'Oxygen Saturation']
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.flatten()
for panel, feature in zip(axes, key_features):
    sns.scatterplot(data=df, x=feature, y='Respiratory Rate', alpha=0.5, ax=panel)
    panel.set_title(f'Respiratory Rate vs. {feature}')
plt.tight_layout()
plt.show()

# 4. Residual Plot
fig, ax = plt.subplots(figsize=(8, 6))
ax.scatter(test_preds, errors, alpha=0.5, color='green')
ax.axhline(0, color='red', linestyle='--')
ax.set_xlabel('Predicted Respiratory Rate')
ax.set_ylabel('Residual (Predicted - Actual)')
ax.set_title('Residual Plot')
plt.show()

# 5. Feature Importance Plot
# Permutation importance of a simple linear surrogate model -- NOT of the
# neural network trained above. The surrogate is fitted on the training split
# and scored on the held-out test split, so the importances are not measured
# on the same rows the model was fitted to.
X_test_np = X_test  # Assuming X_test is available from train_test_split
lr = LinearRegression()
lr.fit(X_train, y_train)
perm_importance = permutation_importance(lr, X_test_np, y_test, n_repeats=10, random_state=42)
feature_importance = pd.DataFrame({
    'Feature': feature_columns,
    'Importance': perm_importance.importances_mean
}).sort_values('Importance', ascending=False)

plt.figure(figsize=(10, 6))
sns.barplot(x='Importance', y='Feature', data=feature_importance, color='teal')
plt.title('Permutation Feature Importance (Linear Surrogate) for Respiratory Rate')
plt.show()

In [None]:
# Distribution of the target for each value of the 'Gender_Male' indicator.
fig, ax = plt.subplots(figsize=(8, 6))
sns.boxplot(x='Gender_Male', y='Respiratory Rate', data=df, ax=ax)
ax.set_title('Respiratory Rate by Gender')
plt.show()

In [None]:
# Pairwise correlations between all columns of the preprocessed frame.
fig, ax = plt.subplots(figsize=(10, 8))
correlations = df.corr()
sns.heatmap(correlations, annot=True, cmap='coolwarm', fmt='.2f', ax=ax)
ax.set_title('Correlation Heatmap')
plt.show()