In [115]:
import os
from tqdm import tqdm
from osgeo import gdal
import rasterio
import numpy as np
import cv2
from shapely.geometry import Polygon
import geopandas as gpd
from skimage.measure import find_contours as fc
from shapely.geometry import Polygon, LineString
import geopandas as gpd


from mmengine import Config
from mmseg.apis import init_model, inference_model, show_result_pyplot
import matplotlib.pyplot as plt

In [158]:
class RSI_inferencer:
    def __init__(self, input_path: str, output_path: str, input_file_extension: str = '.tif'):
        """Initializes the RSI_inferencer instance. 
        RSI_inferences is used for make predictions of test/valid data and geoferencing them 
        
        Args:
            input_path (str): The directory path where input images are located.
            output_path (str): The directory path where output images will be saved.
            input_file_extension (str, optional): The file extension of input images. Defaults to '.tif'.
        """
        
        self.input_path = input_path
        self.output_path = output_path
        self.input_file_extension = input_file_extension

        self.input_images = os.listdir(self.input_path)
        self.input_images = [x for x in self.input_images if input_file_extension == x[-len(input_file_extension):]]
        self.input_images.sort()

    def _inference_data(self, model, path_of_an_image):
        """Performs inference on a single image using a specified model.
        
        Args:
            model: The model to use for inference, mm_segmentation model is expected
            path_of_an_image (str): The file path of the image to perform inference on.
            
        Returns:
            tuple: A tuple containing the original image array and the inference results array.
        """
        
        ds = gdal.Open(path_of_an_image)
        new_rsi_image = np.array(ds.ReadAsArray()).astype(np.float32)
        new_rsi_image = new_rsi_image.transpose(1,2,0)

        results = inference_model(model, new_rsi_image)
        results = results.pred_sem_seg.data[0].cpu().numpy()
        
        return new_rsi_image, results
    
    def inference_input_data(self, model):
        """Performs inference on all input images using a specified model.
        
        Args:
            model: The model to use for inference.
        """

        for single_image_name in tqdm(self.input_images):
            input_file = os.path.join(self.input_path, single_image_name)
            
            _, results = self._inference_data(model, input_file)
            
            saving_path = os.path.join(self.output_path, single_image_name)
            output_file = self.write_geotiff(results, input_file, saving_path)

            
    def write_geotiff(self, out, raster_file, output_file):
        """Writes a georeferenced TIFF file using the output from the segmentation model.
        
        Args:
            out (ndarray): The output array from the segmentation model.
            raster_file (str): The file path of the original raster file.
            output_file (str): The file path where the georeferenced TIFF will be saved.
            
        """
        
        with rasterio.open(raster_file) as src:    
            ras_meta = src.profile
            ras_meta.update(count=1)
            with rasterio.open(output_file, 'w', **ras_meta) as dst:   
                dst.write(np.reshape(out, (1,) + out.shape)) 
        
