Add geotrellis.spark.util.SerializableConfiguration and fix HBase connection again
echeipesh committed Oct 4, 2017
1 parent 615d881 commit 57800a6
Showing 3 changed files with 36 additions and 17 deletions.
@@ -79,7 +79,7 @@ object HBaseRDDWriter {
      if(partition.nonEmpty) {
        instance.withConnectionDo { connection =>
          val mutator = connection.getBufferedMutator(table)
-         val _table = instance.getConnection.getTable(table)
+         val tableConnection = connection.getTable(table)

          partition.foreach { recs =>
            val id = recs._1
@@ -94,7 +94,7 @@ object HBaseRDDWriter {
                new RowFilter(CompareOp.EQUAL, new BinaryComparator(HBaseKeyEncoder.encode(layerId, id)))
              )
            )
-           val scanner = _table.getScanner(scan)
+           val scanner = tableConnection.getScanner(scan)
            val results: Vector[(K,V)] = scanner.iterator.asScala.toVector.flatMap{ result =>
              val bytes = result.getValue(tilesCF, "")
              AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
@@ -120,7 +120,7 @@ object HBaseRDDWriter {
            mutator.mutate(put)
          }

-         _table.close()
+         tableConnection.close()
          mutator.flush()
          mutator.close()
        }
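
The three hunks above move the table handle onto the connection that withConnectionDo already provides, instead of opening a separate one through instance.getConnection, so the reads and the buffered writes in the partition loop share a single managed connection. A rough sketch of that pattern with explicit cleanup; the body and the local name htable are illustrative, not the commit's code:

  instance.withConnectionDo { connection =>
    val mutator = connection.getBufferedMutator(table)  // buffered writes
    val htable  = connection.getTable(table)            // reads, on the same connection
    try {
      // ... scan existing rows through htable, queue Puts on mutator ...
    } finally {
      htable.close()
      mutator.flush()
      mutator.close()
    }
  }
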
@@ -24,7 +24,6 @@ import geotrellis.spark.io.hadoop.formats.FilterMapFileInputFormat
import geotrellis.spark.io.index._
import geotrellis.spark.partition._
import geotrellis.spark.util._
-import geotrellis.spark.util.KryoWrapper
import geotrellis.util.LazyLogging

import org.apache.avro.Schema
@@ -46,19 +45,6 @@ object HadoopRDDWriter extends LazyLogging {
    */
  final val DefaultIndexInterval = 4

-  // From https://github.com/apache/spark/blob/3b049abf102908ca72674139367e3b8d9ffcc283/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
-  private class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
-    private def writeObject(out: ObjectOutputStream): Unit = {
-      out.defaultWriteObject()
-      value.write(out)
-    }
-
-    private def readObject(in: ObjectInputStream): Unit = {
-      value = new Configuration(false)
-      value.readFields(in)
-    }
-  }
-
  /**
    * When record being written would exceed the block size of the current MapFile
    * opens a new file to continue writing. This allows to split partition into block-sized
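
The private copy of SerializableConfiguration above is dropped because the same class now ships in geotrellis.spark.util (the new file below), presumably reached through the geotrellis.spark.util._ wildcard import kept in this file. The wrapper exists because org.apache.hadoop.conf.Configuration implements Hadoop's Writable but not java.io.Serializable, so a Spark closure that captures one directly fails at task-serialization time. A minimal standalone illustration, not part of this commit:

  import java.io.{ByteArrayOutputStream, ObjectOutputStream}
  import org.apache.hadoop.conf.Configuration

  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  // out.writeObject(new Configuration())
  // ^ would throw java.io.NotSerializableException: org.apache.hadoop.conf.Configuration,
  //   the same failure Spark reports when a task closure captures a bare Configuration.
  out.close()
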
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package geotrellis.spark.util
+
+import org.apache.hadoop.conf.Configuration
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
+  private def writeObject(out: ObjectOutputStream): Unit = {
+    out.defaultWriteObject()
+    value.write(out)
+  }
+
+  private def readObject(in: ObjectInputStream): Unit = {
+    value = new Configuration(false)
+    value.readFields(in)
+  }
+}
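
A hedged usage sketch of the new class; the job, method name, and paths below are illustrative, not taken from this commit. The Configuration is wrapped on the driver, the wrapper is captured by the closure, and .value rebuilds the Configuration on each executor:

  import geotrellis.spark.util.SerializableConfiguration
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.spark.SparkContext

  // Illustrative only: write one marker file per element using a Configuration
  // that travels to the executors inside SerializableConfiguration.
  def writeMarkers(sc: SparkContext, conf: Configuration, dir: String): Unit = {
    val serConf = new SerializableConfiguration(conf)  // safe to capture in the closure
    sc.parallelize(1 to 4, numSlices = 4).foreachPartition { it =>
      val fs = FileSystem.get(serConf.value)           // Configuration rebuilt on the executor
      it.foreach(i => fs.createNewFile(new Path(dir, s"marker-$i")))
    }
  }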
