In [None]:
import os
import random
import shutil

def split_dataset(dataset_path, part1_path, part2_path):
    # Create directories if they don't exist
    os.makedirs(part1_path, exist_ok=True)
    os.makedirs(part2_path, exist_ok=True)

    # List all images in the dataset
    images = [f for f in os.listdir(dataset_path) if os.path.isfile(os.path.join(dataset_path, f))]
    random.shuffle(images)

    # Split the dataset into two equal parts
    split_index = len(images) // 2
    part1_images = images[:split_index]
    part2_images = images[split_index:]

    # Move images to their respective directories
    for img in part1_images:
        shutil.move(os.path.join(dataset_path, img), os.path.join(part1_path, img))
    for img in part2_images:
        shutil.move(os.path.join(dataset_path, img), os.path.join(part2_path, img))

    print(f"Dataset split into {part1_path} and {part2_path}")

# Example usage
split_dataset('Datasets\places2', 'Datasets\split1', 'Datasets/split2')

In [None]:
%pip install torch torchvision torchaudio

In [None]:
pip install matplotlib pandas numpy 

In [None]:
%pip install opencv-python

In [None]:
%pip show torch

In [None]:
%pip install tqdm

In [None]:
!python -m ipykernel install --user --name=your_env --display-name "Python (venv)"


In [None]:
import torch
print(torch.__version__)

In [None]:
import torch
import torchvision
import PIL
import numpy
import matplotlib.pyplot
print("All imports work!")

In [None]:
# To run on CUDA
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.utils import save_image

import os
from tqdm import tqdm
import random
from PIL import Image
import numpy as np

# Define Generator Network (simplified)
class Generator(nn.Module):
    def __init__(self, channels=3):
        super(Generator, self).__init__()
        
        # Initial layer
        self.initial = nn.Sequential(
            nn.Conv2d(channels + 1, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True)
        )
        
        # Encoder
        self.down1 = self._down_block(64, 128)
        self.down2 = self._down_block(128, 256)
        self.down3 = self._down_block(256, 512)
        
        # Bottleneck
        self.bottleneck = nn.Sequential(
            nn.Conv2d(512, 512, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True)
        )
        
        # Decoder with skip connections
        self.up1 = self._up_block(512, 512)
        self.up2 = self._up_block(512 * 2, 256)
        self.up3 = self._up_block(256 * 2, 128)
        self.up4 = self._up_block(128 * 2, 64)
        
        # Final layer
        self.final = nn.Sequential(
            nn.ConvTranspose2d(64 * 2, channels, kernel_size=4, stride=2, padding=1),
            nn.Tanh()
        )
        
    def _down_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.LeakyReLU(0.2, inplace=True)
        )
    
    def _up_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.ConvTranspose2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
        
    def forward(self, x, mask):
        # Concatenate image with mask
        x = torch.cat([x, mask], dim=1)
        
        # Encoder
        d1 = self.initial(x)
        d2 = self.down1(d1)
        d3 = self.down2(d2)
        d4 = self.down3(d3)
        bottleneck = self.bottleneck(d4)
        
        # Decoder with skip connections
        u1 = self.up1(bottleneck)
        u2 = self.up2(torch.cat([u1, d4], dim=1))
        u3 = self.up3(torch.cat([u2, d3], dim=1))
        u4 = self.up4(torch.cat([u3, d2], dim=1))
        
        return self.final(torch.cat([u4, d1], dim=1))

# Simplified Discriminator
class Discriminator(nn.Module):
    def __init__(self, channels=3):
        super(Discriminator, self).__init__()
        
        # Simplified PatchGAN discriminator
        self.model = nn.Sequential(
            nn.Conv2d(channels + 1, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1)
        )
        
    def forward(self, x, mask):
        x = torch.cat([x, mask], dim=1)
        return self.model(x)

