<a href="https://colab.research.google.com/github/KeneKing12/Kenechukwu/blob/main/CNN(U_NET).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
import glob
import os
import urllib.request
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import rasterio
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split

# 1. Define URLs and Paths
# NOTE(review): this is a DOI URL; it presumably resolves to the record's
# landing page rather than to the archive file itself, in which case a direct
# download would save HTML instead of a zip — confirm the direct file link.
dataset_url = "https://doi.org/10.5281/zenodo.5151941"
# Local filename the downloaded archive is saved under.
dataset_zip = "MERGED_DATA.zip"
# Training-data directory; the data-loading cell globs .tif files from its
# "images" subfolder.
data_dir = "plasticnet/data/train"



# Prepare Data Directory and Download Dataset

In [6]:
# Create the training-data directory (including missing parents); this is a
# no-op when the directory already exists.
os.makedirs(data_dir, exist_ok=True)

In [7]:
# Fetch the dataset archive only when no local copy exists.
if not os.path.exists(dataset_zip):
    print("📥 Downloading dataset...")
    # Download under a temporary name and rename only on success, so an
    # interrupted transfer is never mistaken for a complete archive by the
    # "already downloaded" check on the next run.
    partial_zip = dataset_zip + ".part"
    try:
        urllib.request.urlretrieve(dataset_url, partial_zip)
        os.replace(partial_zip, dataset_zip)
    finally:
        # Clean up the leftover partial file if the download failed midway.
        if os.path.exists(partial_zip):
            os.remove(partial_zip)
else:
    print("✅ Dataset already downloaded.")

# A landing-page URL (or a corrupted transfer) produces a file that is not a
# zip archive; surface that here instead of failing obscurely later on.
if not zipfile.is_zipfile(dataset_zip):
    print(f"⚠️ {dataset_zip} is not a valid zip archive; check dataset_url and re-download.")

✅ Dataset already downloaded.


# Custom Dataset Loader with Patching

In [None]:
class MarineDebrisDataset(Dataset):
    """In-memory dataset of fixed-size image/mask patches cut from raster files.

    Every image is read with rasterio, scaled by 1/10000, and tiled into
    ``patch_size`` x ``patch_size`` windows spaced ``stride`` pixels apart.
    Windows that would run past the bottom/right edge are not produced. Only
    windows whose mask contains at least one nonzero pixel are kept, and the
    mask is binarized (any nonzero label becomes 1.0).

    Args:
        img_paths: Iterable of image file paths. The mask path is derived by
            replacing every occurrence of ``'images'`` in the path with
            ``'masks'``.
        patch_size: Side length, in pixels, of the square patches.
        stride: Step, in pixels, between consecutive patch origins.

    Raises:
        ValueError: If a mask's height/width differs from its image's.
    """

    def __init__(self, img_paths, patch_size=128, stride=128):
        self.data = []
        for img_path in img_paths:
            mask_path = img_path.replace('images', 'masks')

            with rasterio.open(img_path) as src:
                # Presumably reflectance stored as integers scaled by 10000 — TODO confirm.
                img = src.read().astype(np.float32) / 10000.0  # Normalize

            with rasterio.open(mask_path) as src:
                mask = src.read(1).astype(np.uint8)

            C, H, W = img.shape
            if mask.shape != (H, W):
                # Mismatched sizes would silently yield misaligned or
                # truncated patches; fail here with the offending files named.
                raise ValueError(
                    f"Mask {mask_path} has shape {mask.shape}, "
                    f"expected {(H, W)} to match image {img_path}"
                )

            # Extract patches
            for i in range(0, H - patch_size + 1, stride):
                for j in range(0, W - patch_size + 1, stride):
                    mask_patch = mask[i:i+patch_size, j:j+patch_size]
                    if mask_patch.sum() == 0:  # skip patches with no labelled pixels
                        continue
                    # First three bands only (assumed to be RGB — TODO confirm
                    # band order). Copy the slice: a bare view would keep the
                    # entire multi-band raster alive for as long as the patch
                    # is stored.
                    img_patch = img[:3, i:i+patch_size, j:j+patch_size].copy()
                    self.data.append((img_patch, (mask_patch > 0).astype(np.float32)))

    def __len__(self):
        """Return the number of stored patches."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, mask)`` tensors; the mask gains a leading channel axis."""
        img, mask = self.data[idx]
        return torch.tensor(img), torch.tensor(mask).unsqueeze(0)  # [1, H, W]


# 3. U-Net Model (Simplified)

