In [None]:
import os
import json
import geojson
import pandas as pd
from tqdm import tqdm
from osgeo import gdal
from sklearn.model_selection import train_test_split
import rasterio
import geopandas as gpd
from shapely.geometry import box
from rasterio.plot import show
import matplotlib.pyplot as plt
import warnings
import re

# custom functions
import sys
sys.path.append('../')
from utils.functions import grab_certain_file
from utils.create_jsons import geojson_to_json_pix_coords
# TODO delete functions.py in data_preperation folder. Use main utils instead.

In [None]:
### ARGS function ###

# Root folder of the dataset; every other location is a sub-folder of it.
path = "../Satellite"

# One output folder per dataset split.
train_path, test_path, val_path = (
    os.path.join(path, split_name) for split_name in ("train", "test", "val")
)

# Input folders: annotation geojsons and the small raster tiles.
geojson_path = os.path.join(path, "split_geojsons")
small_tiles_path = os.path.join(path, "small_tiles")

In [None]:
# Divide dataset and set a random seed for reproducibility of the splits for next script

RANDOM_SEED = 560

# Create JSONs for Detectron2 NO test set
#nso_images = grab_certain_file(".tif", small_tiles_path)
#train, val = train_test_split(nso_images, test_size=0.2, random_state=RANDOM_SEED)

# Create JSONs for Detectron2 WITH test set
# grab_certain_file is imported from utils --> functions.py
# NOTE(review): the split is only reproducible if grab_certain_file returns the
# files in a stable order - confirm against its implementation.
images = grab_certain_file(".tif", small_tiles_path)

# Step 1: hold out 20% of all tiles as the test set.
train_and_val, test = train_test_split(images, test_size=0.20, random_state=RANDOM_SEED)

# Step 2: take 25% of the remaining 80% as validation (0.25 * 0.80 = 20% of all tiles),
# which leaves roughly a 60/20/20 train/val/test split.
train, val = train_test_split(train_and_val, test_size=0.25, random_state=RANDOM_SEED)

In [None]:
# Convert the geojson files to pixel coordinates, this is needed for detectron2 to be able to read the locations

def _exterior_ring(geometry):
    """Return the exterior ring (a list of [x, y] points) of a GeoJSON polygon geometry."""
    ring = geometry["coordinates"][0]
    # Polygon:      coordinates[0] is already the exterior ring.
    # MultiPolygon: coordinates[0] is the first polygon (a list of rings), so the
    #               exterior ring sits one level deeper - also when the polygon has holes.
    # The length check keeps the previous unwrap heuristic for geometries without a usable "type".
    if geometry.get("type") == "MultiPolygon" or len(ring) == 1:
        ring = ring[0]
    return ring


def _ring_to_pixels(ring, origin_x, origin_y, pixel_width, pixel_height):
    """Convert the map coordinates of a ring to two lists of integer pixel coordinates (x, y)."""
    all_points_x = [int(round((point[0] - origin_x) / pixel_width)) for point in ring]
    all_points_y = [int(round((point[1] - origin_y) / pixel_height)) for point in ring]
    return all_points_x, all_points_y


