# 📓 ACCESS-ESM15-write-zarr

**Author:** Thomas Moore  
**Date:** 2025-10-31  
**Updated:** YYYY-MM-DD (if applicable)  
**Environment:** `pangeo_csepta` running on `Gadi` ARE  
**Tags:** sandbox, ARD, ACCESS-ESM1.5

---

### 📘 Description

This notebook attempts to bring together the disparate workflows developed for this task over the past year(s). Issue: [https://github.com/Thomas-Moore-Creative/Pacific-Tuna-Climate-Response/issues/51](https://github.com/Thomas-Moore-Creative/Pacific-Tuna-Climate-Response/issues/51) and its sub-issues.

In [1]:
Author = {"name": "Thomas Moore", "affiliation": "CSIRO", "email": "thomas.moore@csiro.au", "orcid": "0000-0003-3930-1946"}

# Software

### ACDtools

In [2]:
!pip install --user -e /g/data/es60/users/thomas_moore/code/ACDtools

Obtaining file:///g/data/es60/users/thomas_moore/code/ACDtools
  Installing build dependencies ... done
  Checking if build backend supports build_editable ... done
  Getting requirements to build editable ... done
  Preparing editable metadata (pyproject.toml) ... done
Building wheels for collected packages: ACDtools
  Building editable for ACDtools (pyproject.toml) ... done
  Created wheel for ACDtools: filename=acdtools-0.1-0.editable-py3-none-any.whl size=3553 sha256=3ad6270b8f5325bc968cd37bcba9d5374b414a6f9573f887e869d50483e51336
  Stored in directory: /scratch/es60/thomas_moore/tmp/pip-ephem-wheel-cache-giwui4in/wheels/b6/a3/f2/6ce45fbdc116ad50e421d6a11cb060cc796e867501807af446
Successfully built ACDtools
Installing collected packages: ACDtools
  Attempting uninstall: ACDtools
    Found existing installation: ACDtools 0.1
    Uninstalling ACDtools-0.1:
      Successfully uninstalled ACDtools-0.1
Successfully installed AC

In [3]:
# Enable autoreload in the notebook
%load_ext autoreload
%autoreload 1 
%aimport ACDtools.util
%aimport ACDtools.ard
%aimport ACDtools.ocean
%aimport ACDtools.plot
# Import the submodules of the local ACDtools package
from ACDtools import util
from ACDtools import ard
from ACDtools import ocean
from ACDtools import plot

In [4]:
import datetime
from pprint import pprint
import intake
import sys
from contextlib import redirect_stdout
import math

## Cluster

In [5]:
client, cluster = util.start_dask_cluster_from_config('netcdf_work')

Cluster started with 28 workers.
Dashboard available at: /proxy/8787/status


## Functions

In [6]:
# develop any new functions here

# Settings

In [7]:
job_config_path = '/g/data/es60/users/thomas_moore/code/ACDtools/job_config.yaml'
job_config_dict = ACDtools.util.load_config(job_config_path)
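
The run cell further down reads a small set of keys from `job_config_dict`. As a minimal sketch, assuming the YAML loads into a plain nested dict, the shape it relies on looks roughly like the example below. The key names are taken from the code in this notebook; every value is a placeholder except `log_dir`, which matches the log path printed by the run. `check_job_config` and `zarr_store_path` are hypothetical helpers written for illustration, not part of ACDtools.

```python
from datetime import datetime

# Key names mirror those read by the run cell; values are placeholders,
# except log_dir, which matches the log path printed in the cell output.
example_job_config = {
    "paths": {
        "log_dir": "/scratch/es60/ard/models/ACCESS-ESM15/ARD/logs/",
        "write_dir": "/path/to/zarr/output/",         # placeholder
        "save_coords_dir": "/path/to/saved/coords/",  # placeholder
    },
    "catalog_search_query_dict": {
        "source_id": "ACCESS-ESM1-5",   # placeholder
        "experiment_id": "historical",  # placeholder
        "variable_id": "tos",           # placeholder
        "table_id": "Omon",             # placeholder
        "version": "vYYYYMMDD",         # placeholder
    },
    "chunking_key": "example_chunking_key",  # placeholder
}

REQUIRED_PATHS = ("log_dir", "write_dir", "save_coords_dir")
REQUIRED_QUERY = ("source_id", "experiment_id", "variable_id", "table_id", "version")


def check_job_config(cfg):
    """Return the dotted names of any keys the run cell reads but cfg lacks."""
    missing = []
    for key in REQUIRED_PATHS:
        if key not in cfg.get("paths", {}):
            missing.append(f"paths.{key}")
    for key in REQUIRED_QUERY:
        if key not in cfg.get("catalog_search_query_dict", {}):
            missing.append(f"catalog_search_query_dict.{key}")
    if "chunking_key" not in cfg:
        missing.append("chunking_key")
    return missing


def zarr_store_path(cfg, stamp):
    """Build the store name the same way the run cell does (hypothetical helper)."""
    q = cfg["catalog_search_query_dict"]
    return (
        f"{cfg['paths']['write_dir']}"
        f"{q['source_id']}.{q['experiment_id']}.{q['variable_id']}."
        f"base.v{stamp}.zarr"
    )


stamp = datetime(2025, 11, 6, 15, 25, 28).strftime("%Y%m%d_%H%M%S")
print(check_job_config(example_job_config))
print(zarr_store_path(example_job_config, stamp))
```

Because the notebook concatenates directory and file name directly, the directory values need a trailing slash. Checking for missing keys up front is cheaper than discovering a `KeyError` after the cluster has already started work.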

# Workflow with the ACCESS-NRI catalog that uses the NCI catalog

In [8]:
catalog = intake.cat.access_nri

In [9]:
%%time
# --- build log filename dynamically ---
logfile = f"{job_config_dict['paths']['log_dir']}log_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"

with open(logfile, "w") as f, redirect_stdout(f):
    #####
    print('Running: ' +
          job_config_dict['catalog_search_query_dict']['source_id'] +
          ' ' + job_config_dict['catalog_search_query_dict']['experiment_id'] +
          ' ' + job_config_dict['catalog_search_query_dict']['variable_id'] +
          ' ' + job_config_dict['catalog_search_query_dict']['table_id'] +
          ' ' + job_config_dict['catalog_search_query_dict']['version']
         )
    pprint(job_config_dict)
    # load catalog
    cmip6_fs38_datastore = ACDtools.util.load_cmip6_fs38_datastore()
    # search catalog for list of files
    search = cmip6_fs38_datastore.search(**job_config_dict['catalog_search_query_dict'])
    # load into one object using xarray kwargs for chunking and handling cftime
    ####
    #ds = load_ACCESS_ESM(search,use_cftime=True,chunking_key=job_config_dict['chunking_key'])
    ####
    ds = ACDtools.ard.load_ACCESS_ESM_ensemble(search,use_cftime=True,chunking_key=job_config_dict['chunking_key'])
    # save and drop multidimensional coordinates
    ds = ACDtools.ard.save_n_drop_multidim_lat_lon(ds,save_coords_dir=job_config_dict['paths']['save_coords_dir'],
                                      variable_name = job_config_dict['catalog_search_query_dict']['variable_id'])
    # remove encoding
    ACDtools.util.remove_encoding(ds)
    # write out zarr
    current_datetime = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    print('Started zarr write at '+current_datetime)
    filename = (
    f"{job_config_dict['paths']['write_dir']}"
    + f"{job_config_dict['catalog_search_query_dict']['source_id']}."
    + f"{job_config_dict['catalog_search_query_dict']['experiment_id']}."
    + f"{job_config_dict['catalog_search_query_dict']['variable_id']}."
    + f"base.v{current_datetime}.zarr"
    )
    print('Zarr filename = '+filename )
    ds.to_zarr(filename,consolidated=True)
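    # take a fresh timestamp; current_datetime still holds the start time
    print('Finished at ' + datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))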
    ##### start log cluster settings
    # Get existing client (do NOT create a new one)
    client = client.current()   # or Client() if not yet connected

    info = client.scheduler_info()["workers"]

    # total cores (threads)
    n_workers = len(info)
    total_threads = sum(w.get("nthreads", 0) for w in info.values())

    # helper: convert bytes to GiB (powers of 1024)
    def to_gib(bytes_):
        return bytes_ / (1024 ** 3)

    # 1) Try configured memory_limit
    limits = {addr: w.get("memory_limit", 0) for addr, w in info.items()}

    # 2) For any worker reporting 0/None (unlimited), fetch actual host RAM via psutil on the worker
    need_actual = [addr for addr, lim in limits.items() if not lim or lim == 0 or math.isclose(lim, 0.0)]
    
    if need_actual:
        # run a tiny function on workers to read psutil.virtual_memory().total
        def _get_total_ram():
            try:
                import psutil
                return psutil.virtual_memory().total
            except Exception:
                # fallback: dask.system.MEMORY_LIMIT may help on some installs
                try:
                    from dask.system import MEMORY_LIMIT
                    return MEMORY_LIMIT or 0
                except Exception:
                    return 0
    
        actual = client.run(_get_total_ram)  # returns {worker_addr: bytes}
        # fill in missing/zero limits with actual system RAM
        for addr in need_actual:
            if addr in actual and actual[addr]:
                limits[addr] = actual[addr]
    
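    total_mem_bytes = sum(limits.values())
    total_mem_gib = to_gib(total_mem_bytes)
    # average per worker; guard against an empty cluster
    mem_per_worker_gib = total_mem_gib / n_workers if n_workers else 0.0
    
    # Build a short log block
    log_lines = [
        "Dask cluster resources:",
        f"  Workers:      {n_workers}",
        f"  Total cores:  {total_threads}",
        f"  Total memory: {total_mem_gib:.2f} GiB",
        f"  Per worker:   {mem_per_worker_gib:.2f} GiB",
    ]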
    
    print("\n".join(log_lines))
    ########### end log cluster settings
    print("==== Log complete ====")
# ---------------------------------------------------------
print(f"✅ Log written to {logfile}")


✅ Log written to /scratch/es60/ard/models/ACCESS-ESM15/ARD/logs/log_20251106_152528.txt
CPU times: user 2min 9s, sys: 16.1 s, total: 2min 25s
Wall time: 2min 31s
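
The cluster summary at the end of the run cell can be pulled out into a pure function, which makes the arithmetic easy to check without a live cluster. This is a sketch only: `summarise_workers` is a hypothetical helper, the input dict merely imitates the `nthreads` and `memory_limit` fields that the cell reads from `client.scheduler_info()["workers"]`, and the `fallback_bytes` argument stands in for the psutil lookup performed on the workers.

```python
def to_gib(n_bytes):
    """Convert bytes to GiB (powers of 1024)."""
    return n_bytes / (1024 ** 3)


def summarise_workers(workers, fallback_bytes=None):
    """Summarise a mapping shaped like scheduler_info()["workers"].

    workers: {address: {"nthreads": int, "memory_limit": int or None}}
    fallback_bytes: optional {address: bytes}, used when a worker reports
        no memory limit (0 or None).
    """
    fallback_bytes = fallback_bytes or {}
    limits = {}
    for addr, w in workers.items():
        limit = w.get("memory_limit") or 0
        if not limit:
            # no configured limit: fall back to the supplied host RAM, if any
            limit = fallback_bytes.get(addr, 0)
        limits[addr] = limit

    n_workers = len(workers)
    total_gib = to_gib(sum(limits.values()))
    return {
        "n_workers": n_workers,
        "total_threads": sum(w.get("nthreads", 0) for w in workers.values()),
        "total_mem_gib": total_gib,
        "mem_per_worker_gib": total_gib / n_workers if n_workers else 0.0,
    }


# Made-up worker records for illustration only
fake_workers = {
    "tcp://127.0.0.1:40001": {"nthreads": 1, "memory_limit": 4 * 1024 ** 3},
    "tcp://127.0.0.1:40002": {"nthreads": 1, "memory_limit": 0},
}
summary = summarise_workers(fake_workers, {"tcp://127.0.0.1:40002": 8 * 1024 ** 3})
print(summary)
```

Returning the total and the per-worker average as separate fields keeps the two quantities from being logged under each other's label.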


# THE END