In [1]:
#TODO
#1. Add the 64*64 outputs to one image.
#2. For every pixel, convert it into the most probable one.Maybe depend on neighbours??
# Could be multiple overlaps, find a way to handle that.
#3. Convert every pixel to correct polygon json format

In [1]:
import numpy as np
import cv2
import json
#import satlaspretrain_models
import torch
from scipy.signal import convolve2d
from scipy import ndimage

In [2]:
def post_process_optimized(raw_output, patch_size):
    """
    Post-processes the raw output from the model using convolution for faster accumulation,
    and then determines class labels using argmax.
    """
    if not isinstance(raw_output, np.ndarray):
        raise TypeError("Input must be a NumPy array.")
    if raw_output.shape != (1024, 1024, 5):
        raise ValueError("Input must have shape (1024, 1024, 5).")

    kernel = np.ones((patch_size, patch_size), dtype=np.float32).reshape(patch_size, patch_size, 1)
    # Use ndimage.convolve once for all channels
    accumulated_probs = ndimage.convolve(raw_output, kernel, mode='constant', cval=0)

    class_labels = np.argmax(accumulated_probs, axis=-1)
    return class_labels

def test_post_process_optimized():
    positions = np.mgrid[0:1024:256, 0:1024:256].reshape(2, -1).T
    
    probs = np.random.dirichlet(np.ones(5)*2, size=positions.shape[0])
    probs[:,0] = np.clip(probs[:,0], 0.3, 1.0)
    probs = probs / probs.sum(axis=1, keepdims=True)
    
    outputs = [(x, y, p.tolist()) for (x, y), p in zip(positions, probs)]
    print((outputs[0]))
    return post_process_optimized(outputs)

In [3]:
import pickle

with open('reconstructed_image.pkl', 'rb') as f:
    predictions = pickle.load(f)

In [4]:
(predictions.shape)

(1024, 1024, 5)

In [5]:
raw_classes = np.argmax(predictions, axis=-1)
raw_classes

array([[0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       ...,
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0]])

In [6]:
assigned_labels = post_process_optimized(predictions, 64)

KeyboardInterrupt: 

In [25]:
assigned_labels

array([[0, 0, 0, ..., 3, 3, 3],
       [0, 0, 0, ..., 3, 3, 3],
       [0, 0, 0, ..., 3, 3, 3],
       ...,
       [2, 2, 2, ..., 2, 2, 2],
       [2, 2, 2, ..., 2, 2, 2],
       [2, 2, 2, ..., 2, 2, 2]])

## torch and cuda version

In [8]:
import torch

def post_process_torch(outputs, gamma=0.5):
    device = torch.device("cpu")  # Assuming model outputs are on CUDA
    empty_img = torch.zeros((1024, 1024, 5), device=device)
    
    # Convert outputs to tensor once
    coords = torch.tensor([[x,y] for x,y,_ in outputs], device=device)
    probs = torch.tensor([p for _,_,p in outputs], device=device)
    
    # Batch assignment
    for coord, prob in zip(coords, probs):
        empty_img[coord[0]:coord[0]+64, coord[1]:coord[1]+64] += prob
    
    padded = torch.nn.functional.pad(empty_img, (0,0,1,1,1,1))
    neighbors = gamma * (padded[:-2, 1:-1] + padded[2:, 1:-1] + 
                        padded[1:-1, :-2] + padded[1:-1, 2:])
    
    return torch.argmax(empty_img + neighbors, dim=2)  # Keep on GPU if needed

## From pixel img to polygon

In [61]:

def converter(tensor, size_limit=1000):
    # Dictionary to store polygons for each unique value
    polygons = {}
    # Ensure tensor is numpy and squeeze to remove batch dimension
    if isinstance(tensor, torch.Tensor):
        tensor = tensor.squeeze().numpy().astype(np.uint8)
    
    for val in range(5):  # Iterate over unique values (0-4)
        mask = np.uint8((tensor == val) * 255)  # Create binary mask
        
        # Find contours (polygons) of connected components
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        # Filter out small connected components
        large_contours = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if area >= size_limit:  # Keep only contours with area >= size_limit
                large_contours.append(contour)
        
        # Create a new mask and draw the large contours
        cleaned_mask = np.zeros_like(mask)
        cv2.drawContours(cleaned_mask, large_contours, -1, 255, thickness=cv2.FILLED)
        
        # Update the original mask
        tensor[tensor == val] = 0  # Clear current value in the original mask
        tensor[cleaned_mask == 255] = val  # Add cleaned mask
        
        # Convert contours to a list of polygons
        polygons[val] = [c.reshape(-1, 2).tolist() for c in large_contours]  # Use filtered contours
    
    return polygons

