In [None]:
# Use "pip install ____" in the terminal to install missing libraries
# Partition the libraries into smaller cells to avoid longer wait times

# Import the below libraries:
# For working with disk data 
import os # To create reproducible file paths
import pickle # To serialize and reserialize objects (save and load objects from disk)

# For visual displays within VS Code
import warnings # To display warnings

# For the progress bar
from IPython.display import display
from ipywidgets import IntProgress
from tqdm.notebook import tqdm

# For lazy loading
import dask
import gc

# For API access
import earthaccess # To access satellite imagery through the NASA API

# To work with different types of data and conduct data analysis
import cartopy.crs as ccrs # To project coordinates for spatial data and mapping
import earthpy as et # For spatial data analysis
import geopandas as gpd # To handle vectors/shapefiles
import geoviews as gv # To process visualizations
import numpy as np # To work with arrays
import pandas as pd # To work with tables
import re # For regular expressions; regular expression matching operations
import rioxarray as rxr # To work with raster data
import rioxarray.merge as rxrmerge # To merge rasters
from shapely.geometry import Polygon # To work with polygons
from sklearn.cluster import KMeans # For k-means clustering
import xarray as xr # To work with data arrays

# For hvPlot visualization
import hvplot.pandas # To generate plots from Pandas dataframes
import hvplot.xarray # To enable visualization of xarray objects

# Set the GDAL parameters to avoid interruptions
os.environ["GDAL_HTTP_MAX_RETRY"] = "5"
os.environ["GDAL_HTTP_RETRY_DELAY"] = "1"

# Hide non-critical warnings
warnings.simplefilter('ignore')

: 

In [None]:
# A decorator modifies or extends another function
# Pickling the function results ensures the function only runs if the output does not exist
 
# Make the caching decorator:
# Define the decorator
# Set up the rules for the caching; base name for caching = "func_key"
# Set the override to false to avoid reruns
def cached(func_key, override=False):
    # Dox strings explain what the function does
    # Establish the parameters for the function
    """
    A decorator to cache function results
    
    Parameters
    ==========
    key: str
      File basename used to save pickled results
    override: bool
      When True, re-compute even if the results are already stored
    """
    # Create the function that computes and saves or loads the results
    def compute_and_cache_decorator(compute_function):
        """
        Wrap the caching function
        
        Parameters
        ==========
        compute_function: function
          The function to run and cache results
        """
        # Create a function that accepts positional arguments
        # args = Arguments that are defined by their name in the function call
        def compute_and_cache(*args, **kwargs):
            """
            Perform a computation and cache, or load cached result.
            
            Parameters
            ==========
            args
              Positional arguments for the compute function
            kwargs
              Keyword arguments for the compute function
            """
            # Add an identifier ("cache_key") from the particular function call
            # If pass, build a single string by joining func_key with kwargs
            # If no cache_key value is given, provide base name only
            if 'cache_key' in kwargs:
                key = '_'.join((func_key, kwargs['cache_key']))
            else:
                key = func_key

            # Define a file path based on the directory structure in earthpy
            path = os.path.join(
                
                # Establish the earthpy directory
                et.io.HOME, 
                
                # Establish the earthpy dataset
                et.io.DATA_NAME, 
                
                # Make a subdirectory called "jars"
                'jars', 
                
                # Use f-string (formatted string) to create a string by embedding the value of the variable "key" into the string 
                # Use .pickle file extension (
                # Pickle file is a serialized python object
                f'{key}.pickle')
            
            # Check if the cache exists already or if caching should be overriden
            if not os.path.exists(path) or override:
                
                # Make a jars directory if needed
                os.makedirs(os.path.dirname(path), exist_ok=True)
                
                # Run the compute function as the user did
                result = compute_function(*args, **kwargs)
                
                # Pickle the object (save to file)
                # Open the cache file at file name
                with open(path, 'wb') as file:
                    
                    # Save the result without needing to recompute when loading it back into Python
                    pickle.dump(result, file)
            
            ### If the file already exists/the cache is not being overriden:
            else:
               
                # Unpickle the object (load the cached result)
                with open(path, 'rb') as file:
                    
                    # Use pickle.load to unserialize the file back into a Python object
                    result = pickle.load(file)

            # Return either the computed result or cached result    
            return result
        
        # Return wrapper function
        return compute_and_cache
    
    # Return decorator configured by func_key and override
    return compute_and_cache_decorator

