# Processing GOES-16 data with Dask & AWS Fargate

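This notebook demonstrates how to work with the GOES-16 ABI rainfall rate product (`ABI-L2-RRQPEF`), available as part of the AWS Public Dataset Program (https://registry.opendata.aws/noaa-goes/), by computing a precipitation accumulation on a Dask cluster running on AWS Fargate.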

## Python Imports

In [1]:
%matplotlib inline
import boto3
import botocore
import datetime
import matplotlib.pyplot as plt
import matplotlib
import xarray as xr
import numpy as np
import s3fs
import fsspec
import dask
from dask.distributed import performance_report
from dask.distributed import Client

font = {'family' : 'sans-serif',
        'weight' : 'normal',
        'size'   : 18}
matplotlib.rc('font', **font)

## Scale out Dask Workers

In [2]:
ecs = boto3.client('ecs')
resp = ecs.list_clusters()
clusters = resp['clusterArns']
if len(clusters) > 1:
    print("Please manually select your cluster")
cluster = clusters[0]
cluster
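The cell above prints a warning but still takes the first cluster when more than one exists. A safer option is to select the cluster by name. This is a minimal sketch that assumes the standard ECS ARN form `arn:aws:ecs:<region>:<account>:cluster/<name>`. `select_cluster` is a hypothetical helper and is not part of boto3.

```python
def select_cluster(cluster_arns, name=None):
    """Return exactly one ECS cluster ARN (hypothetical helper).

    If `name` is given, match it against the cluster name (the part of the
    ARN after 'cluster/'); otherwise require that only one cluster exists.
    """
    if name is not None:
        matches = [arn for arn in cluster_arns
                   if arn.split('cluster/', 1)[-1] == name]
    else:
        matches = list(cluster_arns)
    if len(matches) != 1:
        raise ValueError(
            f"expected exactly one matching cluster, found {len(matches)}: {matches}")
    return matches[0]
```

Usage: `cluster = select_cluster(ecs.list_clusters()['clusterArns'], 'goes-Fargate-Dask-Cluster')`.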

'arn:aws:ecs:us-east-1:816257832715:cluster/goes-Fargate-Dask-Cluster'

In [3]:
numWorkers=40
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])
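The notebook repeats the same two ECS calls to scale up here and to scale down at the end. You could wrap them in a small function. This is a sketch only: `scale_workers` is a hypothetical helper, and `ecs` can be a boto3 ECS client or any object that exposes the same two methods.

```python
def scale_workers(ecs, cluster, count, service='Dask-Worker'):
    """Set the worker service's desired task count and block until it is stable.

    Hypothetical helper wrapping the boto3 ECS calls used in this notebook.
    """
    if count < 0:
        raise ValueError("count must be non-negative")
    # Ask ECS for `count` running worker tasks
    ecs.update_service(cluster=cluster, service=service, desiredCount=count)
    # Block until the service reports a steady state
    ecs.get_waiter('services_stable').wait(cluster=cluster, services=[service])
    return count
```

Usage: call `scale_workers(ecs, cluster, 40)` here and `scale_workers(ecs, cluster, 0)` when you are finished. boto3 waiters also accept a `WaiterConfig` dict (`Delay`, `MaxAttempts`) if starting many Fargate tasks takes longer than the default wait.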

## Set up the Dask Client to talk to our Fargate Dask Distributed Cluster

In [4]:
client = Client('Dask-Scheduler.local-dask:8786')
client

Client
  Scheduler: tcp://Dask-Scheduler.local-dask:8786
  Dashboard: http://Dask-Scheduler.local-dask:8787/status
Cluster
  Workers: 40
  Cores: 80
  Memory: 280.00 GB


## Open an Example File and Check the Native Chunking

For maximum performance, our Dask chunks should match the file's native (HDF5) chunking or be whole multiples of it.
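One way to choose Dask chunk sizes is to round a target size to the nearest whole multiple of the native chunk shape. That way no HDF5 chunk is split between Dask tasks. This is a minimal sketch: `aligned_chunks` is hypothetical, and the native chunk length of 226 in the usage line is an illustrative assumption, not a value read from the file.

```python
def aligned_chunks(native, target):
    """Round each desired Dask chunk length to a whole multiple of the native one.

    `native` and `target` map dimension name -> chunk length. A target
    smaller than the native chunk is rounded up to one native chunk.
    """
    out = {}
    for dim, want in target.items():
        n = native[dim]
        if n <= 0 or want <= 0:
            raise ValueError(f"chunk lengths must be positive (dim {dim!r})")
        out[dim] = max(1, round(want / n)) * n
    return out
```

Usage (with an assumed native chunk of 226): `aligned_chunks({'y': 226, 'x': 226}, {'y': 500, 'x': 500})` returns `{'y': 452, 'x': 452}`.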

In [None]:
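url = 's3://noaa-goes16/ABI-L2-RRQPEF/2020/181/19/OR_ABI-L2-RRQPEF-M6_G16_s20201811900207_e20201811909515_c20201811910042.nc'
# noaa-goes16 is a public bucket, so anonymous access is sufficient
ncfile = fsspec.open(url, anon=True)
ds = xr.open_dataset(ncfile.open(), engine='h5netcdf')

# 't' is the scalar time coordinate we concatenate along later
print('t' in ds.coords)
# The h5netcdf backend records the on-disk HDF5 chunk shape in the variable's encoding
print(ds['RRQPE'].dims, ds['RRQPE'].shape)
print(ds['RRQPE'].encoding.get('chunksizes'))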
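GOES ABI file names encode the scan start (`s`), end (`e`), and file creation (`c`) times as `YYYYJJJHHMMSSt`: year, day of year, hour, minute, second, and tenths of a second (UTC). The sketch below decodes them with the standard library. `goes_times` is a hypothetical helper.

```python
import datetime
import re

# Matches '_s', '_e' or '_c' followed by 13 digits (YYYYJJJHHMMSS) and one tenths digit
_TIME_RE = re.compile(r'_([sec])(\d{13})(\d)')

def goes_times(filename):
    """Return {'s': start, 'e': end, 'c': created} datetimes parsed from a GOES file name."""
    times = {}
    for tag, stamp, tenths in _TIME_RE.findall(filename):
        t = datetime.datetime.strptime(stamp, '%Y%j%H%M%S')
        times[tag] = t + datetime.timedelta(milliseconds=100 * int(tenths))
    return times
```

For the example file above, the start time decodes to 2020-06-29 19:00:20.7 (day 181 of 2020) and the end time to 19:09:51.5.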

## Open all the 2020 data as a single dataset

In [5]:
%%time

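from botocore import UNSIGNED
from botocore.config import Config

# noaa-goes16 is a public bucket: list it with unsigned (anonymous) requests
s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
paginator = s3.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(Bucket='noaa-goes16', Prefix='ABI-L2-RRQPEF/2020/')

@dask.delayed
def s3open(path):
    # Runs on a Dask worker: open the object over anonymous S3 without caching
    fs = s3fs.S3FileSystem(anon=True, default_fill_cache=False)
    return fs.open(path)

# One delayed "open" per object; a page with no results has no 'Contents' key
files_mapper = [s3open("s3://noaa-goes16/" + obj['Key'])
                for page in page_iterator
                for obj in page.get('Contents', [])]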

CPU times: user 4.81 s, sys: 100 ms, total: 4.92 s
Wall time: 7.66 s
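To process only a subset of days, you can filter the listed keys by their day-of-year path component. This sketch assumes keys shaped like `ABI-L2-RRQPEF/<year>/<ddd>/<hh>/<file>.nc`, as in the example URL above. `keys_for_days` is a hypothetical helper.

```python
def keys_for_days(pages, first_day, last_day):
    """Yield object keys whose day-of-year component lies in [first_day, last_day].

    `pages` is an iterable of list_objects_v2 response pages (dicts).
    """
    for page in pages:
        for obj in page.get('Contents', []):
            day = int(obj['Key'].split('/')[2])
            if first_day <= day <= last_day:
                yield obj['Key']
```

For a contiguous range such as days 170 through 179, S3 can do the filtering on the server side instead: pass `Prefix='ABI-L2-RRQPEF/2020/17'` to `paginate`.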


In [6]:
# how many files did we load?

len(files_mapper)

26180
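To sanity-check a file count like this one, compare it with the number of scans you would expect at a fixed cadence. This sketch assumes the 10-minute full-disk cadence of ABI Mode 6 (the `M6` in the file names) holds for every day in the range. Real archives can have missing or extra files, so treat a mismatch as a prompt to investigate rather than as an error. `expected_scans` is a hypothetical helper.

```python
import datetime

def expected_scans(year, first_day, last_day, cadence_minutes=10):
    """Scans expected from first_day to last_day (day of year, inclusive) at a fixed cadence."""
    # Validate the range against the calendar (366 days in leap years)
    days_in_year = (datetime.date(year + 1, 1, 1) - datetime.date(year, 1, 1)).days
    if not 1 <= first_day <= last_day <= days_in_year:
        raise ValueError("day-of-year range out of bounds")
    scans_per_day = 24 * 60 // cadence_minutes
    return (last_day - first_day + 1) * scans_per_day
```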

In [7]:
def drop_coords(ds):
    """Keep only the RRQPE variable and the x, y, t coordinates."""
    # Dataset.drop is deprecated; drop_vars is the current API
    ds = ds.drop_vars([v for v in ds.data_vars if v != 'RRQPE'])
    ds = ds.drop_vars([c for c in ds.coords if c not in ('x', 'y', 't')])
    return ds

In [8]:
%%time

with performance_report(filename="dask-open-dataset.html"):
    ds = xr.open_mfdataset(
        files_mapper,
        engine='h5netcdf',
        combine='nested',        # S3 lists keys in time order, so no coordinate sorting is needed
        concat_dim='t',          # stack the scans along the scalar time coordinate
        chunks={'x': 452, 'y': 452},
        coords='minimal',
        compat='override',
        preprocess=drop_coords,  # keep only RRQPE before combining
        parallel=True,           # open the files on the Dask workers
    )
print('ds size in GB {:0.2f}\n'.format(ds.nbytes / 1e9))

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError


OSError: Timed out trying to connect to 'tcp://Dask-Scheduler.local-dask:8786' after 10 s: Timed out trying to connect to 'tcp://Dask-Scheduler.local-dask:8786' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7fae40c3b690>: ConnectionRefusedError: [Errno 111] Connection refused
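Because each file contributes one step along `t`, the chunking above yields one Dask chunk per scan per spatial tile. The sketch below estimates the total chunk count and the size of each chunk. It rests on two assumptions that the notebook does not confirm: a 5424 x 5424 full-disk grid and 4-byte (float32) values after decoding. `chunk_stats` is a hypothetical helper. Task graphs with millions of chunks put a heavy load on the Dask scheduler. Larger spatial chunks (still multiples of the native chunk size) reduce the count.

```python
import math

def chunk_stats(shape, chunks, itemsize):
    """Return (number of chunks, bytes per full chunk) for an array tiled by `chunks`."""
    n_chunks = 1
    for length, chunk in zip(shape, chunks):
        # Partial chunks at the edges still count as chunks
        n_chunks *= math.ceil(length / chunk)
    return n_chunks, math.prod(chunks) * itemsize
```

Under those assumptions, `chunk_stats((26180, 5424, 5424), (1, 452, 452), 4)` gives 3,769,920 chunks of about 0.8 MB each.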

In [None]:
ds.info()

The `ds.info()` output above summarizes the dataset's dimensions, coordinates, and variables. Because `drop_coords` keeps only `RRQPE` and the `x`, `y`, and `t` coordinates, we expect three dimensions (`t`, `y`, and `x`) and a single data variable, `RRQPE`, the rainfall rate in mm/h.

## Convert units from mm/h to mm

In [None]:
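# RRQPE is a rate in mm/h and each Mode 6 full-disk scan covers 10 minutes (1/6 h),
# so rate * (1/6 h) gives the rainfall depth in mm for that scan
ds['RRQPE'] = ds.RRQPE * (1.0 / 6.0)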
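For a single pixel, the conversion and the later sum over `t` reduce to simple arithmetic. Ten minutes is 1/6 hour, so a rate of 12 mm/h held for one scan contributes 2 mm. The pure-Python sketch below mirrors `* (1.0 / 6.0)` followed by `.sum(dim='t', skipna=True)`. It assumes every scan represents exactly 10 minutes. `accumulate_mm` is a hypothetical helper.

```python
import math

def accumulate_mm(rates_mm_per_h, scan_minutes=10):
    """Sum per-scan rainfall depths (mm) from rates (mm/h), skipping NaN like skipna=True."""
    hours_per_scan = scan_minutes / 60
    return sum(r * hours_per_scan for r in rates_mm_per_h if not math.isnan(r))
```

Usage: `accumulate_mm([12.0, float('nan'), 6.0])` is approximately 3.0 (2 mm + 1 mm, with the missing scan skipped).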

## Calculate the total precipitation accumulation

In [None]:
# calculates the precip accumulation
precip_sum = ds['RRQPE'].sum(dim='t', skipna=True)

The expressions above didn't actually compute anything; they only built the Dask task graph. To run the computation, we call the `compute` method:

In [None]:
%%time

with performance_report(filename="dask-report3.html"):
    precip_sum = precip_sum.compute()

In [None]:
import pickle
with open('accum_2020.pickle', 'wb') as handle:
    pickle.dump(precip_sum, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
import pickle
with open('accum_2020.pickle', 'rb') as handle:
    precip_sum = pickle.load(handle)
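The two cells above save the result and then reload it, which lets you skip the expensive computation in a later session. You can combine them into a "load if cached, otherwise compute and save" pattern. This is a minimal sketch: `load_or_compute` is a hypothetical helper. Only unpickle files you created yourself, because loading a pickle can execute arbitrary code.

```python
import os
import pickle

def load_or_compute(path, compute):
    """Return the pickled result at `path` if it exists; otherwise compute, save and return it."""
    if os.path.exists(path):
        with open(path, 'rb') as handle:
            return pickle.load(handle)
    result = compute()
    with open(path, 'wb') as handle:
        pickle.dump(result, handle, protocol=pickle.HIGHEST_PROTOCOL)
    return result
```

Usage: `precip_sum = load_or_compute('accum_2020.pickle', precip_sum.compute)`, where `precip_sum` is still the lazy sum. For a portable format, xarray's `to_netcdf` is an alternative to pickle.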

### Plot Precipitation Accumulation

In [None]:
precip_sum.plot(figsize=(10, 10), vmin=0.1, vmax=1000)
plt.title('Precipitation Accumulation')

## Cluster scale down

When we are temporarily done with the cluster, we can scale the workers down to zero to save on costs.

In [None]:
numWorkers=0
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])