In [5]:
# Set Envionmentals

# Imports
import os
from glob import glob
import pathlib
import json
import requests
from math import floor, ceil
#Third Party
import earthpy as et
import earthpy.appeears as etapp
# import earthpy.earthexplorer as 
import geopandas as gpd
import geoviews as gv
import hvplot.pandas
import hvplot.xarray
import numpy as np
import rioxarray as rxr
import rioxarray.merge as rxrmerge
from shapely.geometry import mapping
import xarray as xr




with open('settings.json', 'r') as file:
    settings = json.load(file)

config = settings[0]

# Define the base directory for a run
run_name = config['study_site']  # You can change this for each run
base_dir = os.path.join("./", run_name)
if not os.path.exists(base_dir):
    os.makedirs(base_dir, exist_ok=True)

In [6]:
import math
from pyproj import CRS

def get_utm_crs(gdf):
    """Determine the UTM CRS based on the centroid of a GeoDataFrame."""
    # Ensure the gdf is in a geographic CRS for accurate centroid calculation
    gdf_geographic = gdf.to_crs(epsg=4326)
    centroid = gdf_geographic.geometry.unary_union.centroid
    lon, lat = centroid.x, centroid.y

    # Determine UTM zone
    utm_zone = math.floor((lon + 180) / 6) + 1

    utm_crs = CRS(f"EPSG:269{utm_zone:02d}")  # Northern hemisphere
    
     
    
    return utm_crs



In [7]:
# Get boundary gdf for clipping

# Download and load boundary data
boundary_config = config['boundary']
boundary_gdf = (gpd.read_file(boundary_config['url'])
                .to_crs(boundary_config['crs']))

# boundary_crs = get_utm_crs(boundary_gdf)
# print(boundary_crs)
# boundary_gdf = boundary_gdf.to_crs(boundary_crs)



study_site_gdf = (boundary_gdf
                  .set_index('GRASSLANDN')
                  .loc[[config['study_site']]]
                  )
print(study_site_gdf.crs)





EPSG:4269


In [8]:
(gv.tile_sources.EsriNatGeo *
 gv.Polygons(
     boundary_gdf
     [boundary_gdf.GRASSLANDN=='Kiowa National Grassland']
 )
 )


In [9]:
print(study_site_gdf.crs)

EPSG:4269


In [10]:
from math import floor, ceil

def download_soil_data(study_site_gdf, config):

    '''Retrieve appropriate download files based off settings and 
    study site boundaries, then converts to raster and merges if across
    multiple tiles.'''
    
    soil_config = config['data_sources']['soil']
    
    raster_dir = os.path.join(base_dir, soil_config['local_path'])
    os.makedirs(raster_dir, exist_ok=True)


    bounds = study_site_gdf.total_bounds
    min_lon, min_lat, max_lon, max_lat = bounds

    min_lat, min_lon = floor(min_lat), floor(min_lon)
    max_lat, max_lon = ceil(max_lat), ceil(max_lon)

    
    property = soil_config['property']
    stat = soil_config['stat']
    depth = soil_config['depth']

    rasters = []
    for lat in range(min_lat, max_lat):
        for lon in range(min_lon, max_lon):
            polaris_url = (
                "http://hydrology.cee.duke.edu/POLARIS/PROPERTIES/v1.0/"
                "{property}/{stat}/{depth}/"
                "lat{min_lat}{max_lat}_lon{min_lon}{max_lon}.tif").format(
                    property=property, 
                    stat=stat, 
                    depth=depth,
                    min_lat=lat,
                    min_lon=lon,
                    max_lat=min(lat + 1, max_lat),
                    max_lon=min(lon + 1, max_lon))

            local_path = os.path.join(raster_dir, f"soil_data_{lat}_{lon}.tif")
            if not os.path.exists(local_path):
                response = requests.get(polaris_url)
                if response.status_code == 200:
                    with open(local_path, 'wb') as file:
                        file.write(response.content)
            rasters.append(local_path)

    # Merge rasters into a single data array
    merged_raster = None
    for raster in rasters:
        raster_data = rxr.open_rasterio(raster, masked=True).squeeze()
        if merged_raster is None:
            merged_raster = raster_data
        else:
            merged_raster = rxrmerge.merge_arrays([merged_raster, raster_data])

    # Reproject the raster to match the CRS of the study site GDF if they are different
    if merged_raster.rio.crs != study_site_gdf.crs:
        merged_raster = merged_raster.rio.reproject(study_site_gdf.crs)

    return merged_raster

