In [1]:
import xarray as xr
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from scipy.stats import pearsonr

In [2]:
# Preprocess function
def preprocess(ds):
    if "t2m" in ds:
        ds["t2m"] = ds["t2m"] - 273.15
        ds["t2m"].attrs["units"] = "Celsius"
    if "msl" in ds:
        ds["msl"] = ds["msl"] / 100.0
        ds["msl"].attrs["units"] = "hPa"
    if "tp" in ds:
        ds = ds.drop_vars("tp")
    return ds

In [None]:
# marsfc - Compute the Pearson Correlation Coefficient per lead time per grid cell across all 12 months

In [None]:
forecast_files = sorted([
    "../../Surface Variables/20240101/20240101_marsfc_sv_q.nc", "../../Surface Variables/20240201/20240201_marsfc_sv_q.nc", 
    "../../Surface Variables/20240301/20240301_marsfc_sv_q.nc", "../../Surface Variables/20240401/20240401_marsfc_sv_q.nc",
    "../../Surface Variables/20240501/20240501_marsfc_sv_q.nc", "../../Surface Variables/20240601/20240601_marsfc_sv_q.nc", 
    "../../Surface Variables/20240701/20240701_marsfc_sv_q.nc", "../../Surface Variables/20240801/20240801_marsfc_sv_q.nc",
    "../../Surface Variables/20240901/20240901_marsfc_sv_q.nc", "../../Surface Variables/20241001/20241001_marsfc_sv_q.nc", 
    "../../Surface Variables/20241101/20241101_marsfc_sv_q.nc", "../../Surface Variables/20241201/20241201_marsfc_sv_q.nc"
])

truth_files = sorted([
    "../../Surface Variables/20240101/20240101_era5_fc_sv_q.nc", "../../Surface Variables/20240201/20240201_era5_fc_sv_q.nc", 
    "../../Surface Variables/20240301/20240301_era5_fc_sv_q.nc", "../../Surface Variables/20240401/20240401_era5_fc_sv_q.nc",
    "../../Surface Variables/20240501/20240501_era5_fc_sv_q.nc", "../../Surface Variables/20240601/20240601_era5_fc_sv_q.nc", 
    "../../Surface Variables/20240701/20240701_era5_fc_sv_q.nc", "../../Surface Variables/20240801/20240801_era5_fc_sv_q.nc",
    "../../Surface Variables/20240901/20240901_era5_fc_sv_q.nc", "../../Surface Variables/20241001/20241001_era5_fc_sv_q.nc", 
    "../../Surface Variables/20241101/20241101_era5_fc_sv_q.nc", "../../Surface Variables/20241201/20241201_era5_fc_sv_q.nc"
])

variables = ['t2m', 'q', 'u10', 'v10', 'msl']
results = {}

def daskpreprocess(ds):
    return ds.chunk({'time': 41, 'latitude': 64, 'longitude': 128})


# Vectorized pearson function
def pearsonr_ufunc(x, y):
    return np.corrcoef(x, y)[0, 1] if np.all(np.isfinite(x)) and np.all(np.isfinite(y)) else np.nan

for var in variables:
    print(f"Starting for {var}")
    fc_stack = []
    truth_stack = []

    for f_path, t_path in zip(forecast_files, truth_files):
        print(f"Loading {f_path}")
        ds_f = preprocess(xr.open_dataset(f_path))
        ds_t = preprocess(xr.open_dataset(t_path))

        ds_t = ds_t.rename({"valid_time": "time"})

         # assign numbers for timesteps instead of datetime stamps
        ds_f = ds_f.assign_coords(time=np.arange(len(ds_f.time)).astype("float64"))
        ds_t = ds_t.assign_coords(time=np.arange(len(ds_t.time)).astype("float64"))

        # Apply dask chunking
        ds_f = daskpreprocess(ds_f)
        ds_t = daskpreprocess(ds_t)

        forecast, truth = ds_f[var], ds_t[var]
        forecast, truth = xr.align(forecast, truth)

        fc_stack.append(forecast)
        truth_stack.append(truth)
        print("Stacked")

    
    forecast = xr.concat(fc_stack, dim='month')
    truth = xr.concat(truth_stack, dim='month')
    print("joined all Stacked together")

    print(f"  computing correlation for {var}")

    # Rechunk so 'month' is in one block
    forecast = forecast.chunk({'month': -1})
    truth = truth.chunk({'month': -1})
    
    # Compute correlation across 'month' dimension for each time/lat/lon
    corr = xr.apply_ufunc(
        pearsonr_ufunc,
        forecast,
        truth,
        input_core_dims=[['month'], ['month']],
        vectorize=True,
        dask='parallelized',
        output_dtypes=[float],
    )
    print("computed correlation")

    # Assign coordinates for clarity
    corr = corr.assign_coords({
    "time": forecast.time,
    "latitude": forecast.latitude,
    "longitude": forecast.longitude
    })

    results[var] = corr
    print(f"✅ done with {var}")

