# Processing Example for AWS illustrating a simple Algorithm to Test Infrastructure

This is based on using an environment similar to the one that is created by the ASDI CMIP6 example: https://github.com/awslabs/amazon-asdi/tree/main/examples/cmip6 (this needs to run in us-east-1, I think, as the CMIP6 references failed from London)

If you want to run this you may need to install some additional libraries using ```conda install``` from a Terminal

In [None]:
%matplotlib inline
import xarray as xr
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
import intake
import boto3
import botocore
import datetime
import s3fs
import fsspec
import dask
#import sys
#import os
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from dask.distributed import performance_report, Client, progress, LocalCluster

# One default font (family, weight, size) applied to all matplotlib text.
font = dict(family='sans-serif', weight='normal', size=18)
matplotlib.rc('font', **font)

In [None]:
import json
import sys
import boto3
import os
from boto3.dynamodb.conditions import Key, Attr

In [None]:
# Some parameters for regridding.
# Target grid axes with a 0.1 step; np.arange excludes the stop value,
# so 360.0 and 90.0 themselves are not part of the axes.
regrid_lon = np.arange(start=0.0, stop=360.0, step=0.1)
regrid_lat = np.arange(start=-90.0, stop=90.0, step=0.1)
# Interpolation method name (presumably consumed by a later regridding
# step that is not shown here; confirm).
regrid_method = 'slinear'

# Connect to Dask cluster scheduler


In [None]:
import dask.array as da
from dask_worker_pools import pool, propagate_pools, visualize_pools


In [None]:
from dask.distributed import Client
import lz4
# Connect to the Dask scheduler at this host:port address.
# Kept for reference from earlier experiments:
#   Client.get_versions('self', check=True)
#   Client('Dask-Scheduler.local-dask:8786',
#          serializers=['dask', 'pickle'], deserializers=['dask', 'pickle'])
scheduler_address = 'Dask-Scheduler.local-dask:8786'
client = Client(scheduler_address)

In [None]:
# Echo the 'workers' entry of the scheduler info as the cell output.
client.scheduler_info()['workers']

# Get the Climate Prediction

Using the ASDI CMIP6 data, which has a Zarr index, we can select some data from a scenario. (This is adapted from the examples in https://github.com/awslabs/amazon-asdi/tree/main/examples )

In [None]:
# Open the intake-esm datastore described by the public pangeo-cmip6.json file.
catalog = intake.open_esm_datastore('https://cmip6-pds.s3.amazonaws.com/pangeo-cmip6.json')

In [None]:
# Search facets for the catalog query. activity_id, variable_id and table_id
# are restored from the IPython %store database, so they must have been
# saved with %store beforehand.
%store -r activity_id
%store -r variable_id
%store -r table_id
variable_ids = variable_id # e.g. 'tas' is air temperature at 2m above surface
table_id = table_id # Monthly data from Atmosphere - would really like this to be daily, but ran out of memory in the client ('day' is the id)
grid = 'gn' # grid label passed to the catalog search

# Records for Institution, experiment, and source_id are stored in https://github.com/WCRP-CMIP/CMIP6_CVs
experiment_id = 'ssp245' # one of ['ssp126', 'ssp245', 'ssp370', 'ssp585']
activity_ids = activity_id # Search Scenarios & CMIP activities only
institution_id = 'MOHC' # just looking at our data in this example

# Echo the restored values so a missing or stale %store entry is easy to spot.
print(activity_id)
print(variable_id)
print(table_id)

In [None]:
# Narrow the catalog to the assets that match every facet chosen above,
# then show the matching rows.
res = catalog.search(
    activity_id=activity_ids,
    experiment_id=experiment_id,
    variable_id=variable_ids,
    grid_label=grid,
    table_id=table_id,
    institution_id=institution_id,
)
display(res.df)

In [None]:
# Region configured for the default boto3 session; used below when signing
# OpenSearch requests.
session = boto3.session.Session()
my_region = session.region_name

In [None]:
# `host` (the OpenSearch endpoint host name) is restored from the IPython
# %store database.
%store -r host
# Build the AWS SigV4 request signer from the default boto3 credentials and
# the session region.
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, my_region)
index_name = 'cmip6-pds' # Update the index name as needed

In [None]:
# OpenSearch client: HTTPS on port 443 with certificate verification,
# every request signed through `auth`.
opensearch_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

