In [None]:
!pip list

Package                       Version
----------------------------- --------------------
absl-py                       1.4.0
alabaster                     0.7.13
albumentations                1.2.1
altair                        4.2.2
anyio                         3.6.2
appdirs                       1.4.4
argon2-cffi                   21.3.0
argon2-cffi-bindings          21.2.0
array-record                  0.2.0
arviz                         0.15.1
astropy                       5.2.2
astunparse                    1.6.3
attrs                         23.1.0
audioread                     3.0.0
autograd                      1.5
Babel                         2.12.1
backcall                      0.2.0
beautifulsoup4                4.11.2
bleach                        6.0.0
blis                          0.7.9
blosc2                        2.0.0
bokeh                         2.4.3
branca                        0.6.0
build                         0.10.0
CacheControl                  0.12.11
cac

In [None]:
%%capture
!pip install torchgeo==0.4.0
!pip install pygments==2.6.1
!pip install pytorch-lightning==1.9.2
!pip install torch==1.13.1
!pip install torchvision==0.14.1
!pip install kornia

In [None]:
import torchgeo
from torchgeo.datasets import LEVIRCDPlus
from torchgeo.datasets.utils import unbind_samples
from torchgeo.trainers import SemanticSegmentationTask
from torchgeo.datamodules.utils import dataset_split

In [None]:
#import torchvision
from torchvision.transforms import Compose

In [None]:
!pip install kornia

In [None]:
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger

import torch
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
#from collections import OrderedDict


import kornia.augmentation as K
import matplotlib.pyplot as plt
import numpy as np
import os


In [None]:
!pip install tensorboard
%load_ext tensorboard

In [None]:
import torch
num_gpus = torch.cuda.device_count()
print (num_gpus)

In [None]:
print(torch.cuda.is_available())
torch.cuda.empty_cache()

In [None]:
exp_name = "exp_1"
exp_dir = f"Checkpoint/{exp_name}"
os.makedirs(exp_dir, exist_ok=True)

In [None]:
# Training hyperparameters shared by the cells below.
batch_size = 8
lr = 0.01  #learning rate
# NOTE(review): gpu_id only appears in the commented-out line below; nothing
# in the visible notebook reads it.
gpu_id = 0
# device = torch.device(f"cuda:{gpu_id}")
num_workers = 10
patch_size = 256  # side length; passed as (patch_size, patch_size) to the datamodule
val_split_pct = 0.2

# Instantiate both splits up front (download=True / checksum=True are passed
# to torchgeo's LEVIRCDPlus) so the sizes can be printed as a sanity check.
train_dataset = LEVIRCDPlus(root="LEVIRCDPlus", split="train", download=True, checksum=True)
test_dataset = LEVIRCDPlus(root="LEVIRCDPlus", split="test", download=True, checksum=True)

print(f'train: {len(train_dataset)} images')
print(f'test: {len(test_dataset)} images')

In [None]:
train_path = '/content/LEVIRCDPlus/LEVIR-CD+/train'
test_path = '/content/LEVIRCDPlus/LEVIR-CD+/test'

In [None]:
from keras.preprocessing.image import ImageDataGenerator

# Keras augmentation config: random rotation/shift/shear/zoom plus per-sample
# centering and std normalization.
# NOTE(review): `image_generator` is not referenced by any later cell in the
# visible notebook — confirm whether it is still needed.
image_generator = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.1,
    shear_range=0.1,
    zoom_range=0.1,
    samplewise_center=True,
    samplewise_std_normalization=True
)


In [None]:
# Keras directory iterators over the downloaded LEVIR-CD+ folders.
# NOTE(review): none of train_generator / valid_generator / test_generator is
# used by the PyTorch Lightning training cells visible below. Also,
# flow_from_directory derives class labels from sub-directory names; confirm
# that treating the sub-folders of train_path as "categorical" classes is
# really intended for a change-detection dataset.
train_datagen = ImageDataGenerator(
        rescale=1 / 255.0,
        rotation_range=20,
        zoom_range=0.05,
        width_shift_range=0.05,
        height_shift_range=0.05,
        shear_range=0.05,
        horizontal_flip=True,
        fill_mode="nearest",
        validation_split=0.20)

# Test-time generator only rescales pixel values; no augmentation.
test_datagen = ImageDataGenerator(rescale=1 / 255.0)

# Training subset of the validation_split declared on train_datagen.
train_generator = train_datagen.flow_from_directory(
    directory=train_path,
    target_size=(180,180),
    color_mode="rgb",
    batch_size=batch_size,
    class_mode="categorical",
    subset='training',
    shuffle=True,
    seed=42
)
# Validation subset; it reuses train_datagen, so the same augmentations are
# configured for validation images as well.
valid_generator = train_datagen.flow_from_directory(
    directory=train_path,
    target_size=(180,180),
    color_mode="rgb",
    batch_size=batch_size,
    class_mode="categorical",
    subset='validation',
    shuffle=True,
    seed=42
)
# Unlabelled (class_mode=None), unshuffled, one image per batch.
test_generator = test_datagen.flow_from_directory(
    directory=test_path,
    target_size=(180,180),
    color_mode="rgb",
    batch_size=1,
    class_mode=None,
    shuffle=False,
    seed=42
)

