In [None]:

def create_mask(ds, mask_type,granule_id=None,
                title=None):
    """
    Build a boolean mask from the ``Fmask`` variable of ``ds``.

    Refer: HLS User Guide v2 Table 9 and Appendix.

    Parameters
    ----------
    ds : dataset-like
        Must support ``ds[['Fmask']].squeeze().to_array()``
        (presumably an xarray.Dataset -- confirm against callers).
    mask_type : str or container of str
        Tested with ``in`` against the keywords 'cloud', 'water', 'snow'
        and 'aerosol', in that order; the first keyword found wins.
    granule_id, title : optional
        Accepted but not used by this function.

    Returns
    -------
    Result of ``np.isin``: True where the Fmask value equals one of the
    codes listed for the selected keyword.

    Raises
    ------
    ValueError
        If none of the keywords is found in ``mask_type``.
    """
    fmask_values = ds[['Fmask']].squeeze().to_array()

    # Keyword -> exact Fmask pixel values to flag. Matching is by whole
    # value (not per bit), so only the combinations listed here are caught.
    codes_by_keyword = (
        ('cloud', [66, 74, 130, 136, 194, 200, 202]),
        ('water', [96, 160, 224]),    # [01100000, 10100000, 11100000]
        ('snow', [80, 144, 208]),     # [01010000, 10010000, 11010000]
        ('aerosol', [192]),           # [11000000] only high aerosol component is considered
    )

    for keyword, codes in codes_by_keyword:
        if keyword in mask_type:
            return np.isin(fmask_values, codes)

    raise ValueError("Choose valid mask option")

# def convert_fmask_unique(raster):
#     """
#     Refer: HLS User Guide v2 Table 9 and Appendix: 
#     For convenient comparison, convert HLS Fmask to unique values to match the original Fmask outputs
#     """

#     target_values = [[66, 74, 130, 136, 194, 200, 202], #cloud
#                      [68, 72, 132, 144, 196, 200], # cloud shadow
#                     [96, 160, 224], #water [01100000, 10100000, 11100000]
#                      [80, 144, 208], #snow
#                     ]
    

    
#     new_values = [4,2,1,3]

#     for i in range(len(target_values)):                                         
#         mask = np.isin(raster, list(target_values[i]))
#         raster[mask] = new_values[i]

#     mask_nan = ~np.isin(raster, new_values)
#     raster[mask_nan] = np.nan

        
#     return raster

def decode_hls_fmask(fmask):
    """
    Decode the 8-bit Fmask layer from HLS (HLSL30 or HLSS30) into individual masks.

    Each flag is read from a single bit of the pixel value (bit 0 is the
    least significant bit and is not decoded here):

    - bit 1    : cloud
    - bit 2    : adjacent to cloud/shadow
    - bit 3    : cloud shadow
    - bit 4    : snow/ice
    - bit 5    : water
    - bits 6-7 : aerosol level (0-3)

    Parameters
    ----------
    fmask : array-like
        The 8-bit Fmask values (0-255). Anything ``np.asarray`` accepts
        (numpy.ndarray, xarray.DataArray, nested lists, ...). Float input is
        allowed; NaN entries are decoded as 0, i.e. no flag set.

    Returns
    -------
    dict
        - "aerosol": uint8 array with the aerosol level (0-3)
        - "water": boolean array
        - "snow_ice": boolean array
        - "cloud_shadow": boolean array
        - "adjacent_cloud_shadow": boolean array
        - "cloud": boolean array
    """
    values = np.asarray(fmask)

    # Casting NaN to an unsigned integer is undefined, so neutralise it
    # first: a NaN pixel then carries no flags at all.
    if values.dtype.kind == "f":
        values = np.where(np.isnan(values), 0, values)

    # Bitwise operators need an integer dtype.
    values = values.astype(np.uint8)

    # --- Bit extraction ---
    cloud_bit = (values >> 1) & 0b1          # bit 1: cloud
    adj_cloud_bit = (values >> 2) & 0b1      # bit 2: adjacent to cloud/shadow
    cloud_shadow_bit = (values >> 3) & 0b1   # bit 3: cloud shadow
    snow_bit = (values >> 4) & 0b1           # bit 4: snow/ice
    water_bit = (values >> 5) & 0b1          # bit 5: water
    aerosol_bits = (values >> 6) & 0b11      # bits 6-7: aerosol level

    masks = {
        "aerosol": aerosol_bits,  # 0-3 (level)
        "water": water_bit == 1,
        "snow_ice": snow_bit == 1,
        "cloud_shadow": cloud_shadow_bit == 1,
        "adjacent_cloud_shadow": adj_cloud_bit == 1,
        "cloud": cloud_bit == 1,
    }

    return masks


