In [None]:
%load_ext autoreload
%autoreload 2


## Libraries

In [None]:
import ee
import geemap
from typing import Literal


In [None]:
# Add root to path
import sys

sys.path.append("..")
from component.script.gee.ee_fao_gaul import (
    get_fao_gaul_features,
    get_fao_gaul_subj,
)
from component.script.gee.ee_rasterize_unique_values import gee_rasterize_unique_values
from component.script.gee.ee_raster_export import download_ee_image


In [None]:
# Where an input layer comes from: a file on disk or a Google Earth Engine asset.
SourceType = Literal["local", "gee"]
# Kind of layer; these are the keys set_variable_file_path maps to ".shp" / ".tif".
VariableType = Literal["vector", "raster"]

## GEE


In [None]:
# Cloud project passed to ee.Initialize.
# NOTE(review): the value looks like a placeholder — set your own project ID before running.
ee_project = "you-ee-project"
ee.Initialize(project=ee_project)


## Create folders

In [None]:
from pathlib import Path

# Everything is stored under "<parent of the current working directory>/data".
root_folder: Path = Path.cwd().parent
downloads_folder: Path = root_folder.joinpath("data")
downloads_folder.mkdir(parents=True, exist_ok=True)


## Set user parameters

In [None]:
# Name of the project sub-folder and prefix of every output file name.
project_name = "test"
# Reference years; entries 0, 1 and 2 drive the forest and towns layers below.
years: list[int] = [2015, 2020, 2024]
# Forest product to download from GEE: "gfc" or "tmf".
forest_source: Literal["gfc", "tmf"] = "gfc"
# Minimum "treecover2000" value counted as forest (used by the GFC cell only).
tree_cover_threshold: int = 10


## Create project folders

In [None]:
# <downloads_folder>/<project_name>/data_raw holds every layer produced by this notebook.
project_folder = downloads_folder / project_name
data_raw_folder = project_folder / "data_raw"

for folder in (project_folder, data_raw_folder):
    folder.mkdir(parents=True, exist_ok=True)


## Helpers

In [None]:
from pathlib import Path


def set_variable_file_path(
    project_name: str, data_raw_folder: str, variable_name: str, variable_type: str
) -> Path:
    """
    Build ``<data_raw_folder>/<project_name>_<variable_name><extension>``.

    Args:
        project_name (str): The name of the project; used as file-name prefix.
        data_raw_folder (str): The raw-data folder (anything ``Path()`` accepts).
        variable_name (str): The name of the variable.
        variable_type (str): "vector" (-> ".shp") or "raster" (-> ".tif").

    Returns:
        Path: The full file path with the extension matching ``variable_type``.

    Raises:
        ValueError: If ``variable_type`` is neither "vector" nor "raster".
    """
    suffix_by_type = {"vector": ".shp", "raster": ".tif"}

    # Look the extension up once; a miss means the type is not supported.
    suffix = suffix_by_type.get(variable_type)
    if suffix is None:
        raise ValueError(
            f"Unsupported variable type: {variable_type}. Supported types are: {list(suffix_by_type.keys())}"
        )

    file_name = f"{project_name}_{variable_name}{suffix}"
    return Path(data_raw_folder) / file_name


In [None]:
from pathlib import Path
import shutil
from typing import Union


