>- ### Import lib

In [2]:
import geopandas as gpd
import matplotlib.pyplot as plt


from matplotlib.colors import ListedColormap # for define a categorical colormap

import contextily as cx

import rasterio
from pathlib import Path

import folium

>- define path

In [3]:
root_dir = Path("../../")   # Adjust this path to your project root
src_dir = root_dir / "src"  # Path to your source directory

>- filter nessary code

In [6]:
# ORIGINAL_CRS = ""

# geo_tiff_path = root_dir / "temp" / "odm_orthophoto" / "odm_orthophoto.tif"
# # if the file exists,
# if geo_tiff_path.exists():
#     print(f"GeoTIFF file found: {geo_tiff_path}")
# else:
#     print(f"GeoTIFF file not found: {geo_tiff_path}")
    
# with rasterio.open(geo_tiff_path) as src:
#     print("GeoTIFF CRS:", src.crs)
#     ORIGINAL_CRS = src.crs.to_string()
    
# print("Original CRS:", ORIGINAL_CRS)




In [7]:
# Load the GeoJSON file
geo_json_obj = gpd.read_file("sugarcane_growth_patches.geojson")

# Assign the CRS (replace with the correct EPSG code from your GeoTIFF)
geo_json_obj = geo_json_obj.set_crs(ORIGINAL_CRS, allow_override=True)

# Verify CRS and bounds
print("CRS:", geo_json_obj.crs)
print("Total Bounds:", geo_json_obj.total_bounds)
print("Growth Stage Values:", geo_json_obj['growth_stage'].unique())
print("Number of Features:", len(geo_json_obj))

CRS: EPSG:32644
Total Bounds: [575877.31941712 804093.77195026 575944.50054072 804183.34281302]
Growth Stage Values: ['grand_growth' None]
Number of Features: 588


In [9]:
# Create a Folium map centered on the centroid of the patches
centroid = geo_json_obj.to_crs("EPSG:4326").geometry.centroid
m = folium.Map(location=[centroid.y.mean(), centroid.x.mean()], zoom_start=16, tiles="OpenStreetMap")

# Add GeoJSON to the map with style based on growth_stage
folium.GeoJson(
    geo_json_obj.to_crs("EPSG:4326"),
    style_function=lambda x: {
        'fillColor': '#FF0000' if x['properties']['growth_stage'] == 'grand_growth' else '#808080',
        'color': 'black',
        'weight': 0.2,
        'fillOpacity': 0.7
    },
    tooltip=folium.GeoJsonTooltip(fields=['growth_stage', 'row_start', 'col_start'])
).add_to(m)

# define save file path
save_dir = root_dir / "temp_map"
save_dir.mkdir(parents=True, exist_ok=True)  # Create directory if it doesn't exist
save_path = save_dir / "sugarcane_growth_map.html"
# Save and display
m.save(save_path)
print("Interactive map saved as {save_path}")

Interactive map saved as {save_path}



  centroid = geo_json_obj.to_crs("EPSG:4326").geometry.centroid


### GEOJSON create code

