## 2026 EY AI & Data Challenge - TerraClimate Data Extraction Notebook

This notebooks demonstrates how to access the TerraClimate dataset. TerraClimate is a dataset of monthly climate and climatic water balance for global terrestrial surfaces from 1958 to the present. These data provide important inputs for ecological and hydrological studies at global scales that require high spatial resolution and time-varying data. All data have monthly temporal resolution and a ~4-km (1/24th degree) spatial resolution. This dataset is provided in Zarr format. 

For more information, visit: [terraclimate- overview](https://planetarycomputer.microsoft.com/dataset/terraclimate#overview) 

## Load In Dependencies
The following code installs the required Python libraries (found in the requirements.txt file) in the Snowflake environment to allow successful execution of the remaining notebook code. After running this code for the first time, it is required to “restart” the kernal so the Python libraries are available in the environment. This is done by selecting the “Connected” menu above the notebook (next to “Run all”) and selecting the “restart kernal” link. Subsequent runs of the notebook do not require this “restart” process.

In [1]:
# !pip install uv
# !uv pip install  -r requirements.txt

In [6]:
# import snowflake
# from snowflake.snowpark.context import get_active_session
# session = get_active_session()

import warnings

from TERRACLIMATE_DATA_EXTRACTION_NOTEBOOK_SNOWFLAKE_rossedit import df_var_append2

warnings.filterwarnings("ignore")

# Data manipulation and analysis
import numpy as np
import pandas as pd

# Multi-dimensional arrays and datasets (e.g., NetCDF, Zarr)
import xarray as xr

from scipy.spatial import cKDTree

# Planetary Computer tools for STAC API access and authentication
import pystac_client
import planetary_computer as pc

from datetime import date
from tqdm import tqdm
import os
from pathlib import Path

In [9]:
extraction_variable = 'ppt'
extract_data_dir = f"./data/terraclimate/terraclimate_{extraction_variable}_rolling_data/"

In [10]:
Path(extract_data_dir).mkdir(parents=True, exist_ok=True)

## Extracting TerraClimate Data Using API Calls

The API-based method allows us to efficiently access **TerraClimate** data for specific regions and time periods through the [Microsoft Planetary Computer](https://planetarycomputer.microsoft.com/), ensuring scalability and reproducibility of the process.

Through the API, we can extract climate variables such as **Potential Evapotranspiration (PET)**, which represents the atmospheric demand for water. This variable provides important insights into surface moisture balance and helps improve the accuracy of water quality modeling.

This approach ensures consistent, automated retrieval of high-resolution climate data that can be easily integrated with satellite-derived features for comprehensive environmental and hydrological analysis.


### Loading and Mapping TerraClimate Data

This section demonstrates how **TerraClimate climate variables**, such as **Potential Evapotranspiration (PET)**, are loaded and mapped to sampling locations.

- The **load_terraclimate_dataset** function opens the TerraClimate Zarr/NetCDF dataset from the Microsoft Planetary Computer, handling storage options automatically.
- The **filterg** function filters the dataset for the desired time range (2011–2015) and the spatial extent corresponding to the study region. The resulting data is converted into a pandas DataFrame with standardized column names.
- The **assign_nearest_climate** function maps each sampling location to its **nearest TerraClimate grid point** using a KD-tree and assigns the climate variable values corresponding to the closest timestamp.

This workflow ensures efficient, reproducible retrieval of climate variables, while allowing participants to work with pre-extracted CSV files for faster benchmarking and analysis.


In [3]:
# --- Filtering function (kept identical) ---
def filterg(ds, var):
    ds_2011_2015 = ds[var].sel(time=slice("2011-01-01", "2015-12-31"))

    df_var_append = []
    for i in tqdm(range(len(ds_2011_2015.time))):
        df_var = ds_2011_2015.isel(time=i).to_dataframe().reset_index()
        df_var_filter = df_var[
            (df_var['lat'] > -35.18) & (df_var['lat'] < -21.72) &
            (df_var['lon'] > 14.97) & (df_var['lon'] < 32.79)
        ]
        df_var_append.append(df_var_filter)

    df_var_final = pd.concat(df_var_append, ignore_index=True)
    print(f"Filtering for {var} completed")

    df_var_final['time'] = df_var_final['time'].astype(str)

    # Column mapping
    col_mapping = {"lat": "Latitude", "lon": "Longitude", "time": "Sample Date"}
    df_var_final = df_var_final.rename(columns=col_mapping)

    return df_var_final

In [15]:
def load_terraclimate_dataset():
    catalog = pystac_client.Client.open(
        "https://planetarycomputer.microsoft.com/api/stac/v1",
        modifier=pc.sign_inplace,
    )
    collection = catalog.get_collection("terraclimate")
    asset = collection.assets["zarr-abfs"]

    if "xarray:storage_options" in asset.extra_fields:
        ds = xr.open_zarr(
            asset.href,
            storage_options=asset.extra_fields["xarray:storage_options"],
            consolidated=True,
        )
    else:
        ds = xr.open_dataset(
            asset.href,
            **asset.extra_fields["xarray:open_kwargs"],
        )

    return ds

In [16]:
# --- Filtering function (kept identical) ---
def filterg1(ds, var):
    ds_2011_2015 = ds[var].sel(time=slice("2011-01-01", "2015-12-31"))

    df_var_append = []
    for i in tqdm(range(len(ds_2011_2015.time))):
        date_str = ds_2011_2015.time[i].values.astype('datetime64[D]').astype(object).isoformat()
        filename = f"terraclimate_features_training_{extraction_variable}_{date_str}.csv"
        if filename not in list(os.listdir(extract_data_dir)):
            print(f"{filename} not in {list(os.listdir(extract_data_dir))}")
            df_var = ds_2011_2015.isel(time=i).to_dataframe().reset_index()
            df_var_filter = df_var[
                (df_var['lat'] > -35.18) & (df_var['lat'] < -21.72) &
                (df_var['lon'] > 14.97) & (df_var['lon'] < 32.79)
            ]
            df_var_append.append(df_var_filter)
            df_var_filter.to_csv(f"./{extract_data_dir}{filename}", index=False)
        else:
            print(f"Skipping ... {filename} already in {list(os.listdir(extract_data_dir))}")

def filterg2(var, df_var_append):
    df_var_final = pd.concat(df_var_append, ignore_index=True)
    print(f"Filtering for {var} completed")

    df_var_final['time'] = df_var_final['time'].astype(str)

    # Column mapping
    col_mapping = {"lat": "Latitude", "lon": "Longitude", "time": "Sample Date"}
    df_var_final = df_var_final.rename(columns=col_mapping)

    return df_var_final

In [17]:
# --- Climate variable assignment function (unchanged logic) ---
def assign_nearest_climate(sa_df, climate_df, var_name):
    """
    Map nearest climate variable values to a new DataFrame 
    containing only the specified variable column.
    """
    sa_coords = np.radians(sa_df[['Latitude', 'Longitude']].values)
    climate_coords = np.radians(climate_df[['Latitude', 'Longitude']].values)

    tree = cKDTree(climate_coords)
    dist, idx = tree.query(sa_coords, k=1)

    nearest_points = climate_df.iloc[idx].reset_index(drop=True)

    sa_df = sa_df.reset_index(drop=True)
    sa_df[['nearest_lat', 'nearest_lon']] = nearest_points[['Latitude', 'Longitude']]

    sa_df['Sample Date'] = pd.to_datetime(sa_df['Sample Date'], dayfirst=True, errors='coerce')
    climate_df['Sample Date'] = pd.to_datetime(climate_df['Sample Date'], dayfirst=True, errors='coerce')

    climate_values = []

    for i in tqdm(range(len(sa_df)), desc=f"Mapping {var_name.upper()} values"):
        sample_date = sa_df.loc[i, 'Sample Date']
        nearest_lat = sa_df.loc[i, 'nearest_lat']
        nearest_lon = sa_df.loc[i, 'nearest_lon']

        subset = climate_df[
            (climate_df['Latitude'] == nearest_lat) &
            (climate_df['Longitude'] == nearest_lon)
        ]

        if subset.empty:
            climate_values.append(np.nan)
            continue

        nearest_idx = (subset['Sample Date'] - sample_date).abs().idxmin()
        climate_values.append(subset.loc[nearest_idx, var_name])

    output_df = pd.DataFrame({var_name: climate_values})

    
    return output_df

### Extracting features for the training dataset

In [18]:
Water_Quality_df = pd.read_csv("data/water_quality_training_dataset.csv")
display(Water_Quality_df.head(5))

Unnamed: 0,Latitude,Longitude,Sample Date,Total Alkalinity,Electrical Conductance,Dissolved Reactive Phosphorus
0,-28.760833,17.730278,02-01-2011,128.912,555.0,10.0
1,-26.861111,28.884722,03-01-2011,74.72,162.9,163.0
2,-26.45,28.085833,03-01-2011,89.254,573.0,80.0
3,-27.671111,27.236944,03-01-2011,82.0,203.6,101.0
4,-27.356667,27.286389,03-01-2011,56.1,145.1,151.0


In [19]:
Water_Quality_df.shape

(9319, 6)

In [72]:
ds = load_terraclimate_dataset()
ds_2011_2015 = ds[extraction_variable].sel(
    time=slice("2010-01-01", "2016-12-31")
    , lat=slice(-21.72, -35.18) # IMPORTANT: latitude often descending
    , lon=slice(14.97, 32.79))

In [74]:
ppt_roll3 = ds_2011_2015.rolling(time=3, center=False).sum().compute()

In [75]:
ppt_roll6 = ds_2011_2015.rolling(time=6, center=False).sum().compute()

In [76]:
ppt_roll12 = ds_2011_2015.rolling(time=12, center=False).sum().compute()

In [77]:
ppt_roll3_df = ppt_roll3.to_dataframe().reset_index()
ppt_roll6_df = ppt_roll6.to_dataframe().reset_index()
ppt_roll12_df = ppt_roll12.to_dataframe().reset_index()

In [89]:
ppt_roll3_df = ppt_roll3_df[(ppt_roll3_df['time'] >= '2011-01-01') & (ppt_roll3_df['time'] <= '2015-12-31')]
ppt_roll6_df = ppt_roll6_df[(ppt_roll6_df['time'] >= '2011-01-01') & (ppt_roll6_df['time'] <= '2015-12-31')]
ppt_roll12_df = ppt_roll12_df[(ppt_roll12_df['time'] >= '2011-01-01') & (ppt_roll12_df['time'] <= '2015-12-31')]

print(f"ppt_roll3_df: {ppt_roll3_df.shape}")
print(f"ppt_roll6_df: {ppt_roll6_df.shape}")
print(f"ppt_roll12_df: {ppt_roll12_df.shape}")

ppt_roll3_df: (8294640, 4)
ppt_roll6_df: (8294640, 4)
ppt_roll12_df: (8294640, 4)


In [90]:
for col in ppt_roll3_df.columns:
    ppt_roll3_df.rename(columns={col:f"{col}_roll3"}, inplace=True)

In [92]:
for col in ppt_roll6_df.columns:
    ppt_roll6_df.rename(columns={col:f"{col}_roll6"}, inplace=True)

In [93]:
for col in ppt_roll12_df.columns:
    ppt_roll12_df.rename(columns={col:f"{col}_roll12"}, inplace=True)

In [94]:
display(ppt_roll3_df.head())

Unnamed: 0,time_roll3,lat_roll3,lon_roll3,ppt_roll3
1658928,2011-01-01,-21.729167,14.979167,128.3
1658929,2011-01-01,-21.729167,15.020833,138.4
1658930,2011-01-01,-21.729167,15.0625,143.3
1658931,2011-01-01,-21.729167,15.104167,146.8
1658932,2011-01-01,-21.729167,15.145833,151.2


In [95]:
display(ppt_roll6_df.head())

Unnamed: 0,time_roll6,lat_roll6,lon_roll6,ppt_roll6
1658928,2011-01-01,-21.729167,14.979167,129.6
1658929,2011-01-01,-21.729167,15.020833,139.7
1658930,2011-01-01,-21.729167,15.0625,144.6
1658931,2011-01-01,-21.729167,15.104167,148.1
1658932,2011-01-01,-21.729167,15.145833,152.7


In [96]:
display(ppt_roll12_df.head())

Unnamed: 0,time_roll12,lat_roll12,lon_roll12,ppt_roll12
1658928,2011-01-01,-21.729167,14.979167,181.4
1658929,2011-01-01,-21.729167,15.020833,195.1
1658930,2011-01-01,-21.729167,15.0625,201.2
1658931,2011-01-01,-21.729167,15.104167,207.1
1658932,2011-01-01,-21.729167,15.145833,213.7


In [99]:
all_rolling_df = pd.concat((ppt_roll3_df, ppt_roll6_df, ppt_roll12_df), ignore_index=True, axis=1)

col_names = list(ppt_roll3_df.columns)
col_names.extend(list(ppt_roll6_df.columns))
col_names.extend(list(ppt_roll12_df.columns))

for cur_col, new_col in zip(all_rolling_df.columns, col_names):
    all_rolling_df.rename(columns={cur_col:new_col}, inplace=True)

display(all_rolling_df.head())

Unnamed: 0,time_roll3,lat_roll3,lon_roll3,ppt_roll3,time_roll6,lat_roll6,lon_roll6,ppt_roll6,time_roll12,lat_roll12,lon_roll12,ppt_roll12
1658928,2011-01-01,-21.729167,14.979167,128.3,2011-01-01,-21.729167,14.979167,129.6,2011-01-01,-21.729167,14.979167,181.4
1658929,2011-01-01,-21.729167,15.020833,138.4,2011-01-01,-21.729167,15.020833,139.7,2011-01-01,-21.729167,15.020833,195.1
1658930,2011-01-01,-21.729167,15.0625,143.3,2011-01-01,-21.729167,15.0625,144.6,2011-01-01,-21.729167,15.0625,201.2
1658931,2011-01-01,-21.729167,15.104167,146.8,2011-01-01,-21.729167,15.104167,148.1,2011-01-01,-21.729167,15.104167,207.1
1658932,2011-01-01,-21.729167,15.145833,151.2,2011-01-01,-21.729167,15.145833,152.7,2011-01-01,-21.729167,15.145833,213.7


In [103]:
# Validate
all_rolling_df[
    ~((all_rolling_df["time_roll3"] == all_rolling_df["time_roll6"])
    & (all_rolling_df["lat_roll3"] == all_rolling_df["lat_roll6"])
    & (all_rolling_df["lon_roll3"] == all_rolling_df["lon_roll6"]))
]

Unnamed: 0,time_roll3,lat_roll3,lon_roll3,ppt_roll3,time_roll6,lat_roll6,lon_roll6,ppt_roll6,time_roll12,lat_roll12,lon_roll12,ppt_roll12


In [104]:
# Validate
all_rolling_df[
    ~((all_rolling_df["time_roll6"] == all_rolling_df["time_roll12"])
    & (all_rolling_df["lat_roll6"] == all_rolling_df["lat_roll12"])
    & (all_rolling_df["lon_roll6"] == all_rolling_df["lon_roll12"]))
]

Unnamed: 0,time_roll3,lat_roll3,lon_roll3,ppt_roll3,time_roll6,lat_roll6,lon_roll6,ppt_roll6,time_roll12,lat_roll12,lon_roll12,ppt_roll12


In [105]:
all_rolling_df.drop(columns=["time_roll6", "lat_roll6", "lon_roll6", "time_roll12", "lat_roll12", "lon_roll12"], inplace=True)
all_rolling_df.rename(columns={
    "time_roll3":"time"
    , "lat_roll3":"lat"
    , "lon_roll3":"lon"
}, inplace=True)

In [107]:
all_rolling_df

Unnamed: 0,time,lat,lon,ppt_roll3,ppt_roll6,ppt_roll12
1658928,2011-01-01,-21.729167,14.979167,128.3,129.6,181.4
1658929,2011-01-01,-21.729167,15.020833,138.4,139.7,195.1
1658930,2011-01-01,-21.729167,15.062500,143.3,144.6,201.2
1658931,2011-01-01,-21.729167,15.104167,146.8,148.1,207.1
1658932,2011-01-01,-21.729167,15.145833,151.2,152.7,213.7
...,...,...,...,...,...,...
9953563,2015-12-01,-35.145833,32.604167,,,
9953564,2015-12-01,-35.145833,32.645833,,,
9953565,2015-12-01,-35.145833,32.687500,,,
9953566,2015-12-01,-35.145833,32.729167,,,


In [110]:
df_var_append2 = []
for date in all_rolling_df['time'].drop_duplicates():
    df_var_append2.append(all_rolling_df[all_rolling_df['time'] == date])

In [108]:
# ds = None
# tc_parameter = None
# for i in range(10):
#     try:
#         print(f"Starting try {i}")
#         # Load TerraClimate dataset, filter (time,region,parameter), filter for nearest parameter values
#         ds = load_terraclimate_dataset()
#         tc_parameter = filterg1(ds,extraction_variable)
#         break
#     except Exception as e:
#         print(f"\tTry {i} Failed with exception:\n\t\t{e}")
#         continue


In [53]:
# from datetime import datetime
#
# filenames = []
# for filename in os.listdir(extract_data_dir):
#     if filename.endswith(".csv"):
#         date_str = filename.split("_")[-1].split(".")[0]
#         date = datetime.strptime(date_str, "%Y-%m-%d").date()
#
#         filenames.append((filename, date))
#
# sorted_filenames = sorted(filenames, key=lambda x: x[1])
# sorted_filenames = [filename for filename, date in sorted_filenames]

In [55]:
# df_var_append2 = [pd.read_csv(f"{extract_data_dir}{filename}") for filename in sorted_filenames]

In [112]:
tc_parameter = filterg2(extraction_variable, df_var_append2)

Filtering for ppt completed


In [117]:
cols_to_process = []
for col in [col for col in tc_parameter.columns if col not in ['Sample Date', 'Latitude', 'Longitude']]:
    cols_to_process.append(col)

In [119]:
Terraclimate_training_dfs = []
for extraction_var in cols_to_process:
    Terraclimate_training_dfs.append(assign_nearest_climate(Water_Quality_df, tc_parameter, extraction_var))

Mapping PPT_ROLL3 values: 100%|██████████| 9319/9319 [01:35<00:00, 97.50it/s] 
Mapping PPT_ROLL6 values: 100%|██████████| 9319/9319 [01:31<00:00, 101.72it/s]
Mapping PPT_ROLL12 values: 100%|██████████| 9319/9319 [01:33<00:00, 99.41it/s] 


In [121]:
Terraclimate_training_df = None

for i, col in enumerate(cols_to_process):
    if i == 0:
        Terraclimate_training_df = Terraclimate_training_dfs[i]
    else:
        Terraclimate_training_df[col] = Terraclimate_training_dfs[i][col]

Terraclimate_training_df['Latitude'] = Water_Quality_df['Latitude']
Terraclimate_training_df['Longitude'] = Water_Quality_df['Longitude']
Terraclimate_training_df['Sample Date'] = Water_Quality_df['Sample Date']

In [131]:
final_cols = ['Latitude', 'Longitude', 'Sample Date']
final_cols.extend(cols_to_process)
Terraclimate_training_df = Terraclimate_training_df[final_cols]
extraction_var_str = ''.join(cols_to_process)
Terraclimate_training_df.to_csv(f'{extract_data_dir}terraclimate_features_training_{extraction_var_str}.csv', index=False)

In [132]:
# Preview File
display(Terraclimate_training_df.head())

Unnamed: 0,Latitude,Longitude,Sample Date,ppt_roll3,ppt_roll6,ppt_roll12
0,-28.760833,17.730278,02-01-2011,41.8,47.2,66.9
1,-26.861111,28.884722,03-01-2011,281.2,516.4,631.8
2,-26.45,28.085833,03-01-2011,273.1,497.6,625.9
3,-27.671111,27.236944,03-01-2011,356.7,590.8,721.3
4,-27.356667,27.286389,03-01-2011,330.3,556.3,685.7


In [60]:
# Terraclimate_training_df.to_csv("/tmp/terraclimate_features_training.csv",index = False)

In [61]:
# session.sql("""
#     PUT file:///tmp/terraclimate_features_training.csv
#     'snow://workspace/USER$.PUBLIC."EY-AI-and-Data-Challenge"/versions/live/'
#     AUTO_COMPRESS=FALSE
#     OVERWRITE=TRUE
# """).collect()
#
# print("File saved! Refresh the browser to see the files in the sidebar")

**Note:** If you're using your own workspace, remember to replace "EY-AI-and-Data-Challenge" with your workspace name in the file path.

### Extracting features for the validation dataset

In [133]:
Validation_df=pd.read_csv('./data/submission_template.csv')
display(Validation_df.head())

Unnamed: 0,Latitude,Longitude,Sample Date,Total Alkalinity,Electrical Conductance,Dissolved Reactive Phosphorus
0,-32.043333,27.822778,01-09-2014,,,
1,-33.329167,26.0775,16-09-2015,,,
2,-32.991639,27.640028,07-05-2015,,,
3,-34.096389,24.439167,07-02-2012,,,
4,-32.000556,28.581667,01-10-2014,,,


In [134]:
Validation_df.shape

(200, 6)

In [135]:
# Load TerraClimate dataset, filter (time,region,parameter), filter for nearest parameter values
Terraclimate_validation_dfs = []
for extraction_var in cols_to_process:
    Terraclimate_validation_dfs.append(assign_nearest_climate(Water_Quality_df, tc_parameter, extraction_var))
# Terraclimate_validation_df = assign_nearest_climate(Validation_df, tc_parameter, extraction_variable)

Mapping PPT_ROLL3 values: 100%|██████████| 9319/9319 [01:32<00:00, 100.92it/s]
Mapping PPT_ROLL6 values: 100%|██████████| 9319/9319 [01:30<00:00, 102.62it/s]
Mapping PPT_ROLL12 values: 100%|██████████| 9319/9319 [01:33<00:00, 100.12it/s]


In [136]:
Terraclimate_validation_df = None

for i, col in enumerate(cols_to_process):
    if i == 0:
        Terraclimate_validation_df = Terraclimate_validation_dfs[i]
    else:
        Terraclimate_validation_df[col] = Terraclimate_validation_dfs[i][col]

Terraclimate_validation_df['Latitude'] = Validation_df['Latitude']
Terraclimate_validation_df['Longitude'] = Validation_df['Longitude']
Terraclimate_validation_df['Sample Date'] = Validation_df['Sample Date']

In [137]:
Terraclimate_validation_df = Terraclimate_validation_df[final_cols]
Terraclimate_validation_df.to_csv(f'{extract_data_dir}terraclimate_features_validation_{extraction_var_str}.csv', index=False)

In [138]:
# Preview File
display(Terraclimate_validation_df.head())

Unnamed: 0,Latitude,Longitude,Sample Date,ppt_roll3,ppt_roll6,ppt_roll12
0,-32.043333,27.822778,01-09-2014,41.8,47.2,66.9
1,-33.329167,26.0775,16-09-2015,281.2,516.4,631.8
2,-32.991639,27.640028,07-05-2015,273.1,497.6,625.9
3,-34.096389,24.439167,07-02-2012,356.7,590.8,721.3
4,-32.000556,28.581667,01-10-2014,330.3,556.3,685.7


In [None]:
# Terraclimate_validation_df.to_csv("/tmp/terraclimate_features_validation.csv",index = False)

In [None]:
# session.sql("""
#     PUT file:///tmp/terraclimate_features_validation.csv
#     'snow://workspace/USER$.PUBLIC."EY-AI-and-Data-Challenge"/versions/live/'
#     AUTO_COMPRESS=FALSE
#     OVERWRITE=TRUE
# """).collect()
#
# print("File saved! Refresh the browser to see the files in the sidebar")

**Note:** If you're using your own workspace, remember to replace "EY-AI-and-Data-Challenge" with your workspace name in the file path.