# Convert dictionary to dataset
ds_out = xr.Dataset(results)
print("results turned into Dataset")
# ds_out.load()
#print("💾 Saving to NetCDF...")
#ds_out.to_netcdf('Global_marsfc_CC_MAP_leadtimes.nc')
#print("✅ Correlation dataset saved as 'Global_marsfc_CC_MAP_leadtimes.nc'")

In [None]:
# Save variables individually to avoid loading everything into memory
for var in ds_out.data_vars:
    print(f"Starting for {var}")
    # compute() and load() load to the memory
    ds_out[var] = ds_out[var].compute()
    print(f"Loaded {var} to memory")
print(f"Start saving to netcdf")
ds_out.to_netcdf("Global_marsfc_CC_MAP_leadtimes.nc")

In [3]:
forecast_files = sorted([
   # "../../Surface Variables/20240101/20240101_marsai_sv_q.nc", "../../Surface Variables/20240201/20240201_marsai_sv_q.nc", 
    "../../Surface Variables/20240301/20240301_marsai_sv_q.nc", "../../Surface Variables/20240401/20240401_marsai_sv_q.nc",
    "../../Surface Variables/20240501/20240501_marsai_sv_q.nc", "../../Surface Variables/20240601/20240601_marsai_sv_q.nc", 
    "../../Surface Variables/20240701/20240701_marsai_sv_q.nc", "../../Surface Variables/20240801/20240801_marsai_sv_q.nc",
    "../../Surface Variables/20240901/20240901_marsai_sv_q.nc", "../../Surface Variables/20241001/20241001_marsai_sv_q.nc", 
    "../../Surface Variables/20241101/20241101_marsai_sv_q.nc", "../../Surface Variables/20241201/20241201_marsai_sv_q.nc"
])

truth_files = sorted([
 #   "../../Surface Variables/20240101/20240101_era5_gcai_sv_q.nc", "../../Surface Variables/20240201/20240201_era5_gcai_sv_q.nc", 
    "../../Surface Variables/20240301/20240301_era5_gcai_sv_q.nc", "../../Surface Variables/20240401/20240401_era5_gcai_sv_q.nc",
    "../../Surface Variables/20240501/20240501_era5_gcai_sv_q.nc", "../../Surface Variables/20240601/20240601_era5_gcai_sv_q.nc", 
    "../../Surface Variables/20240701/20240701_era5_gcai_sv_q.nc", "../../Surface Variables/20240801/20240801_era5_gcai_sv_q.nc",
    "../../Surface Variables/20240901/20240901_era5_gcai_sv_q.nc", "../../Surface Variables/20241001/20241001_era5_gcai_sv_q.nc", 
    "../../Surface Variables/20241101/20241101_era5_gcai_sv_q.nc", "../../Surface Variables/20241201/20241201_era5_gcai_sv_q.nc"
])

variables = ['t2m', 'q', 'u10', 'v10', 'msl']
results = {}

#def daskpreprocess(ds):
#    return ds.chunk({'time': 41, 'latitude': 64, 'longitude': 128})


