Skip to content

Commit

Permalink
BufferTiles will produce an RDD that retains its source's partitioner
Browse files Browse the repository at this point in the history
Signed-off-by: jbouffard <jbouffard@azavea.com>
  • Loading branch information
jbouffard authored and echeipesh committed Jan 12, 2018
1 parent bacc4a1 commit 066587f
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions spark/src/main/scala/geotrellis/spark/buffer/BufferTiles.scala
Expand Up @@ -401,11 +401,11 @@ object BufferTiles {
V <: CellGrid: Stitcher: ClassTag: (? => CropMethods[V])
](rdd: RDD[(K, V)], bufferSize: Int, layerBounds: GridBounds): RDD[(K, BufferedTile[V])] =
apply(
rdd,
{ key: K =>
val k = key.getComponent[SpatialKey]()
layerBounds.contains(k.col, k.row)
},
rdd,
{ key: K =>
val k = key.getComponent[SpatialKey]()
layerBounds.contains(k.col, k.row)
},
{ _: K => BufferSizes(bufferSize, bufferSize, bufferSize, bufferSize) }
)

Expand All @@ -424,7 +424,7 @@ object BufferTiles {
def apply[
K: SpatialComponent: ClassTag,
V <: CellGrid: Stitcher: (? => CropMethods[V])
](layer: RDD[(K, V)],
](layer: RDD[(K, V)],
getBufferSizes: K => BufferSizes): RDD[(K, BufferedTile[V])] = apply(layer, { _: K => true }, getBufferSizes)

/** Buffer the tiles of type V by a constant buffer size.
Expand All @@ -444,7 +444,7 @@ object BufferTiles {
def apply[
K: SpatialComponent: ClassTag,
V <: CellGrid: Stitcher: (? => CropMethods[V])
](layer: RDD[(K, V)],
](layer: RDD[(K, V)],
includeKey: K => Boolean,
getBufferSizes: K => BufferSizes
): RDD[(K, BufferedTile[V])] = {
Expand All @@ -456,12 +456,14 @@ object BufferTiles {
val vmidDirs = Seq(Left, Center, Right)
val botDirs = Seq(BottomLeft, Bottom, BottomRight)

val partitioner = layer.partitioner

val sliced = layer
.flatMap{ case (key, tile) => {
val SpatialKey(x, y) = key.getComponent[SpatialKey]
val cols = tile.cols
val rows = tile.rows

def genSection(neighbor: Direction): (K, (K, Direction, V)) = {
val (xOfs, yOfs) = DirectionOp.offsetOf(neighbor)
val targetKey = key.setComponent(SpatialKey(x + xOfs, y + yOfs))
Expand Down Expand Up @@ -490,18 +492,22 @@ object BufferTiles {
Seq(Left, Right, Bottom, Top, TopLeft, TopRight, BottomLeft, BottomRight).map(genSection): Seq[(K, (K, Direction, V))]
}}

val grouped = sliced.groupByKey
val grouped =
partitioner match {
case Some(p) => sliced.groupByKey(p)
case None => sliced.groupByKey
}

grouped
.flatMapValues{ iter =>
val pieces = iter.map{ case (_, dir, tile) => (dir, tile) }.toMap

if (pieces contains Center) {
val lefts =
val lefts =
Seq(leftDirs, hmidDirs, rightDirs)
.map(_.map{ x => pieces.get(x).map(_.cols).getOrElse(0) }.max)
.foldLeft(Seq(0)){ (acc, x) => acc :+ (acc.last + x) }
val tops =
val tops =
Seq(topDirs, vmidDirs, botDirs)
.map(_.map{ x => pieces.get(x).map(_.rows).getOrElse(0) }.max)
.foldLeft(Seq(0)){ (acc, x) => acc :+ (acc.last + x) }
Expand All @@ -526,10 +532,10 @@ object BufferTiles {
val stitcher = implicitly[Stitcher[V]]
val stitched = stitcher.stitch(toStitch, totalWidth, totalHeight)

Some(BufferedTile(stitched,
GridBounds(lefts(1),
tops(1),
lefts(2) - 1,
Some(BufferedTile(stitched,
GridBounds(lefts(1),
tops(1),
lefts(2) - 1,
tops(2) - 1)))
} else {
None
Expand Down

0 comments on commit 066587f

Please sign in to comment.