def copy_and_rename_file(
    file_path: Union[str, Path], destination_path: Union[str, Path]
) -> Path:
    """
    Copy a file to a designated location and rename it. If the source is a shapefile (.shp),
    also copies every sidecar file that shares its stem (e.g. ``.dbf``, ``.shx``, ``.prj``,
    ``.shp.xml``), keeping each sidecar's complete extension.

    Args:
        file_path (Union[str, Path]): The path to the source file to be copied.
        destination_path (Union[str, Path]): The full path including folder and new filename for the copied file.

    Returns:
        Path: The path to the newly copied file.

    Raises:
        FileNotFoundError: If the source file does not exist.
        PermissionError: If there are insufficient permissions to read the source file or write to destination.

    Example:
        >>> copy_and_rename_file('/path/to/source/file.shp', '/path/to/destination/renamed_file.shp')
        PosixPath('/path/to/destination/renamed_file.shp')
    """
    # Local import: only needed to escape glob metacharacters in the file stem.
    import glob

    source_file = Path(file_path)
    new_file_path = Path(destination_path)

    # Validate that source file exists
    if not source_file.exists():
        raise FileNotFoundError(f"Source file '{source_file}' does not exist.")

    # Ensure the destination directory exists
    new_file_path.parent.mkdir(parents=True, exist_ok=True)

    # Copy the main file to the destination with a new name
    shutil.copy2(source_file, new_file_path)

    # If source is a shapefile (.shp), also copy auxiliary files
    if source_file.suffix.lower() == ".shp":
        source_stem = source_file.stem
        # Escape the stem so characters such as "[" or "*" in a file name are
        # matched literally instead of being interpreted as glob syntax.
        pattern = f"{glob.escape(source_stem)}.*"
        for aux_file in source_file.parent.glob(pattern):
            # The .shp itself was copied above; directories cannot be copied with copy2.
            if aux_file == source_file or not aux_file.is_file():
                continue
            # Keep everything after the stem so "x.shp.xml" becomes "new.shp.xml",
            # not "new.xml" (Path.suffix would only return the last extension).
            trailing_extension = aux_file.name[len(source_stem):]
            new_aux_path = new_file_path.parent / f"{new_file_path.stem}{trailing_extension}"
            shutil.copy2(aux_file, new_aux_path)
            print(f"Auxiliary file copied to {new_aux_path}")

    return new_file_path


## AOI

In [None]:
# Area of interest: where it comes from, its layer kind, and the project file it is stored in.
aoi_source: SourceType = "local"
aoi_type: VariableType = "vector"
aoi_name: Path = set_variable_file_path(project_name, data_raw_folder, "aoi", aoi_type)

In [None]:
if aoi_source == "gee":
    iso_code = "MTQ"

    # Alternative: define the AOI from any GEE FeatureCollection
    # aoi_id = "str"
    # aoi = ee.FeatureCollection(aoi_id)

    # Define the AOI from the FAO GAUL level-0 feature matching iso_code
    aoi = get_fao_gaul_features(level=0, code=iso_code)

    # Export the AOI to the project folder only once
    if not Path(aoi_name).exists():
        geemap.ee_export_vector(
            aoi,
            aoi_name,
            selectors=["gaul0_name", "iso3_code"],
            keep_zip=False,
            timeout=600,
            verbose=False,
        )

elif aoi_source == "local":
    # Path of the user's shapefile (a plain string, replace the placeholder)
    aoi_path: str = "/path/to/local/aoi.shp"
    copy_and_rename_file(aoi_path, aoi_name)
    aoi = geemap.shp_to_ee(aoi_path)

else:
    # Fail here with a clear message instead of a NameError on `aoi` below
    raise ValueError(
        f"Unsupported aoi_source: {aoi_source!r}. Supported sources are: ['gee', 'local']"
    )

# Geometry reused as the export region by every download cell below
aoi_geom = aoi.geometry()
aoi_geometry_json = aoi_geom.serialize()

print("Done!")

## Subjurisdiction

In [None]:
# Subjurisdiction layer: source ("gee" or "local") and layer kind.
subj_source: SourceType = "gee"
subj_type: VariableType = "raster"


In [None]:
# Project file the subjurisdiction layer is written to.
subj_name: Path = set_variable_file_path(
    project_name, data_raw_folder, "subj", subj_type
)
print(subj_name)