def convert_fmask_unique(fmask):
    """
    Collapse the bit-packed Fmask into one class code per pixel.

    Codes: 1 = water, 2 = cloud shadow, 3 = snow/ice, 4 = cloud.
    Pixels with none of these flags set are NaN.

    The codes are written in the order water, snow/ice, cloud shadow, cloud,
    so when several flags are set on a pixel the later one overwrites the
    earlier one (cloud wins over everything). The adjacent-to-cloud and
    aerosol layers returned by ``decode_hls_fmask`` are not used.

    NOTE: a pixel value of 255 has every bit set and therefore comes out
    as 4 (cloud).

    Parameters
    ----------
    fmask : array with a ``shape`` attribute
        Fmask values, passed unchanged to ``decode_hls_fmask``.

    Returns
    -------
    numpy.ndarray
        Float array of ``fmask.shape`` holding 1-4 or NaN.
    """
    layers = decode_hls_fmask(fmask)
    classified = np.full(fmask.shape, np.nan)

    # Lowest priority first; each later entry overwrites the earlier ones.
    paint_order = (
        ("water", 1),
        ("snow_ice", 3),
        ("cloud_shadow", 2),
        ("cloud", 4),
    )
    for layer_name, class_code in paint_order:
        classified[layers[layer_name]] = class_code

    return classified



        
def apply_mask(band_data, mask):
    """
    Blank out the masked pixels of ``band_data``.

    Values are kept where ``mask`` is not equal to True; every other
    position is replaced by whatever ``band_data.where`` fills in.

    Parameters
    ----------
    band_data : object exposing ``.where(condition)``
    mask : array-like compared element-wise against True

    Returns
    -------
    The result of ``band_data.where(...)``.
    """
    # Element-wise comparison on purpose (not ``is``): this yields the
    # "keep" condition as an array.
    keep = (mask != True)  # noqa: E712
    return band_data.where(keep)

def get_fmask_info(i):
    """
    Return the fmask version string and display title for an index.

    Parameters:
        i: Index. 0, 1 and 3 have dedicated entries; any other value
           falls back to the Fmask 5 entry.

    Returns:
        dict: {
            'fmask_version': str,
            'title': str,
        }
    """
    # (index, fmask_version, title)
    known_entries = (
        (0, '', 'Reference'),
        (1, 'fmask4.7', 'Fmask 4.7'),
        (3, 'cirrus', 'Cirrus band TOA'),
    )

    # Fallback used when no entry matches.
    fmask_version, title = 'fmask5_May2025', 'Fmask 5'

    for index, version, label in known_entries:
        if i == index:
            fmask_version, title = version, label
            break

    return {
        'fmask_version': fmask_version,
        'title': title,
    }


In [None]:
def get_fmask_percentage_compare(files_s3_fmask5, files_s3_fmask4, days):
    """
    Tabulate per-class pixel percentages for matching Fmask 4 / Fmask 5 rasters.

    For every entry of ``days`` the granule id is taken from the third
    dot-separated field of each Fmask 5 key containing that day. The Fmask 4
    keys are then filtered to ``.tif`` files containing the day and one of
    those ids. A day is processed only when exactly 5 such keys are found;
    otherwise a message is printed and the day is skipped.

    Each selected raster is read from ``s3://<bucket_name>/<key>`` (band 1,
    ``bucket_name`` is a module-level name), reduced to class codes with
    ``convert_fmask_unique`` and summarised as the percentage of all raster
    pixels that fall in each class present.

    Parameters
    ----------
    files_s3_fmask5 : iterable of str
        Object keys of the Fmask 5 rasters.
    files_s3_fmask4 : iterable of str
        Object keys of the Fmask 4 rasters.
    days : iterable of str
        Date strings, matched as substrings of the keys.

    Returns
    -------
    pandas.DataFrame
        Columns ``Date``, ``mrgs_id``, ``fmask_version`` (text before the
        first '_' of the key), ``Features`` and ``Percentage``. Empty (with
        these columns) when no day produced a result.

    Raises
    ------
    IndexError
        If a matched granule has no 'Fmask.tif' key or no Fmask 5 key.
    """
    # Class code -> label. Index 0 is never produced after conversion.
    classes_all = ['Land/NaN', 'Water', 'Cloud Shadow', 'Snow/Ice', 'Cloud']
    columns = ['Date', 'mrgs_id', 'fmask_version', 'Features', 'Percentage']

    frames = []
    for day in days:
        # Granule ids that have an Fmask 5 raster on this day.
        mrgs_ids = np.unique([i.split('.')[2] for i in files_s3_fmask5 if day in i])

        # Fmask 4 rasters for the same day and one of those granules.
        matches = [
            f for f in files_s3_fmask4
            if day in f and f.endswith('.tif') and any(i in f for i in mrgs_ids)
        ]

        # Keep only the ids that are present on both sides.
        mrgs_ids = np.unique([i.split('.')[2] for i in matches])

        if len(matches) != 5:
            print('No matching for this granules between Fmask 4 and 5 for this mrgs ID on this date '+str(day))
            continue

        for mrgs in mrgs_ids:
            matches_filtered = [f for f in matches if mrgs in f]

            file_fmask4 = [i for i in matches_filtered if 'Fmask.tif' in i][0]
            file_fmask5 = [i for i in files_s3_fmask5 if day in i if mrgs in i][0]

            for file in [file_fmask4, file_fmask5]:
                fmask_version = file.split('_')[0]

                # Context manager so the dataset handle is always released.
                with rasterio.open('s3://'+bucket_name+'/'+file) as src:
                    raw = src.read(1)

                # Remember fill pixels BEFORE conversion: 255 has every bit
                # set, so the bitwise decoding would otherwise report it as
                # cloud and the later "255 -> NaN" step could never match.
                fill_pixels = raw == 255

                # Both Fmask 4.7 and Fmask 5 go through the same conversion.
                raster = convert_fmask_unique(raw.astype(np.float16))

                raster[fill_pixels] = np.nan
                raster[raster == 0] = np.nan  # ignore the land classification

                unique_class = np.unique(raster[~np.isnan(raster)])
                n_classes = len(unique_class)
                total_pixels = raster.shape[0] * raster.shape[1]

                frames.append(pd.DataFrame({
                    'Date': [day] * n_classes,
                    'mrgs_id': [mrgs] * n_classes,
                    'fmask_version': [fmask_version] * n_classes,
                    'Features': [classes_all[int(code)] for code in unique_class],
                    # Share of ALL raster pixels (NaN pixels included in the
                    # denominator) that carry this class code.
                    'Percentage': [
                        np.count_nonzero(raster == code) / total_pixels * 100
                        for code in unique_class
                    ],
                }))

    if not frames:
        # pd.concat refuses an empty list; hand back an empty table instead.
        return pd.DataFrame(columns=columns)

    return pd.concat(frames)

