# Notebook to process data in SMCE helio-public S3 bucket using Dask

This is a simple example showing how to get a list of FITS files in an S3 bucket, then hand them to Dask workers that read each file and examine its header keyword(s).

You can use this notebook to test the parameters you might feed to Dask: the batch size, the number of workers, the number of cores per worker, and the memory per worker. Use the dashboard link to inspect how Dask is performing, and try both manual and automatic (adaptive) scaling. See if you can get it to process 100 files in 20 seconds or less!

## First: What is S3?

S3 stands for "Simple Storage Service", the object storage service of AWS: https://aws.amazon.com/s3/

It lets people query and access data through a shared location reference (a bucket name plus an object key) rather than a path on one particular machine. Buckets can also be made <a href="https://stackoverflow.com/q/16784052">web accessible to users outside of Daskhub</a> if web access is enabled.

S3 buckets are individual storage containers: each bucket holds objects (files), and each object is identified by its key.

## Accessing S3 buckets

To <a href= "https://awscli.amazonaws.com/v2/documentation/api/latest/reference/s3/ls.html">get a list of the S3 buckets</a> on the SMCE Daskhub, enter this at a terminal prompt:<br>
`aws s3 ls`

To view the contents of a specific bucket, reference it with s3:// <br>
`aws s3 ls s3://helio-public/`
>            PRE SDO/
>            PRE SOHO/

(Note: "PRE" stands for prefix, so SDO/ is an AWS prefix with name SDO.) 
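To make the prefix idea concrete, here is a small, hypothetical sketch in plain Python (no AWS calls) of how a listing like the one above groups flat object keys into `PRE` entries at the `/` delimiter. Only the first key comes from this bucket; the `SDO/HMI/` and `SOHO/EIT/` keys are made-up examples. The real `aws s3 ls` also lists the plain objects at each level, which this sketch leaves out.

```python
def common_prefixes(keys: list[str], prefix: str = "", delimiter: str = "/") -> list[str]:
    """Return the sorted, distinct 'sub-folder' prefixes directly under `prefix`.

    S3 stores flat keys; folders are only a display convention based on the delimiter.
    """
    found = set()
    for key in keys:
        if not key.startswith(prefix):
            continue
        rest = key[len(prefix):]
        cut = rest.find(delimiter)
        if cut != -1:
            # keep everything up to and including the first delimiter after `prefix`
            found.add(prefix + rest[: cut + 1])
    return sorted(found)


keys = [
    "SDO/AIA/AIA_L4_20141018_000001_94.fits",
    "SDO/HMI/example.fits",    # hypothetical key
    "SOHO/EIT/example.fits",   # hypothetical key
]
print(common_prefixes(keys))          # ['SDO/', 'SOHO/']
print(common_prefixes(keys, "SDO/"))  # ['SDO/AIA/', 'SDO/HMI/']
```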
<hr>

The external reference for this bucket is https://helio-public.s3.us-east-1.amazonaws.com/
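Assuming an object is publicly readable, its HTTPS address follows the same virtual-hosted pattern as the bucket URL above, with the object key appended as the path. The helper below is a hypothetical sketch that only builds that string; it does not check that the object exists or that public access is enabled.

```python
from urllib.parse import quote


def object_url(bucket: str, key: str, region: str = "us-east-1") -> str:
    """Virtual-hosted-style HTTPS URL for an object key (useful only if the object is public)."""
    # quote() percent-encodes characters such as spaces but leaves "/" alone,
    # so the key's prefix structure survives in the URL path
    return f"https://{bucket}.s3.{region}.amazonaws.com/{quote(key)}"


print(object_url("helio-public", "SDO/AIA/AIA_L4_20141018_000001_94.fits"))
# https://helio-public.s3.us-east-1.amazonaws.com/SDO/AIA/AIA_L4_20141018_000001_94.fits
```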

## Basic commands using S3 buckets

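S3 has no real directories: a "directory" (prefix) exists as soon as an object is stored under it, so there is nothing to create in advance. You can copy a file to the bucket as if it were a Unix folder. Note the trailing slash, which keeps the file's own name under that prefix:<br>
`aws s3 cp yourfile s3://helio-public/yourname/yourdir/`

Without the trailing slash, the file is stored under the key `yourname/yourdir` itself. To confirm the upload, list the prefix:<br>
`aws s3 ls s3://helio-public/yourname/yourdir/`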

If you need access to a bucket with restricted access, run aws-mfa first:<br>
`~/aws-mfa default`

You may need to give it execute permission first:<br>
`chmod 755 ~/aws-mfa`

```python
import boto3
import dask
import io
import s3fs

from astropy.io import fits
from dask.distributed import Client
from re import search
```

## 0. Configuration 

```python
# name of the bucket to read from
bucket_name = 'helio-public'

# prefix (location) in the bucket to use; S3 keys have no leading '/'
bucket_path = 'SDO/AIA/'

# number of workers: the fixed count for manual scaling, the maximum for adaptive scaling
n_workers = 4

# memory per worker (in GB)
w_memory = 2

# cores per worker
w_cores = 2

# number of files to test against
n_files = 100

# Number of files released to the workers at a time. The higher the number,
# the more files are processed concurrently, but the more memory is consumed.
batch_size = 50

# True: manual scaling to n_workers; False: adaptive (automatic) scaling
use_manual_scaling = True
```
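The trade-off described for `batch_size` can be put into rough numbers. The sketch below is a back-of-the-envelope estimate, not something Dask reports: it assumes each batch is spread evenly over the workers and that every file's bytes stay in worker memory until the batch is gathered. The average file size (`avg_file_mb=32.0`) is a placeholder, not a measured value for this bucket, and `batch_plan` is a hypothetical helper.

```python
import math


def batch_plan(n_files: int, batch_size: int, n_workers: int, avg_file_mb: float) -> dict:
    """Rough batch count and per-worker memory load for one batch (assumption-based estimate)."""
    batch_size = max(1, batch_size)
    n_batches = math.ceil(n_files / batch_size)
    # assumption: Dask spreads a batch evenly over the workers
    files_per_worker = math.ceil(min(batch_size, n_files) / n_workers)
    return {
        "n_batches": n_batches,
        "files_per_worker": files_per_worker,
        # assumption: every file's bytes stay in worker memory until the batch is gathered
        "approx_mb_per_worker": files_per_worker * avg_file_mb,
    }


plan = batch_plan(n_files=100, batch_size=50, n_workers=4, avg_file_mb=32.0)  # 32 MB is a placeholder
print(plan)
# {'n_batches': 2, 'files_per_worker': 13, 'approx_mb_per_worker': 416.0}
```

Compare `approx_mb_per_worker` with the per-worker memory (`w_memory`, 2 GB here): doubling `batch_size` roughly doubles the estimate while halving the number of batches.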

## 1. Initialize the cluster and assign the client to the cluster, display the cluster widget

```python
from dask_gateway import Gateway

gateway = Gateway()
options = gateway.cluster_options()

# Set the worker resources explicitly. In our environment, if these are not set,
# the cluster defaults to the same image the notebook itself is running,
# with 2 cores and 4 GB of memory per worker.
# (The pangeo/base-notebook image is a good choice for workers, since it has
# almost every library you'd need on a worker.)
options.worker_cores = w_cores
options.worker_memory = w_memory

# uncomment to display the widget of all available options
# options
```

```python
cluster = gateway.new_cluster(options)

if use_manual_scaling:
    # manual scaling: a fixed number of workers (n_workers defined above)
    cluster.scale(n_workers)
else:
    # adaptively scale between 1 and n_workers (the max)
    cluster.adapt(minimum=1, maximum=n_workers)

# uncomment this if you want to use the GUI widget
# cluster
```

```python
# create the client and show the dashboard URL we can go to to monitor progress
client = Client(cluster)
client
```

> Client: Scheduler `gateway://traefik-daskhub-dask-gateway.daskhub:80/daskhub.8f7fddba87cb4e7daae1e2cfc2e2c531`, Dashboard `/services/dask-gateway/clusters/daskhub.8f7fddba87cb4e7daae1e2cfc2e2c531/status`<br>
> Cluster: Workers: 0, Cores: 0, Memory: 0 B


## 2. Scan data from bucket and make a simple list of file names

```python
# initialize connection to the S3 bucket
s3_resource = boto3.resource('s3')
bucket = s3_resource.Bucket(bucket_name)
```

```python
# Get our list of files (S3 object keys) under bucket_path.
# The collection does the pagination for you. Each obj is an ObjectSummary,
# so it doesn't contain the body; you would need to call obj.get() for that.
s3_files = []
for obj in bucket.objects.filter(Prefix=bucket_path.lstrip('/')):
    # match on the file extension rather than on 'fits' anywhere in the key
    if obj.key.lower().endswith('.fits'):
        s3_files.append(obj.key)

s3_files[0]
```

> 'SDO/AIA/AIA_L4_20141018_000001_94.fits'
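Matching the string `'fits'` anywhere in a key, as `re.search` does, also accepts objects whose names merely contain those letters, and it misses upper-case `.FITS` extensions. This hypothetical sketch compares that with an explicit prefix plus extension check; apart from the first key, the sample keys are invented for illustration.

```python
import re


def select_fits_keys(keys, prefix: str = "SDO/AIA/") -> list:
    """Keep keys under `prefix` whose name ends in .fits (case-insensitive)."""
    return [k for k in keys if k.startswith(prefix) and k.lower().endswith(".fits")]


sample = [
    "SDO/AIA/AIA_L4_20141018_000001_94.fits",
    "SDO/AIA/fits_index.txt",   # hypothetical non-FITS object
    "SDO/AIA/readme_fits.md",   # hypothetical non-FITS object
    "SOHO/EIT/example.FITS",    # hypothetical upper-case extension
]

loose = [k for k in sample if re.search("fits", k)]
print(len(loose))                                # 3: two of them are not FITS files
print(select_fits_keys(sample))                  # ['SDO/AIA/AIA_L4_20141018_000001_94.fits']
print(select_fits_keys(sample, prefix="SOHO/"))  # ['SOHO/EIT/example.FITS']
```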

## 3. Define some routines we will use for doing work with Dask

```python
def open_fits_s3(s3_file_name: str, bucket_name: str) -> bytes:
    """Open a FITS file in an S3 bucket and pass back its contents as raw bytes."""
    fs = s3fs.S3FileSystem()

    with fs.open(bucket_name + '/' + s3_file_name, 'rb') as f:
        return f.read()


def get_header(f_bytes: bytes):
    """Marshal the raw bytes into an astropy FITS object, then read the primary
    header and pass it back (an astropy.io.fits.Header), or None if the file
    can't be parsed.
    """
    try:
        with fits.open(io.BytesIO(f_bytes)) as hdul:
            return hdul[0].header
    except Exception:
        # Bad file? Some of these are missing the END keyword!
        # For now, simply side-step it and move on.
        return None


def work_on_data(client: Client, bucket_name: str, files: list) -> list:
    """
    Main routine for one batch of files. It runs locally and submits tasks
    to the Dask workers, which do the actual reading and parsing.
    """
    # pull files from S3 as bytes (one task per file, run on the workers)
    f_bytes_list = client.map(open_fits_s3, files, bucket_name=bucket_name)

    # read the bytes as a FITS file and grab the header (also on the workers)
    hduls = client.map(get_header, f_bytes_list)

    # wait for the distributed tasks and marshal the results back to local memory
    headers = client.gather(hduls)

    # return a result of some kind: parse the gathered headers for the
    # value of the AIL1FILE keyword
    return [hdr['AIL1FILE'] for hdr in headers if hdr is not None]
```
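The comment in `get_header` mentions files missing the `END` keyword. Per the FITS standard, a header is a sequence of 80-character ASCII cards packed into 2880-byte blocks and terminated by an `END` card, so a file without one cannot be parsed reliably. The minimal reader below is a sketch for illustration only: it returns raw value strings and ignores long-string `CONTINUE` cards, `HIERARCH` keywords, and doubled quotes. astropy remains the right tool in the notebook, and the `AIL1FILE` value in the sample is made up. Because the header sits in the first few blocks, one could in principle read only those bytes from S3 (for example `f.read(2880 * n)` on the s3fs file object) instead of the whole file, but that is not what the notebook does.

```python
BLOCK = 2880  # FITS files are organized in 2880-byte blocks
CARD = 80     # each header card is 80 ASCII characters


def parse_primary_header(raw: bytes) -> dict:
    """Minimal FITS primary-header reader (sketch only).

    Returns keyword -> raw value string. Ignores CONTINUE, HIERARCH and
    doubled quotes inside strings; raises ValueError if no END card is found.
    """
    header = {}
    for start in range(0, len(raw) - CARD + 1, CARD):
        card = raw[start:start + CARD].decode("ascii", errors="replace")
        keyword = card[:8].strip()          # columns 1-8 hold the keyword
        if keyword == "END":
            return header
        if card[8:10] != "= ":              # COMMENT, HISTORY, blank cards: no value
            continue
        value = card[10:]
        if value.lstrip().startswith("'"):
            # string value: text between the first pair of single quotes
            header[keyword] = value.split("'", 2)[1].rstrip()
        else:
            # numeric/logical value: everything before an inline "/" comment
            header[keyword] = value.split("/", 1)[0].strip()
    raise ValueError("no END card found: header is truncated or malformed")


def make_card(text: str) -> bytes:
    """Pad one card to 80 characters (helper for building sample input)."""
    return text.ljust(CARD).encode("ascii")


raw = b"".join([
    make_card("SIMPLE  =                    T / conforms to FITS"),
    make_card("NAXIS   =                    0"),
    make_card("AIL1FILE= 'example_l1.fits'    / hypothetical value"),
    make_card("END"),
]).ljust(BLOCK, b" ")

print(parse_primary_header(raw))
# {'SIMPLE': 'T', 'NAXIS': '0', 'AIL1FILE': 'example_l1.fits'}
```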


## 4. Do the cloud processing, using Dask to 'burst' into other VMs
Using our gathered list of FITS files, split it into batches and hand each batch to the workers.

```python
%%time
def chunks(lst, n):
    """Split our file list into successive batches of (at most) n files."""
    n = max(1, n)
    return (lst[i:i + n] for i in range(0, len(lst), n))

print(f"workers: {n_workers}, cores/worker:{w_cores}, mem/worker: {w_memory}")

for files_to_process in chunks(s3_files[:n_files], batch_size):
    # each batch is spread over all workers; the next batch starts once this one is gathered
    r = work_on_data(client, bucket_name, files_to_process)
    print(f"client:{client} Finished {len(r)} files")
```


> workers: 4, cores/worker:2, mem/worker: 2


```python
print(r[0])
```
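The batch loop submits one batch, then waits in `client.gather` until every file in it is done, so each batch is only as fast as its slowest file. An alternative is to keep a fixed number of tasks in flight and submit a new one whenever one finishes; `dask.distributed` offers `as_completed` for this pattern. To keep the sketch runnable without a cluster, it uses the standard library's `concurrent.futures` thread pool as a stand-in for Dask. `bounded_map` is a hypothetical helper name, and results come back in completion order rather than input order.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Callable, Iterable


def bounded_map(fn: Callable, items: Iterable, max_in_flight: int, max_workers: int = 4) -> list:
    """Apply fn to every item, keeping at most max_in_flight tasks pending.

    Results are returned in completion order, not input order.
    """
    max_in_flight = max(1, max_in_flight)
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = set()
        for item in items:
            pending.add(pool.submit(fn, item))
            if len(pending) >= max_in_flight:
                # block until at least one task finishes; the loop then submits the next item
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                results.extend(f.result() for f in done)
        # drain whatever is still running
        done, _ = wait(pending)
        results.extend(f.result() for f in done)
    return results


squares = bounded_map(lambda x: x * x, range(10), max_in_flight=3)
print(sorted(squares))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

With Dask, the same structure would submit work with `client.submit` instead of `pool.submit`, so a slow file delays only its own slot rather than a whole batch.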