In [None]:
if subj_source == "gee":
    # NOTE(review): presumably the level-2 GAUL units for the AOI — confirm in ee_fao_gaul.
    filtered_subj, filtered_attribute = get_fao_gaul_subj(2, aoi)

    # Rasterize on the "gaul2_name" attribute, then restrict to the AOI as a byte image.
    subj_raster = gee_rasterize_unique_values(filtered_subj, "gaul2_name")
    subj_image = ee.Image(subj_raster).clip(aoi).toByte()

    # Download once; an existing file is left untouched.
    if Path(subj_name).exists():
        print(f"{subj_name} already exists. Skipping download.")
    else:
        download_ee_image(
            subj_image,
            subj_name,
            scale=30,
            crs="EPSG:4326",
            region=aoi_geom,
            overwrite=True,
            unmask_value=255,
            nodata_value=255,
        )

if subj_source == "local":
    # Placeholder: set the path of the local subjurisdiction file.
    subj_path: Path = ""
    copy_and_rename_file(subj_path, subj_name)


## Protected Areas

In [None]:
# Protected-areas layer: source ("gee" or "local") and layer kind.
pa_source: SourceType = "gee"
pa_type: VariableType = "raster"


In [None]:
# Project file the protected-areas layer is written to.
pa_name: Path = set_variable_file_path(project_name, data_raw_folder, "pa", pa_type)
print(pa_name)

In [None]:
# Branch on the protected-areas source (pa_source), not the subjurisdiction one.
if pa_source == "gee":
    # Source
    # https://developers.google.com/earth-engine/datasets/catalog/WCMC_WDPA_current_polygons

    # WDPA polygons touching the AOI, restricted to the listed STATUS values
    wdpa_poly = (
        ee.FeatureCollection("WCMC/WDPA/current/polygons")
        .filterBounds(aoi)
        .filter(
            ee.Filter.inList(
                "STATUS", ["Designated", "Inscribed", "Established", "Proposed"]
            )
        )
    )
    # Binary mask: 1 where a polygon (WDPAID > 0) was burned in, 0 elsewhere in the AOI
    wdpa_image = (
        wdpa_poly.reduceToImage(["WDPAID"], ee.Reducer.first())
        .gt(0)
        .unmask()
        .clip(aoi)
        .toByte()
    )
    # Download once; an existing file is left untouched
    if not Path(pa_name).exists():
        download_ee_image(
            wdpa_image,
            pa_name,
            scale=30,
            crs="EPSG:4326",
            region=aoi_geom,
            overwrite=True,
            unmask_value=255,
            nodata_value=255,
        )
    else:
        print(f"{pa_name} already exists. Skipping download.")

if pa_source == "local":
    # Placeholder: set the path of the local protected-areas file
    pa_path: Path = ""
    copy_and_rename_file(pa_path, pa_name)

print("Done!")

## Altitude and Slope

In [None]:
# Altitude and slope layers: each has its own source ("gee" or "local") and layer kind.
altitude_source: SourceType = "gee"
altitude_type: VariableType = "raster"
slope_source: SourceType = "gee"
slope_type: VariableType = "raster"


In [None]:
# Project files the altitude and slope layers are written to.
altitude_name: Path = set_variable_file_path(
    project_name, data_raw_folder, "altitude", altitude_type
)
print(altitude_name)
slope_name: Path = set_variable_file_path(
    project_name, data_raw_folder, "slope", slope_type
)
print(slope_name)

In [None]:
# The elevation image feeds both layers, so build it whenever either one comes
# from GEE (previously a local altitude + GEE slope left `srtm` undefined).
if "gee" in (altitude_source, slope_source):
    srtm = ee.Image("USGS/SRTMGL1_003").select("elevation").clip(aoi)

if altitude_source == "gee":
    # Download once; an existing file is left untouched
    if not Path(altitude_name).exists():
        download_ee_image(
            srtm,
            altitude_name,
            scale=30,
            crs="EPSG:4326",
            region=aoi_geom,
            overwrite=True,
            nodata_value=None,
        )
    else:
        print(f"{altitude_name} already exists. Skipping download.")

else:
    # Local: placeholder path, set it to the local altitude file
    altitude_path: Path = ""
    copy_and_rename_file(altitude_path, altitude_name)


