In [4]:
#basics
import os
import time
import json
import pickle
import openeo
import numpy as np

# geography
import geopandas as gpd
import rasterio
from rasterio.features import geometry_mask


#download
import pyrosm as pyr
from openeo.rest import OpenEoApiError
from openeo.processes import ProcessBuilder, if_, is_nan



# plotting 
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go

import logging
from pyrosm.data import sources
import numpy as np

def setup_logger(level: int = logging.INFO):
    """
    Set up a logger for the pipeline. 
    """
    logger = logging.getLogger()
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s"
    )
    
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    
    file_handler = logging.FileHandler("main.log")
    file_handler.setFormatter(formatter)

    logger.setLevel(level)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)

    return logger


def get_available_cities():
    """
    Return all available cities from pyrosm 
    """
    return sources.cities.available


def stretch_hist(band):
    """
    Apply histogram stretching"""
    p2, p98 = np.percentile(band, (0.5, 99.5))
    return np.clip((band - p2) * 255.0 / (p98 - p2), 0, 255).astype(np.uint8)


class DataHandler: 
    def __init__(self, logger):
        """
        Initialize the DataHandler class and define openeo params.
        """
        self.logger = logger
        self.openeo_temporal_extent = ["2023-05-01", "2023-09-30"]
        self.openeo_bands = ["B04", "B03", "B02", "B08", "B12", "B11", "SCL"]
        self.openeo_max_cloud_cover = 30
        self.openeo_spatial_resolution = 10
        self.openeo_connection = None
        self.openeo_collections = None
        self.openeo_jobs = None
        
        if not os.path.exists("/kaggle/input/building-prediction/"):
            os.makedirs("/kaggle/input/building-prediction/")
            logger.info("Created data directory")
        else:
            logger.info("Data directory already exists")


    def create_directory(self, city: str):
        """
        Create a directory for each city.
        """
        os.makedirs(f"/kaggle/input/building-prediction//{city}", exist_ok=True)
        self.logger.info(f"{city}: Directory available")


    def get_buildings(self, city: str):
        """
        Return buildings for a given city
        """
        self.create_directory(city)
        
        # Check if local data for city is available
        if "buildings.geojson" in os.listdir(f"/kaggle/input/building-prediction/{city}"):
            self.logger.info(f"{city}: Using local building data")
            return gpd.read_file(f"/kaggle/input/building-prediction/{city}/buildings.geojson")

        # Download data for city
        fp = pyr.get_data(city, directory=os.path.join("data", city))
        osm = pyr.OSM(fp)
        self.logger.info(f"{city}: Downloaded data to data/{city}")

        # Get bounding box for city
        boundingbox = self.get_boundingbox(city, osm)

        # Get the buildings of the city
        buildings_geodf = osm.get_buildings()

        # Remove buildings outside of the bounding box of the city
        buildings_geodf = buildings_geodf.cx[boundingbox[0] : boundingbox[2], boundingbox[1] : boundingbox[3]]

        # Save the data of the city
        buildings_path = f"/kaggle/input/building-prediction/{city}/buildings.geojson"
        buildings_geodf.to_file(buildings_path, driver="GeoJSON")
        self.logger.info(f"{city}: Stored data to data/{city}/buildings.geojson")

        return buildings_geodf


    def get_boundingbox(self, city: str, osm = None):
        """
        Get the bounding box for a city.
        """

        # Return bounding box for Berlin as specified in exercise sheet to ensure correct testing results
        if city == "Berlin":
            return [13.294333, 52.454927, 13.500205, 52.574409]

        # Check if local bounds are available
        bounds_path = f"/kaggle/input/building-prediction/{city}/bounds.pkl"
        if os.path.exists(bounds_path):
            with open(bounds_path, "rb") as f:
                boundingbox = pickle.load(f)
            return boundingbox
        
        # Ensure OSM data is available 
        if osm is None:
            self.get_buildings(city=city)

        # Get the boundaries
        geoframe_bounds = osm.get_boundaries()
        boundingbox = geoframe_bounds[geoframe_bounds["name"] == city].total_bounds

        # Check if bounding box is None
        if np.isnan(boundingbox[0]) or np.isnan(boundingbox[1]) or np.isnan(boundingbox[2]) or np.isnan(boundingbox[3]):
            self.logger.info(f"{city}: Bounding box is None. Using total bounds instead")
            boundingbox = geoframe_bounds.total_bounds
        self.logger.info(f"{city}: Bounding box is {boundingbox}")     

        # Save total bounds to pickle file
        with open(bounds_path, "wb") as f:
            pickle.dump(boundingbox, f)
        self.logger.info(f"{city}: Saved bounds to data/{city}/bounds.pkl")

        return boundingbox
    

    def get_satellite_image(self, city: str, return_rasterio_dataset = False): 
        """
        Get satellite images for a city. Use local data if available. Returns an Array with (H, W, C) shape
        """
        if os.path.exists(f"/kaggle/input/building-prediction/{city}/openEO.tif"):
            self.logger.info(f"{city}: Using local satellite image")
            ds = rasterio.open(f"/kaggle/input/building-prediction/{city}/openEO.tif")
            if return_rasterio_dataset:
                return ds
            
            # Read all channels
            sat_data = ds.read()

            # Transpose to (H, W, C)
            sat_data = np.transpose(sat_data, (1, 2, 0))
            return sat_data
        else:
            self.download_satellite_image(city)
            return self.get_satellite_image(city)
    

    def connect_to_openeo(self):
        """
        Connect to the openEO backend and 
        """
        if self.openeo_connection is None:
            connection = openeo.connect("openeo.dataspace.copernicus.eu")
            connection.authenticate_oidc()
            self.openeo_connection = connection

            self.logger.info("Connected to openEO")
        else:
            self.logger.info("Already connected to openEO")


    def download_satellite_image(self, city: str):
        """
        Download satellite images for a city. Retry for 3 times if the job fails or takes longer than 30 min per job.
        """
        self.connect_to_openeo()
        
        # Log the currently running jobs
        self.logger.info("Current jobs:")
        for idx, job in enumerate(self.openeo_connection.list_jobs()):
            self.logger.info(f"{idx} {job['id']} {job['status']}")

        # Retry job up to 3 times. Raise exception after 3 retries.
        job_finished = False
        job_number_of_retries = 0
        while not job_finished : 
            if job_number_of_retries > 3:
                self.logger.error(f"{city}: Job failed after 3 retries")
                raise Exception(f"{city}: Job failed after 3 retries")
            job = self.create_and_start_openeo_job(city)    
            job_finished = self.await_job(city, job)
            job_number_of_retries += 1

        # Get job results and store in data/city
        job_results = self.openeo_connection.job(job.job_id).get_results()
        job_results.download_files(f"/kaggle/input/building-prediction/{city}")
        self.logger.info(f"{city}: Downloaded job results to data/{city}")


    def delete_jobs(self):
        """
        Delete all jobs on the openEO backend. Use only for debugging. 
        """
        self.connect_to_openeo()

        for idx, job in enumerate(self.openeo_connection.list_jobs()):
            self.logger.info(f"Deleting job {idx}, {job['id']}, {job['status']}")
            self.openeo_connection.job(job["id"]).delete_job()


    def create_and_start_openeo_job(self, city: str, collection_id: str = "SENTINEL2_L2A"):
        """
        Creates an openeo processing job for a city and starts it.
        """
        # Transform order in boundingbox to dict
        boundingbox = self.get_boundingbox(city)
        boundingbox = {"west": boundingbox[0], "south": boundingbox[1], "east": boundingbox[2], "north": boundingbox[3]}
        
        # Create datacube
        datacube = self.openeo_connection.load_collection(
            collection_id=collection_id,
            spatial_extent=boundingbox,
            temporal_extent=self.openeo_temporal_extent,
            bands=self.openeo_bands,
            max_cloud_cover=self.openeo_max_cloud_cover,
        ).resample_spatial(self.openeo_spatial_resolution)

        # Create cloud mask
        scl = datacube.band("SCL")

        # Filter out cloud median probability, cloud high probability, and snow/ice
        mask = (scl == 8) | (scl == 9) | (scl == 11)

        # Resample mask to the spatial resolution of the datacube
        mask = mask.resample_cube_spatial(datacube.band("B04"))
        
        # Create the RGB image
        datacube_rgbFU = datacube.filter_bands(self.openeo_bands[:-1])
        
        # Apply cloud mask
        datacube_rgb_masked = datacube_rgbFU.mask(mask)
        
        # Reduce temporal to median 
        datacube_rgb_masked_reduced_t = datacube_rgb_masked.reduce_temporal("median")

        # Define image format 
        datacube_for_submission = datacube_rgb_masked_reduced_t.save_result(format="GTiff")
        
        # Create openEO job with datacube
        job = datacube_for_submission.create_job(title=f"{city}__pic")
        self.logger.info(f"{city}: Created openEO job")

        # Start openEO job
        job.start_job()
        self.logger.info(f"{city}: Started openEO job with ID: {job.job_id}")        

        return job


    def await_job(self, city, job):
        """
        Awaits the processing of a openeo job. 
        Returns when the job is finished or raises an exception if the job failed.
        """

        for i in range(30):
            status = self.openeo_connection.job(job.job_id).status()
            self.logger.debug(f"{city}: Job {job.job_id} status: {status}")
          
            if status == "finished":
                self.logger.info(f"{city}: Job {job.job_id} finished")
                return True
            
            elif status == "error":
                self.logger.warning(f"{city}: Job {job.job_id} failed. Trying again.")
                return False            
            
            time.sleep(60)
        self.logger.error(f"{city}: Job {job.job_id} did not finish in time")
        return False

    def get_building_mask(self, city: str, loaded_buildings = None, all_touched: bool = False):  
        """
        Get the local building mask for buildings in a city.
        """
        if all_touched:
            filename = "building_mask_dense"
        else:
            filename = "building_mask_sparse"
        # Check if the building mask is already available
        if os.path.exists(f"/kaggle/input/building-prediction/{city}/{filename}.tif"):
            self.logger.info(f"{city}: Using local building mask")
            return rasterio.open(f"/kaggle/input/building-prediction/{city}/{filename}.tif").read(1)

        # Create new building mask 
        satellite_image = self.get_satellite_image(city, return_rasterio_dataset=True)

        # Get satellite image metadata
        transform = satellite_image.transform
        out_shape = (satellite_image.height, satellite_image.width)
        crs = satellite_image.crs

        # Read the GeoJSON file with building polygons
        if loaded_buildings is not None:
            buildings = loaded_buildings
        else:
            buildings = self.get_buildings(city)
            buildings = buildings.to_crs(crs)  # Ensure the CRS matches the GeoTIFF

        # Create a mask where pixels inside buildings are True, others are False
        # TODO all_touched paramer nutzen für zweite Maske
        mask = geometry_mask(
            buildings.geometry, transform=transform, invert=True, out_shape=out_shape, all_touched=all_touched,
        )
        
        # Store the mask as a GeoTIFF file
        
        out_meta = satellite_image.meta
        out_meta.update(
            {
                "driver": "GTiff",
                "height": mask.shape[0],
                "width": mask.shape[1],
                # "transform": transform,
                "count": 1,
            }
        )

        # boolmask is automatically being saved as int16 [0,1]
  
        with rasterio.open(f"/kaggle/input/building-prediction/{city}/{filename}.tif", "w", **out_meta) as dest:
            dest.write(mask, indexes=1)

        return mask