# Usage
soil_data = download_soil_data(study_site_gdf, config)
soil_da = soil_data.rio.clip_box(*study_site_gdf.total_bounds)
soil_da


In [11]:

# def get_srtm_data(study_site_gdf, config):
study_site_name = config['study_site']
download_key = study_site_name.replace("National Grassland", "SRTM").replace(" ", "-")
srtm_dir = os.path.join(base_dir, config['data_sources']
                        ['elevation']['local_path'])

    # Initialize the downloader
srtm_downloader = etapp.AppeearsDownloader(
    download_key=download_key,
    ea_dir=srtm_dir,
    product='SRTMGL1_NC.003',
    layer='SRTMGL1_DEM',
    start_date='02-11-2000',
    end_date='02-21-2000',
    polygon=study_site_gdf
    )

    # Download files if they don't already exist
if not os.path.exists(srtm_downloader.data_dir):
    srtm_downloader.download_files()

    # Find all downloaded SRTM files
srtm_paths = glob(os.path.join(srtm_downloader.data_dir, '**', '*.tif'), recursive=True)

    # Load and merge the SRTM data arrays
print(srtm_paths)
srtm_das = [rxr.open_rasterio(srtm_path, masked=True).squeeze() for srtm_path in srtm_paths][0]
srtm_da = rxrmerge.merge_arrays(srtm_das)
srtm_da = srtm_da.rio.reproject(study_site_gdf.crs)

srtm_da



HTTPError: 401 Client Error: UNAUTHORIZED for url: https://appeears.earthdatacloud.nasa.gov/api/login

In [None]:
from xrspatial import slope


# Ensure srtm_da is loaded correctly
if srtm_da is None:
    print("Error: srtm_da is not loaded correctly.")
else:
    # Calculate slope
    dem_slope = slope(srtm_da)

    # Check if dem_slope is computed correctly
    if dem_slope is None:
        print("Error: Failed to compute slope.")
    else:
        # Save the slope data as a new raster file
        slope_path = os.path.join(base_dir, config['data_sources']
                                   ['elevation']['slope_path'])
        dem_slope.rio.to_raster(slope_path, driver='GTiff')
        print(f"Slope raster saved to {slope_path}")


In [None]:
# Get precipiaiton model for CONUS in year 1950
climate_config = config['data_sources']['climate']
maca_url = climate_config["MACA_url"]
scenario = climate_config['scenario']
year = climate_config['year']

maca_response = requests.get(maca_url)

# Grab the directory path from json
climate_path = os.path.join(base_dir, config['data_sources']
                            ['climate']['local_path'])

# Create the directory if it doesn't exist
if not os.path.exists(climate_path):
     os.makedirs(climate_path, exist_ok=True)

 # Define the full path including the filename
maca_path = os.path.join(climate_path, f'{year}-{scenario}-maca.nc')

 # Assuming maca_response is obtained from a requests.get() call
maca_response = requests.get(maca_url)

 # Write the file to the specified directory
with open(maca_path, 'wb') as maca_file:
    maca_file.write(maca_response.content)