In [None]:
# A level 12 watershed is the most detailed, granular way of dividing the watershed dataset
# Assign the hydrologic unit code
HUC_LEVEL = 12

# Using the cache decorator, download, unzip, and read the shapefile:
# Use @ to call the decorator
# The Colorado watershed is in region 12
@cached(f'wbd_14_hu{HUC_LEVEL}_gdf')

# Make a function to read the file
def read_wbd_file(wbd_filename, cache_key):
    # Define the URL data is being pulled from
    wbd_url = (
        "https://prd-tnm.s3.amazonaws.com/StagedProducts/Hydrography/WBD/HU2/Shape/"
        # Insert the name of the desired file
        f"{wbd_filename}.zip"
    )

    # Download the data and unzip it into the directory
    wbd_dir = et.data.get_data(url = wbd_url)

    # Create a path to the shapefile in the directory
    wbd_path = os.path.join(wbd_dir,
                            'Shape',                            
                            f'WBDHU{HUC_LEVEL}.shp')
    
    # Read the shapefile as a GeoDataFrame (GDF)
    wbd_gdf = gpd.read_file(wbd_path,    
                            # Use the pyogrio library
                            engine = 'pyogrio')
    
    # Return the GDF for the watershed boundaries
    return wbd_gdf

In [None]:
# Open the shapefile using the created wbd_read_file
wbd_gdf = read_wbd_file("WBD_14_HU2_SHAPE",
                            f'hu{HUC_LEVEL}')

# Open the shapefile to see the results
wbd_gdf

In [None]:
# Filter/restrict the shapefile to the watershed being used
# Define the GDF for the watershed by subsetting the GDF for the entire dataset
delta_gdf = wbd_gdf[wbd_gdf[
                            # Filter the GDF to the rows of the selected watershed
                            # Hydrologic unit code (HUC) for the Gunnison watershed is 14020002
                            # Dissolve to exclude individual rows
                            f'huc{HUC_LEVEL}'].isin(['14020002'])].dissolve()

# Display the data
delta_gdf

In [None]:
# Create a site map with satellite imagery in the background 
(
    # Project the delta's GDF to Mercator
    delta_gdf.to_crs(ccrs.Mercator())

    # Use hvPlot
    .hvplot(
        # Make the watershed transparent
        alpha = 0.35, fill_color = "white",
        # Add the satellite basemap
        tiles = "EsriImagery",
        # Plot in Mercator
        crs = ccrs.Mercator())
        
    # Set the plot size
    .opts(width = 600, height = 300)
)

In [None]:
# Log into EarthAccess
earthaccess.login(persist = True)

In [None]:
# Search for wanted HLS granules 
results = earthaccess.search_data(
    # Specify the desired dataset and spatial resolution
    short_name = "HLSL30",
    # Specify that cloud data is being used
    cloud_hosted = True,
    # Use the bounding box to establish the watershed boundary
    bounding_box = tuple(delta_gdf.total_bounds),
    # Set the temporal range for the data
    temporal = ("2024-06", "2024-08")
)

# View the results
results

