<a href="https://colab.research.google.com/github/douglasmasho/MedAlgo/blob/main/Survival.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
import os
import glob

# Mount Google Drive
drive.mount('/content/drive')

data_dir = '/content/drive/MyDrive/BRATS/MICCAI_BraTS_2019_Data_Training'

survival_data = os.path.join(data_dir, "survival_data.csv")
hgg_images = os.path.join(data_dir, "HGG")
lgg_images = os.path.join(data_dir, "LGG")

print(hgg_images)

Mounted at /content/drive
/content/drive/MyDrive/BRATS/MICCAI_BraTS_2019_Data_Training/HGG


In [3]:
!pip install --upgrade scikit-learn


Collecting scikit-learn
  Downloading scikit_learn-1.5.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Downloading scikit_learn-1.5.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.4/13.4 MB[0m [31m90.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: scikit-learn
  Attempting uninstall: scikit-learn
    Found existing installation: scikit-learn 1.3.2
    Uninstalling scikit-learn-1.3.2:
      Successfully uninstalled scikit-learn-1.3.2
Successfully installed scikit-learn-1.5.1


In [6]:
import numpy as np
import tensorflow as tf
from tensorflow.data import Dataset
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from sklearn.model_selection import train_test_split

def create_2d_cnn_model(input_shape):
    model = Sequential([
        Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
        MaxPooling2D((2, 2)),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D((2, 2)),
        Conv2D(128, (3, 3), activation='relu'),
        MaxPooling2D((2, 2)),
        Flatten(),
        Dense(64, activation='relu'),
        Dense(1)  # Change to Dense(num_classes, activation='softmax') for multi-class classification
    ])
    model.compile(optimizer=Adam(learning_rate=1e-4), loss='mse', metrics=['mae'])
    return model

def data_generator(X, ages, y, batch_size):
    def generator():
        for i in range(len(X)):
            yield (X[i], np.array([ages[i]])), y[i]

    dataset = Dataset.from_generator(generator,
                                     output_signature=(
                                         (tf.TensorSpec(shape=(64, 64, 1), dtype=tf.float32),
                                          tf.TensorSpec(shape=(1,), dtype=tf.float32)),
                                         tf.TensorSpec(shape=(), dtype=tf.float32)))
    dataset = dataset.shuffle(buffer_size=1000).batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE).repeat()
    return dataset

def main():
    try:
        # Example data loading function (Replace with actual data loading)
        def get_data():
            # Dummy data for example purposes
            X = np.random.rand(32860, 64, 64)  # Replace with actual MRI data loading
            y = np.random.rand(32860)  # Replace with actual labels
            ages = np.random.rand(32860)  # Replace with actual age data
            return X, y, ages

        X, y, ages = get_data()

        if X is None:
            print("Exiting due to lack of valid data.")
        else:
            print(f"Loaded {len(X)} samples")

            # Resize X to add channel dimension
            X = np.expand_dims(X, axis=-1)  # Add channel dimension for grayscale images

            # Split the data
            X_train, X_test, y_train, y_test, ages_train, ages_test = train_test_split(X, y, ages, test_size=0.2, random_state=42)

            # Print dataset sizes
            print(f"Training set size: {len(X_train)}")
            print(f"Test set size: {len(X_test)}")

            # Create and compile the model
            model = create_2d_cnn_model((64, 64, 1))

            # Create data generators
            batch_size = 4
            train_gen = data_generator(X_train, ages_train, y_train, batch_size)
            val_gen = data_generator(X_test, ages_test, y_test, batch_size)

            # Check if generators are not None
            if train_gen is None or val_gen is None:
                raise ValueError("One or both data generators are not initialized properly.")

            # Train the model
            history = model.fit(
                train_gen,
                validation_data=val_gen,
                epochs=10,
                steps_per_epoch=len(X_train) // batch_size,
                validation_steps=len(X_test) // batch_size
            )

            # Evaluate the model
            test_loss, test_mae = model.evaluate(val_gen)
            print(f"Test MAE: {test_mae}")

    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")
        raise

if __name__ == "__main__":
    main()


NotADirectoryError: [Errno 20] Not a directory: '/content/drive/MyDrive/BRATS/MICCAI_BraTS_2019_Data_Training/name_mapping.csv'

In [10]:
!pip install monai

Collecting monai
  Downloading monai-1.3.2-py3-none-any.whl.metadata (10 kB)
Downloading monai-1.3.2-py3-none-any.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: monai
Successfully installed monai-1.3.2


In [1]:
import os
import numpy as np
import nibabel as nib
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torch.cuda.amp import GradScaler, autocast

# Custom function to load .nii files and convert to NumPy arrays
def load_nifti(file_path):
    img = nib.load(file_path)
    return img.get_fdata()

# Paths to your data
data_dir_hgg = '/content/drive/MyDrive/BRATS/MICCAI_BraTS_2019_Data_Training/HGG'
data_dir_lgg = '/content/drive/MyDrive/BRATS/MICCAI_BraTS_2019_Data_Training/LGG'
survival_data_path = '/content/drive/MyDrive/BRATS/MICCAI_BraTS_2019_Data_Training/survival_data.csv'

# Load survival data
survival_data = pd.read_csv(survival_data_path)

# Extract BraTS19ID from survival data
ids = survival_data['BraTS19ID'].tolist()
labels = survival_data['Survival'].tolist()  # Assuming survival is the target label

file_paths = []
labels_list = []

# Function to process data
def process_data(data_dir, label):
    for subject_dir in os.listdir(data_dir):
        subject_path = os.path.join(data_dir, subject_dir)
        if os.path.isdir(subject_path):
            t1ce_file = None
            for file_name in os.listdir(subject_path):
                if file_name.endswith('t1ce.nii'):
                    t1ce_file = os.path.join(subject_path, file_name)
                    break
            if t1ce_file and subject_dir in ids:
                file_paths.append(t1ce_file)
                labels_list.append(label)

# Process HGG and LGG directories
process_data(data_dir_hgg, 1)  # Assuming 1 represents HGG
process_data(data_dir_lgg, 0)  # Assuming 0 represents LGG

# Check if data is loaded correctly
if len(file_paths) == 0 or len(labels_list) == 0:
    raise ValueError("No data found. Check if file paths and labels are correctly processed.")

# Split data
file_paths_train, file_paths_test, labels_train, labels_test = train_test_split(file_paths, labels_list, test_size=0.2, random_state=42)

# Custom dataset class
class NiftiDataset(Dataset):
    def __init__(self, file_paths, labels, transform=None):
        self.file_paths = file_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        img_path = self.file_paths[idx]
        image = load_nifti(img_path)
        label = self.labels[idx]

        # Add channel dimension if missing
        if image.ndim == 3:
            image = np.expand_dims(image, axis=0)

        if self.transform:
            image = self.transform(image)

        return {'image': image, 'label': torch.tensor(label, dtype=torch.float32)}

# Define transforms
transform = transforms.Compose([
    transforms.Resize((64, 64, 64)),  # Resize to smaller size
    transforms.ToTensor()  # Convert to tensor
])

# Create datasets and dataloaders
train_ds = NiftiDataset(file_paths_train, labels_train, transform=transform)
train_loader = DataLoader(train_ds, batch_size=1, shuffle=True, num_workers=4, pin_memory=False)

val_ds = NiftiDataset(file_paths_test, labels_test, transform=transform)
val_loader = DataLoader(val_ds, batch_size=1, num_workers=4, pin_memory=False)

# Define a simple model (you can use a more complex model as needed)
class SimpleCNN(torch.nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = torch.nn.Conv3d(1, 32, kernel_size=3, padding=1)
        self.conv2 = torch.nn.Conv3d(32, 64, kernel_size=3, padding=1)
        self.fc1 = torch.nn.Linear(64 * 16 * 16 * 16, 128)  # Adjusted for smaller image size
        self.fc2 = torch.nn.Linear(128, 1)
        self.relu = torch.nn.ReLU()
        self.pool = torch.nn.MaxPool3d(kernel_size=2, stride=2)

    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = x.view(-1, 64 * 16 * 16 * 16)  # Adjusted for smaller image size
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Initialize model, loss function, and optimizer
model = SimpleCNN().cuda()
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
scaler = GradScaler()

# Training loop with mixed precision
for epoch in range(10):
    model.train()
    running_loss = 0.0
    for batch in train_loader:
        images, labels = batch["image"].cuda(), batch["label"].cuda()
        optimizer.zero_grad()

        with autocast():
            outputs = model(images)
            loss = criterion(outputs.squeeze(), labels)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()

    print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

    # Validation loop
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch in val_loader:
            images, labels = batch["image"].cuda(), batch["label"].cuda()
            with autocast():
                outputs = model(images)
                loss = criterion(outputs.squeeze(), labels)
            val_loss += loss.item()

    print(f"Epoch {epoch+1}, Validation Loss: {val_loss / len(val_loader)}")

    # Clear memory
    del batch
    torch.cuda.empty_cache()


ValueError: If size is a sequence, it should have 1 or 2 values

In [None]:
# Load survival data
survival_data = pd.read_csv(survival_data_file)

# Display the column names
print(survival_data.columns)


Index(['BraTS19ID', 'Age', 'Survival', 'ResectionStatus'], dtype='object')


In [4]:
import os
import numpy as np
import nibabel as nib
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torch.cuda.amp import GradScaler, autocast
from sklearn.model_selection import train_test_split
from monai.transforms import Compose, LoadImage, ScaleIntensity, Resize, ToTensor

# Custom function to load .nii files and convert to NumPy arrays
def load_nifti(file_path):
    img = nib.load(file_path)
    return img.get_fdata()

# Paths to your data
data_dir_hgg = '/content/drive/MyDrive/BRATS/MICCAI_BraTS_2019_Data_Training/HGG'
data_dir_lgg = '/content/drive/MyDrive/BRATS/MICCAI_BraTS_2019_Data_Training/LGG'
survival_data_path = '/content/drive/MyDrive/BRATS/MICCAI_BraTS_2019_Data_Training/survival_data.csv'

# Load survival data
survival_data = pd.read_csv(survival_data_path)

# Extract BraTS19ID from survival data
ids = survival_data['BraTS19ID'].tolist()
labels = survival_data['Survival'].tolist()  # Assuming survival is the target label

file_paths = []
labels_list = []

# Function to process data
def process_data(data_dir, label):
    for subject_dir in os.listdir(data_dir):
        subject_path = os.path.join(data_dir, subject_dir)
        if os.path.isdir(subject_path):
            t1ce_file = None
            for file_name in os.listdir(subject_path):
                if file_name.endswith('t1ce.nii'):
                    t1ce_file = os.path.join(subject_path, file_name)
                    break
            if t1ce_file and subject_dir in ids:
                file_paths.append(t1ce_file)
                labels_list.append(label)

# Process HGG and LGG directories
process_data(data_dir_hgg, 1)  # Assuming 1 represents HGG
process_data(data_dir_lgg, 0)  # Assuming 0 represents LGG

# Check if data is loaded correctly
if len(file_paths) == 0 or len(labels_list) == 0:
    raise ValueError("No data found. Check if file paths and labels are correctly processed.")

# Split data
file_paths_train, file_paths_test, labels_train, labels_test = train_test_split(file_paths, labels_list, test_size=0.2, random_state=42)

# Custom dataset class
class NiftiDataset(Dataset):
    def __init__(self, file_paths, labels, transform=None):
        self.file_paths = file_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        img_path = self.file_paths[idx]
        image = load_nifti(img_path)
        label = self.labels[idx]

        # Add channel dimension if missing
        if image.ndim == 3:
            image = np.expand_dims(image, axis=0)

        if self.transform:
            image = self.transform(image)

        return {'image': image, 'label': torch.tensor(label, dtype=torch.float32).unsqueeze(0)}  # Ensure label is of shape [1]

# Define transforms
transform = Compose([
    Resize((64, 64, 64)),  # Resize images to a smaller size
    ScaleIntensity(),  # Scale image intensity
    ToTensor()  # Convert to tensor
])

# Create datasets and dataloaders
train_ds = NiftiDataset(file_paths_train, labels_train, transform=transform)
train_loader = DataLoader(train_ds, batch_size=2, shuffle=True, num_workers=2)  # Adjust num_workers as needed

val_ds = NiftiDataset(file_paths_test, labels_test, transform=transform)
val_loader = DataLoader(val_ds, batch_size=2, num_workers=2)  # Adjust num_workers as needed

# Define a simple model (you can use a more complex model as needed)
class SimpleCNN(torch.nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = torch.nn.Conv3d(1, 32, kernel_size=3, padding=1)
        self.conv2 = torch.nn.Conv3d(32, 64, kernel_size=3, padding=1)
        self.fc1 = torch.nn.Linear(64 * 16 * 16 * 16, 128)  # Adjusted for smaller image size
        self.fc2 = torch.nn.Linear(128, 1)  # Single output unit
        self.relu = torch.nn.ReLU()
        self.pool = torch.nn.MaxPool3d(kernel_size=2, stride=2)

    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = x.view(-1, 64 * 16 * 16 * 16)  # Adjusted for smaller image size
        x = self.relu(self.fc1(x))
        x = self.fc2(x)  # Output logits
        return x

# Initialize model, loss function, and optimizer
model = SimpleCNN().cuda()
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
scaler = GradScaler()

# Training loop with mixed precision
for epoch in range(10):
    model.train()
    running_loss = 0.0
    for batch in train_loader:
        images, labels = batch["image"].cuda(), batch["label"].cuda()
        optimizer.zero_grad()

        with autocast():
            outputs = model(images)
            loss = criterion(outputs.squeeze(), labels.squeeze())  # Ensure both are squeezed to same shape

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()

    print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

    # Validation loop
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch in val_loader:
            images, labels = batch["image"].cuda(), batch["label"].cuda()
            with autocast():
                outputs = model(images)
                loss = criterion(outputs.squeeze(), labels.squeeze())  # Ensure both are squeezed to same shape
            val_loss += loss.item()

    print(f"Epoch {epoch+1}, Validation Loss: {val_loss / len(val_loader)}")

    # Clear memory
    del batch
    torch.cuda.empty_cache()


  scaler = GradScaler()
  self.pid = os.fork()
  with autocast():
  self.pid = os.fork()


Epoch 1, Loss: 0.011005751862149336


  with autocast():


Epoch 1, Validation Loss: 1.6142662719029734e-11
Epoch 2, Loss: 1.7220227325542622e-11
Epoch 2, Validation Loss: 1.6132923147433927e-11
Epoch 3, Loss: 1.720666749722317e-11
Epoch 3, Validation Loss: 1.6132923147433927e-11
Epoch 4, Loss: 1.7207559254161606e-11
Epoch 4, Validation Loss: 1.6132923147433927e-11
Epoch 5, Loss: 1.7467367286261578e-11
Epoch 5, Validation Loss: 1.6132923147433927e-11
Epoch 6, Loss: 1.7241933911015e-11
Epoch 6, Validation Loss: 1.6132923147433927e-11
Epoch 7, Loss: 1.720868684566197e-11
Epoch 7, Validation Loss: 1.6132923147433927e-11
Epoch 8, Loss: 1.7206667627014682e-11
Epoch 8, Validation Loss: 1.6132923147433927e-11
Epoch 9, Loss: 1.7207177215848904e-11
Epoch 9, Validation Loss: 1.6132923147433927e-11
Epoch 10, Loss: 1.7207117237315595e-11
Epoch 10, Validation Loss: 1.6132923147433927e-11


In [None]:
import numpy as np
import nibabel as nib
import os
import pandas as pd
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, Concatenate
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Simplified Data Generator
class MRIDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, image_dirs, survival_data, batch_size=16, image_shape=(128, 128, 1), shuffle=True):
        self.image_dirs = image_dirs
        self.survival_data = survival_data
        self.batch_size = batch_size
        self.image_shape = image_shape
        self.shuffle = shuffle
        self.indices = list(range(len(self.image_dirs)))
        self.on_epoch_end()

    def __len__(self):
        return int(np.floor(len(self.image_dirs) / self.batch_size))

    def __getitem__(self, index):
        batch_indices = self.indices[index*self.batch_size:(index+1)*self.batch_size]
        batch_dirs = [self.image_dirs[i] for i in batch_indices]
        batch_images = []
        batch_survival = []
        batch_age = []

        for img_dir in batch_dirs:
            img, subject_id = self.load_image_from_folder(img_dir)
            if img is not None:
                batch_images.append(img)
                survival, age = self.get_survival_data(subject_id)
                batch_survival.append(survival)
                batch_age.append(age)

        batch_images = np.array(batch_images, dtype=np.float32)
        batch_survival = np.array(batch_survival, dtype=np.float32)
        batch_age = np.array(batch_age, dtype=np.float32)

        # Ensure shapes are correct
        batch_images = np.reshape(batch_images, (batch_images.shape[0], *self.image_shape))
        batch_age = np.reshape(batch_age, (-1, 1))  # Ensure age is 2D

        return {'image_input': batch_images, 'age_input': batch_age}, batch_survival

    def load_image_from_folder(self, folder):
        for file_name in os.listdir(folder):
            if 't1ce.nii' in file_name:
                img_path = os.path.join(folder, file_name)
                img = nib.load(img_path).get_fdata()
                img = np.expand_dims(img, axis=-1)  # Add channel dimension
                img = img / np.max(img)  # Normalize
                img = np.resize(img, self.image_shape)  # Resize to consistent shape
                return img, os.path.basename(folder)
        return None, None

    def get_survival_data(self, subject_id):
        survival = self.survival_data[self.survival_data['BraTS19ID'] == subject_id]['Survival']
        age = self.survival_data[self.survival_data['BraTS19ID'] == subject_id]['Age']
        return survival.values[0] if not survival.empty else np.nan, age.values[0] if not age.empty else 0

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

# Preprocess survival data
def preprocess_survival(value):
    if 'ALIVE' in str(value):
        parts = str(value).split()
        if len(parts) > 1 and parts[1].isdigit():
            return float(parts[1])
        else:
            return 10000  # or another suitable high value
    elif pd.isna(value) or str(value).strip() == '':
        return np.nan  # or a placeholder like 0
    else:
        try:
            return float(value)
        except ValueError:
            return np.nan  # Handle any unexpected formats

# Load and preprocess survival data
data_dir = '/content/drive/MyDrive/BRATS/MICCAI_BraTS_2019_Data_Training'
survival_data_file = os.path.join(data_dir, 'survival_data.csv')
survival_data = pd.read_csv(survival_data_file)

# Preprocess survival data
survival_data['Survival'] = survival_data['Survival'].apply(preprocess_survival)
survival_data['Survival'].fillna(survival_data['Survival'].median(), inplace=True)
survival_data['Age'] = StandardScaler().fit_transform(survival_data[['Age']])

# Define directories
hgg_images_dir = os.path.join(data_dir, 'HGG')
lgg_images_dir = os.path.join(data_dir, 'LGG')

# Prepare data
hgg_dirs = [os.path.join(hgg_images_dir, d) for d in os.listdir(hgg_images_dir)]
lgg_dirs = [os.path.join(lgg_images_dir, d) for d in os.listdir(lgg_images_dir)]
all_dirs = hgg_dirs + lgg_dirs

# Split data
train_dirs, test_dirs = train_test_split(all_dirs, test_size=0.2, random_state=42)

# Create data generators
train_generator = MRIDataGenerator(train_dirs, survival_data, batch_size=16)
test_generator = MRIDataGenerator(test_dirs, survival_data, batch_size=16)

# Define a simpler model for debugging
image_input = Input(shape=(128, 128, 1), name='image_input')
age_input = Input(shape=(1,), name='age_input')

# Define CNN part for image processing
x = Conv2D(32, (3, 3), activation='relu')(image_input)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(64, (3, 3), activation='relu')(x)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(128, (3, 3), activation='relu')(x)
x = MaxPooling2D((2, 2))(x)
x = Flatten()(x)

# Concatenate image features with age
x = Concatenate()([x, age_input])
x = Dense(128, activation='relu')(x)
x = Dropout(0.5)(x)
output = Dense(1)(x)  # Output layer for regression

# Define the model
model = Model(inputs=[image_input, age_input], outputs=output)
model.compile(optimizer=Adam(), loss='mean_squared_error', metrics=['mae'])

# Print model summary
model.summary()

# Train model
history = model.fit(
    train_generator,
    epochs=20,
    validation_data=test_generator
)

# Evaluate model
loss, mae = model.evaluate(test_generator)
print(f"Mean Absolute Error on test data: {mae}")

# Prediction function
def predict_survival(image, age):
    image = np.expand_dims(image, axis=0)
    age = np.expand_dims(age, axis=0)
    survival = model.predict({'image_input': image, 'age_input': age})
    return survival

# Example prediction
example_batch = next(iter(test_generator))
example_image = example_batch[0]['image_input'][0]
predicted_survival = predict_survival(example_image, np.array([30]))  # Assuming age 30
print(f"Predicted survival: {predicted_survival}")


Epoch 1/20


ValueError: Exception encountered when calling Functional.call().

[1mInvalid input shape for input Tensor("data_1:0", shape=(None, 1), dtype=float32). Expected shape (None, 128, 128, 1), but input has incompatible shape (None, 1)[0m

Arguments received by Functional.call():
  • inputs={'image_input': 'tf.Tensor(shape=(None, 128, 128, 1), dtype=float32)', 'age_input': 'tf.Tensor(shape=(None, 1), dtype=float32)'}
  • training=True
  • mask={'image_input': 'None', 'age_input': 'None'}

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

# Define a minimal model
def create_model():
    image_input = Input(shape=(128, 128, 1), name='image_input')
    age_input = Input(shape=(1,), name='age_input')

    x = Conv2D(32, (3, 3), activation='relu')(image_input)
    x = MaxPooling2D((2, 2))(x)
    x = Flatten()(x)

    # Combine features
    combined = Concatenate()([x, age_input])
    x = Dense(64, activation='relu')(combined)
    x = Dropout(0.5)(x)
    output = Dense(1)(x)

    model = Model(inputs=[image_input, age_input], outputs=output)
    model.compile(optimizer=Adam(), loss='mean_squared_error', metrics=['mae'])

    return model

# Dummy data generator to tf.data.Dataset
def create_dataset(batch_size=16):
    num_samples = 100
    image_data = np.random.rand(num_samples, 128, 128, 1).astype(np.float32)
    age_data = np.random.rand(num_samples, 1).astype(np.float32)
    survival_data = np.random.rand(num_samples).astype(np.float32)

    dataset = tf.data.Dataset.from_tensor_slices(((image_data, age_data), survival_data))
    dataset = dataset.batch(batch_size)
    return dataset

# Create and compile model
model = create_model()
model.summary()

# Create dataset
train_dataset = create_dataset()

# Train the model
try:
    history = model.fit(
        train_dataset,
        epochs=2,
        steps_per_epoch=10
    )
except Exception as e:
    print("Error during training:", e)


Epoch 1/2
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 139ms/step - loss: 398.3152 - mae: 10.7971
Epoch 2/2
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 1.3174 - mae: 0.9568 


  self.gen.throw(typ, value, traceback)
