This repository has been archived by the owner on Feb 16, 2024. It is now read-only.

[S2GRAPH-221]: Unify configurations for bulk and mutate in S2GraphSink. #173

Merged
4 commits merged on Jun 21, 2018
9 changes: 9 additions & 0 deletions s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
@@ -171,4 +171,13 @@ object GraphUtil {
def stringToOption(s: String): Option[String] = {
Option(s).filter(_.trim.nonEmpty)
}

def toLabelMapping(labelMapping: String): Map[String, String] = {
(for {
token <- labelMapping.split(",")
inner = token.split(":") if inner.length == 2
} yield {
(inner.head, inner.last)
}).toMap
}
}
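
A quick sketch of how the new helper behaves, using a hypothetical mapping string: it splits on commas, keeps only well-formed old:new pairs, and silently drops malformed tokens.

GraphUtil.toLabelMapping("friend:friend_new,follow:follow_new")
// Map("friend" -> "friend_new", "follow" -> "follow_new")
GraphUtil.toLabelMapping("friend")
// Map() -- the token has no ":" pair, so it is filtered out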
45 changes: 5 additions & 40 deletions s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -52,42 +52,7 @@ object S2Graph {
val FetchAllLimit = 10000000
val DefaultFetchLimit = 1000

private val DefaultConfigs: Map[String, AnyRef] = Map(
"hbase.zookeeper.quorum" -> "localhost",
"hbase.table.name" -> "s2graph",
"hbase.table.compression.algorithm" -> "gz",
"phase" -> "dev",
"db.default.driver" -> "org.h2.Driver",
"db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
"db.default.password" -> "graph",
"db.default.user" -> "graph",
"cache.max.size" -> java.lang.Integer.valueOf(0),
"cache.ttl.seconds" -> java.lang.Integer.valueOf(-1),
"resource.cache.max.size" -> java.lang.Integer.valueOf(1000),
"resource.cache.ttl.seconds" -> java.lang.Integer.valueOf(-1),
"hbase.client.retries.number" -> java.lang.Integer.valueOf(20),
"hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort),
"hbase.rpc.timeout" -> java.lang.Integer.valueOf(600000),
"max.retry.number" -> java.lang.Integer.valueOf(100),
"lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
"max.back.off" -> java.lang.Integer.valueOf(100),
"back.off.timeout" -> java.lang.Integer.valueOf(1000),
"hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
"delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
"delete.all.fetch.count" -> java.lang.Integer.valueOf(200),
"future.cache.max.size" -> java.lang.Integer.valueOf(100000),
"future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
"future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
"future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
"query.future.cache.max.size" -> java.lang.Integer.valueOf(1000),
"query.future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
"query.future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
"query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
"s2graph.storage.backend" -> "hbase",
"query.hardlimit" -> java.lang.Integer.valueOf(100000),
"hbase.zookeeper.znode.parent" -> "/hbase",
"query.log.sample.rate" -> Double.box(0.05)
)
private val DefaultConfigs = S2GraphConfigs.DEFAULT_CONFIGS

var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
val numOfThread = Runtime.getRuntime.availableProcessors()
@@ -131,13 +96,13 @@ object S2Graph {
}

def initStorage(graph: S2GraphLike, config: Config)(ec: ExecutionContext): Storage = {
val storageBackend = config.getString("s2graph.storage.backend")
val storageBackend = config.getString(S2GraphConfigs.S2GRAPH_STORE_BACKEND)
logger.info(s"[InitStorage]: $storageBackend")

storageBackend match {
case "hbase" =>
hbaseExecutor =
if (config.getString("hbase.zookeeper.quorum") == "localhost")
if (config.getString(S2GraphConfigs.HBaseConfigs.HBASE_ZOOKEEPER_QUORUM) == "localhost")
AsynchbaseStorage.initLocalHBase(config)
else
null
@@ -208,8 +173,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
override val config = _config.withFallback(S2Graph.DefaultConfig)

val storageBackend = Try {
config.getString("s2graph.storage.backend")
}.getOrElse("hbase")
config.getString(S2GraphConfigs.S2GRAPH_STORE_BACKEND)
}.getOrElse(S2GraphConfigs.DEFAULT_S2GRAPH_STORE_BACKEND)

Schema.apply(config)
Schema.loadCache()
179 changes: 179 additions & 0 deletions s2core/src/main/scala/org/apache/s2graph/core/S2GraphConfigs.scala
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.s2graph.core

object S2GraphConfigs {
lazy val DEFAULTS: Map[String, AnyRef] = Map(
S2GRAPH_STORE_BACKEND -> DEFAULT_S2GRAPH_STORE_BACKEND,
PHASE -> DEFAULT_PHASE
)
lazy val DEFAULT_CONFIGS = DEFAULTS ++
S2GraphConfigs.HBaseConfigs.DEFAULTS ++
S2GraphConfigs.DBConfigs.DEFAULTS ++
S2GraphConfigs.CacheConfigs.DEFAULTS ++
S2GraphConfigs.ResourceCacheConfigs.DEFAULTS ++
S2GraphConfigs.MutatorConfigs.DEFAULTS ++
S2GraphConfigs.QueryConfigs.DEFAULTS ++
S2GraphConfigs.FutureCacheConfigs.DEFAULTS ++
S2GraphConfigs.LogConfigs.DEFAULTS

val S2GRAPH_STORE_BACKEND = "s2graph.storage.backend"
val DEFAULT_S2GRAPH_STORE_BACKEND = "hbase"

val PHASE = "phase"
val DEFAULT_PHASE = "dev"

object HBaseConfigs {
lazy val DEFAULTS: Map[String, AnyRef] = Map(
HBASE_ZOOKEEPER_QUORUM -> DEFAULT_HBASE_ZOOKEEPER_QUORUM,
HBASE_ZOOKEEPER_ZNODE_PARENT -> DEFAULT_HBASE_ZOOKEEPER_ZNODE_PARENT,
HBASE_TABLE_NAME -> DEFAULT_HBASE_TABLE_NAME,
HBASE_TABLE_COMPRESSION_ALGORITHM -> DEFAULT_HBASE_TABLE_COMPRESSION_ALGORITHM,
HBASE_CLIENT_RETRIES_NUMBER -> DEFAULT_HBASE_CLIENT_RETRIES_NUMBER,
HBASE_RPCS_BUFFERED_FLUSH_INTERVAL -> DEFAULT_HBASE_RPCS_BUFFERED_FLUSH_INTERVAL,
HBASE_RPC_TIMEOUT -> DEFAULT_HBASE_RPC_TIMEOUT
)
val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
val DEFAULT_HBASE_ZOOKEEPER_QUORUM = "localhost"

val HBASE_ZOOKEEPER_ZNODE_PARENT = "hbase.zookeeper.znode.parent"
val DEFAULT_HBASE_ZOOKEEPER_ZNODE_PARENT = "/hbase"

val HBASE_TABLE_NAME = "hbase.table.name"
val DEFAULT_HBASE_TABLE_NAME = "s2graph"

val HBASE_TABLE_COMPRESSION_ALGORITHM = "hbase.table.compression.algorithm"
val DEFAULT_HBASE_TABLE_COMPRESSION_ALGORITHM = "gz"

val HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number"
val DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = java.lang.Integer.valueOf(20)

val HBASE_RPCS_BUFFERED_FLUSH_INTERVAL = "hbase.rpcs.buffered_flush_interval"
val DEFAULT_HBASE_RPCS_BUFFERED_FLUSH_INTERVAL = java.lang.Short.valueOf(100.toShort)

val HBASE_RPC_TIMEOUT = "hbase.rpc.timeout"
val DEFAULT_HBASE_RPC_TIMEOUT = java.lang.Integer.valueOf(600000)
}
object DBConfigs {
lazy val DEFAULTS: Map[String, AnyRef] = Map(
DB_DEFAULT_DRIVER -> DEFAULT_DB_DEFAULT_DRIVER,
DB_DEFAULT_URL -> DEFAULT_DB_DEFAULT_URL,
DB_DEFAULT_PASSWORD -> DEFAULT_DB_DEFAULT_PASSWORD,
DB_DEFAULT_USER -> DEFAULT_DB_DEFAULT_USER
)
val DB_DEFAULT_DRIVER = "db.default.driver"
val DEFAULT_DB_DEFAULT_DRIVER = "org.h2.Driver"

val DB_DEFAULT_URL = "db.default.url"
val DEFAULT_DB_DEFAULT_URL = "jdbc:h2:file:./var/metastore;MODE=MYSQL"

val DB_DEFAULT_PASSWORD = "db.default.password"
val DEFAULT_DB_DEFAULT_PASSWORD = "graph"

val DB_DEFAULT_USER = "db.default.user"
val DEFAULT_DB_DEFAULT_USER = "graph"
}
object CacheConfigs {
lazy val DEFAULTS: Map[String, AnyRef] = Map(
CACHE_MAX_SIZE -> DEFAULT_CACHE_MAX_SIZE,
CACHE_TTL_SECONDS -> DEFAULT_CACHE_TTL_SECONDS
)
val CACHE_MAX_SIZE = "cache.max.size"
val DEFAULT_CACHE_MAX_SIZE = java.lang.Integer.valueOf(0)

val CACHE_TTL_SECONDS = "cache.ttl.seconds"
val DEFAULT_CACHE_TTL_SECONDS = java.lang.Integer.valueOf(-1)
}
object ResourceCacheConfigs {
lazy val DEFAULTS: Map[String, AnyRef] = Map(
RESOURCE_CACHE_MAX_SIZE -> DEFAULT_RESOURCE_CACHE_MAX_SIZE,
RESOURCE_CACHE_TTL_SECONDS -> DEFAULT_RESOURCE_CACHE_TTL_SECONDS
)
val RESOURCE_CACHE_MAX_SIZE = "resource.cache.max.size"
val DEFAULT_RESOURCE_CACHE_MAX_SIZE = java.lang.Integer.valueOf(1000)

val RESOURCE_CACHE_TTL_SECONDS = "resource.cache.ttl.seconds"
val DEFAULT_RESOURCE_CACHE_TTL_SECONDS = java.lang.Integer.valueOf(-1)
}
object MutatorConfigs {
lazy val DEFAULTS: Map[String, AnyRef] = Map(
MAX_RETRY_NUMBER -> DEFAULT_MAX_RETRY_NUMBER,
LOCK_EXPIRE_TIME -> DEFAULT_LOCK_EXPIRE_TIME,
MAX_BACK_OFF -> DEFAULT_MAX_BACK_OFF,
BACK_OFF_TIMEOUT -> DEFAULT_BACK_OFF_TIMEOUT,
HBASE_FAIL_PROB -> DEFAULT_HBASE_FAIL_PROB,
DELETE_ALL_FETCH_SIZE -> DEFAULT_DELETE_ALL_FETCH_SIZE,
DELETE_ALL_FETCH_COUNT -> DEFAULT_DELETE_ALL_FETCH_COUNT
)
val MAX_RETRY_NUMBER = "max.retry.number"
val DEFAULT_MAX_RETRY_NUMBER = java.lang.Integer.valueOf(100)

val LOCK_EXPIRE_TIME = "lock.expire.time"
val DEFAULT_LOCK_EXPIRE_TIME = java.lang.Integer.valueOf(1000 * 60 * 10)

val MAX_BACK_OFF = "max.back.off"
val DEFAULT_MAX_BACK_OFF = java.lang.Integer.valueOf(100)

val BACK_OFF_TIMEOUT = "back.off.timeout"
val DEFAULT_BACK_OFF_TIMEOUT = java.lang.Integer.valueOf(1000)

val HBASE_FAIL_PROB = "hbase.fail.prob"
val DEFAULT_HBASE_FAIL_PROB = java.lang.Double.valueOf(-0.1)

val DELETE_ALL_FETCH_SIZE = "delete.all.fetch.size"
val DEFAULT_DELETE_ALL_FETCH_SIZE = java.lang.Integer.valueOf(1000)

val DELETE_ALL_FETCH_COUNT = "delete.all.fetch.count"
val DEFAULT_DELETE_ALL_FETCH_COUNT = java.lang.Integer.valueOf(200)
}
object QueryConfigs {
lazy val DEFAULTS: Map[String, AnyRef] = Map(
QUERY_HARDLIMIT -> DEFAULT_QUERY_HARDLIMIT
)
val QUERY_HARDLIMIT = "query.hardlimit"
val DEFAULT_QUERY_HARDLIMIT = java.lang.Integer.valueOf(100000)
}
object FutureCacheConfigs {
lazy val DEFAULTS: Map[String, AnyRef] = Map(
FUTURE_CACHE_MAX_SIZE -> DEFAULT_FUTURE_CACHE_MAX_SIZE,
FUTURE_CACHE_EXPIRE_AFTER_WRITE -> DEFAULT_FUTURE_CACHE_EXPIRE_AFTER_WRITE,
FUTURE_CACHE_EXPIRE_AFTER_ACCESS -> DEFAULT_FUTURE_CACHE_EXPIRE_AFTER_ACCESS,
FUTURE_CACHE_METRIC_INTERVAL -> DEFAULT_FUTURE_CACHE_METRIC_INTERVAL
)
val FUTURE_CACHE_MAX_SIZE = "future.cache.max.size"
val DEFAULT_FUTURE_CACHE_MAX_SIZE = java.lang.Integer.valueOf(100000)

val FUTURE_CACHE_EXPIRE_AFTER_WRITE = "future.cache.expire.after.write"
val DEFAULT_FUTURE_CACHE_EXPIRE_AFTER_WRITE = java.lang.Integer.valueOf(10000)

val FUTURE_CACHE_EXPIRE_AFTER_ACCESS = "future.cache.expire.after.access"
val DEFAULT_FUTURE_CACHE_EXPIRE_AFTER_ACCESS = java.lang.Integer.valueOf(5000)

val FUTURE_CACHE_METRIC_INTERVAL = "future.cache.metric.interval"
val DEFAULT_FUTURE_CACHE_METRIC_INTERVAL = java.lang.Integer.valueOf(60000)
}
object LogConfigs {
lazy val DEFAULTS: Map[String, AnyRef] = Map(
QUERY_LOG_SAMPLE_RATE -> DEFAULT_QUERY_LOG_SAMPLE_RATE
)
val QUERY_LOG_SAMPLE_RATE = "query.log.sample.rate"
val DEFAULT_QUERY_LOG_SAMPLE_RATE = Double.box(0.05)
}
}
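
A minimal sketch of consuming the consolidated defaults; the asJava conversion and the override value are assumptions about the caller, not code from this PR. With every key and default collected in S2GraphConfigs, callers can reference configuration by constant instead of repeating string literals.

import scala.collection.JavaConverters._
import com.typesafe.config.ConfigFactory
import org.apache.s2graph.core.S2GraphConfigs

// Build a Config from the consolidated defaults and read values back by constant.
val defaults = ConfigFactory.parseMap(S2GraphConfigs.DEFAULT_CONFIGS.asJava)
defaults.getString(S2GraphConfigs.S2GRAPH_STORE_BACKEND)     // "hbase"
defaults.getInt(S2GraphConfigs.QueryConfigs.QUERY_HARDLIMIT) // 100000

// Override a single key (hypothetical quorum) while keeping the other defaults.
val custom = ConfigFactory
  .parseMap(Map(S2GraphConfigs.HBaseConfigs.HBASE_ZOOKEEPER_QUORUM -> "zk1,zk2,zk3").asJava)
  .withFallback(defaults)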
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.util.ToolRunner
import org.apache.s2graph.core.S2GraphConfigs
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
@@ -52,15 +53,19 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
regionLocator.getStartKeys
}

def toHBaseConfig(graphFileOptions: GraphFileOptions): Configuration = {
def toHBaseConfig(zkQuorum: String, tableName: String): Configuration = {
val hbaseConf = HBaseConfiguration.create()

hbaseConf.set("hbase.zookeeper.quorum", graphFileOptions.zkQuorum)
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, graphFileOptions.tableName)
hbaseConf.set(S2GraphConfigs.HBaseConfigs.HBASE_ZOOKEEPER_QUORUM, zkQuorum)
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

hbaseConf
}

def toHBaseConfig(options: GraphFileOptions): Configuration = {
toHBaseConfig(options.zkQuorum, options.tableName)
}
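
// Hedged usage sketch of the new string-based overload (quorum and table name
// below are hypothetical); it lets callers build an HBase Configuration without
// constructing a GraphFileOptions first:
//
//   val hbaseConf = HFileGenerator.toHBaseConfig("zk1.example.com", "s2graph")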

def getStartKeys(numRegions: Int): Array[Array[Byte]] = {
val startKey = AsynchbaseStorageManagement.getStartKey(numRegions)
val endKey = AsynchbaseStorageManagement.getEndKey(numRegions)
@@ -88,14 +93,28 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
options: GraphFileOptions): Unit = {
val hbaseConfig = toHBaseConfig(options)

generateHFile(sc, s2Config, kvs, hbaseConfig, options.tableName,
options.numRegions, options.output, options.incrementalLoad, options.compressionAlgorithm)
}

def generateHFile(sc: SparkContext,
s2Config: Config,
kvs: RDD[KeyValue],
hbaseConfig: Configuration,
tableName: String,
numRegions: Int,
outputPath: String,
incrementalLoad: Boolean = false,
compressionAlgorithm: String = "lz4"): Unit = {
val table = TableName.valueOf(tableName)
val startKeys =
if (options.incrementalLoad) {
if (incrementalLoad) {
// Need an HBase connection to the existing table to figure out the region ranges.
getTableStartKeys(hbaseConfig, TableName.valueOf(options.tableName))
getTableStartKeys(hbaseConfig, table)
} else {
// Otherwise we do not need to initialize a Connection to the HBase cluster;
// numRegions alone determines the region pre-split.
getStartKeys(numRegions = options.numRegions)
getStartKeys(numRegions = numRegions)
}

val hbaseSc = new HBaseContext(sc, hbaseConfig)
@@ -106,21 +125,21 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
Seq((k -> v)).toIterator
}

val compressionAlgorithmClass = Algorithm.valueOf(options.compressionAlgorithm).getName.toUpperCase
val compressionAlgorithmClass = Algorithm.valueOf(compressionAlgorithm).getName.toUpperCase
val familyOptions = new FamilyHFileWriteOptions(compressionAlgorithmClass,
BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)

val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)


hbaseSc.bulkLoad(kvs, TableName.valueOf(options.tableName), startKeys, flatMap, options.output, familyOptionsMap.asJava)
hbaseSc.bulkLoad(kvs, table, startKeys, flatMap, outputPath, familyOptionsMap.asJava)
}
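
// Hedged sketch of calling the explicit-parameter overload above; the table
// name, region count and output path are hypothetical values:
//
//   HFileGenerator.generateHFile(sc, s2Config, kvs, hbaseConf,
//     tableName = "s2graph", numRegions = 10, outputPath = "/tmp/s2graph/hfiles",
//     incrementalLoad = false, compressionAlgorithm = "lz4")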

override def generate(sc: SparkContext,
config: Config,
rdd: RDD[String],
options: GraphFileOptions): Unit = {
val transformer = new SparkBulkLoaderTransformer(config, options)
val transformer = new SparkBulkLoaderTransformer(config, options.labelMapping, options.buildDegree)

implicit val reader = new TsvBulkFormatReader
implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)
@@ -130,11 +149,14 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
HFileGenerator.generateHFile(sc, config, kvs, options)
}

def loadIncrementalHFiles(options: GraphFileOptions): Int = {
def loadIncrementalHFiles(inputPath: String, tableName: String): Int = {
/* LoadIncrementalHFiles */
val hfileArgs = Array(options.output, options.tableName)
val hbaseConfig = HBaseConfiguration.create()
ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), Array(inputPath, tableName))
}

def loadIncrementalHFiles(options: GraphFileOptions): Int = {
loadIncrementalHFiles(options.output, options.tableName)
}
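
// Hedged sketch (HFile path and table name are hypothetical): load previously
// generated HFiles without going through GraphFileOptions.
//
//   HFileGenerator.loadIncrementalHFiles("/tmp/s2graph/hfiles", "s2graph")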

def tableSnapshotDump(ss: SparkSession,
@@ -143,7 +165,6 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
restorePath: String,
tableNames: Seq[String],
columnFamily: String = "e",
elementType: String = "IndexEdge",
batchSize: Int = 1000,
labelMapping: Map[String, String] = Map.empty,
buildDegree: Boolean = false): RDD[Seq[Cell]] = {
@@ -103,7 +103,7 @@ object HFileMRGenerator extends RawFileGenerator[String, KeyValue] {
s2Config: Config,
input: RDD[String],
options: GraphFileOptions): RDD[KeyValue] = {
val transformer = new SparkBulkLoaderTransformer(s2Config, options)
val transformer = new SparkBulkLoaderTransformer(s2Config, options.labelMapping, options.buildDegree)

implicit val reader = new TsvBulkFormatReader
implicit val writer = new KeyValueWriter(options.autoEdgeCreate, options.skipError)
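
Both generators now build SparkBulkLoaderTransformer from the individual values (config, label mapping, build-degree flag) rather than a whole GraphFileOptions, which supports the PR's goal of letting S2GraphSink drive bulk and mutate from one unified configuration. A minimal sketch under that assumption, with a hypothetical mapping string:

val labelMapping = GraphUtil.toLabelMapping("friend:friend_new")
val transformer = new SparkBulkLoaderTransformer(s2Config, labelMapping, false) // buildDegree = false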