def geojson_to_json_pix_coords(dataset_split, small_tiles_path, geojson_path, dataset_path):
    """
    Converts geojson annotations to JSON format with pixel coordinates.

    NOTE(review): this definition shadows the function of the same name imported from
    utils.create_jsons at the top of the notebook - confirm which one should be used.

    Args:
        dataset_split (list): List of image files in the dataset split: train, test, or val.
        small_tiles_path (str): Path to the directory containing the small tiles (.tif).
        geojson_path (str): Path to the directory containing the geojson files of the annotations.
        dataset_path (str): Path to the dataset's train, val, or test directories.
            The directory is created if it does not exist yet.

    Returns:
        None

    Description:
        For every image in the split, the origin and pixel size are read from the tile's GDAL
        geotransform. If "<image id>.geojson" exists in geojson_path, the exterior ring of every
        feature is converted to pixel coordinates and stored as a polygon region together with
        the feature's "type" property ("unknown" if absent). For a MultiPolygon only the first
        polygon is used. Images with no annotation file get "regions": {}.
        A tile that fails to process is reported with print() and left out of the output.
        The result is saved as "nso_with_empty_annotations.json" in the dataset path.
    """

    # Annotations of the whole split, keyed by the .png file name of each tile
    dataset_dict = {}

    # Loop over each image in the dataset split
    for file in tqdm(dataset_split, desc=f"Creating JSONs for Detectron2 on {dataset_path}", ncols=150, bar_format="{l_bar}{bar:10}{r_bar}"):
        file_path = os.path.join(small_tiles_path, file)
        img_id = file.split(".tif")[0]
        geojson_image = os.path.join(geojson_path, f"{img_id}.geojson")
        png_name = file.replace(".tif", ".png")

        try:
            # Open the image with gdal to get pixel size and origin. Depending on the GDAL
            # configuration a failed open returns None instead of raising, so check explicitly.
            gdal_image = gdal.Open(file_path)
            if gdal_image is None:
                raise RuntimeError(f"GDAL could not open {file_path}")

            # Read the geotransform once: index 0/3 are the origin, index 1/5 the pixel size
            geo_transform = gdal_image.GetGeoTransform()
            originX, originY = geo_transform[0], geo_transform[3]
            pixel_width, pixel_height = geo_transform[1], geo_transform[5]
            gdal_image = None  # drop the reference so GDAL can release the dataset

            # Regions (annotation footprints) of this image; stays empty without annotations
            regions = {}

            # Not all tiles have annotations, thus:
            if os.path.exists(geojson_image):
                with open(geojson_image) as f:
                    gj = json.load(f)

                for i, feature in enumerate(gj["features"]):
                    ring = _exterior_ring(feature["geometry"])

                    # Save asset type in the dictionary if it exists, else use a default
                    asset_type = feature["properties"].get("type", "unknown")

                    all_points_x, all_points_y = _ring_to_pixels(
                        ring, originX, originY, pixel_width, pixel_height
                    )

                    regions[str(i)] = {
                        "shape_attributes": {
                            "name": "polygon",
                            "all_points_x": all_points_x,
                            "all_points_y": all_points_y,
                            "category": 0
                        },
                        "region_attributes": {
                            "type": asset_type
                        }
                    }

            dataset_dict[png_name] = {
                "file_ref": '',
                "size": os.path.getsize(file_path),
                "filename": png_name,
                "base64_img_data": '',
                "file_attributes": {},
                "regions": regions,
                "origin_x": originX,
                "origin_y": originY
            }

        except Exception as e:
            # Best effort: report the tile and continue with the rest of the split
            print(f"Error processing file {file}: {e}")

    # Make sure the split directory exists before writing into it
    os.makedirs(dataset_path, exist_ok=True)
    jsons_path = os.path.join(dataset_path, "nso_with_empty_annotations.json")
    with open(jsons_path, "w") as f:
        json.dump(dataset_dict, f, indent=2)
    return None

# Write the annotation JSON of every split into that split's own folder.
for split_files, split_dir in ((train, train_path), (test, test_path), (val, val_path)):
    geojson_to_json_pix_coords(split_files, small_tiles_path, geojson_path, split_dir)


In [None]:
# Merge the JSON file(s) of each split into a single via_region_data file. This step can be skipped when a split has only one .json file, but that file must then be renamed accordingly.

for d in ["train", "test", "val"]:
    # Source files to merge for this split (currently a single file)
    jsons = [os.path.join(path, d, "nso_with_empty_annotations.json")]

    result = {}
    for source_json in jsons:
        with open(source_json, "r") as f:
            # On duplicate keys the entry from the later file wins
            result.update(json.load(f))

    via_region_p = os.path.join(path, d, "via_region_data_with_empty_annotations.json")
    with open(via_region_p, "w") as out_file:
        json.dump(result, out_file)

    print(f"Done creating JSONs {d}")

In [None]:
# Check if the regions are well written

