When working with GeoTrellis, often the first task is to load a set of rasters to perform reprojection, mosaicing and pyramiding before saving them as a GeoTrellis layer. It is possible, and not too difficult, to use core GeoTrellis features to write a program to accomplish this task. However, after writing a number of such programs we noticed three patterns emerge:
- Often an individual ETL process will require some modification that is orthogonal to the core ETL logic
- When designing an ETL process it is useful to first run it on a smaller dataset, perhaps locally, as a verification
- Once written it would be useful to re-run the same ETL process with different input and output storage media
To assist with these patterns, the spark-etl project implements a plugin architecture for tile input sources and output sinks, which allows you to write a compact ETL program without having to specify the type and the configuration of the input and output at compile time. The ETL process is broken into three stages: load, tile, and save. This affords an opportunity to modify the dataset using any of the GeoTrellis operations in between the stages.
import geotrellis.raster.Tile
import geotrellis.spark._
import geotrellis.spark.etl.Etl
import geotrellis.spark.etl.config.EtlConf
import geotrellis.spark.util.SparkUtils
import geotrellis.vector.ProjectedExtent
import org.apache.spark.SparkConf
object GeoTrellisETL {
  type I = ProjectedExtent // or TemporalProjectedExtent for temporal ingest
  type K = SpatialKey // or SpaceTimeKey for temporal ingest
  type V = Tile // or MultibandTile to ingest multiband tile
  def main(args: Array[String]): Unit = {
    implicit val sc = SparkUtils.createSparkContext("GeoTrellis ETL", new SparkConf(true))
    try {
      EtlConf(args) foreach { conf =>
        /* parse command line arguments */
        val etl = Etl(conf)
        /* load source tiles using input module specified */
        val sourceTiles = etl.load[I, V]
        /* perform the reprojection and mosaicing step to fit tiles to LayoutScheme specified */
        val (zoom, tiled) = etl.tile[I, V, K](sourceTiles)
        /* save and optionally pyramid the mosaiced layer */
        etl.save[K, V](LayerId(etl.input.name, zoom), tiled)
      }
    } finally {
      sc.stop()
    }
  }
}
The above is just the implementation of the Etl.ingest function, so the same functionality can be written more compactly:
import geotrellis.spark._
import geotrellis.raster.Tile
import geotrellis.spark.etl.Etl
import geotrellis.spark.util.SparkUtils
import geotrellis.vector.ProjectedExtent
import org.apache.spark.SparkConf
object SinglebandIngest {
  def main(args: Array[String]): Unit = {
    implicit val sc = SparkUtils.createSparkContext("GeoTrellis ETL SinglebandIngest", new SparkConf(true))
    try {
      /* load, tile and save in a single call */
      Etl.ingest[ProjectedExtent, SpatialKey, Tile](args)
    } finally {
      sc.stop()
    }
  }
}
The Etl.ingest function can be used with the following type variations:
Etl.ingest[ProjectedExtent, SpatialKey, Tile]
Etl.ingest[ProjectedExtent, SpatialKey, MultibandTile]
Etl.ingest[TemporalProjectedExtent, SpaceTimeKey, Tile]
Etl.ingest[TemporalProjectedExtent, SpaceTimeKey, MultibandTile]
For temporal ingest, TemporalProjectedExtent and SpaceTimeKey should be used; for spatial ingest, ProjectedExtent and SpatialKey.
The above sample application can be placed in a new SBT project that has a dependency on "org.locationtech.geotrellis" %% "geotrellis-spark-etl" % s"$VERSION" in addition to a dependency on spark-core, and built into an assembly with the sbt-assembly plugin. You should be careful to include an assemblyMergeStrategy for the sbt-assembly plugin, as is done in the spark-etl build file.
At this point you would create a separate App object for each one of your ETL configs.
For convenience, and as an example, the spark-etl project provides two App objects that perform vanilla ETL:
geotrellis.spark.etl.SinglebandIngest
geotrellis.spark.etl.MultibandIngest
You may use them by building an assembly jar of the spark-etl project as follows:
cd geotrellis
./sbt
sbt> project spark-etl
sbt> assembly
The assembly jar will be placed in the geotrellis/spark-etl/target/scala-2.11 directory.
For maximum flexibility it is desirable to run Spark jobs with spark-submit. In order to achieve this, the spark-core dependency must be listed as provided and the sbt-assembly plugin used to create the fat jar as described above. Once the assembly jar is ready, outputs and inputs can be set up through command line arguments like so:
#!/bin/sh
export JAR="geotrellis-etl-assembly-1.0.0-SNAPSHOT.jar"
spark-submit \
--class geotrellis.spark.etl.SinglebandIngest \
--master local[*] \
--driver-memory 2G \
$JAR \
--backend-profiles "file://backend-profiles.json" \
--input "file://input.json" \
--output "file://output.json"
Note that the arguments before $JAR configure the SparkContext, and the arguments after it configure the GeoTrellis ETL inputs and outputs.
Option | Description |
---|---|
backend-profiles | Path to a json file (local fs / hdfs) with credentials for ingest datasets (required field) |
input | Path to a json file (local fs / hdfs) with datasets to ingest, with optional credentials |
output | Path to a json file (local fs / hdfs) with output backend params to ingest, with optional credentials |
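As a rough illustration of how the three options in the table might be consumed, the sketch below collects --name value pairs from the arguments that follow the jar. EtlArgs and parse are hypothetical names made up for this example; this is not how EtlConf is implemented.

```scala
object EtlArgs {
  // The three option names documented in the table above.
  val known: Set[String] = Set("backend-profiles", "input", "output")

  /** Collects `--name value` pairs; returns Left with a message on
    * an unknown option or an option without a value. */
  def parse(args: Seq[String]): Either[String, Map[String, String]] = {
    @annotation.tailrec
    def loop(rest: List[String], acc: Map[String, String]): Either[String, Map[String, String]] =
      rest match {
        case Nil => Right(acc)
        case flag :: value :: tail if flag.startsWith("--") =>
          val name = flag.drop(2)
          if (known(name)) loop(tail, acc + (name -> value))
          else Left(s"unknown option: $flag")
        case other :: _ => Left(s"unexpected argument: $other")
      }
    loop(args.toList, Map.empty)
  }
}
```

Returning an Either keeps a misspelled option from being silently ignored, which is convenient when the same script is re-run against different inputs and outputs.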
{
"backend-profiles": [{
"name": "accumulo-name",
"type": "accumulo",
"zookeepers": "zookeepers",
"instance": "instance",
"user": "user",
"password": "password"
},
{
"name": "cassandra-name",
"type": "cassandra",
"allowRemoteDCsForLocalConsistencyLevel": false,
"localDc": "datacenter1",
"usedHostsPerRemoteDc": 0,
"hosts": "hosts",
"replicationStrategy": "SimpleStrategy",
"replicationFactor": 1,
"user": "user",
"password": "password"
}]
}
Sets of named profiles for each backend.
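A backend entry refers to one of these profiles by name (for example "profile":"accumulo-name"). The sketch below shows one way such a lookup could work. Profile, Backend and resolve are simplified, hypothetical stand-ins, and the rule that the profile type must match the backend type is an assumption of this example rather than documented behaviour.

```scala
object Profiles {
  // Hypothetical, simplified models of a named profile and a backend entry.
  final case class Profile(name: String, backendType: String, settings: Map[String, String])
  final case class Backend(backendType: String, path: String, profile: Option[String])

  /** Finds the profile a backend refers to. Returns None if the backend names
    * no profile, or if no profile has that name and the backend's type
    * (the type check is an assumption of this sketch). */
  def resolve(backend: Backend, profiles: Seq[Profile]): Option[Profile] =
    backend.profile.flatMap { wanted =>
      profiles.find(p => p.name == wanted && p.backendType == backend.backendType)
    }
}
```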
{
"backend":{
"type":"accumulo",
"path":"output",
"profile":"accumulo-name"
},
"breaks":"0:ffffe5ff;0.1:f7fcb9ff;0.2:d9f0a3ff;0.3:addd8eff;0.4:78c679ff;0.5:41ab5dff;0.6:238443ff;0.7:006837ff;1:004529ff",
"reprojectMethod":"buffered",
"cellSize":{
"width":256.0,
"height":256.0
},
"encoding":"geotiff",
"tileSize":256,
"layoutExtent":{
"xmin":1.0,
"ymin":2.0,
"xmax":3.0,
"ymax":4.0
},
"resolutionThreshold":0.1,
"pyramid":true,
"resampleMethod":"nearest-neighbor",
"keyIndexMethod":{
"type":"zorder"
},
"layoutScheme":"zoomed",
"cellType":"int8",
"crs":"EPSG:3857"
}
Key | Value |
---|---|
backend | Backend description is presented below |
breaks | Breaks string for render output (optional field) |
partitions | Partitions number during pyramid build |
reprojectMethod | buffered , per-tile |
cellSize | Cell size |
encoding | png , geotiff for render output |
tileSize | Tile size (optional field). If not set, the default size of output tiles is 256x256 |
layoutExtent | Layout extent (optional field) |
resolutionThreshold | Resolution for user defined Layout Scheme (optional field) |
pyramid | true , false (ingest with or without building a pyramid) |
resampleMethod | nearest-neighbor , bilinear , cubic-convolution , cubic-spline , lanczos |
keyIndexMethod | zorder , row-major , hilbert |
layoutScheme | zoomed , floating (optional field) |
cellType | int8 , int16 , etc... (optional field) |
crs | Destination crs name (example: EPSG:3857) (optional field) |
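The breaks value in the sample output configuration is a semicolon-separated list of value:color pairs. A minimal sketch of reading such a string follows, assuming each color is a hexadecimal RGBA value (which the trailing ff in the sample suggests). Breaks.parse is a hypothetical helper, not part of the GeoTrellis API.

```scala
object Breaks {
  /** Parses "value:rrggbbaa;value:rrggbbaa;..." into (break value, color) pairs.
    * Colors are returned as Long so that values above 0x7fffffff stay positive. */
  def parse(breaks: String): Seq[(Double, Long)] =
    breaks.split(';').toSeq.filter(_.nonEmpty).map { entry =>
      entry.split(':') match {
        case Array(value, color) => (value.toDouble, java.lang.Long.parseLong(color, 16))
        case _ => throw new IllegalArgumentException(s"malformed break: $entry")
      }
    }
}
```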
Key | Value |
---|---|
type | Input backend type (file / hadoop / s3 / accumulo / cassandra) |
path | Input path (local path / hdfs), or s3:// url |
profile | Profile name to use for input |
Layout Scheme | Options |
---|---|
zoomed | Zoomed layout scheme |
floating | Floating layout scheme in a native projection |
Key | Options |
---|---|
type | zorder , row-major , hilbert |
temporalResolution | Temporal resolution for temporal indexing (optional field) |
timeTag | Time tag name for input geotiff tiles (optional field) |
timeFormat | Time format used to parse the time stored in the GeoTIFF time tag (optional field) |
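To give a feel for what a key index method does, the sketch below maps a tile's column and row to a single number in two ways: row-major order and Z-order (bit interleaving). It only illustrates the general idea. The GeoTrellis implementations may order bits differently, the Hilbert variant is omitted, and KeyIndexSketch is a hypothetical name.

```scala
object KeyIndexSketch {
  /** Row-major index: rows are laid out one after another. */
  def rowMajor(col: Int, row: Int, layoutCols: Int): Long =
    row.toLong * layoutCols + col

  /** Z-order (Morton) index: interleaves the bits of col and row,
    * with col occupying the even bit positions. */
  def zorder(col: Int, row: Int): Long = {
    require(col >= 0 && row >= 0, "col and row must be non-negative")
    var index = 0L
    var bit = 0
    while (bit < 31) {
      index |= ((col.toLong >> bit) & 1L) << (2 * bit)
      index |= ((row.toLong >> bit) & 1L) << (2 * bit + 1)
      bit += 1
    }
    index
  }
}
```

Z-order tends to keep spatially adjacent tiles close together in the index, whereas in row-major order vertically adjacent tiles are a full row apart.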
[{
"format": "geotiff",
"name": "test",
"cache": "NONE",
"noData": 0.0,
"clip": {
"xmin":1.0,
"ymin":2.0,
"xmax":3.0,
"ymax":4.0
},
"backend": {
"type": "hadoop",
"path": "input"
}
}]
Key | Value |
---|---|
format | Format of the tile files to be read (ex: geotiff) |
name | Input dataset name |
cache | Spark RDD cache strategy |
noData | NoData value |
clip | Extent in target CRS to clip the input source |
crs | Destination crs name (example: EPSG:3857) (optional field) |
maxTileSize | Inputs will be broken up into smaller tiles of the given size (optional field) (example: 256 returns 256x256 tiles) |
numPartitions | How many partitions Spark should make when repartitioning (optional field) |
Format | Options |
---|---|
geotiff | Spatial ingest |
temporal-geotiff | Temporal ingest |
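For temporal ingest, the time of each tile is read from a GeoTIFF tag and parsed with the configured timeFormat. The sketch below shows that parsing step with java.time. The pattern yyyy:MM:dd HH:mm:ss follows the layout of the standard TIFF DateTime tag and is used here only as an illustration, and treating the value as UTC is an assumption of this example. TimeTag is a hypothetical name.

```scala
import java.time.{LocalDateTime, ZoneOffset, ZonedDateTime}
import java.time.format.DateTimeFormatter

object TimeTag {
  /** Parses a tag value with the given pattern, treating the result as UTC
    * (the time zone is an assumption of this sketch). */
  def parse(value: String, timeFormat: String): ZonedDateTime =
    LocalDateTime
      .parse(value, DateTimeFormatter.ofPattern(timeFormat))
      .atZone(ZoneOffset.UTC)
}
```

For example, TimeTag.parse("2015:03:25 18:01:04", "yyyy:MM:dd HH:mm:ss") yields the instant 2015-03-25T18:01:04Z.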
Input | Options |
---|---|
hadoop | path (local path / hdfs) |
s3 | s3:// url |
Output | Options |
---|---|
hadoop | Path |
accumulo | Table name |
cassandra | Table name with keyspace (keyspace.tablename) |
s3 | s3:// url |
render | Path |
The Accumulo output module has two write strategies:
- hdfs strategy uses Accumulo bulk import
- socket strategy uses AccumuloBatchWriter
When using the hdfs strategy, the ingestPath argument will be used as the temporary directory where records are written for use by Accumulo bulk import. This directory should ideally be an HDFS path.
GeoTrellis is able to tile layers in either ZoomedLayoutScheme, matching the TMS pyramid, or FloatingLayoutScheme, matching the native resolution of the input raster. These alternatives may be selected using the layoutScheme option.
Note that ZoomedLayoutScheme needs to know the world extent, which it gets from the CRS, in order to build the TMS pyramid layout. This will likely cause resampling of input rasters to match the resolution of the TMS levels.
On the other hand, FloatingLayoutScheme will discover the native resolution and extent and partition it by the given tile size without resampling.
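The resampling mentioned above follows from the fixed resolutions of a TMS pyramid. As a sketch, assuming Web Mercator (EPSG:3857) and square tiles, the cell size of each zoom level and a simple rule for choosing a level can be computed as below. TmsZoom is a hypothetical helper, and the selection rule is an assumption of this example; the zoom that GeoTrellis picks (for instance when resolutionThreshold is set) may differ.

```scala
object TmsZoom {
  // Width of the Web Mercator (EPSG:3857) world extent in meters: 2 * pi * 6378137.
  val worldWidth: Double = 2 * math.Pi * 6378137.0

  /** Cell size (meters per pixel) of a TMS level, for square tiles of `tileSize` pixels. */
  def resolution(zoom: Int, tileSize: Int = 256): Double =
    worldWidth / (tileSize.toDouble * (1L << zoom))

  /** Smallest zoom whose cells are at least as fine as the raster's native cell size;
    * falls back to `maxZoom` if none is fine enough. */
  def zoomFor(nativeCellSize: Double, tileSize: Int = 256, maxZoom: Int = 30): Int =
    (0 to maxZoom).find(z => resolution(z, tileSize) <= nativeCellSize).getOrElse(maxZoom)
}
```

A raster with 30 m cells lands on zoom 13 under this rule, where the level's cell size is roughly 19.1 m, so the input has to be resampled to fit.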
You may bypass the layout scheme logic by providing layoutExtent, cellSize, and cellType instead of the layoutScheme option. Together with the tileSize option this is enough to fully define the layout and start the tiling process.
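To see why these values are enough to define the layout, the sketch below derives the number of tile columns and rows from an extent, a cell size and a tile size. Extent and CellSize here are local stand-ins that mirror the JSON keys, not the GeoTrellis classes, and LayoutSketch is a hypothetical name.

```scala
object LayoutSketch {
  // Local stand-ins mirroring the layoutExtent and cellSize JSON objects.
  final case class Extent(xmin: Double, ymin: Double, xmax: Double, ymax: Double)
  final case class CellSize(width: Double, height: Double)

  /** Number of tile columns and rows needed to cover `extent`. */
  def tileLayout(extent: Extent, cellSize: CellSize, tileSize: Int): (Int, Int) = {
    val tileWidth = cellSize.width * tileSize   // map units covered by one tile in x
    val tileHeight = cellSize.height * tileSize // map units covered by one tile in y
    val cols = math.ceil((extent.xmax - extent.xmin) / tileWidth).toInt
    val rows = math.ceil((extent.ymax - extent.ymin) / tileHeight).toInt
    (cols, rows)
  }
}
```

For an extent of 1000 by 500 map units, a cell size of 1 and a tile size of 256, this gives 4 columns and 2 rows.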
The spark-etl project supports two methods of reprojection: buffered and per-tile. They provide a trade-off between accuracy and flexibility.
The buffered reprojection method is able to sample pixels past the tile boundaries by performing a neighborhood join. This method is the default and produces the best results. However, it requires that all of the source tiles share the same CRS.
The per-tile reprojection method cannot consider pixels past the individual tile boundaries, even if they exist elsewhere in the dataset. Any pixels past the tile boundaries will be treated as NODATA when interpolating. This restriction allows each source tile to have a different projection. This is an effective way to unify the projections, for instance when reprojecting from multiple UTM projections to WebMercator.
The render output module is different from other modules in that it does not save a GeoTrellis layer but rather provides a way to render a layer, after tiling and projection, to a set of images. This is useful to either verify the ETL process or render a TMS pyramid.
The path module argument is actually a path template that allows the following substitutions:
- {x} tile x coordinate
- {y} tile y coordinate
- {z} layer zoom level
- {name} layer name
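The substitution itself can be pictured as plain string replacement. The sketch below fills a template for a single tile. RenderPath.fill is a hypothetical helper written for this example and is not the code the render module uses.

```scala
object RenderPath {
  /** Fills the {name}, {z}, {x} and {y} placeholders of a render path template. */
  def fill(template: String, name: String, z: Int, x: Int, y: Int): String =
    template
      .replace("{name}", name)
      .replace("{z}", z.toString)
      .replace("{x}", x.toString)
      .replace("{y}", y.toString)
}
```

With the template s3://tms-bucket/layers/{name}/{z}-{x}-{y}.png, a layer named nlcd and the tile at zoom 8, x 72, y 97, the result is s3://tms-bucket/layers/nlcd/8-72-97.png.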
A sample render output configuration template could be:
{
"path": "s3://tms-bucket/layers/{name}/{z}-{x}-{y}.png",
"ingestType":{
"format":"geotiff",
"output":"render"
}
}
In order to provide your own input or output modules, you must extend InputPlugin and OutputPlugin and register them in the Etl constructor via a TypedModule.
The standard ETL assembly provides two classes to ingest objects: one for singleband tiles and one for multiband tiles. The class for singleband tiles is geotrellis.spark.etl.SinglebandIngest and the class for multiband tiles is geotrellis.spark.etl.MultibandIngest.
Every example can be launched using:
#!/bin/sh
export JAR="geotrellis-etl-assembly-0.10-SNAPSHOT.jar"
spark-submit \
--class geotrellis.spark.etl.{SinglebandIngest | MultibandIngest} \
--master local[*] \
--driver-memory 2G \
$JAR \
--input "file://input.json" \
--output "file://output.json" \
--backend-profiles "file://backend-profiles.json"
backend-profiles.json:
{
"backend-profiles":[
{
"name":"accumulo-name",
"type":"accumulo",
"zookeepers":"zookeepers",
"instance":"instance",
"user":"user",
"password":"password"
},
{
"name":"cassandra-name",
"type":"cassandra",
"allowRemoteDCsForLocalConsistencyLevel":false,
"localDc":"datacenter1",
"usedHostsPerRemoteDc":0,
"hosts":"hosts",
"replicationStrategy":"SimpleStrategy",
"replicationFactor":1,
"user":"user",
"password":"password"
}
]
}
output.json:
{
"backend":{
"type":"accumulo",
"path":"output",
"profile":"accumulo-name"
},
"breaks":"0:ffffe5ff;0.1:f7fcb9ff;0.2:d9f0a3ff;0.3:addd8eff;0.4:78c679ff;0.5:41ab5dff;0.6:238443ff;0.7:006837ff;1:004529ff",
"reprojectMethod":"buffered",
"cellSize":{
"width":256.0,
"height":256.0
},
"encoding":"geotiff",
"tileSize":256,
"layoutExtent":{
"xmin":1.0,
"ymin":2.0,
"xmax":3.0,
"ymax":4.0
},
"resolutionThreshold":0.1,
"pyramid":true,
"resampleMethod":"nearest-neighbor",
"keyIndexMethod":{
"type":"zorder"
},
"layoutScheme":"zoomed",
"cellType":"int8",
"crs":"EPSG:3857"
}
input.json:
[{
"format": "geotiff",
"name": "test",
"cache": "NONE",
"noData": 0.0,
"backend": {
"type": "hadoop",
"path": "input"
}
}]
Backend JSON example (local fs)
"backend": {
"type": "hadoop",
"path": "file:///Data/nlcd/tiles"
}
Backend JSON example (hdfs)
"backend": {
"type": "hadoop",
"path": "hdfs://nlcd/tiles"
}
Backend JSON example (s3)
"backend": {
"type": "s3",
"path": "s3://com.azavea.datahub/catalog"
}
Backend JSON example (accumulo)
"backend": {
"type": "accumulo",
"profile": "accumulo-gis",
"path": "nlcdtable"
}
Backend JSON example (set of PNGs into S3)
"backend": {
"type": "render",
"path": "s3://tms-bucket/layers/{name}/{z}-{x}-{y}.png"
}
Backend JSON example (set of PNGs into hdfs or local fs)
"backend": {
"type": "render",
"path": "hdfs://path/layers/{name}/{z}-{x}-{y}.png"
}