[S2GRAPH-205]: reuse S2Graph object on same executor
JIRA:
    [S2GRAPH-205] https://issues.apache.org/jira/browse/S2GRAPH-205

Pull Request:
    Closes #158

Author:
    Chul Kang <elric@apache.org>
SteamShon committed Apr 23, 2018
2 parents 5c85dd4 + de3f2aa commit 9dc39ee
Showing 13 changed files with 84 additions and 25 deletions.
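The problem this commit fixes: in the s2jobs Spark tasks, every mapPartitions/foreachPartition closure called S2GraphHelper.initS2Graph(config), constructing a fresh S2Graph for every partition. Since each Spark executor is a single JVM, a field on a Scala object can cache one instance per executor. A minimal sketch of the pattern, with hypothetical names (HeavyClient stands in for S2Graph):

    import com.typesafe.config.Config

    // HeavyClient stands in for S2Graph: expensive to construct, cheap to reuse.
    class HeavyClient(config: Config)

    object ClientCache {
      private var cached: HeavyClient = null

      // Scala objects are JVM singletons, so this caches one instance per executor JVM.
      def get(config: Config): HeavyClient = {
        if (cached == null) cached = new HeavyClient(config)
        cached
      }
    }

Every partition scheduled on the same executor then shares one client instead of building its own.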
1 change: 1 addition & 0 deletions CHANGES
@@ -35,6 +35,7 @@ Release Notes - S2Graph - Version 0.2.0
     * [S2GRAPH-163] - Update version.sbt after release
     * [S2GRAPH-180] - Implement missing Management API
     * [S2GRAPH-197] - Provide S2graphSink for non-streaming dataset
+    * [S2GRAPH-205] - too many initialize S2Graph when writeBatchMutate on S2GraphSink
 
 ** Bug
     * [S2GRAPH-159] - Wrong syntax at a bash script under Linux
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.graphql.bind
 
 object AstHelper {
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.graphql.types
 
 import org.apache.s2graph.core._
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.graphql
 
 import org.apache.s2graph.graphql.repository.GraphRepository
@@ -31,9 +31,15 @@ import play.api.libs.json.{JsObject, Json}
 import scala.concurrent.ExecutionContext
 import scala.util.Try
 
-object S2GraphHelper {
-  def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
-    new S2Graph(config)
+object S2GraphHelper extends Logger {
+  private var s2Graph: S2Graph = null
+
+  def getS2Graph(config: Config, init: Boolean = false)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
+    if (s2Graph == null || init) {
+      logger.info(s"S2Graph initialized..")
+      s2Graph = new S2Graph(config)
+    }
+    s2Graph
   }
 
   def buildDegreePutRequests(s2: S2Graph,
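Assuming a loaded Typesafe config, repeated calls on the same JVM now return the cached instance; init = true forces re-initialization. A usage sketch:

    import com.typesafe.config.ConfigFactory

    val config = ConfigFactory.load()

    val g1 = S2GraphHelper.getS2Graph(config)              // first call builds and caches
    val g2 = S2GraphHelper.getS2Graph(config)              // cache hit: same instance
    assert(g1 eq g2)                                       // reference equality, no re-init
    val g3 = S2GraphHelper.getS2Graph(config, init = true) // explicit re-initialization

Note that s2Graph is an unsynchronized var, so two executor threads racing on the first call could each construct an instance; the cache is best-effort rather than a strict singleton.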
@@ -29,7 +29,7 @@ import scala.reflect.ClassTag
 
 class LocalBulkLoaderTransformer(val config: Config,
                                  val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] {
-  val s2: S2Graph = S2GraphHelper.initS2Graph(config)
+  val s2: S2Graph = S2GraphHelper.getS2Graph(config)
 
   override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = {
     val degrees = elements.flatMap { element =>
@@ -32,15 +32,15 @@ class SparkBulkLoaderTransformer(val config: Config,
 
   override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
     val degrees = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2GraphHelper.getS2Graph(config)
 
       iter.flatMap { element =>
         DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
       }
     }.reduceByKey(_ + _)
 
     degrees.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2GraphHelper.getS2Graph(config)
 
       iter.map { case (degreeKey, count) =>
         writer.writeDegree(s2)(degreeKey, count)
@@ -50,15 +50,15 @@ class SparkBulkLoaderTransformer(val config: Config,
 
   override def transform[S: ClassTag, T: ClassTag](input: RDD[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): RDD[T] = {
     val elements = input.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2GraphHelper.getS2Graph(config)
 
       iter.flatMap { line =>
         reader.read(s2)(line)
       }
     }
 
     val kvs = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2GraphHelper.getS2Graph(config)
 
       iter.map(writer.write(s2)(_))
     }
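With the cache in place, a stage's per-partition closures pay the construction cost at most once per executor JVM instead of once per partition. A self-contained sketch (hypothetical Expensive/Cache names) that makes the reuse observable:

    import org.apache.spark.sql.SparkSession

    object ReuseDemo {
      class Expensive { val id: Long = System.nanoTime() } // stands in for S2Graph
      object Cache {
        private var e: Expensive = null
        def get(): Expensive = { if (e == null) e = new Expensive; e }
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("reuse").getOrCreate()
        val ids = spark.sparkContext.parallelize(1 to 100, 8)
          .mapPartitions(_ => Iterator.single(Cache.get().id))
          .collect()
        // Local mode runs all partitions in one JVM, so ids.distinct is
        // (a race on the very first call aside) a single value; on a cluster
        // you would see roughly one distinct id per executor JVM.
        println(ids.distinct.length)
        spark.stop()
      }
    }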
12 changes: 4 additions & 8 deletions s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,20 +20,15 @@
 package org.apache.s2graph.s2jobs.task
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
 import org.apache.s2graph.core.Management
 import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
-import scala.collection.mutable.ListBuffer
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 
@@ -233,9 +228,10 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
   }
 
   private def writeBatchWithMutate(df: DataFrame): Unit = {
-    import scala.collection.JavaConversions._
     import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
 
+    import scala.collection.JavaConversions._
+
     val graphConfig: Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
     val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
 
@@ -246,7 +242,7 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con
 
     df.foreachPartition { iters =>
       val config = ConfigFactory.parseString(serializedConfig)
-      val s2Graph = S2GraphHelper.initS2Graph(config)
+      val s2Graph = S2GraphHelper.getS2Graph(config)
 
       val responses = iters.grouped(groupedSize).flatMap { rows =>
         val elements = rows.flatMap(row => reader.read(s2Graph)(row))
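writeBatchWithMutate keeps the existing serialize-then-reparse trick for shipping configuration to executors and only swaps the graph accessor. Typesafe Config is not Serializable, so the driver renders it to a HOCON string for the closure to capture; a condensed sketch of the pattern as it appears above:

    import com.typesafe.config.{ConfigFactory, ConfigRenderOptions}

    // Driver: render the config to a string the closure can capture.
    val serializedConfig = ConfigFactory.load().root().render(ConfigRenderOptions.concise())

    // Executor (inside df.foreachPartition): parse it back, then fetch the
    // per-JVM graph instance instead of constructing a new one.
    //   val config  = ConfigFactory.parseString(serializedConfig)
    //   val s2Graph = S2GraphHelper.getS2Graph(config)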
@@ -24,6 +24,7 @@ import org.apache.s2graph.core.S2Graph
 
 import scala.concurrent.ExecutionContext
 
+@deprecated
 class S2SinkContext(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) {
   println(s">>>> S2SinkContext Created...")
   private lazy val s2Graph = new S2Graph(config)
@@ -32,6 +33,7 @@ class S2SinkContext(config: Config)(implicit ec: ExecutionContext = ExecutionCon
   }
 }
 
+@deprecated
 object S2SinkContext {
   private var s2SinkContext: S2SinkContext = null
 
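S2SinkContext held its own lazily constructed S2Graph per JVM, duplicating the role S2GraphHelper now plays, so both the class and its companion are marked @deprecated. The migration, as done in S2StreamQueryWriter below, is a one-liner:

    // Before (deprecated):
    //   val s2Graph = S2SinkContext(config).getGraph
    // After:
    val s2Graph = S2GraphHelper.getS2Graph(config)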
@@ -41,7 +41,7 @@ private [sql] class S2StreamQueryWriter(
   commitProtocol: S2CommitProtocol
 ) extends Serializable with Logger {
   private val config = ConfigFactory.parseString(serializedConf)
-  private val s2SinkContext = S2SinkContext(config)
+  private val s2Graph = S2GraphHelper.getS2Graph(config)
   private val encoder: ExpressionEncoder[Row] = RowEncoder(schema).resolveAndBind()
   private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
 
@@ -50,7 +50,6 @@ private [sql] class S2StreamQueryWriter(
     val taskId = s"stage-${taskContext.stageId()}, partition-${taskContext.partitionId()}, attempt-${taskContext.taskAttemptId()}"
     val partitionId = taskContext.partitionId()
 
-    val s2Graph = s2SinkContext.getGraph
     val groupedSize = getConfigString(config, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
     val waitTime = getConfigString(config, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
 
@@ -85,5 +84,5 @@
   }
 
   private def rowToEdge(internalRow: InternalRow): Option[GraphElement] =
-    S2GraphHelper.sparkSqlRowToGraphElement(s2SinkContext.getGraph, encoder.fromRow(internalRow), schema, RESERVED_COLUMN)
+    S2GraphHelper.sparkSqlRowToGraphElement(s2Graph, encoder.fromRow(internalRow), schema, RESERVED_COLUMN)
 }
@@ -24,10 +24,9 @@ import java.io.{File, PrintWriter}
 import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
-import org.apache.s2graph.core.{Management, S2Graph}
 import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.{Management, S2Graph}
 import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import scala.util.Try
@@ -65,7 +64,7 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with D
     // initialize spark context.
     super.beforeAll()
 
-    s2 = S2GraphHelper.initS2Graph(s2Config)
+    s2 = S2GraphHelper.getS2Graph(s2Config)
     initTestDataFile
   }
 
@@ -103,7 +102,6 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with D
   }
 
   def initTestVertexSchema(s2: S2Graph): ServiceColumn = {
-    import scala.collection.JavaConverters._
     /* initialize model for test */
     val management = s2.management
 
@@ -29,7 +29,7 @@ class S2GraphHelperTest extends BaseSparkTest {
 
     println(args)
     val taskConf = TaskConf("dummy", "sink", Nil, args)
-    val graphFileOptions = S2GraphHelper.toGraphFileOptions(taskConf)
+    val graphFileOptions = TaskConf.toGraphFileOptions(taskConf)
     println(graphFileOptions)
   }
 }
@@ -190,7 +190,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
     val input = sc.parallelize(bulkVertexLs)
 
     HFileGenerator.generate(sc, s2Config, input, options)
-    HFileGenerator.loadIncrementHFile(options)
+    HFileGenerator.loadIncrementalHFiles(options)
 
     val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
     val json = PostProcess.verticesToJson(s2Vertices)