In [None]:
def process_field_for_mapping(image_path: Path, ml_model, growth_stages: list,
                              patch_size: int = 64, min_pixel_sum_threshold: int = 1000) -> Path:
    """
    Processes a multispectral GeoTIFF, extracts valid patches,
    predicts growth stages in batch, and generates a GeoJSON file.

    Args:
        image_path (Path): Path to the input multispectral GeoTIFF.
        ml_model: Loaded scikit-learn compatible ML model.
        growth_stages (list): List of growth stage names corresponding to model's labels.
        patch_size (int): Size of the square patches (e.g., 64).
        min_pixel_sum_threshold (int): Patches with a total pixel sum below this
                                       threshold will be considered "informationless" (black)
                                       and skipped. Tune this value.

    Returns:
        Path: Path to the generated GeoJSON file.
    """
    print(f">>>>>>>>>>--------- Starting processing for: {image_path.name} ---------<<<<<<<<<<", flush=True)
    
    patches_to_process = [] # Stores (patch_data, r_start, c_start)
    
    with rasterio.open(image_path) as src:
        # Read all image data once for memory efficiency in patch extraction
        # This assumes the image fits in memory. For extremely large images,
        # you'd need to process by window.
        image_data = src.read() 
        profile = src.profile
        transform = src.transform
        nodata_val = src.nodata
        
        bands, h, w = image_data.shape
        print(f"Image dimensions: {h}x{w} pixels, {bands} bands.", flush=True)
        
        num_patches_skipped = 0
        total_possible_patches = 0
        i = 0  # Patch index for debugging

        for r_start in range(0, h, patch_size):
            for c_start in range(0, w, patch_size):
                total_possible_patches += 1
                r_end = r_start + patch_size
                c_end = c_start + patch_size

                # Ensure patch fits exactly within image bounds
                # if r_end > h or c_end > w:
                #     # Skip partial patches at the edges for simplicity.
                #     # Alternatively, you could pad them to PATCH_SIZE if your model handles it.
                #     num_patches_skipped += 1
                #     continue

                patch_data = image_data[:, r_start:r_end, c_start:c_end]
                #print(patch_data.shape)
                
                # --- Filtering for "informationless" patches ---
                # 1. Check for nodata values (if defined in GeoTIFF)
                if nodata_val is not None and np.all(patch_data == nodata_val):
                    #print(f"----->{i} Skipping patch at {r_start},{c_start}: All pixels are nodata ({nodata_val}).", flush=True)
                    patches_to_process.append((patch_data, r_start, c_start, "skip"))
                    num_patches_skipped += 1
                    #print(f"----->{i} num_patches_skipped: {num_patches_skipped} <-----", flush=True)
                    continue
                
                # # 2. Check if the patch is mostly black/very low intensity
                if np.sum(patch_data) < min_pixel_sum_threshold:
                    #print(f"----->{i} Skipping patch at {r_start},{c_start}: Total pixel sum is too low ({np.sum(patch_data)}).", flush=True)
                    patches_to_process.append((patch_data, r_start, c_start, "skip"))
                    num_patches_skipped += 1
                    #print(f"----->{i} num_patches_skipped: {num_patches_skipped} <-----", flush=True)
                    continue
                
                # # 3. Basic check for sufficient band data for feature extraction
                if patch_data.shape[0] < max(BAND_MAPPING.values()) + 1:
                    #print(f"----->{i} Skipping patch at {r_start},{c_start}: Not enough bands ({patch_data.shape[0]}) for required indexing.", flush=True)
                    patches_to_process.append((patch_data, r_start, c_start, "skip"))
                    num_patches_skipped += 1
                    #print(f"----->{i} num_patches_skipped: {num_patches_skipped} <-----", flush=True)
                    continue

                patches_to_process.append((patch_data, r_start, c_start, "valid"))
                i+= 1  # Increment patch index for debugging

    valid_patches = [p for p in patches_to_process if p[3] == "valid"]           
    #print(f"valid patches to process: {len(valid_patches)} out of {total_possible_patches} possible patches.", flush=True)
                
    print(f"===> Found {len(valid_patches)} valid patches to process out of {total_possible_patches} possible patches.", flush=True)
    print(f"===> Skipped {num_patches_skipped} informationless/partial patches.", flush=True)
    print(f"===> Total patches to process: {len(patches_to_process)}", flush=True)
    
    if not patches_to_process:
        print("No valid patches found to process. Exiting.", flush=True)
        return None
    
    # --- Batch Feature Extraction ---
    print("===> Extracting features in batch...", flush=True)
    # Extract features for all valid patches
    # Use list comprehension for efficient feature extraction
    all_features = []
    for patch_data, r_start, c_start, status in patches_to_process:
        if status == "valid":
            try:
                features = extract_features_from_patch_array(patch_data)
                #print(f"Extracted features for patch at {r_start},{c_start}: {features}", flush=True)
                # p[0] = features  # Replace patch data with extracted features
                all_features.append(features)
            except ValueError as e:
                print(f"Error extracting features from patch at {p[1]},{p[2]}: {e}", flush=True)
    
    #all_features = [extract_features_from_patch_array(p[0]) for p in patches_to_process if p[3] == "valid"]
    
    # Filter out patches where feature extraction failed (returned NaNs)
    valid_features_and_indices = []
    for i, features in enumerate(all_features):
        if not any(np.isnan(f) for f in features): # Check if any feature is NaN
            valid_features_and_indices.append((features, i))
        else:
            print(f"Skipping patch {i} due to invalid features (NaN/Inf).", flush=True)
    #valid_features_and_indices.append((features, i))
    # print(f"Extracted features for {len(valid_features_and_indices)} valid patches.", flush=True)
    # print(valid_features_and_indices)

    if not valid_features_and_indices:
        print("No valid features extracted after filtering. Exiting.", flush=True)
        return None

    # Separate features and original indices
    features_for_prediction = np.array([item[0] for item in valid_features_and_indices])
    original_patch_indices = [item[1] for item in valid_features_and_indices]
    
    print(f"===> Successfully extracted features for {len(features_for_prediction)} patches.", flush=True)
    
    # --- Batch Prediction ---
    print("===> Performing batch prediction...", flush=True)
    # print(f"Features shape: {features_for_prediction.shape}", flush=True)
    # print(features_for_prediction)
    predictions_labels = []
    for i, features in enumerate(features_for_prediction):
        # print(f"Predicting for patch {i} with features: {features}", flush=True)
        try:
            label = ml_model.predict([features])[0]  # Predict single patch
            predictions_labels.append(label)
        except Exception as e:
            print(f"Error predicting for patch {i}: {e}", flush=True)
            predictions_labels.append(-1)
    # predictions_labels = ml_model.predict(features_for_prediction)
    predictions_stages = [growth_stages[label] for label in predictions_labels]
    
    #print(predictions_stages)
    
    print("===> Prediction complete. Next generating GeoJSON.", flush=True)
    
    # --- GeoJSON Generation ---
    geojson_features = []
    with rasterio.open(image_path) as src:
        transform = src.transform

        prediction_index_map = dict(zip(original_patch_indices, predictions_stages))  # Map index → predicted stage
        # print(f"----> Prediction index map: {prediction_index_map}", flush=True)
        # print(f"----> Generating GeoJSON features for {len(patches_to_process)} patches.", flush=True)

        for i, patch in enumerate(patches_to_process):
            _patch_data, r_start, c_start, status = patch

            # print(f"{i} -- Patch data shape: {_patch_data.shape}", flush=True)
            # print(f"{i} -- Processing patch at {r_start},{c_start} with status: {status}", flush=True)

            # Calculate geo-coordinates of patch corners
            ul_lon, ul_lat = transform * (c_start, r_start)
            ur_lon, ur_lat = transform * (c_start + patch_size, r_start)
            lr_lon, lr_lat = transform * (c_start + patch_size, r_start + patch_size)
            ll_lon, ll_lat = transform * (c_start, r_start + patch_size)

            patch_polygon = Polygon([
                (ul_lon, ul_lat),
                (ur_lon, ur_lat),
                (lr_lon, lr_lat),
                (ll_lon, ll_lat),
                (ul_lon, ul_lat)  # Close the polygon
            ])

            # Set growth stage from prediction or None if skipped
            growth_stage = prediction_index_map.get(i, None)
            # print(f"Patch {i} at {r_start},{c_start} has growth stage: {growth_stage}", flush=True)
            # print(f"Patch Status: {status}", flush=True)

            geojson_features.append({
                "type": "Feature",
                "geometry": mapping(patch_polygon),
                "properties": {
                    "growth_stage": growth_stage,
                    "row_start": r_start,
                    "col_start": c_start,
                }
            })

            
    # print(f"===> Generated {len(geojson_features)} GeoJSON features.", flush=True)
    # print(geojson_features)
    
    
    output_geojson_data = {
        "type": "FeatureCollection",
        "crs": {
            "type": "name",
            "properties": {"name": f"EPSG:{src.crs.to_epsg()}"}
        },
        "features": geojson_features
    }

    with open(OUTPUT_GEOJSON_PATH, "w") as f:
        json.dump(output_geojson_data, f, indent=2)

    print(f"----> GeoJSON data saved to {OUTPUT_GEOJSON_PATH}", flush=True)
    print("===> Processing complete. GeoJSON file generated.", flush=True)
    return Path(OUTPUT_GEOJSON_PATH)                