SQS notifications.
lossyrob committed Sep 24, 2015
1 parent 3eb956c commit c59bfe3
Showing 19 changed files with 174 additions and 192 deletions.
20 changes: 20 additions & 0 deletions README.md
@@ -91,6 +91,26 @@ aws s3 cp target/scala-2.10/... s3://oam-tiler-emr/mosaic.jar

There are shell scripts in the root of the repository that will run this against EMR. They use the `awscli` tool to run EMR commands.

#### upload-code.sh

This will build the JAR file for the mosaic step and upload the `chunk.py` and `mosaic.jar` to the appropriate place in s3.
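
A hedged boto3 sketch of the same uploads — the local paths are placeholders (the JAR comes out of the sbt build), while the bucket and keys are the ones referenced by the step scripts:

```python
import boto3

s3 = boto3.client("s3")
s3.upload_file("chunk/chunk.py", "oam-tiler-emr", "chunk.py")        # chunk step driver
s3.upload_file("path/to/mosaic.jar", "oam-tiler-emr", "mosaic.jar")  # placeholder path
```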

#### launch-cluster.sh

This will launch a long-running EMR cluster to run steps against. This is useful for testing, when you want to run several jobs in a row without waiting for a new cluster to bootstrap each time.

This will output a cluster id, which should be set in the `add-step*` scripts so that they run against the correct cluster.
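
A hedged boto3 sketch of launching such a cluster — the release label, bootstrap-script location, and IAM roles are assumptions; the instance types, Spark, and `configurations.json` come from this repository:

```python
import json

import boto3

emr = boto3.client("emr", region_name="us-east-1")

response = emr.run_job_flow(
    Name="oam-tiler",
    ReleaseLabel="emr-4.0.0",  # assumption: any Spark-capable release
    Applications=[{"Name": "Spark"}],
    Configurations=json.load(open("configurations.json")),
    BootstrapActions=[{
        "Name": "gdal-setup",
        # Placeholder: wherever bootstrap.sh has been uploaded.
        "ScriptBootstrapAction": {"Path": "s3://your-bucket/bootstrap.sh"},
    }],
    Instances={
        "MasterInstanceType": "m3.xlarge",
        "SlaveInstanceType": "m3.xlarge",
        "InstanceCount": 11,  # 1 master + 10 workers, as in the Timing Notes
        "KeepJobFlowAliveWhenNoSteps": True,  # long-running, like the script
    },
    JobFlowRole="EMR_EC2_DefaultRole",  # assumption: default EMR roles
    ServiceRole="EMR_DefaultRole",
)
print response["JobFlowId"]  # the cluster id to paste into the add-step* scripts
```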

#### add-steps.sh, add-step1.sh, add-step2.sh

These will run the tiling process against a long-running cluster deployed with `launch-cluster.sh`. Make sure to set the correct `CLUSTER_ID` at the top, and to set `NUM_EXECUTORS` and `EXECUTOR_MEMORY` based on the number of nodes in the cluster (one executor per core per worker node); `2304m` is the amount of memory to give each executor on the 4-core `m3.xlarge` nodes.
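For example, the 10-worker `m3.xlarge` cluster described under Timing Notes gives 10 × 4 = 40 executors, which matches `NUM_EXECUTORS=40` in these scripts.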

You'll also need to set `REQUEST_URI` and `WORKSPACE_URI`, which give the input request for the chunk step and the workspace location for the mosaic step, respectively.
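
A hedged boto3 equivalent of the CHUNK step (the scripts' `Type=Spark` shorthand is expanded here by hand into a `command-runner.jar`/`spark-submit` step, which is roughly what the awscli does; the cluster id is a placeholder):

```python
import boto3

emr = boto3.client("emr", region_name="us-east-1")

emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # placeholder: the id printed by launch-cluster.sh
    Steps=[{
        "Name": "CHUNK",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit", "--deploy-mode", "cluster",
                "--driver-memory", "2g",
                "--num-executors", "40",
                "--executor-memory", "2304m",
                "--executor-cores", "1",
                "s3://oam-tiler-emr/chunk.py",
                "s3://workspace-oam-hotosm-org/test-req-partial.json",
            ],
        },
    }],
)
# The MOSAIC step follows the same pattern, adding "--class", "org.hotosm.oam.Main"
# and pointing spark-submit at s3://oam-tiler-emr/mosaic.jar.
```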

## Timing Notes

10 m3.xlarge worker nodes and 1 m3.xlarge master at spot prices (around 5 cents an hour)
7 changes: 5 additions & 2 deletions add-step1.sh
@@ -1,11 +1,14 @@
CLUSTER_ID=j-1SBFC4U63PC79
CLUSTER_ID=j-3HD42KV231ZH5

DRIVER_MEMORY=2g
NUM_EXECUTORS=40
EXECUTOR_MEMORY=2304m
EXECUTOR_CORES=1

REQUEST_URI=s3://workspace-oam-hotosm-org/test-req-partial.json
WORKSPACE_URI=s3://workspace-oam-hotosm-org/emr-test-job-partial

aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps \
Name=CHUNK,ActionOnFailure=CONTINUE,Type=Spark,Args=[--deploy-mode,cluster,--driver-memory,$DRIVER_MEMORY,--num-executors,$NUM_EXECUTORS,--executor-memory,$EXECUTOR_MEMORY,--executor-cores,$EXECUTOR_CORES,s3://oam-tiler-emr/chunk.py,s3://workspace-oam-hotosm-org/test-req-full.json]
Name=CHUNK,ActionOnFailure=CONTINUE,Type=Spark,Args=[--deploy-mode,cluster,--driver-memory,$DRIVER_MEMORY,--num-executors,$NUM_EXECUTORS,--executor-memory,$EXECUTOR_MEMORY,--executor-cores,$EXECUTOR_CORES,s3://oam-tiler-emr/chunk.py,$REQUEST_URI]
10 changes: 7 additions & 3 deletions add-step2.sh
@@ -1,11 +1,15 @@
CLUSTER_ID=j-RRHMKKWUYLTT

DRIVER_MEMORY=3G
DRIVER_MEMORY=2g
NUM_EXECUTORS=40
EXECUTOR_MEMORY=3G
EXECUTOR_MEMORY=2304m
EXECUTOR_CORES=1

REQUEST_URI=s3://workspace-oam-hotosm-org/test-req-partial.json
WORKSPACE_URI=s3://workspace-oam-hotosm-org/emr-test-job-partial


aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps \
Name=MOSAIC,ActionOnFailure=CONTINUE,Type=Spark,Jar=s3://oam-tiler-emr/mosaic.jar,Args=[--deploy-mode,cluster,--driver-memory,$DRIVER_MEMORY,--num-executors,$NUM_EXECUTORS,--executor-memory,$EXECUTOR_MEMORY,--executor-cores,$EXECUTOR_CORES,--class,org.hotosm.oam.Main,s3://oam-tiler-emr/mosaic.jar,s3://workspace-oam-hotosm-org/emr-test-job-full/step1_result.json]
Name=MOSAIC,ActionOnFailure=CONTINUE,Type=Spark,Jar=s3://oam-tiler-emr/mosaic.jar,Args=[--deploy-mode,cluster,--driver-memory,$DRIVER_MEMORY,--num-executors,$NUM_EXECUTORS,--executor-memory,$EXECUTOR_MEMORY,--executor-cores,$EXECUTOR_CORES,--class,org.hotosm.oam.Main,s3://oam-tiler-emr/mosaic.jar,$WORKSPACE_URI/step1_result.json]
9 changes: 6 additions & 3 deletions add-steps.sh
@@ -1,12 +1,15 @@
CLUSTER_ID=j-RRHMKKWUYLTT
CLUSTER_ID=j-2R48YBD6AJTWQ

DRIVER_MEMORY=2g
NUM_EXECUTORS=40
EXECUTOR_MEMORY=2304m
EXECUTOR_CORES=1

REQUEST_URI=s3://workspace-oam-hotosm-org/test-req-partial.json
WORKSPACE_URI=s3://workspace-oam-hotosm-org/emr-test-job-partial

aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps \
Name=CHUNK,ActionOnFailure=CONTINUE,Type=Spark,Args=[--deploy-mode,cluster,--driver-memory,$DRIVER_MEMORY,--num-executors,$NUM_EXECUTORS,--executor-memory,$EXECUTOR_MEMORY,--executor-cores,$EXECUTOR_CORES,s3://oam-tiler-emr/chunk.py,s3://workspace-oam-hotosm-org/test-req3.json] \
Name=MOSAIC,ActionOnFailure=CONTINUE,Type=Spark,Jar=s3://oam-tiler-emr/mosaic.jar,Args=[--deploy-mode,cluster,--driver-memory,$DRIVER_MEMORY,--num-executors,$NUM_EXECUTORS,--executor-memory,$EXECUTOR_MEMORY,--executor-cores,$EXECUTOR_CORES,--class,org.hotosm.oam.Main,s3://oam-tiler-emr/mosaic.jar,s3://workspace-oam-hotosm-org/emr-test-job-full/step1_result.json]
Name=CHUNK,ActionOnFailure=CONTINUE,Type=Spark,Args=[--deploy-mode,cluster,--driver-memory,$DRIVER_MEMORY,--num-executors,$NUM_EXECUTORS,--executor-memory,$EXECUTOR_MEMORY,--executor-cores,$EXECUTOR_CORES,s3://oam-tiler-emr/chunk.py,$REQUEST_URI] \
Name=MOSAIC,ActionOnFailure=CONTINUE,Type=Spark,Jar=s3://oam-tiler-emr/mosaic.jar,Args=[--deploy-mode,cluster,--driver-memory,$DRIVER_MEMORY,--num-executors,$NUM_EXECUTORS,--executor-memory,$EXECUTOR_MEMORY,--executor-cores,$EXECUTOR_CORES,--class,org.hotosm.oam.Main,s3://oam-tiler-emr/mosaic.jar,$WORKSPACE_URI/step1_result.json]
2 changes: 1 addition & 1 deletion bootstrap.sh
@@ -3,5 +3,5 @@
sudo yum-config-manager --enable epel
sudo yum -y install geos proj proj-nad proj-epsg
sudo ln -s /usr/lib64/libproj.so.0 /usr/lib64/libproj.so
curl http://data.stamen.com.s3.amazonaws.com/cloudatlas/gdal-1.11.2-amz1.tar.gz | sudo tar zxf - -C /usr/local
curl http://oam-server-tiler.s3.amazonaws.com/emr/gdal-1.11.2-amz1.tar.gz | sudo tar zxf - -C /usr/local
sudo GDAL_CONFIG=/usr/local/bin/gdal-config pip-2.7 install boto3 rasterio mercantile psutil
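
A quick smoke test one might run on a node after bootstrapping (hypothetical, not part of the repository) to confirm the pip-installed stack loads against the custom GDAL:

```python
# All of these packages are installed by bootstrap.sh.
import boto3, mercantile, psutil, rasterio
print "rasterio %s" % rasterio.__version__
```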
151 changes: 19 additions & 132 deletions chunk/chunk.py
@@ -20,15 +20,19 @@

from affine import Affine

APP_NAME = "Reproject and chunk"
APP_NAME = "OAM Tiler Chunk"
TILE_DIM = 1024
OUTPUT_FILE_NAME = "step1_result.json"
SNS_TOPIC = "arn:aws:sns:us-east-1:670261699094:oam-tiler-status"
SNS_REGION = "us-east-1"
STATUS_QUEUE = "https://sqs.us-east-1.amazonaws.com/670261699094/oam-server-tiler-status"
STATUS_QUEUE_REGION = "us-east-1"

def notify(m):
client = boto3.client('sns', region_name=SNS_REGION)
res = client.publish(TopicArn=SNS_TOPIC, Message=json.dumps(m))
client = boto3.client('sqs', region_name=STATUS_QUEUE_REGION)
res = client.send_message(
QueueUrl=STATUS_QUEUE,
MessageBody=json.dumps(m)
)

if res['ResponseMetadata']['HTTPStatusCode'] != 200:
raise Exception(json.dumps(res))
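
For context, a sketch of the consuming side of these notifications — a worker polling the status queue defined above. Only the queue constants and the message shape (`{"jobId", "stage", "status"}`) come from this file; the polling loop itself is illustrative.

```python
import json

import boto3

sqs = boto3.client('sqs', region_name=STATUS_QUEUE_REGION)

while True:
    # Long-poll the status queue for up to 20 seconds per request.
    res = sqs.receive_message(QueueUrl=STATUS_QUEUE, WaitTimeSeconds=20)
    for msg in res.get('Messages', []):
        body = json.loads(msg['Body'])
        print "%s %s: %s" % (body['jobId'], body['stage'], body['status'])
        # Delete the message so it isn't redelivered after the visibility timeout.
        sqs.delete_message(QueueUrl=STATUS_QUEUE, ReceiptHandle=msg['ReceiptHandle'])
```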

@@ -39,7 +43,7 @@ def notify_success(jobId):
notify({ "jobId": jobId, "stage": "chunk", "status": "FINISHED" })

def notify_failure(jobId, error_message):
notify({ "jobId": jobId, "stage": "chunk", "status": "FAILED", "error": error_message})
notify({ "jobId": jobId, "stage": "chunk", "status": "FAILED", "error": error_message })

def get_filename(uri):
return os.path.splitext(os.path.basename(uri))[0]
@@ -53,7 +57,7 @@ def mkdir_p(dir):
else: raise

UriSet = namedtuple('UriSet', 'source_uri workspace_target workspace_source_uri image_folder order')
ImageSource = namedtuple('ImageSource', "source_uri src_bounds src_shape src_crs zoom ll_bounds tile_bounds image_folder order")
ImageSource = namedtuple('ImageSource', "origin_uri source_uri src_bounds src_shape src_crs zoom ll_bounds tile_bounds image_folder order")
ChunkTask = namedtuple('ChunkTask', "source_uri target_meta target")

def vsi_curlify(uri):
@@ -97,23 +101,6 @@ def write_bytes_to_target(target_uri, contents):
with open(output_path, "w") as f:
f.write(contents)

# def write_path_to_target(target_uri, src_path):
# parsed_target = urlparse(target_uri)
# if parsed_target.scheme == "s3":
# client = boto3.client("s3")

# bucket = parsed_target.netloc
# key = parsed_target.path[1:]

# extra_args = { "ACL": "public-read", "ContentType": "image/tiff" }

# client.upload_file(src_path, bucket, key, ExtraArgs = extra_args)
# else:
# output_path = target_uri
# mkdir_p(os.path.dirname(output_path))

# shutil.copy(src_path, output_path)

def create_uri_sets(images, workspace_uri):
result = []
workspace_keys = []
@@ -174,41 +161,14 @@ def copy_to_workspace(source_uri, dest_uri):

write_bytes_to_target(dest_uri, contents)

# def copy_to_workspace(source_uri, dest_uri):
# """
# Translates an image from a URI to a compressed, tiled GeoTIFF version in the workspace
# """
# creation_options = {
# "driver": "GTiff",
# "tiled": True,
# "compress": "lzw",
# "predictor": 2, # 3 for floats, 2 otherwise
# "sparse_ok": True
# }

# tmp_path = tempfile.mktemp()
# try:
# with rasterio.open(source_uri, "r") as src:
# meta = src.meta.copy()
# meta.update(creation_options)

# with rasterio.open(tmp_path, "w", **meta) as tmp:
# tmp.write(src.read())

# write_path_to_target(dest_uri, tmp_path)
# finally:
# if os.path.exists(tmp_path):
# print "Deleting %s" % (tmp_path)
# os.remove(tmp_path)

def get_zoom(resolution, tile_dim):
zoom = math.log((2 * math.pi * 6378137) / (resolution * tile_dim)) / math.log(2)
if zoom - int(zoom) > 0.20:
return int(zoom) + 1
else:
return int(zoom)
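
A worked example of this rounding heuristic (the resolutions are illustrative, not values from the repository):

```python
# log2(2 * pi * 6378137 / (0.5 * 1024)) ~= 16.26; fraction 0.26 > 0.20, so round up.
assert get_zoom(0.5, 1024) == 17
# log2(2 * pi * 6378137 / (0.58 * 1024)) ~= 16.04; fraction 0.04 <= 0.20, so truncate.
assert get_zoom(0.58, 1024) == 16
```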

def create_image_source(source_uri, image_folder, order, tile_dim):
def create_image_source(origin_uri, source_uri, image_folder, order, tile_dim):
with rasterio.drivers():
with rasterio.open(source_uri) as src:
shape = src.shape
@@ -240,7 +200,8 @@ def create_image_source(source_uri, image_folder, order, tile_dim):
min_tile = mercantile.tile(ll_bounds[0], ll_bounds[3], zoom)
max_tile = mercantile.tile(ll_bounds[2], ll_bounds[1], zoom)

return ImageSource(source_uri=source_uri,
return ImageSource(origin_uri=origin_uri,
source_uri=source_uri,
src_bounds=src.bounds,
src_shape=src.shape,
src_crs=src.crs,
@@ -336,72 +297,14 @@ def process_chunk_task(task):

write_bytes_to_target(task.target, contents)

# def process_chunk_task(task):
# """
# Chunks the image into tile_dim x tile_dim tiles,
# and saves them to the target folder (s3 or local)

# Returns the extent of the output raster.
# """

# creation_options = {
# "driver": "GTiff",
# "crs": "EPSG:3857",
# "tiled": True,
# "compress": "deflate",
# "predictor": 2, # 3 for floats, 2 otherwise
# "sparse_ok": True
# }

# tmp_path = tempfile.mktemp()
# try:
# with rasterio.open(task.source_uri, "r") as src:
# meta = src.meta.copy()
# meta.update(creation_options)
# meta.update(task.target_meta)

# cols = meta["width"]
# rows = meta["height"]

# # tmp_path = "/vsimem/" + get_filename(task.target)

# with rasterio.open(tmp_path, "w", **meta) as tmp:
# # Reproject the src dataset into image tile.
# warped = []
# for bidx in src.indexes:
# source = rasterio.band(src, bidx)
# warped.append(numpy.zeros((cols, rows), dtype=meta['dtype']))

# warp.reproject(
# source=source,
# src_nodata=0,
# destination=warped[bidx - 1],
# dst_transform=meta["transform"],
# dst_crs=meta["crs"],
# resampling=RESAMPLING.bilinear
# )

# # check for chunks containing only zero values
# if not any(map(lambda b: b.any(), warped)):
# return

# # write out our warped data to the vsimem raster
# for bidx in src.indexes:
# tmp.write_band(bidx, warped[bidx - 1])

# write_bytes_to_target(task.target, tmp_path)
# finally:
# if os.path.exists(tmp_path):
# print "Deleting %s" % (tmp_path)
# os.remove(tmp_path)

def construct_image_info(image_source):
extent = { "xmin": image_source.ll_bounds[0], "ymin": image_source.ll_bounds[1],
"xmax": image_source.ll_bounds[2], "ymax": image_source.ll_bounds[3] }

gridBounds = { "colMin" : image_source.tile_bounds[0], "rowMin": image_source.tile_bounds[1],
"colMax": image_source.tile_bounds[2], "rowMax": image_source.tile_bounds[3] }
return {
"sourceUri": image_source.origin_uri,
"extent" : extent,
"zoom" : image_source.zoom,
"gridBounds" : gridBounds,
@@ -431,7 +334,7 @@ def addInPlace(self, sources1, sources2):
request_uri = sys.argv[1]

# If there are more arguments, it's to turn off notifications
publish_notification = True
publish_notifications = True
if len(sys.argv) == 3:
publish_notifications = False
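
The accumulator class around the hunks below isn't shown in full; a hedged reconstruction of what `ImageSourceAccumulatorParam` presumably looks like — a Spark `AccumulatorParam` that concatenates lists of image sources gathered on the executors:

```python
from pyspark import AccumulatorParam

class ImageSourceAccumulatorParam(AccumulatorParam):
    def zero(self, initial_value):
        # Start each partition's accumulator with an empty list.
        return []

    def addInPlace(self, sources1, sources2):
        # Merge two partial lists of ImageSources.
        return sources1 + sources2
```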

@@ -462,7 +365,7 @@ def addInPlace(self, sources1, sources2):
image_source_accumulator = sc.accumulator([], ImageSourceAccumulatorParam())

def create_image_sources(uri_set, acc):
image_source = create_image_source(uri_set.workspace_source_uri, uri_set.image_folder, uri_set.order, tile_dim)
image_source = create_image_source(uri_set.source_uri, uri_set.workspace_source_uri, uri_set.image_folder, uri_set.order, tile_dim)
acc += [image_source]
return image_source

@@ -481,13 +384,13 @@ def uri_set_copy(uri_set):
image_sources = image_source_accumulator.value
print "Processed %d images into %d chunks" % (len(image_sources), chunks_count)

input = map(construct_image_info, sorted(image_sources, key=lambda im: im.order))
input_info = map(construct_image_info, sorted(image_sources, key=lambda im: im.order))

result = {
"jobId": jobId,
"target": target,
"tileSize": tile_dim,
"input": input
"input": input_info
}

# Save off result
@@ -516,19 +419,3 @@ def uri_set_copy(uri_set):
tile_dim = TILE_DIM

run_spark_job(tile_dim)

# source_uri = "/Users/rob/proj/oam/data/postgis-gt-faceoff/raw/356f564e3a0dc9d15553c17cf4583f21-24.tif"
# image_folder = "/Users/rob/proj/oam/data/workspace2/test"

# # source_uri = "/Users/rob/proj/oam/data/postgis-gt-faceoff/raw/356f564e3a0dc9d15553c17cf4583f21-6.tif"
# # image_folder = "/Users/rob/proj/oam/data/workspace/356f564e3a0dc9d15553c17cf4583f21-6"
# source_uri = "/Users/rob/proj/oam/data/postgis-gt-faceoff/raw/LC81420412015111LGN00_bands_432.tif"
# image_folder = "/Users/rob/proj/oam/data/workspace/LC81420412015111LGN00_bands_432"

# image_source = create_image_source(source_uri, image_folder, 0, tile_dim)
# chunk_tasks = generate_chunk_tasks(image_source, tile_dim)

# for task in filter(lambda x: x.target.endswith('193205/109909.tif'), chunk_tasks):
# print task
# print process_chunk_task(task)
# print construct_image_info(image_source)
44 changes: 44 additions & 0 deletions configurations.json
@@ -0,0 +1,44 @@
[
{
"Classification": "hadoop-env",
"Configurations": [
{
"Classification": "export",
"Properties": {
"GDAL_DATA": "/usr/local/share/gdal",
"LD_LIBRARY_PATH": "/usr/local/lib",
"PYSPARK_PYTHON": "python27",
"PYSPARK_DRIVER_PYTHON": "python27"
}
}
]
},
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"Properties": {
"GDAL_DATA": "/usr/local/share/gdal",
"LD_LIBRARY_PATH": "/usr/local/lib",
"PYSPARK_PYTHON": "python27",
"PYSPARK_DRIVER_PYTHON": "python27"
}
}
]
},
{
"Classification": "yarn-env",
"Configurations": [
{
"Classification": "export",
"Properties": {
"GDAL_DATA": "/usr/local/share/gdal",
"LD_LIBRARY_PATH": "/usr/local/lib",
"PYSPARK_PYTHON": "python27",
"PYSPARK_DRIVER_PYTHON": "python27"
}
}
]
}
]
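
This file is presumably what `launch-cluster.sh` hands to EMR at cluster-creation time (the awscli accepts it via `--configurations file://./configurations.json`), exporting the GDAL data directory, native library path, and Python 2.7 interpreter into the Hadoop, Spark, and YARN environments on every node.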