In [None]:
def calculate_cloud_coverage_timeseries(fmask4_data, fmask5_data, verbose=True, mask_value=4):
    """
    Compare, per time step, how much of each Fmask product equals ``mask_value``.

    Only time steps present in both inputs are evaluated. For each one the
    coverage is ``100 * (pixels == mask_value) / (non-NaN pixels)``; it is
    NaN when a slice has no non-NaN pixel.

    Parameters
    ----------
    fmask4_data : xarray.DataArray or None
        Fmask 4.7 time series with a ``time`` coordinate.
        Values: 1=water, 2=cloud_shadow, 3=snow_ice, 4=cloud, NaN=clear.
    fmask5_data : xarray.DataArray or None
        Fmask 5.0 time series, same conventions.
    verbose : bool, optional
        Print warnings and the mean coverage summary. Default is True.
    mask_value : int, optional
        Class code to measure. Default is 4 (cloud); use 2 for cloud shadow.

    Returns
    -------
    pandas.DataFrame or None
        One row per common time step with columns ``time``,
        ``fmask4_coverage``, ``fmask5_coverage`` and ``difference``
        (Fmask 5.0 minus 4.7, in percentage points). None when either input
        is None or the inputs share no time step.
    """
    import pandas as pd
    import numpy as np

    def _coverage_percent(time_slice):
        # Share of the non-NaN pixels that carry ``mask_value``.
        hits = (time_slice == mask_value).sum().values
        valid = (~np.isnan(time_slice)).sum().values
        if valid > 0:
            return (hits / valid) * 100.0
        return np.nan

    if fmask4_data is None or fmask5_data is None:
        if verbose:
            print("Warning: One or both Fmask datasets are None. Cannot calculate coverage.")
        return None

    # Time steps available in both products, in ascending order.
    shared_times = sorted(set(fmask4_data.time.values).intersection(fmask5_data.time.values))

    if not shared_times:
        if verbose:
            print("Warning: No common time steps between Fmask 4.7 and 5.0.")
        return None

    rows = []
    for stamp in shared_times:
        pct4 = _coverage_percent(fmask4_data.sel(time=stamp))
        pct5 = _coverage_percent(fmask5_data.sel(time=stamp))
        rows.append({
            'time': stamp,
            'fmask4_coverage': pct4,
            'fmask5_coverage': pct5,
            'difference': pct5 - pct4
        })

    df = pd.DataFrame(rows)

    if verbose and len(df) > 0:
        if mask_value == 4:
            mask_name = "Cloud"
        elif mask_value == 2:
            mask_name = "Cloud Shadow"
        else:
            mask_name = f"Mask {mask_value}"
        print(f"{mask_name} Coverage Comparison:")
        print(f"  Mean Fmask 4.7 coverage: {df['fmask4_coverage'].mean():.2f}%")
        print(f"  Mean Fmask 5.0 coverage: {df['fmask5_coverage'].mean():.2f}%")
        print(f"  Mean difference (Fmask 5.0 - 4.7): {df['difference'].mean():.2f}%")

    return df
