Skip to content

Commit

Permalink
added lambda support for query and point selection
Browse files Browse the repository at this point in the history
  • Loading branch information
jreadey committed May 5, 2020
1 parent 228c190 commit 3a901f2
Show file tree
Hide file tree
Showing 23 changed files with 784 additions and 1,798 deletions.
36 changes: 33 additions & 3 deletions awslambda/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,45 @@ ZIPFILE="function.zip"
# Start from a clean slate: drop any archive left over from a previous build.
if test -f ${ZIPFILE}
then
    rm ${ZIPFILE}
fi

# run_pyflakes DIR
#
# Runs the repository's pyflakes wrapper script (../pyflakes.sh) on DIR.
# The wrapper is first invoked with "-count"; if the number it prints is
# 1 or more, it is invoked again without "-count" so its report is shown,
# and the whole build is aborted with exit status 1.
#
# SRC and pyflakes are left as plain (global) shell variables, as before.
run_pyflakes () {
    SRC="$1"
    pyflakes="../pyflakes.sh"
    echo "running pyflakes on $SRC files"
    # Capture the count separately and quote every expansion so a path
    # containing whitespace is passed as a single argument.
    count="$("${pyflakes}" -count "${SRC}")"
    # Default to 0 when the wrapper printed nothing, so the numeric test
    # below is always well-formed instead of failing with a syntax error.
    if [ "${count:-0}" -ge 1 ]; then
        echo "pyflakes errors in ${SRC}..."
        "${pyflakes}" "${SRC}"
        exit 1
    fi
}