# Simplified mask creation function
def create_mask(image_shape, min_percentage=0.10, max_percentage=0.30):
    height, width = image_shape[:2]
    total_pixels = height * width
    
    # Calculate masked area
    target_pixels = int(total_pixels * random.uniform(min_percentage, max_percentage))
    
    # Create rectangular mask
    mask = np.zeros((height, width), dtype=np.float32)
    
    # Random mask type
    mask_type = random.choice(['rectangle', 'multiple'])
    
    if mask_type == 'rectangle':
        # Single rectangular region
        max_side = int(np.sqrt(target_pixels))
        rect_width = random.randint(max_side // 2, max_side)
        rect_height = target_pixels // rect_width
        
        x = random.randint(0, width - rect_width)
        y = random.randint(0, height - rect_height)
        
        mask[y:y+rect_height, x:x+rect_width] = 1.0
    else:
        # Multiple masked regions
        num_regions = random.randint(2, 4)
        pixels_per_region = target_pixels // num_regions
        
        for _ in range(num_regions):
            region_size = int(np.sqrt(pixels_per_region))
            x = random.randint(0, width - region_size)
            y = random.randint(0, height - region_size)
            
            mask[y:y+region_size, x:x+region_size] = 1.0
            
    return mask

# Simplified Dataset (assumes pre-processed images)
class InpaintingDataset(Dataset):
    def __init__(self, image_dir, image_size=256, min_mask_percentage=0.10, max_mask_percentage=0.30, max_images=None):
        self.image_dir = image_dir
        self.image_size = image_size
        self.min_mask_percentage = min_mask_percentage
        self.max_mask_percentage = max_mask_percentage
        
        # Get image paths
        self.image_paths = []
        for root, _, files in os.walk(image_dir):
            for filename in files:
                if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')):
                    self.image_paths.append(os.path.join(root, filename))
        
        # Limit dataset size if specified
        if max_images:
            self.image_paths = self.image_paths[:min(len(self.image_paths), max_images)]
        
        # Basic normalization only
        self.transform = transforms.Compose([
            transforms.Resize((image_size, image_size)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
        
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        # Load image (assuming already preprocessed)
        image = Image.open(self.image_paths[idx]).convert('RGB')
        image_tensor = self.transform(image)
        
        # Create mask
        mask = create_mask(
            (self.image_size, self.image_size),
            self.min_mask_percentage,
            self.max_mask_percentage
        )
        mask_tensor = torch.from_numpy(mask).unsqueeze(0).float()
        
        # Apply mask
        corrupted = image_tensor * (1 - mask_tensor)
        
        return {
            'original': image_tensor,
            'corrupted': corrupted,
            'mask': mask_tensor,
            'path': self.image_paths[idx]
        }

# Simplified training function
def train_gan(input_dir, output_dir, epochs=30, batch_size=8, lr=0.0002, 
              min_mask=0.10, max_mask=0.30, max_images=None):
    
    # Create output directories
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'models'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'samples'), exist_ok=True)
    
    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    
    # Initialize models
    generator = Generator().to(device)
    discriminator = Discriminator().to(device)
    
    # Optimizers
    g_optimizer = optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
    d_optimizer = optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))
    
    # Loss functions
    adversarial_loss = nn.BCEWithLogitsLoss()
    l1_loss = nn.L1Loss()
    
    # Create dataset and dataloader
    dataset = InpaintingDataset(
        input_dir, 
        min_mask_percentage=min_mask,
        max_mask_percentage=max_mask,
        max_images=max_images
    )
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=4)
    
    print(f"Training with {len(dataset)} images")
    
    # Training loop
    for epoch in range(epochs):
        progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}")
        
        for batch_idx, batch in enumerate(progress_bar):
            # Get data
            real_images = batch['original'].to(device)
            corrupted_images = batch['corrupted'].to(device)
            masks = batch['mask'].to(device)
            
            # Labels for adversarial loss
            valid = torch.ones((real_images.size(0), 1, 16, 16), device=device)
            fake = torch.zeros((real_images.size(0), 1, 16, 16), device=device)
            
            # Train Generator
            g_optimizer.zero_grad()
            
            # Generate inpainted images
            gen_images = generator(corrupted_images, masks)
            combined = gen_images * masks + real_images * (1 - masks)
            
            # Discriminator evaluation
            pred_fake = discriminator(combined, masks)
            
            # Generator losses
            g_adv_loss = adversarial_loss(pred_fake, valid)
            g_l1_loss = l1_loss(combined, real_images)
            g_loss = g_adv_loss + 100 * g_l1_loss
            
            g_loss.backward()
            g_optimizer.step()
            
            # Train Discriminator (less frequently)
            if batch_idx % 3 == 0:
                d_optimizer.zero_grad()
                
                # Real loss
                pred_real = discriminator(real_images, masks)
                d_real_loss = adversarial_loss(pred_real, valid)
                
                # Fake loss
                pred_fake = discriminator(combined.detach(), masks)
                d_fake_loss = adversarial_loss(pred_fake, fake)
                
                d_loss = (d_real_loss + d_fake_loss) / 2
                d_loss.backward()
                d_optimizer.step()
            else:
                d_loss = torch.tensor(0.0)
            
            # Update progress bar
            progress_bar.set_postfix(
                g_loss=g_loss.item(), 
                d_loss=d_loss.item(), 
                l1_loss=g_l1_loss.item()
            )
        
        # Save model checkpoints and samples
        if (epoch + 1) % 5 == 0 or epoch == epochs - 1:
            # Save model
            torch.save({
                'generator': generator.state_dict(),
                'discriminator': discriminator.state_dict(),
                'epoch': epoch
            }, os.path.join(output_dir, 'models', f'inpainting_epoch_{epoch+1}.pth'))
            
            # Save sample images
            with torch.no_grad():
                sample_idx = random.randint(0, len(dataset) - 1)
                sample = dataset[sample_idx]
                
                real = sample['original'].unsqueeze(0).to(device)
                corrupted = sample['corrupted'].unsqueeze(0).to(device)
                mask = sample['mask'].unsqueeze(0).to(device)
                
                gen = generator(corrupted, mask)
                combined = gen * mask + real * (1 - mask)
                
                # Save samples
                samples = torch.cat([
                    corrupted,  # Masked image
                    combined,   # Inpainted result
                    real        # Original
                ], dim=0)
                samples = (samples * 0.5 + 0.5).clamp(0, 1)
                
                save_image(
                    samples,
                    os.path.join(output_dir, 'samples', f'epoch_{epoch+1}.png'),
                    nrow=3,
                    normalize=False
                )