# Dedicated names are used for the JSON locations on purpose: `train`, `val`, `test`
# and `path` are already bound in earlier cells (split lists and dataset root), and
# rebinding them here would break re-running those cells in the same kernel.
train_json = "../Satellite/train/via_region_data_with_empty_annotations.json"
val_json = "../Satellite/val/via_region_data_with_empty_annotations.json"
test_json = "../Satellite/test/via_region_data_with_empty_annotations.json"
pths = [train_json, val_json, test_json]

# One DataFrame per split, one row per image (the JSON keys become the index)
dfs = [pd.read_json(json_path, orient='index') for json_path in pths]

train_df, val_df, test_df = dfs


In [None]:
###Prints one small tile (Valencia) to see what it looks like

def check_tile_metadata(tile_path):
    """
    Print basic metadata of a raster tile and display the tile.

    Args:
        tile_path (str): Path of the raster file, opened with rasterio.

    Returns:
        None
    """
    with rasterio.open(tile_path) as src:
        # Printed in this order, one "label: value" line each
        metadata = (
            ("CRS", src.crs),
            ("Bounds", src.bounds),
            ("Resolution", src.res),
            ("Number of bands", src.count),
        )
        for label, value in metadata:
            print(f"{label}: {value}")

        # Draw the tile on a fresh 8x8 figure for visual inspection
        plt.figure(figsize=(8, 8))
        show(src, title="Small Tile Inspection")
        plt.show()

# Inspect one small tile of the Valencia test tiling
check_tile_metadata(tile_path='../Satellite/small_tiles_test/3_Valencia_0_0.tif')

In [None]:
###Prints one big tile (Valencia) with annotations boundaries and the labels of the classes 


def plot_raster_with_corrected_annotations(raster_path, annotations_path, label_column):
    """
    Plot a raster with the outlines of its annotations and a text label per annotation.

    Args:
        raster_path (str): Path of the raster, opened with rasterio.
        annotations_path (str): Path of the annotation file, read with geopandas.
        label_column (str): Column of the annotations whose value is drawn at each centroid.

    Returns:
        None
    """
    with rasterio.open(raster_path) as src:
        #raster_crs = 'EPSG:3857' If you get an error caused by rasterio force the coordinate system
        raster_crs = src.crs
        annotations = gpd.read_file(annotations_path)

        # Bring the annotations into the raster CRS when the two differ
        if annotations.crs != raster_crs:
            annotations = annotations.to_crs(raster_crs)

        # Centroids are the anchor points of the labels
        annotations['centroid'] = annotations.geometry.centroid
        label_points = annotations.set_geometry('centroid')

        fig, ax = plt.subplots(figsize=(10, 10))
        show(src, ax=ax, title="Tile with Corrected Annotations")
        annotations.plot(ax=ax, facecolor='none', edgecolor='red', linewidth=2)

        # One white, centred label per annotation
        for _, feature in label_points.iterrows():
            anchor = feature['centroid']
            ax.annotate(text=feature[label_column], xy=(anchor.x, anchor.y),
                        horizontalalignment='center', fontsize=9, color='white')

        # Report CRS and extent of both layers so mismatches are easy to spot
        for line in (
            f"Raster CRS: {raster_crs}",
            f"Annotations CRS: {annotations.crs}",
            f"Raster bounds: {src.bounds}",
            f"Annotations bounds: {annotations.total_bounds}",
        ):
            print(line)

        plt.show()

# Plot the big Valencia tile with its annotations, labelled by the 'type' column
plot_raster_with_corrected_annotations(
    raster_path='../Satellite/big_tiles_test/Valencia.tif',
    annotations_path='../Satellite/geojsons_test/Valencia.geojson',
    label_column='type',
)

In [None]:
### Shows one big tile split into smaller tiles to show how tiling works 

def parse_coordinates(filename):
    """
    Build a sort key from the integers embedded in a file name.

    Args:
        filename (str): File name to scan for runs of digits.

    Returns:
        tuple: All digit runs as integers, in order of appearance; (0, 0) if there are none.
    """
    numeric_parts = tuple(int(digits) for digits in re.findall(r'\d+', filename))
    return numeric_parts if numeric_parts else (0, 0)

