# U-Net til segmentering av EO satellittbilder

Krever: torch, torchvision, torchgeo, geopandas, cv2, numpy, matplotlib

### Klargjøring av datasett

En god måte å starte å eksperimentere med maskinlæring på først teste med noen eksisterende datasett. I denne notebooken skal vi bruke [Landcover AI](https://paperswithcode.com/dataset/landcover-ai) som inneholder annoterte EO bilder med segmentering av 4 kategorier: Skog, vei, vann og bygninger. Datasettet kan laststes ned fra deres deres nettside direkte, men finnes også som ett av flere datasett i *torchgeo* biblioteket.

In [None]:
# importere torchgeo
import os
import tempfile

from torch.utils.data import DataLoader
import torch
from torchgeo.datasets import LandCoverAI

In [None]:
import ssl
ssl._create_default_https_context = ssl._create_unverified_context

# laste ned og opprette trenings og test sett

landcover_train = LandCoverAI("data/landcover",split="train",download=True)
landcover_test = LandCoverAI("data/landcover",split="test",download=True)


## Dersom download direkte ikke virker:

# Last ned versjon 1 herfra : https://landcover.ai.linuxpolska.com/
# Legg .zip fila i "/data/landcover"
# sett download=False og kjør cellen på nytt


Ta en titt i mappen for å se hvordan dataen har lagt seg.

In [None]:
# lage dataloaders
batch_size = 4
trainloader = torch.utils.data.DataLoader(landcover_train, batch_size=batch_size, shuffle=True, num_workers=0)
testloader = torch.utils.data.DataLoader(landcover_test, batch_size=batch_size, shuffle=True, num_workers=0)

In [None]:
import matplotlib.pyplot as plt

image = landcover_train[2000]

plt.imshow(image["image"].permute(1,2,0).int())
plt.matshow(image["mask"])
plt.show()

### Sette opp U-Net

Vi sskal sette opp et klassisk U-Net, bestående av en encoder og en decoder.

![](../../media/Unet.png)

For å gjøre det mer oversiktelig deler vi det opp i flere små biter. Først koder vi en standard konvolusjonsblokk. Så bruker vi blokkene til å lage dekoderen. Også lager vi en dekoder og setter det sammen til det fullstendige nettverket.

In [None]:
import torch.nn  as nn
import torch.nn.functional as F

In [None]:
### sette opp et en blokk for dobbelt 3x3 konvolusjonslag med ReLU aktivering
class Block(nn.Module):
	def __init__(self, inChannels, outChannels):
		super().__init__()
		self.conv1 = nn.Conv2d(inChannels, outChannels, 3,padding=1)
		self.relu = nn.ReLU()
		self.conv2 = nn.Conv2d(outChannels, outChannels, 3, padding=1)
		self.bn1 = nn.BatchNorm2d(outChannels)
		self.bn2 = nn.BatchNorm2d(outChannels)
		
	def forward(self, x):
		return self.relu(self.bn2(self.conv2(self.relu(self.bn1(self.conv1(x))))))

In [None]:
class Encoder(nn.Module):
    def __init__(self,channels=[3,32,64,128]):
        super().__init__()
        # Encoder funksjoner
        self.channels = channels
        self.blocks = nn.ModuleList([Block(channels[i],channels[i+1]) for i in range(len(channels)-1)])
        self.maxpool = nn.MaxPool2d(kernel_size=2,stride=2)


    def forward(self, x):
        # forward for encoder
        block_outputs = []
        for i in range(len(self.blocks)-1):
            x = self.blocks[i](x)
            block_outputs.append(x)
            x = self.maxpool(x)

        x = self.blocks[-1](x)
        block_outputs.append(x)
        return block_outputs

In [None]:
class Decoder(nn.Module):
    def __init__(self,channels=[128,64,32]):
        super().__init__()
        # Decoder funksjoner
        self.channels = channels
        self.blocks = nn.ModuleList([Block(channels[i], channels[i + 1]) for i in range(len(channels) - 1)])  
        self.upConvs = nn.ModuleList([nn.ConvTranspose2d(channels[i], channels[i + 1], 2, 2) for i in range(len(channels) - 1)]) 
    def forward(self,x,block_outputs):
        x = self.upConvs[0](x)
        # forward for decoder
        for i in range(1,len(self.channels)-1):
            x = self.upConvs[i](x)
            x = torch.cat([x,block_outputs[i]],dim=1)
            x = self.blocks[i](x)

        return x
            


Her setter vi sammen encoderen og decoderen. Om man vil eksperimentere med størrelse og antall blokker kan det gjøres med argumentene `decChannels` og `encChannels`. Antall klasser settes med `nbClasses`. Det første tallet i `encChannels` må matche antall kanaler i input-dataen.

In [None]:
class UNet(nn.Module):
	def __init__(self, encChannels=[3, 64, 128, 256],
		decChannels=[256,128,64],
		nbClasses=5,
		outSize=(512,512)):
		super().__init__()
		# initialize encoder
		self.encoder = Encoder(encChannels)
		self.decoder = Decoder(decChannels)
		# initialize decoder
		self.head = nn.Conv2d(decChannels[-1], nbClasses, 1)
		self.outSize = outSize

	def forward(self, x):
		encFeatures = self.encoder(x)

		decFeatures = self.decoder(encFeatures[::-1][0], encFeatures[::-1][1:])

		map = self.head(decFeatures)

		map = F.interpolate(map,(512,512))
		return map

In [None]:
from torchinfo import summary

model = UNet(encChannels=[3,16,32,64,128],decChannels=[128,64,32,16])

model.train()
if torch.cuda.is_available():
    model.to("cuda") 

summary(model,(4,3,512,512))

### Sette opp en treningsloop

Vi er nå klare for å trene nettverket vårt.

In [None]:
# definer tapsfunksjon og optimeringsalgoritme
import torch.optim as optim

lossfunc = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters())

