# Benchmarking: VIKING20X workload

In [None]:
import xarray
import dask.distributed

In [None]:
import subprocess

## Preparations
Dask workers are started on the remote machines via SSH, so we first create a local SSH key pair and authorize it.

In [None]:
# Generate a passphrase-less (-P '') 4096-bit RSA key pair, id_rsa_esmvfc and
# id_rsa_esmvfc.pub, in the current working directory.
# NOTE(review): if the key files already exist, ssh-keygen asks before overwriting;
# remove the old files before re-running this cell.
!ssh-keygen -b 4096 -t rsa -f id_rsa_esmvfc -C esmvfc -P ''

In [None]:
# List the generated key files to check that both were created.
!ls id_rsa* -la

In [None]:
# Back up authorized_keys before modifying it; the suffix is the current Unix time in seconds.
!cp ${HOME}/.ssh/authorized_keys ${HOME}/.ssh/authorized_keys_$(date '+%s')

In [None]:
# Authorize the new public key for this account. This appends (>>), so re-running
# the cell adds a duplicate entry.
# NOTE(review): presumably $HOME is shared with the scalc worker machines, which is
# what lets `ssh -i id_rsa_esmvfc` log into them — confirm.
!cat id_rsa_esmvfc.pub >> ${HOME}/.ssh/authorized_keys

## Open Dask cluster

In [None]:
# Start a Dask scheduler with no local workers (n_workers=0); the workers are
# launched manually on the remote machines via SSH further below. The scheduler
# binds to all network interfaces (0.0.0.0), presumably so those remote
# workers can reach it.
cluster = dask.distributed.LocalCluster(ip='0.0.0.0', n_workers=0)

In [None]:
# Connect a client to the scheduler above; by default it becomes the default
# client, so later .compute() calls run on the cluster's workers.
client = dask.distributed.Client(address=cluster)

In [None]:
# Client.scheduler_info is a method: it must be called to obtain the scheduler's
# info dict. Without the parentheses the cell only shows a bound-method repr.
# The dict's 'address' entry holds the host:port to put into `scheduler_ip` below.
client.scheduler_info()

In [None]:
# Display the client object for a quick look at the cluster state.
client

Prepare the Dask worker machines to be added:

In [None]:
# Scheduler address (host:port) that the remote workers connect to; take it
# from the scheduler info displayed above!
scheduler_ip = '10.199.124.103:44545'

# Per-worker resources, given as strings because they are spliced into the
# dask-worker command line. They should be comparable to the gcloud Dask
# worker machines!
nthreads = '7'
memory_limit = '32GiB'

In [None]:
# SSH destinations of the ten worker machines scalc04 ... scalc13.
worker_targets = [f"khoeflich@scalc{machine_number:02d}.geomar.de" for machine_number in range(4, 14)]

In [None]:
import random

# Fixed seed so the shuffled machine order, and therefore which machines join at
# each scaling step of the experiment, is reproducible between runs.
random.seed(a=56)

In [None]:
# Shuffle the machine order in place, 89 rounds on top of the fixed seed.
for _shuffle_round in range(89):
    random.shuffle(worker_targets)

In [None]:
# Show the shuffled order in which the machines will be added.
worker_targets

In [None]:
# Optional: uncomment to benchmark on a single machine only.
#worker_targets = ['khoeflich@scalc04.geomar.de']

In [None]:
def add_dask_worker(ssh_target, scheduler_ip, nthreads, memory_limit,
                    identity_file='id_rsa_esmvfc',
                    bind_path='/data/user/khoeflich/github/ESM-VFC-cloud-project/',
                    container_image='/home/khoeflich/ESM-VFC-cloud-project/performance/pangeo-notebook_2021.07.17.sif'):
    """Start a Dask worker on a remote machine via SSH, without waiting for it.

    The remote side runs ``dask-worker`` inside a Singularity container and
    connects it to the scheduler at ``scheduler_ip``.

    Parameters
    ----------
    ssh_target : str
        SSH destination, e.g. ``'user@host'``.
    scheduler_ip : str
        Scheduler address handed to ``dask-worker`` (``'host:port'``).
    nthreads : str or int
        Value for ``dask-worker --nthreads``.
    memory_limit : str or int
        Value for ``dask-worker --memory-limit`` (e.g. ``'32GiB'``).
    identity_file : str, optional
        Private key passed to ``ssh -i``.
    bind_path : str, optional
        Host path bind-mounted into the container (``singularity run --bind``).
    container_image : str, optional
        Singularity image that provides ``dask-worker``.

    Returns
    -------
    subprocess.Popen
        Handle of the local ``ssh`` process.

    Raises
    ------
    FileNotFoundError
        If the ``ssh`` executable cannot be found locally.
    """
    import shlex

    # ssh joins the remote command into one string for the remote shell, so quote
    # each argument. Plain tokens such as '--nthreads' or '32GiB' are left unchanged.
    remote_command = ' '.join(shlex.quote(str(arg)) for arg in (
        'singularity', 'run', '--bind', bind_path, container_image,
        'dask-worker', scheduler_ip,
        '--nthreads', nthreads,
        '--memory-limit', memory_limit,
    ))
    # Pass an argument list (no local shell), so the target and paths are never
    # re-parsed by /bin/sh.
    return subprocess.Popen(['ssh', '-i', identity_file, ssh_target, remote_command])

In [None]:
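# Optional: start all workers at once (the experiment below starts them itself, two at a time).
#for ssh_target in worker_targets:
#    add_dask_worker(ssh_target, scheduler_ip, nthreads, memory_limit)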

In [None]:
# Number of workers currently registered with the scheduler (expected to be 0
# here unless workers were started manually above).
len(cluster.scheduler_info.get('workers').keys())

## Specify VIKING20X dataset

In [None]:
# Open the T-grid Zarr store; its variables are dask-backed and are only read on compute.
ds_tgrid = xarray.open_zarr(store='/data/user/khoeflich/github/ESM-VFC-cloud-project/VIKING20X.L46-KFS003_1m_grid_T.zarr')

In [None]:
# Open the U-grid Zarr store; its variables are dask-backed and are only read on compute.
ds_ugrid = xarray.open_zarr(store='/data/user/khoeflich/github/ESM-VFC-cloud-project/VIKING20X.L46-KFS003_1m_grid_U.zarr')

In [None]:
# Open the V-grid Zarr store; its variables are dask-backed and are only read on compute.
ds_vgrid = xarray.open_zarr(store='/data/user/khoeflich/github/ESM-VFC-cloud-project/VIKING20X.L46-KFS003_1m_grid_V.zarr')

Uncompressed (in-memory) size:

In [None]:
# Uncompressed size of the T, U and V datasets (in that order), in GB.
for _dataset in (ds_tgrid, ds_ugrid, ds_vgrid):
    print(f'{_dataset.nbytes / 1e9} in GB')

Compressed size on disk:

In [None]:
# On-disk (compressed) size of each of the three Zarr stores.
!du -sh /data/user/khoeflich/github/ESM-VFC-cloud-project/VIKING20X.L46-KFS003_1m_grid_T.zarr/
!du -sh /data/user/khoeflich/github/ESM-VFC-cloud-project/VIKING20X.L46-KFS003_1m_grid_U.zarr/
!du -sh /data/user/khoeflich/github/ESM-VFC-cloud-project/VIKING20X.L46-KFS003_1m_grid_V.zarr/

## Run performance experiment

In [None]:
import datetime
import time

exp_name = 'scalc'
remaining_worker_targets = worker_targets.copy()
no_of_realizations = 25
# Upper bound for newly spawned workers to register before giving up. Without it,
# a worker that fails to start would make the experiment wait forever.
worker_startup_timeout_in_sec = 600

now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
walltime = ['machine,workers,walltime']


def _run_workload():
    """Compute the four benchmark reductions (full means of the VIKING20X variables)."""
    ds_tgrid['votemper'].mean(['deptht', 'time_counter', 'x', 'y']).compute()
    ds_tgrid['vosaline'].mean(['deptht', 'time_counter', 'x', 'y']).compute()
    ds_ugrid['vozocrtx'].mean(['depthu', 'time_counter', 'x', 'y']).compute()
    ds_vgrid['vomecrty'].mean(['depthv', 'time_counter', 'x', 'y']).compute()


def _n_connected_workers():
    """Return the number of workers currently registered with the scheduler."""
    return len(cluster.scheduler_info.get('workers').keys())


# Measuring.

while remaining_worker_targets:

    # Start up to two additional workers. This is fewer when only one target is left,
    # which avoids popping from an empty list for an odd number of machines.

    for _ in range(min(2, len(remaining_worker_targets))):
        add_dask_worker(remaining_worker_targets.pop(0), scheduler_ip, nthreads, memory_limit)

    # Wait until every worker started so far has registered, but not forever.

    expected_workers = len(worker_targets) - len(remaining_worker_targets)
    deadline = time.monotonic() + worker_startup_timeout_in_sec
    while _n_connected_workers() < expected_workers:
        if time.monotonic() > deadline:
            raise TimeoutError(
                "only {} of {} Dask workers registered within {} s".format(
                    _n_connected_workers(), expected_workers, worker_startup_timeout_in_sec))
        time.sleep(5)

    # Warming up (not recorded).

    for _ in range(3):
        _run_workload()

    # Do the calculation. perf_counter is a monotonic high-resolution clock meant
    # for measuring intervals, unlike time.time(), which follows the wall clock.

    for _ in range(no_of_realizations):

        workers = _n_connected_workers()

        start_time_in_sec = time.perf_counter()
        _run_workload()
        end_time_in_sec = time.perf_counter()

        output_row = "{},{},{}".format(exp_name, workers, end_time_in_sec - start_time_in_sec)
        walltime.append(output_row)

In [None]:
# Shut down the client first, then the scheduler it was connected to.
client.close()
cluster.close()

Write the results to disk:

In [None]:
import csv
import pathlib

log_dir = pathlib.Path('./viking20x_logs')
# Create the log directory on first use instead of failing with FileNotFoundError.
log_dir.mkdir(parents=True, exist_ok=True)

# Each entry of `walltime` is already a comma-separated line (header first).
# Parse the entries with csv.reader and write them as real CSV rows with uniform
# '\n' endings. The old delimiter='\n' trick ended the file with a stray '\r\n'.
# newline='' is what the csv module requires for file objects.
with open(log_dir / (now + '_' + exp_name + '.log'), 'w', newline='') as file:
    wr = csv.writer(file, lineterminator='\n')
    wr.writerows(csv.reader(walltime))

## Python environment

In [None]:
# Record the installed Python packages. The explicit %pip magic runs pip within
# the current kernel and does not depend on IPython's automagic being enabled
# (a bare `pip list` is a SyntaxError when automagic is off).
%pip list

In [None]:
# Record the exact conda environment (explicit package list) for reproducibility.
!conda list --explicit