In [1]:
import numpy as np
import rasterio
from rasterio.merge import merge
from scipy.ndimage import label, binary_erosion
from shapely.geometry import Point, MultiPoint
from shapely.ops import nearest_points, transform
import geopandas as gpd
import pyproj
import glob
from rasterio import features
from geopandas.tools import sjoin
from shapely.geometry import shape
from scipy.ndimage import binary_dilation
from shapely.ops import transform as shapely_transform
from rasterio.transform import from_origin

## Merge yearly forest loss layers

In [2]:
def merge_raster_layers(file_pattern, output_file):
    file_list = glob.glob(file_pattern)
    mosaic_list = []
    out_trans = None

    for fp in file_list:
        with rasterio.open(fp) as src:
            mosaic, transform = merge([src])
            mosaic_list.append(mosaic)
            if out_trans is None:
                out_trans = transform

    # Combine the mosaics into a single array
    final_mosaic = np.concatenate(mosaic_list, axis=0)

    # Convert to binary
    final_mosaic_binary = (final_mosaic > 0).astype(rasterio.uint8)

    # Assuming all bands are identical, just keep the first band
    final_mosaic_binary = final_mosaic_binary[0:1, :, :]

    out_meta = src.meta.copy()
    out_meta.update({
        "height": final_mosaic_binary.shape[1],
        "width": final_mosaic_binary.shape[2],
        "transform": out_trans,
        "count": 1,  # Set band count to 1
        "dtype": 'uint8',  # Set data type to uint8 for binary data
        "nodata": 0  # Set nodata value suitable for uint8 type
    })

    with rasterio.open(output_file, "w", **out_meta) as dest:
        dest.write(final_mosaic_binary)

# Example of usage
merge_raster_layers("bb_spot_20[1][1-8].tif", "bb_spot_merged.tif")

## Update forest mask 

In [3]:
def load_raster_data(file_path):
    with rasterio.open(file_path) as src:
        return src.read(1), src.transform, src.crs

bb_spot_merged, forest_transform, forest_crs = load_raster_data("bb_spot_merged.tif")
s50mask, _, _ = load_raster_data("s50mask.tif")

def update_mask(merged_raster_file, mask_file, output_file):
    with rasterio.open(merged_raster_file) as src:
        merged_raster = src.read(1)  # Read the first band of bb_spot_merged.tif
        crs = src.crs
        transform = src.transform

    with rasterio.open(mask_file) as src:
        mask = src.read(1)  # Read the first band of s50mask.tif

    # Wherever merged_raster is 1, set mask to 0
    updated_mask = np.where(merged_raster == 1, 0, mask)

    # Update metadata for output file
    out_meta = src.meta.copy()
    out_meta.update({
        "dtype": str(updated_mask.dtype),
        "nodata": 0  # Use 0 as the nodata value
    })

    with rasterio.open(output_file, "w", **out_meta) as dest:
        dest.write(updated_mask, 1)

# Update mask
update_mask("bb_spot_merged.tif", "s50mask.tif", "s50mask_upd.tif")


## Forest edge mask generation

In [4]:
def identify_and_filter_edges(bb_spot_merged, s50mask):
    labeled_array, num_features = label(bb_spot_merged)

    edge = np.zeros_like(labeled_array, dtype=np.bool_)
    for i in range(1, num_features + 1):
        object_mask = (labeled_array == i)
        object_border = object_mask & np.logical_not(binary_erosion(object_mask))
        edge = np.logical_or(edge, object_border)

    edge = edge & bb_spot_merged.astype(np.bool_) & (bb_spot_merged > 0)
    labeled_edge, num_edge_features = label(edge)

    forest_edge_mask = np.zeros_like(labeled_edge, dtype=np.bool_)
    for i in range(1, num_edge_features + 1):
        object_mask = (labeled_edge == i).astype(bool)
        if np.any(object_mask & s50mask.astype(bool)):
            forest_edge_mask = np.logical_or(forest_edge_mask, object_mask)

    # Using hardcoded transform and crs values from the loaded raster
    hardcoded_transform = forest_transform
    hardcoded_crs = forest_crs

    # Save the forest_edge_mask as a GeoTIFF using the hardcoded values
    with rasterio.open("forest_edge_mask.tif", 'w', driver='GTiff', height=forest_edge_mask.shape[0],
                       width=forest_edge_mask.shape[1], count=1, dtype=rasterio.uint8, crs=hardcoded_crs, transform=hardcoded_transform) as dst:
        dst.write((forest_edge_mask * 255).astype(rasterio.uint8), 1)

    return forest_edge_mask, edge