In [None]:
class CustomSemanticSegmentationTask(SemanticSegmentationTask):
    """Segmentation task for bi-temporal change detection with figure logging.

    Each sample's ``"image"`` stacks two 3-channel acquisitions along the
    channel axis (channels 0-2 = first date, 3-5 = second date). On top of the
    parent task's loss and metrics, the first ``NUM_LOGGED_BATCHES`` batches of
    every training and validation epoch are rendered as a four-panel figure
    and sent to the logger.
    """

    # How many leading batches per epoch get a figure logged.
    NUM_LOGGED_BATCHES = 5

    def plot(self, sample):
        """Render one sample as a 1x4 figure: image 1, image 2, mask, prediction.

        Args:
            sample: dict with CPU tensors ``"image"`` (6 x H x W), ``"mask"``
                (H x W) and ``"prediction"`` (H x W).

        Returns:
            The matplotlib figure. The caller is responsible for closing it.
        """
        image1 = sample["image"][:3]
        image2 = sample["image"][3:]
        panels = [
            ("image 1", image1.permute(1, 2, 0)),  # (3, H, W) --> (H, W, 3)
            ("image 2", image2.permute(1, 2, 0)),
            ("mask", sample["mask"]),
            ("prediction", sample["prediction"]),
        ]

        fig, axs = plt.subplots(nrows=1, ncols=4, figsize=(4 * 5, 5))
        for ax, (title, data) in zip(axs, panels):
            ax.imshow(data)
            ax.axis("off")
            ax.set_title(title)

        # tight_layout acts on the current figure, which is the one just created.
        plt.tight_layout()
        return fig

    def _log_sample_figure(self, batch, prediction, stage, batch_idx):
        """Log the first sample of ``batch`` as a figure under ``image/<stage>/<batch_idx>``.

        The batch is left untouched: the tensors for the first sample are
        detached and copied to CPU into a fresh dict, so later hooks still see
        the original on-device batch.
        """
        writer = getattr(self.logger, "experiment", None)
        # No logger, or a logger without figure support: nothing to do.
        if not hasattr(writer, "add_figure"):
            return

        sample = {
            "image": batch["image"][0].detach().cpu(),
            "mask": batch["mask"][0].detach().cpu(),
            "prediction": prediction[0].detach().cpu(),
        }
        fig = self.plot(sample)
        try:
            writer.add_figure(
                f"image/{stage}/{batch_idx}", fig, global_step=self.global_step
            )
        finally:
            # Close this specific figure, not whichever one happens to be current.
            plt.close(fig)

    def training_step(self, *args, **kwargs):
        """Compute the training loss, update train metrics, log early batches.

        Args:
            *args: ``(batch, batch_idx)`` as passed by Lightning.

        Returns:
            The loss tensor used for back-propagation.
        """
        batch = args[0]
        batch_idx = args[1]

        x = batch["image"]
        y = batch["mask"]

        y_hat = self(x)
        y_hat_hard = y_hat.argmax(dim=1)

        loss = self.loss(y_hat, y)

        self.log("train_loss", loss, on_step=True, on_epoch=False)
        self.train_metrics(y_hat_hard, y)

        if batch_idx < self.NUM_LOGGED_BATCHES:
            self._log_sample_figure(batch, y_hat_hard, "train", batch_idx)

        return loss

    def validation_step(self, *args, **kwargs):
        """Compute the validation loss, update val metrics, log early batches.

        Args:
            *args: ``(batch, batch_idx)`` as passed by Lightning.
        """
        batch = args[0]
        batch_idx = args[1]

        x = batch["image"]
        y = batch["mask"]

        y_hat = self(x)
        y_hat_hard = y_hat.argmax(dim=1)

        loss = self.loss(y_hat, y)

        self.log("val_loss", loss, on_step=False, on_epoch=True)
        self.val_metrics(y_hat_hard, y)

        if batch_idx < self.NUM_LOGGED_BATCHES:
            self._log_sample_figure(batch, y_hat_hard, "val", batch_idx)

    def test_step(self, *args, **kwargs): #NEW from original
        """Compute the test loss and update test metrics (no figures).

        Args:
            *args: ``(batch, ...)`` as passed by Lightning; only the batch is used.
        """
        batch = args[0]
        x = batch["image"]
        y = batch["mask"]

        y_hat = self(x)
        y_hat_hard = y_hat.argmax(dim=1)

        loss = self.loss(y_hat, y)

        self.log("test_loss", loss, on_step=False, on_epoch=True)
        self.test_metrics(y_hat_hard, y)

In [None]:
import cv2
import torchvision
from PIL import Image
import torch.optim as optim
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

In [None]:
def create_mosaic(ref_image, target_image, mask):
    """Build a 2x2 checkerboard mosaic from random patches of two images.

    A random ``mask`` x ``mask`` patch is cropped from each image and tiled as::

        [ ref    | target ]
        [ target | ref    ]

    Args:
        ref_image: array of shape (H, W, 3).
        target_image: array of shape (H, W, 3).
        mask: side length, in pixels, of each square patch. Despite its name
            this is an integer size, not a mask array.

    Returns:
        ``uint8`` array of shape ``(2 * mask, 2 * mask, 3)``.

    Raises:
        TypeError: if ``mask`` is not a scalar.
        ValueError: if ``mask`` is not positive or exceeds an image dimension.
    """
    if np.ndim(mask) != 0:
        raise TypeError("mask must be a scalar patch size, not an array")
    size = int(mask)
    if size <= 0:
        raise ValueError(f"patch size must be positive, got {size}")

    def _random_patch(image):
        # Crop a size x size window at a uniformly random valid offset.
        height, width = image.shape[:2]
        if height < size or width < size:
            raise ValueError(
                f"patch size {size} exceeds image size {height}x{width}"
            )
        # +1 because randint's upper bound is exclusive: an image exactly
        # `size` wide must still yield the single valid offset 0.
        top = np.random.randint(0, height - size + 1)
        left = np.random.randint(0, width - size + 1)
        return image[top:top + size, left:left + size, :]

    # Randomly crop patches from reference and target images
    ref_patch = _random_patch(ref_image)
    target_patch = _random_patch(target_image)

    # Create mosaic image by alternating patches
    mosaic = np.zeros((size * 2, size * 2, 3), dtype=np.uint8)
    mosaic[0:size, 0:size, :] = ref_patch
    mosaic[0:size, size:, :] = target_patch
    mosaic[size:, 0:size, :] = target_patch
    mosaic[size:, size:, :] = ref_patch

    return mosaic