In [None]:
maca_ds = xr.open_dataset(maca_path, engine = 'netcdf4')
maca_ds = maca_ds.assign_coords(lon=maca_ds.lon - 360)
maca_ds = maca_ds.rio.write_crs(4269)
precip_da = maca_ds['precipitation'].mean("time")
# precip_da = maca_ds.precipitation.mean("time")
precip_da.rio.write_crs(4269, inplace = True)
precip_da.rio.set_spatial_dims('lon','lat', inplace = True)

maca_ds.precipitation.mean('time').hvplot(rasterize = True)


In [None]:
precip_da = precip_da.rio.reproject_match(soil_da)
precip_da = precip_da.rio.clip_box(*study_site_gdf.total_bounds)
precip_da
print(precip_da.shape)

In [None]:
from rasterio.enums import Resampling

def harmonize_rasters(raster_list, reference_raster):
    """
    Harmonize a list of rasters to match the extent, resolution, and CRS of a reference raster.
    Only reprojects and resamples rasters if they are not already harmonized.

    Parameters:
    raster_list (list of xarray.DataArray): List of rasters to be harmonized.
    reference_raster (xarray.DataArray): The raster to use as the reference for harmonization.

    Returns:
    list of xarray.DataArray: List of harmonized rasters.
    """
    harmonized_rasters = []
    for raster in raster_list:
        # Check if the raster is already harmonized with the reference raster
        if (raster.rio.crs == reference_raster.rio.crs and
            raster.rio.shape == reference_raster.rio.shape and
            raster.rio.transform() == reference_raster.rio.transform()):
            # Raster is already harmonized
            harmonized_rasters.append(raster)
        else:
            # Reproject to match CRS of reference raster
            reprojected_raster = raster.rio.reproject_match(reference_raster)

            # Resample to match resolution of reference raster, using nearest neighbor interpolation
            resampled_raster = reprojected_raster.rio.reproject(
                reference_raster.rio.crs,
                shape=(reference_raster.rio.height, reference_raster.rio.width),
                resampling=Resampling.nearest)

            harmonized_rasters.append(resampled_raster)

    return harmonized_rasters


# Now harmonized_rasters[0], harmonized_rasters[1], and harmonized_rasters[2]
# correspond to harmonized elevation, slope data, and climate respectively.


In [None]:
# Harmonization run
rasters = [soil_da, srtm_da, dem_slope, precip_da]
raster_names = ['soil_da', 'srtm_da', 'dem_slope', 'precip_da']
# Check if any raster data array is None
for raster, name in zip(rasters, raster_names):
    if raster is None:
        print(f"Error: {name} is not loaded correctly.")

# If all rasters are loaded correctly, proceed with harmonization
if all(raster is not None for raster in rasters):
    harmonized_rasters = harmonize_rasters(
        [srtm_da, dem_slope, precip_da],
        reference_raster=soil_da
    )
    # Process harmonized rasters
else:
    print("Harmonization skipped due to missing raster data.")

In [None]:
# Testing fuzzy logic code

# Create a simple test data array
x = np.linspace(0, 10, 100)
test_data_array = xr.DataArray(x, dims=["x"])

# Define a simple triangular membership function
def create_membership_function(data_array, a, b, c):
    return xr.where(
        (data_array <= a) | (data_array >= c), 0,
        xr.where(
            data_array <= b,
            (data_array - a) / (b - a), 
            (c - data_array) / (c - b))
    )

# Apply the membership function to the test data array
mf_test = create_membership_function(test_data_array, 3, 5, 7)

# Define and apply a simple fuzzy rule
def apply_simple_rule(data_array, threshold):
    return xr.where(data_array > threshold, 1, 0)

# Apply the rule to the membership function result
rule_result = apply_simple_rule(mf_test, 0.5)

# Print the results
print("Membership Function Result:\n", mf_test)
print("\nRule Application Result:\n", rule_result)


In [None]:
# Load fuzzy logic configuration
fuzzy_config = config['fuzzy_logic']

# Prepare data arrays
variables_data = {
    'soil_pH': soil_da,
    'elevation': srtm_da,
    'climate': precip_da,
    'slope': dem_slope
}