if slope_source == "gee":
    # Slope derived from the clipped elevation image
    slope = ee.Terrain.slope(srtm).clip(aoi)
    if not Path(slope_name).exists():
        download_ee_image(
            slope,
            slope_name,
            scale=30,
            crs="EPSG:4326",
            region=aoi_geom,
            overwrite=True,
            nodata_value=None,
        )
    else:
        print(f"{slope_name} already exists. Skipping download.")

else:
    # Local: placeholder path, set it to the local slope file
    slope_path: Path = ""
    copy_and_rename_file(slope_path, slope_name)

print("Done!")

## Forest layers

In [None]:
# Forest layers: primary source ("gee" or "local"); the GEE product is the user-chosen forest_source.
forest_source_primary: SourceType = "gee"
forest_source_secondary = forest_source
forest_type: VariableType = "raster"


In [None]:
# File-name stem of the forest layers; the year is appended in the next cell.
if forest_source_primary == "gee":
    if forest_source_secondary == "gfc":
        # GFC names also carry the tree-cover threshold
        forest_var = f"forest_{forest_source_secondary}_{tree_cover_threshold}_"
    elif forest_source_secondary == "tmf":
        forest_var = f"forest_{forest_source_secondary}_"
elif forest_source_primary == "local":
    forest_var = "forest_"

In [None]:
# One output path per reference year (years[0], years[1], years[2]).
forest1_name, forest2_name, forest3_name = (
    set_variable_file_path(
        project_name, data_raw_folder, forest_var + str(years[idx]), forest_type
    )
    for idx in (0, 1, 2)
)

for forest_file in (forest1_name, forest2_name, forest3_name):
    print(forest_file)

In [None]:
if forest_source_primary == "gee" and forest_source == "gfc":
    gfcImage = ee.Image("UMD/hansen/global_forest_change_2024_v1_12")
    clipGfc = gfcImage.clip(aoi)
    forest2000 = clipGfc.select(["treecover2000"])
    # Year-2000 forest mask: 1 where tree cover >= tree_cover_threshold, else 0
    forest2000_thr = (
        ee.Image(0).where(forest2000.gte(tree_cover_threshold), 1).clip(aoi)
    )
    loss = clipGfc.select(["lossyear"])

    def _gfc_forest_at(year: int) -> ee.Image:
        """Return the year-2000 forest mask minus pixels lost before ``year``.

        "lossyear" is 0 where no loss was recorded, so the test must exclude 0;
        a bare ``loss.lt(year - 2000)`` would also clear never-disturbed forest.
        """
        lost_before_year = loss.gt(0).And(loss.lt(year - 2000))
        return forest2000_thr.where(lost_before_year, 0).rename("B1")

    forest_gcf_t1 = _gfc_forest_at(years[0])
    forest_gcf_t2 = _gfc_forest_at(years[1])
    forest_gcf_t3 = _gfc_forest_at(years[2])

    # Download each date once; existing files are left untouched
    for forest_image, forest_file in (
        (forest_gcf_t1, forest1_name),
        (forest_gcf_t2, forest2_name),
        (forest_gcf_t3, forest3_name),
    ):
        if not Path(forest_file).exists():
            download_ee_image(
                forest_image,
                forest_file,
                scale=30,
                crs="EPSG:4326",
                region=aoi_geom,
                overwrite=True,
                unmask_value=255,
                nodata_value=255,
            )
        else:
            print(f"{forest_file} already exists. Skipping download.")

    print("Done!")

