In [None]:
import glob
import numpy as np
import os
import cv2
from torch.utils.data import Dataset, DataLoader
from torch import nn
from torchvision.models.segmentation.deeplabv3 import DeepLabV3_ResNet50_Weights
import torchvision.models as models
from pathlib import Path
import torch
import gc
from matplotlib import pyplot as plt
from pathlib import Path
import os
import geopandas as gpd
import pandas as pd
from skimage import measure
from shapely.geometry import Polygon, mapping
from osgeo import gdal, gdalconst
import glob


In [None]:
!pip install rasterio geojson
import geojson
import rasterio
from rasterio.transform import Affine

If using google colab:

In [None]:
from google.colab import drive
drive.mount('/content/drive')

### Model (DeeplabV3)

In [None]:
class Deeplabv3(nn.Module):
    def __init__(self, num_classes=1):
        super().__init__()


        self.deeplab = models.segmentation.deeplabv3_resnet50(weights=DeepLabV3_ResNet50_Weights.DEFAULT)
        # One class(forest/non-forest)
        self.deeplab.classifier[4] = nn.Conv2d(256, 1, kernel_size=(1, 1), stride=(1, 1))

        self.out = nn.Sigmoid()

    def forward(self, x):
        x1 = self.deeplab(x)['out']
        out = self.out(x1)
        return out

### Inference
- Our data: 70 tiles from Cantabria, 15000x15000pixels each
- We are going to use the model as a moving camera that slides over the large tile.