import rasterio
import numpy as np
import torch
from torch.utils.data import random_split, DataLoader, Dataset
import matplotlib.pyplot as plt



def create_tensor_of_windows(image, mask, patch_size=128):
    """
    Create tensor with dimensions [N, H, W, C+1] from the satellite image of the city.
    image should be of shape (H, W, C)
    mask should be of shape (H, W, 1)
    """
    # Merge Mask onto Image
    image_with_mask = np.dstack((image, mask))

    # cut of edges so image shape is divisible by patch size
    reduced_image = image_with_mask[:-(image_with_mask.shape[0]%patch_size), :-(image_with_mask.shape[1]%patch_size)]


    # calculate number of patches
    N = reduced_image.shape[0]//patch_size*reduced_image.shape[1]//patch_size

    # initialize target array
    target_array = np.zeros((N, patch_size, patch_size, reduced_image.shape[-1]), dtype=np.uint16)

    # fill target array
    for row in range(patch_size):
        for col in range(patch_size):
            # calculate row and column indices
            row_filter = range(row,reduced_image.shape[0]+row,patch_size)
            col_filter = range(col,reduced_image.shape[1]+col,patch_size)

            # write values into target array
            target_array[:, row, col, :] = reduced_image[row_filter][:,col_filter,:].reshape(-1, reduced_image.shape[-1])

    return target_array


   