# Command-line handling: linting is on by default, "--nolint" switches it
# off, and "-h" / "--help" prints the usage line and exits.
dolint=1
if [ $# -gt 0 ]; then
    # Quote "$1" so an empty or whitespace-containing argument cannot
    # break the test expression.
    if [ "$1" = "-h" ] || [ "$1" = "--help" ]; then
        echo "Usage: build.sh [--nolint]"
        exit 1
    fi
    if [ "$1" = "--nolint" ]; then
        echo "no pyflakes"
        dolint=0
    fi
fi

# Compare numerically. A bare `[ $dolint ]` only checks that the string is
# non-empty, which is true for "0" as well, so --nolint had no effect.
if [ "$dolint" -eq 1 ]; then
    echo "dolint"
    run_pyflakes "chunkread"
    run_pyflakes "chunkread/hsds"
    run_pyflakes "chunkread/hsds/util"
fi


# Add the lambda handler and the bundled hsds modules to the archive.
zip "${ZIPFILE}" chunkread/lambda_function.py
zip "${ZIPFILE}" chunkread/__init__.py
zip "${ZIPFILE}" chunkread/hsds/*.py
zip "${ZIPFILE}" chunkread/hsds/util/*.py

# Install third-party dependencies into ./package so they can be added to
# the same archive.
pip install --target ./package numpy
#pip install --target ./package aiobotocore
#pip install --target ./package aiohttp
#pip install --target ./package numba # this will make the image too large...

# Stop if ./package is missing (e.g. the pip install failed); otherwise the
# recursive zip below would archive whatever directory we are still in.
cd package || exit 1
# Append the installed packages (recursively, maximum compression) to the
# archive that lives in the directory we just left.
zip -r9 "${OLDPWD}/${ZIPFILE}" .
Expand Down
130 changes: 99 additions & 31 deletions awslambda/chunkread/hsds/chunkread.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import time
import base64
import numpy as np
from . import config
from . import hsds_logger as log
from .util.dsetUtil import getChunkLayout, getDeflateLevel, isShuffle
from .util.hdf5dtype import createDataType, getItemSize
from .util.chunkUtil import getChunkSize, chunkReadSelection, chunkReadPoints
from .util.chunkUtil import getChunkSize, chunkReadSelection, chunkReadPoints, chunkQuery
from .util.idUtil import getS3Key
from .util.storUtil import getStorBytes
from .util.arrayUtil import bytesToArray, arrayToBytes
Expand All @@ -16,7 +17,41 @@ def get_app():
app["bucket_name"] = config.get("bucket_name")
return app

def get_chunk(app, chunk_id, dset_json, bucket=None, s3path=None, s3offset=0, s3size=0):
def get_chunk(app, params):

if "chunk_id" not in params:
msg = "chunk_id not in params"
log.warn(msg)
raise KeyError()
chunk_id = params["chunk_id"]

if "dset_json" not in params:
msg = "dset_json not in params"
log.warn(msg)
raise KeyError()
dset_json = params["dset_json"]

if "bucket" in params:
bucket = params["bucket"]
else:
bucket = config.get("bucket_name")
if "s3path" in params:
s3path = params["s3path"]
else:
s3path = None
if not bucket and not s3path:
msg = "bucket or s3path not specified"
log.error(msg)
raise KeyError()

if "s3offset" in params:
s3offset = params["s3offset"]
else:
s3offset = 0
if "s3size" in params:
s3size = params["s3size"]
else:
s3size = 0

chunk_arr = None
chunk_dims = getChunkLayout(dset_json)
Expand Down Expand Up @@ -64,6 +99,21 @@ def get_chunk(app, chunk_id, dset_json, bucket=None, s3path=None, s3offset=0, s3

# read hyperslab from chunk
def read_hyperslab(app, params):
    """Read a chunk and return it, or a selection of it, as base64 text.

    Args:
        app: application object, passed through to get_chunk.
        params: request parameters, passed through to get_chunk. When it
            contains a "slices" key, that value is handed to
            chunkReadSelection to select from the chunk array; otherwise
            the chunk array is returned as-is.

    Returns:
        str: the base64 encoding (as ASCII text) of the bytes produced by
        arrayToBytes for the selected array.
    """
    data = get_chunk(app, params)
    if "slices" in params:
        data = chunkReadSelection(data, slices=params["slices"])

    # Serialize to bytes first, then wrap in base64 so the result is text.
    encoded = base64.b64encode(arrayToBytes(data))
    return str(encoded, 'ascii')

#
# read point selection from chunk
def read_points(app, params):
if "chunk_id" not in params:
msg = "chunk_id not in params"
log.warn(msg)
Expand All @@ -76,30 +126,42 @@ def read_hyperslab(app, params):
raise KeyError()
dset_json = params["dset_json"]

if "bucket" in params:
bucket = params["bucket"]
else:
bucket = config.get("bucket_name")
if not bucket:
msg = "bucket not specified"
if "point_arr" not in params:
msg = "point_arr not in params"
log.warn(msg)
raise KeyError()
point_data_b64 = params["point_arr"]
point_data = base64.b64decode(point_data_b64)

chunk_arr = get_chunk(app, chunk_id, dset_json, bucket=bucket)

if "slices" in params:
arr = chunkReadSelection(chunk_arr, slices=params["slices"])
else:
arr = chunk_arr
if "num_points" not in params:
msg = "num_points not in params"
log.warn(msg)
raise KeyError()
num_points = params["num_points"]

chunk_arr = get_chunk(app, params)
rank = len(chunk_arr.shape)

point_dt = np.dtype('uint64')
point_shape = (num_points, rank)
point_arr = bytesToArray(point_data, point_dt, point_shape)

chunk_layout = getChunkLayout(dset_json)

point_shape = (num_points, rank)

point_arr = bytesToArray(point_data, point_dt, point_shape)

arr = chunkReadPoints(chunk_id=chunk_id, chunk_layout=chunk_layout, chunk_arr=chunk_arr, point_arr=point_arr)

bdata = arrayToBytes(arr)
base64data = base64.b64encode(bdata)

return base64data.decode('ascii')

#
# read point selection from chunk
def read_points(app, params):
# query chunk contents
def read_query(app, params):
if "chunk_id" not in params:
msg = "chunk_id not in params"
log.warn(msg)
Expand All @@ -112,26 +174,32 @@ def read_points(app, params):
raise KeyError()
dset_json = params["dset_json"]

if "bucket" in params:
bucket = params["bucket"]
else:
bucket = config.get("bucket_name")
if not bucket:
msg = "bucket not specified"
log.warn(msg)
if "query" not in params:
log.error("no query specified")
raise KeyError()

if "point_arr" not in params:
msg = "point_arr not in params"
log.warn(msg)
raise KeyError()
query = params["query"]

point_arr = params["point_arr"]
log.debug(f"read_query -- chunk_id: {chunk_id} query: {query}")

chunk_layout = getChunkLayout(dset_json)

chunk_arr = get_chunk(app, chunk_id, dset_json, bucket=bucket)
chunk_arr = get_chunk(app, params)

arr = chunkReadPoints(chunk_id=chunk_id, chunk_layout=chunk_layout, chunk_arr=chunk_arr, point_arr=point_arr)
if "slices" in params:
selection = params["slices"]
else:
selection = None

if "limit" in params:
limit = params["limit"]
else:
limit=None

read_resp = chunkQuery(chunk_id=chunk_id, chunk_layout=chunk_layout, chunk_arr=chunk_arr, slices=selection,
query=query, limit=limit, return_json=True)

log.debug(f"read_query -- returning: {read_resp}")

return read_resp

return arr
2 changes: 1 addition & 1 deletion awslambda/chunkread/hsds/hsds_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
#
import os


# Global app handle; None by default.
app = None
# Log verbosity is taken from the LOG_LEVEL environment variable and falls
# back to "DEBUG" when that variable is not set.
log_level = os.environ.get("LOG_LEVEL", "DEBUG")

def debug(msg):
print("log_level:", log_level)
if log_level == "DEBUG":
print("DEBUG> " + msg)
if app:
Expand Down

0 comments on commit 3a901f2

Please sign in to comment.