In [1]:
import pyqg
import icechunk as ic
import xarray as xr
import zarr
import threading
import queue


In [2]:
# Open the existing Icechunk repository stored under this S3 bucket/prefix.
# Repository.open() is used (not create), so the repository must already exist.
storage = ic.s3_storage(prefix="dvc-webinar/pyqg-demo-2/", bucket="icechunk-test")
repo = ic.Repository.open(storage)
# Display the handle as the cell result.
repo

<icechunk.repository.Repository at 0x7ff320f94550>

In [3]:
# Model parameters for this run; both are passed straight to pyqg.QGModel below.
beta = 3.0e-11
rd = 15000.0

# Short label for the run, reused as the zarr group name and in commit messages.
# round() rather than int(): the scaled values are float products, and int()
# would truncate a result such as 29.999999999999996 down to 29.
run_name = f'beta={round(beta * 1e12)},rd={round(rd / 1000)}'
run_name

'beta=30,rd=15'

In [4]:
# One model year in seconds, using a 360-day year (360 d x 24 h x 60 min x 60 s).
year = 360. * 24 * 60 * 60

# Build the model with the run parameters chosen above and spin it up for ten years.
m = pyqg.QGModel(
    rd=rd,
    beta=beta,
    twrite=10000,      # the log output shows one status line per 10000 steps
    tavestart=1e99,    # never average
    tmax=10 * year,
)
m.run()

INFO:  Logger initialized
INFO: Step: 10000, Time: 7.20e+07, KE: 2.88e-05, CFL: 0.019
INFO: Step: 20000, Time: 1.44e+08, KE: 2.05e-04, CFL: 0.062
INFO: Step: 30000, Time: 2.16e+08, KE: 2.27e-04, CFL: 0.071
INFO: Step: 40000, Time: 2.88e+08, KE: 2.11e-04, CFL: 0.061


In [5]:
# Names of the dataset variables to persist; the model dataset is subset to
# these before every write to the store.
vars_to_keep = ["q"]

In [6]:
# Write the current model state as the first entry of this run's zarr group,
# then commit it to main so later appends build on a committed array.
session = repo.writable_session("main")
ds = m.to_dataset()
ds[vars_to_keep].to_zarr(
    session.store,
    group=run_name,
    zarr_format=3,
    consolidated=False,
)
# The commit call is the last expression, so the new snapshot id is displayed.
session.commit(f"{run_name} - wrote initial snapshot @ t={m.t}")

'G8RVTE5KRR4MG646QHJ0'

In [8]:
NSAMPLES = 200

q = queue.Queue()
lock = threading.Lock()
# Exceptions raised by the background writer, handed back to the main thread.
worker_errors = []


def worker():
    """Drain ``q``, appending each queued dataset to its zarr group along ``time``.

    Every queue item is a ``(ds, store, group_name)`` tuple. A ``None`` item is
    the shutdown signal: the worker marks it done and returns, so the thread
    does not outlive the cell.

    A failed write is recorded in ``worker_errors`` instead of killing the
    thread. ``q.task_done()`` is called for every item, including failed ones,
    so ``q.join()`` in the main thread can never block forever.
    """
    while True:
        item = q.get()
        try:
            if item is None:
                return
            if worker_errors:
                # An earlier append failed; skip the rest of the backlog rather
                # than write a time series with a gap that will not be committed.
                continue
            (ds, store, group_name) = item
            # With a single worker thread the lock is redundant; it is kept so
            # that adding more workers cannot produce concurrent appends.
            with lock:
                ds.to_zarr(store, group=group_name, zarr_format=3, consolidated=False, append_dim="time")
        except Exception as exc:
            worker_errors.append(exc)
        finally:
            q.task_done()


# Turn on the worker thread; keep a handle so it can be joined on shutdown.
writer = threading.Thread(target=worker, daemon=True)
writer.start()

session = repo.writable_session("main")