# Print shapes of the data arrays
print("Data Arrays Shapes:")
for var_name, data_array in variables_data.items():
    print(f"{var_name}: {data_array.shape}")

In [None]:
def create_membership_function(data_array, set_config):
    """
    Creates a membership function based on the provided configuration.

    This function generates a membership function for a given data array based on 
    the configuration specified in `set_config`. Currently, it supports a 
    triangular membership function.

    Parameters:
    - data_array (xarray.DataArray): The data array for which the membership 
      function is to be created. This array contains the values to be evaluated 
      by the membership function.
    - set_config (dict): A dictionary containing the configuration for the 
      membership function. It includes the type of function ('triangular') and 
      its parameters ('params') which are three points defining the triangular 
      function: a, b, and c.

    Returns:
    xarray.DataArray: A data array of the same shape as `data_array`, containing 
    the membership values for each element in `data_array`. The membership values 
    are calculated based on the type of function and its parameters defined in 
    `settings.json' imported to.

    The triangular membership function is defined as follows:
    - For x <= a or x >= c, membership value is 0.
    - For a < x <= b, membership value increases linearly from 0 to 1.
    - For b < x < c, membership value decreases linearly from 1 to 0.

    If the function type is not supported, the function will not perform any 
    operation and may return an error or the original data array.
    """
    set_type = set_config['type']
    a, b, c = set_config['params']  # Unpack the parameters directly
    if set_type == 'triangular':
        return xr.where(
            (data_array <= a) | (data_array >= c), 0,
            xr.where(
                data_array <= b,
                (data_array - a) / (b - a), 
                (c - data_array) / (c - b))
        )


In [None]:
def apply_membership_functions(variables_data, variables_config):
    """
    Applies membership functions to data arrays based on configuration settings.

    Parameters:
    - variables_data (dict): Dictionary with variable names as keys and xarray.DataArray objects as values.
    - variables_config (dict): Configuration for each variable, including fuzzy set definitions and membership function parameters.

    Returns:
    dict: Nested dictionary with variable names as keys. Each key maps to a dictionary of fuzzy set names and their corresponding membership values as xarray.DataArray objects.

    This function creates fuzzy sets for each variable using specified membership functions. It assumes matching entries in variables_data and variables_config.

    Example:
    {'temperature': {'low': DataArray, 'medium': DataArray, 'high': DataArray}}
    """
    
    membership_functions = {}
    for var_name, data_array in variables_data.items():
        membership_functions[var_name] = {}
        for set_name, set_config in variables_config[var_name]['sets'].items():
            membership_functions[var_name][set_name] = create_membership_function(data_array, set_config)
    return membership_functions


In [None]:
membership_functions = apply_membership_functions(variables_data, 
                                                  fuzzy_config['variables'])