In [None]:
# Create a function to process all the granules and extract information for each granule from the Earth Access search:
# Define the function
def get_earthaccess_links(results):
    # Make and display a progress bar
    f = IntProgress(min = 0, max = len(results), description = "Open granules")
    display(f) # To display the progress bar

    # Use a regular expression to extract tile_id and bank from THE .tif files
    re_url = re.compile(
        r'\.(?P<tile_id>T\w+)\.\d+T\d+\.v\d+\.\d+\.(?P<band>[A-Za-z0-9]+)\.tif$'
    )

    # Accumulate GDF rows from each granule
    link_rows = []

    # Loop over the granules to extract info
    for granule in results:
        # Locate the metadata (Universal metadata model = UMM)
        info_dict = granule['umm']
        # Pull out the unique identifier for the granule
        granule_id = info_dict['GranuleUR']
        # Extract the date and time 
        datetime = pd.to_datetime(
            info_dict['TemporalExtent']['RangeDateTime']['BeginningDateTime']
        )
        # Extact the boundary coordinates for the granule
        points = (
            info_dict
            ['SpatialExtent']['HorizontalSpatialDomain']['Geometry']['GPolygons'][0]['Boundary']['Points']
        )
        # Create a polygon using coordinate points for the granules
        geometry = Polygon(
            [(point['Longitude'],
            point['Latitude']) for point in points]
        )

        # Get the URL and open granule
        files = earthaccess.open([granule])

        # Loop through each file in the granule
        for file in files:
            # Use a URL regular expression to get the URL
            match = re_url.search(file.full_name) 
            # If a match is found, append data to initialized link_rows GDF 
            if match is not None:
                link_rows.append(
                    # Make a GDF with the granule's data and geometry
                    gpd.GeoDataFrame(
                        dict(
                            # For the timestamp
                            datetime = [datetime],
                            # For a unique ID
                            tile_id = [match.group('tile_id')],
                            # For the band name
                            band = [match.group('band')],
                            # For the URL
                            url = [file], 
                            # For the polygon
                            geometry = [geometry],
                            # For the coordinate reference system (CRS)
                            crs = "ESPG:4326"
                        )
                    )
                )

        # Update the progress bar after each granule is done
        f.value+=1

    # Combine into a single GDF
    file_df = pd.concat(link_rows).reset_index(drop = True)   

    # Return the final GDF file
    return file_df

In [None]:
# View the granule results
granule = results[0]
granule

In [None]:
info_dict = granule ['umm']
info_dict

In [None]:
# Run the function to get the granule search results
""" The library ipywidgets needs to be installed to display the IntProgress bar. This can be done by using the
command "pip install ipywidgets" in the terminal. "from IPython.display import display" and "from ipywidgets 
import IntProgress" are also used in the first cell to help Python process this display. """
file_df = get_earthaccess_links(results)

In [None]:
# Apply the cached decorator to the function
@cached('delta_reflecance_da_df')

# Write a function that computes reflectance data using the search results (dataframe of URLs) and watershed boundary
def compute_reflectance_da(search_results, boundary_gdf):
    """
    Use VSI to connect to the files, crop them, apply a cloud mask, and wrangle the data
    Return a single reflectance dataframe with bands as columns and the centroid coordinates and datetime as the
    index

    Parameters
    search_results: list # The search result links to the files (URLs)
    boundary_gdf: gpd.GeoDataFrame # The boundary is used to crop the data
    """
    # Write a function to open the raster from the URL, apply scale factor, and crop and mask data
    def open_dataarray(url, boundary_proj_gdf, scale = 1, masked = True):
        # Open the raster data
        da = rxr.open_rasterio(url, masked = masked, 
                               # Add chunking
                               chunks={"x": 1024, "y": 1024}
                               ).squeeze() * scale
        # If needed, reproject the boundary to match the raster's CRS
        if boundary_proj_gdf is None:
            boundary_proj_gdf = boundary_gdf.to_crs(da.rio.crs)
        # Crop the raster to the bounding box
        cropped = da.rio.clip_box(*boundary_proj_gdf.total_bounds)
        # Return the cropped raster
        return cropped

    # Write a function to apply a cloud mask
    # Mask out law quality data by bit
    def compute_quality_masked(da, mask_bits = [1,2,3]):
        # Unpack the bits to a new axis
        bits = (
            # Unpack each number into individual bits
            np.unpackbits(
                # Convert to 8-bit unsigned integer
                da.astype(np.uint8),
                # Set the order of the bits
                bitorder = 'little'
            # Reshape to match the original data with an extra dimension for the bits
            ).reshape(da.shape + (-1,))
        )
        # Grab the bits and check if they are flagged
        mask = np.prod(
            bits[
                ..., mask_bits] == 0, 
                     axis = -1
        )
        # Return the mask
        return mask

    # Grab the metadata
    file_df = get_earthaccess_links(search_results)

    # Store results for each granule in a list
    granule_da_rows = []

    # Store the projected boundary
    boundary_proj_gdf = None

    # Group the data for each granule
    group_iter = file_df.groupby(
        # By datetime and tile_id
        ['datetime', 'tile_id']
    )

    # Loop through each image and its metadata
    for (datetime, tile_id), granule_df in tqdm(group_iter):
        # Print the status bar
        print(f'Processing granule {tile_id} {datetime}')
        # Find each granule's cloud mask (fmask) URL
        cloud_mask_url = (
            granule_df.loc[granule_df.band == 'Fmask', 'url']
            .values[0])
        # Open the granule cloud mask
        cloud_masked_cropped_da = open_dataarray(cloud_mask_url, boundary_proj_gdf, masked = False)
        # Compute the cloud mask
        cloud_mask = compute_quality_masked(cloud_masked_cropped_da)
        # Free memory early
        del cloud_masked_cropped_da        
        gc.collect()
        # Loop through each spectral band to open, crop, and mask the band
        da_list = []
        df_list = []

        # Loop through each band in the granule
        for i, row in granule_df.iterrows():
            # Only loop through the spectral bands
            if row.band.startswith(('B')):
                # Open the band's raster and scale to reflectance
                band_cropped = open_dataarray(
                    row.url, 
                    boundary_proj_gdf, 
                    scale = 0.0001
                )
                # Name the raster by the band
                band_cropped.name = row.band
                # Cut raster memory in half
                band_cropped = band_cropped.astype("float16")
                # Apply the cloud mask to the raster
                row['da'] = band_cropped.where(cloud_mask)
                # Append the row to granule_da_rows
                granule_da_rows.append(row.to_frame().T)
                
    # Reassemble the metadata dataframe
    return pd.concat(granule_da_rows)

