Skip to content

Commit

Permalink
Extra configuration for bigtable client (#78)
Browse files Browse the repository at this point in the history
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Jun 10, 2021
1 parent 45413ca commit 3eee7c9
Showing 1 changed file with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,38 @@ import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import com.google.cloud.bigtable.hbase.BigtableConfiguration
import feast.ingestion.stores.serialization.AvroSerializer
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory.{
BIGTABLE_BUFFERED_MUTATOR_ENABLE_THROTTLING,
BIGTABLE_BUFFERED_MUTATOR_THROTTLING_THRESHOLD_MILLIS,
BIGTABLE_BULK_MAX_ROW_KEY_COUNT,
BIGTABLE_DATA_CHANNEL_COUNT_KEY,
BIGTABLE_EMULATOR_HOST_KEY
}
import org.apache.hadoop.conf.Configuration

class DefaultSource extends CreatableRelationProvider {
import DefaultSource._

override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame
): BaseRelation = {
val bigtableConf = BigtableConfiguration.configure(
sqlContext.getConf("spark.bigtable.projectId"),
sqlContext.getConf("spark.bigtable.instanceId")
sqlContext.getConf(PROJECT_KEY),
sqlContext.getConf(INSTANCE_KEY)
)

if (sqlContext.getConf("spark.bigtable.emulatorHost", "").nonEmpty) {
bigtableConf.set(
"google.bigtable.emulator.endpoint.host",
BIGTABLE_EMULATOR_HOST_KEY,
sqlContext.getConf("spark.bigtable.emulatorHost")
)
}

configureBigTableClient(bigtableConf, sqlContext)

val rel =
new BigTableSinkRelation(
sqlContext,
Expand All @@ -52,4 +64,32 @@ class DefaultSource extends CreatableRelationProvider {
rel.insert(data, overwrite = false)
rel
}

/** Forwards optional Spark-session tuning settings to the Bigtable HBase client.
  *
  * Each supported `spark.bigtable.*` key that is present in the session conf is
  * copied verbatim onto the given Bigtable client [[Configuration]]; absent keys
  * leave the client defaults untouched.
  */
private def configureBigTableClient(bigtableConf: Configuration, sqlContext: SQLContext): Unit = {
  val sessionConfs = sqlContext.getAllConfs

  // Spark conf key -> Bigtable client option it maps onto.
  val keyMappings = Seq(
    CHANNEL_COUNT_KEY               -> BIGTABLE_DATA_CHANNEL_COUNT_KEY,
    MAX_ROW_COUNT_KEY               -> BIGTABLE_BULK_MAX_ROW_KEY_COUNT,
    ENABLE_THROTTLING_KEY           -> BIGTABLE_BUFFERED_MUTATOR_ENABLE_THROTTLING,
    THROTTLING_THRESHOLD_MILLIS_KEY -> BIGTABLE_BUFFERED_MUTATOR_THROTTLING_THRESHOLD_MILLIS
  )

  for {
    (sparkKey, bigtableKey) <- keyMappings
    value                   <- sessionConfs.get(sparkKey)
  } bigtableConf.set(bigtableKey, value)
}
}

/** Spark configuration keys recognized by the Bigtable sink. */
object DefaultSource {
  // Required connection settings (read in createRelation).
  private val PROJECT_KEY  = "spark.bigtable.projectId"
  private val INSTANCE_KEY = "spark.bigtable.instanceId"

  // Optional client-tuning settings; when present in the session conf they are
  // forwarded to the Bigtable HBase client (see configureBigTableClient).
  private val CHANNEL_COUNT_KEY               = "spark.bigtable.channelCount"
  private val MAX_ROW_COUNT_KEY               = "spark.bigtable.maxRowCount"
  private val ENABLE_THROTTLING_KEY           = "spark.bigtable.enableThrottling"
  private val THROTTLING_THRESHOLD_MILLIS_KEY = "spark.bigtable.throttlingThresholdMs"
}

0 comments on commit 3eee7c9

Please sign in to comment.