def divide_into_test_training(data, train_ratio=0.8):
    """
    Divide the data into test and training split.
    """
    
    # Define the split ratio
    test_ratio = 1 - train_ratio

    # Calculate the sizes for training and test sets
    train_size = int(train_ratio * len(data))
    test_size = len(data) - train_size

    # Split the dataset
    train_dataset, test_dataset = random_split(data, [train_size, test_size])

    return train_dataset, test_dataset


def create_data_loaders(train_dataset, test_dataset, batch_size = 64):
    """
    Create DataLoaders.
    """
    # Create DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    return train_loader, test_loader


def apply_preprocessing_pipeline(images, masks, patch_size = 128, train_ratio = 0.8, batch_size = 64):
    """
    applies windowing, deviding into train and test and creating data loaders.
    """

    # for each city create patched images
    patched_images = []
    for image, mask in zip(images, masks):
        patched_images.append(create_tensor_of_windows(image, mask, patch_size=patch_size))

    # concatenate all patched images
    patched_images_merged = np.concatenate(patched_images, axis=0)

    # reorder axis to [N, C, H, W] for torch
    patched_images_merged = np.transpose(patched_images_merged, (0,3,1,2))

    # devide into train and test
    train_ds, test_ds = divide_into_test_training(patched_images_merged,train_ratio=train_ratio)

    # create data loaders
    train_loader, test_loader = create_data_loaders(train_ds, test_ds, batch_size=batch_size)

    return train_loader, test_loader

