In [3]:
import rasterio
import rasterio.features
import rasterio.warp
import geopyspark as gps
import numpy as np
import matplotlib.pyplot as plt

from pyspark import SparkContext
from geonotebook.wrappers import TMSRasterData
from osgeo import osr

import os
import math
import boto3

%matplotlib inline

In [4]:
# Spark configuration for the ingest: a fixed pool of 50 executors
# (4 cores and 9472 MiB = 9.25 GiB each), dynamic allocation off, master
# "yarn-client". Every value is passed as a string, matching what SparkConf
# stores (a Python bool would be stringified as "False"/"True").
# NOTE(review): Spark 2.x deprecates the "yarn-client" master in favour of
# master "yarn" with client deploy mode -- confirm against the cluster version.
conf = (
    gps.geopyspark_conf("yarn-client", "SRTM Ingest")
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.executor.instances", "50")
    .set("spark.executor.memory", "9472M")
    .set("spark.executor.cores", "4")
    .set("spark.ui.enabled", "true")
    # The "spark.hadoop." prefix forwards this setting to the Hadoop/YARN config.
    .set("spark.hadoop.yarn.timeline-service.enabled", "false")
)

In [5]:
# Start the Spark driver with the configuration built above.
# NOTE(review): PySpark normally allows only one active SparkContext per
# process, so re-running this cell presumably needs sc.stop() or a kernel
# restart first.
sc = SparkContext(conf=conf)