class Vectorizer:
    """
    This class is used for vectorizing predicted segmentation masks
    """
    
    def _load_image(self, image_name: str) -> tuple:
        """Loads an image using rasterio and retrieves its transform and CRS (Coordinate Reference System).
        
        Args:
            image_name (str): The file path of the image to load.
        Returns:
            tuple: A tuple containing the image array, transform, and CRS.
        """
        
        with rasterio.open(image_name) as src:
            image = src.read(1) 
            transform = src.transform
            crs = src.crs

        return image, transform, crs

    def _apply_transforms(self, all_cons, transform) -> list:
        """Applies affine transformations to contours.
        
        Args:
            all_cons: The contours to transform.
            transform: The affine transformation to apply.
            
        Returns:
            list: A list of transformed geometries.
        """
        geoms = [Polygon(c) for c in all_cons]
        geoms_transformed = [shapely.affinity.affine_transform(geom, [transform.b, transform.a, transform.e, transform.d, transform.xoff, transform.yoff]) for geom in geoms]

        return geoms_transformed

    def get_geopandas(self, crs, input_geom):
        """Creates a GeoDataFrame from geometries and a specified CRS.
        
        Args:
            crs: The Coordinate Reference System to use.
            input_geom: The geometries to include in the GeoDataFrame.
            
        Returns:
            GeoDataFrame: A GeoDataFrame containing the specified geometries and CRS.
        """
        
        return gpd.GeoDataFrame(crs=crs, geometry=input_geom)

    def _save_polygons(self, gdf, output_file: str): 
        """Saves a GeoDataFrame of polygons to a GeoJSON file.
        
        Args:
            gdf (GeoDataFrame): The GeoDataFrame containing polygons.
            output_file (str): The file path where the GeoJSON will be saved.
        """
        gdf.to_file(output_file, driver='GeoJSON')

    def _indentify_inner_polygons(self, gdf):
        """Identifies inner polygons within a GeoDataFrame.
        
        Args:
            gdf (GeoDataFrame): The GeoDataFrame to analyze.
            
        Returns:
            tuple: A tuple containing two GeoDataFrames - one for outer polygons and one for inner polygons.
        """

        gdf_inner_polygons = gpd.GeoDataFrame().reindex_like(gdf)
        
        
        for index, polygon in gdf.iterrows():
            is_within_other = gdf.geometry.apply(lambda x: polygon.geometry.within(x) if x != polygon.geometry else False)

            if is_within_other.any():
                row_to_move = gdf.loc[index]    
                gdf = gdf.drop(index)
                gdf_inner_polygons.loc[len(gdf_inner_polygons.index)] = row_to_move
                
        return gdf, gdf_inner_polygons

    def clean_inner_polygons(self, gdf):
        """Removes inner polygons from a GeoDataFrame by performing a geometric difference operation.
        
        Args:
            gdf (GeoDataFrame): The GeoDataFrame to clean.
            
        Returns:
            GeoDataFrame: A cleaned GeoDataFrame with inner polygons removed.
        """
        
        gdf, innver_poly =  self._indentify_inner_polygons(gdf)

        combined_geometry = gdf.unary_union
        combined_geometry_invert = innver_poly.unary_union

        combined_geometry = combined_geometry.difference(combined_geometry_invert)
        gdf = gpd.GeoDataFrame(geometry=[poly for poly in combined_geometry.geoms], crs = gdf.crs)
        
        return gdf
        
    def get_polygons(self, image_name: str, output_path = 'output.geojson', threshold: float = 0.5):
         """Converts an image to polygons, cleans the polygons, and saves them to a GeoJSON file.
        
        Args:
            image_name (str): The file path of the image to convert.
            output_path (str, optional): The file path where the GeoJSON will be saved. Defaults to 'output.geojson'.
            threshold (float, optional): The threshold to use when identifying contours. Defaults to 0.5.
        """
        
        image, transform, crs = self._load_image(image_name)
        all_cons = fc(image, threshold)

        geoms_transformed = self._apply_transforms(all_cons, transform)
        gdf = self.get_geopandas(crs, geoms_transformed) 
        print('created geopandas')
        gdf = self.clean_inner_polygons(gdf)
        print('polygon cleaned')
        self._save_polygons(gdf, output_path)

    

In [117]:
input_dir_path = r"F:\Data For Fujirah Palm Segmentation\Multispectral data\img_dir\test"
output_dir_path = r"C:\Users\202391\mmsegmentation_MMS_latest\mmsegmentation\work_dirs\mask2former_swin-t_8xb2-160k_ade20k-512x512\Prediction"

In [118]:
path_config = r'C:\Users\202391\mmsegmentation_MMS_latest\mmsegmentation\configs\mask2former\mask2former_swin-t_8xb2-160k_ade20k-512x512.py'
path_checkpoint = r"C:\Users\202391\mmsegmentation_MMS_latest\mmsegmentation\work_dirs\mask2former_swin-t_8xb2-160k_ade20k-512x512\best_mIoU_iter_100000.pth"


In [None]:
cfg = Config.fromfile(path_config)
cfg.work_dir = "./work_dirs"
model = init_model(cfg, path_checkpoint, 'cuda:0')

In [18]:
inferencer = RSI_inferencer(input_dir_path, output_dir_path)


In [None]:
inferencer.inference_input_data(model)

# Make mosaic

In [None]:
from osgeo import gdal
import glob
import subprocess

# list all files in directory that match pattern
Pred_List = glob.glob(r"C:\Users\202391\mmsegmentation_MMS_latest\mmsegmentation\work_dirs\mask2former_swin-t_8xb2-160k_ade20k-512x512\Prediction\*.tif")
print(Pred_List)


#Make a virtual mosaic from all TIFF files contained in a directory :

#!gdalbuildvrt mosaic.vrt tiles_512_output/*.tif 
#!gdal_translate -of Gtiff -ot Byte mosaic.vrt Subset_fixed_out.tif

vrt = gdal.BuildVRT("UoS74.vrt", Pred_List)

New_Raster_name="UoS74.tif"

gdal.Translate(New_Raster_name, vrt, xRes = 0.30, yRes = -0.30)
#vrt = None

In [9]:
!gdalbuildvrt mosaic_input.vrt "C:\Users\202391\mmsegmentation_MMS_latest\mmsegmentation\work_dirs\mask2former_swin-t_8xb2-160k_ade20k-512x512\Prediction\*.tif" 
!gdal_translate -of Gtiff -ot Byte mosaic_input.vrt mosaic_input.tif

In [10]:
!gdalbuildvrt mosaic.vrt /media/bolci_ssd/mm_segmentation/multispectral_inferencer/outputs/*.tif 
!gdal_translate -of Gtiff -ot Byte mosaic.vrt Subset_fixed_out.tif

In [159]:
current_path = ""
raster_path = os.path.join(current_path, 'UoS74.tif')
result_path = os.path.join(current_path, 'polygon.geojson')

vectorizer = Vectorizer()
vectorizer.get_polygons(raster_path, result_path)


created geopandas
polygon cleaned
