In [1]:
#import packages
import rasterio
import numpy as np
import os

In [2]:
def read_raster(file_path):
    """Reads raster data, ensures numeric data, and handles NoData values."""
    with rasterio.open(file_path) as src:
        raster_data = src.read(1)  # Read the first band
        affine = src.transform
        nodata_value = src.nodata  # Get the NoData value

    # Ensure data is numeric
    if not np.issubdtype(raster_data.dtype, np.number):
        raise TypeError(f"Unsupported raster data type: {raster_data.dtype}")

    # Mask invalid values (NaN, inf, and optionally NoData)
    valid_mask = np.isfinite(raster_data)
    
    if nodata_value is not None:
        valid_mask &= raster_data != nodata_value  # Mask out NoData values

    if not np.any(valid_mask):
        raise ValueError("Raster contains only invalid or missing values.")

    # Extract valid data and indices
    valid_data = raster_data[valid_mask]
    valid_indices = np.argwhere(valid_mask)

    return valid_data, valid_indices, affine

def create_weights(valid_indices, affine, k=4):
    """Creates spatial weights dictionary using k-nearest neighbors."""
    coords = [tuple(affine * (col, row)) for row, col in valid_indices]
    weights = {}
    for i, coord in enumerate(coords):
        distances = np.sqrt(np.sum((np.array(coords) - coord) ** 2, axis=1))
        nearest_neighbors = np.argsort(distances)[1:k+1]  # Skip self (distance = 0)
        weights[i] = nearest_neighbors
    return weights

def calculate_morans_i(data, weights):
    """Calculates Moran's I manually."""
    n = len(data)
    mean_value = np.mean(data)
    deviations = data - mean_value

    # Numerator: Spatially weighted covariance
    num = 0
    for i, neighbors in weights.items():
        for neighbor in neighbors:
            num += deviations[i] * deviations[neighbor]

    # Denominator: Variance
    denom = np.sum(deviations ** 2)

    # Weight normalization factor (sum of weights)
    w_sum = sum(len(neighbors) for neighbors in weights.values())

    # Moran's I formula
    morans_i = (n / w_sum) * (num / denom)
    return morans_i

def process_years(resolution, years, file_template, k=4):
    """Iterates through years and calculates Moran's I for a given resolution."""
    results = {}
    for year in years:
        file_path = file_template.format(resolution=resolution, year=year)
        try:
            print(f"Processing {file_path}...")
            data, indices, affine = read_raster(file_path)
            weights = create_weights(indices, affine, k=k)
            moran_i = calculate_morans_i(data, weights)
            results[year] = moran_i
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            continue
    return results


In [3]:
# test read_raster
file_path = "vv_outputs/2020_vv_10.tif"
data, indices, affine = read_raster(file_path)
print(f"Data sample: {data[:10]}")
print(f"Number of valid pixels: {len(data)}")


Data sample: [134.  65.  86.  86. 106. 113. 113.  74.  89.  58.]
Number of valid pixels: 744083


In [None]:
# test create_weights
weights = create_weights(indices, affine, k=4)
print(f"Sample weights for first 5 pixels: {list(weights.items())[:5]}")

In [23]:
# parameters
years = range(2015, 2025)  # 2015–2024
file_template = "vv_outputs/{year}_vv_{resolution}.tif"
resolutions = ["10", "5"]


In [24]:
all_results = {}
for resolution in resolutions:
    print(f"Processing resolution: {resolution}")
    all_results[resolution] = process_years(resolution, years, file_template)

# Print results
for resolution, results in all_results.items():
    print(f"\nResults for {resolution} resolution:")
    for year, moran_i in results.items():
        print(f"Year {year}: Moran's I = {moran_i:.4f}")


Processing resolution: 10
Processing vv_outputs/2015_vv_10.tif...


KeyboardInterrupt: 