In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models
import numpy as np

# 1. Define the Vision Transformer (ViT) component
class ViTEncoder(nn.Module):
    def __init__(self, image_size=64, patch_size=16, hidden_size=256, num_heads=8):
        super().__init__()
        self.num_patches = (image_size // patch_size) ** 2
        self.patch_size = patch_size

        # Patch embedding
        self.patch_embed = nn.Conv2d(3, hidden_size, kernel_size=patch_size, stride=patch_size)

        # Positional embedding
        self.pos_embed = nn.Parameter(torch.zeros(1, self.num_patches, hidden_size))

        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_size,
            nhead=num_heads,
            dim_feedforward=hidden_size*4
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=6)

    def forward(self, x):
        # x: [batch_size, 3, image_size, image_size]
        x = self.patch_embed(x)  # [batch_size, hidden_size, num_patches_h, num_patches_w]
        x = x.flatten(2)  # [batch_size, hidden_size, num_patches]
        x = x.transpose(1, 2)  # [batch_size, num_patches, hidden_size]
        x = x + self.pos_embed
        x = self.transformer(x)
        return x

# 2. Define the Text Encoder using Transformer
class TextEncoder(nn.Module):
    def __init__(self, vocab_size=1000, hidden_size=256, num_heads=8):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_size)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_size,
            nhead=num_heads,
            dim_feedforward=hidden_size*4
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=4)

    def forward(self, x):
        # x: [batch_size, seq_length]
        x = self.embedding(x)
        x = self.transformer(x)
        return x.mean(dim=1)  # Global average pooling

# 3. Define the Decoder
class Decoder(nn.Module):
    def __init__(self, hidden_size=256):
        super().__init__()
        self.upsample = nn.Sequential(
            nn.ConvTranspose2d(hidden_size, 128, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 3, 4, 2, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        # x: [batch_size, num_patches, hidden_size]
        batch_size = x.size(0)
        x = x.transpose(1, 2)  # [batch_size, hidden_size, num_patches]
        x = x.view(batch_size, hidden_size, 4, 4)  # Reshape to 4x4 feature map
        x = self.upsample(x)  # Upsample to 64x64
        return x

# 4. Combine into Text2Image model
class Text2Image(nn.Module):
    def __init__(self):
        super().__init__()
        self.hidden_size = 256
        self.text_encoder = TextEncoder()
        self.vit_encoder = ViTEncoder()
        self.decoder = Decoder()

    def forward(self, text, target_img=None):
        # Encode text
        text_features = self.text_encoder(text)  # [batch_size, hidden_size]

        # If training, use target image
        if target_img is not None:
            img_features = self.vit_encoder(target_img)
            # Combine text and image features
            combined = img_features + text_features.unsqueeze(1)
            output = self.decoder(combined)
            return output

        # If inference, generate from text only
        batch_size = text_features.size(0)
        dummy_features = torch.zeros(batch_size, 16, self.hidden_size).to(text_features.device)
        combined = dummy_features + text_features.unsqueeze(1)
        return self.decoder(combined)

# 5. Create dummy dataset
class DummyTextImageDataset(Dataset):
    def __init__(self, num_samples=1000):
        self.num_samples = num_samples
        self.vocab_size = 1000
        self.seq_length = 10
        self.image_size = 64

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        # Dummy text (random token IDs)
        text = torch.randint(0, self.vocab_size, (self.seq_length,))
        # Dummy image (random pixels)
        image = torch.rand(3, self.image_size, self.image_size)
        return text, image

# 6. Training and Testing
def train_and_test():
    # Device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Model
    model = Text2Image().to(device)

    # Dataset and DataLoader
    dataset = DummyTextImageDataset()
    train_size = int(0.8 * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32)

    # Optimizer and loss
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Training
    num_epochs = 5
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        for batch_idx, (text, image) in enumerate(train_loader):
            text, image = text.to(device), image.to(device)

            optimizer.zero_grad()
            output = model(text, image)
            loss = criterion(output, image)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss/len(train_loader):.4f}")

    # Testing
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for text, image in test_loader:
            text, image = text.to(device), image.to(device)
            output = model(text, image)
            loss = criterion(output, image)
            test_loss += loss.item()

    print(f"Test Loss: {test_loss/len(test_loader):.4f}")

if __name__ == "__main__":
    train_and_test()



NameError: name 'hidden_size' is not defined

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models
import numpy as np

# 1. Define the Vision Transformer (ViT) component
class ViTEncoder(nn.Module):
    def __init__(self, image_size=64, patch_size=16, hidden_size=256, num_heads=8):
        super().__init__()
        self.num_patches = (image_size // patch_size) ** 2
        self.patch_size = patch_size

        # Patch embedding
        self.patch_embed = nn.Conv2d(3, hidden_size, kernel_size=patch_size, stride=patch_size)

        # Positional embedding
        self.pos_embed = nn.Parameter(torch.zeros(1, self.num_patches, hidden_size))

        # Transformer encoder with batch_first=True
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_size,
            nhead=num_heads,
            dim_feedforward=hidden_size*4,
            batch_first=True  # Added this to fix the warning
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=6)

    def forward(self, x):
        x = self.patch_embed(x)
        x = x.flatten(2)
        x = x.transpose(1, 2)
        x = x + self.pos_embed
        x = self.transformer(x)
        return x

# 2. Define the Text Encoder
class TextEncoder(nn.Module):
    def __init__(self, vocab_size=1000, hidden_size=256, num_heads=8):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_size)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_size,
            nhead=num_heads,
            dim_feedforward=hidden_size*4,
            batch_first=True  # Added this to fix the warning
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=4)

    def forward(self, x):
        x = self.embedding(x)
        x = self.transformer(x)
        return x.mean(dim=1)