# Test tensor with one square of 1s in top left corner of size 50x50
# Rest of the tensor is zero
test_tensor = torch.zeros((1, 1024, 1024))
# Create a square of 1s
test_tensor[0, 0:120, 0:120] = 1

# Apply the converter with a large size_limit
#final_prediction = converter(tensor=test_tensor, size_limit=10000)
final_prediction = converter(assigned_labels)


def write_json(polygons, filename:str):
    #Function that takes list of polygons and writes them to a json file.
    #The list of polygons is a list of lists of polygons, where each polygon is a list of points.
    # Dictionary to map unique values to class names
    numper_2_class={1:"plantation",2:"grassland_shrubland",3:"mining",4:"logging"}
    with open(f"{filename}.json", "w") as file:
        images_overview={"images":[]}
        # Iterate over images in polygons
        for i in range(len(polygons)):

            curr={"file_name":f"evaluation_{i}.tif","annotations":[]}
            #Iterate over types of polygons in image
            for j in range(1,len(polygons[i])):
                # Convert polygons to list of coordinates

                for k in range(len(polygons[i][j])):
                    listed_polygons=[]

                    for p in range(len(polygons[i][j][k])):
                        listed_polygons.append(polygons[i][j][k][p][0])
                        listed_polygons.append(polygons[i][j][k][p][1])
                    curr["annotations"].append({"class":numper_2_class[j],"segmentation":listed_polygons})
            images_overview["images"].append(curr)
        file.write(json.dumps(images_overview, ensure_ascii=False, indent=4))
write_json([final_prediction],"test")
# Now, polygons[val] contains a list of polygons for each unique value.


In [26]:
from PIL import Image



# Assuming y_test is a numpy array or torch tensor containing the
# class labels for the entire 1024x1024 image.
# Example:
# y_test = ... # Your test labels (numpy array or torch tensor)

# If y_test is a torch tensor, convert it to a numpy array
if isinstance(assigned_labels, torch.Tensor):
    assigned_labels = assigned_labels.cpu().numpy()  # Move to CPU if it's on GPU

# Ensure y_test is of integer type
assigned_labels = assigned_labels.astype(np.uint8)

# Create a color mapping for the classes
color_map = {
    0: [0, 0, 0],      # background: black
    1: [255, 0, 0],    # plantation: red
    2: [0, 255, 0],    # grassland_shrubland: green
    3: [0, 0, 255],    # mining: blue
    4: [255, 255, 0]     # logging: yellow
}

# Create an RGB image where each class is represented by its color
height, width = assigned_labels.shape
rgb_image = np.zeros((height, width, 3), dtype=np.uint8)
for i in range(height):
    for j in range(width):
        class_id = assigned_labels[i, j]
        rgb_image[i, j] = color_map[class_id]

# Create a PIL image from the numpy array
img = Image.fromarray(rgb_image)

# Save the image to a file
img.save("rasterized_prediction.png")

print("Rasterized image saved to rasterized_prediction.png")

Rasterized image saved to rasterized_prediction.png


In [8]:
from PIL import Image



# Assuming y_test is a numpy array or torch tensor containing the
# class labels for the entire 1024x1024 image.
# Example:
# y_test = ... # Your test labels (numpy array or torch tensor)

# If y_test is a torch tensor, convert it to a numpy array
if isinstance(raw_classes, torch.Tensor):
    raw_classes = raw_classes.cpu().numpy()  # Move to CPU if it's on GPU

# Ensure y_test is of integer type
raw_classes = raw_classes.astype(np.uint8)

# Create a color mapping for the classes
color_map = {
    0: [0, 0, 0],      # background: black
    1: [255, 0, 0],    # plantation: red
    2: [0, 255, 0],    # grassland_shrubland: green
    3: [0, 0, 255],    # mining: blue
    4: [255, 255, 0]     # logging: yellow
}

# Create an RGB image where each class is represented by its color
height, width = raw_classes.shape
rgb_image = np.zeros((height, width, 3), dtype=np.uint8)
for i in range(height):
    for j in range(width):
        class_id = raw_classes[i, j]
        rgb_image[i, j] = color_map[class_id]

# Create a PIL image from the numpy array
img = Image.fromarray(rgb_image)

# Save the image to a file
img.save("rasterized_prediction.png")

print("Rasterized image saved to rasterized_prediction.png")

Rasterized image saved to rasterized_prediction.png
