Remove Full Outer Join From Accumulo Backend
James McClain authored and echeipesh committed Oct 4, 2017
1 parent 71cfd5c commit 1daf5a4
Showing 3 changed files with 79 additions and 28 deletions.
@@ -113,27 +113,38 @@ class AccumuloLayerWriter(
val updatedMetadata: M =
metadata.merge(rdd.metadata)

val updatedRdd: RDD[(K, V)] =
mergeFunc match {
case Some(mergeFunc) =>
existingTiles
.fullOuterJoin(rdd)
.flatMapValues {
case (Some(layerTile), Some(updateTile)) => Some(mergeFunc(layerTile, updateTile))
case (Some(layerTile), _) => Some(layerTile)
case (_, Some(updateTile)) => Some(updateTile)
case _ => None
}
case None => rdd
}

val codec = KeyValueRecordCodec[K, V]
val schema = codec.schema

// Write updated metadata, and the possibly updated schema
// Only really need to write the metadata and schema
attributeStore.writeLayerAttributes(id, header, updatedMetadata, keyIndex, schema)
AccumuloRDDWriter.write(updatedRdd, instance, encodeKey, options.writeStrategy, table)
options.writeStrategy match {
case _: HdfsWriteStrategy =>
val updatedRdd: RDD[(K, V)] =
mergeFunc match {
case Some(mergeFunc) =>
existingTiles
.fullOuterJoin(rdd)
.flatMapValues {
case (Some(layerTile), Some(updateTile)) => Some(mergeFunc(layerTile, updateTile))
case (Some(layerTile), _) => Some(layerTile)
case (_, Some(updateTile)) => Some(updateTile)
case _ => None
}
case None => rdd
}

// Write updated metadata, and the possibly updated schema
// Only really need to write the metadata and schema
attributeStore.writeLayerAttributes(id, header, updatedMetadata, keyIndex, schema)
AccumuloRDDWriter.write(updatedRdd, instance, encodeKey, options.writeStrategy, table)
case _ =>
// Write updated metadata, and the possibly updated schema
// Only really need to write the metadata and schema
attributeStore.writeLayerAttributes(id, header, updatedMetadata, keyIndex, schema)
AccumuloRDDWriter.update(
rdd, instance, encodeKey, options.writeStrategy, table,
Some(writerSchema), mergeFunc
)
}
}

// Layer Writing
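A minimal, self-contained sketch of the merge semantics retained for the HdfsWriteStrategy branch above: a fullOuterJoin of the existing tiles with the update RDD, collapsing the resulting Option pairs with the caller's mergeFunc. The Int keys, String values, and the concatenating merge function are hypothetical stand-ins for the layer's key and tile types; this illustrates the technique, not the writer itself.

import org.apache.spark.{SparkConf, SparkContext}

object FullOuterJoinMergeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("merge-sketch"))

    // Existing layer tiles and incoming update tiles, keyed the same way.
    val existingTiles = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
    val updates       = sc.parallelize(Seq(2 -> "B", 3 -> "C"))

    // Hypothetical merge function; a real layer update would combine tiles here.
    val mergeFunc: (String, String) => String = (layerTile, updateTile) => layerTile + updateTile

    // Same Option-pair collapse as in the fullOuterJoin branch of AccumuloLayerWriter.
    val merged = existingTiles.fullOuterJoin(updates).flatMapValues {
      case (Some(layerTile), Some(updateTile)) => Some(mergeFunc(layerTile, updateTile))
      case (Some(layerTile), _)                => Some(layerTile)
      case (_, Some(updateTile))               => Some(updateTile)
      case _                                   => None
    }

    merged.sortByKey().collect().foreach(println)  // (1,a), (2,bB), (3,C)
    sc.stop()
  }
}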
@@ -16,21 +16,23 @@

package geotrellis.spark.io.accumulo

import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.util.KryoWrapper
import geotrellis.spark.{Boundable, KeyBounds}
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.util.KryoWrapper

import org.apache.accumulo.core.client.mapreduce.{AccumuloInputFormat, InputFormatBase}
import org.apache.accumulo.core.data.{Range => AccumuloRange, Value, Key}
import org.apache.accumulo.core.util.{Pair => AccumuloPair}
import org.apache.avro.Schema
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

import scala.reflect.ClassTag
import scala.collection.JavaConverters._
import scala.reflect.ClassTag


object AccumuloRDDReader {
def read[K: Boundable: AvroRecordCodec: ClassTag, V: AvroRecordCodec: ClassTag](
@@ -18,15 +18,16 @@ package geotrellis.spark.io.accumulo

import geotrellis.spark.io.avro._
import geotrellis.spark.io.avro.codecs._
import org.apache.avro.Schema
import geotrellis.spark.util.KryoWrapper

import org.apache.accumulo.core.data.{Key, Range, Value}
import org.apache.accumulo.core.security.Authorizations
import org.apache.avro.Schema
import org.apache.hadoop.io.Text

import org.apache.spark.rdd.RDD

import org.apache.accumulo.core.data.{Key, Value}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

object AccumuloRDDWriter {

@@ -36,6 +37,16 @@ object AccumuloRDDWriter {
encodeKey: K => Key,
writeStrategy: AccumuloWriteStrategy,
table: String
): Unit = update(raster, instance, encodeKey, writeStrategy, table, None, None)

private[accumulo] def update[K: AvroRecordCodec, V: AvroRecordCodec](
raster: RDD[(K, V)],
instance: AccumuloInstance,
encodeKey: K => Key,
writeStrategy: AccumuloWriteStrategy,
table: String,
writerSchema: Option[Schema],
mergeFunc: Option[(V,V) => V]
): Unit = {
implicit val sc = raster.sparkContext

@@ -44,14 +55,41 @@

instance.ensureTableExists(table)

val kwWriterSchema = KryoWrapper(writerSchema)

val kvPairs: RDD[(Key, Value)] =
raster
// Call groupBy with numPartitions; if called without that argument or a partitioner,
// groupBy will reuse the partitioner on the parent RDD if it is set, which could be typed
// on a key type that may no longer be valid for the key type of the resulting RDD.
.groupBy({ row => encodeKey(row._1) }, numPartitions = raster.partitions.length)
.map { case (key, pairs) =>
(key, new Value(AvroEncoder.toBinary(pairs.toVector)(codec)))
.map { case (key, _kvs1) =>
val kvs1: Vector[(K,V)] = _kvs1.toVector
val kvs2: Vector[(K,V)] =
if (mergeFunc != None) {
val scanner = instance.connector.createScanner(table, new Authorizations())
scanner.setRange(new Range(key.getRow))
scanner.fetchColumnFamily(key.getColumnFamily)
val result: Vector[(K,V)] = scanner.iterator().asScala.toVector.flatMap({ entry =>
val value = entry.getValue
AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(codec.schema), value.get)(codec)
})
scanner.close
result
} else Vector.empty
val kvs: Vector[(K, V)] = mergeFunc match {
case Some(fn) =>
(kvs2 ++ kvs1)
.groupBy({ case (k,v) => k })
.map({ case (k, kvs) =>
val vs = kvs.map({ case (k,v) => v }).toSeq
val v: V = vs.tail.foldLeft(vs.head)(fn)
(k, v) })
.toVector
case None => kvs1
}

(key, new Value(AvroEncoder.toBinary(kvs)(codec)))
}

writeStrategy.write(kvPairs, instance, table)
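The per-key merge that AccumuloRDDWriter.update performs after scanning the existing rows can be reduced to the pure step sketched below: concatenate the stored and incoming key/value pairs, group by key, and fold duplicates with mergeFunc, stored values first, mirroring the kvs2 ++ kvs1 fold above. The Int types and sample data are hypothetical; this is a sketch of the merge step only, with the Accumulo scan and Avro codec left out.

object PerKeyMergeSketch {
  // Combine values already stored under a key with incoming values,
  // folding duplicates left-to-right with mergeFunc (stored values first).
  def mergeByKey[K, V](stored: Vector[(K, V)],
                       incoming: Vector[(K, V)],
                       mergeFunc: (V, V) => V): Vector[(K, V)] =
    (stored ++ incoming)
      .groupBy { case (k, _) => k }
      .map { case (k, kvs) =>
        val vs = kvs.map { case (_, v) => v }
        (k, vs.tail.foldLeft(vs.head)(mergeFunc))
      }
      .toVector

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data standing in for decoded (key, tile) records.
    val stored   = Vector(1 -> 10, 2 -> 20)
    val incoming = Vector(2 -> 2, 3 -> 3)
    // With addition as the merge function, key 2 folds to 22.
    println(mergeByKey(stored, incoming, (a: Int, b: Int) => a + b).sortBy(_._1))
  }
}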