# 3. Define the Decoder
class Decoder(nn.Module):
    def __init__(self, hidden_size=256):  # Added hidden_size as parameter
        super().__init__()
        self.hidden_size = hidden_size  # Store hidden_size
        self.upsample = nn.Sequential(
            nn.ConvTranspose2d(hidden_size, 128, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 3, 4, 2, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        batch_size = x.size(0)
        x = x.transpose(1, 2)  # [batch_size, hidden_size, num_patches]
        x = x.view(batch_size, self.hidden_size, 4, 4)  # Use self.hidden_size instead of hidden_size
        x = self.upsample(x)
        return x

# 4. Combine into Text2Image model
class Text2Image(nn.Module):
    def __init__(self):
        super().__init__()
        self.hidden_size = 256
        self.text_encoder = TextEncoder()
        self.vit_encoder = ViTEncoder()
        self.decoder = Decoder(hidden_size=self.hidden_size)  # Pass hidden_size

    def forward(self, text, target_img=None):
        text_features = self.text_encoder(text)

        if target_img is not None:
            img_features = self.vit_encoder(target_img)
            combined = img_features + text_features.unsqueeze(1)
            output = self.decoder(combined)
            return output

        batch_size = text_features.size(0)
        dummy_features = torch.zeros(batch_size, 16, self.hidden_size).to(text_features.device)
        combined = dummy_features + text_features.unsqueeze(1)
        return self.decoder(combined)

# 5. Create dummy dataset
class DummyTextImageDataset(Dataset):
    def __init__(self, num_samples=1000):
        self.num_samples = num_samples
        self.vocab_size = 1000
        self.seq_length = 10
        self.image_size = 64

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        text = torch.randint(0, self.vocab_size, (self.seq_length,))
        image = torch.rand(3, self.image_size, self.image_size)
        return text, image

# 6. Training and Testing
def train_and_test():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = Text2Image().to(device)

    dataset = DummyTextImageDataset()
    train_size = int(0.8 * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32)

    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    num_epochs = 5
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        for batch_idx, (text, image) in enumerate(train_loader):
            text, image = text.to(device), image.to(device)

            optimizer.zero_grad()
            output = model(text, image)
            loss = criterion(output, image)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss/len(train_loader):.4f}")

    model.eval()
    test_loss = 0
    with torch.no_grad():
        for text, image in test_loader:
            text, image = text.to(device), image.to(device)
            output = model(text, image)
            loss = criterion(output, image)
            test_loss += loss.item()

    print(f"Test Loss: {test_loss/len(test_loader):.4f}")

if __name__ == "__main__":
    train_and_test()

Epoch 1/5, Train Loss: 0.0834
Epoch 2/5, Train Loss: 0.0833
Epoch 3/5, Train Loss: 0.0834
Epoch 4/5, Train Loss: 0.0833
Epoch 5/5, Train Loss: 0.0833
Test Loss: 0.0833


In [3]:
!unzip /content/image_dataset.zip -d /content/image_dataset

Archive:  /content/image_dataset.zip
 extracting: /content/image_dataset/CXR1849_IM-0550-1001.png  
 extracting: /content/image_dataset/CXR42_IM-2063-1001.png  
 extracting: /content/image_dataset/CXR1493_IM-0318-1001.png  
 extracting: /content/image_dataset/CXR2368_IM-0928-1001.png  
 extracting: /content/image_dataset/CXR279_IM-1224-1001-0001.png  
 extracting: /content/image_dataset/CXR3576_IM-1757-1001.png  
 extracting: /content/image_dataset/CXR1248_IM-0168-1001.png  
 extracting: /content/image_dataset/CXR2523_IM-1041-1001.png  
 extracting: /content/image_dataset/CXR1388_IM-0246-1001.png  
 extracting: /content/image_dataset/CXR841_IM-2365-1001.png  
 extracting: /content/image_dataset/CXR2827_IM-1246-1001.png  
 extracting: /content/image_dataset/CXR2546_IM-1055-1001.png  
 extracting: /content/image_dataset/CXR520_IM-2131-1001.png  
 extracting: /content/image_dataset/CXR2373_IM-0934-1001.png  
 extracting: /content/image_dataset/CXR3701_IM-1848-1001.png  
 extracting: /cont

In [4]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import pandas as pd

# Set CUDA_LAUNCH_BLOCKING for debugging
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

# Define transformations for image preprocessing
image_transforms = transforms.Compose([
    transforms.Resize((64, 64)),  # Resize to match input size
    transforms.ToTensor(),  # Convert to tensor
    transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])  # Normalize to [0,1]
])

# Define custom dataset
class ChestXRayDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.data.iloc[idx]['img'])
        text = self.data.iloc[idx]['text']

        # Load and preprocess image
        image = Image.open(img_name).convert("RGB")
        if self.transform:
            image = self.transform(image)

        # Convert text to fixed-length vector (Simple Embedding)
        text_vector = torch.zeros(256, dtype=torch.float32)
        for i, char in enumerate(text[:256]):
            text_vector[i] = ord(char) / 255.0  # Normalize characters

        return image, text_vector

# Transformer Encoder Block
class TransformerEncoderBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, hidden_dim):
        super(TransformerEncoderBlock, self).__init__()
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim)
        )
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        attn_out, _ = self.attn(x, x, x)
        x = self.norm1(x + attn_out)
        fc_out = self.fc(x)
        x = self.norm2(x + fc_out)
        return x

# Vision Transformer (ViT) Block
class VisionTransformer(nn.Module):
    def __init__(self, embed_dim=512, img_size=64, patch_size=8, num_heads=8, num_layers=6, hidden_dim=1024):
        super(VisionTransformer, self).__init__()
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        self.projection = nn.Linear(patch_size * patch_size * 3, embed_dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, embed_dim))

        self.encoder_layers = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim) for _ in range(num_layers)
        ])
        self.fc = nn.Linear(embed_dim, img_size * img_size * 3)

    def forward(self, x):
        B, C, H, W = x.shape
        x = x.unfold(2, self.patch_size, self.patch_size).unfold(3, self.patch_size, self.patch_size)
        x = x.permute(0, 2, 3, 1, 4, 5).contiguous().view(B, self.num_patches, -1)
        x = self.projection(x)

        cls_tokens = self.cls_token.repeat(B, 1, 1)
        x = torch.cat((cls_tokens, x), dim=1) + self.pos_embedding.to(x.device)

        for layer in self.encoder_layers:
            x = layer(x)

        x = self.fc(x[:, 0]).view(B, 3, H, W)
        return torch.sigmoid(x)

# Full Text-to-Image Model
class Text2ImageModel(nn.Module):
    def __init__(self, embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64):
        super(Text2ImageModel, self).__init__()
        self.text_encoder = TransformerEncoderBlock(embed_dim, num_heads, hidden_dim)
        self.image_generator = VisionTransformer(embed_dim, img_size)

    def forward(self, text):
        text_features = self.text_encoder(text.unsqueeze(1))
        generated_image = self.image_generator(text_features)
        return generated_image

# Training function
def train_model(model, dataloader, epochs=5, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.L1Loss()  # Using L1Loss for stable training

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for images, text_vectors in dataloader:
            images, text_vectors = images.to(device, dtype=torch.float32), text_vectors.to(device, dtype=torch.float32)

            optimizer.zero_grad()
            outputs = model(text_vectors)

            loss = criterion(outputs, images)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss / len(dataloader)}")

# Example execution
batch_size = 16
csv_path = "/content/image_labels_reports.csv"
extract_path = "/content/image_dataset"
dataset = ChestXRayDataset(csv_path, extract_path, transform=image_transforms)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

model = Text2ImageModel(embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64)
train_model(model, dataloader, epochs=10)

AssertionError: was expecting embedding dimension of 512, but got 256

In [5]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import pandas as pd

# Set CUDA_LAUNCH_BLOCKING for debugging
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

# Define transformations for image preprocessing
image_transforms = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])
])

# Define custom dataset
class ChestXRayDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform
        self.embed_dim = 512  # Match model's embedding dimension

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.data.iloc[idx]['img'])
        text = self.data.iloc[idx]['text']

        # Load and preprocess image
        image = Image.open(img_name).convert("RGB")
        if self.transform:
            image = self.transform(image)

        # Convert text to fixed-length vector with correct embedding dimension
        text_vector = torch.zeros(self.embed_dim, dtype=torch.float32)
        for i, char in enumerate(text[:self.embed_dim]):
            text_vector[i] = ord(char) / 255.0

        return image, text_vector

# Transformer Encoder Block
class TransformerEncoderBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, hidden_dim):
        super(TransformerEncoderBlock, self).__init__()
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim)
        )
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        attn_out, _ = self.attn(x, x, x)
        x = self.norm1(x + attn_out)
        fc_out = self.fc(x)
        x = self.norm2(x + fc_out)
        return x

# Vision Transformer (ViT) Block
class VisionTransformer(nn.Module):
    def __init__(self, embed_dim=512, img_size=64, patch_size=8, num_heads=8, num_layers=6, hidden_dim=1024):
        super(VisionTransformer, self).__init__()
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        self.projection = nn.Linear(patch_size * patch_size * 3, embed_dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, embed_dim))

        self.encoder_layers = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim) for _ in range(num_layers)
        ])
        self.fc = nn.Linear(embed_dim, img_size * img_size * 3)

    def forward(self, x):
        B, C, H, W = x.shape
        x = x.unfold(2, self.patch_size, self.patch_size).unfold(3, self.patch_size, self.patch_size)
        x = x.permute(0, 2, 3, 1, 4, 5).contiguous().view(B, self.num_patches, -1)
        x = self.projection(x)

        cls_tokens = self.cls_token.repeat(B, 1, 1)
        x = torch.cat((cls_tokens, x), dim=1) + self.pos_embedding.to(x.device)

        for layer in self.encoder_layers:
            x = layer(x)

        x = self.fc(x[:, 0]).view(B, 3, H, W)
        return torch.sigmoid(x)