forest_edge_mask, edge = identify_and_filter_edges(bb_spot_merged, s50mask)

## Pheromone trap installation

In [5]:
def generate_and_filter_points(forest_edge_mask, s50mask, forest_transform, forest_crs):
    rows, cols = np.where(forest_edge_mask == 1)
    geometries_mask = [Point(forest_transform * (col + 0.5, row + 0.5)) for col, row in zip(cols, rows)]
    gdf_mask = gpd.GeoDataFrame(geometry=geometries_mask)
    gdf_mask.set_crs(forest_crs, inplace=True)

    mask_polygon_features = [shape(shape_val) for shape_val, val in features.shapes(s50mask.astype(np.uint8), transform=forest_transform) if val == 1]

    if not mask_polygon_features:
        raise ValueError("No valid geometries found in s50mask!")

    mask_gdf = gpd.GeoDataFrame({'geometry': mask_polygon_features})
    mask_gdf.crs = forest_crs

    gdf_mask_within = sjoin(gdf_mask, mask_gdf, how="inner", op="within")
    all_centroids_mask_filtered = gdf_mask_within['geometry'].values
    
    return all_centroids_mask_filtered

# Generate and filter points
all_centroids_mask_filtered = generate_and_filter_points(forest_edge_mask, s50mask, forest_transform, forest_crs)

# Filter the centroids based on the 50m distance criterion
def determine_utm_zone(centroid):
    zone_number = (int((centroid.x + 180) / 6) % 60) + 1
    if centroid.y > 0:
        return f"EPSG:326{zone_number:02}"  # Northern hemisphere
    else:
        return f"EPSG:327{zone_number:02}"  # Southern hemisphere

def generate_filtered_centroids_shp(edge, forest_transform, forest_crs, all_centroids_mask_filtered, output_file_path):
    """
    Generate a shapefile of filtered centroids based on the provided edge and mask raster data.

    Parameters:
        edge (np.array): A 2D numpy array representing the edge raster.
        forest_transform (Affine): Affine transformation for converting pixel to spatial coordinates.
        forest_crs (CRS): Coordinate reference system for spatial data.
        all_centroids_mask_filtered (list of shapely.Point): List of filtered centroid points.
        output_file_path (str): File path to save the resulting shapefile.
    """
    
    # Generate all_centroids.shp based on the edge
    rows, cols = np.where(edge == 1)
    geometries = [Point(forest_transform * (col + 0.5, row + 0.5)) for col, row in zip(cols, rows)]
    gdf = gpd.GeoDataFrame(geometry=geometries)
    gdf.set_crs(forest_crs, inplace=True)

    # List to hold all centroid coordinates
    all_centroids = [(geometry.x, geometry.y) for geometry in gdf.geometry]

    # Get UTM Zone
    utm_zone = determine_utm_zone(Point(all_centroids[0]))

    # Filter the centroids based on the 50m distance criterion
    project_to_utm = pyproj.Transformer.from_crs(forest_crs, utm_zone, always_xy=True).transform
    project_to_wgs84 = pyproj.Transformer.from_crs(utm_zone, forest_crs, always_xy=True).transform
    all_centroids_mask_utm = [transform(project_to_utm, Point(c)) for c in all_centroids_mask_filtered]

    # Filter based on distance criterion
    final_centroids_mask_utm = []
    for point in all_centroids_mask_utm:
        nearest = nearest_points(point, MultiPoint(final_centroids_mask_utm)) if final_centroids_mask_utm else [None, Point(9999, 9999)]
        if point.distance(nearest[1]) > 50:
            final_centroids_mask_utm.append(point)
    final_centroids_mask = [transform(project_to_wgs84, point) for point in final_centroids_mask_utm]

    # Perform dilation on the final_centroids_mask
    mask_shape = edge.shape
    final_mask = np.zeros(mask_shape, dtype=bool)

    for point in final_centroids_mask_utm:
        col, row = ~forest_transform * point.coords[0]
        if 0 <= row < mask_shape[0] and 0 <= col < mask_shape[1]:
            final_mask[int(row), int(col)] = True

    # Apply binary dilation and identify isolated regions
    dilated_mask = binary_dilation(final_mask, structure=np.ones((3,3)))
    
    # [NOTE]: Replace `s50mask` with your actual mask array here. 
    isolated_mask = dilated_mask & np.logical_not(s50mask)
    isolated_rows, isolated_cols = np.where(isolated_mask)
    isolated_points = [forest_transform * (col + 0.5, row + 0.5) for col, row in zip(isolated_cols, isolated_rows)]
    isolated_geoms = [Point(p) for p in isolated_points]

    # Drop isolated points from final_centroids_mask
    final_centroids_mask = [point for point in final_centroids_mask if point not in isolated_geoms]

    # Save the filtered centroids to a shapefile
    geometry_mask = [Point(point.x, point.y) for point in final_centroids_mask]
    gdf_filtered_mask = gpd.GeoDataFrame(geometry=geometry_mask)
    gdf_filtered_mask.set_crs(forest_crs, inplace=True)
    #gdf_filtered_mask.to_file(output_file_path)

