# Dask client setup

In [1]:
from pathlib import Path
import glob
import os
import xarray as xr

In [2]:
import dask
# dask.config.set(scheduler='distributed')

In [3]:
import dask.array as da

In [4]:
# !!!!Import this cell from the dask dashboard!!!!!
# The scheduler address is session-specific (it is copied from the dashboard),
# so allow it to be supplied through the DASK_SCHEDULER_ADDRESS environment
# variable; the literal below is only the fallback from the session it was copied in.
from dask.distributed import Client

SCHEDULER_ADDRESS = os.environ.get("DASK_SCHEDULER_ADDRESS", "tcp://127.0.0.1:43341")
client = Client(SCHEDULER_ADDRESS)
client


+---------+--------+-----------+---------+
| Package | Client | Scheduler | Workers |
+---------+--------+-----------+---------+
| lz4     | 4.4.4  | None      | None    |
+---------+--------+-----------+---------+


0,1
Connection method: Direct,
Dashboard: http://127.0.0.1:8787/status,

0,1
Comm: tcp://127.0.0.1:43341,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 2
Started: 45 minutes ago,Total memory: 8.00 GiB

0,1
Comm: tcp://127.0.0.1:45877,Total threads: 1
Dashboard: http://127.0.0.1:34505/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:45933,
Local directory: /gpfsm/dnb33/tdirs/batch/slurm.47756083.hzafar/dask-scratch-space/worker-15jrtjto,Local directory: /gpfsm/dnb33/tdirs/batch/slurm.47756083.hzafar/dask-scratch-space/worker-15jrtjto
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 141.25 MiB,Spilled bytes: 0 B
Read bytes: 760.18 kiB,Write bytes: 250.50 kiB

0,1
Comm: tcp://127.0.0.1:37079,Total threads: 1
Dashboard: http://127.0.0.1:40825/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:40475,
Local directory: /gpfsm/dnb33/tdirs/batch/slurm.47756083.hzafar/dask-scratch-space/worker-qq3aikgj,Local directory: /gpfsm/dnb33/tdirs/batch/slurm.47756083.hzafar/dask-scratch-space/worker-qq3aikgj
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 141.11 MiB,Spilled bytes: 0 B
Read bytes: 759.59 kiB,Write bytes: 250.27 kiB


In [5]:
# Check what's actually installed on workers
def check_worker_packages(dask_worker):
    """Return the ``pip list`` output for the Python environment running this worker.

    Meant to be executed via ``client.run``, which fills in ``dask_worker`` with
    the worker object because the function declares a parameter of that name.
    The parameter is not otherwise used here.

    Returns:
        str: stdout of ``pip list`` (empty if pip wrote nothing to stdout).
    """
    import subprocess
    import sys

    # Invoke pip through the worker's own interpreter: a bare ``pip`` is found via
    # PATH and may belong to a different environment than the one the worker
    # process is actually running in.
    result = subprocess.run(
        [sys.executable, '-m', 'pip', 'list'], capture_output=True, text=True
    )
    return result.stdout

# Run on every worker. client.run blocks and returns a dict mapping each worker
# address to the function's return value (already-computed results, despite
# the variable name).
futures = client.run(check_worker_packages)
if futures:
    first_worker, first_listing = next(iter(futures.items()))
    print(f"Packages on {first_worker}:")
    print(first_listing)  # Check first worker
else:
    print("No workers connected; nothing to check.")

Package                           Version
--------------------------------- --------------
aiohappyeyeballs                  2.4.3
aiohttp                           3.10.10
aiosignal                         1.3.1
alembic                           1.13.3
annotated-types                   0.7.0
anyio                             4.6.2.post1
argon2-cffi                       23.1.0
argon2-cffi-bindings              21.2.0
arrow                             1.3.0
asttokens                         2.4.1
async_generator                   1.10
async-lru                         2.0.4
attrs                             24.2.0
Babel                             2.14.0
batchspawner                      1.3.0
beautifulsoup4                    4.12.3
bleach                            6.1.0
blinker                           1.8.2
bokeh                             3.6.0
branca                            0.7.2
Brotli                            1.1.0
cached-property                   1.5.2
certifi         

## Debug: client and cluster connectivity checks

In [None]:
# check versions explicitly
# scheduler_info() describes the scheduler and its workers but not package
# versions, so ask the cluster directly which versions the scheduler and each
# worker are running. check=False reports instead of raising on a mismatch.
import dask
print(f"Client dask version: {dask.__version__}")
versions = client.get_versions(check=False)
print(f"Scheduler dask version: {versions['scheduler']['packages'].get('dask')}")
for worker_address, worker_info in versions['workers'].items():
    print(f"Worker {worker_address} dask version: {worker_info['packages'].get('dask')}")
print(f"Cluster info: {client.scheduler_info()}")

In [None]:
# Check if your client is actually connected
print(f"Client: {client}")
print(f"Client status: {client.status}")
print(f"Dashboard: {client.dashboard_link}")

# Check cluster info. Fetch it once and reuse it: each scheduler_info() call
# queries the scheduler, and two calls could observe different worker sets.
try:
    info = client.scheduler_info()
    print(f"Scheduler info: {info}")
    print(f"Workers: {len(info['workers'])}")