def search_cmip_query(q):
    """Look up the store prefix, region and Dask pool recorded for ``q``.

    Runs a ``multi_match`` query for ``q`` against the ``fileName`` field of
    the OpenSearch index ``index_name`` (at most 5 hits are requested) using
    the module-level ``opensearch_client``.

    Args:
        q: '/'-separated path to search for. Its second-to-last component is
            used as the marker at which each matched ``fileName`` is cut.

    Returns:
        A tuple ``(res_fil, regio, d_pool)`` taken from the first hit:
        ``res_fil`` is the hit's ``fileName`` up to and including the marker
        component, followed by "/"; ``regio`` is the hit's ``region`` field;
        ``d_pool`` is the hit's ``dask_pool`` field.

    Raises:
        IndexError: if the search returns no hits, or if ``q`` has fewer than
            two '/'-separated components.
        KeyError: if a hit lacks one of the expected ``_source`` fields.
    """
    query_body = {
        'size': 5,
        'query': {
            'multi_match': {
                'query': q,
                'fields': ['fileName'],
            }
        },
    }

    response = opensearch_client.search(
        body=query_body,
        index=index_name,
    )

    sources = [hit['_source'] for hit in response['hits']['hits']]
    if not sources:
        # Keep the exception type the bare `[0]` lookup used to raise, but
        # say what actually went wrong.
        raise IndexError(
            "no OpenSearch hits for {!r} in index {!r}".format(q, index_name)
        )

    # Directory component at which every matched file name is truncated.
    marker = q.split('/')[-2]

    prefixes = [src['fileName'].split(marker)[0] + marker + "/" for src in sources]
    regions = [src['region'] for src in sources]
    pools = [src['dask_pool'] for src in sources]

    # Take the first hit instead of list(set(...))[0]: the result is the same
    # when all hits agree, and is reproducible across runs when they do not
    # (set iteration order of strings is not stable between interpreters).
    return prefixes[0], regions[0], pools[0]

In [None]:
# Echo the 'zstore' column of the search results as the cell output.
res.df['zstore']

In [None]:
# Accumulators filled by the lookup loop: one region and one Dask pool per
# zstore path.
data_region = []
dask_pool = []
# Echo the OpenSearch index name as the cell output.
index_name

In [None]:
# For every store path in the catalog: take the part after "<index_name>/",
# look it up in OpenSearch, record the reported region and Dask pool, and
# point the catalog row at the path returned by the lookup.
bucket_prefix = index_name + '/'
for s3_path in res.df['zstore']:
    query_param = s3_path.split(bucket_prefix)[1]
    local_pth, regi, d_pool = search_cmip_query(query_param)
    data_region.append(regi)
    dask_pool.append(d_pool)
    res.df['zstore'] = res.df['zstore'].replace([s3_path], local_pth)

# Collapse the collected values to a single region and a single pool.
# NOTE: data_region is rebound from a list to a scalar here, so the cell that
# creates the empty accumulators must be re-run before running this one again.
unique_regions = set(data_region)
unique_pools = set(dask_pool)
data_region = list(unique_regions)[0]
region_dask_pool = list(unique_pools)[0]

In [None]:

# Report the single region and Dask pool selected for this search. The pool
# line prints the de-duplicated region_dask_pool (not the raw per-path
# dask_pool list) so that it mirrors how data_region is reported.
print("Region: {}".format(data_region))
print("Dask Pool: {}".format(region_dask_pool))

In [None]:
# Show the search results table, including its current 'zstore' column.
display(res.df)

In [None]:
# Plain Python list of the store paths, in catalog row order.
files_mapper = res.df['zstore'].tolist()


In [None]:
# Echo the list of store paths as the cell output.
files_mapper

In [None]:
# Name of the worker pool used when opening the datasets; saved with %store
# into the IPython store database.
# NOTE(review): this is hard-coded, and the region_dask_pool value looked up
# from OpenSearch is not used here - confirm that this is intended.
prediction_pool_region = 'us-west-2'
%store prediction_pool_region

In [None]:


%%time
# Open every zarr store in files_mapper as one combined dataset while inside
# the `pool(prediction_pool_region)` context (presumably this pins the work
# to that worker pool; confirm against dask_worker_pools).
with pool(prediction_pool_region):
    datasets = xr.open_mfdataset(files_mapper, engine='zarr', parallel=True, decode_times=True, consolidated=True)
    # Earlier alternative, loading through the intake-esm search result instead:
    #datasets = res.to_dataset_dict(zarr_kwargs={'consolidated': True, 'decode_times': True})

In [None]:
predictive_data_set = xr.Dataset()


# just select the tail of the date over time (2090-2100)
for i in datasets:
    ds = datasets[i]
    print(ds)
    total_times = ds['time'].size
    start_index = total_times - (10*12) #this is in months. REally want it to be days
    ds2 = ds.isel(time=np.arange(start_index,total_times)) #last 10 years of data
    predictive_data_set = xr.merge([predictive_data_set, ds2], compat='override')

predictive_data_set
%store predictive_data_set



In [None]:
# Placeholder billing message for demonstration; the ANSI escape codes
# \033[1m and \033[0m switch bold on and off around the first sentence.
cost_information = "".join([
    '\033[1m',
    "The below information is for demonstration only.",
    '\033[0m',
    " \nThe total cost for this run was: $X",
    " \nThe amount you have remaining in your account is: $Y",
    " \nTo upgrade, click this link",
])
%store cost_information