def plot_sub_image( image_data):
    """
    Plot sub image.
    """
    fig, ax = plt.subplots(1, 2, figsize=(10, 5))

    ax[0].imshow(stretch_hist(image_data[:,:,:3]))
    ax[1].imshow(stretch_hist(image_data[:,:,-1]))
    return fig





In [3]:
!pip install openeo pyrosm

Collecting openeo
  Downloading openeo-0.30.0-py3-none-any.whl.metadata (7.3 kB)
Collecting pyrosm
  Downloading pyrosm-0.6.2.tar.gz (2.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m:00:01[0m0:01[0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
Collecting pystac>=1.5.0 (from openeo)
  Downloading pystac-1.10.1-py3-none-any.whl.metadata (6.4 kB)
Collecting python-rapidjson (from pyrosm)
  Downloading python_rapidjson-1.17-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (21 kB)
Collecting shapely>=1.6.4 (from openeo)
  Downloading shapely-2.0.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.0 kB)
Collecting cykhash (from pyrosm)
  Downloading cykhash-2.0.1.tar.gz (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.9/44.9 

In [6]:
# basics
import numpy as np
from tqdm.notebook import tqdm 


# torch
import torch
from torch.utils.data import Dataset
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter




# Configure logging for the pipeline
logger = setup_logger(level='INFO')

2024-06-28 12:04:04.283577: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-06-28 12:04:04.283699: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-06-28 12:04:04.472173: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [8]:
cities = ['Aachen', 'CapeTown', 'Hamburg', 'Johannesburg', 'London', 'Montreal', 'Paris', 'Seoul', 'Singapore', 'Sydney']

datahandler = DataHandler(logger)


2024-06-28 12:04:15,909 - root - INFO - __init__ - Data directory already exists


In [10]:
# load images and mask for all specified cites

import os
images = []
sparse_masks=[]
dense_masks=[]

for city in tqdm(cities):
    buildings = None
    images.append(datahandler.get_satellite_image(city))
    sparse_masks.append(datahandler.get_building_mask(city, all_touched=False, loaded_buildings=buildings))
    dense_masks.append(datahandler.get_building_mask(city, all_touched=True, loaded_buildings=buildings))

  0%|          | 0/10 [00:00<?, ?it/s]

2024-06-28 12:04:36,008 - root - INFO - get_satellite_image - Aachen: Using local satellite image
2024-06-28 12:04:36,919 - root - INFO - get_building_mask - Aachen: Using local building mask
2024-06-28 12:04:37,001 - root - INFO - get_building_mask - Aachen: Using local building mask
2024-06-28 12:04:37,092 - root - INFO - get_satellite_image - CapeTown: Using local satellite image
2024-06-28 12:04:39,440 - root - INFO - get_building_mask - CapeTown: Using local building mask
2024-06-28 12:04:39,704 - root - INFO - get_building_mask - CapeTown: Using local building mask
2024-06-28 12:04:40,020 - root - INFO - get_satellite_image - Hamburg: Using local satellite image
2024-06-28 12:04:42,234 - root - INFO - get_building_mask - Hamburg: Using local building mask
2024-06-28 12:04:42,487 - root - INFO - get_building_mask - Hamburg: Using local building mask
2024-06-28 12:04:42,754 - root - INFO - get_satellite_image - Johannesburg: Using local satellite image
2024-06-28 12:04:48,917 - roo

In [12]:
# apply training pipeline
# TODO make train test split consistent so we can train with multiple sizes, dont know if there is an advantage though
train_loader, test_loader = apply_preprocessing_pipeline(images, sparse_masks, patch_size = 128, train_ratio = 0.8, batch_size = 64)

In [13]:
patch_size = 128
train_ratio = 0.8
batch_size = 64

In [15]:
patched_images = []
masks = sparse_masks
for image, mask in tqdm(zip(images, masks)):
    patched_images.append(create_tensor_of_windows(image, mask, patch_size=patch_size))

# concatenate all patched images
patched_images_merged = np.concatenate(patched_images, axis=0)

0it [00:00, ?it/s]

In [29]:
 patched_images_merged_reduced = patched_images_merged[~(patched_images_merged[...,-1].sum(axis=1).sum(axis=1)<1)]

In [30]:
# reorder axis to [N, C, H, W] for torch
patched_images_merged_reduced = np.transpose(patched_images_merged_reduced, (0,3,1,2))

# devide into train and test
train_ds, test_ds = divide_into_test_training(patched_images_merged_reduced,train_ratio=train_ratio)

# create data loaders
train_loader, test_loader = create_data_loaders(train_ds, test_ds, batch_size=batch_size)

In [31]:
# initialize model, taken from exercise pdf
model = nn.Sequential(
    nn.Conv2d(6, 32, kernel_size=3, padding=1), nn.ReLU(),
    nn.Conv2d(32, 64, kernel_size=3, padding=1), nn.ReLU(),
    nn.Conv2d(64, 128, kernel_size=3, padding=1), nn.ReLU(),
    nn.Conv2d(128, 1, kernel_size=1, padding=0),
    nn.Sigmoid())

# initialize tensorboard writer
writer = SummaryWriter()

AttributeError: 'DataLoader' object has no attribute 'to'

In [32]:
# Instantiate the model, loss function, and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Training loop
num_epochs = 50

model.train()
for epoch in tqdm(range(num_epochs)):
    for batch in train_loader:
        # splid in inputs and labels
        inputs = batch[:,:-1].to(torch.float32)
        labels = batch[:,-1, np.newaxis].to(torch.float32)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward pass
        outputs = model(inputs)

        # calculate loss
        loss = criterion(outputs, labels)

        # write to tensorboard
        writer.add_scalar("Loss/train", loss, epoch)

        # backward pass
        loss.backward()

        # optimizer step
        optimizer.step()
    


  0%|          | 0/50 [00:00<?, ?it/s]

TypeError: can't convert np.ndarray of type numpy.uint16. The only supported types are: float64, float32, float16, complex64, complex128, int64, int32, int16, int8, uint8, and bool.

## Save Model

In [9]:
import os

os.makedirs("saved_models", exist_ok=True)
torch.save(model.state_dict(), "saved_models/model1")

# Evaluation

In [None]:
t  = torch.Tensor(test_loader.dataset)

# splid in inputs and labels
test_inputs = t[:,:-1]#.to(torch.float32)
test_labels = t[:,-1, np.newaxis]#.to(torch.float32)

test_results = model(test_inputs).detach()

# see how many percnet where predicted right
threshold = 0.5
((test_results>threshold)==test_labels).sum()/np.prod(test_labels.shape)



In [None]:
from sklearn.metrics import RocCurveDisplay

RocCurveDisplay.from_predictions(
   test_labels.flatten(), test_results.flatten())

In [None]:
writer.flush()

# Download

In [None]:


buildings = []
sat_images = []
building_masks = []

for city in cities: 
    buildings.append(datahandler.get_buildings(city))
    sat_images.append(datahandler.get_satellite_image(city))
    building_masks.append(datahandler.get_building_mask(city))

# Plot the expected results for the first city 
datahandler.plot(city[0])

In [None]:
import data_preparation

for city in cities:
    data_preparation.create_tensor(city)

# Download

In [None]:
# Download 

for city in cities: 
    sat_image = datahandler.get_satellite_image(city)
    mask = datahandler.get_building_mask(city)

# Plot the expected results for the first city 
datahandler.plot(city[0])