# Full Text-to-Image Model
class Text2ImageModel(nn.Module):
    def __init__(self, embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64):
        super(Text2ImageModel, self).__init__()
        self.text_encoder = TransformerEncoderBlock(embed_dim, num_heads, hidden_dim)
        self.image_generator = VisionTransformer(embed_dim, img_size)

    def forward(self, text):
        # Ensure text has the right shape for the transformer [batch_size, seq_len, embed_dim]
        text_features = self.text_encoder(text.unsqueeze(1))
        generated_image = self.image_generator(text_features)
        return generated_image

# Training function
def train_model(model, dataloader, epochs=5, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.L1Loss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for images, text_vectors in dataloader:
            images = images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            optimizer.zero_grad()
            outputs = model(text_vectors)
            loss = criterion(outputs, images)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

# Example execution
if __name__ == "__main__":
    batch_size = 16
    csv_path = "/content/image_labels_reports.csv"
    extract_path = "/content/image_dataset"

    dataset = ChestXRayDataset(csv_path, extract_path, transform=image_transforms)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    model = Text2ImageModel(embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64)
    train_model(model, dataloader, epochs=10)

ValueError: not enough values to unpack (expected 4, got 3)

In [6]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import pandas as pd

os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

image_transforms = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])
])

class ChestXRayDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform
        self.embed_dim = 512

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.data.iloc[idx]['img'])
        text = self.data.iloc[idx]['text']

        image = Image.open(img_name).convert("RGB")
        if self.transform:
            image = self.transform(image)

        text_vector = torch.zeros(self.embed_dim, dtype=torch.float32)
        for i, char in enumerate(text[:self.embed_dim]):
            text_vector[i] = ord(char) / 255.0

        return image, text_vector

class TransformerEncoderBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, hidden_dim):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim)
        )
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        attn_out, _ = self.attn(x, x, x)
        x = self.norm1(x + attn_out)
        fc_out = self.fc(x)
        x = self.norm2(x + fc_out)
        return x

class ImageGenerator(nn.Module):
    def __init__(self, embed_dim=512, img_size=64, hidden_dim=1024):
        super().__init__()
        self.img_size = img_size

        # Project text features to initial spatial representation
        self.initial_projection = nn.Linear(embed_dim, 256 * 4 * 4)  # Start with 4x4 feature map

        # Upsampling layers to generate final image
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 3, 4, stride=2, padding=1),
            nn.Sigmoid()  # Output in range [0,1]
        )

    def forward(self, x):
        # x: [batch_size, seq_len, embed_dim]
        x = x[:, 0, :]  # Take first token [batch_size, embed_dim]
        x = self.initial_projection(x)  # [batch_size, 256*4*4]
        x = x.view(-1, 256, 4, 4)  # [batch_size, 256, 4, 4]
        x = self.decoder(x)  # [batch_size, 3, 64, 64]
        return x

class Text2ImageModel(nn.Module):
    def __init__(self, embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64):
        super().__init__()
        # Text encoder
        self.text_encoder = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim)
            for _ in range(num_layers)
        ])
        # Image generator
        self.image_generator = ImageGenerator(embed_dim, img_size)

    def forward(self, text):
        # text: [batch_size, embed_dim]
        x = text.unsqueeze(1)  # [batch_size, 1, embed_dim]
        for layer in self.text_encoder:
            x = layer(x)
        generated_image = self.image_generator(x)
        return generated_image

def train_model(model, dataloader, epochs=5, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.L1Loss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for images, text_vectors in dataloader:
            images = images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            optimizer.zero_grad()
            outputs = model(text_vectors)
            loss = criterion(outputs, images)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

if __name__ == "__main__":
    batch_size = 16
    csv_path = "/content/image_labels_reports.csv"
    extract_path = "/content/image_dataset"

    dataset = ChestXRayDataset(csv_path, extract_path, transform=image_transforms)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    model = Text2ImageModel(embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64)
    train_model(model, dataloader, epochs=10)

Epoch 1/10, Loss: 0.1524
Epoch 2/10, Loss: 0.1387
Epoch 3/10, Loss: 0.1381
Epoch 4/10, Loss: 0.1383
Epoch 5/10, Loss: 0.1377
Epoch 6/10, Loss: 0.1378
Epoch 7/10, Loss: 0.1376
Epoch 8/10, Loss: 0.1378
Epoch 9/10, Loss: 0.1367
Epoch 10/10, Loss: 0.1372


In [7]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import pandas as pd
import torchvision.utils as vutils  # For saving images

os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

image_transforms = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])
])

class ChestXRayDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform
        self.embed_dim = 512

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.data.iloc[idx]['img'])
        text = self.data.iloc[idx]['text']

        image = Image.open(img_name).convert("RGB")
        if self.transform:
            image = self.transform(image)

        text_vector = torch.zeros(self.embed_dim, dtype=torch.float32)
        for i, char in enumerate(text[:self.embed_dim]):
            text_vector[i] = ord(char) / 255.0

        return image, text_vector

class TransformerEncoderBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, hidden_dim):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim)
        )
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        attn_out, _ = self.attn(x, x, x)
        x = self.norm1(x + attn_out)
        fc_out = self.fc(x)
        x = self.norm2(x + fc_out)
        return x

class ImageGenerator(nn.Module):
    def __init__(self, embed_dim=512, img_size=64, hidden_dim=1024):
        super().__init__()
        self.img_size = img_size
        self.initial_projection = nn.Linear(embed_dim, 256 * 4 * 4)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 3, 4, stride=2, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = x[:, 0, :]
        x = self.initial_projection(x)
        x = x.view(-1, 256, 4, 4)
        x = self.decoder(x)
        return x

class Text2ImageModel(nn.Module):
    def __init__(self, embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64):
        super().__init__()
        self.text_encoder = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim)
            for _ in range(num_layers)
        ])
        self.image_generator = ImageGenerator(embed_dim, img_size)

    def forward(self, text):
        x = text.unsqueeze(1)
        for layer in self.text_encoder:
            x = layer(x)
        generated_image = self.image_generator(x)
        return generated_image

def train_model(model, dataloader, epochs=5, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.L1Loss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for images, text_vectors in dataloader:
            images = images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            optimizer.zero_grad()
            outputs = model(text_vectors)
            loss = criterion(outputs, images)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_loss:.4f}")

    return model  # Return trained model

def test_model(model, test_dataloader, save_dir="generated_images"):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    criterion = nn.L1Loss()
    total_test_loss = 0

    # Create directory for saving images if it doesn't exist
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    with torch.no_grad():
        for i, (real_images, text_vectors) in enumerate(test_dataloader):
            real_images = real_images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            # Generate images from text
            generated_images = model(text_vectors)

            # Calculate loss
            loss = criterion(generated_images, real_images)
            total_test_loss += loss.item()

            # Save first batch of generated images for inspection
            if i == 0:
                # Denormalize images for saving (assuming input was normalized to [0,1])
                generated_images_denorm = generated_images
                real_images_denorm = real_images

                # Save real vs generated comparison
                comparison = torch.cat([real_images_denorm[:8], generated_images_denorm[:8]])
                vutils.save_image(
                    comparison,
                    os.path.join(save_dir, 'real_vs_generated.png'),
                    nrow=8,
                    normalize=True
                )

                print(f"Saved sample images to {save_dir}/real_vs_generated.png")
                print("Top row: Real images, Bottom row: Generated images")

    avg_test_loss = total_test_loss / len(test_dataloader)
    print(f"Test Loss: {avg_test_loss:.4f}")

