Skip to content

Commit

Permalink
rasterizeWithValue method extension for RDD[G<: Geometry]
Browse files Browse the repository at this point in the history
  • Loading branch information
echeipesh committed Aug 31, 2017
1 parent 812b5c5 commit fda3f99
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 38 deletions.
1 change: 1 addition & 0 deletions spark/src/main/scala/geotrellis/spark/package.scala
Expand Up @@ -55,6 +55,7 @@ package object spark
with partition.Implicits
with reproject.Implicits
with resample.Implicits
with rasterize.Implicits
with sigmoidal.Implicits
with split.Implicits
with stitch.Implicits
Expand Down
@@ -0,0 +1,50 @@
/*
* Copyright 2017 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.rasterize

import geotrellis.raster._
import geotrellis.raster.rasterize._
//import geotrellis.raster.rasterize.Rasterizer
import geotrellis.spark._
import geotrellis.spark.tiling._
import geotrellis.spark.rasterize._
import geotrellis.util.MethodExtensions
import geotrellis.vector.Geometry
import org.apache.spark._
import org.apache.spark.rdd._

/**
* Extension methods for invoking the rasterizer on RDD of Geometry objects.
*/
trait GeometryRDDRasterizeMethods[G <: Geometry] extends MethodExtensions[RDD[G]] {
def rasterizeWithValue(
value: Double,
layout: LayoutDefinition
)(
cellType: CellType = IntConstantNoDataCellType,
includePartial: Boolean = true,
sampleType: PixelSampleType = PixelIsPoint,
partitioner: Partitioner = new HashPartitioner(self.getNumPartitions)
): RDD[(SpatialKey, Tile)] with Metadata[LayoutDefinition] = {
RasterizeRDD.fromGeometry(self, value, layout, cellType,
RasterizeRDD.Options(
rasterizerOptions = Rasterizer.Options(includePartial, sampleType),
partitioner = Some(partitioner)
)
)
}
}
27 changes: 27 additions & 0 deletions spark/src/main/scala/geotrellis/spark/rasterize/Implicits.scala
@@ -0,0 +1,27 @@
/*
* Copyright 2017 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.rasterize

import geotrellis.vector.Geometry
import org.apache.spark.rdd.RDD

object Implicits extends Implicits

trait Implicits {
implicit class withGeometryRDDRasterizeMethods[G <: Geometry](val self: RDD[G])
extends GeometryRDDRasterizeMethods[G]
}
60 changes: 33 additions & 27 deletions spark/src/main/scala/geotrellis/spark/rasterize/RasterizeRDD.scala
Expand Up @@ -11,48 +11,33 @@ import org.apache.spark.rdd._
import scala.collection.immutable.VectorBuilder

object RasterizeRDD {
/** Rasterize geometries using a constant value */
def fromGeometry[G <: Geometry, T: Numeric](
geoms: RDD[G],
layout: LayoutDefinition,
ct: CellType,
value: T,
options: Options
): RDD[(SpatialKey, Tile)] with Metadata[LayoutDefinition] = {
fromGeometry(geoms, layout, ct, value, new HashPartitioner(geoms.sparkContext.defaultParallelism), options)
}
case class Options(
rasterizerOptions: Rasterizer.Options = Rasterizer.Options.DEFAULT,
partitioner: Option[Partitioner] = None
)

/** Rasterize geometries using a constant value */
def fromGeometry[G <: Geometry, T: Numeric](
geoms: RDD[G],
layout: LayoutDefinition,
ct: CellType,
value: T,
options: Options,
numPartitions: Int
): RDD[(SpatialKey, Tile)] with Metadata[LayoutDefinition] = {
fromGeometry(geoms, layout, ct, value, new HashPartitioner(numPartitions), options)
object Options {
val DEFAULT = Options()
}

/** Rasterize geometries using a constant value */
def fromGeometry[G <: Geometry, T: Numeric](
def fromGeometry[G <: Geometry](
geoms: RDD[G],
value: Double,
layout: LayoutDefinition,
ct: CellType,
value: T,
partitioner: Partitioner,
options: Options
): RDD[(SpatialKey, Tile)] with Metadata[LayoutDefinition] = {
val intValue = implicitly[Numeric[T]].toInt(value)
val dblValue = implicitly[Numeric[T]].toDouble(value)
val options = Options(includePartial=true, sampleType=PixelIsArea)
val intValue = d2i(value)
val dblValue = value
val rasterExtent = RasterExtent(layout.extent, layout.layoutCols, layout.layoutRows)
val partitioner = options.partitioner.getOrElse(new HashPartitioner(geoms.getNumPartitions))

// key the geometry to intersecting tiles so it can be rasterized in the map-side combine
val keyed: RDD[(SpatialKey, (Geometry, RasterExtent))] =
geoms.flatMap { geom =>
var buff = Map.empty[SpatialKey, (Geometry, RasterExtent)]
geom.foreach(rasterExtent, options){ (col, row) =>
geom.foreach(rasterExtent, options.rasterizerOptions){ (col, row) =>
val key = SpatialKey(col, row)
val keyRasterExtent = RasterExtent(layout.mapTransform(key), layout.tileCols, layout.tileRows)
// have to check because MultiLine can cause repeat visits from same geometry
Expand Down Expand Up @@ -94,4 +79,25 @@ object RasterizeRDD {

ContextRDD(tiles.asInstanceOf[RDD[(SpatialKey, Tile)]], layout)
}

// /** Rasterize geometries using a constant value */
// def fromGeometry[G <: Geometry, T: Numeric](
// geoms: RDD[G],
// value: T,
// layout: LayoutDefinition,
// ct: CellType,
// options: Options
// ): RDD[(SpatialKey, Tile)] with Metadata[LayoutDefinition] = {
// fromGeometry(geoms, layout, ct, value, new HashPartitioner(geoms.sparkContext.defaultParallelism), options)
// }

// /** Rasterize geometries using a constant value */
// def fromGeometry[G <: Geometry, T: Numeric](
// geoms: RDD[G],
// value: T
// layout: LayoutDefinition,
// ct: CellType,
// ): RDD[(SpatialKey, Tile)] with Metadata[LayoutDefinition] = {
// fromGeometry(geoms, layout, ct, value, new HashPartitioner(geoms.sparkContext.defaultParallelism), Options.DEFAULT)
// }
}
Expand Up @@ -50,11 +50,7 @@ class RasterizeRDDSpec extends FunSpec with Matchers
val layout = TileLayout(3,3,256,256)
val ld = LayoutDefinition(septaExtent, layout)

val rasterizedRdd = RasterizeRDD.fromGeometry(
linesRdd, ld,
IntConstantNoDataCellType,
1,
Options.DEFAULT)
val rasterizedRdd = linesRdd.rasterizeWithValue(1, ld)(cellType = IntConstantNoDataCellType)
val actual = rasterizedRdd.stitch()

// rasterizing a single 768x768 tile would actuall produce numerical differencies
Expand Down Expand Up @@ -85,14 +81,9 @@ class RasterizeRDDSpec extends FunSpec with Matchers
val ld = LayoutDefinition(huc10.envelope, layout)

val polyRdd = sc.parallelize(huc10.polygons)
val rasterizedRdd = RasterizeRDD.fromGeometry(
polyRdd, ld,
IntConstantNoDataCellType,
1,
Options.DEFAULT)
val rasterizedRdd = polyRdd.rasterizeWithValue(1, ld)(cellType = IntConstantNoDataCellType)
val actual = rasterizedRdd.stitch()


val expected: Tile = {
for {
tileCol <- 0 until 3
Expand Down

0 comments on commit fda3f99

Please sign in to comment.