In [1]:
%matplotlib inline

In [2]:
!pip install -r requirements.txt



In [3]:
%%sh 
if ! az account show ; then
    az login --tenant fdpo.onmicrosoft.com 
fi


{
  "environmentName": "AzureCloud",
  "homeTenantId": "16b3c013-d300-468d-ac64-7eda0820b6d3",
  "id": "95bc36f4-2bb8-479c-abf2-f70e831fc224",
  "isDefault": true,
  "managedByTenants": [
    {
      "tenantId": "2f4a9838-26b7-47ee-be60-ccc1fdec5953"
    }
  ],
  "name": "MCAPS-Hybrid-REQ-53059-2023-luvinton",
  "state": "Enabled",
  "tenantId": "16b3c013-d300-468d-ac64-7eda0820b6d3",
  "user": {
    "name": "luvinton@microsoft.com",
    "type": "user"
  }
}


In [4]:
import adlfs
import dask
import fsspec
import h5netcdf
import io
import importlib
import matplotlib.pyplot as plt
import matplotlib
import math
import netCDF4
import os
import time
import xarray as xr
from dask.distributed import Client, performance_report, PipInstall, WorkerPlugin, Environ
from dask_cloudprovider.azure import AzureVMCluster
from sshfs import SSHFileSystem


In [5]:
font = {'family' : 'sans-serif',
        'weight' : 'normal',
        'size'   : 18}
matplotlib.rc('font', **font)
years = [str(year) for year in range(2000, 2022)]
months = ["{:02d}".format(month) for month in range(1,12)]
adls_account = os.environ['ADLS_ACCOUNT']
adls_key = os.environ['ADLS_KEY']
sftp_user = os.environ['SFTP_USER']
sftp_pass=os.environ['SFTP_PASS']
adls_root = 'abfss://era5-pds@{account}/'.format(account=adls_account)
sftp_root = '{account}.blob.core.windows.net'.format(account=adls_account)
fuse_root = '/tmp/era5-pds/'
https_root = 'https://{account}.blob.core.windows.net/era5-pds/'.format(account=adls_account)
https_sas = os.environ['HTTPS_SAS']
file_pattern = '{year}/{month}/data/air_temperature_at_2_metres.nc'
adls_files = [adls_root + file_pattern.format(year=year, month=month) for year in years for month in months][:1] #[:1] for testing
sftp_files = [file_pattern.format(year=year, month=month) for year in years for month in months][:1]
https_files = [https_root + file_pattern.format(year=year, month=month) + https_sas for year in years for month in months ][:12]
https_files[:5]