In [None]:
def evaluate_fuzzy_rules(membership_functions, rules_config):
   
    """
    Evaluates fuzzy logic rules to compute habitat suitability.

    Parameters:
    - membership_functions (dict): Nested dictionary of membership functions 
      for each variable and fuzzy set.
    - rules_config (list): List of dictionaries, each representing a fuzzy logic rule.

    Returns:
    xarray.DataArray: DataArray representing habitat suitability based on evaluated rules.

    This function iterates over each rule in rules_config, applies logical operations 
    (AND, OR) on the membership functions, and computes the habitat suitability score. 
    The suitability score is scaled based on the outcome of each rule ('high', 'medium', 'low').

    Example:
    evaluate_fuzzy_rules(membership_functions, fuzzy_config['rules'])
    """
   
    # Initialize habitat suitability with zeros
    reference_mf = next(iter(next(iter(membership_functions.values())).values()))
    habitat_suitability = xr.full_like(reference_mf, 0)

    # Iterate over each rule
    for rule in rules_config:
        condition = rule['if']
        outcome = rule['then'].split('.')[-1]  # Extract the outcome part

        

        # Initialize a temporary array for this rule's result
        rule_result = xr.full_like(reference_mf, 0)

        # Process the condition
        for i in range(0, len(condition), 2):
            var_set = condition[i].split('.')
            mf = membership_functions[var_set[0]][var_set[1]]

            aligned_rule_result, aligned_mf = xr.align(rule_result, mf, join='outer')

            if i == 0:
                rule_result = mf
            else:
                if condition[i-1] == 'AND':
                    rule_result = xr.where(aligned_rule_result > 0, aligned_mf, 0)
                elif condition[i-1] == 'OR':
                    rule_result = np.maximum(aligned_rule_result, aligned_mf)

        # Apply the outcome to the habitat suitability
        if outcome == 'high':
            habitat_suitability = np.maximum(habitat_suitability, rule_result * 100)
        elif outcome == 'medium':
            habitat_suitability = np.maximum(habitat_suitability, rule_result * 50)
        elif outcome == 'low':
            habitat_suitability = np.maximum(habitat_suitability, rule_result * 25)

    return habitat_suitability

# # Evaluate the fuzzy rules with the full logic
# habitat_suitability = evaluate_fuzzy_rules(membership_functions, fuzzy_config['rules'])

# # Print the final habitat suitability result
# print("Habitat Suitability Result:", habitat_suitability)


In [None]:
# Membership debugging cell 


# Apply membership functions
fuzzy_config = config['fuzzy_logic']
membership_functions = apply_membership_functions(variables_data, 
                                                  fuzzy_config['variables'])

# Print the membership functions dictionary
print("Membership Functions Dictionary:")
for var_name, sets in membership_functions.items():
    print(f"Variable: {var_name}")
    for set_name, mf in sets.items():
        print(f"  {set_name}: {mf}")

# Check the output of next(iter(membership_functions.values()))
print("\nOutput of next(iter(membership_functions.values())):")
first_mf_output = next(iter(membership_functions.values()))
print(first_mf_output)


# Print membership function results
print("Membership Functions Results:")
for var_name, mf_dict in membership_functions.items():
    print(f"{var_name}:")
    for set_name, mf in mf_dict.items():
        print(f"  {set_name}: {mf}")

# Evaluate fuzzy rules
habitat_suitability = evaluate_fuzzy_rules(membership_functions, 
                                           fuzzy_config['rules'])

# Print habitat suitability result
print("Habitat Suitability Result:", habitat_suitability)

In [None]:
# Assuming you know the CRS of habitat_suitability, for example, 'EPSG:4326'
habitat_suitability.rio.write_crs("EPSG:4269", inplace=True)

# Now, reproject the habitat suitability DataArray to match the CRS of the study site GeoDataFrame
habitat_suitability = habitat_suitability.rio.reproject_match(soil_da)

print(habitat_suitability.shpae)

In [None]:
import geoviews.feature as gf
from cartopy import crs as ccrs

# Ensure both DataArray and GeoDataFrame are in the same CRS
# If they are not, you might need to reproject one of them

# Create a plot of the habitat suitability
habitat_plot = habitat_suitability.hvplot.image(
    x='x', y='y', 
    cmap='viridis', 
    width=700, height=400, 
    colorbar=True, 
    title='Habitat Suitability Map'
)

# Create a plot of the study site polygon
study_site_plot = study_site_gdf.hvplot(
    geo=True, 
    alpha=0.3,  # Adjust transparency as needed
    color='red'
)

# Overlay the two plots
final_plot = habitat_plot * study_site_plot

# Display the final plot
final_plot


In [None]:
# Check CRS
print(habitat_suitability.rio.crs)
print(study_site_gdf.crs)

# If they are different, reproject one to match the other
# For example, if reprojecting the GeoDataFrame to match the DataArray:
# habitat_suitability = habitat_suitability.rio.reproject_match(study_site_gdf)