def check_all_valencia_tiles(tiles_directory, cols=7):
    """
    Show every Valencia tile of a directory in one grid of subplots.

    Args:
        tiles_directory (str): Directory that is searched for files whose name contains
            'Valencia' and ends with '.tif'.
        cols (int): Number of grid columns. Defaults to 7; the right value depends on the
            size of the image that was tiled.

    Returns:
        None. If no matching tile is found a message is printed and nothing is plotted.
    """
    tile_files = [f for f in os.listdir(tiles_directory) if 'Valencia' in f and f.endswith('.tif')]

    # Without tiles the grid would have zero rows, which plt.subplots rejects
    if not tile_files:
        print(f"No Valencia .tif tiles found in {tiles_directory}")
        return

    # Sort files based on the numbers in their names
    tile_files.sort(key=parse_coordinates)

    rows = (len(tile_files) + cols - 1) // cols  # ceiling division
    # squeeze=False always yields a 2-D array of axes, so flatten() works for any grid shape
    fig, axs = plt.subplots(rows, cols, figsize=(cols * 5, rows * 5), squeeze=False)
    axs = axs.flatten()

    for ax, tile_filename in zip(axs, tile_files):
        tile_path = os.path.join(tiles_directory, tile_filename)
        with rasterio.open(tile_path) as src:
            show(src, ax=ax, title=tile_filename)

    # Hide exactly the unused axes of the last row
    for ax in axs[len(tile_files):]:
        ax.set_visible(False)

    plt.tight_layout()
    plt.show()

# Show all Valencia tiles of the test tiling
tiles_directory = '../Satellite/small_tiles_test'
check_all_valencia_tiles(tiles_directory=tiles_directory)

In [None]:
###Create two dataframes: one containing all the tiles with annotations and one containing all the tiles without annotations


def check_annotations_in_tiles(tiles_directory, annotations_directory):
    """
    Split the tiles of a directory into tiles with and tiles without annotations.

    A tile counts as annotated when the overlay intersection of its bounding box with the
    geometries of at least one annotation file is non-empty.

    Args:
        tiles_directory (str): Directory whose '.tif' files are checked.
        annotations_directory (str): Directory whose '.geojson' files hold the annotations.

    Returns:
        tuple: (df_annot, df_empty_annot), two DataFrames with a single 'Tile_Name' column
        listing the tiles with and without annotations. Both are also printed.
    """
    annotation_files = [f for f in os.listdir(annotations_directory) if f.endswith('.geojson')]
    tiles_with_annotations = []
    tiles_without_annotations = []

    # Each annotation file is read from disk at most once and reused for all tiles
    annotation_cache = {}

    # Suppress the CRS mismatch warning only while this function runs; the global
    # warning filters are restored when the block is left
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", message="CRS mismatch between the CRS of left geometries and the CRS of right geometries.")

        # Process each tile
        for tile_filename in os.listdir(tiles_directory):
            if not tile_filename.endswith('.tif'):
                continue

            tile_path = os.path.join(tiles_directory, tile_filename)
            with rasterio.open(tile_path) as tile:
                annotations_found = False

                # Loop through each annotation file
                for annotation_filename in annotation_files:
                    if annotation_filename not in annotation_cache:
                        annotations_path = os.path.join(annotations_directory, annotation_filename)
                        annotation_cache[annotation_filename] = gpd.read_file(annotations_path)
                    annotations = annotation_cache[annotation_filename]

                    if annotations.crs != tile.crs:
                        # Tiles carrying this local CRS definition are treated as EPSG:3857
                        if tile.crs == 'LOCAL_CS["WGS 84 / Pseudo-Mercator"]':
                            value = 'EPSG:3857'
                        else:
                            value = tile.crs
                        # to_crs returns a new frame, the cached one stays untouched
                        transformed_annotations = annotations.to_crs(value)
                    else:
                        transformed_annotations = annotations

                    tile_geom = box(*tile.bounds)
                    tile_gdf = gpd.GeoDataFrame([1], geometry=[tile_geom], crs=tile.crs)
                    intersection = gpd.overlay(transformed_annotations, tile_gdf, how='intersection')

                    if not intersection.empty:
                        annotations_found = True
                        break  # Stop checking other annotation files if one match is found

                if annotations_found:
                    tiles_with_annotations.append(tile_filename)
                else:
                    tiles_without_annotations.append(tile_filename)

    # Create DataFrames from the lists
    df_annot = pd.DataFrame(tiles_with_annotations, columns=['Tile_Name'])
    df_empty_annot = pd.DataFrame(tiles_without_annotations, columns=['Tile_Name'])

    # Print the DataFrames
    print("Tiles with annotations:")
    print(df_annot)
    print("Tiles without annotations:")
    print(df_empty_annot)

    return df_annot, df_empty_annot