generate_filtered_centroids_shp(edge, forest_transform, forest_crs, all_centroids_mask_filtered, "filtered_centroids_mask_upd.shp")

def remove_single_points(s50mask_upd_filepath, bb_spot_merged_filepath, gdf_filtered_mask, output_filepath):
    """
    Removes single points based on conditions applied to two raster masks and saves the 
    filtered centroids to a shapefile.

    Parameters:
        s50mask_upd_filepath (str): Filepath to the "s50mask_upd.tif" raster.
        bb_spot_merged_filepath (str): Filepath to the "bb_spot_merged.tif" raster.
        gdf_filtered_mask (GeoDataFrame): GeoDataFrame containing points to be potentially removed.
        output_filepath (str): Filepath to store the resulting shapefile with removed points.
    """

    with rasterio.open(s50mask_upd_filepath) as src:
        s50mask_upd = src.read(1)
        raster_transform = src.transform

    with rasterio.open(bb_spot_merged_filepath) as src_bb:
        bb_mask = src_bb.read(1)

    # Function to check if a point's corresponding pixel and its surroundings meet the conditions
    def should_drop(row, col):
        condition1 = bb_mask[row][col] == 1 and all(
            bb_mask[i][j] == 0 for i in range(row-1, row+2) for j in range(col-1, col+2) if (i, j) != (row, col)
        )

        condition2 = s50mask_upd[row][col] == 0 and all(
            s50mask_upd[i][j] == 1 for i in range(row-1, row+2) for j in range(col-1, col+2) if (i, j) != (row, col)
        )

        return condition1 and condition2

    # Find the centroids that meet the conditions and remove them
    to_drop = []
    for idx, point in gdf_filtered_mask.iterrows():
        col, row = ~raster_transform * (point.geometry.x, point.geometry.y)
        col, row = int(col), int(row)
        if should_drop(row, col):
            to_drop.append(idx)
    gdf_filtered_mask = gdf_filtered_mask.drop(index=to_drop)
    #gdf_filtered_mask.to_file(output_filepath)

gdf_filtered_mask = gpd.read_file("filtered_centroids_mask_upd.shp")
remove_single_points("s50mask_upd.tif", "bb_spot_merged.tif", gdf_filtered_mask, "filtered_centroids_mask_upd_cleaned.shp")


# South feature
def has_south_neighbor_of_value(row, col, raster, value):
    """
    Check if the pixel south to (row, col) in 'raster' has the given 'value'.
    """
    # Check if the south pixel is within the raster bounds
    if row + 1 < raster.shape[0]:
        return raster[row + 1, col] == value
    return False