In [None]:
class boundingbox:
    """Fine-tune a torchvision Faster R-CNN detector and predict boxes with it.

    NOTE(review): the datasets are built with ``ImageFolder``, which yields
    ``(image, class_index)`` pairs, whereas the training/evaluation loops below
    treat every target as a dict of tensors (``t.items()``), as detection
    models require (``"boxes"`` / ``"labels"``). A dataset that returns
    detection targets is needed before ``train()`` can run — confirm the
    intended annotation source.

    Args:
        dataset_dir: directory containing ``train`` and ``test`` sub-folders.
        batch_size: mini-batch size for both loaders.
        num_epochs: number of training epochs run by :meth:`train`.
    """

    def __init__(self, dataset_dir, batch_size=4, num_epochs=10):
        self.dataset_dir = dataset_dir
        self.batch_size = batch_size
        self.num_epochs = num_epochs

        # Define the device to use for training and inference (CPU or GPU)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Image -> float tensor conversion. It has to exist before the datasets
        # are created (they receive it) and is reused by predict().
        self.transform = torchvision.transforms.ToTensor()

        # Create a dataset object using PyTorch's ImageFolder class
        self.train_dataset = torchvision.datasets.ImageFolder(
            os.path.join(self.dataset_dir, 'train'), transform=self.transform)
        self.test_dataset = torchvision.datasets.ImageFolder(
            os.path.join(self.dataset_dir, 'test'), transform=self.transform)

        # Create a data loader for the training and testing datasets
        self.train_loader = torch.utils.data.DataLoader(
            self.train_dataset, batch_size=self.batch_size, shuffle=True)
        self.test_loader = torch.utils.data.DataLoader(
            self.test_dataset, batch_size=self.batch_size, shuffle=False)

        # Define the model to use for object detection (e.g., Faster R-CNN)
        self.model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
        num_classes = 4  # A, B, mask, background
        in_features = self.model.roi_heads.box_predictor.cls_score.in_features
        self.model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
        self.model.to(self.device)

        # Define the optimizer and loss function
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.005, momentum=0.9, weight_decay=0.0005)
        self.lr_scheduler = optim.lr_scheduler.StepLR(self.optimizer, step_size=3, gamma=0.1)
        # Kept for interface compatibility; the detection model computes its
        # own losses, so this criterion is not used by train()/evaluate().
        # Referenced through `torch.nn` so it does not depend on a separate
        # `nn` alias being imported first.
        self.criterion = torch.nn.CrossEntropyLoss()

    def _to_device(self, images, targets):
        """Move a batch of images and target dicts to ``self.device``."""
        images = [image.to(self.device) for image in images]
        targets = [{k: v.to(self.device) for k, v in t.items()} for t in targets]
        return images, targets

    def train(self):
        """Run ``num_epochs`` of training, evaluating on the test set each epoch."""
        for epoch in range(self.num_epochs):
            # Set the model to training mode
            self.model.train()

            # Loop over the training data loader
            for i, (images, targets) in enumerate(self.train_loader):
                images, targets = self._to_device(images, targets)

                # Zero out the gradients and compute the forward pass
                self.optimizer.zero_grad()
                loss_dict = self.model(images, targets)

                # Total loss is the sum of the detector's individual losses
                loss = sum(loss_dict.values())

                # Backpropagate the gradients and update the weights
                loss.backward()
                self.optimizer.step()

                # Print the loss every 10 batches
                if i % 10 == 0:
                    print(f'Epoch {epoch}, Batch {i}: {loss.item()}')

            # Update the learning rate scheduler
            self.lr_scheduler.step()

            # Evaluate the model on the test set every epoch
            test_loss = self.evaluate()
            print(f'Epoch {epoch}: test loss {test_loss}')

    def evaluate(self):
        """Return the mean per-image detection loss over the test loader.

        torchvision detection models return the loss dict only in training
        mode (in eval mode they return predictions), so the model is kept in
        training mode here while ``torch.no_grad()`` guarantees no gradients
        are computed and no weights are updated.
        NOTE(review): if the model is ever built with regular (non-frozen)
        batch norm, running statistics would still update here — confirm.

        A classification-style accuracy is deliberately not computed: detection
        outputs are variable-length box lists and cannot be compared
        element-wise with the targets.
        """
        self.model.train()
        total_loss = 0.0
        num_images = 0

        with torch.no_grad():
            for images, targets in self.test_loader:
                images, targets = self._to_device(images, targets)
                loss_dict = self.model(images, targets)
                loss = sum(loss_dict.values())

                total_loss += loss.item() * len(images)
                num_images += len(images)

        return total_loss / num_images if num_images else float('nan')

    def predict(self, image_paths):
        """Predict one bounding box per image.

        Args:
            image_paths: iterable of image file paths.

        Returns:
            A list with one entry per path: the first box returned by the
            model as a NumPy array ``[x1, y1, x2, y2]``, or ``None`` when the
            model produced no detection for that image.
        """
        # Set the model to evaluation mode
        self.model.eval()

        # Load the images; the context manager closes each file handle, and
        # convert("RGB") gives the detector the 3 channels it expects even for
        # grayscale or RGBA files.
        images = []
        for path in image_paths:
            with Image.open(path) as img:
                images.append(self.transform(img.convert("RGB")).to(self.device))
        if not images:
            return []

        # Compute the forward pass
        with torch.no_grad():
            outputs = self.model(images)

        # Convert the outputs to bounding boxes
        bboxes = []
        for output in outputs:
            boxes = output['boxes'].cpu().numpy()
            bboxes.append(boxes[0] if len(boxes) else None)

        return bboxes

