Skip to content

Commit

Permalink
Merge pull request #206 from jbouffard/fix/rs-threadsafety
Browse files Browse the repository at this point in the history
GeoTiffRasterSource Thread Safety
  • Loading branch information
pomadchin committed Jul 10, 2019
2 parents 2c26205 + 86e8a29 commit effff33
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed
- VLM: Correct incomplete reads when using `MosaicRasterSource`
- VLM: `GeoTiffRasterSource` reads are now thread safe

### Removed
- Summary: Subproject removed. The polygonal summary prototype was moved to GeoTrellis core for the 3.0 release. See: https://github.com/locationtech/geotrellis/blob/master/docs/guide/rasters.rst#polygonal-summary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,31 @@ case class GeoTiffRasterSource(
// TODO: shouldn't GridExtent give me Extent for types other than N ?
Raster(tile, gridExtent.extentFor(gb.toGridType[Long], clamp = false))
}
if (it.hasNext) Some(convertRaster(it.next)) else None

// We want to use this tiff in different `RasterSource`s, so we
// need to lock it in order to garuntee the state of tiff when
// it's being accessed by a thread.
tiff.synchronized { if (it.hasNext) Some(convertRaster(it.next)) else None }
}

def read(bounds: GridBounds[Long], bands: Seq[Int]): Option[Raster[MultibandTile]] = {
val it = readBounds(List(bounds), bands)
if (it.hasNext) Some(it.next) else None

tiff.synchronized { if (it.hasNext) Some(it.next) else None }
}

override def readExtents(extents: Traversable[Extent], bands: Seq[Int]): Iterator[Raster[MultibandTile]] = {
val bounds = extents.map(gridExtent.gridBoundsFor(_, clamp = true))

readBounds(bounds, bands)
}