In [None]:
def predict_export(model = Deeplabv3(), model_path=None, input_img_path=None,output_mask_dir=None, final_mask_name = None, simple_inference_mode = True):
  """
  - Perform inference on images, and export the prediction masks.
  Args:
      model (Deeplabv3 object)
      model_path (str): path to Deeplabv3 model weights
      input_img_path (str): path to input images
      output_mask_dir (str): dir to save the prediction mask
      final_mask_name (str): name for the prediction mask, remember to specify the format (.png, .jpg, etc.)
      simple_inference_mode (bool): True by default. Do not set it to false unless you are familiar with our project.
  Returns:
      None (This function will export the prediction masks to the specified directory)
  """

  # Initialize the model
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  # Our models are stored in a .pt file as a dictionary, the weights can be found under the "model_state_dict" key. Change this section if using a different format.
  model.load_state_dict(torch.load(model_path, map_location=device)["model_state_dict"])
  model.to(device).eval()


  if simple_inference_mode:
    # Input image
    input_img = cv2.imread(input_img_path)/255.0
    original_size = input_img.shape[:2]
    input_img = cv2.resize(input_img,(512,512))
    input_tensor = torch.tensor(np.transpose(input_img, (2,0,1)),dtype=torch.float32).unsqueeze(axis=0).to(device)

    # Inference
    output = model(input_tensor)
    output = np.transpose(output.cpu().detach().numpy()[-1], (1, 2, 0))
    # Final mask
    final_output = np.expand_dims(cv2.resize(output, original_size, interpolation=cv2.INTER_LINEAR), axis=-1)
    final_output = (np.where(final_output>0.5,1,0)*255).astype(np.uint8)
  else:
    input_img = cv2.imread(input_img_path)
    # Calculate the x and y difference for padding
    window_size = 1000 # Size of the sliding window
    dif_y = (np.round(input_img.shape[0]/window_size)*window_size-input_img.shape[0]).astype(int)
    dif_x = (np.round(input_img.shape[1]/window_size)*window_size-input_img.shape[1]).astype(int)

    # Make a grid
    x_list = np.array([i for i in range(np.round(input_img.shape[0]/window_size).astype(int))])
    y_list = np.array([i for i in range(np.round(input_img.shape[1]/window_size).astype(int))])
    X, Y = np.meshgrid(x_list,y_list)
    XY = np.stack((X, Y), axis=-1).flatten().reshape(-1, 2)

    # Create a bigger matrix to place the input image in the center
    padding = 256
    matrix_ones = np.ones((input_img.shape[0]+padding*2+dif_y, input_img.shape[1]+padding*2+dif_x, 3))*255.0  # Create a "ones" matrix with a bigger size than the input image (-256, 256)
    matrix_ones[padding:padding+input_img.shape[0], padding:padding+input_img.shape[1], :] = input_img  # Place the input image in the center of the matrix
    matrix_zeros_output_list = []

    # Inference setup: For each tile, perform 5 inferences, 4 at the corners, 1 at the center
    q_list = [[1,1,1,1],[0,0,0,0],[2,2,0,0],[0,0,2,2],[2,2,2,2]]

    for q_idx, q in enumerate(q_list):
      matrix_zeros_output = np.zeros((input_img.shape[0]+padding*2+dif_y, input_img.shape[1]+padding*2+dif_x, 1)) # Create a zeros matrix with a bigger size than the input image (-256, 256)
      for idx, (x0, y0) in enumerate(XY):
        img_pos = np.array([x0, y0])

        x_start = img_pos[0]*window_size + q_list[q_idx][0]*padding
        x_end = (img_pos[0]+1)*window_size + q_list[q_idx][1]*padding
        y_start = img_pos[1]*window_size + q_list[q_idx][2]*padding
        y_end = (img_pos[1]+1)*window_size + q_list[q_idx][3]*padding

        # Print progress
        if idx%50==0:
          print(f"Processing image position: {idx+1}/{XY.shape[0]} - ({x0}, {y0})")
        torch.cuda.empty_cache()
        gc.collect()

        input_img_slice = matrix_ones[x_start:x_end, y_start:y_end,:]
        torch.cuda.empty_cache()
        gc.collect()
        input_img_slice = cv2.resize(input_img_slice, (512, 512))/255.0
        # Transpose and convert to tensor
        input_img_slice = torch.tensor(np.transpose(input_img_slice, (2, 0, 1)), dtype=torch.float32)
        # Add batch dimension and move to device
        input_img_slice = input_img_slice.unsqueeze(0).to(device)
        # Prediction
        output = model(input_img_slice)
        output = np.transpose(output.cpu().detach().numpy()[-1], (1, 2, 0))
        output = np.expand_dims(cv2.resize(output, (window_size, window_size), interpolation=cv2.INTER_LINEAR), axis=-1)
        matrix_zeros_output[x_start:x_end, y_start:y_end,:] += output
      matrix_zeros_output_list.append(matrix_zeros_output)
      torch.cuda.empty_cache()
      gc.collect()

    # normalization
    avg_output = np.sum(np.array(matrix_zeros_output_list),axis=0)
    avg_output[padding:(padding+input_img.shape[0]+dif_x),padding:(padding+input_img.shape[1]+dif_y),:] /= 2
    avg_output[:,padding*2:(input_img.shape[1]+dif_y),:] *=(2/3)
    avg_output[padding*2:(input_img.shape[0]+dif_x),:,:] *=(2/3)
    avg_output[padding*2:(input_img.shape[0]+dif_x),padding*2:(input_img.shape[1]+dif_y),:] *=(9/10)

    # Final mask
    final_output = avg_output[padding:(padding+input_img.shape[0]), padding:(padding+input_img.shape[1]), :]
    max = final_output.max()
    final_output = (final_output/max*255).astype(np.uint8)
    torch.cuda.empty_cache()
    gc.collect()


  # Export
  cv2.imwrite(Path(output_mask_dir) / final_mask_name,final_output)
  torch.cuda.empty_cache()
  gc.collect()


In [8]:
# setup
input_img_dir = "/content/drive/MyDrive/ML/Cantabria/Cantabria_3000x3000"
model_path_list = glob.glob("/content/drive/MyDrive/ML/Weights/Loveda_Jordan_Models/*")
output_mask_dir = "/content/drive/MyDrive/ML/Masks/Loveda_multi_mask/13"

In [None]:
epoch_list = [13,18,32]
for model_path in model_path_list:
  epoch = model_path.split("_")[-1].split(".")[0]
  output_mask_path = output_mask_dir + "/" + epoch
  for root,_,filenames in os.walk(input_img_dir):
      for filename in filenames:
        if filename.endswith(".png"):
          print(f"image: {filename}")
          predict_export(model = Deeplabv3(),
              model_path = model_path,
              input_img_path=os.path.join(input_img_dir,filename),
              output_mask_dir=output_mask_dir,
              final_mask_name = filename.split(".")[0] + ".tif",
              simple_inference_mode = False)

### Georeferencing