def bounding_box(img1, img2, mask):
    """Show an image pair next to its change mask with bounding boxes drawn.

    Args:
        img1: first image, in a form ``imshow`` accepts.
        img2: second image, same form.
        mask: single-channel 8-bit change mask; external contours of its
            non-zero regions define the boxes.

    Returns:
        List of ``(x, y, w, h)`` tuples, one per external contour.

    Raises:
        ValueError: if any input is ``None`` (``cv2.imread`` returns ``None``
            instead of raising when a file cannot be read).

    NOTE(review): images loaded with ``cv2.imread`` are BGR while ``imshow``
    assumes RGB, so colours appear swapped unless the caller converts first.
    """
    if img1 is None or img2 is None or mask is None:
        raise ValueError("img1, img2 and mask must all be loaded images (got None)")

    # findContours returns (contours, hierarchy) in OpenCV 2.x/4.x and
    # (image, contours, hierarchy) in 3.x; contours is second-to-last in both.
    contours = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
    boxes = [tuple(cv2.boundingRect(contour)) for contour in contours]

    fig, ax = plt.subplots(1, 3, figsize=(10, 5))
    panels = ((img1, 'Image 1'), (img2, 'Image 2'), (mask, 'Image 3'))
    for axis, (image, title) in zip(ax, panels):
        axis.imshow(image)
        axis.set_title(title)
        axis.axis('off')

    # Overlay the computed boxes on the mask panel so they are actually visible.
    for x, y, w, h in boxes:
        ax[2].add_patch(plt.Rectangle((x, y), w, h, fill=False, edgecolor='red', linewidth=1))

    plt.show()
    return boxes

In [None]:
from kornia.augmentation import AugmentationSequential

In [None]:
#Dataset
class LEVIRCDPlusDataModule(pl.LightningDataModule):
    """LightningDataModule wrapping torchgeo's LEVIR-CD+ dataset.

    The two images of each sample are scaled to [0, 1] and stacked into one
    6-channel tensor. Random augmentations (including a random crop to
    ``patch_size``) are applied per batch after device transfer, and only
    while the trainer is in training mode.

    Args:
        batch_size: samples per batch for all dataloaders.
        num_workers: worker processes per dataloader.
        val_split_pct: fraction of the train split held out for validation;
            ``0`` reuses the train set as the validation set.
        patch_size: ``(height, width)`` of the random training crop.
        **kwargs: forwarded unchanged to ``LEVIRCDPlus`` (e.g. ``root``).
    """

    def __init__(
        self,
        batch_size=12,
        num_workers=0,
        val_split_pct=0.2,
        patch_size=(256,256),
        **kwargs,
    ):
        super().__init__()

        self.batch_size = batch_size
        self.num_workers = num_workers
        self.val_split_pct = val_split_pct
        self.patch_size = patch_size
        self.kwargs = kwargs

        # Built once here instead of on every batch: the pipeline definition is
        # loop-invariant, and random parameters are still re-sampled on each call.
        self.train_augmentations = K.AugmentationSequential(
            K.RandomRotation(p=0.5, degrees=90),
            K.RandomHorizontalFlip(p=0.5),
            K.RandomVerticalFlip(p=0.5),
            K.RandomCrop(self.patch_size),
            K.RandomSharpness(p=0.5),
            data_keys=["input", "mask"],
        )

    def on_after_batch_transfer(self, batch, batch_idx):
        """Apply the training augmentations to a batch already on its device.

        Batches are returned unchanged unless a trainer is attached and it is
        currently in training mode.
        """
        trainer = getattr(self, "trainer", None)
        if trainer is not None and getattr(trainer, "training", False):
            x = batch["image"]  #[12, 6, 1024, 1024]
            # Kornia expects a channel axis on the mask and float data.
            y = batch["mask"].float().unsqueeze(1)  #[12, 1024, 1024] --> [12, 1, 1024, 1024])

            x, y = self.train_augmentations(x, y)

            batch["image"] = x
            # Back to the (B, H, W) integer layout the loss expects.
            batch["mask"] = y.squeeze(1).long()

        return batch

    def preprocess(self, sample):
        """Scale the image pair to [0, 1], stack it channel-wise, cast the mask."""
        sample["image"] = (sample["image"]/255.0).float() #[2, 3, 1024, 1024]
        sample["image"] = torch.flatten(sample["image"], 0, 1) #[6, 1024, 1024]
        sample["mask"] = sample["mask"].long() #[1024, 1024]

        return sample

    def setup(self, stage=None):
        """Create the train/val/test datasets.

        ``stage`` is accepted for Lightning compatibility and ignored: all
        three datasets are always built.
        """
        train_transforms = Compose([self.preprocess])
        test_transforms = Compose([self.preprocess])

        train_dataset = LEVIRCDPlus(
            split="train", transforms=train_transforms, **self.kwargs
        )

        if self.val_split_pct > 0.0:
            self.train_dataset, self.val_dataset, _ = dataset_split(
                train_dataset, val_pct=self.val_split_pct, test_pct=0.0
            )
        else:
            # No hold-out requested: validate on the training data itself.
            self.train_dataset = train_dataset
            self.val_dataset = train_dataset

        self.test_dataset = LEVIRCDPlus(
            split="test", transforms=test_transforms, **self.kwargs
        )

    def _make_loader(self, dataset, shuffle):
        """Build a DataLoader with this module's batch size and worker count."""
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=shuffle,
        )

    def train_dataloader(self):
        """Shuffled loader over the training subset."""
        return self._make_loader(self.train_dataset, shuffle=True)

    def val_dataloader(self):
        """Unshuffled loader over the validation subset."""
        return self._make_loader(self.val_dataset, shuffle=False)

    def test_dataloader(self):
        """Unshuffled loader over the test split."""
        return self._make_loader(self.test_dataset, shuffle=False)

In [None]:
#import tensorflow as tf