override def readBounds(bounds: Traversable[GridBounds[Long]], bands: Seq[Int]): Iterator[Raster[MultibandTile]] = {
val geoTiffTile = tiff.tile.asInstanceOf[GeoTiffMultibandTile]
val intersectingBounds: Seq[GridBounds[Int]] =
bounds.flatMap(_.intersection(this.gridBounds)).
toSeq.map(b => b.toGridType[Int])
bounds
.flatMap(_.intersection(this.gridBounds)).toSeq
.map(b => b.toGridType[Int])

geoTiffTile.crop(intersectingBounds, bands.toArray).map { case (gb, tile) =>
convertRaster(Raster(tile, gridExtent.extentFor(gb.toGridType[Long], clamp = true)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,9 @@ case class GeoTiffReprojectRasterSource(
protected lazy val backTransform = Transform(crs, baseCRS)

override lazy val gridExtent: GridExtent[Long] = reprojectOptions.targetRasterExtent match {
case Some(targetRasterExtent) =>
targetRasterExtent.toGridType[Long]

case None =>
ReprojectRasterExtent(baseGridExtent, transform, reprojectOptions)
}
case Some(targetRasterExtent) => targetRasterExtent.toGridType[Long]
case None => ReprojectRasterExtent(baseGridExtent, transform, reprojectOptions)
}

lazy val resolutions: List[GridExtent[Long]] =
gridExtent :: tiff.overviews.map(ovr => ReprojectRasterExtent(ovr.rasterExtent.toGridType[Long], transform))
Expand All @@ -72,17 +69,19 @@ case class GeoTiffReprojectRasterSource(

def read(extent: Extent, bands: Seq[Int]): Option[Raster[MultibandTile]] = {
val bounds = gridExtent.gridBoundsFor(extent, clamp = false)
val it = readBounds(List(bounds), bands)
if (it.hasNext) Some(it.next) else None

read(bounds, bands)
}

def read(bounds: GridBounds[Long], bands: Seq[Int]): Option[Raster[MultibandTile]] = {
val it = readBounds(List(bounds), bands)
if (it.hasNext) Some(it.next) else None

closestTiffOverview.synchronized { if (it.hasNext) Some(it.next) else None }
}

override def readExtents(extents: Traversable[Extent], bands: Seq[Int]): Iterator[Raster[MultibandTile]] = {
val bounds = extents.map(gridExtent.gridBoundsFor(_, clamp = true))

readBounds(bounds, bands)
}

Expand All @@ -95,8 +94,12 @@ case class GeoTiffReprojectRasterSource(
val targetRasterExtent = RasterExtent(
extent = gridExtent.extentFor(targetPixelBounds, clamp = true),
cols = targetPixelBounds.width.toInt,
rows = targetPixelBounds.height.toInt)
val sourceExtent = targetRasterExtent.extent.reprojectAsPolygon(backTransform, 0.001).envelope
rows = targetPixelBounds.height.toInt
)

// A tmp workaround for https://github.com/locationtech/proj4j/pull/29
// Stacktrace details: https://github.com/geotrellis/geotrellis-contrib/pull/206#pullrequestreview-260115791
val sourceExtent = Proj4Transform.synchronized(targetRasterExtent.extent.reprojectAsPolygon(backTransform, 0.001).envelope)
val sourcePixelBounds = closestTiffOverview.rasterExtent.gridBoundsFor(sourceExtent, clamp = true)
(sourcePixelBounds, targetRasterExtent)
}}.toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ case class GeoTiffResampleRasterSource(

override lazy val gridExtent: GridExtent[Long] = resampleGrid(tiff.rasterExtent.toGridType[Long])
lazy val resolutions: List[GridExtent[Long]] = {
val ratio = gridExtent.cellSize.resolution / tiff.rasterExtent.cellSize.resolution
gridExtent :: tiff.overviews.map { ovr =>
val re = ovr.rasterExtent
val CellSize(cw, ch) = re.cellSize
new GridExtent[Long](re.extent, CellSize(cw * ratio, ch * ratio))
}
val ratio = gridExtent.cellSize.resolution / tiff.rasterExtent.cellSize.resolution
gridExtent :: tiff.overviews.map { ovr =>
val re = ovr.rasterExtent
val CellSize(cw, ch) = re.cellSize
new GridExtent[Long](re.extent, CellSize(cw * ratio, ch * ratio))
}
}

@transient protected lazy val closestTiffOverview: GeoTiff[MultibandTile] =
tiff.getClosestOverview(gridExtent.cellSize, strategy)
Expand All @@ -70,13 +70,14 @@ case class GeoTiffResampleRasterSource(

def read(extent: Extent, bands: Seq[Int]): Option[Raster[MultibandTile]] = {
val bounds = gridExtent.gridBoundsFor(extent, clamp = false)
val it = readBounds(List(bounds), bands)
if (it.hasNext) Some(it.next) else None

read(bounds, bands)
}

def read(bounds: GridBounds[Long], bands: Seq[Int]): Option[Raster[MultibandTile]] = {
val it = readBounds(List(bounds), bands)
if (it.hasNext) Some(it.next) else None

closestTiffOverview.synchronized { if (it.hasNext) Some(it.next) else None }
}

override def readExtents(extents: Traversable[Extent], bands: Seq[Int]): Iterator[Raster[MultibandTile]] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2019 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.contrib.vlm.geotiff

import geotrellis.contrib.vlm.{RasterSource, Resource}
import geotrellis.proj4.CRS
import geotrellis.raster.resample._

import cats.instances.future._
import cats.instances.list._
import cats.syntax.traverse._
import org.scalatest.{AsyncFunSpec, Matchers}

import scala.concurrent.{ExecutionContext, Future}

class GeoTiffRasterSourceMultiThreadingSpec extends AsyncFunSpec with Matchers {
val url = Resource.path("img/aspect-tiled.tif")
val source = GeoTiffRasterSource(url)

implicit val ec = ExecutionContext.global

val iterations = (0 to 100).toList

/**
* readBounds and readExtends are not covered by these tests since these methods return an [[Iterator]].
* Due to the [[Iterator]] lazy nature, the lock in these cases should be done on the user side.
* */
def testMultithreading(rs: RasterSource): Unit = {
it("read") {
val res = iterations.map { _ => Future { rs.read() } }.sequence.map(_.flatten)
res.map { rasters => rasters.length shouldBe iterations.length }
}

it("readBounds - Option") {
val res =
iterations
.map { _ => Future { rs.read(rs.gridBounds, 0 until rs.bandCount) } }
.sequence
.map(_.flatten)

res.map { rasters => rasters.length shouldBe iterations.length }
}
}

describe("GeoTiffRasterSource should be threadsafe") {
testMultithreading(source)
}

describe("GeoTiffRasterReprojectSource should be threadsafe") {
testMultithreading(source.reproject(CRS.fromEpsgCode(4326)))
}

describe("GeoTiffRasterResampleSource should be threadsafe") {
testMultithreading(source.resample((source.cols * 0.95).toInt , (source.rows * 0.95).toInt, NearestNeighbor))
}
}

0 comments on commit effff33

Please sign in to comment.