In [None]:
if forest_source_primary == "gee" and forest_source == "tmf":
    tmfImage = (
        ee.ImageCollection("projects/JRC/TMF/v1_2024/AnnualChanges")
        .filterBounds(aoi)
        .mosaic()
    )
    # The band for December of the previous year is used for each reference year
    forest2_t1 = tmfImage.select("Dec" + str(years[0] - 1))
    forest2_t2 = tmfImage.select("Dec" + str(years[1] - 1))
    forest2_t3 = tmfImage.select("Dec" + str(years[2] - 1))

    def _tmf_forest_mask(annual_change: ee.Image, band_name: str) -> ee.Image:
        """Return 1 where the annual-change value is 1 or 2, else 0.

        The second ``where`` must test the already-remapped image: testing the
        original one (as before) reset the class-2 pixels that had just been
        set to 1 back to 0.
        """
        merged = annual_change.where(annual_change.eq(2), 1)
        return merged.where(merged.neq(1), 0).rename(band_name)

    # Band names are kept as in the previous version of this cell
    forest_tmf_t1 = _tmf_forest_mask(forest2_t1, "B1")
    forest_tmf_t2 = _tmf_forest_mask(forest2_t2, "B2")
    forest_tmf_t3 = _tmf_forest_mask(forest2_t3, "B3")

    # Download each date once; existing files are left untouched
    for forest_image, forest_file in (
        (forest_tmf_t1, forest1_name),
        (forest_tmf_t2, forest2_name),
        (forest_tmf_t3, forest3_name),
    ):
        if not Path(forest_file).exists():
            download_ee_image(
                forest_image,
                forest_file,
                scale=30,
                crs="EPSG:4326",
                region=aoi_geom,
                overwrite=True,
                unmask_value=255,
                nodata_value=255,
            )
        else:
            print(f"{forest_file} already exists. Skipping download.")

    print("Done!")

In [None]:
# Local forest layers: copy the three user files into the project under their standard names.
# The empty strings are placeholders and must be replaced with real file paths.
if forest_source_primary == "local":
    forest1_path: Path = ""
    copy_and_rename_file(forest1_path, forest1_name)
    forest2_path: Path = ""
    copy_and_rename_file(forest2_path, forest2_name)
    forest3_path: Path = ""
    copy_and_rename_file(forest3_path, forest3_name)

## Rivers

In [None]:
# Rivers layer: source ("gee" or "local") and layer kind.
rivers_source: SourceType = "gee"
rivers_type: VariableType = "raster"

In [None]:
# Project file the rivers layer is written to.
rivers_name: Path = set_variable_file_path(
    project_name, data_raw_folder, "rivers", rivers_type
)

print(rivers_name)

In [None]:
if rivers_source == "gee":
    # Source
    # https://gee-community-catalog.org/projects/osm_water/
    osm_water_tiles = ee.ImageCollection(
        "projects/sat-io/open-datasets/OSM_waterLayer"
    ).filterBounds(aoi)
    osm_water = osm_water_tiles.mosaic().clip(aoi)

    # 1 where the water-layer value is >= 2, 0 elsewhere inside the AOI
    osm_rivers = osm_water.gte(2).unmask().clip(aoi).toByte()

    # Download once; an existing file is left untouched
    if Path(rivers_name).exists():
        print(f"{rivers_name} already exists. Skipping download.")
    else:
        download_ee_image(
            osm_rivers,
            rivers_name,
            scale=30,
            crs="EPSG:4326",
            region=aoi_geom,
            overwrite=True,
            unmask_value=255,
            nodata_value=255,
        )

else:
    # Local: placeholder path, set it to the local rivers file
    rivers_path: Path = ""
    copy_and_rename_file(rivers_path, rivers_name)

print("Done!")


## Roads

In [None]:
# Roads layer: source ("gee" or "local") and layer kind.
roads_source: SourceType = "gee"
roads_type: VariableType = "raster"


In [None]:
# Project file the roads layer is written to.
roads_name: Path = set_variable_file_path(
    project_name, data_raw_folder, "roads", roads_type
)
print(roads_name)