import keras.backend as KB
from keras.optimizers import Adam
from keras.losses import binary_crossentropy
def dice_coef(y_true, y_pred, smooth=1):
    """Batch-mean Dice coefficient.

    Sums are taken over axes 1-3 of each sample; ``smooth`` is added to both
    numerator and denominator, and the per-sample scores are averaged over
    axis 0.
    """
    reduce_axes = [1, 2, 3]
    overlap = KB.sum(y_true * y_pred, axis=reduce_axes)
    total = KB.sum(y_true, axis=reduce_axes) + KB.sum(y_pred, axis=reduce_axes)
    per_sample_dice = (2. * overlap + smooth) / (total + smooth)
    return KB.mean(per_sample_dice, axis=0)
def dice_p_bce(in_gt, in_pred):
    """Combined loss: 0.05 x binary cross-entropy minus the Dice coefficient."""
    weighted_bce = 0.05*binary_crossentropy(in_gt, in_pred)
    dice = dice_coef(in_gt, in_pred)
    return weighted_bce - dice
def true_positive_rate(y_true, y_pred):
    """Fraction of ground-truth positives recovered by the rounded prediction.

    ``KB.epsilon()`` is added to the denominator so a batch with no positive
    ground-truth pixels yields 0 instead of NaN (0 / 0).
    """
    true_positives = KB.sum(KB.flatten(y_true)*KB.flatten(KB.round(y_pred)))
    return true_positives/(KB.sum(y_true) + KB.epsilon())

In [None]:
%%capture
# Data module: `root` is not a named parameter of LEVIRCDPlusDataModule, so it
# travels through **kwargs to the underlying LEVIRCDPlus dataset.
datamodule = LEVIRCDPlusDataModule( root = "LEVIRCDPlus", batch_size = batch_size, num_workers = num_workers, val_split_pct = val_split_pct, patch_size = (patch_size, patch_size),)
# Task: in_channels=6 matches the two 3-channel images that preprocess()
# flattens into one tensor; num_classes=2 with cross-entropy ("ce") loss.
task = CustomSemanticSegmentationTask(model="unet", backbone="resnet18", weights="imagenet", in_channels=6, num_classes=2, loss="ce", ignore_index=None, learning_rate=lr, learning_rate_schedule_patience=10)

In [None]:
# Checkpointing keyed on the "val_loss" value logged in validation_step;
# files are written to exp_dir.
checkpoint_callback = ModelCheckpoint(
    monitor="val_loss",
    dirpath=exp_dir,
    save_top_k=1,
    save_last=True,
)

# Early stopping on the same "val_loss" metric, with a patience of 10.
early_stopping_callback = EarlyStopping(
    monitor="val_loss",
    min_delta=0.00,
    patience=10,
)


# TensorBoard logger: save_dir "logs/" and the experiment name give the
# logs/exp_1 tree that the %tensorboard cell points at.
tb_logger = TensorBoardLogger(
    save_dir="logs/",
    name=exp_name
)

In [None]:
# Load one A/B/label triplet and visualise it.
# NOTE(review): these paths point into the *test* folder but the file is named
# train_638.png — verify the file exists. cv2.imread returns None rather than
# raising when a file cannot be read, so a wrong path only surfaces later.
img1 = cv2.imread('/content/LEVIRCDPlus/LEVIR-CD+/test/A/train_638.png')
img2 = cv2.imread('/content/LEVIRCDPlus/LEVIR-CD+/test/B/train_638.png')
# Flag 0 loads the label as a single-channel grayscale image.
mask = cv2.imread('/content/LEVIRCDPlus/LEVIR-CD+/test/label/train_638.png',0)
bounding_box(img1, img2, mask)

In [None]:
# With the pytorch-lightning 1.9.x pinned at the top of the notebook, a Trainer
# created without `accelerator` runs on CPU even when a GPU is available.
# "auto" picks the GPU when present and falls back to CPU otherwise.
trainer = pl.Trainer(
    accelerator="auto",
    devices=1,
    callbacks=[checkpoint_callback, early_stopping_callback],
    logger=[tb_logger],
    default_root_dir=exp_dir,
    min_epochs=1,
    max_epochs=50
)

In [None]:
!pip install basenet

In [None]:
!pip install pylocron

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Mish(nn.Module):
    """Mish activation, ``x * tanh(softplus(x))``; it has no parameters."""

    def forward(self, x):
        gate = torch.tanh(F.softplus(x))
        return x * gate

# Registry mapping an activation name to a module instance. Conv and CSPBlock
# look their activation up here by name, so every layer using the same name
# receives the same module object (both entries hold no parameters).
ACTIVATIONS = {
    'mish': Mish(),
    'linear': nn.Identity()
}

class Conv(nn.Module):
    """Bias-free convolution followed by batch norm and a named activation.

    Padding is ``kernel_size // 2``; ``activation`` is a key of ``ACTIVATIONS``.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, activation='mish'):
        super().__init__()

        convolution = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=kernel_size//2,
            bias=False,
        )
        normalization = nn.BatchNorm2d(out_channels)
        self.conv = nn.Sequential(convolution, normalization, ACTIVATIONS[activation])

    def forward(self, x):
        return self.conv(x)

class CSPBlock(nn.Module):
    """Residual block: a 1x1 ``Conv`` then a 3x3 ``Conv``, added to the input.

    ``hidden_channels`` falls back to ``out_channels`` when ``None``. The sum
    is passed through the ``ACTIVATIONS`` entry named by ``residual_activation``.
    """

    def __init__(self, in_channels, out_channels, hidden_channels=None, residual_activation='linear'):
        super().__init__()

        bottleneck = out_channels if hidden_channels is None else hidden_channels

        self.block = nn.Sequential(
            Conv(in_channels, bottleneck, 1),
            Conv(bottleneck, out_channels, 3),
        )

        self.activation = ACTIVATIONS[residual_activation]

    def forward(self, x):
        summed = x + self.block(x)
        return self.activation(summed)

class CSPFirstStage(nn.Module):
    """First CSP stage: stride-2 downsample, two full-width branches, 1x1 merge.

    Both split branches keep ``out_channels`` channels. One branch goes through
    a single ``CSPBlock`` (hidden width ``in_channels``) and a 1x1 ``Conv``
    before the two are concatenated and reduced back to ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        width = out_channels

        self.downsample_conv = Conv(in_channels, width, 3, stride=2)

        self.split_conv0 = Conv(width, width, 1)
        self.split_conv1 = Conv(width, width, 1)

        self.blocks_conv = nn.Sequential(
            CSPBlock(width, width, in_channels),
            Conv(width, width, 1),
        )

        self.concat_conv = Conv(width*2, width, 1)

    def forward(self, x):
        downsampled = self.downsample_conv(x)

        shortcut = self.split_conv0(downsampled)
        processed = self.blocks_conv(self.split_conv1(downsampled))

        merged = torch.cat([shortcut, processed], dim=1)
        return self.concat_conv(merged)

