Resolved #2601 by updating the various read attribute methods so that they take into account the LayerType before reading the attribute

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Updated the various COG backends so that they use the new attribute methods

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Created the COGAttributeStoreSpec

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Created the COGFileAttributeStoreSpec

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Created the COGHadoopAttributeStoreSpec

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Created the COGS3AttributeStoreSpec

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Calling the readKeyIndex method on a COGLayer now throws an UnsupportedOperationException

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>
Jacob Bouffard committed May 1, 2018
1 parent 4ea9cc1 commit 158028b
Showing 18 changed files with 206 additions and 34 deletions.
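
At a glance, the read-side change: instead of reading raw attribute fields by name, callers now use methods that consult the layer's LayerType before touching storage. A minimal sketch of the before/after pattern; the store value and layer name here are placeholders, not part of the diff:

import geotrellis.spark.LayerId
import geotrellis.spark.io.AttributeStore
import geotrellis.spark.io.s3.S3LayerHeader

def readCogHeader(store: AttributeStore): S3LayerHeader = {
  val id = LayerId("my-cog-layer", 0) // COG layer attributes are stored at zoom 0

  // Before: callers named the raw attribute field themselves:
  //   store.read[S3LayerHeader](id, COGAttributeStore.Fields.header)
  // After: the store resolves the field from the layer's LayerType.
  store.readHeader[S3LayerHeader](id)
}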

@@ -51,7 +51,7 @@ class S3COGCollectionLayerReader(
](id: LayerId, rasterQuery: LayerQuery[K, TileLayerMetadata[K]]) = {
val header =
try {
- attributeStore.read[S3LayerHeader](LayerId(id.name, 0), COGAttributeStore.Fields.header)
+ attributeStore.readHeader[S3LayerHeader](LayerId(id.name, 0))
} catch {
// to follow GeoTrellis Layer Readers logic
case e: AttributeNotFoundError => throw new LayerNotFoundError(id).initCause(e)

@@ -54,7 +54,7 @@ class S3COGLayerReader(
](id: LayerId, tileQuery: LayerQuery[K, TileLayerMetadata[K]], numPartitions: Int) = {
val header =
try {
- attributeStore.read[S3LayerHeader](LayerId(id.name, 0), COGAttributeStore.Fields.header)
+ attributeStore.readHeader[S3LayerHeader](LayerId(id.name, 0))
} catch {
// to follow GeoTrellis Layer Readers logic
case e: AttributeNotFoundError => throw new LayerNotFoundError(id).initCause(e)

@@ -41,15 +41,16 @@ class S3COGLayerWriter(
val sc = cogLayer.layers.head._2.sparkContext
val samplesAccumulator = sc.collectionAccumulator[IndexedSimpleSource](VRT.accumulatorName(layerName))
val storageMetadata = COGLayerStorageMetadata(cogLayer.metadata, keyIndexes)
- attributeStore.write(layerId0, COGAttributeStore.Fields.metadata, storageMetadata)

val header = S3LayerHeader(
keyClass = classTag[K].toString(),
valueClass = classTag[V].toString(),
bucket = bucket,
- key = keyPrefix
+ key = keyPrefix,
+ layerType = COGLayerType
)
- attributeStore.write(layerId0, COGAttributeStore.Fields.header, header)

+ attributeStore.writeCOGLayerAttributes(layerId0, header, storageMetadata)

val s3Client = getS3Client() // for saving VRT from Accumulator
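
Worth noting on the write side above: the two independent write calls for the header and metadata fields collapse into a single writeCOGLayerAttributes call, so the two attributes can no longer be written inconsistently. A hedged sketch of the call shape, assuming JsonFormat bounds that mirror readCOGLayerAttributes in AttributeStore.scala further down:

import geotrellis.spark.LayerId
import geotrellis.spark.io.AttributeStore
import spray.json.JsonFormat

// Persist the header and storage metadata together, as the COG writers now do.
def persistCogAttributes[H: JsonFormat, M: JsonFormat](
    store: AttributeStore, id: LayerId, header: H, metadata: M): Unit =
  store.writeCOGLayerAttributes(id, header, metadata)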


@@ -44,7 +44,7 @@ class S3COGValueReader(
](layerId: LayerId): Reader[K, V] = {
val header =
try {
- attributeStore.read[S3LayerHeader](LayerId(layerId.name, 0), COGAttributeStore.Fields.header)
+ attributeStore.readHeader[S3LayerHeader](LayerId(layerId.name, 0))
} catch {
case e: AttributeNotFoundError => throw new LayerNotFoundError(layerId).initCause(e)
}

@@ -0,0 +1,33 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.s3.cog

import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.cog._
import geotrellis.spark.io.s3._
import geotrellis.spark.io.s3.testkit._

class COGS3AttributeStoreSpec extends COGAttributeStoreSpec {
val bucket = "attribute-store-test-mock-bucket"
val prefix = "catalog"

lazy val header = S3LayerHeader("geotrellis.spark.SpatialKey", "geotrellis.raster.Tile", bucket, prefix, COGLayerType)
lazy val attributeStore: AttributeStore = new S3AttributeStore(bucket, prefix) {
override val s3Client = new MockS3Client()
}
}

spark/src/main/scala/geotrellis/spark/io/AttributeStore.scala (37 changes: 24 additions & 13 deletions)
@@ -28,7 +28,7 @@ import scala.reflect._
import java.net.URI
import java.util.ServiceLoader

trait AttributeStore extends AttributeCaching with LayerAttributeStore {
def read[T: JsonFormat](layerId: LayerId, attributeName: String): T
def readAll[T: JsonFormat](attributeName: String): Map[LayerId, T]
def write[T: JsonFormat](layerId: LayerId, attributeName: String, value: T): Unit
@@ -113,21 +113,32 @@ trait BlobLayerAttributeStore {
cacheRead[JsValue](id, Fields.metadataBlob).asJsObject.fields(Fields.header).convertTo[H]

def readMetadata[M: JsonFormat](id: LayerId): M =
- cacheRead[JsValue](id, Fields.metadataBlob).asJsObject.fields(Fields.metadata).convertTo[M]
+ cacheRead[JsValue](id, Fields.metadata).asJsObject.fields(Fields.metadata).convertTo[M]

- def readKeyIndex[K: ClassTag](id: LayerId): KeyIndex[K] =
-   cacheRead[JsValue](id, Fields.metadataBlob).asJsObject.fields(Fields.keyIndex).convertTo[KeyIndex[K]]
+ def readKeyIndex[K: ClassTag](id: LayerId): KeyIndex[K] = {
+   layerType(id) match {
+     // TODO: Find a way to read a single KeyIndex from the COGMetadata
+     case COGLayerType => throw new UnsupportedOperationException(s"The readKeyIndex cannot be performed on COGLayer: $id")
+     case AvroLayerType =>
+       cacheRead[JsValue](id, Fields.metadataBlob).asJsObject.fields(AvroLayerFields.keyIndex).convertTo[KeyIndex[K]]
+   }
+ }

def readSchema(id: LayerId): Schema =
-   cacheRead[JsValue](id, Fields.metadataBlob).asJsObject.fields(Fields.schema).convertTo[Schema]
+   layerType(id) match {
+     case COGLayerType => throw new COGLayerAttributeError("schema", id)
+     case AvroLayerType =>
+       cacheRead[JsValue](id, Fields.metadataBlob).asJsObject.fields(AvroLayerFields.schema).convertTo[Schema]
+   }

def readLayerAttributes[H: JsonFormat, M: JsonFormat, K: ClassTag](id: LayerId): LayerAttributes[H, M, K] = {
val blob = cacheRead[JsValue](id, Fields.metadataBlob).asJsObject
LayerAttributes(
blob.fields(Fields.header).convertTo[H],
blob.fields(Fields.metadata).convertTo[M],
- blob.fields(Fields.keyIndex).convertTo[KeyIndex[K]],
- blob.fields(Fields.schema).convertTo[Schema]
+ blob.fields(AvroLayerFields.keyIndex).convertTo[KeyIndex[K]],
+ blob.fields(AvroLayerFields.schema).convertTo[Schema]
)
}

@@ -136,8 +147,8 @@ trait BlobLayerAttributeStore {
JsObject(
Fields.header -> header.toJson,
Fields.metadata -> metadata.toJson,
- Fields.keyIndex -> keyIndex.toJson,
- Fields.schema -> schema.toJson
+ AvroLayerFields.keyIndex -> keyIndex.toJson,
+ AvroLayerFields.schema -> schema.toJson
)
)
}
@@ -169,10 +180,10 @@ trait DiscreteLayerAttributeStore extends AttributeStore {
cacheRead[M](id, Fields.metadata)

def readKeyIndex[K: ClassTag](id: LayerId): KeyIndex[K] =
- cacheRead[KeyIndex[K]](id, Fields.keyIndex)
+ cacheRead[KeyIndex[K]](id, AvroLayerFields.keyIndex)

def readSchema(id: LayerId): Schema =
- cacheRead[Schema](id, Fields.schema)
+ cacheRead[Schema](id, AvroLayerFields.schema)

def readLayerAttributes[H: JsonFormat, M: JsonFormat, K: ClassTag](id: LayerId): LayerAttributes[H, M, K] = {
LayerAttributes(
@@ -186,8 +197,8 @@ trait DiscreteLayerAttributeStore extends AttributeStore {
def writeLayerAttributes[H: JsonFormat, M: JsonFormat, K: ClassTag](id: LayerId, header: H, metadata: M, keyIndex: KeyIndex[K], schema: Schema) = {
cacheWrite(id, Fields.header, header)
cacheWrite(id, Fields.metadata, metadata)
- cacheWrite(id, Fields.keyIndex, keyIndex)
- cacheWrite(id, Fields.schema, schema)
+ cacheWrite(id, AvroLayerFields.keyIndex, keyIndex)
+ cacheWrite(id, AvroLayerFields.schema, schema)
}

def readCOGLayerAttributes[H: JsonFormat, M: JsonFormat](id: LayerId): COGLayerAttributes[H, M] =
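
The upshot of these AttributeStore changes for callers: readKeyIndex and readSchema now succeed only for Avro layers, so code that may encounter a COG layer should branch on the layer type first. A minimal sketch using isCOGLayer, which the new spec further down exercises:

import scala.reflect.ClassTag

import geotrellis.spark.LayerId
import geotrellis.spark.io.AttributeStore
import geotrellis.spark.io.index.KeyIndex

// Avoid the new UnsupportedOperationException by checking the layer type
// before asking for the single Avro-style key index.
def singleKeyIndex[K: ClassTag](store: AttributeStore, id: LayerId): Option[KeyIndex[K]] =
  if (store.isCOGLayer(id)) None // COG key indexes live in the layer's storage metadata
  else Some(store.readKeyIndex[K](id))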

@@ -51,7 +51,7 @@ abstract class COGCollectionLayerReader[ID] { self =>
)(implicit getByteReader: URI => ByteReader, idToLayerId: ID => LayerId): Seq[(K, V)] with Metadata[TileLayerMetadata[K]] = {
val COGLayerStorageMetadata(cogLayerMetadata, keyIndexes) =
try {
- attributeStore.read[COGLayerStorageMetadata[K]](LayerId(id.name, 0), "cog_metadata")
+ attributeStore.readMetadata[COGLayerStorageMetadata[K]](LayerId(id.name, 0))
} catch {
// to follow GeoTrellis Layer Readers logic
case e: AttributeNotFoundError => throw new LayerNotFoundError(id).initCause(e)

@@ -72,7 +72,7 @@ abstract class COGLayerReader[ID] extends Serializable {

val COGLayerStorageMetadata(cogLayerMetadata, keyIndexes) =
try {
- attributeStore.read[COGLayerStorageMetadata[K]](LayerId(id.name, 0), COGAttributeStore.Fields.metadata)
+ attributeStore.readMetadata[COGLayerStorageMetadata[K]](LayerId(id.name, 0))
} catch {
// to follow GeoTrellis Layer Readers logic
case e: AttributeNotFoundError => throw new LayerNotFoundError(id).initCause(e)

@@ -162,7 +162,7 @@ trait COGLayerWriter extends LazyLogging with Serializable {
case (keyBounds: KeyBounds[K], _) =>
val COGLayerStorageMetadata(metadata, keyIndexes) =
try {
- attributeStore.read[COGLayerStorageMetadata[K]](LayerId(layerName, 0), "cog_metadata")
+ attributeStore.readMetadata[COGLayerStorageMetadata[K]](LayerId(layerName, 0))
} catch {
// to follow GeoTrellis Layer Readers logic
case e: AttributeNotFoundError => throw new LayerNotFoundError(LayerId(layerName, 0)).initCause(e)

@@ -52,7 +52,7 @@ trait COGValueReader[ID] {
exceptionHandler: K => PartialFunction[Throwable, Nothing] = { key: K => { case e: Throwable => throw e }: PartialFunction[Throwable, Nothing] }
): Reader[K, V] = new Reader[K, V] {
val COGLayerStorageMetadata(cogLayerMetadata, keyIndexes) =
- attributeStore.read[COGLayerStorageMetadata[K]](LayerId(layerId.name, 0), "cog_metadata")
+ attributeStore.readMetadata[COGLayerStorageMetadata[K]](LayerId(layerId.name, 0))

def read(key: K): V = {
val (zoomRange, spatialKey, overviewIndex, gridBounds) =

@@ -53,7 +53,7 @@ class FileCOGLayerReader(

val header =
try {
- attributeStore.read[FileLayerHeader](LayerId(id.name, 0), COGAttributeStore.Fields.header)
+ attributeStore.readHeader[FileLayerHeader](LayerId(id.name, 0))
} catch {
case e: AttributeNotFoundError => throw new LayerNotFoundError(id).initCause(e)
}

@@ -4,7 +4,7 @@ import geotrellis.raster._
import geotrellis.raster.io.geotiff.reader.GeoTiffReader
import geotrellis.raster.io.geotiff._
import geotrellis.spark._
- import geotrellis.spark.io.AttributeStore
+ import geotrellis.spark.io.{AttributeStore, COGLayerType}
import geotrellis.spark.io.cog._
import geotrellis.spark.io.cog.vrt.VRT
import geotrellis.spark.io.cog.vrt.VRT.IndexedSimpleSource
@@ -39,15 +39,16 @@ class FileCOGLayerWriter(
Filesystem.ensureDirectory(new File(catalogPathFile, layerName).getAbsolutePath)

val storageMetadata = COGLayerStorageMetadata(cogLayer.metadata, keyIndexes)
- attributeStore.write(layerId0, COGAttributeStore.Fields.metadata, storageMetadata)

val header =
FileLayerHeader(
keyClass = classTag[K].toString(),
valueClass = classTag[V].toString(),
- path = catalogPath
+ path = catalogPath,
+ layerType = COGLayerType
)
- attributeStore.write(layerId0, COGAttributeStore.Fields.header, header)

+ attributeStore.writeCOGLayerAttributes(layerId0, header, storageMetadata)

for(zoomRange <- cogLayer.layers.keys.toSeq.sorted(Ordering[ZoomRange].reverse)) {
val keyIndex = keyIndexes(zoomRange)

@@ -56,7 +56,7 @@ class HadoopCOGLayerReader(
](id: LayerId, tileQuery: LayerQuery[K, TileLayerMetadata[K]], numPartitions: Int) = {
val header =
try {
- attributeStore.read[HadoopLayerHeader](LayerId(id.name, 0), COGAttributeStore.Fields.header)
+ attributeStore.readHeader[HadoopLayerHeader](LayerId(id.name, 0))
} catch {
case e: AttributeNotFoundError => throw new LayerNotFoundError(id).initCause(e)
}

@@ -7,7 +7,7 @@ import geotrellis.raster.io.geotiff.GeoTiff
import geotrellis.raster.io.geotiff.reader.GeoTiffReader
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.spark._
- import geotrellis.spark.io.{InvalidLayerIdError, AttributeStore}
+ import geotrellis.spark.io.{InvalidLayerIdError, AttributeStore, COGLayerType}
import geotrellis.spark.io.cog._
import geotrellis.spark.io.cog.vrt.VRT
import geotrellis.spark.io.cog.vrt.VRT.IndexedSimpleSource
@@ -47,15 +47,16 @@ class HadoopCOGLayerWriter(
}

val storageMetadata = COGLayerStorageMetadata(cogLayer.metadata, keyIndexes)
- attributeStore.write(layerId0, COGAttributeStore.Fields.metadata, storageMetadata)

val header =
HadoopLayerHeader(
keyClass = classTag[K].toString(),
valueClass = classTag[V].toString(),
- path = new URI(rootPath)
+ path = new URI(rootPath),
+ layerType = COGLayerType
)
- attributeStore.write(layerId0, COGAttributeStore.Fields.header, header)

+ attributeStore.writeCOGLayerAttributes(layerId0, header, storageMetadata)

for(zoomRange <- cogLayer.layers.keys.toSeq.sorted(Ordering[ZoomRange].reverse)) {
val vrt = VRT(cogLayer.metadata.tileLayerMetadata(zoomRange.minZoom))

@@ -46,7 +46,7 @@ class HadoopCOGValueReader(

val header =
try {
- attributeStore.read[HadoopLayerHeader](LayerId(layerId.name, 0), COGAttributeStore.Fields.header)
+ attributeStore.readHeader[HadoopLayerHeader](LayerId(layerId.name, 0))
} catch {
case e: AttributeNotFoundError => throw new LayerNotFoundError(layerId).initCause(e)
}

@@ -0,0 +1,69 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.cog

import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.index._
import geotrellis.spark.testkit.testfiles.cog._
import geotrellis.spark.summary._
import geotrellis.raster.io._
import geotrellis.raster.histogram._
import geotrellis.spark.testkit._

import org.scalatest._
import spray.json._
import spray.json.DefaultJsonProtocol._

abstract class COGAttributeStoreSpec
extends FunSpec
with Matchers
with TestEnvironment
with COGTestFiles {
def attributeStore: AttributeStore
def header: LayerHeader

val cogLayer = COGLayer.fromLayerRDD(spatialCea, zoomLevelCea)
val keyIndexes = cogLayer.metadata.zoomRangeInfos.map { case (z, b) => z -> ZCurveKeyIndexMethod.createIndex(b) }.toMap
val storageMetadata = COGLayerStorageMetadata(cogLayer.metadata, keyIndexes)

val layerId = LayerId("test-cog-layer", 0)

it("should write the COGLayerAttributes") {
attributeStore.writeCOGLayerAttributes(layerId, header, storageMetadata)
}

it("should read the COGLayerAttributes") {
attributeStore.readCOGLayerAttributes[LayerHeader, COGLayerStorageMetadata[SpatialKey]](layerId)
}

it("should be a COGLayer") {
attributeStore.isCOGLayer(layerId) should be (true)
}

it("should read the metadata of the catalog") {
attributeStore.readMetadata[COGLayerStorageMetadata[SpatialKey]](layerId)
}

it("should read the header of the catalog") {
attributeStore.readHeader[LayerHeader](layerId)
}

it("should read the keyIndexes of the catalog") {
attributeStore.readKeyIndexes[SpatialKey](layerId)
}
}

@@ -0,0 +1,27 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.file.cog

import geotrellis.spark._
import geotrellis.spark.io.{LayerHeader, COGLayerType}
import geotrellis.spark.io.cog._
import geotrellis.spark.io.file._

class COGFileAttributeStoreSpec extends COGAttributeStoreSpec {
lazy val attributeStore = FileAttributeStore(outputLocalPath)
lazy val header = FileLayerHeader("geotrellis.spark.SpatialKey", "geotrellis.raster.Tile", outputLocalPath, COGLayerType)
}
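
The commit message also names a COGHadoopAttributeStoreSpec, but this page's file list is truncated before it. A hypothetical reconstruction following the pattern of the two specs above; the HadoopAttributeStore constructor and the reuse of outputLocalPath are assumptions, not part of the diff:

package geotrellis.spark.io.hadoop.cog

import java.net.URI

import geotrellis.spark.io._
import geotrellis.spark.io.cog._
import geotrellis.spark.io.hadoop._

// Hypothetical sketch only; the real file is not shown on this page.
class COGHadoopAttributeStoreSpec extends COGAttributeStoreSpec {
  lazy val attributeStore = HadoopAttributeStore(outputLocalPath)
  lazy val header = HadoopLayerHeader(
    "geotrellis.spark.SpatialKey",
    "geotrellis.raster.Tile",
    new URI(outputLocalPath),
    COGLayerType
  )
}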