In [None]:
if roads_source == "gee":
    # Roads image asset, unmasked, restricted to the AOI and cast to byte
    roads_asset = ee.Image(
        "projects/ee-andyarnellgee/assets/crosscutting/infrastructure/roads_osm/roadsAllImageOSM"
    )
    osm_roads = roads_asset.unmask().clip(aoi).toByte()

    # Download once; an existing file is left untouched
    if Path(roads_name).exists():
        print(f"{roads_name} already exists. Skipping download.")
    else:
        download_ee_image(
            osm_roads,
            roads_name,
            scale=30,
            crs="EPSG:4326",
            region=aoi_geom,
            overwrite=True,
            unmask_value=255,
            nodata_value=255,
        )

else:
    # Local: placeholder path, set it to the local roads file
    roads_path: Path = ""
    copy_and_rename_file(roads_path, roads_name)

print("Done!")


## Towns

In [None]:
# Towns layer: source ("gee" or "local") and layer kind.
towns_source: SourceType = "gee"
towns_type: VariableType = "raster"


In [None]:
def closest_epoch(year):
    """Return the epoch in 1975, 1980, ..., 2020 that is nearest to ``year``.

    On a tie the earlier epoch is returned.
    """
    nearest = None
    for epoch in range(1975, 2021, 5):
        # Strict "<" keeps the first (earliest) epoch when two are equally close
        if nearest is None or abs(epoch - year) < abs(nearest - year):
            nearest = epoch
    return nearest


# Nearest epoch for each reference year, in the same order as `years`
closest_epochs = list(map(closest_epoch, years))


In [None]:
if towns_source == "gee":
    # One output file per reference year, labelled with that year's closest epoch
    towns1_name, towns2_name, towns3_name = (
        set_variable_file_path(
            project_name, data_raw_folder, "towns_" + str(closest_epochs[idx]), towns_type
        )
        for idx in (0, 1, 2)
    )
elif towns_source == "local":
    # A single file when the towns layer is provided locally
    towns_name: Path = set_variable_file_path(
        project_name, data_raw_folder, "towns", towns_type
    )


In [None]:
if towns_source == "gee":
    jrc_ghsl_pop = ee.ImageCollection("JRC/GHSL/P2023A/GHS_POP")

    # Population image for the epoch closest to each reference year
    cities_pop_1 = ee.Image("JRC/GHSL/P2023A/GHS_POP/" + str(closest_epochs[0]))
    cities_pop_2 = ee.Image("JRC/GHSL/P2023A/GHS_POP/" + str(closest_epochs[1]))
    cities_pop_3 = ee.Image("JRC/GHSL/P2023A/GHS_POP/" + str(closest_epochs[2]))

    # Built-up surface band for the same epochs
    cities_build_1 = ee.Image(
        "JRC/GHSL/P2023A/GHS_BUILT_S/" + str(closest_epochs[0])
    ).select("built_surface")
    cities_build_2 = ee.Image(
        "JRC/GHSL/P2023A/GHS_BUILT_S/" + str(closest_epochs[1])
    ).select("built_surface")
    cities_build_3 = ee.Image(
        "JRC/GHSL/P2023A/GHS_BUILT_S/" + str(closest_epochs[2])
    ).select("built_surface")

    # Binary towns mask: 1 where population >= 15 and built surface >= 90, else 0
    cities1 = (
        ee.Image(0).where(cities_pop_1.gte(15).And(cities_build_1.gte(90)), 1).clip(aoi)
    )
    cities2 = (
        ee.Image(0).where(cities_pop_2.gte(15).And(cities_build_2.gte(90)), 1).clip(aoi)
    )
    cities3 = (
        ee.Image(0).where(cities_pop_3.gte(15).And(cities_build_3.gte(90)), 1).clip(aoi)
    )

    # Export the binary masks computed above. The previous version exported the
    # raw built_surface bands, leaving the masks unused and clashing with the
    # 255 nodata value (255 can be a real built-surface value).
    for towns_image, towns_file in (
        (cities1, towns1_name),
        (cities2, towns2_name),
        (cities3, towns3_name),
    ):
        if not Path(towns_file).exists():
            download_ee_image(
                towns_image,
                towns_file,
                scale=30,
                crs="EPSG:4326",
                region=aoi_geom,
                overwrite=True,
                unmask_value=255,
                nodata_value=255,
            )
        else:
            print(f"{towns_file} already exists. Skipping download.")

    print("Done!")