class CSPStage(nn.Module):
    """CSP stage: stride-2 downsample, two half-width branches, 1x1 merge.

    Each split branch has ``out_channels // 2`` channels. One branch runs
    through ``num_blocks`` ``CSPBlock`` layers and a 1x1 ``Conv`` before the
    two are concatenated and passed through a final 1x1 ``Conv``.
    """

    def __init__(self, in_channels, out_channels, num_blocks):
        super().__init__()

        half = out_channels//2

        self.downsample_conv = Conv(in_channels, out_channels, 3, stride=2)

        self.split_conv0 = Conv(out_channels, half, 1)
        self.split_conv1 = Conv(out_channels, half, 1)

        residual_blocks = [CSPBlock(half, half) for _ in range(num_blocks)]
        self.blocks_conv = nn.Sequential(*residual_blocks, Conv(half, half, 1))

        self.concat_conv = Conv(out_channels, out_channels, 1)

    def forward(self, x):
        downsampled = self.downsample_conv(x)

        shortcut = self.split_conv0(downsampled)
        processed = self.blocks_conv(self.split_conv1(downsampled))

        merged = torch.cat([shortcut, processed], dim=1)
        return self.concat_conv(merged)

class CSPDarknet53(nn.Module):
    """CSPDarknet-53 backbone returning the last ``num_features`` stage outputs.

    Args:
        stem_channels: output channels of the 3x3 stem convolution.
        feature_channels: output channels of the five stages, in order. The
            sequence is copied, so the caller's object (and the default) is
            never shared with or mutated through the instance.
        num_features: how many of the deepest stage outputs ``forward`` returns.
        in_channels: channels of the input image (default 3). Set it to 6 to
            feed a channel-stacked bi-temporal image pair.
    """

    def __init__(self, stem_channels=32, feature_channels=(64, 128, 256, 512, 1024), num_features=1, in_channels=3):
        super(CSPDarknet53, self).__init__()

        # Private list copy: avoids aliasing a shared default between instances.
        feature_channels = list(feature_channels)

        self.stem_conv = Conv(in_channels, stem_channels, 3)

        # Residual block counts per stage: 1 (first stage), 2, 8, 8, 4.
        self.stages = nn.ModuleList([
            CSPFirstStage(stem_channels, feature_channels[0]),
            CSPStage(feature_channels[0], feature_channels[1], 2),
            CSPStage(feature_channels[1], feature_channels[2], 8),
            CSPStage(feature_channels[2], feature_channels[3], 8),
            CSPStage(feature_channels[3], feature_channels[4], 4)
        ])

        self.feature_channels = feature_channels
        self.num_features = num_features

    def forward(self, x):
        """Return the outputs of the last ``num_features`` stages as a list."""
        x = self.stem_conv(x)

        features = []
        for stage in self.stages:
            # Every stage starts with a stride-2 convolution.
            x = stage(x)
            features.append(x)

        return features[-self.num_features:]

def _BuildCSPDarknet53(num_features=3):
    """Create a CSPDarknet53 and report the channel counts of what it returns.

    Returns:
        ``(backbone, channels)`` where ``channels`` holds the last
        ``num_features`` entries of the backbone's ``feature_channels``.
    """
    backbone = CSPDarknet53(num_features=num_features)
    returned_channels = backbone.feature_channels[-num_features:]
    return backbone, returned_channels

# Smoke test: build the default backbone and push one random 1x3x224x224
# tensor through it. With the default num_features=1, `y` is a one-element
# list holding the last stage's output.
# NOTE(review): this binds the module-level names `model`, `x` and `y`;
# UNet.__init__ further down calls `model()` — confirm that `model` is meant
# to be this instance.
if __name__ == '__main__':
    model = CSPDarknet53()
    x = torch.randn(1, 3, 224, 224)
    y = model(x)

In [None]:
import torchvision.models as resnet50

In [None]:
def outS(i):
    """Return the spatial size left after three successive halvings of ``i``.

    The steps are floor((i + 1) / 2), then ceil((i + 1) / 2), then
    floor((i + 1) / 2). Everything is integer arithmetic so the result is an
    ``int``; true division (``/``) would carry fractions through the chain and
    return values such as 13.5 for an input of 100.

    Args:
        i: input size; anything ``int()`` accepts.

    Returns:
        The output size as an ``int``.
    """
    i = int(i)
    i = (i + 1) // 2
    # ceil((i + 1) / 2) for an integer i is (i + 2) // 2.
    i = (i + 2) // 2
    i = (i + 1) // 2
    return i


def conv3x3(in_planes, out_planes, stride=1):
    """Return a bias-free 3x3 ``nn.Conv2d`` with padding 1."""
    layer = nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=3,
        stride=stride,
        padding=1,
        bias=False,
    )
    return layer

In [None]:
import numpy as np
import torch
import math
import basenet
import torch.nn as nn
from torch.nn import Module, Sequential, Conv2d, ReLU,AdaptiveMaxPool2d, AdaptiveAvgPool2d, \
    NLLLoss, BCELoss, CrossEntropyLoss, AvgPool2d, MaxPool2d, Parameter, Linear, Sigmoid, Softmax, Dropout, Embedding