# Define your paths
tiles_directory = '../Satellite/small_tiles'
annotations_directory = '../Satellite/geojsons'

# Run the function and store DataFrames
df_annot, df_empty_annot = check_annotations_in_tiles(
    tiles_directory=tiles_directory,
    annotations_directory=annotations_directory,
)

# Display the first few rows of each DataFrame
for heading, frame in (
    ("DataFrame with no annotations:", df_empty_annot),
    ("\nDataFrame with annotations:", df_annot),
):
    print(heading)
    print(frame.head())




In [None]:
### Use this in case of class imbalance / data scarcity: it undersamples the tiles without annotations
### I did not use this but would definitely try it in future research

# ### Creates three different sizes of training batches
# train_path = "../Satellite/train"

# from sklearn.model_selection import train_test_split
# import os
# import pandas as pd

# test_sizes = [0.25, 0.5, 0.75]
# train_dic = {}

# for perc in test_sizes:
#     train_name = f"train_{int((1 - perc) * 100)}"
#     test_name = f"test_{int((perc) * 100)}"
    
#     # Split the DataFrame into training and test sets
#     train_set, test_set = train_test_split(df_empty_annot, test_size=perc, random_state=42)
    
#     # Reset index before concatenation
#     train_set.reset_index(drop=True, inplace=True)
#     df_annot_reset = df_annot.reset_index(drop=True)
    
#     train_dic[train_name] = pd.concat([df_annot_reset, train_set], ignore_index=True)
    
#     # Convert to JSON
#     data = train_dic[train_name].to_json(orient='index')
#     with open(os.path.join(train_path, f"via_region_data_{train_name}_empty_annotations.json"), "w") as outfile:
#         outfile.write(data)

# print(f"train_25: {len(train_dic['train_25'])}, train_50 {len(train_dic['train_50'])}, train_75: {len(train_dic['train_75'])}")
 

In [None]:
###Checks for data set distribution 

# Define the directory where your JSON files are located
directory = '../Satellite/'

# Categories to report on; every count starts at zero
categories = ['container', 'oil_gas', 'raw', 'refinery', 'roro', 'warehouse']
counts = dict.fromkeys(categories, 0)

# Function to count entries in a JSON file
def count_entries(file_path):
    """
    Return the number of top-level entries of a JSON file.

    Args:
        file_path (str): Path of the JSON file to read.

    Returns:
        int: len() of the parsed JSON document.
    """
    with open(file_path, 'r') as handle:
        entries = json.load(handle)
    return len(entries)
    
# Loop through directories 
subdirs = ['train', 'test', 'val']
for subdir in subdirs:
    subdir_path = os.path.join(directory, subdir)

    # Nothing to count when the split folder is missing
    if not os.path.exists(subdir_path):
        print(f"Subdirectory not found: {subdir_path}")
        continue

    print(f"\nCounts for {subdir.capitalize()}:")

    # Refresh the count of every category for this split (0 when its file is missing)
    for category in categories:
        file_name = f'via_region_data_{category}_annotations.json'
        file_path = os.path.join(subdir_path, file_name)
        if not os.path.exists(file_path):
            counts[category] = 0
            print(f"File not found: {file_name}")
        else:
            counts[category] = count_entries(file_path)

    for category, count in counts.items():
        print(f"{category.capitalize()}: {count} entries")
