In [48]:
from shapely.geometry import Polygon
import pandas as pd

# Import imageloader
import rasterio
import cv2

import matplotlib.pyplot as plt
import numpy as np

from ipywidgets import interact, interactive, fixed, interact_manual

import os

%matplotlib inline
%reload_ext autoreload

## Read the dataset

In [49]:
# labels: germany_dataset/labels
# images: germany_dataset/images

# For every image, there is a corresponding label file with the same name

# The labels file is a space separated file with the following columns:
# 1. category
# 2. x-center
# 3. y-center
# 4. x-width
# 5. y-width
def load_image_and_labels(file):
    labels = pd.read_csv('germany_dataset/labels/' + file, sep=' ', header=None)
    labels.columns = ['category', 'x_center', 'y_center', 'x_width', 'y_width']
    
    # Load the image
    image_name = file.replace('txt', 'tif')
    crs = "EPSG:32633"
    
    with rasterio.open('germany_dataset/images/' + image_name, crs=crs) as src:
        image = src.read()
        image = np.moveaxis(image, 0, -1)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # image size = 832 x 832
    # The x-center and y-center are in the range [0, 1]

    # Create a list of polygons
    polygons = []
    for i in range(labels.shape[0]):
        x_center = labels.iloc[i, 1]
        y_center = labels.iloc[i, 2]
        x_width = labels.iloc[i, 3]
        y_width = labels.iloc[i, 4]

        x1 = (x_center - x_width / 2) * 832
        x2 = (x_center + x_width / 2) * 832
        y1 = (y_center - y_width / 2) * 832
        y2 = (y_center + y_width / 2) * 832

        polygons.append(Polygon([(x1, y1), (x2, y1), (x2, y2), (x1, y2)]))
        
    return image, polygons

In [50]:
# Plot the image with the polygons drawn in
file = os.listdir('germany_dataset/labels')[0]

def plot_image_and_labels(file):
    image, polygons = load_image_and_labels(file)

    # Convert the image to a numpy array
    image = np.array(image)

    for polygon in polygons:
        x, y = polygon.exterior.xy
        plt.plot(x, y, c='r')
        
    # Plot the image
    plt.imshow(image)
    plt.show()
    
interact(plot_image_and_labels, file=os.listdir('germany_dataset/labels'))

interactive(children=(Dropdown(description='file', options=('solarpanels_hd_1__x0_5629_y0_21002_dxdy_832.txt',…

<function __main__.plot_image_and_labels(file)>

In [51]:
# Convert the polygons to a mask to be used in the model
# The mask will be a 832 x 832 x 1 array
def polygons_to_mask(polygons):
    mask = np.zeros((832, 832, 1))
    for polygon in polygons:
        x, y = polygon.exterior.xy
        x = np.array(x, dtype=np.int32)
        y = np.array(y, dtype=np.int32)
        mask = cv2.fillPoly(mask, [np.column_stack((x, y))], (255))
    return mask

# Convert the mask to a polygon
def mask_to_polygons(mask):
    mask = mask.astype(np.uint8)
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    polygons = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        polygons.append(Polygon([(x, y), (x + w, y), (x + w, y + h), (x, y + h)]))
    return polygons

### Load the dataset

In [52]:
import torch
from torch.utils.data import Dataset, DataLoader

In [53]:
dataset = os.listdir('germany_dataset/labels')

class GermanyDataset(Dataset):
    def __init__(self, dataset):
        self.dataset = dataset
        
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        image, polygons = load_image_and_labels(self.dataset[idx])
        mask = polygons_to_mask(polygons)
        
        return torch.tensor(image, dtype=torch.float32), torch.tensor(mask, dtype=torch.float32)

In [54]:
# From the dataset, create a train and test set
from sklearn.model_selection import train_test_split

train, test = train_test_split(dataset, test_size=0.2)

train_dataset = GermanyDataset(train)
test_dataset = GermanyDataset(test)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

### Train the model

In [55]:
import pytorch_lightning as pl
from models.base import BaseModel
from models.architectures.deep_lab import DeepLabModel

In [56]:
model = DeepLabModel(input_size=(832, 832), num_classes=1)
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

base_model = BaseModel(model, loss_fn, optimizer)
trainer = pl.Trainer(max_epochs=10)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [57]:
trainer.fit(base_model, train_loader)

/home/manuel/TuE/InterdisciplinaryProject/.venv/lib/python3.11/site-packages/pytorch_lightning/trainer/configuration_validator.py:74: You defined a `validation_step` but have no `val_dataloader`. Skipping val loop.

  | Name    | Type             | Params
---------------------------------------------
0 | model   | DeepLabModel     | 63.6 M
1 | loss_fn | CrossEntropyLoss | 0     
---------------------------------------------
63.6 M    Trainable params
0         Non-trainable params
63.6 M    Total params
254.363   Total estimated model params size (MB)
/home/manuel/TuE/InterdisciplinaryProject/.venv/lib/python3.11/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=7` in the `DataLoader` to improve performance.


Training: |          | 0/? [00:00<?, ?it/s]

  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)


<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>


  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)


<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>


  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)


<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>


TypeError: cross_entropy_loss(): argument 'input' (position 1) must be Tensor, not collections.OrderedDict