In [None]:
import glob
import os
import rasterio
import geopandas as gpd
from rasterstats import zonal_stats
from shapely.ops import transform as shapely_transform
import pyproj
from shapely.geometry import box, Polygon, MultiPolygon
import h3

In [None]:
PATH = r"D:\CLMS\GRA_2018_010m_pl_03035_v010\DATA"
AOI = "metropolia_gornoslasko-zaglebiowska_8269826.geojson" # geojson or None
MAP_HEX_SIZE = 9
europe = pyproj.CRS("EPSG:3035")
wgs84 = pyproj.CRS("EPSG:4326")
ZONAL_STATS = ["sum"]

In [None]:
files = glob.glob(os.path.join(PATH, "*.tif"))
files

In [None]:
def hex_to_polygon(h3_index):
    boundary = h3.cell_to_boundary(h3_index)
    return Polygon([(lon, lat) for lat, lon in boundary])

In [None]:
def get_hexes_for_polygon(polygon, target_hex_size=7, fill_hex_size=10):
    polygon_points_lon_lat = polygon.exterior.coords
    polygon_points_lat_lon = tuple(coord[::-1] for coord in polygon_points_lon_lat)
    h3_polygon = h3.LatLngPoly(polygon_points_lat_lon)
    fill_hexes = h3.h3shape_to_cells(h3_polygon, fill_hex_size)
    target_hexes = {h3.cell_to_parent(h, target_hex_size) for h in fill_hexes}
    return target_hexes

In [None]:
if AOI:
    print("Generating hexes for area of interest")
    gdf_aoi = gpd.read_file(AOI)
    geometry = gdf_aoi["geometry"].iloc[0]
    
    aoi_hexes = set()
    if isinstance(geometry, Polygon):
        aoi_hexes = get_hexes_for_polygon(polygon=geometry, target_hex_size=MAP_HEX_SIZE)
    elif isinstance(geometry, MultiPolygon):
        for geom_polygon in geometry.geoms:
            aoi_hexes.update(get_hexes_for_polygon(polygon=geom_polygon, target_hex_size=MAP_HEX_SIZE))
    print(f"Found {len(aoi_hexes)} hexes")
    
    hex_polygons = [hex_to_polygon(h) for h in aoi_hexes]

for file in files:
    print(f"Processing {file}")
    with rasterio.open(file) as src:
        bounds = src.bounds
        crs = src.crs
        raster_data = src.read(1)
        transform = src.transform
      
    if not AOI:
        print("Generating hexes for whole tile")

        project = pyproj.Transformer.from_crs(crs_from=europe, crs_to=wgs84, always_xy=True).transform
        bbox = box(bounds.left, bounds.bottom, bounds.right, bounds.top)
        bbox_wgs84 = shapely_transform(project, bbox)
        bbox_coords = [(x, y) for x, y in bbox_wgs84.exterior.coords]
        geometry = Polygon(shell=bbox_coords)
    
        aoi_hexes = set()
        if isinstance(geometry, Polygon):
            aoi_hexes = get_hexes_for_polygon(polygon=geometry, target_hex_size=MAP_HEX_SIZE)
        elif isinstance(geometry, MultiPolygon):
            for geom_polygon in geometry.geoms:
                aoi_hexes.update(get_hexes_for_polygon(polygon=geom_polygon, target_hex_size=MAP_HEX_SIZE))
        print(f"Found {len(aoi_hexes)} hexes")
        
        hex_polygons = [hex_to_polygon(h) for h in aoi_hexes]

    gdf = gpd.GeoDataFrame({"h3_index": list(aoi_hexes)}, geometry=hex_polygons, crs=wgs84)

    gdf = gdf.to_crs(europe)

    cmap = {
        0: "other",
        1: "grassland",
        254: "unclassifiable",
        255: "outside area",
    }
    stats = zonal_stats(gdf, file, categorical=True, category_map=cmap, geojson_out=True, all_touched=True)
    
    gdf_stats = gpd.GeoDataFrame.from_features(stats)
    
    data_columns = [
        "other",
        "grassland",
        "unclassifiable",
        "outside area",
    ]
    for col in data_columns:
        if col not in gdf_stats.columns:
            gdf_stats[col] = 0
    
    gdf_output = gdf_stats[["h3_index"] + data_columns]
    gdf_output = gdf_output.dropna()
    
    if gdf_output.size > 0:
        gdf_output[data_columns] = gdf_output[data_columns].astype("int")
        
        print("Exporting data to .csv file")
        gdf_output.to_csv(file.replace(".tif", ".csv"), header=False, index=False)
    else:
        print("No data to export")