In [6]:
# Module-level S3 client; get_raster_s3_objects below lists the source bucket
# with it. No credentials are passed here, so boto3 resolves them from its
# usual sources (environment, config files, instance role).
s3 = boto3.client('s3')
def get_raster_s3_objects(bucket, prefix, extension="hgt", client=None):
    """List the S3 objects under ``bucket``/``prefix`` whose key ends with ``extension``.

    Walks every page returned by the ``list_objects_v2`` paginator.

    Args:
        bucket: S3 bucket name.
        prefix: key prefix to list under (matched literally; no '/' is added).
        extension: key suffix to keep; a plain ``str.endswith`` test, so no
            dot is implied ("hgt" also matches a key ending in "xhgt").
        client: boto3 S3 client to use; defaults to the module-level ``s3``.

    Returns:
        The matching object summaries (the ``Contents`` entries, dicts with at
        least a ``'Key'``), in listing order. Empty when nothing matches.
    """
    if client is None:
        client = s3
    paginator = client.get_paginator('list_objects_v2')
    results = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page with no objects has no 'Contents' key at all (e.g. an empty
        # or misspelled prefix), so default to an empty list instead of
        # raising KeyError.
        for item in page.get('Contents', []):
            if item['Key'].endswith(extension):
                results.append(item)
    return results


In [7]:
# List every object under s3://mrgeo-source/srtm-v3-30 whose key ends in "hgt"
# (the default extension filter of get_raster_s3_objects).
# NOTE(review): the prefix has no trailing '/', so keys under e.g.
# 'srtm-v3-30x/' would also match; the next cell strips 'srtm-v3-30/' and
# assumes no such keys exist.
object_names = get_raster_s3_objects("mrgeo-source", "srtm-v3-30")

In [8]:
# Drop the 'srtm-v3-30/' key prefix so only bare file names (e.g.
# 'N00E006.hgt') remain, then show how many there are and a small sample.
file_names = [obj['Key'][len('srtm-v3-30/'):] for obj in object_names]
print(len(file_names))
print(file_names[:10])

14265
['N00E006.hgt', 'N00E009.hgt', 'N00E010.hgt', 'N00E011.hgt', 'N00E012.hgt', 'N00E013.hgt', 'N00E014.hgt', 'N00E015.hgt', 'N00E016.hgt', 'N00E017.hgt']


In [9]:
def get_metadata(uri, tile_size=512):
    """Describe the full ``tile_size`` x ``tile_size`` pixel windows of one raster.

    Opens ``uri`` with rasterio and reads its bounds, pixel size and CRS (as a
    proj4 string). It then lays a regular grid of windows over the raster,
    starting at pixel (0, 0).

    Only complete windows are produced. Grid limits are computed from
    ``height - 1`` / ``width - 1``, so the last row/column is never the start
    of an extra window; the trailing partial strip is skipped. For a
    3601 x 3601 raster with ``tile_size=512`` that is 7 x 7 = 49 windows, and
    the final 17 rows/columns are left out.

    Runs inside Spark tasks, so every dependency is imported locally.

    Args:
        uri: any path/URL rasterio can open (e.g. a ``/vsicurl/...`` URL).
        tile_size: edge length of each window in pixels (default 512).

    Returns:
        A list of dicts, one per window, with keys
          - ``'uri'``: the input ``uri``;
          - ``'window'``: ``((row_start, row_stop), (col_start, col_stop))``;
          - ``'projected_extent'``: ``gps.ProjectedExtent`` covering that window.
        An empty list if the raster could not be opened or its CRS could not be
        converted. A warning is emitted in that case.
    """
    import os
    import warnings

    import geopyspark as gps
    import rasterio
    from osgeo import osr

    # GDAL needs its support files to export the CRS; this path matches the
    # executor image the notebook was written for.
    os.environ.setdefault(
        "GDAL_DATA", "/usr/local/lib64/python3.4/site-packages/fiona/gdal_data")

    try:
        with rasterio.open(uri) as dataset:
            bounds = dataset.bounds
            height = dataset.height
            width = dataset.width
            srs = osr.SpatialReference()
            srs.ImportFromWkt(dataset.crs.wkt)
            proj4 = srs.ExportToProj4()
    except Exception as exc:
        # Best effort: an unreadable file contributes no windows instead of
        # failing the whole Spark job, but the failure is reported, not hidden.
        warnings.warn("get_metadata: skipping {}: {!r}".format(uri, exc))
        return []

    # Largest multiple of tile_size below the last row/column index. Rows are
    # bounded by height and columns by width.
    row_limit = (height - 1) // tile_size * tile_size
    col_limit = (width - 1) // tile_size * tile_size
    x_span = bounds.right - bounds.left
    y_span = bounds.bottom - bounds.top  # negative for a north-up raster

    records = []
    for row_start in range(0, row_limit, tile_size):
        row_stop = row_start + tile_size
        top = bounds.top + y_span * (float(row_start) / height)
        bottom = bounds.top + y_span * (float(row_stop) / height)
        for col_start in range(0, col_limit, tile_size):
            col_stop = col_start + tile_size
            left = bounds.left + x_span * (float(col_start) / width)
            right = bounds.left + x_span * (float(col_stop) / width)
            extent = gps.Extent(left, bottom, right, top)
            records.append({
                'uri': uri,
                # rasterio's tuple window form: ((row_start, row_stop), (col_start, col_stop))
                'window': ((row_start, row_stop), (col_start, col_stop)),
                'projected_extent': gps.ProjectedExtent(extent=extent, proj4=proj4),
            })
    return records


In [10]:
def get_data(line):
    """Attach the pixel values for one window record.

    Reads band 1 of the raster at ``line['uri']``, restricted to
    ``line['window']``. Returns a new dict with every other key of ``line``
    plus ``'data'`` (the array rasterio returns). The ``'uri'`` and
    ``'window'`` keys are dropped, and ``line`` itself is not modified.

    rasterio is imported inside the function because it is called from
    filename_to_data inside Spark tasks.
    """
    import rasterio

    uri = line['uri']
    window = line['window']
    with rasterio.open(uri) as dataset:
        band = dataset.read(1, window=window)

    result = {key: value for key, value in line.items()
              if key not in ('uri', 'window')}
    result['data'] = band
    return result

In [11]:
def filename_to_data(filename,
                     base_uri="/vsicurl/https://s3.amazonaws.com/mrgeo-source/srtm-v3-30/"):
    """Expand one raster file name into its window records, pixel data included.

    Builds a GDAL ``/vsicurl/`` URL by appending ``filename`` to ``base_uri``.
    It splits that raster into windows with ``get_metadata`` and reads each
    window with ``get_data``. The notebook uses this with ``flatMap``, so the
    returned list is flattened into the RDD.

    Args:
        filename: bare file name, e.g. ``'N00E006.hgt'``.
        base_uri: URL prefix the file name is appended to; defaults to the
            srtm-v3-30 folder of the mrgeo-source bucket.

    Returns:
        A list of dicts, one per window, each with ``'projected_extent'`` and
        ``'data'`` keys. Empty when ``get_metadata`` yields no windows for the
        file.
    """
    full_filename = "{}{}".format(base_uri, filename)
    return [get_data(line) for line in get_metadata(full_filename)]

In [12]:
# Distribute the file names across the cluster and expand each file into its
# per-window records (dicts with 'projected_extent' and a 'data' array).
rdd0 = sc.parallelize(file_names)
rdd1 = rdd0.flatMap(filename_to_data)
# count() is an action: it downloads and reads every window of every file.
# NOTE(review): no cache()/persist() is applied to rdd1 in this notebook, so
# later actions will likely re-read all files from S3 again.
print(rdd1.count())

698985


In [13]:
# Pair each window record with its extent as (projected_extent, [record]),
# the shape make_tiles expects. Extents should be unique per record (distinct
# window offsets within a file, distinct bounds across files). A groupBy here
# would therefore only build singleton groups while shuffling all the raster
# arrays across the cluster; a narrow map gives the same pairs with no shuffle.
# If two records ever did share an extent, groupBy would have stacked them as
# bands of one tile, whereas here they stay separate tiles.
rdd2 = rdd1.map(lambda line: (line['projected_extent'], [line]))

In [14]:
def make_tiles(line, no_data_value=0):
    """Turn one ``(projected_extent, records)`` pair into ``(projected_extent, gps.Tile)``.

    Args:
        line: a pair whose first item is the extent and whose second is an
            iterable of dicts, each holding a ``'data'`` array (2-D arrays as
            produced by ``get_data``).
        no_data_value: cell value the tile treats as no-data (default 0).
            NOTE(review): SRTM ``.hgt`` voids are conventionally -32768, and 0
            is a legitimate (sea-level) elevation. Confirm that 0 is really the
            intended no-data value.

    Returns:
        ``(projected_extent, tile)``. The tile's cells stack the records'
        arrays along a new leading (band) axis, in iteration order.
    """
    projected_extent = line[0]
    bands = np.array([record['data'] for record in line[1]])
    tile = gps.Tile.from_numpy_array(bands, no_data_value=no_data_value)
    return (projected_extent, tile)

def interesting_tile(line):
    """Return True if the first band of the tile has at least one non-zero cell.

    ``line`` is a ``(projected_extent, tile)`` pair; ``tile[0]`` is the cell
    array and ``tile[0][0]`` its first band.

    Uses ``np.any`` rather than a sum: with a sum, positive and negative cells
    (e.g. terrain on both sides of sea level) could cancel to 0 and the tile
    would be wrongly rejected.
    """
    _extent, tile = line
    return bool(np.any(tile[0][0] != 0))

def square_tile(line, size=512):
    """Return True if the tile's first band is exactly ``size`` x ``size`` cells.

    ``line`` is a ``(projected_extent, tile)`` pair; ``tile[0][0]`` is the
    first band of the cell array.

    Args:
        size: expected edge length in cells (default 512, the window size
            used by this notebook).
    """
    _extent, tile = line
    return tile[0][0].shape == (size, size)

In [15]:
# Build one gps.Tile per (extent, records) pair. repartition spreads the
# ~700k window records over 50 * 1024 partitions (about 14 per partition on
# average), so each make_tiles task holds only a few arrays in memory.
# To keep only full 512x512 tiles, append `.filter(square_tile)` after the map.
rdd3 = rdd2.repartition(50 * 1024).map(make_tiles)

In [16]:
# Wrap the RDD of (ProjectedExtent, Tile) pairs as a geopyspark RasterLayer of
# the SPATIAL layer type (keys carry no time component).
raster_layer = gps.RasterLayer.from_numpy_rdd(gps.LayerType.SPATIAL, rdd3)

In [17]:
# Reproject to EPSG:3857 (Web Mercator) and cut the rasters into the tiles of
# geopyspark's GlobalLayout, presumably the zoom-level tiling scheme that the
# pyramid is built from.
tiled_raster_layer = raster_layer.tile_to_layout(layout = gps.GlobalLayout(), target_crs=3857)

In [18]:
# Build the multi-resolution pyramid from the tiled layer. pyramid.levels
# holds one layer per level (presumably keyed by zoom); the next cell writes
# each of them.
pyramid = tiled_raster_layer.pyramid()

In [19]:
# Write every pyramid level to the same S3 catalog under one layer name; the
# writer stores each level under its own zoom (the recorded failure below
# reports zoom = 12).
# NOTE(review): nothing upstream is cached, so each level's write presumably
# re-runs the whole read/tile/pyramid lineage, including its shuffles. The
# recorded failure is a shuffle fetch from an unreachable executor host
# ("No route to host" on port 7337), not an error in this loop itself.
for layer in pyramid.levels.values():
    gps.write("s3://geotrellis-test/dg-srtm/", "srtm-geopyspark", layer)

Py4JJavaError: An error occurred while calling o143.writeSpatial.
: geotrellis.spark.io.package$LayerWriteError: Failed to write Layer(name = "srtm-geopyspark", zoom = 12)
	at geotrellis.spark.io.s3.S3LayerWriter._write(S3LayerWriter.scala:123)
	at geotrellis.spark.io.s3.S3LayerWriter._write(S3LayerWriter.scala:49)
	at geotrellis.spark.io.LayerWriter$class.write(LayerWriter.scala:153)
	at geotrellis.spark.io.s3.S3LayerWriter.write(S3LayerWriter.scala:49)
	at geopyspark.geotrellis.io.LayerWriterWrapper.writeSpatial(LayerWriterWrapper.scala:91)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 193 (coalesce at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Failed to connect to ip-172-31-21-248.ec2.internal/172.31.21.248:7337
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:361)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:336)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
	at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
	at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: java.io.IOException: Failed to connect to ip-172-31-21-248.ec2.internal/172.31.21.248:7337
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
	at org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
	at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
	at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
	at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:171)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: No route to host: ip-172-31-21-248.ec2.internal/172.31.21.248:7337
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:640)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
	... 2 more

	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1505)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1493)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1492)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1492)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1263)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1717)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1675)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1664)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:629)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
	at geotrellis.spark.io.s3.S3RDDWriter$class.update(S3RDDWriter.scala:83)
	at geotrellis.spark.io.s3.S3RDDWriter$.update(S3RDDWriter.scala:161)
	at geotrellis.spark.io.s3.S3RDDWriter$class.write(S3RDDWriter.scala:54)
	at geotrellis.spark.io.s3.S3RDDWriter$.write(S3RDDWriter.scala:161)
	at geotrellis.spark.io.s3.S3LayerWriter._write(S3LayerWriter.scala:127)
	... 15 more