# Vectorized pearson function
def pearsonr_ufunc(x, y):
    return np.corrcoef(x, y)[0, 1] if np.all(np.isfinite(x)) and np.all(np.isfinite(y)) else np.nan

for var in variables:
    print(f"Starting for {var}")
    fc_stack = []
    truth_stack = []

    for f_path, t_path in zip(forecast_files, truth_files):
        print(f"Loading {f_path}")
        ds_f = preprocess(xr.open_dataset(f_path))
        ds_t = preprocess(xr.open_dataset(t_path))

        ds_t = ds_t.rename({"valid_time": "time"})

         # assign numbers for timesteps instead of datetime stamps
        ds_f = ds_f.assign_coords(time=np.arange(len(ds_f.time)).astype("float64"))
        ds_t = ds_t.assign_coords(time=np.arange(len(ds_t.time)).astype("float64"))

        # Apply dask chunking
      #  ds_f = daskpreprocess(ds_f)
      #  ds_t = daskpreprocess(ds_t)

        forecast, truth = ds_f[var], ds_t[var]
        forecast, truth = xr.align(forecast, truth)

        fc_stack.append(forecast)
        truth_stack.append(truth)
        print("Stacked")

    
    forecast = xr.concat(fc_stack, dim='month')
    truth = xr.concat(truth_stack, dim='month')
    print("joined all Stacked together")

    print(f"  computing correlation for {var}")

    # Rechunk so 'month' is in one block
   # forecast = forecast.chunk({'month': -1})
  #  truth = truth.chunk({'month': -1})
    
    # Compute correlation across 'month' dimension for each time/lat/lon
    corr = xr.apply_ufunc(
        pearsonr_ufunc,
        forecast,
        truth,
        input_core_dims=[['month'], ['month']],
        vectorize=True,
      #  dask='parallelized',
        output_dtypes=[float],
    )
    print("computed correlation")

    # Assign coordinates for clarity
    corr = corr.assign_coords({
    "time": forecast.time,
    "latitude": forecast.latitude,
    "longitude": forecast.longitude
    })

    results[var] = corr
    print(f"✅ done with {var}")

# Convert dictionary to dataset
ds_out = xr.Dataset(results)
print("results turned into Dataset")
# ds_out.load()
print("💾 Saving to NetCDF...")
ds_out.to_netcdf('Global_marsai_CC_MAP_leadtimes.nc')
#print("✅ Correlation dataset saved as 'Global_marsfc_CC_MAP_leadtimes.nc'")

Starting for t2m
Loading ../../Surface Variables/20240301/20240301_marsai_sv_q.nc
Stacked
Loading ../../Surface Variables/20240401/20240401_marsai_sv_q.nc
Stacked
Loading ../../Surface Variables/20240501/20240501_marsai_sv_q.nc
Stacked
Loading ../../Surface Variables/20240601/20240601_marsai_sv_q.nc
Stacked
Loading ../../Surface Variables/20240701/20240701_marsai_sv_q.nc
Stacked
Loading ../../Surface Variables/20240801/20240801_marsai_sv_q.nc
Stacked
Loading ../../Surface Variables/20240901/20240901_marsai_sv_q.nc
Stacked
Loading ../../Surface Variables/20241001/20241001_marsai_sv_q.nc
Stacked
Loading ../../Surface Variables/20241101/20241101_marsai_sv_q.nc
Stacked
Loading ../../Surface Variables/20241201/20241201_marsai_sv_q.nc
Stacked
joined all Stacked together
  computing correlation for t2m


KeyboardInterrupt: 

In [4]:
# Save variables individually to avoid loading everything into memory
for var in ds_out.data_vars:
    print(f"Starting for {var}")
    # compute() and load() load to the memory
    ds_out[var] = ds_out[var].compute()
    print(f"Loaded {var} to memory")
print(f"Start saving to netcdf")
ds_out.to_netcdf("Global_marsai_CC_MAP_leadtimes.nc")

Starting for t2m



KeyboardInterrupt



In [6]:
ds_out.to_netcdf("Global_marsai_CC_MAP_leadtimes.nc")


KeyboardInterrupt