In [None]:
# Apply the function
reflectance_da_df = compute_reflectance_da(results, delta_gdf)

In [None]:
# Display the dataframe
reflectance_da_df

In [None]:
# Apply the cache decorator
@cached('delta_reflectance_da')

# Create a function to merge and composite reflectance data from multiple granules
"""The end result will be a single composite reflectance image for each spectral band."""
def merge_and_composite_arrays(granule_da_df):
    # Initialize a list to store composites after processing
    da_list = []

    # Loop over each spectral band
    for band, band_df in granule_da_df.groupby('band'):
        # Create a list for storing merged data arrays (one per date)
        merged_das = []
        # Loop over the dates and times of the image acquisition and merge the granules for each data
        for datetime, date_df in band_df.groupby('datetime'):
            # Merge granules for each date
            merged_da = rxrmerge.merge_arrays(list(date_df))
            # Mask negative values (could be no data or invalid data)
            merged_da = merged_da.where(merged_da > 0)
            # Append to the initialized merged_das list
            merged_das.append(merged_da)
        # Composite images across dates
        composite_da = xr.concat(merged_das,
                                 # Make a datetime dimension
                                 # Calculate the median value across datetimes for the pixels
                                 dim = 'datetime').median('datetime')
        # Assign band number to the attribute of the composite data array
        composite_da['band'] = int(band[1:])
        # Name the composite data array
        composte_da.name = 'reflectance'
        # Add the processed and composite data array to the list
        da_list.append(composite_da)
    # Concatenate the composite data arrays for each band along the band dimension
    return xr.concat(da_list, dim = 'band')

In [None]:
# Call the function to get the final composite reflectance data 
reflectance_da = merge_and_composite_arrays(reflectance_da_df)

In [None]:
# Convert the spectral DataArray to a tidy DataFrame
model_df = (reflectance_da,
            # Flatten the array into a long dataframe
            .to_dataframe()
            # Select the reflectance column
            .reflectance
            # Make the column wide;
            """Each row will be a pixel location. Each column is a spectral band with the reflectance value."""
            .unstack('band')
)

# Filter out rows with no data
model_df = model_df.drop(columns = [10, 11]).dropna()

# Display the dataframe
model_df

In [None]:
# Assign the minimum and maximum values
min_values = model_df.min()
max_values = model_df.max()

# Print the minimum and maximum values
print(min_values)
print(max_values)

In [None]:
# Initialize the k-means model 
k_means = KMeans(n_clusters = 5)

# Fit the model and predict
prediction = k_means.fit_predict(model_df.values)

# Add the predicted values back to the model dataframe
model_df['clusters'] = prediction

# Display the dataframe
model_df

In [None]:
# Make data array with bands to use for red, green, and blue (rgb)
rgb = reflectance_da.sel(band = [4, 3, 2])

# Plot rgb
(
    rgb_plot.hvplot.rgb(y = 'y',
                    x = 'x',
                    bands = 'band',
                    xaxis = None,
                    yaxis = None)

                    model_df.clusters.to_xarray().sortby(['x', 'y']).hvplot(
        cmap="Colorblind", aspect='equal') 
)