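# Modify the chunks of an existing Zarr file

Evaluate the store at `SRC_FILE` in parallel through an AWS Lambda function and write the result to `DST_FILE` using the new chunk sizes in `CHUNKS`.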

In [2]:
%pylab inline

Populating the interactive namespace from numpy and matplotlib


In [3]:
import podpac
from podpac import Node
from podpac import alglib
import traitlets as tl
import logging
import time
import inspect
# Surface podpac's INFO-level progress messages, such as the
# "Submitting source ..." lines logged during the parallel evaluation.
logger = logging.getLogger(name='podpac')
logger.setLevel(level=logging.INFO)

In [7]:
# Input store to re-chunk and the output store that receives the new layout.
SRC_FILE = 's3://podpac-drought-monitor-s3/SMAP_CATS_2.zarr'
DST_FILE = 's3://podpac-drought-monitor-s3/SMAP_CATS_CHUNKED.zarr'
# Target chunk length along each dimension of the output store.
CHUNKS = dict(lat=256, lon=256, time=16)

In [5]:
# AWS Lambda deployment settings. They are applied to the local podpac session
# here and are also handed to the Lambda node as `eval_settings` in a later cell.
# NOTE(review): the AWS credentials are copied into this dict, so they presumably
# travel with `eval_settings` to the Lambda function — confirm this is intended.
settings = dict(
    FUNCTION_NAME="podpac-drought-monitor-lambda-compute-stats",
    S3_BUCKET_NAME="podpac-drought-monitor-s3",
    FUNCTION_ROLE_NAME="podpac-drought-monitor-role",
    MULTITHREADING=True,
    N_THREADS=64,
    AWS_ACCESS_KEY_ID=podpac.settings["AWS_ACCESS_KEY_ID"],
    AWS_SECRET_ACCESS_KEY=podpac.settings["AWS_SECRET_ACCESS_KEY"],
    AWS_REGION_NAME=podpac.settings["AWS_REGION_NAME"],
    AWS_BUDGET_AMOUNT=100,
    AWS_BUDGET_EMAIL=podpac.settings["AWS_BUDGET_EMAIL"],
    FUNCTION_DEPENDENCIES_KEY="podpac_deps.zip",
)
podpac.settings.update(settings)

In [6]:
# Worker count for the parallel Zarr writer; it also sizes the AWS connection pool.
N_WORKERS: int = 25

In [8]:
# Source data node reading the existing Zarr store. It uses the same
# dimension keys as the store's coordinates.
zarr_source_kwargs = dict(
    source=SRC_FILE,
    time_key='time',
    lat_key='lat',
    lon_key='lon',
    skip_validation=True,
)
node = podpac.data.Zarr(**zarr_source_kwargs)

In [10]:
# Evaluate over the source node's own coordinates (every lat/lon/time value it exposes).
coords_mp = node.coordinates
coords_mp

Coordinates (EPSG:4326)
	lat: ArrayCoordinates1d(lat): Bounds[-84.65642547607422, 84.65641784667969], N[1624], ctype['midpoint']
	lon: ArrayCoordinates1d(lon): Bounds[-179.95314025878906, 179.95314025878906], N[3856], ctype['midpoint']
	time: ArrayCoordinates1d(time): Bounds[1.0, 366.0], N[366], ctype['midpoint']

In [11]:
# Wrap the source node so that it is evaluated by the AWS Lambda function described by `settings`.
# NOTE(review): this timeout is very short; presumably the parallel manager only needs each
# invocation dispatched, not completed — confirm units/semantics in the podpac Lambda docs.
LAMBDA_EVAL_TIMEOUT = 1.25
node_p = podpac.managers.Lambda(
    source=node,
    eval_settings=settings,
    eval_timeout=LAMBDA_EVAL_TIMEOUT,
)

In [12]:
# Keep the budget lookup's result instead of discarding it (only the last expression of a cell
# is displayed), then print the function / S3 / role configuration summary.
budget = node_p.get_budget()
node_p.describe()


Lambda Node (staged)
    Function
        Name: podpac-drought-monitor-lambda-compute-stats
        Description: PODPAC Lambda Function (https://podpac.org)
        ARN: None
        Triggers: ['eval']
        Handler: handler.handler
        Environment Variables: {}
        Timeout: 600 seconds
        Memory: 2048 MB
        Tags: {}
        Source Dist: s3://podpac-dist/dev/podpac_dist.zip
        Source Dependencies: s3://podpac-dist/dev/podpac_deps.zip
        Last Modified: None
        Version: None
        Restrict Evaluation: []

    S3
        Bucket: podpac-drought-monitor-s3
        Tags: {}
        Input Folder: input/
        Output Folder: output/

    Role
        Name: podpac-drought-monitor-role
        Description: PODPAC Lambda Role
        ARN: None
        Policy Document: {'Version': '2012-10-17', 'Statement': [{'Effect': 'Allow', 'Action': ['s3:PutObject', 's3:GetObject', 's3:DeleteObject', 's3:ReplicateObject', 's3:ListBucket', 's3:ListMultipartUploadParts', 

In [13]:
# Resume switch. Leave False for the first run. On later runs set it to True, which turns on
# both `skip_existing` and `list_dir`; the original notes said to enable these two together
# on subsequent runs. NOTE(review): list_dir presumably lets the writer discover chunks
# already present in DST_FILE — confirm in the podpac docs.
RESUME = False

# NOTE(review): the saved repr of this cell shows a time chunk of 12, which does not match
# CHUNKS['time'] = 16; re-run the cell to refresh it.
node_mp = podpac.core.managers.parallel.ParallelAsyncOutputZarr(
    source=node_p,
    number_of_workers=N_WORKERS,
    chunks=CHUNKS,
    zarr_file=DST_FILE,
    init_file_mode='a',
    skip_existing=RESUME,
    list_dir=RESUME,
    # A larger connection pool avoids a warning about the number of open connections.
    aws_config_kwargs=dict(max_pool_connections=N_WORKERS * 10),
    # Presumably the index of the first chunk to submit. A non-zero value lets a
    # partially completed run continue without starting over.
    start_i=0,
)
node_mp

<ParallelAsyncOutputZarr(source=Lambda (staged)
	Name: podpac-drought-monitor-lambda-compute-stats
	Source: Zarr
	Bucket: podpac-drought-monitor-s3
	Triggers: ['eval']
	Role: podpac-drought-monitor-role
, number_of_workers=25, chunks={'lat': 256, 'lon': 256, 'time': 12})>

In [None]:
# Long-running step: submit every chunk to the Lambda workers, with results
# written into DST_FILE (see the progress log below).
o_mp = node_mp.eval(coords_mp)

INFO:podpac.core.managers.parallel:Submitting source 7
INFO:podpac.core.managers.parallel:Submitting source 21
INFO:podpac.core.managers.parallel:Submitting source 10
INFO:podpac.core.managers.parallel:Submitting source 3
INFO:podpac.core.managers.parallel:Submitting source 19
INFO:podpac.core.managers.parallel:Submitting source 23
INFO:podpac.core.managers.parallel:Submitting source 17
INFO:podpac.core.managers.parallel:Submitting source 11
INFO:podpac.core.managers.parallel:Submitting source 14
INFO:podpac.core.managers.parallel:Submitting source 20
INFO:podpac.core.managers.parallel:Submitting source 6
INFO:podpac.core.managers.parallel:Submitting source 5
INFO:podpac.core.managers.parallel:Added all chunks to worker pool. Now waiting for results.
INFO:podpac.core.managers.parallel:(0:00:00): Waiting for results: 1 / 3472
INFO:podpac.core.managers.parallel:Submitting source 9
INFO:podpac.core.managers.parallel:Submitting source 24
INFO:podpac.core.managers.parallel:Submitting source

In [None]:
# Check the results: open the re-chunked destination store written above.
out_f = podpac.data.Zarr(source=DST_FILE)
out_f

In [None]:
# Spot-check a small corner of the 'd0' array in the destination store opened as `out_f`.
# (The previous version read from `cats`, which is never defined in this notebook.)
# NOTE(review): assumes the output store contains a 'd0' array — confirm.
d0 = out_f.dataset['d0'][:6, :6, :]
d0