# Simplified inference function
def inpaint_images(model_path, input_dir, output_dir, min_mask=0.10, max_mask=0.30, max_images=None):
    # Create output directory
    os.makedirs(output_dir, exist_ok=True)
    
    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Load model
    generator = Generator().to(device)
    checkpoint = torch.load(model_path, map_location=device)
    generator.load_state_dict(checkpoint['generator'])
    generator.eval()
    
    # Transform
    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    
    # Get images
    image_files = []
    for root, _, files in os.walk(input_dir):
        for filename in files:
            if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')):
                image_files.append(os.path.join(root, filename))
    
    # Limit images
    if max_images:
        image_files = image_files[:min(len(image_files), max_images)]
    
    # Process images
    for image_path in tqdm(image_files, desc="Inpainting"):
        try:
            # Load image
            original = Image.open(image_path).convert('RGB')
            original_size = original.size
            
            # Transform
            img_tensor = transform(original).unsqueeze(0).to(device)
            
            # Create mask
            mask_np = create_mask((256, 256), min_mask, max_mask)
            mask_tensor = torch.from_numpy(mask_np).unsqueeze(0).unsqueeze(0).to(device).float()
            
            # Corrupt image
            corrupted = img_tensor * (1 - mask_tensor)
            
            # Generate inpainting
            with torch.no_grad():
                gen_img = generator(corrupted, mask_tensor)
                result = gen_img * mask_tensor + img_tensor * (1 - mask_tensor)
            
            # Convert to image
            result_img = (result.squeeze(0).cpu() * 0.5 + 0.5).clamp(0, 1)
            result_img = transforms.ToPILImage()(result_img)
            result_img = result_img.resize(original_size)
            
            # Save
            filename = os.path.basename(image_path)
            result_img.save(os.path.join(output_dir, f'inpainted_{filename}'))
            
        except Exception as e:
            print(f"Error processing {image_path}: {str(e)}")

# Main function
def main():
    # Train
    train_gan(
        input_dir='Datasets/split2',  # Change to your input directory
        output_dir='Models/gan_inpaint_model/models',
        epochs=30,
        batch_size=8,
        min_mask=0.10,
        max_mask=0.30,
        max_images=2500
    )
    
    # Inpaint
    inpaint_images(
        model_path='Models/gan_inpaint_model/models/inpainting_epoch_30.pth',
        input_dir='Datasets/split2',  # Change to your test directory
        output_dir='Datasets/gan_in_split2',
        min_mask=0.10,
        max_mask=0.30,
        max_images=2500
    )

if __name__ == '__main__':
    main()

In [None]:
import os
import random
import shutil

def label_and_shuffle(part1_dir, part2_dir, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    
    labeled_images = []
    for img_name in os.listdir(part1_dir):
        labeled_images.append((os.path.join(part1_dir, img_name), 0))
    for img_name in os.listdir(part2_dir):
        labeled_images.append((os.path.join(part2_dir, img_name), 1))
    
    random.shuffle(labeled_images)
    
    for i, (img_path, label) in enumerate(labeled_images):
        img_name = f"{i}_{label}.png"
        shutil.copy(img_path, os.path.join(output_dir, img_name))
    
    print("Dataset labeled and shuffled")

# Example usage
label_and_shuffle('path/to/part1', 'path/to/part2_inpainted', 'path/to/shuffled_dataset')

In [None]:
import os
import random
import shutil

def split_train_test(shuffled_dataset_dir, train_dir, test_dir):
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)
    
    images = [f for f in os.listdir(shuffled_dataset_dir) if os.path.isfile(os.path.join(shuffled_dataset_dir, f))]
    random.shuffle(images)
    
    split_index = len(images) // 2
    train_images = images[:split_index]
    test_images = images[split_index:]
    
    for img in train_images:
        shutil.move(os.path.join(shuffled_dataset_dir, img), os.path.join(train_dir, img))
    for img in test_images:
        shutil.move(os.path.join(shuffled_dataset_dir, img), os.path.join(test_dir, img))
    
    print(f"Dataset split into training and testing sets")

# Example usage
split_train_test('path/to/shuffled_dataset', 'path/to/train_set', 'path/to/test_set')

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Define the model
def create_model():
    model = models.Sequential([
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(128, 128, 3)),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dense(2, activation='softmax')
    ])
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

# Data generators for training and testing sets
train_datagen = ImageDataGenerator(rescale=0.5)
test_datagen = ImageDataGenerator(rescale=0.5)

train_generator = train_datagen.flow_from_directory(
    'path/to/train_set',
    target_size=(128, 128),
    batch_size=32,
    class_mode='binary'
)

test_generator = test_datagen.flow_from_directory(
    'path/to/test_set',
    target_size=(128, 128),
    batch_size=32,
    class_mode='binary'
)

# Create and train the model
model = create_model()
model.fit(train_generator, epochs=10, validation_data=test_generator)

# Save the model as an H5 file
model.save('path/to/save/model.h5')
print("Model saved as model.h5")