except Exception as e:
    # Best-effort diagnostic cell: report the failure rather than abort.
    print(f"Connection error: {e}")

In [None]:
# Check what client dask operations are using
from dask.distributed import get_client

try:
    active_client = get_client()
    print(f"Active client: {active_client}")
    print(f"Is this your client? {active_client is client}")
except ValueError:
    print("No active client found!")

In [None]:
# Fetch the default client (normally the one created from the scheduler address above).
client = dask.distributed.get_client()
print(client.dashboard_link)  # Check dashboard for worker health
# Check scheduler info. A Client created from a scheduler address has no
# attached Cluster object (client.cluster is None), so
# client.cluster.scheduler.identity() raises AttributeError; scheduler_info()
# retrieves the scheduler's identity through the client instead.
client.scheduler_info()

In [None]:
# Force a simple dask computation to test: a small chunked reduction whose
# tasks should show up in the dashboard.
test_array = da.ones((1000, 1000), chunks=(100, 100))
(result,) = dask.compute(test_array.sum())
print(f"Test result: {result}")

In [None]:
# Force computation through your client.
# Submit via client.compute and wait on the returned Future instead of using
# `with client:` — leaving a `with client:` block calls Client.close(), which
# disconnects this notebook from the cluster for every later cell.
test_array = da.ones((1000, 1000), chunks=(100, 100))
result = client.compute(test_array.sum()).result()

# MiCASA data test

In [None]:
# MiCASA daily data directory on the /discover nobackup filesystem
NOBACKUP = Path("/discover/nobackup/hzafar")
MICASA_DATA_PATH = NOBACKUP.joinpath("ghgc", "micasa", "micasa-data", "daily")

In [None]:
# Test a random month: every daily .nc4 file for 2016/01, in sorted path order
list_mon = sorted(MICASA_DATA_PATH.glob('2016/01/*.nc4'))

In [None]:
# Test a random year: every daily .nc4 file under each 2016 month directory,
# in sorted path order
list_year = sorted(MICASA_DATA_PATH.glob('2016/??/*.nc4'))

In [None]:
import time

# Time how long opening one month of files with open_mfdataset takes.
# perf_counter is the monotonic, high-resolution clock meant for elapsed time;
# time.time() follows the wall clock and can jump.
start = time.perf_counter()
ds = xr.open_mfdataset(list_mon)
print(f"Time taken: {time.perf_counter() - start:.2f} seconds")

In [None]:
# Inspect the array object backing the NPP variable
ds["NPP"].data

In [None]:
# Which array type backs NPP after open_mfdataset
type(ds["NPP"].data)

In [None]:
# Load the NPP values into memory and display them
ds["NPP"].load()

In [None]:
# Explicitly route the load through your client.
# Avoid `with client:` — leaving that block calls Client.close(), which
# disconnects this notebook from the cluster for every later cell.
# client.compute returns a Future; .result() waits for and returns the
# computed (in-memory) DataArray.
ds = xr.open_mfdataset(list_mon)
result = client.compute(ds.NPP).result()

In [None]:
# Force computation through your client.
# Submit via client.compute and wait on the returned Future instead of using
# `with client:` — leaving a `with client:` block calls Client.close(), which
# disconnects this notebook from the cluster for every later cell.
test_array = da.ones((1000, 1000), chunks=(100, 100))
result = client.compute(test_array.sum()).result()

In [None]:
%%time
# ds = xr.open_mfdataset(path_sorted_yr, combine='nested', concat_dim='time', parallel=True)
ds = xr.open_mfdataset(path_sorted_yr, parallel=True,
                       decode_coords=False, 
                       # decode_cf = False,
                       concat_dim='time', combine="nested", data_vars="minimal", coords="minimal",  compat="override")



In [None]:
# Display the rich repr of the most recently opened dataset (`ds`).
ds

### Old testing (earlier `open_mfdataset` experiments)

In [None]:
# %%time
# ds = xr.open_mfdataset(list_mon, 
#                        # parallel=True,
#                        # decode_coords=False, 
#                        # decode_cf = False,
#                        concat_dim='time', combine="nested", 
#                        data_vars="minimal", coords="minimal",  
#                        compat="override")


In [None]:
# Trying to speed up dask (did not work)
# ds = xr.open_mfdataset(path_sorted_mon, combine='nested',concat_dim='time', parallel=True)
# ds = xr.open_mfdataset(path_sorted_mon,  data_vars='minimal', coords='minimal', compat='override', parallel=True)
# ds = xr.open_mfdataset(path_sorted_mon, parallel=True,decode_cf=False)
# ds = xr.open_mfdataset(path_sorted_mon, parallel=True,decode_cf=False,decode_times=False)
# ds = xr.open_mfdataset(path_sorted_mon, parallel=True,decode_times=False)
# ds = xr.open_mfdataset(path_sorted_mon, parallel=True,data_vars="minimal", coords="minimal", compat="override", decode_times=False)
# ds = xr.open_mfdataset(path_sorted_mon, parallel=True,decode_coords=False)