def filter_centroids_without_south_neighbor(raster_path, input_shp_path, output_shp_path, value):
    """
    Filters out points from a shapefile that have a southern neighbor with a specific value in a raster.

    Parameters:
        raster_path (str): Path to the raster file.
        input_shp_path (str): Path to the input shapefile.
        output_shp_path (str): Path to save the filtered shapefile.
        value (int or float): The value to check in the southern neighbor in the raster.
    """
    with rasterio.open(raster_path) as src:
        raster_data = src.read(1)
        raster_transform = src.transform
    
    gdf = gpd.read_file(input_shp_path)
    
    to_keep = []
    for _, point in gdf.iterrows():
        col, row = ~raster_transform * (point.geometry.x, point.geometry.y)
        col, row = int(col), int(row)
        
        if not has_south_neighbor_of_value(row, col, raster_data, value):
            to_keep.append(True)
        else:
            to_keep.append(False)
    
    gdf_filtered = gdf.loc[to_keep]
    gdf_filtered.to_file(output_shp_path)

filter_centroids_without_south_neighbor(
    "s50mask_upd.tif", 
    "filtered_centroids_mask_upd_cleaned.shp", 
    "pheromone_trap.shp", 
    1
)

  exec(code_obj, self.user_global_ns, self.user_ns)


## Antiattractant installation

In [6]:
# Function to read a raster file
def read_raster(file_path):
    with rasterio.open(file_path) as src:
        return src.read(1), src.profile

# Function to write a raster file
def write_raster(output_path, data, profile):
    profile.update(
        dtype=rasterio.uint8,
        count=1,
        compress='lzw')
    with rasterio.open(output_path, 'w', **profile) as dst:
        dst.write(data.astype(rasterio.uint8), 1)

# Read the updated forest mask and forest loss rasters
forest_mask, forest_profile = read_raster("s50mask_upd.tif")
loss_mask, _ = read_raster("bb_spot_merged.tif")

# Ensure the masks are boolean
forest_mask = forest_mask.astype(bool)
loss_mask = loss_mask.astype(bool)

# Create a dilation of the loss mask
dilated_loss_mask = binary_dilation(loss_mask, iterations=1)

# Find the edge pixels: pixels that are in the dilated loss mask AND in the updated forest mask
edge_pixels = dilated_loss_mask & forest_mask

# Convert the edge pixels mask back to uint8 for raster output
edge_pixels = edge_pixels.astype(np.uint8)

# Write the result
write_raster("forest_edge_buffer.tif", edge_pixels, forest_profile)

print("Buffer creation around the forest loss adjacent areas in the updated forest mask is completed.")

# Read the forest_edge_buffer raster
forest_edge_buffer, buffer_profile = read_raster("forest_edge_buffer.tif")

# Read and merge all bands of bb_spot_merged.tif into a binary mask
with rasterio.open("bb_spot_merged.tif") as src:
    # Read all bands and stack them into a single ndarray (assumes all bands are of the same dimensions)
    bb_spot_merged = src.read().astype(bool)
    # Reduce to a binary mask where if any band has a pixel value of True, it results in True
    bb_spot_merged_combined = np.any(bb_spot_merged, axis=0)

# Make sure we have a boolean array for the forest edge buffer
forest_edge_buffer_bool = forest_edge_buffer.astype(bool)

# Initialize the output mask with the current forest edge buffer
antiattractant_mask = np.copy(forest_edge_buffer_bool)

# Iterate over each pixel, skipping the first row to avoid index errors
for row in range(1, forest_edge_buffer.shape[0]):
    for col in range(forest_edge_buffer.shape[1]):
        # If the pixel in forest_edge_buffer is True and the pixel to the north in bb_spot_merged_combined is also True
        if forest_edge_buffer_bool[row, col] and bb_spot_merged_combined[row-1, col]:
            antiattractant_mask[row, col] = False

# Convert the boolean mask back to an integer type suitable for writing to a TIFF
antiattractant = antiattractant_mask.astype(np.uint8)

# Write the antiattractant mask to a new file
write_raster("antiattractant.tif", antiattractant, buffer_profile)

print("Filtering of forest_edge_buffer complete, resulting in antiattractant.tif.")

# Counting pixels with value of 1
num_pixels = np.sum(antiattractant == 1)
print(f"Number of pixels with value 1 in the final raster: {num_pixels}")


Buffer creation around the forest loss adjacent areas in the updated forest mask is completed.
Filtering of forest_edge_buffer complete, resulting in antiattractant.tif.
Number of pixels with value 1 in the final raster: 641
