From f2999c2fa5fbc18361b66a152fd482d71743bf39 Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Sat, 3 Dec 2016 17:54:54 +0530 Subject: [PATCH] [SNAP-1190] Changes for updates to spark layer (#435) - register LaunchTasks with PooledKryoSerializer if present - avoid sending useless TaskState.RUNNING updates (heartbeat messages are separate) - adding the new kafka-sql project --- .../SnappyCoarseGrainedExecutorBackend.scala | 14 +++++++++++++- .../spark/serializer/PooledKryoSerializer.scala | 8 ++++++++ settings.gradle | 2 ++ spark | 2 +- store | 2 +- 5 files changed, 25 insertions(+), 3 deletions(-) diff --git a/cluster/src/main/scala/org/apache/spark/executor/SnappyCoarseGrainedExecutorBackend.scala b/cluster/src/main/scala/org/apache/spark/executor/SnappyCoarseGrainedExecutorBackend.scala index 0f2dc3b5cc..74cbb929f3 100644 --- a/cluster/src/main/scala/org/apache/spark/executor/SnappyCoarseGrainedExecutorBackend.scala +++ b/cluster/src/main/scala/org/apache/spark/executor/SnappyCoarseGrainedExecutorBackend.scala @@ -17,14 +17,15 @@ package org.apache.spark.executor import java.net.URL +import java.nio.ByteBuffer import com.pivotal.gemfirexd.internal.engine.store.GemFireStore import io.snappydata.cluster.ExecutorInitiator -import org.apache.spark.SparkEnv import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rpc.RpcEnv import org.apache.spark.sql.SnappyContext +import org.apache.spark.{SparkEnv, TaskState} class SnappyCoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, @@ -36,6 +37,7 @@ class SnappyCoarseGrainedExecutorBackend( env: SparkEnv) extends CoarseGrainedExecutorBackend(rpcEnv, driverUrl, executorId, hostName, cores, userClassPath, env) { + override def onStop() { SnappyContext.clearStaticArtifacts() exitWithoutRestart() @@ -50,6 +52,16 @@ class SnappyCoarseGrainedExecutorBackend( userClassPath, new SnappyUncaughtExceptionHandler(this), isLocal = false) + /** + * Avoid sending any message for TaskState.RUNNING which serves no purpose. + */ + override def statusUpdate(taskId: Long, state: TaskState.TaskState, + data: ByteBuffer): Unit = { + if ((state ne TaskState.RUNNING) || data.hasRemaining) { + super.statusUpdate(taskId, state, data) + } + } + /** * Snappy addition (Replace System.exit with exitExecutor). We could have * added functions calling System.exit to SnappyCoarseGrainedExecutorBackend diff --git a/core/src/main/scala/org/apache/spark/serializer/PooledKryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/PooledKryoSerializer.scala index 9a4455b1e8..407b1d049d 100644 --- a/core/src/main/scala/org/apache/spark/serializer/PooledKryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/PooledKryoSerializer.scala @@ -136,6 +136,14 @@ final class PooledKryoSerializer(conf: SparkConf) kryo.register(classOf[MultiBucketExecutorPartition], new KryoSerializableSerializer) + try { + val launchTasksClass = Utils.classForName( + "org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.LaunchTasks") + kryo.register(launchTasksClass, new KryoSerializableSerializer) + } catch { + case _: ClassNotFoundException => // ignore + } + kryo } diff --git a/settings.gradle b/settings.gradle index 528ad20822..f2c99202b9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -54,6 +54,7 @@ if (new File(rootDir, 'spark/build.gradle').exists()) { include ':snappy-spark:snappy-spark-streaming-flume-sink_' + scalaBinaryVersion include ':snappy-spark:snappy-spark-streaming-kafka-0.8_' + scalaBinaryVersion include ':snappy-spark:snappy-spark-streaming-kafka-0.10_' + scalaBinaryVersion + include ':snappy-spark:snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion include ':snappy-spark:snappy-spark-examples_' + scalaBinaryVersion include ':snappy-spark:snappy-spark-repl_' + scalaBinaryVersion include ':snappy-spark:snappy-spark-launcher_' + scalaBinaryVersion @@ -89,6 +90,7 @@ if (new File(rootDir, 'spark/build.gradle').exists()) { project(':snappy-spark:snappy-spark-streaming-flume-sink_' + scalaBinaryVersion).projectDir = "$rootDir/spark/external/flume-sink" as File project(':snappy-spark:snappy-spark-streaming-kafka-0.8_' + scalaBinaryVersion).projectDir = "$rootDir/spark/external/kafka-0-8" as File project(':snappy-spark:snappy-spark-streaming-kafka-0.10_' + scalaBinaryVersion).projectDir = "$rootDir/spark/external/kafka-0-10" as File + project(':snappy-spark:snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion).projectDir = "$rootDir/spark/external/kafka-0-10-sql" as File project(':snappy-spark:snappy-spark-examples_' + scalaBinaryVersion).projectDir = "$rootDir/spark/examples" as File project(':snappy-spark:snappy-spark-repl_' + scalaBinaryVersion).projectDir = "$rootDir/spark/repl" as File project(':snappy-spark:snappy-spark-launcher_' + scalaBinaryVersion).projectDir = "$rootDir/spark/launcher" as File diff --git a/spark b/spark index 4c69ea99be..93b80ac6d1 160000 --- a/spark +++ b/spark @@ -1 +1 @@ -Subproject commit 4c69ea99bec1fbb431c96213c9663db882fb58b6 +Subproject commit 93b80ac6d146bb4262796954eb7f94e7157ecd4e diff --git a/store b/store index b51e70aa35..9996d5da40 160000 --- a/store +++ b/store @@ -1 +1 @@ -Subproject commit b51e70aa357070682dbdbaca77b9067e2705877b +Subproject commit 9996d5da4020d129bca37bbdad1bc8172cc927e5