In [None]:
class DoubleConv(nn.Module):
    """Two 3x3 convolutions with padding 1, each followed by a ReLU.

    Padding of 1 with a 3x3 kernel keeps the spatial size unchanged, so only
    the channel count goes from ``in_channels`` to ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
        ]
        self.double_conv = nn.Sequential(*layers)

    def forward(self, x):
        """Apply both conv+ReLU stages to ``x`` and return the result."""
        features = self.double_conv(x)
        return features

class UNet(nn.Module):
    """Two-level U-Net that outputs a per-pixel sigmoid probability map.

    The encoder has two DoubleConv stages (64 and 128 channels), each followed
    by 2x2 max pooling, then a 256-channel bottleneck. The decoder upsamples
    with transposed convolutions and concatenates the matching encoder
    features (skip connections) before each DoubleConv.

    Args:
        in_channels: Number of channels of the input image (default 3).
        out_channels: Number of output probability maps (default 1).
    """

    def __init__(self, in_channels=3, out_channels=1):
        super().__init__()
        self.down1 = DoubleConv(in_channels, 64)
        self.pool1 = nn.MaxPool2d(2)
        self.down2 = DoubleConv(64, 128)
        self.pool2 = nn.MaxPool2d(2)

        self.middle = DoubleConv(128, 256)

        self.up1 = nn.ConvTranspose2d(256, 128, 2, stride=2)
        # Input channels double because of the skip-connection concatenation.
        self.conv1 = DoubleConv(256, 128)
        self.up2 = nn.ConvTranspose2d(128, 64, 2, stride=2)
        self.conv2 = DoubleConv(128, 64)

        self.out = nn.Conv2d(64, out_channels, 1)

    @staticmethod
    def _match_spatial(x, ref):
        """Zero-pad ``x`` so its height/width equal those of ``ref``.

        Max pooling floors odd sizes, so the upsampled map can be one pixel
        smaller than the encoder map it is concatenated with. When the sizes
        already agree, ``x`` is returned untouched.
        """
        dh = ref.size(2) - x.size(2)
        dw = ref.size(3) - x.size(3)
        if dh == 0 and dw == 0:
            return x
        # Pad order is (left, right, top, bottom) for the last two dimensions.
        return nn.functional.pad(x, [dw // 2, dw - dw // 2, dh // 2, dh - dh // 2])

    def forward(self, x):
        """Return sigmoid probabilities with the same height/width as ``x``."""
        x1 = self.down1(x)
        x2 = self.down2(self.pool1(x1))
        x3 = self.middle(self.pool2(x2))

        x = self._match_spatial(self.up1(x3), x2)
        x = self.conv1(torch.cat([x, x2], dim=1))
        x = self._match_spatial(self.up2(x), x1)
        x = self.conv2(torch.cat([x, x1], dim=1))

        return torch.sigmoid(self.out(x))


# Load Data & Split

In [None]:
# Sort the paths: glob order depends on the filesystem, and a stable order is
# needed for the seeded split below to be reproducible.
all_images = sorted(glob.glob('plasticnet/data/train/images/*.tif'))
if not all_images:
    raise FileNotFoundError(
        "No .tif images found in 'plasticnet/data/train/images/'. "
        "Download and extract the dataset first."
    )
dataset = MarineDebrisDataset(all_images)
if len(dataset) < 2:
    # With fewer than two patches the 80/20 split leaves the training set
    # empty, which otherwise fails later inside the shuffling sampler.
    raise ValueError(
        f"Only {len(dataset)} patch(es) extracted from {len(all_images)} image(s); "
        "need at least 2 to build train and validation sets."
    )

# 80/20 train/validation split with a fixed seed so runs are comparable.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_ds, val_ds = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42),
)

train_loader = DataLoader(train_ds, batch_size=8, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=8)


# Training Setup

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Build the network, then move its parameters onto the selected device.
model = UNet()
model = model.to(device)

# Binary cross-entropy on probabilities (the model's forward ends in a sigmoid).
criterion = nn.BCELoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)


# Train Loop

In [None]:
num_epochs = 10

for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    train_loss = 0.0
    train_samples = 0
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)

        preds = model(images)
        loss = criterion(preds, masks)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # The criterion returns a batch mean; weight it by the batch size so
        # the epoch figure is a true per-sample mean even when the last batch
        # is smaller than the rest.
        n_batch = images.size(0)
        train_loss += loss.item() * n_batch
        train_samples += n_batch
    # Averaging (instead of summing over batches) makes the train and
    # validation numbers comparable despite the loaders' different lengths.
    train_loss /= max(train_samples, 1)

    # ---- Validation pass (no gradient tracking) ----
    model.eval()
    val_loss = 0.0
    val_samples = 0
    with torch.no_grad():
        for images, masks in val_loader:
            images, masks = images.to(device), masks.to(device)
            preds = model(images)
            n_batch = images.size(0)
            val_loss += criterion(preds, masks).item() * n_batch
            val_samples += n_batch
    val_loss /= max(val_samples, 1)

    print(f"Epoch {epoch+1}/{num_epochs} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")


# Visualize Prediction

In [None]:
def show_prediction(index=0, threshold=0.5):
    """Plot a validation image, its ground-truth mask and the model's prediction.

    Puts the global ``model`` in eval mode, runs it on one sample of the
    global ``val_ds`` and shows three panels side by side.

    Args:
        index: Position of the sample within ``val_ds`` (default 0).
        threshold: Probability above which a pixel is drawn as positive
            (default 0.5).
    """
    model.eval()
    image, mask = val_ds[index]
    with torch.no_grad():
        # Add a batch axis for the forward pass, then take the single map of
        # the single sample explicitly instead of relying on squeeze().
        pred = model(image.unsqueeze(0).to(device))[0, 0].cpu().numpy()

    # imshow expects float RGB in [0, 1]; clip so out-of-range values do not
    # trigger matplotlib's clipping warning.
    rgb = np.clip(np.moveaxis(image.numpy(), 0, -1), 0.0, 1.0)

    plt.figure(figsize=(12,4))
    plt.subplot(1,3,1); plt.imshow(rgb); plt.title("Image")
    plt.subplot(1,3,2); plt.imshow(mask.numpy()[0], cmap='gray'); plt.title("Ground Truth")
    plt.subplot(1,3,3); plt.imshow(pred > threshold, cmap='gray'); plt.title("Prediction")
    plt.show()

show_prediction()
