In [None]:
import numpy as np
from typing import List, Tuple
from datetime import datetime

def fit_periodic_function_with_harmonics_robust(time_series: np.ndarray, qa: np.ndarray, dates: List[datetime], num_harmonics: int = 3, max_iter: int = 5, delta: float = 1.0) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Fits a periodic function with up to three harmonics to a 3D time series data, 
    weighted by a quality assessment array using the provided dates. Uses iterative reweighted least squares 
    for robust fitting to handle outliers.
    
    Parameters:
    - time_series: np.ndarray, shape (time, width, height), the RGB channel data.
    - qa: np.ndarray, shape (time, width, height), quality assessment data with values between 0 and 1.
    - dates: List[datetime.datetime], dates corresponding to the time series data points.
    - num_harmonics: int, number of harmonics to include in the model (1 to 3).
    - max_iter: int, maximum number of iterations for IRLS.
    - delta: float, parameter for robust loss function (Huber delta).
    
    Returns:
    - amplitude_maps: List[np.ndarray], list of amplitude maps for each harmonic.
    - phase_maps: List[np.ndarray], list of phase maps for each harmonic.
    - offset_map: np.ndarray, constant offset of the fitted function.
    - mae_map: np.ndarray, Mean Absolute Error of the fitted function.
    - residuals_map: np.ndarray, residuals of the fitted function.
    """

    # Convert dates to 'datetime64' and compute normalized time as fraction of year
    times_datetime64 = np.array(dates, dtype='datetime64[D]')
    start_date = times_datetime64[0]
    days_since_start = (times_datetime64 - start_date).astype(int)
    t_normalized = days_since_start / 365.25  # Normalize to fraction of year

    # Initial design matrix with harmonics and constant term
    harmonics = []
    for k in range(1, num_harmonics + 1):
        t_radians = 2 * np.pi * k * t_normalized
        harmonics.extend([np.cos(t_radians), np.sin(t_radians)])

    A = np.stack(harmonics + [np.ones_like(t_normalized)], axis=-1)  # Design matrix

    # Reshape time_series and qa for vectorized operations
    pixels = time_series.reshape(time_series.shape[0], -1)
    weights = qa.reshape(qa.shape[0], -1)

    for _ in range(max_iter):
        # Broadcasting for weighted design matrix
        A_expanded = np.expand_dims(A, 2)
        weights_expanded = np.expand_dims(weights, 1)
        A_weighted = A_expanded * weights_expanded

        # Compute the normal equation components
        ATA = np.einsum('ijk,ilk->jlk', A_weighted, A_expanded)
        ATb = np.einsum('ijk,ik->jk', A_weighted, pixels)

        # Solve for parameters
        ATA_reshaped = ATA.transpose(2, 0, 1)
        ATb_reshaped = ATb.T
        params = np.array([solve_params(ATA_reshaped[i], ATb_reshaped[i]) for i in range(ATA_reshaped.shape[0])])
        num_params = 2 * num_harmonics + 1
        params_reshaped = params.reshape(time_series.shape[1], time_series.shape[2], num_params).transpose(2, 0, 1)

        # Calculate fitted values and residuals
        fitted_values = np.dot(A, params.T)  # Shape: (time, n_pixels)
        residuals = pixels - fitted_values

        # Update weights based on residuals using Huber loss
        residuals_abs = np.abs(residuals)
        new_weights = np.where(residuals_abs < delta, 
                               1, 
                               delta / residuals_abs)
        weights = np.minimum(weights, new_weights)

    # Extract amplitude and phase maps
    amplitude_maps = []
    phase_maps = []

    for i in range(num_harmonics):
        A_params = params_reshaped[2 * i]
        B_params = params_reshaped[2 * i + 1]
        amplitude_map = np.sqrt(A_params**2 + B_params**2)
        phase_map = np.arctan2(B_params, A_params)

        # Adjust and normalize phases
        phase_adjusted = (phase_map - (2 * np.pi * (i + 1) * t_normalized[0])) % (2 * np.pi)
        phase_normalized = np.where(phase_adjusted > np.pi, phase_adjusted - 2 * np.pi, phase_adjusted)

        amplitude_maps.append(amplitude_map.reshape(time_series.shape[1], time_series.shape[2]))
        phase_maps.append(phase_normalized.reshape(time_series.shape[1], time_series.shape[2]))

    # Offset map
    offset_map = params_reshaped[-1].reshape(time_series.shape[1], time_series.shape[2])

    # Calculate MAE (Mean Absolute Error)
    mae = np.mean(np.abs(residuals), axis=0)
    mae_map = mae.reshape(time_series.shape[1], time_series.shape[2])

    # Reshape residuals to match original dimensions
    residuals_map = residuals.reshape(time_series.shape[0], time_series.shape[1], time_series.shape[2])

    return (*amplitude_maps, *phase_maps, offset_map, mae_map, residuals_map)

def solve_params(ATA: np.ndarray, ATb: np.ndarray) -> np.ndarray:
    """ Solve linear equations with error handling for non-invertible cases. """
    try:
        return np.linalg.solve(ATA, ATb)
    except np.linalg.LinAlgError:
        return np.full(ATb.shape, np.nan)  # Return NaN for non-invertible matrices


inverse_dfunc = {
    'rgb': lambda x: x / 65535 if x.dtype == np.uint16 else x,
    'ndvi': lambda x: 2 * (x - 0.5),
    'gndvi': lambda x: 2 * (x - 0.5),
    'ndwi': lambda x: 2 * (x - 0.5),
    'ndmi': lambda x: 2 * (x - 0.5),
    'nbr': lambda x: 2 * (x - 0.5),
    'ndre': lambda x: 2 * (x - 0.5),
    'evi': lambda x: 2 * (x - 0.5) / 2.5,  # for EVI, apply the inverse of the entire transformation
    'crswir': lambda x: x * 5,  # scale back crswir
}


# Compare both fitting method

In [None]:
import os 
import rasterio
import time 
from tqdm import tqdm
from datetime import datetime
from utils import load_folder, get_aspect, calculate_slope_with_dates, postprocess_cloud_mask, postprocess_qa_vegetation
import numpy as np
from tqdm import tqdm
from scipy.stats import zscore
from typing import Dict, Tuple, List
from datetime import datetime

def detect_outliers_and_adjust_weights(data: np.ndarray, weights: np.ndarray, threshold: float = 3.0) -> np.ndarray:
    """
    Detects outliers based on z-scores and adjusts weights accordingly.
    
    Parameters:
    - data: np.ndarray, the data for which outliers need to be detected.
    - threshold: float, z-score threshold for outlier detection.
    
    Returns:
    - adjusted_weights: np.ndarray, weights adjusted to reduce influence of outliers.
    """
    # Calculate z-scores for the data
    z_scores = zscore(data, axis=0)
    
    # Identify outliers based on z-score threshold
    is_outlier = np.abs(z_scores) > threshold
    
    # Reduce weight for outliers
    weights[is_outlier] = 0.001
    
    return weights 

indices = ['rgb', 'crswir', 'ndvi', 'gndvi', 'evi', 'nbr']
dir_ = '/Users/arthurcalvi/Data/species/validation/tiles_2_5_km'
error_files = []
folder = os.listdir(dir_)[3]
path = os.path.join(dir_, folder)
max_iter = 2

if os.path.isdir(path):
    if os.path.exists(os.path.join(path, 'features')):
        print('Features already extracted.')
    else:
    # if True:
        print(f'Processing {folder}...')
        dates = [datetime.strptime(filename.split('_')[0], '%Y-%m-%d') for filename in os.listdir(os.path.join(path, 'rgb'))]
        dates.sort()
        dict_indices = {index: inverse_dfunc[index](load_folder(os.path.join(path, index))) for index in indices}

        qa = load_folder(os.path.join(path, 'qa'))
        vegetation_mask = load_folder(os.path.join(path, 'qa'), postprocess_qa_vegetation, {}).mean(axis=0)

        dir_dem = os.path.join(path, 'dem.tif')
        raster = rasterio.open(dir_dem)
        dem = raster.read(1)
        aspect = get_aspect(dem)

        # Process QA mask
        new_qa = []
        for frame in tqdm(qa):
            frame_ = np.zeros_like(frame)
            mask = (frame == 1) | (frame == 3) | (frame == 6) | (frame == 8) | (frame == 9) | (frame == 10) | (frame == 11)
            frame_[mask] = 1
            frame_ = postprocess_cloud_mask(frame_, 5, 25)
            new_qa.append(frame_)
        qa_mask = np.array(new_qa)
        qa_mask = 1 - qa_mask

        # Calculate slopes for disturbance detection
        K = 6
        slopes = [calculate_slope_with_dates(dict_indices['rgb'][:, 0], dates, i, K) for i in tqdm(range(dict_indices['rgb'].shape[0]))]
        slopes = np.array(slopes)
        zero_mask = np.zeros_like(qa_mask[0], dtype=int)
        expanded_mask_with_disturbances = qa_mask.copy()
        for i in range(8, len(slopes)):
            zero_mask[abs(slopes[i]) > 10] = 1
            expanded_mask_with_disturbances[i, zero_mask.astype(bool)] = 0.001

        # Detect outliers and adjust weights
        weights = detect_outliers_and_adjust_weights(dict_indices['crswir'], expanded_mask_with_disturbances, 3.0) 

        # Fit periodic functions with two different methods
        #1 harmonic 
        start_1 = time.time()
        amplitude_map, phase_map, offset_map, mae_map, residuals_map = fit_periodic_function_with_harmonics_robust(dict_indices['crswir'], weights, dates, 1, max_iter)
        end_1 = time.time()
        print(f'Elapsed time for first method: {end_1 - start_1}')
        #2 harmonics
        start_2 = time.time()   
        amplitude_map_21, amplitude_map_22, phase_map_21, phase_map_22, offset_map_2, mae_map_2, residuals_map_2 = fit_periodic_function_with_harmonics_robust(dict_indices['crswir'], weights, dates, 2, max_iter)
        end_2 = time.time()
        print(f'Elapsed time for second method: {end_2 - start_2}')
        #3 harmonics
        start_3 = time.time()
        amplitude_map_31, amplitude_map_32, amplitude_map_33, phase_map_31, phase_map_32, phase_map_33, offset_map_3, mae_map_3, residuals_map_3 = fit_periodic_function_with_harmonics_robust(dict_indices['crswir'], weights, dates, 3, max_iter)
        end_3 = time.time()
        print(f'Elapsed time for third method: {end_3 - start_3}')

In [None]:
import matplotlib.pyplot as plt
from utils import normalize
#show the 7 maps created on two different rows (one row = one method)
fig, axs = plt.subplots(3, 8, figsize=(20, 10))

im = axs[0, 0].imshow(amplitude_map, cmap='viridis', vmin=0, vmax=65535)
axs[0, 0].set_title('Amplitude 1')
#add colorbar shrink = 0.5 horizontal 
fig.colorbar(im, ax=axs[0, 0], orientation='horizontal', shrink=0.5)
im = axs[0, 1].imshow(phase_map, cmap='viridis')
axs[0, 1].set_title('Phase 1')
fig.colorbar(im, ax=axs[0, 1], orientation='horizontal', shrink=0.5)
im = axs[0, 2].imshow(offset_map, cmap='viridis', vmin=0, vmax=65535)
axs[0, 2].set_title('Offset 2')
fig.colorbar(im, ax=axs[0, 2], orientation='horizontal', shrink=0.5)
im = axs[0, 3].imshow(mae_map, cmap='viridis', vmin=0, vmax=65535)
axs[0, 3].set_title('Sigma 2')
fig.colorbar(im, ax=axs[0, 3], orientation='horizontal', shrink=0.5)
axs[0, 4].imshow(2 * normalize(dict_indices['rgb'][-1].transpose(1, 2, 0)))
axs[0, 4].set_title('RGB')
im = axs[0, 5].imshow(aspect, cmap='viridis')
axs[0, 5].set_title('Aspect')
fig.colorbar(im, ax=axs[0, 5], orientation='horizontal', shrink=0.5)

im = axs[1, 0].imshow(amplitude_map_21, cmap='viridis', vmin=0, vmax=65535)
axs[1, 0].set_title('Amplitude 21')
fig.colorbar(im, ax=axs[1, 0], orientation='horizontal', shrink=0.5)
im = axs[1, 1].imshow(phase_map_21, cmap='viridis')
axs[1, 1].set_title('Phase 21')
fig.colorbar(im, ax=axs[1, 1], orientation='horizontal', shrink=0.5)
im = axs[1, 2].imshow(offset_map_2, cmap='viridis', vmin=0, vmax=65535)
axs[1, 2].set_title('Offset 2')
fig.colorbar(im, ax=axs[1, 2], orientation='horizontal', shrink=0.5)
im = axs[1, 3].imshow(mae_map_2, cmap='viridis', vmin=0, vmax=65535)
axs[1, 3].set_title('Sigma 2')
fig.colorbar(im, ax=axs[1, 3], orientation='horizontal', shrink=0.5)
axs[1, 4].imshow(amplitude_map_22, cmap='viridis', vmin=0, vmax=65535)
axs[1, 4].set_title('Amplitude 22')
fig.colorbar(im, ax=axs[1, 4], orientation='horizontal', shrink=0.5)
im = axs[1, 5].imshow(phase_map_22, cmap='viridis')
axs[1, 5].set_title('Phase 22')
fig.colorbar(im, ax=axs[1, 5], orientation='horizontal', shrink=0.5)

im = axs[2, 0].imshow(amplitude_map_31, cmap='viridis', vmin=0, vmax=65535)
axs[2, 0].set_title('Amplitude 31')
fig.colorbar(im, ax=axs[2, 0], orientation='horizontal', shrink=0.5)
im = axs[2, 1].imshow(phase_map_31, cmap='viridis')
axs[2, 1].set_title('Phase 31')
fig.colorbar(im, ax=axs[2, 1], orientation='horizontal', shrink=0.5)
im = axs[2, 2].imshow(offset_map_3, cmap='viridis', vmin=0, vmax=65535)
axs[2, 2].set_title('Offset 3')
fig.colorbar(im, ax=axs[2, 2], orientation='horizontal', shrink=0.5)
im = axs[2, 3].imshow(mae_map_3, cmap='viridis', vmin=0, vmax=65535)
axs[2, 3].set_title('Sigma 3')
fig.colorbar(im, ax=axs[2, 3], orientation='horizontal', shrink=0.5)
im = axs[2, 4].imshow(amplitude_map_32, cmap='viridis', vmin=0, vmax=65535)
axs[2, 4].set_title('Amplitude 32')
fig.colorbar(im, ax=axs[2, 4], orientation='horizontal', shrink=0.5)
im = axs[2, 5].imshow(phase_map_32, cmap='viridis')
axs[2, 5].set_title('Phase 32')
fig.colorbar(im, ax=axs[2, 5], orientation='horizontal', shrink=0.5)
im = axs[2, 6].imshow(amplitude_map_33, cmap='viridis', vmin=0, vmax=65535)
axs[2, 6].set_title('Amplitude 33')
fig.colorbar(im, ax=axs[2, 6], orientation='horizontal', shrink=0.5)
im = axs[2, 7].imshow(phase_map_33, cmap='viridis')
axs[2, 7].set_title('Phase 33')
fig.colorbar(im, ax=axs[2, 7], orientation='horizontal', shrink=0.5)

for ax in axs:
    for a in ax:
        a.axis('off')

plt.tight_layout()

In [None]:
print(f'1 harmonic - mae mean = {mae_map.mean()}, mae std = {mae_map.std()}')
print(f'2 harmonics - mae mean = {mae_map_2.mean()}, mae std = {mae_map_2.std()}')
print(f'3 harmonics - mae mean = {mae_map_3.mean()}, mae std = {mae_map_3.std()}')

# Compute the forest mask

vegetation_mask > 0.25 semble etre une bonne valeur pour la foret sans enlever trop pixels de foret

In [None]:
#load forest 

path_CHM = '/Users/arthurcalvi/Data/Disturbances_maps/FORMS/int2/2020_l93.tif'
chm2020 = rasterio.open(path_CHM) #cm 
# array = chm2020.read(1)


In [None]:
import rasterio
from rasterio.warp import transform_bounds, reproject, Resampling
from rasterio.windows import from_bounds
import numpy as np

def extract_and_reproject_window(big_raster: rasterio.io.DatasetReader, 
                                 small_raster: rasterio.io.DatasetReader):
    """
    Extract a window from the big raster within the bounds of the small raster and reproject it 
    to the CRS of the small raster.

    Args:
        big_raster (rasterio.io.DatasetReader): The large raster (e.g., chm2020) already opened with rasterio.
        small_raster (rasterio.io.DatasetReader): The small raster already opened with rasterio.

    Returns:
        numpy.ndarray: The reprojected windowed data.
        rasterio.Affine: The transform of the reprojected data.
    """
    # Get the bounds and CRS of the small raster
    small_bounds = small_raster.bounds
    small_crs = small_raster.crs

    # Reproject the bounds of the small raster to the CRS of the big raster
    reprojected_bounds = transform_bounds(small_crs, big_raster.crs, *small_bounds)

    # Create a window from the reprojected bounds
    window = from_bounds(*reprojected_bounds, transform=big_raster.transform)

    # Read the data within this window
    windowed_data = big_raster.read(window=window)
    window_transform = big_raster.window_transform(window)

    # Prepare an array to hold the reprojected data
    reprojected_data = np.empty(shape=(big_raster.count, 
                                       small_raster.height, 
                                       small_raster.width), dtype=windowed_data.dtype)

    # Reproject the windowed data to the CRS of the small raster
    reproject(
        source=windowed_data,
        destination=reprojected_data,
        src_transform=window_transform,
        src_crs=big_raster.crs,
        dst_transform=small_raster.transform,
        dst_crs=small_crs,
        resampling=Resampling.nearest 

    )

    mask = (reprojected_data > 10_000) & (reprojected_data < 35_000)
    reprojected_data[mask] = reprojected_data.mean()

    return reprojected_data, small_raster.transform

import numpy as np
from scipy.interpolate import griddata




chm2020_cropped, transform = extract_and_reproject_window(chm2020, raster)

#Build forest mask 
vegetation_mask = (vegetation_mask > 0.25).astype(bool)
mask = (chm2020_cropped > 500) & vegetation_mask 

fig, axes = plt.subplots(1, 2, figsize=(10, 5))
axes[0].imshow(3 * normalize(dict_indices['rgb'][-1].transpose(1, 2, 0)))
axes[0].set_title('RGB')
axes[1].imshow(mask.squeeze(), cmap='viridis')
axes[1].set_title('Forest mask')

for ax in axes:
    ax.axis('off')

# Time series

In [None]:
import numpy as np
from typing import List, Tuple
from datetime import datetime

def reconstruct_signal(amplitudes: List[np.ndarray], phases: List[np.ndarray], offset_map: np.ndarray, dates: List[datetime], pixel_coords: Tuple[int, int]) -> Tuple[np.ndarray, np.ndarray]:
    """
    Reconstructs the time series signal for a specific pixel using the fitted parameters.

    Parameters:
    - amplitudes: List[np.ndarray], list of amplitude maps for each harmonic.
    - phases: List[np.ndarray], list of phase maps for each harmonic.
    - offset_map: np.ndarray, constant offset of the fitted function.
    - dates: List[datetime.datetime], dates corresponding to the time series data points.
    - pixel_coords: Tuple[int, int], (x, y) coordinates of the pixel in the image.
    
    Returns:
    - reconstructed_signal: np.ndarray, the reconstructed signal for the specified pixel.
    - times_datetime64: np.ndarray, the dates corresponding to the time series data points.
    """
    
    # Convert dates to 'datetime64' and compute normalized time as fraction of year
    times_datetime64 = np.array(dates, dtype='datetime64[D]')
    # Create times_datetime64 with regular time step every week
    times_datetime64 = np.arange(times_datetime64[0], times_datetime64[-1] + np.timedelta64(7, 'D'), np.timedelta64(7, 'D'))
    start_date = times_datetime64[0]
    days_since_start = (times_datetime64 - start_date).astype(int)
    t_normalized = days_since_start / 365.25  # Normalize to fraction of year

    # Initialize the signal with the offset
    signal = offset_map[pixel_coords[0], pixel_coords[1]] * np.ones_like(t_normalized)

    # Add harmonics to the signal
    for i, (amp_map, phase_map) in enumerate(zip(amplitudes, phases)):
        # Convert normalized time to radians for this harmonic
        t_radians = 2 * np.pi * (i + 1) * t_normalized
        A = amp_map[pixel_coords[0], pixel_coords[1]] * np.cos(phase_map[pixel_coords[0], pixel_coords[1]])
        B = amp_map[pixel_coords[0], pixel_coords[1]] * np.sin(phase_map[pixel_coords[0], pixel_coords[1]])
        
        signal += A * np.cos(t_radians) + B * np.sin(t_radians)

    return signal, times_datetime64

pixel = (50, 10)
signal1, time = reconstruct_signal([amplitude_map], [phase_map], offset_map, dates, pixel)
signal2, _ = reconstruct_signal([amplitude_map_21, amplitude_map_22], [phase_map_21, phase_map_22], offset_map_2, dates, pixel)
signal3, _ = reconstruct_signal([amplitude_map_31, amplitude_map_32, amplitude_map_33], [phase_map_31, phase_map_32, phase_map_33], offset_map_3, dates, pixel)

fig, ax = plt.subplots(3, 1, figsize=(20, 10))

ax[0].plot(time, signal1, label='Method 1')
ax[0].set_title('Reconstructed signal using method 1')
for i in range(len(dates)):
    ax[0].scatter(dates[i], dict_indices['crswir'][i, pixel[0], pixel[1]], color=(0,0,0, expanded_mask_with_disturbances[i, pixel[0], pixel[1]]))
text = f'A1 = {amplitude_map[pixel[0], pixel[1]]:.2f}, P1 = {phase_map[pixel[0], pixel[1]]:.2f}, C = {offset_map[pixel[0], pixel[1]]:.2f}'
ax[0].text(0.5, 0.9, text, horizontalalignment='center', verticalalignment='center', transform=ax[0].transAxes)

#show residual

ax[1].plot(time, signal2, label='Method 2')
ax[1].set_title('Reconstructed signal using method 2')
ax[1].scatter(dates, dict_indices['crswir'][:, pixel[0], pixel[1]], label='Original signal', color='k')
text = f'A1 = {amplitude_map_21[pixel[0], pixel[1]]:.2f}, P1 = {phase_map_21[pixel[0], pixel[1]]:.2f}, A2 = {amplitude_map_22[pixel[0], pixel[1]]:.2f}, P2 = {phase_map_22[pixel[0], pixel[1]]:.2f}, C = {offset_map_2[pixel[0], pixel[1]]:.2f}'
ax[1].text(0.5, 0.9, text, horizontalalignment='center', verticalalignment='center', transform=ax[1].transAxes)

ax[2].plot(time, signal3, label='Method 3')
ax[2].set_title('Reconstructed signal using method 3')
ax[2].scatter(dates, dict_indices['crswir'][:, pixel[0], pixel[1]], label='Original signal', color='k')
text = f'A1 = {amplitude_map_31[pixel[0], pixel[1]]:.2f}, P1 = {phase_map_31[pixel[0], pixel[1]]:.2f}, A2 = {amplitude_map_32[pixel[0], pixel[1]]:.2f}, P2 = {phase_map_32[pixel[0], pixel[1]]:.2f}, A3 = {amplitude_map_33[pixel[0], pixel[1]]:.2f}, P3 = {phase_map_33[pixel[0], pixel[1]]:.2f}, C = {offset_map_3[pixel[0], pixel[1]]:.2f}'
ax[2].text(0.5, 0.9, text, horizontalalignment='center', verticalalignment='center', transform=ax[2].transAxes)

for a in ax:
    a.legend()
    a.grid()
    a.spines['top'].set_visible(False)
    a.spines['right'].set_visible(False)

plt.tight_layout()