In [None]:
# Local towns layer: copy the user file into the project under its standard name.
# The empty string is a placeholder and must be replaced with a real file path.
if towns_source == "local":
    towns_path: Path = ""
    copy_and_rename_file(towns_path, towns_name)
    print("Done!")


In [None]:
## Consider using Global Human Modification v3
# https://gee-community-catalog.org/projects/ghm/?h=human

## Oxford accessibility to cities 2015
## ee.Image('Oxford/MAP/accessibility_to_cities_2015_v1_0')


## Custom variable

In [None]:
# Which of the two lists below is processed: "gee" or "local".
custom_source: SourceType = "gee"

# Custom GEE variables: one entry per asset.
#   "asset_id": Earth Engine asset ID passed to ee.Image
#   "name":     variable name used in the output file name
#   "type":     "raster" or "vector"
custom_gee_variables = [
    {
        "asset_id": "your/custo/asset_id",
        "name": "my_custom_variable_name",
        "type": "raster"
    },
    {
        "asset_id": "your/custo/asset_id_2",
        "name": "my_custom_variable_name_2",
        "type": "raster"
    },
]

# Custom local variables: one entry per file.
#   "path": location of the local file
#   "name": variable name
#   "type": "raster" or "vector"
custom_local_variables = [
    {
        "path": "path_to/custom/asset.tif",
        "name": "my_custom_variable_name",
        "type": "raster"
    },
    {
        "path": "path_to/custom/asset.shp",
        "name": "my_custom_variable_name",
        "type": "vector"
    },
]

In [None]:
if custom_source == "gee":
    for variable in custom_gee_variables:
        custom_variable_name = variable["name"]
        custom_type = variable["type"]

        # Standard project path for this variable
        custom_name: Path = set_variable_file_path(
            project_name, data_raw_folder, custom_variable_name, custom_type
        )
        print(custom_name)

        # Load the asset as an image and restrict it to the AOI geometry
        custom_gee_id = variable["asset_id"]
        custom_image = ee.Image(custom_gee_id).clip(aoi_geom)

        # Download once; an existing file is left untouched
        if Path(custom_name).exists():
            print(f"{custom_name} already exists. Skipping download.")
            continue

        download_ee_image(
            custom_image,
            custom_name,
            scale=30,
            crs="EPSG:4326",
            region=aoi_geom,
            overwrite=True,
            unmask_value=255,
            nodata_value=255,
        )



In [None]:
if custom_source == "local":
    for variable in custom_local_variables:
        custom_path: str = variable["path"]
        # Build the destination like every other layer:
        # <data_raw_folder>/<project_name>_<name><extension for the type>.
        # Passing the bare variable name would copy the file into the current
        # working directory, without project prefix or extension.
        custom_name: Path = set_variable_file_path(
            project_name, data_raw_folder, variable["name"], variable["type"]
        )
        print(custom_path)
        copy_and_rename_file(custom_path, custom_name)



## Save parameters


In [None]:
# def save_selected_keys_to_txt(self, keys_to_save: list, filename: str):
#         """Saves selected keys of a dictionary to a text file in 'key= value' format.
#         If a key contains a dictionary, its contents are written as separate key-value pairs."""
#         nfile = self.args["workdir"] + '/' + filename
#         print(nfile)

#         with open(nfile, "w") as file:
#             for key in keys_to_save:
#                 if key in self.args:  # Only write if key exists in the dictionary
#                     value = self.args[key]
#                     if isinstance(value, dict):  # If value is a dictionary, write its contents
#                         for sub_key, sub_value in value.items():
#                             file.write(f"{sub_key}= {sub_value}\n")
#                     else:
#                         file.write(f"{key}= {value}\n")


In [None]:
# save_selected_keys_to_txt(['get_fcc_args','isocode', 'proj'], 'parameters.txt')