# Snapshot interval: 30 days, in seconds.
tsnapint = 24*60*60 * 30
# Extend the run by NSAMPLES snapshot intervals beyond the current end time.
m.tmax += NSAMPLES * tsnapint

try:
    for _ in m.run_with_snapshots(tsnapint=tsnapint):
        # NOTE(review): assumes m.to_dataset() returns data that is decoupled
        # from the live model arrays, since the write happens later on the
        # worker thread -- TODO confirm.
        q.put((m.to_dataset()[vars_to_keep], session.store, run_name))

    # Backlog still waiting to be written when the simulation finishes.
    print(q.qsize())
finally:
    # Always stop the worker, even if the simulation is interrupted, so that
    # re-running this cell does not leave a thread blocked on an orphaned queue.
    q.put(None)
    q.join()
    writer.join()

if worker_errors:
    # Surface the background failure here instead of committing partial data.
    raise RuntimeError("background snapshot write failed; nothing was committed") from worker_errors[0]

session.rebase(ic.ConflictDetector())
session.commit(f"{run_name} wrote data")

INFO: Step: 120000, Time: 8.64e+08, KE: 2.14e-04, CFL: 0.062
INFO: Step: 130000, Time: 9.36e+08, KE: 2.26e-04, CFL: 0.068
INFO: Step: 140000, Time: 1.01e+09, KE: 2.26e-04, CFL: 0.063
INFO: Step: 150000, Time: 1.08e+09, KE: 2.04e-04, CFL: 0.067
INFO: Step: 160000, Time: 1.15e+09, KE: 2.23e-04, CFL: 0.071
INFO: Step: 170000, Time: 1.22e+09, KE: 2.23e-04, CFL: 0.069
INFO: Step: 180000, Time: 1.30e+09, KE: 2.48e-04, CFL: 0.068


1


'9Q44QH2W8650B62R48KG'

In [None]:
# Re-open the run's group from the tip of main through a read-only session to
# check what was actually committed.
session = repo.readonly_session(branch="main")
ds = xr.open_dataset(
    session.store,
    engine="zarr",
    group=run_name,
    zarr_format=3,
    consolidated=False,
)
ds

In [10]:
# Commit history of main (the captured output lists the newest snapshot first).
# list() keeps the full history on display should ancestry() yield lazily.
list(repo.ancestry(branch="main"))

[SnapshotInfo(id="9Q44QH2W8650B62R48KG", parent_id="F3JVZWAZA851WC6G70S0", written_at=datetime.datetime(2025,2,18,18,47,32,745213, tzinfo=datetime.timezone.utc), message="beta=30,rd..."),
 SnapshotInfo(id="F3JVZWAZA851WC6G70S0", parent_id="05MT43VT7MMJP30G4BGG", written_at=datetime.datetime(2025,2,18,18,47,5,114833, tzinfo=datetime.timezone.utc), message="beta=30,rd..."),
 SnapshotInfo(id="05MT43VT7MMJP30G4BGG", parent_id="G8RVTE5KRR4MG646QHJ0", written_at=datetime.datetime(2025,2,18,18,46,43,150782, tzinfo=datetime.timezone.utc), message="beta=15,rd..."),
 SnapshotInfo(id="G8RVTE5KRR4MG646QHJ0", parent_id="1VQAD5AFG6HHRNBH4PHG", written_at=datetime.datetime(2025,2,18,18,46,42,24310, tzinfo=datetime.timezone.utc), message="beta=30,rd..."),
 SnapshotInfo(id="1VQAD5AFG6HHRNBH4PHG", parent_id="BA338MZRMR4XZMQHSJ60", written_at=datetime.datetime(2025,2,18,18,46,9,112293, tzinfo=datetime.timezone.utc), message="beta=15,rd..."),
 SnapshotInfo(id="BA338MZRMR4XZMQHSJ60", parent_id="PPMBH8J5QJD