Skip to content

Commit

Permalink
Created COGLayerWriterProvider
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>
  • Loading branch information
Jacob Bouffard committed Mar 28, 2018
1 parent c1bad12 commit 5730539
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 15 deletions.
54 changes: 39 additions & 15 deletions spark/src/main/scala/geotrellis/spark/io/cog/COGLayerWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ import geotrellis.util.LazyLogging
import org.apache.spark.rdd.RDD
import spray.json._

import java.net.URI
import java.util.ServiceLoader

import scala.reflect._

trait COGLayerWriter extends LazyLogging with Serializable {
abstract class COGLayerWriter[ID] extends LazyLogging with Serializable {
val attributeStore: AttributeStore

def writeCOGLayer[
Expand All @@ -50,32 +53,30 @@ trait COGLayerWriter extends LazyLogging with Serializable {
K: SpatialComponent: Ordering: JsonFormat: ClassTag,
V <: CellGrid: ClassTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V]: ? => TileCropMethods[V]: GeoTiffReader: GeoTiffBuilder
](
layerName: String,
id: ID,
tiles: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
tileZoom: Int,
keyIndexMethod: KeyIndexMethod[K]
): Unit = write[K, V](layerName, tiles, tileZoom, keyIndexMethod, NoCompression, None)
): Unit = write[K, V](id, tiles, keyIndexMethod, NoCompression, None)

def write[
K: SpatialComponent: Ordering: JsonFormat: ClassTag,
V <: CellGrid: ClassTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V]: ? => TileCropMethods[V]: GeoTiffReader: GeoTiffBuilder
](
layerName: String,
id: ID,
tiles: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
tileZoom: Int,
keyIndexMethod: KeyIndexMethod[K],
compression: Compression,
mergeFunc: Option[(GeoTiff[V], GeoTiff[V]) => GeoTiff[V]]
): Unit =
tiles.metadata.bounds match {
case keyBounds: KeyBounds[K] =>
val cogLayer = COGLayer.fromLayerRDD(tiles, tileZoom, compression = compression)
val cogLayer = COGLayer.fromLayerRDD(tiles, id.asInstanceOf[LayerId].zoom, compression = compression)
// println(cogLayer.metadata.toJson.prettyPrint)
val keyIndexes: Map[ZoomRange, KeyIndex[K]] =
cogLayer.metadata.zoomRangeInfos.
map { case (zr, bounds) => zr -> keyIndexMethod.createIndex(bounds) }.
toMap
writeCOGLayer(layerName, cogLayer, keyIndexes, mergeFunc)
writeCOGLayer(id.asInstanceOf[LayerId].name, cogLayer, keyIndexes, mergeFunc)
case EmptyBounds =>
throw new EmptyBoundsError("Cannot write layer with empty bounds.")
}
Expand All @@ -84,33 +85,31 @@ trait COGLayerWriter extends LazyLogging with Serializable {
K: SpatialComponent: Ordering: JsonFormat: ClassTag,
V <: CellGrid: ClassTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V]: ? => TileCropMethods[V]: GeoTiffReader: GeoTiffBuilder
](
layerName: String,
id: ID,
tiles: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
tileZoom: Int,
keyIndex: KeyIndex[K]
): Unit = write[K, V](layerName, tiles, tileZoom, keyIndex, NoCompression, None)
): Unit = write[K, V](id, tiles, keyIndex, NoCompression, None)

def write[
K: SpatialComponent: Ordering: JsonFormat: ClassTag,
V <: CellGrid: ClassTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V]: ? => TileCropMethods[V]: GeoTiffReader: GeoTiffBuilder
](
layerName: String,
id: ID,
tiles: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
tileZoom: Int,
keyIndex: KeyIndex[K],
compression: Compression,
mergeFunc: Option[(GeoTiff[V], GeoTiff[V]) => GeoTiff[V]]
): Unit =
tiles.metadata.bounds match {
case keyBounds: KeyBounds[K] =>
val cogLayer = COGLayer.fromLayerRDD(tiles, tileZoom, compression = compression)
val cogLayer = COGLayer.fromLayerRDD(tiles, id.asInstanceOf[LayerId].zoom, compression = compression)
// println(cogLayer.metadata.toJson.prettyPrint)
val keyIndexes: Map[ZoomRange, KeyIndex[K]] =
cogLayer.metadata.zoomRangeInfos.
map { case (zr, _) => zr -> keyIndex }.
toMap

writeCOGLayer(layerName, cogLayer, keyIndexes, mergeFunc)
writeCOGLayer(id.asInstanceOf[LayerId].name, cogLayer, keyIndexes, mergeFunc)
case EmptyBounds =>
throw new EmptyBoundsError("Cannot write layer with empty bounds.")
}
Expand Down Expand Up @@ -162,3 +161,28 @@ trait COGLayerWriter extends LazyLogging with Serializable {
}
}
}

object COGLayerWriter {
def apply(attributeStore: AttributeStore, layerWriterUri: URI): COGLayerWriter[LayerId] = {
import scala.collection.JavaConversions._
ServiceLoader.load(classOf[COGLayerWriterProvider]).iterator()
.find(_.canProcess(layerWriterUri))
.getOrElse(throw new RuntimeException(s"Unable to find a COGLayerWriterProvider for $layerWriterUri"))
.layerWriter(layerWriterUri, attributeStore)
}

def apply(attributeStoreUri: URI, layerWriterUri: URI): COGLayerWriter[LayerId] =
apply(attributeStore = AttributeStore(attributeStoreUri), layerWriterUri)

def apply(uri: URI): COGLayerWriter[LayerId] =
apply(attributeStoreUri = uri, layerWriterUri = uri)

def apply(attributeStore: AttributeStore, layerWriterUri: String): COGLayerWriter[LayerId] =
apply(attributeStore, new URI(layerWriterUri))

def apply(attributeStoreUri: String, layerWriterUri: String): COGLayerWriter[LayerId] =
apply(new URI(attributeStoreUri), new URI(layerWriterUri))

def apply(uri: String): COGLayerWriter[LayerId] =
apply(new URI(uri))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.cog

import geotrellis.spark._
import geotrellis.spark.io._
import org.apache.spark._
import java.net.URI

trait COGLayerWriterProvider {
def canProcess(uri: URI): Boolean

def layerWriter(uri: URI, store: AttributeStore): COGLayerWriter[LayerId]
}

0 comments on commit 5730539

Please sign in to comment.