if __name__ == "__main__":
    batch_size = 16
    csv_path = "/content/image_labels_reports.csv"
    extract_path = "/content/image_dataset"

    # Full dataset
    full_dataset = ChestXRayDataset(csv_path, extract_path, transform=image_transforms)

    # Split into train and test
    train_size = int(0.8 * len(full_dataset))
    test_size = len(full_dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Initialize and train model
    model = Text2ImageModel(embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64)
    trained_model = train_model(model, train_dataloader, epochs=10)

    # Test the model
    test_model(trained_model, test_dataloader)

Epoch 1/10, Train Loss: 0.1548
Epoch 2/10, Train Loss: 0.1388
Epoch 3/10, Train Loss: 0.1395
Epoch 4/10, Train Loss: 0.1393
Epoch 5/10, Train Loss: 0.1383
Epoch 6/10, Train Loss: 0.1376
Epoch 7/10, Train Loss: 0.1392
Epoch 8/10, Train Loss: 0.1382
Epoch 9/10, Train Loss: 0.1387
Epoch 10/10, Train Loss: 0.1379
Saved sample images to generated_images/real_vs_generated.png
Top row: Real images, Bottom row: Generated images
Test Loss: 0.1330


In [8]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import pandas as pd
import torchvision.utils as vutils

os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

image_transforms = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])
])

class ChestXRayDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform
        self.embed_dim = 512

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.data.iloc[idx]['img'])
        text = self.data.iloc[idx]['text']

        image = Image.open(img_name).convert("RGB")
        if self.transform:
            image = self.transform(image)

        text_vector = torch.zeros(self.embed_dim, dtype=torch.float32)
        for i, char in enumerate(text[:self.embed_dim]):
            text_vector[i] = ord(char) / 255.0

        return image, text_vector

class TransformerEncoderBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, hidden_dim):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim)
        )
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        attn_out, _ = self.attn(x, x, x)
        x = self.norm1(x + attn_out)
        fc_out = self.fc(x)
        x = self.norm2(x + fc_out)
        return x

class VisionTransformer(nn.Module):
    def __init__(self, embed_dim=512, img_size=64, patch_size=8, num_heads=8, num_layers=6, hidden_dim=1024):
        super().__init__()
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        self.projection = nn.Linear(patch_size * patch_size * 3, embed_dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, embed_dim))

        self.encoder_layers = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim) for _ in range(num_layers)
        ])
        self.norm = nn.LayerNorm(embed_dim)

    def forward(self, x):
        B, C, H, W = x.shape
        x = x.unfold(2, self.patch_size, self.patch_size).unfold(3, self.patch_size, self.patch_size)
        x = x.permute(0, 2, 3, 1, 4, 5).contiguous().view(B, self.num_patches, -1)
        x = self.projection(x)

        cls_tokens = self.cls_token.repeat(B, 1, 1)
        x = torch.cat((cls_tokens, x), dim=1) + self.pos_embedding.to(x.device)

        for layer in self.encoder_layers:
            x = layer(x)

        return self.norm(x[:, 0])  # Return CLS token embedding

class ImageGenerator(nn.Module):
    def __init__(self, embed_dim=512, img_size=64, hidden_dim=1024):
        super().__init__()
        self.img_size = img_size
        self.initial_projection = nn.Linear(embed_dim, 256 * 4 * 4)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 3, 4, stride=2, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = x[:, 0, :]
        x = self.initial_projection(x)
        x = x.view(-1, 256, 4, 4)
        x = self.decoder(x)
        return x

class Text2ImageModel(nn.Module):
    def __init__(self, embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64):
        super().__init__()
        self.text_encoder = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim)
            for _ in range(num_layers)
        ])
        self.image_generator = ImageGenerator(embed_dim, img_size)
        self.vit_encoder = VisionTransformer(embed_dim, img_size)  # ViT for real images

    def forward(self, text, target_img=None):
        x = text.unsqueeze(1)
        for layer in self.text_encoder:
            x = layer(x)

        generated_image = self.image_generator(x)

        if target_img is not None:
            vit_features = self.vit_encoder(target_img)
            return generated_image, vit_features
        return generated_image

def train_model(model, dataloader, epochs=5, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    image_criterion = nn.L1Loss()
    feature_criterion = nn.MSELoss()  # For ViT features

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for images, text_vectors in dataloader:
            images = images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            optimizer.zero_grad()
            generated_images, vit_features = model(text_vectors, images)

            # Combined loss: image reconstruction + feature similarity
            image_loss = image_criterion(generated_images, images)
            text_features = text_vectors  # Use raw text features for simplicity
            feature_loss = feature_criterion(vit_features, text_features)
            loss = image_loss + 0.1 * feature_loss  # Weight feature loss

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_loss:.4f}")

    return model

def test_model(model, test_dataloader, save_dir="generated_images"):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    image_criterion = nn.L1Loss()
    feature_criterion = nn.MSELoss()
    total_image_loss = 0
    total_feature_loss = 0

    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    with torch.no_grad():
        for i, (real_images, text_vectors) in enumerate(test_dataloader):
            real_images = real_images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            generated_images, vit_features = model(text_vectors, real_images)

            image_loss = image_criterion(generated_images, real_images)
            feature_loss = feature_criterion(vit_features, text_vectors)
            total_image_loss += image_loss.item()
            total_feature_loss += feature_loss.item()

            if i == 0:
                comparison = torch.cat([real_images[:8], generated_images[:8]])
                vutils.save_image(
                    comparison,
                    os.path.join(save_dir, 'real_vs_generated.png'),
                    nrow=8,
                    normalize=True
                )
                print(f"Saved sample images to {save_dir}/real_vs_generated.png")
                print("Top row: Real images, Bottom row: Generated images")

    avg_image_loss = total_image_loss / len(test_dataloader)
    avg_feature_loss = total_feature_loss / len(test_dataloader)
    print(f"Test Image Loss: {avg_image_loss:.4f}")
    print(f"Test Feature Loss: {avg_feature_loss:.4f}")

if __name__ == "__main__":
    batch_size = 16
    csv_path = "/content/image_labels_reports.csv"
    extract_path = "/content/image_dataset"

    full_dataset = ChestXRayDataset(csv_path, extract_path, transform=image_transforms)
    train_size = int(0.8 * len(full_dataset))
    test_size = len(full_dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model = Text2ImageModel(embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64)
    trained_model = train_model(model, train_dataloader, epochs=10)
    test_model(trained_model, test_dataloader)

Epoch 1/10, Train Loss: 0.2345
Epoch 2/10, Train Loss: 0.2082
Epoch 3/10, Train Loss: 0.1979
Epoch 4/10, Train Loss: 0.1889
Epoch 5/10, Train Loss: 0.1808
Epoch 6/10, Train Loss: 0.1752
Epoch 7/10, Train Loss: 0.1673
Epoch 8/10, Train Loss: 0.1607
Epoch 9/10, Train Loss: 0.1546
Epoch 10/10, Train Loss: 0.1506
Saved sample images to generated_images/real_vs_generated.png
Top row: Real images, Bottom row: Generated images
Test Image Loss: 0.1415
Test Feature Loss: 0.1139


In [9]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import pandas as pd
import torchvision.utils as vutils
from torchvision.utils import make_grid

os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

image_transforms = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])
])

class ChestXRayDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform
        self.embed_dim = 512

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.data.iloc[idx]['img'])
        text = self.data.iloc[idx]['text']

        image = Image.open(img_name).convert("RGB")
        if self.transform:
            image = self.transform(image)

        text_vector = torch.zeros(self.embed_dim, dtype=torch.float32)
        for i, char in enumerate(text[:self.embed_dim]):
            text_vector[i] = ord(char) / 255.0

        return image, text_vector, text  # Return text string as well

class TransformerEncoderBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, hidden_dim):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim)
        )
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        attn_out, _ = self.attn(x, x, x)
        x = self.norm1(x + attn_out)
        fc_out = self.fc(x)
        x = self.norm2(x + fc_out)
        return x

class VisionTransformer(nn.Module):
    def __init__(self, embed_dim=512, img_size=64, patch_size=8, num_heads=8, num_layers=6, hidden_dim=1024):
        super().__init__()
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        self.projection = nn.Linear(patch_size * patch_size * 3, embed_dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, embed_dim))

        self.encoder_layers = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim) for _ in range(num_layers)
        ])
        self.norm = nn.LayerNorm(embed_dim)

    def forward(self, x):
        B, C, H, W = x.shape
        x = x.unfold(2, self.patch_size, self.patch_size).unfold(3, self.patch_size, self.patch_size)
        x = x.permute(0, 2, 3, 1, 4, 5).contiguous().view(B, self.num_patches, -1)
        x = self.projection(x)

        cls_tokens = self.cls_token.repeat(B, 1, 1)
        x = torch.cat((cls_tokens, x), dim=1) + self.pos_embedding.to(x.device)

        for layer in self.encoder_layers:
            x = layer(x)

        return self.norm(x[:, 0])

class ImageGenerator(nn.Module):
    def __init__(self, embed_dim=512, img_size=64, hidden_dim=1024):
        super().__init__()
        self.img_size = img_size
        self.initial_projection = nn.Linear(embed_dim, 256 * 4 * 4)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 3, 4, stride=2, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = x[:, 0, :]
        x = self.initial_projection(x)
        x = x.view(-1, 256, 4, 4)
        x = self.decoder(x)
        return x

class Text2ImageModel(nn.Module):
    def __init__(self, embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64):
        super().__init__()
        self.text_encoder = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim)
            for _ in range(num_layers)
        ])
        self.image_generator = ImageGenerator(embed_dim, img_size)
        self.vit_encoder = VisionTransformer(embed_dim, img_size)

    def forward(self, text, target_img=None):
        x = text.unsqueeze(1)
        for layer in self.text_encoder:
            x = layer(x)

        generated_image = self.image_generator(x)

        if target_img is not None:
            vit_features = self.vit_encoder(target_img)
            return generated_image, vit_features
        return generated_image

def train_model(model, dataloader, epochs=5, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    image_criterion = nn.L1Loss()
    feature_criterion = nn.MSELoss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for images, text_vectors, _ in dataloader:  # Ignore text string here
            images = images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            optimizer.zero_grad()
            generated_images, vit_features = model(text_vectors, images)

            image_loss = image_criterion(generated_images, images)
            text_features = text_vectors
            feature_loss = feature_criterion(vit_features, text_features)
            loss = image_loss + 0.1 * feature_loss

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_loss:.4f}")

    return model

def test_model(model, test_dataloader, save_dir="generated_images", num_samples=8):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    image_criterion = nn.L1Loss()
    feature_criterion = nn.MSELoss()
    total_image_loss = 0
    total_feature_loss = 0

    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    with torch.no_grad():
        # Collect samples from the first batch
        for i, (real_images, text_vectors, texts) in enumerate(test_dataloader):
            if i >= 1:  # Only process first batch for visualization
                break

            real_images = real_images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            generated_images, vit_features = model(text_vectors, real_images)

            image_loss = image_criterion(generated_images, real_images)
            feature_loss = feature_criterion(vit_features, text_vectors)
            total_image_loss += image_loss.item()
            total_feature_loss += feature_loss.item()

            # Save images and text for the first batch (up to num_samples)
            num_to_save = min(num_samples, real_images.size(0))
            real_images_sample = real_images[:num_to_save]
            generated_images_sample = generated_images[:num_to_save]
            texts_sample = texts[:num_to_save]

            # Save comparison image
            comparison = torch.cat([real_images_sample, generated_images_sample])
            vutils.save_image(
                comparison,
                os.path.join(save_dir, 'real_vs_generated.png'),
                nrow=num_to_save,
                normalize=True,
                padding=10
            )
            print(f"Saved comparison image to {save_dir}/real_vs_generated.png")
            print("Top row: Real images, Bottom row: Generated images")

            # Save individual images with text descriptions
            for idx in range(num_to_save):
                # Save real image
                vutils.save_image(
                    real_images_sample[idx],
                    os.path.join(save_dir, f'real_image_{idx}.png'),
                    normalize=True
                )
                # Save generated image
                vutils.save_image(
                    generated_images_sample[idx],
                    os.path.join(save_dir, f'generated_image_{idx}.png'),
                    normalize=True
                )
                # Save text description
                with open(os.path.join(save_dir, f'description_{idx}.txt'), 'w') as f:
                    f.write(texts_sample[idx])

            print(f"Saved {num_to_save} individual real images, generated images, and text descriptions to {save_dir}")

        # Calculate average losses across entire test set
        for real_images, text_vectors, _ in test_dataloader:
            real_images = real_images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)
            generated_images, vit_features = model(text_vectors, real_images)
            total_image_loss += image_criterion(generated_images, real_images).item()
            total_feature_loss += feature_criterion(vit_features, text_vectors).item()

    avg_image_loss = total_image_loss / len(test_dataloader)
    avg_feature_loss = total_feature_loss / len(test_dataloader)
    print(f"Test Image Loss: {avg_image_loss:.4f}")
    print(f"Test Feature Loss: {avg_feature_loss:.4f}")

if __name__ == "__main__":
    batch_size = 16
    csv_path = "/content/image_labels_reports.csv"
    extract_path = "/content/image_dataset"

    full_dataset = ChestXRayDataset(csv_path, extract_path, transform=image_transforms)
    train_size = int(0.8 * len(full_dataset))
    test_size = len(full_dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model = Text2ImageModel(embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64)
    trained_model = train_model(model, train_dataloader, epochs=10)
    test_model(trained_model, test_dataloader, num_samples=8)

Epoch 1/10, Train Loss: 0.2394
Epoch 2/10, Train Loss: 0.2098
Epoch 3/10, Train Loss: 0.1984
Epoch 4/10, Train Loss: 0.1882
Epoch 5/10, Train Loss: 0.1845
Epoch 6/10, Train Loss: 0.1743
Epoch 7/10, Train Loss: 0.1681
Epoch 8/10, Train Loss: 0.1613
Epoch 9/10, Train Loss: 0.1567
Epoch 10/10, Train Loss: 0.1514
Saved comparison image to generated_images/real_vs_generated.png
Top row: Real images, Bottom row: Generated images
Saved 8 individual real images, generated images, and text descriptions to generated_images
Test Image Loss: 0.1511
Test Feature Loss: 0.1207


In [10]:
!rm -rf /content/generated_images

In [12]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image, ImageDraw, ImageFont
import pandas as pd
import torchvision.utils as vutils
import numpy as np

os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

image_transforms = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])
])

class ChestXRayDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform
        self.embed_dim = 512

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.data.iloc[idx]['img'])
        text = self.data.iloc[idx]['text']

        image = Image.open(img_name).convert("RGB")
        if self.transform:
            image = self.transform(image)

        text_vector = torch.zeros(self.embed_dim, dtype=torch.float32)
        for i, char in enumerate(text[:self.embed_dim]):
            text_vector[i] = ord(char) / 255.0

        return image, text_vector, text

class TransformerEncoderBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, hidden_dim):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim)
        )
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        attn_out, _ = self.attn(x, x, x)
        x = self.norm1(x + attn_out)
        fc_out = self.fc(x)
        x = self.norm2(x + fc_out)
        return x

class VisionTransformer(nn.Module):
    def __init__(self, embed_dim=512, img_size=64, patch_size=8, num_heads=8, num_layers=6, hidden_dim=1024):
        super().__init__()
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        self.projection = nn.Linear(patch_size * patch_size * 3, embed_dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, embed_dim))

        self.encoder_layers = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim) for _ in range(num_layers)
        ])
        self.norm = nn.LayerNorm(embed_dim)

    def forward(self, x):
        B, C, H, W = x.shape
        x = x.unfold(2, self.patch_size, self.patch_size).unfold(3, self.patch_size, self.patch_size)
        x = x.permute(0, 2, 3, 1, 4, 5).contiguous().view(B, self.num_patches, -1)
        x = self.projection(x)

        cls_tokens = self.cls_token.repeat(B, 1, 1)
        x = torch.cat((cls_tokens, x), dim=1) + self.pos_embedding.to(x.device)

        for layer in self.encoder_layers:
            x = layer(x)

        return self.norm(x[:, 0])

class ImageGenerator(nn.Module):
    def __init__(self, embed_dim=512, img_size=64, hidden_dim=1024):
        super().__init__()
        self.img_size = img_size
        self.initial_projection = nn.Linear(embed_dim, 256 * 4 * 4)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 3, 4, stride=2, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = x[:, 0, :]
        x = self.initial_projection(x)
        x = x.view(-1, 256, 4, 4)
        x = self.decoder(x)
        return x

class Text2ImageModel(nn.Module):
    def __init__(self, embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64):
        super().__init__()
        self.text_encoder = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim)
            for _ in range(num_layers)
        ])
        self.image_generator = ImageGenerator(embed_dim, img_size)
        self.vit_encoder = VisionTransformer(embed_dim, img_size)

    def forward(self, text, target_img=None):
        x = text.unsqueeze(1)
        for layer in self.text_encoder:
            x = layer(x)

        generated_image = self.image_generator(x)

        if target_img is not None:
            vit_features = self.vit_encoder(target_img)
            return generated_image, vit_features
        return generated_image

def train_model(model, dataloader, epochs=5, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    image_criterion = nn.L1Loss()
    feature_criterion = nn.MSELoss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for images, text_vectors, _ in dataloader:
            images = images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            optimizer.zero_grad()
            generated_images, vit_features = model(text_vectors, images)

            image_loss = image_criterion(generated_images, images)
            text_features = text_vectors
            feature_loss = feature_criterion(vit_features, text_features)
            loss = image_loss + 0.1 * feature_loss

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_loss:.4f}")

    return model

def test_model(model, test_dataloader, save_dir="generated_images", num_samples=8):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    image_criterion = nn.L1Loss()
    feature_criterion = nn.MSELoss()
    total_image_loss = 0
    total_feature_loss = 0

    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    with torch.no_grad():
        for i, (real_images, text_vectors, texts) in enumerate(test_dataloader):
            if i >= 1:  # Process only first batch for visualization
                break

            real_images = real_images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            generated_images, vit_features = model(text_vectors, real_images)

            image_loss = image_criterion(generated_images, real_images)
            feature_loss = feature_criterion(vit_features, text_vectors)
            total_image_loss += image_loss.item()
            total_feature_loss += feature_loss.item()

            # Convert images to PIL for combining with text
            num_to_save = min(num_samples, real_images.size(0))
            real_images_np = real_images[:num_to_save].cpu().numpy().transpose(0, 2, 3, 1)  # [B, H, W, C]
            generated_images_np = generated_images[:num_to_save].cpu().numpy().transpose(0, 2, 3, 1)
            texts_sample = texts[:num_to_save]

            # Create side-by-side visualization for each sample
            for idx in range(num_to_save):
                # Convert tensors to PIL images
                real_img = Image.fromarray((real_images_np[idx] * 255).astype(np.uint8))
                gen_img = Image.fromarray((generated_images_np[idx] * 255).astype(np.uint8))

                # Create text image
                text_img = Image.new('RGB', (200, 64), color=(255, 255, 255))  # White background
                draw = ImageDraw.Draw(text_img)
                try:
                    font = ImageFont.truetype("arial.ttf", 12)
                except:
                    font = ImageFont.load_default()  # Fallback if arial isn't available
                text = texts_sample[idx][:50] + "..." if len(texts_sample[idx]) > 50 else texts_sample[idx]  # Truncate long text
                draw.text((5, 5), text, font=font, fill=(0, 0, 0))  # Black text

                # Combine images horizontally: text | real | generated
                combined_width = text_img.width + real_img.width + gen_img.width
                combined_img = Image.new('RGB', (combined_width, 64))
                combined_img.paste(text_img, (0, 0))
                combined_img.paste(real_img, (text_img.width, 0))
                combined_img.paste(gen_img, (text_img.width + real_img.width, 0))

                # Save combined image
                combined_img.save(os.path.join(save_dir, f'sample_{idx}_text_real_gen.png'))

            print(f"Saved {num_to_save} samples (text | real | generated) to {save_dir}")

        # Calculate average losses across entire test set
        for real_images, text_vectors, _ in test_dataloader:
            real_images = real_images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)
            generated_images, vit_features = model(text_vectors, real_images)
            total_image_loss += image_criterion(generated_images, real_images).item()
            total_feature_loss += feature_criterion(vit_features, text_vectors).item()

    avg_image_loss = total_image_loss / len(test_dataloader)
    avg_feature_loss = total_feature_loss / len(test_dataloader)
    print(f"Test Image Loss: {avg_image_loss:.4f}")
    print(f"Test Feature Loss: {avg_feature_loss:.4f}")

if __name__ == "__main__":
    batch_size = 16
    csv_path = "/content/image_labels_reports.csv"
    extract_path = "/content/image_dataset"

    full_dataset = ChestXRayDataset(csv_path, extract_path, transform=image_transforms)
    train_size = int(0.8 * len(full_dataset))
    test_size = len(full_dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model = Text2ImageModel(embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64)
    trained_model = train_model(model, train_dataloader, epochs=100)
    test_model(trained_model, test_dataloader, num_samples=8)

Epoch 1/100, Train Loss: 0.2366
Epoch 2/100, Train Loss: 0.2076
Epoch 3/100, Train Loss: 0.1983
Epoch 4/100, Train Loss: 0.1902
Epoch 5/100, Train Loss: 0.1816
Epoch 6/100, Train Loss: 0.1745
Epoch 7/100, Train Loss: 0.1701
Epoch 8/100, Train Loss: 0.1619
Epoch 9/100, Train Loss: 0.1562
Epoch 10/100, Train Loss: 0.1515
Epoch 11/100, Train Loss: 0.1473
Epoch 12/100, Train Loss: 0.1448
Epoch 13/100, Train Loss: 0.1436
Epoch 14/100, Train Loss: 0.1415
Epoch 15/100, Train Loss: 0.1419
Epoch 16/100, Train Loss: 0.1420
Epoch 17/100, Train Loss: 0.1406
Epoch 18/100, Train Loss: 0.1402
Epoch 19/100, Train Loss: 0.1410
Epoch 20/100, Train Loss: 0.1404
Epoch 21/100, Train Loss: 0.1399
Epoch 22/100, Train Loss: 0.1401
Epoch 23/100, Train Loss: 0.1412
Epoch 24/100, Train Loss: 0.1402
Epoch 25/100, Train Loss: 0.1397
Epoch 26/100, Train Loss: 0.1415


KeyboardInterrupt: 

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image, ImageDraw, ImageFont
import pandas as pd
import torchvision.utils as vutils
import numpy as np

os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

image_transforms = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])
])

class ChestXRayDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform
        self.embed_dim = 512

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.data.iloc[idx]['img'])
        text = self.data.iloc[idx]['text']

        image = Image.open(img_name).convert("RGB")
        if self.transform:
            image = self.transform(image)

        text_vector = torch.zeros(self.embed_dim, dtype=torch.float32)
        for i, char in enumerate(text[:self.embed_dim]):
            text_vector[i] = ord(char) / 255.0

        return image, text_vector, text

class TransformerEncoderBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, hidden_dim):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim)
        )
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        attn_out, _ = self.attn(x, x, x)
        x = self.norm1(x + attn_out)
        fc_out = self.fc(x)
        x = self.norm2(x + fc_out)
        return x

class VisionTransformerGenerator(nn.Module):
    def __init__(self, embed_dim=512, img_size=64, patch_size=8, num_heads=8, num_layers=6, hidden_dim=1024):
        super().__init__()
        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        self.embed_dim = embed_dim

        # Input projection from text embedding to transformer dimension
        self.text_projection = nn.Linear(embed_dim, embed_dim)

        # Positional embedding for patches
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches, embed_dim))

        # Transformer layers
        self.encoder_layers = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim) for _ in range(num_layers)
        ])

        # Output projection to image patches
        self.patch_projection = nn.Linear(embed_dim, patch_size * patch_size * 3)

        # Final convolution to refine the output
        self.final_conv = nn.Sequential(
            nn.Conv2d(3, 3, kernel_size=3, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        # x: [batch_size, seq_len, embed_dim], where seq_len=1 from text encoder
        B = x.size(0)
        x = self.text_projection(x[:, 0, :])  # [batch_size, embed_dim]

        # Expand text features to match number of patches
        x = x.unsqueeze(1).repeat(1, self.num_patches, 1)  # [batch_size, num_patches, embed_dim]
        x = x + self.pos_embedding.to(x.device)

        # Process through transformer layers
        for layer in self.encoder_layers:
            x = layer(x)

        # Project to patch-sized image fragments
        x = self.patch_projection(x)  # [batch_size, num_patches, patch_size*patch_size*3]

        # Reshape to image format
        x = x.view(B, self.num_patches, 3, self.patch_size, self.patch_size)
        x = x.permute(0, 2, 3, 4, 1)  # [batch_size, 3, patch_size, patch_size, num_patches]

        # Reshape patches into full image
        grid_size = int(self.num_patches ** 0.5)  # Assuming square grid
        x = x.reshape(B, 3, self.patch_size, grid_size, self.patch_size, grid_size)
        x = x.permute(0, 1, 3, 5, 2, 4).contiguous()
        x = x.view(B, 3, self.img_size, self.img_size)

        # Final refinement
        x = self.final_conv(x)
        return x

class Text2ImageModel(nn.Module):
    def __init__(self, embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64):
        super().__init__()
        self.text_encoder = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim)
            for _ in range(num_layers)
        ])
        self.vit_generator = VisionTransformerGenerator(embed_dim, img_size)

    def forward(self, text):
        # Text encoding
        x = text.unsqueeze(1)  # [batch_size, 1, embed_dim]
        for layer in self.text_encoder:
            x = layer(x)

        # Generate image using ViT
        generated_image = self.vit_generator(x)
        return generated_image

def train_model(model, dataloader, epochs=5, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.L1Loss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for images, text_vectors, _ in dataloader:
            images = images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            optimizer.zero_grad()
            generated_images = model(text_vectors)
            loss = criterion(generated_images, images)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_loss:.4f}")

    return model

def test_model(model, test_dataloader, save_dir="generated_images", num_samples=8):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    criterion = nn.L1Loss()
    total_loss = 0

    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    with torch.no_grad():
        for i, (real_images, text_vectors, texts) in enumerate(test_dataloader):
            if i >= 1:  # Process only first batch for visualization
                break

            real_images = real_images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            generated_images = model(text_vectors)

            loss = criterion(generated_images, real_images)
            total_loss += loss.item()

            # Convert images to PIL for combining with text
            num_to_save = min(num_samples, real_images.size(0))
            real_images_np = real_images[:num_to_save].cpu().numpy().transpose(0, 2, 3, 1)
            generated_images_np = generated_images[:num_to_save].cpu().numpy().transpose(0, 2, 3, 1)
            texts_sample = texts[:num_to_save]

            # Create side-by-side visualization for each sample
            for idx in range(num_to_save):
                real_img = Image.fromarray((real_images_np[idx] * 255).astype(np.uint8))
                gen_img = Image.fromarray((generated_images_np[idx] * 255).astype(np.uint8))

                text_img = Image.new('RGB', (200, 64), color=(255, 255, 255))
                draw = ImageDraw.Draw(text_img)
                try:
                    font = ImageFont.truetype("arial.ttf", 12)
                except:
                    font = ImageFont.load_default()
                text = texts_sample[idx][:50] + "..." if len(texts_sample[idx]) > 50 else texts_sample[idx]
                draw.text((5, 5), text, font=font, fill=(0, 0, 0))

                combined_width = text_img.width + real_img.width + gen_img.width
                combined_img = Image.new('RGB', (combined_width, 64))
                combined_img.paste(text_img, (0, 0))
                combined_img.paste(real_img, (text_img.width, 0))
                combined_img.paste(gen_img, (text_img.width + real_img.width, 0))

                combined_img.save(os.path.join(save_dir, f'sample_{idx}_text_real_gen.png'))

            print(f"Saved {num_to_save} samples (text | real | generated) to {save_dir}")

        # Calculate average loss across entire test set
        for real_images, text_vectors, _ in test_dataloader:
            real_images = real_images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)
            generated_images = model(text_vectors)
            total_loss += criterion(generated_images, real_images).item()

    avg_loss = total_loss / len(test_dataloader)
    print(f"Test Loss: {avg_loss:.4f}")

if __name__ == "__main__":
    batch_size = 16
    csv_path = "/content/image_labels_reports.csv"
    extract_path = "/content/image_dataset"

    full_dataset = ChestXRayDataset(csv_path, extract_path, transform=image_transforms)
    train_size = int(0.8 * len(full_dataset))
    test_size = len(full_dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model = Text2ImageModel(embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64)
    trained_model = train_model(model, train_dataloader, epochs=10)
    test_model(trained_model, test_dataloader, num_samples=8)

### Worked

In [20]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image, ImageDraw, ImageFont
import pandas as pd
import torchvision.utils as vutils
import numpy as np

os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

image_transforms = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

class ChestXRayDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform
        self.embed_dim = 512

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.data.iloc[idx]['img'])
        text = self.data.iloc[idx]['text']

        image = Image.open(img_name).convert("RGB")
        if self.transform:
            image = self.transform(image)

        text_vector = torch.zeros(self.embed_dim, dtype=torch.float32)
        for i, char in enumerate(text[:self.embed_dim]):
            text_vector[i] = ord(char) / 255.0

        return image, text_vector, text

class TransformerEncoderBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, hidden_dim):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embed_dim)
        )
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        attn_out, _ = self.attn(x, x, x)
        x = self.norm1(x + attn_out)
        fc_out = self.fc(x)
        x = self.norm2(x + fc_out)
        return x

class VisionTransformerGenerator(nn.Module):
    def __init__(self, embed_dim=512, img_size=64, patch_size=8, num_heads=8, num_layers=6, hidden_dim=1024):
        super().__init__()
        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        self.embed_dim = embed_dim

        # Positional embedding for patches
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches, embed_dim))

        # Transformer layers
        self.encoder_layers = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim) for _ in range(num_layers)
        ])

        # Output projection to image patches
        self.patch_projection = nn.Linear(embed_dim, patch_size * patch_size * 3)

        # Final convolution to refine the output
        self.final_conv = nn.Sequential(
            nn.Conv2d(3, 3, kernel_size=3, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        # x: [batch_size, seq_len, embed_dim] from text encoder
        B = x.size(0)
        x = x[:, 0, :]  # Take the first (and only) sequence element: [batch_size, embed_dim]

        # Expand text features to match number of patches
        x = x.unsqueeze(1).repeat(1, self.num_patches, 1)  # [batch_size, num_patches, embed_dim]
        x = x + self.pos_embedding.to(x.device)

        # Process through transformer layers
        for layer in self.encoder_layers:
            x = layer(x)

        # Project to patch-sized image fragments
        x = self.patch_projection(x)  # [batch_size, num_patches, patch_size*patch_size*3]

        # Reshape to image format
        x = x.view(B, self.num_patches, 3, self.patch_size, self.patch_size)
        x = x.permute(0, 2, 3, 4, 1)  # [batch_size, 3, patch_size, patch_size, num_patches]

        # Reshape patches into full image
        grid_size = int(self.num_patches ** 0.5)
        x = x.reshape(B, 3, self.patch_size, grid_size, self.patch_size, grid_size)
        x = x.permute(0, 1, 3, 5, 2, 4).contiguous()
        x = x.view(B, 3, self.img_size, self.img_size)

        # Final refinement
        x = self.final_conv(x)
        return x

class Text2ImageModel(nn.Module):
    def __init__(self, embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=6, img_size=64):
        super().__init__()
        self.text_encoder = nn.ModuleList([
            TransformerEncoderBlock(embed_dim, num_heads, hidden_dim)
            for _ in range(num_layers)
        ])
        self.vit_generator = VisionTransformerGenerator(embed_dim, img_size)

    def forward(self, text):
        # Text encoding
        x = text.unsqueeze(1)  # [batch_size, 1, embed_dim]
        for layer in self.text_encoder:
            x = layer(x)

        # Generate image using ViT
        generated_image = self.vit_generator(x)
        return generated_image

def train_model(model, dataloader, epochs=5, lr=0.0001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.L1Loss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for images, text_vectors, _ in dataloader:
            images = images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            optimizer.zero_grad()
            generated_images = model(text_vectors)
            loss = criterion(generated_images, images)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_loss:.4f}")

    return model

def test_model(model, test_dataloader, save_dir="generated_images", num_samples=8):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    criterion = nn.L1Loss()
    total_loss = 0

    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    with torch.no_grad():
        for i, (real_images, text_vectors, texts) in enumerate(test_dataloader):
            if i >= 1:  # Process only first batch for visualization
                break

            real_images = real_images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)

            generated_images = model(text_vectors)

            loss = criterion(generated_images, real_images)
            total_loss += loss.item()

            # Convert images to PIL for combining with text
            num_to_save = min(num_samples, real_images.size(0))
            real_images_np = real_images[:num_to_save].cpu().numpy().transpose(0, 2, 3, 1)
            generated_images_np = generated_images[:num_to_save].cpu().numpy().transpose(0, 2, 3, 1)
            texts_sample = texts[:num_to_save]

            # Create side-by-side visualization for each sample
            for idx in range(num_to_save):
                real_img = Image.fromarray((real_images_np[idx] * 255).astype(np.uint8))
                gen_img = Image.fromarray((generated_images_np[idx] * 255).astype(np.uint8))

                text_img = Image.new('RGB', (200, 64), color=(255, 255, 255))
                draw = ImageDraw.Draw(text_img)
                try:
                    font = ImageFont.truetype("arial.ttf", 12)
                except:
                    font = ImageFont.load_default()
                text = texts_sample[idx][:50] + "..." if len(texts_sample[idx]) > 50 else texts_sample[idx]
                draw.text((5, 5), text, font=font, fill=(0, 0, 0))

                combined_width = text_img.width + real_img.width + gen_img.width
                combined_img = Image.new('RGB', (combined_width, 64))
                combined_img.paste(text_img, (0, 0))
                combined_img.paste(real_img, (text_img.width, 0))
                combined_img.paste(gen_img, (text_img.width + real_img.width, 0))

                combined_img.save(os.path.join(save_dir, f'sample_{idx}_text_real_gen.png'))

            print(f"Saved {num_to_save} samples (text | real | generated) to {save_dir}")

        # Calculate average loss across entire test set
        for real_images, text_vectors, _ in test_dataloader:
            real_images = real_images.to(device, dtype=torch.float32)
            text_vectors = text_vectors.to(device, dtype=torch.float32)
            generated_images = model(text_vectors)
            total_loss += criterion(generated_images, real_images).item()

    avg_loss = total_loss / len(test_dataloader)
    print(f"Test Loss: {avg_loss:.4f}")

if __name__ == "__main__":
    batch_size = 16
    csv_path = "/content/image_labels_reports.csv"
    extract_path = "/content/image_dataset"

    full_dataset = ChestXRayDataset(csv_path, extract_path, transform=image_transforms)
    train_size = int(0.8 * len(full_dataset))
    test_size = len(full_dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model = Text2ImageModel(embed_dim=512, hidden_dim=1024, num_heads=8, num_layers=2, img_size=64)
    trained_model = train_model(model, train_dataloader, epochs=300)
    test_model(trained_model, test_dataloader, num_samples=2)

Epoch 1/300, Train Loss: 0.4654
Epoch 2/300, Train Loss: 0.3869
Epoch 3/300, Train Loss: 0.3544
Epoch 4/300, Train Loss: 0.3412
Epoch 5/300, Train Loss: 0.3368
Epoch 6/300, Train Loss: 0.3343
Epoch 7/300, Train Loss: 0.3341
Epoch 8/300, Train Loss: 0.3323
Epoch 9/300, Train Loss: 0.3308
Epoch 10/300, Train Loss: 0.3318
Epoch 11/300, Train Loss: 0.3310
Epoch 12/300, Train Loss: 0.3299
Epoch 13/300, Train Loss: 0.3290
Epoch 14/300, Train Loss: 0.3304
Epoch 15/300, Train Loss: 0.3289
Epoch 16/300, Train Loss: 0.3293
Epoch 17/300, Train Loss: 0.3292
Epoch 18/300, Train Loss: 0.3290
Epoch 19/300, Train Loss: 0.3288
Epoch 20/300, Train Loss: 0.3274
Epoch 21/300, Train Loss: 0.3272
Epoch 22/300, Train Loss: 0.3278
Epoch 23/300, Train Loss: 0.3269
Epoch 24/300, Train Loss: 0.3251
Epoch 25/300, Train Loss: 0.3252
Epoch 26/300, Train Loss: 0.3257
Epoch 27/300, Train Loss: 0.3250
Epoch 28/300, Train Loss: 0.3243
Epoch 29/300, Train Loss: 0.3250
Epoch 30/300, Train Loss: 0.3245
Epoch 31/300, Train

In [16]:
!rm -rf /content/generated_images