from torch.nn import functional as F
from torch.autograd import Variable

# "major.minor" of the installed torch, e.g. "1.13". Taking the first three
# characters would truncate two-digit minors ("1.13.1" -> "1.1").
torch_ver = ".".join(torch.__version__.split(".")[:2])


class PCAM_Module(Module):
    """Position attention module.

    Self-attention over spatial positions: every output position is a weighted
    sum of the value features at all positions, added back to the input scaled
    by the learnable scalar ``gamma`` (initialised to 0, so the module starts
    out as the identity).

    Args:
        in_dim: number of input (and output) channels. Query and key
            projections use ``in_dim // 8`` channels.
    """
    def __init__(self, in_dim):
        super(PCAM_Module, self).__init__()
        self.chanel_in = in_dim

        self.query_conv = Conv2d(in_channels=in_dim, out_channels=in_dim//8, kernel_size=1)
        self.key_conv = Conv2d(in_channels=in_dim, out_channels=in_dim//8, kernel_size=1)
        self.value_conv = Conv2d(in_channels=in_dim, out_channels=in_dim, kernel_size=1)
        self.gamma = Parameter(torch.zeros(1))

        self.softmax = Softmax(dim=-1)

    def forward(self, x):
        """
            inputs :
                x : input feature maps( B X C X H X W)
            returns :
                out : attention value + input feature (B X C X H X W)

            The attention matrix has shape B X (HxW) X (HxW).
        """
        # The forward pass depends only on `x`; it must not touch notebook
        # globals (a stray create_mosaic(img1, img2, mask) call was removed).
        m_batchsize, C, height, width = x.size()
        num_positions = width*height

        proj_query = self.query_conv(x).view(m_batchsize, -1, num_positions).permute(0, 2, 1)
        proj_key = self.key_conv(x).view(m_batchsize, -1, num_positions)
        # energy[b, i, j]: similarity between position i and position j.
        energy = torch.bmm(proj_query, proj_key)
        attention = self.softmax(energy)
        proj_value = self.value_conv(x).view(m_batchsize, -1, num_positions)

        out = torch.bmm(proj_value, attention.permute(0, 2, 1))
        out = out.view(m_batchsize, C, height, width)

        out = self.gamma*out + x
        return out


class CCAM_Module(Module):
    """Channel attention module.

    Self-attention over channels, following the channel attention module of
    DANet: every output channel is a weighted sum of all input channels, added
    back to the input scaled by the learnable scalar ``gamma`` (initialised to
    0, so the module starts out as the identity).

    Args:
        in_dim: number of input channels (stored for reference only; the
            module has no convolutional weights).
    """
    def __init__(self, in_dim):
        super(CCAM_Module, self).__init__()
        self.chanel_in = in_dim


        self.gamma = Parameter(torch.zeros(1))
        self.softmax  = Softmax(dim=-1)

    def forward(self, x):
        """
            inputs :
                x : input feature maps( B X C X H X W)
            returns :
                out : attention value + input feature (B X C X H X W)

            The attention matrix has shape B X C X C.
        """
        m_batchsize, C, height, width = x.size()
        flat = x.view(m_batchsize, C, -1)

        # energy[b, i, j]: similarity between channel i and channel j.
        energy = torch.bmm(flat, flat.permute(0, 2, 1))
        # Subtracting from the row-wise maximum (as in DANet) gives larger
        # attention weight to less similar channels.
        energy_new = torch.max(energy, -1, keepdim=True)[0].expand_as(energy) - energy
        attention = self.softmax(energy_new)

        out = torch.bmm(attention, flat)
        out = out.view(m_batchsize, C, height, width)

        out = self.gamma*out + x
        return out

class ChangeDetectionModel(nn.Module):
    """Multi-scale change-detection model sketch: shared backbone + three heads.

    NOTE(review): as written this class cannot be constructed or run; the
    intended architecture must be confirmed before it can be repaired:
      * ``CSPDarknet53.__init__`` has no ``num_classes`` parameter, and the
        list ``[1, 2, 8, 8, 4]`` is passed positionally into ``stem_channels``.
      * ``CCAM_Module.__init__`` accepts a single ``in_dim`` argument, but two
        arguments are passed here.
      * ``resnet50`` is the alias created by
        ``import torchvision.models as resnet50`` — a module, not a callable.
    """
    def __init__(self, in_channels, num_classes):
        super(ChangeDetectionModel, self).__init__()

        # Load the CSPDarknet-53 backbone
        # NOTE(review): argument mismatch with CSPDarknet53.__init__ (see class docstring).
        self.backbone = CSPDarknet53([1, 2, 8, 8, 4], num_classes=in_channels)

        # Change detection heads
        # NOTE(review): CCAM_Module takes only `in_dim` (see class docstring).
        self.detection_head_s = CCAM_Module(in_channels, num_classes)
        self.detection_head_m = CCAM_Module(in_channels, num_classes)
        self.detection_head_l = CCAM_Module(in_channels, num_classes)

    def forward(self, x):
        # Get features from the shared backbone model
        features = self.backbone(x)

        # Get features from the PAFPN model
        # NOTE(review): `resnet50` is a module alias here, so this call fails;
        # no PAFPN is defined in the visible notebook.
        features_s, features_m, features_l = resnet50(features)

        # Get change detection predictions from each detection head
        change_detection_s = self.detection_head_s(features_s)
        change_detection_m = self.detection_head_m(features_m)
        change_detection_l = self.detection_head_l(features_l)

        # Upsample change detection predictions to the input size
        change_detection_s = nn.functional.interpolate(change_detection_s, size=x.size()[2:], mode='bilinear', align_corners=True)
        change_detection_m = nn.functional.interpolate(change_detection_m, size=x.size()[2:], mode='bilinear', align_corners=True)
        change_detection_l = nn.functional.interpolate(change_detection_l, size=x.size()[2:], mode='bilinear', align_corners=True)

        # Combine change detection predictions from all heads
        change_detection = change_detection_s + change_detection_m + change_detection_l

        return change_detection

class DCA_det(nn.Module):
    """Dual (position + channel) attention head.

    The input is reduced to ``in_channels // 4`` channels in two parallel
    branches, one refined by ``PCAM_Module`` and one by ``CCAM_Module``. Each
    branch and their sum get their own dropout + 1x1 convolution producing
    ``out_channels`` maps.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels of each of the three outputs.
        norm_layer: callable building a normalisation layer from a channel
            count (e.g. ``nn.BatchNorm2d``).
    """

    def __init__(self, in_channels, out_channels, norm_layer):
        super(DCA_det, self).__init__()
        inter_channels = in_channels // 4

        def _conv_norm_relu(channels_in):
            # 3x3 bias-free convolution -> norm -> ReLU, producing inter_channels maps.
            return nn.Sequential(nn.Conv2d(channels_in, inter_channels, 3, padding=1, bias=False),
                                 norm_layer(inter_channels),
                                 nn.ReLU())

        def _classifier():
            # The input width follows inter_channels rather than a fixed 512,
            # which only matched in_channels == 2048.
            return nn.Sequential(nn.Dropout2d(0.1, False), nn.Conv2d(inter_channels, out_channels, 1))

        self.conv5a = _conv_norm_relu(in_channels)
        self.conv5c = _conv_norm_relu(in_channels)

        self.sa = PCAM_Module(inter_channels)
        self.sc = CCAM_Module(inter_channels)
        self.conv51 = _conv_norm_relu(inter_channels)
        self.conv52 = _conv_norm_relu(inter_channels)

        self.conv6 = _classifier()
        self.conv7 = _classifier()

        self.conv8 = _classifier()

    def forward(self, x):
        """Return ``(position_output, channel_output, fused_output)``."""
        # Position-attention branch.
        feat1 = self.conv5a(x)
        sa_feat = self.sa(feat1)
        sa_conv = self.conv51(sa_feat)
        sa_output = self.conv6(sa_conv)

        # Channel-attention branch.
        feat2 = self.conv5c(x)
        sc_feat = self.sc(feat2)
        sc_conv = self.conv52(sc_feat)
        sc_output = self.conv7(sc_conv)

        # Fuse the two refined feature maps before the final classifier.
        feat_sum = sa_conv + sc_conv

        sasc_output = self.conv8(feat_sum)

        return sa_output,sc_output,sasc_output


class DCAM(nn.Module):
    """Residual block of two 3x3 convolutions, each followed by batch norm.

    The input (optionally passed through ``downsample``) is added to the
    output of the second batch norm before the final ReLU.
    """
    #expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super().__init__()
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes, affine=True)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes, affine=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Shortcut path: identity unless a downsample module was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)



