Skip to content

Commit

Permalink
Add S3COGLayer Updater specs
Browse files Browse the repository at this point in the history
  • Loading branch information
pomadchin committed Mar 8, 2018
1 parent e356892 commit 343fe53
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 15 deletions.
@@ -1,9 +1,6 @@
package geotrellis.spark.io.s3.cog

import com.amazonaws.services.s3.model.AmazonS3Exception
import geotrellis.raster._
import geotrellis.raster.crop._
import geotrellis.raster.merge._
import geotrellis.raster.io.geotiff.GeoTiff
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.spark._
Expand All @@ -13,13 +10,13 @@ import geotrellis.spark.io.s3.{S3AttributeStore, S3Client, S3RDDWriter, makePath
import geotrellis.spark.io.cog._
import geotrellis.spark.io.cog.vrt.VRT
import geotrellis.spark.io.cog.vrt.VRT.IndexedSimpleSource
import scala.util.Try

import spray.json.JsonFormat
import com.amazonaws.services.s3.model.{AmazonS3Exception, ObjectMetadata, PutObjectRequest}

import java.io.ByteArrayInputStream

import scala.util.Try
import scala.reflect.ClassTag

class S3COGLayerWriter(
Expand Down Expand Up @@ -56,7 +53,7 @@ class S3COGLayerWriter(
case _ => false
}

for { (zoomRange, cogs) <- cogLayer.layers.toSeq.sortBy(_._1)(Ordering[ZoomRange].reverse) } {
for((zoomRange, cogs) <- cogLayer.layers.toSeq.sortBy(_._1)(Ordering[ZoomRange].reverse)) {
val vrt = VRT(cogLayer.metadata.tileLayerMetadata(zoomRange.minZoom))

// Make RDD[(String, GeoTiff[T])]
Expand All @@ -77,9 +74,7 @@ class S3COGLayerWriter(

(s"${keyPath(key)}.${Extension}", cog)
}
.foreachPartition { partition =>
asyncWriter.write(getS3Client(), partition, mergeFunc, Some(retryCheck))
}
.foreachPartition { partition => asyncWriter.write(getS3Client(), partition, mergeFunc, Some(retryCheck)) }

// Save Accumulator
val bytes =
Expand Down
Expand Up @@ -35,7 +35,8 @@ class COGS3SpaceTimeSpec
with COGSpaceTimeKeyIndexMethods
with TestEnvironment
with COGTestFiles
with COGCoordinateSpaceTimeSpec {
with COGCoordinateSpaceTimeSpec
with COGLayerUpdateSpaceTimeTileSpec {

registerAfterAll { () =>
MockS3Client.reset()
Expand Down
13 changes: 7 additions & 6 deletions spark/src/main/scala/geotrellis/spark/io/AsyncWriter.scala
Expand Up @@ -17,7 +17,8 @@
package geotrellis.spark.io

import java.util.concurrent.Executors
import scala.util.Try

import scala.util.{Failure, Success, Try}
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, nondeterminism}

Expand Down Expand Up @@ -51,11 +52,11 @@ abstract class AsyncWriter[Client, V, E](threads: Int) extends Serializable {
mergeFunc match {
case Some(fn) =>
// TODO: match on this failure to retry reads
val oldValue = readRecord(client, key).get
(key, fn(oldValue, newValue))

case None =>
newRecord
readRecord(client, key) match {
case Success(oldValue) => (key, fn(oldValue, newValue))
case Failure(_) => newRecord
}
case None => newRecord
}
}

Expand Down

0 comments on commit 343fe53

Please sign in to comment.