In [None]:
# lage en accuracy test
import numpy as np
def test_accuracy(batch_lim=None):
    acc = []
    with torch.no_grad():
        for i,batch in enumerate(testloader):
            images = batch["image"]/255
            labels = batch["mask"]
            if torch.cuda.is_available():
                out = model(images.to("cuda")).to("cpu")
            else:
                out = model(images)
            _, prediction = torch.max(torch.softmax(out,dim=1),dim=1)
            pixel_accuracy = torch.sum(torch.eq(prediction,labels))/prediction.numel()
            acc.append(pixel_accuracy)
            if batch_lim is None:
                pass
            elif i > batch_lim:
                break
    return np.mean(np.array(acc))

In [None]:
from torch.utils.tensorboard import SummaryWriter
current_best = 0.2
exp_number = 1

run = f"runs/landcover/UNet/{exp_number}/"
writer = SummaryWriter(log_dir=run)


for e in range(5):
    # lage variabler for totalt tap
    running_loss = 0
    for i,batch in enumerate(trainloader):
        optimizer.zero_grad()

        images = batch["image"]/255
        labels = batch["mask"].long()

        if torch.cuda.is_available():
            images = images.to("cuda")
            labels = labels.to("cuda")
        
        out = model(images)
        loss = lossfunc(out,labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        if i % 200 == 199 or i == 0:
            if i == 0:
                avg_loss = running_loss
            else:
                avg_loss = running_loss/(200)
            print(f"ep: {e+1}: batch: {(i+1)} Avg batch loss {avg_loss}")
            writer.add_scalar("loss",avg_loss,e*len(trainloader) + i)
            running_loss = 0
        
            accuracy = test_accuracy()
            print(f"Accuracy {accuracy}")
            writer.add_scalar("val/acc",accuracy,e*len(trainloader) + i)
            if accuracy > current_best:
                torch.save(model.state_dict(),run+"best.pt")
                current_best = accuracy
                print("new model saved")

Og inspiser noen resultater

In [None]:
import torchvision
import numpy as np
import matplotlib.pyplot as plt


def view_batch(batch):
    image = batch["image"]/255
    labels = batch["mask"]
    if torch.cuda.is_available():
        out = model(image.to("cuda")).to("cpu")
    else:
        out = model(image)
        
    _, prediction = torch.max(torch.softmax(out,dim=1),dim=1)
    accuracy = torch.sum(torch.eq(prediction,labels))/prediction.numel()    

    labels = labels.unsqueeze(1)
    prediction = prediction.unsqueeze(1)

    fig, axs = plt.subplots(3,1,figsize=(30,10),dpi=150,gridspec_kw = {'hspace':0})

    axs = axs.flatten()

    axs[0].imshow(torchvision.utils.make_grid((image*255).int()).permute(1,2,0))
    axs[0].set_xticks([])
    axs[0].set_yticks([])
    axs[1].matshow(torchvision.utils.make_grid(labels)[0],vmin=0,vmax=4)
    axs[1].set_xticks([])
    axs[1].set_yticks([])
    axs[2].matshow(torchvision.utils.make_grid(prediction)[0],vmin=0,vmax=4)
    axs[2].set_xticks([])
    axs[2].set_yticks([])
    plt.show()

    print(f"Accuracy: {accuracy*100:.2f}%")

Last inn den beste modellen

In [None]:
# last inn beste modell
if torch.cuda.is_available():
    model.load_state_dict(torch.load("best.pt",map_location="cuda"))
else:
    model.load_state_dict(torch.load("best.pt",map_location="cpu"))

# print(test_accuracy(10))


In [None]:
loop = iter(testloader)

In [None]:
batch = next(loop)

view_batch(batch)

Hva om vi vil kjøre nettverket vårt på georeferert satellittdata?
Vi kan ta en av GeoTiffene fra mappen *images*, og laste den inn som et `GeoImage`.

In [None]:
# legg til src i path
from pathlib import Path
import sys

src_path = Path(".").absolute().parents[1] / Path('src')
sys.path.append(src_path.__str__())

from geoutils import GeoImage

In [None]:
fil = "data/landcover/images/M-33-7-A-d-3-2.tif"

image = GeoImage(fil)

#finn høyde og bredde
c,H,W = image.data.shape

# lag en maske som er like stor som bildet
mask_data = np.zeros((1,H,W))

Vårt nettverk tar inn bilder av størrelse 512x512, vi må derfor gå gjennom det originale bildet og gjøre nettverket på såkalte "tiles" eller "chips" av den størrelsen. Etter hvert som vi regner ut segmenteringsmaskene, fyller vi dem inn i `mask_data`.

In [None]:
for i in range(0,H-512,512):
    for j in range(0,W-512,512):
        local_image=image.data[:,i:i+512,j:j+512]/255
        local_image = torch.from_numpy(local_image).unsqueeze(0).float()
        if torch.cuda.is_available():
            out = model(local_image.to("cuda")).to("cpu")
        else:
            out = model(local_image)
        
        _, prediction = torch.max(torch.softmax(out,dim=1),dim=1)
        mask_data[0,i:i+512,j:j+512] = prediction



Nå kan vi skrive ut masken vi har laget til koordinetene til input-bildet

In [None]:
from geoutils import CopyGeoTransform

mask_image = CopyGeoTransform(image,mask_data,"data/segmentert_maske.tif")