class UNet(nn.Module):
    """Siamese wrapper: one shared network embeds two inputs, outputs are normalised.

    Args:
        norm_flag: ``'l2'`` for L2 normalisation along the channel axis
            (``F.normalize(t, 2, dim=1)``) or ``'exp'`` for ``nn.Softmax2d``.

    Raises:
        ValueError: if ``norm_flag`` is neither ``'l2'`` nor ``'exp'``.
    """

    def __init__(self,norm_flag = 'l2'):
        super(UNet, self).__init__()
        # NOTE(review): `model` must be a zero-argument callable returning a
        # network whose forward yields three tensors (conv5, fc7, embedding).
        # No such factory is defined in the visible notebook — confirm.
        self.CNN = model()
        self.norm_flag = norm_flag
        if norm_flag == 'l2':
            self.norm = F.normalize
        elif norm_flag == 'exp':
            self.norm = nn.Softmax2d()
        else:
            # Fail at construction instead of with an AttributeError in forward().
            raise ValueError(f"norm_flag must be 'l2' or 'exp', got {norm_flag!r}")

    def _normalize(self, features):
        """Apply the configured normalisation with the arguments it accepts."""
        if self.norm_flag == 'l2':
            return self.norm(features, 2, dim=1)
        # Softmax2d takes only the tensor; passing p/dim would raise TypeError.
        return self.norm(features)

    def forward(self,t0,t1):
        """Embed both inputs with the shared network and normalise each output.

        Returns:
            Three ``[t0, t1]`` pairs of normalised tensors, in the order
            conv5, fc7, embedding.
        """
        out_t0_conv5,out_t0_fc7,out_t0_embedding = self.CNN(t0)
        out_t1_conv5,out_t1_fc7,out_t1_embedding = self.CNN(t1)

        conv5_pair = [self._normalize(out_t0_conv5), self._normalize(out_t1_conv5)]
        fc7_pair = [self._normalize(out_t0_fc7), self._normalize(out_t1_fc7)]
        embedding_pair = [self._normalize(out_t0_embedding), self._normalize(out_t1_embedding)]
        return conv5_pair,fc7_pair,embedding_pair

In [None]:
#torch.set_float32_matmul_precision('High')
_ = trainer.fit(model=task, datamodule=datamodule)

In [None]:
trainer.test(model=task, datamodule=datamodule)

In [None]:
# NOTE(review): no visible cell writes /content/my_model.ckpt (ModelCheckpoint
# saves into exp_dir) — confirm the path. If this is a Lightning checkpoint,
# torch.load presumably returns a dict of state rather than a ready-to-use
# model; verify how `my_model` is consumed.
path2 = '/content/my_model.ckpt'
my_model = torch.load(path2)

In [None]:
%tensorboard --logdir='/content/logs/exp_1/version_0'

In [None]:
from google.colab import files
files.download('/content/logs/exp_1/version_0/events.out.tfevents.1682142165.c6e04740d553.2251.0')