['https://mossadls.blob.core.windows.net/era5-pds/2000/01/data/air_temperature_at_2_metres.nc?sv=2021-10-04&st=2023-03-14T16%3A26%3A28Z&se=2023-08-15T15%3A26%3A00Z&sr=c&sp=rl&sig=IG8DV%2FqMv0xdtBcZQFFYE6ACDPdMG%2B3%2BPevb%2FTuLOWU%3D',
 'https://mossadls.blob.core.windows.net/era5-pds/2000/02/data/air_temperature_at_2_metres.nc?sv=2021-10-04&st=2023-03-14T16%3A26%3A28Z&se=2023-08-15T15%3A26%3A00Z&sr=c&sp=rl&sig=IG8DV%2FqMv0xdtBcZQFFYE6ACDPdMG%2B3%2BPevb%2FTuLOWU%3D',
 'https://mossadls.blob.core.windows.net/era5-pds/2000/03/data/air_temperature_at_2_metres.nc?sv=2021-10-04&st=2023-03-14T16%3A26%3A28Z&se=2023-08-15T15%3A26%3A00Z&sr=c&sp=rl&sig=IG8DV%2FqMv0xdtBcZQFFYE6ACDPdMG%2B3%2BPevb%2FTuLOWU%3D',
 'https://mossadls.blob.core.windows.net/era5-pds/2000/04/data/air_temperature_at_2_metres.nc?sv=2021-10-04&st=2023-03-14T16%3A26%3A28Z&se=2023-08-15T15%3A26%3A00Z&sr=c&sp=rl&sig=IG8DV%2FqMv0xdtBcZQFFYE6ACDPdMG%2B3%2BPevb%2FTuLOWU%3D',
 'https://mossadls.blob.core.windows.net/era5-pds/2000/0

In [6]:
cluster_spec = {
        'location': 'eastus',
        'resource_group': 'mo_dev',
        'vnet': 'mo_dev',
        'security_group': 'mo_dev-default-nsg-eastus',
        'n_workers': math.ceil(len(https_files) / 2),
        'disk_size': 100,
        'vm_size': 'Standard_D2s_v3', #default
        'scheduler_vm_size': 'Standard_D4s_v3' #bigger
}

In [7]:
fs = fsspec.open(https_files[0])
ncfile = fs.open()
ds = xr.open_dataset(ncfile)
ds.air_temperature_at_2_metres.encoding

{'chunksizes': (24, 100, 100),
 'fletcher32': False,
 'shuffle': True,
 'zlib': True,
 'complevel': 4,
 'source': '<File-like object HTTPFileSystem, https://mossadls.blob.core.windows.net/era5-pds/2000/01/data/air_temperature_at_2_metres.nc?sv=2021-10-04&st=2023-03-14T16%3A26%3A28Z&se=2023-08-15T15%3A26%3A00Z&sr=c&sp=rl&sig=IG8DV%2FqMv0xdtBcZQFFYE6ACDPdMG%2B3%2BPevb%2FTuLOWU%3D>',
 'original_shape': (744, 721, 1440),
 'dtype': dtype('<f4'),
 '_FillValue': 9.96921e+36}

In [8]:
from dask_cloudprovider.azure import AzureVMCluster
cluster = AzureVMCluster(**cluster_spec)

Creating scheduler instance
Assigned public IP
Network interface ready
Creating VM
Created VM dask-c5a43ecd-scheduler
Waiting for scheduler to run at 13.68.138.156:8786
Scheduler is running


  next(self.gen)


Creating worker instance
Network interface ready
Creating VM
Created VM dask-c5a43ecd-worker-e5c2c00c


In [9]:
client = Client(cluster)


+---------+----------------+----------------+---------+
| Package | Client         | Scheduler      | Workers |
+---------+----------------+----------------+---------+
| python  | 3.10.9.final.0 | 3.8.16.final.0 | None    |
+---------+----------------+----------------+---------+


In [10]:
#check that workers have joined cluster in UI
client

0,1
Connection method: Cluster object,Cluster type: dask_cloudprovider.AzureVMCluster
Dashboard: http://13.68.138.156:8787/status,

0,1
Dashboard: http://13.68.138.156:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tls://10.0.0.4:8786,Workers: 0
Dashboard: http://10.0.0.4:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [11]:
while len(client.scheduler_info()['workers']) != cluster_spec['n_workers']:
    print(f'awaiting workers: {len(client.scheduler_info()["workers"])}/{cluster_spec["n_workers"]}')
    time.sleep(10)


awaiting workers: 0/1
awaiting workers: 0/1
awaiting workers: 0/1
awaiting workers: 0/1


KeyboardInterrupt: 

In [None]:
packages = []
with open('requirements_worker.txt', 'r') as r_file:
    packages = [line for line in r_file]

worker_deps_plugin = PipInstall(packages=packages, pip_options=["--upgrade"])

In [None]:
client.register_worker_plugin(worker_deps_plugin)

In [None]:
@dask.delayed
def https_open(path):
    return fsspec.open(path).open()

files_mapper_https = [https_open(file) for file in sftp_files]

In [None]:
@dask.delayed
def adl_open(path):
    file = fsspec.open(path, account_name=adls_account, account_key=adls_key).open()
    return io.BytesIO(file.read())

files_mapper = [adl_open(file) for file in adls_files]

In [None]:
ds = xr.open_mfdataset(files_mapper_https, engine='h5netcdf', chunks={'lon':200,'lat':200,'time0':720}, concat_dim='time0', combine='nested', coords='minimal', compat='override', parallel=True)

In [None]:
print('ds size in GB {:0.2f}\n'.format(ds.nbytes / 1e9))
ds.info

In [None]:
temp_mean = ds['air_temperature_at_2_metres'].mean(dim='time0').compute()

In [None]:
temp_mean.plot(figsize=(20, 10))
plt.title('2000-2022 Mean 2-m Air Temperature')

In [None]:
client.close()
cluster.close()

In [None]:
fsspec.available_protocols()