In [None]:
mask_path_list = glob.glob("/content/drive/MyDrive/ML/Masks/Loveda_multi_mask/final/*.tif")
pgw_path_list = glob.glob("/content/drive/MyDrive/ML/Cantabria/Cantabria_3000x3000/*.pgw")
def get_transform_from_pgw(pgw_path):
    with open(pgw_path, 'r') as f:
        lines = f.readlines()
        pixel_size_x = float(lines[0].strip())
        rotation_x = float(lines[1].strip())
        rotation_y = float(lines[2].strip())
        pixel_size_y = float(lines[3].strip())
        top_left_x = float(lines[4].strip())
        top_left_y = float(lines[5].strip())
    return (top_left_x, pixel_size_x, rotation_x, top_left_y, rotation_y, pixel_size_y)

for idx, pgw_path in enumerate(pgw_path_list):
  gt = get_transform_from_pgw(pgw_path)
  # Set georeferenced info
  ds2 = gdal.Open(mask_path_list[idx])
  ds2.SetGeoTransform(gt)
  ds2.FlushCache()
  ds2 = None  # Close the dataset to save changes
  torch.cuda.empty_cache()

gc.collect()

### Conversion to vector

In [None]:
import rasterio
import numpy as np
from shapely.geometry import Polygon, mapping, LinearRing
import geojson
from rasterio.transform import Affine
from pathlib import Path
from shapely.ops import unary_union
from shapely.prepared import prep
os.environ["OPENCV_IO_MAX_IMAGE_PIXELS"] = pow(2,40).__str__()
import cv2 # import after setting OPENCV_IO_MAX_IMAGE_PIXELS

def find_contours(img_tif_path,output_path):
  idx = img_tif_path.split("_")[-1].split(".")[0]
  epoch = img_tif_path.split("_")[-3]
  # Load tiff
  with rasterio.open(img_tif_path) as src:
    mask = src.read(1)  # read first band
    transform = src.transform  # Affine transform: maps pixel -> world coords

  # Check if mask is binary (0 or 1)
  binary_mask = (mask > 0).astype(np.uint8) * 255  # values 0 or 255

  # Find contours
  contours, hierarchy = cv2.findContours(binary_mask, cv2.RETR_CCOMP , cv2.CHAIN_APPROX_SIMPLE)

  polygons = []
  used_as_hole = set()
  for i, contour in enumerate(contours):
    if hierarchy[0][i][3] == -1:  # No parent: it's an outer contour
        contour = contour.squeeze()  # shape: (N, 2)
        if len(contour.shape) != 2 or contour.shape[0] < 3:
            continue
        shell = [rasterio.transform.xy(transform, y, x, offset="center") for x,y in contour]
        shell_poly = Polygon(shell)
        prepared_shell = prep(shell_poly)

        holes = []
        for j, child in enumerate(contours):
            if hierarchy[0][j][3] == i:  # Child of contour i
                child = child.squeeze()
                hole_coords = [rasterio.transform.xy(transform, y, x, offset="center") for x,y in child]
                hole_ring = LinearRing(hole_coords)

                if prepared_shell.contains(Polygon(hole_ring)):
                    holes.append(hole_coords)
                    used_as_hole.add(j)

        polygon = Polygon(shell, holes)
        polygons.append(polygon)





  gdf = gpd.GeoDataFrame({"geometry": polygons},crs=25830)
  gdf.to_crs(epsg=4326, inplace=True)  # Set the coordinate reference system to WGS84
  gdf.to_csv(os.path.join(output_path,f"deeplabv3_loveda_epoch{epoch}_output{idx}.csv"))

coordinates_path = "/content/drive/MyDrive/ML/Coordinates/loveda_final"
for mask_path in mask_path_list:
    find_contours(mask_path,coordinates_path)

### Unify tables

In [None]:
root = "/content/drive/MyDrive/ML/Coordinates/loveda_final"
path_list = glob.glob(str(Path(root)/"*"),recursive=True)
name_list = [path.split("output")[-1].split(".")[0] for path in path_list]
df_list = []
for idx,path in enumerate(path_list):
  new_df = pd.read_csv(path)
  new_df.iloc[:,0]=np.int8(name_list[idx])
  df_list.append(new_df)
df_final = pd.concat(df_list,ignore_index=True)
df_final.rename(columns={"Unnamed: 0": "id"}, inplace=True)
df_final.head()

In [None]:
df_final.to_csv(Path(root)/"final.csv")
print(Path(root)/"loveda_final.csv")