Skip to content

Commit

Permalink
[SNAP-1190] Changes for updates to spark layer (#435)
Browse files Browse the repository at this point in the history
- register LaunchTasks with PooledKryoSerializer if present
- avoid sending useless TaskState.RUNNING updates (heartbeat messages are separate)
- adding the new kafka-sql project
  • Loading branch information
Sumedh Wale committed Dec 3, 2016
1 parent da77cc7 commit f2999c2
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 3 deletions.
Expand Up @@ -17,14 +17,15 @@
package org.apache.spark.executor

import java.net.URL
import java.nio.ByteBuffer

import com.pivotal.gemfirexd.internal.engine.store.GemFireStore
import io.snappydata.cluster.ExecutorInitiator

import org.apache.spark.SparkEnv
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.sql.SnappyContext
import org.apache.spark.{SparkEnv, TaskState}

class SnappyCoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
Expand All @@ -36,6 +37,7 @@ class SnappyCoarseGrainedExecutorBackend(
env: SparkEnv)
extends CoarseGrainedExecutorBackend(rpcEnv, driverUrl,
executorId, hostName, cores, userClassPath, env) {

override def onStop() {
SnappyContext.clearStaticArtifacts()
exitWithoutRestart()
Expand All @@ -50,6 +52,16 @@ class SnappyCoarseGrainedExecutorBackend(
userClassPath, new SnappyUncaughtExceptionHandler(this),
isLocal = false)

/**
* Avoid sending any message for TaskState.RUNNING which serves no purpose.
*/
override def statusUpdate(taskId: Long, state: TaskState.TaskState,
data: ByteBuffer): Unit = {
if ((state ne TaskState.RUNNING) || data.hasRemaining) {
super.statusUpdate(taskId, state, data)
}
}

/**
* Snappy addition (Replace System.exit with exitExecutor). We could have
* added functions calling System.exit to SnappyCoarseGrainedExecutorBackend
Expand Down
Expand Up @@ -136,6 +136,14 @@ final class PooledKryoSerializer(conf: SparkConf)
kryo.register(classOf[MultiBucketExecutorPartition],
new KryoSerializableSerializer)

try {
val launchTasksClass = Utils.classForName(
"org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.LaunchTasks")
kryo.register(launchTasksClass, new KryoSerializableSerializer)
} catch {
case _: ClassNotFoundException => // ignore
}

kryo
}

Expand Down
2 changes: 2 additions & 0 deletions settings.gradle
Expand Up @@ -54,6 +54,7 @@ if (new File(rootDir, 'spark/build.gradle').exists()) {
include ':snappy-spark:snappy-spark-streaming-flume-sink_' + scalaBinaryVersion
include ':snappy-spark:snappy-spark-streaming-kafka-0.8_' + scalaBinaryVersion
include ':snappy-spark:snappy-spark-streaming-kafka-0.10_' + scalaBinaryVersion
include ':snappy-spark:snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion
include ':snappy-spark:snappy-spark-examples_' + scalaBinaryVersion
include ':snappy-spark:snappy-spark-repl_' + scalaBinaryVersion
include ':snappy-spark:snappy-spark-launcher_' + scalaBinaryVersion
Expand Down Expand Up @@ -89,6 +90,7 @@ if (new File(rootDir, 'spark/build.gradle').exists()) {
project(':snappy-spark:snappy-spark-streaming-flume-sink_' + scalaBinaryVersion).projectDir = "$rootDir/spark/external/flume-sink" as File
project(':snappy-spark:snappy-spark-streaming-kafka-0.8_' + scalaBinaryVersion).projectDir = "$rootDir/spark/external/kafka-0-8" as File
project(':snappy-spark:snappy-spark-streaming-kafka-0.10_' + scalaBinaryVersion).projectDir = "$rootDir/spark/external/kafka-0-10" as File
project(':snappy-spark:snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion).projectDir = "$rootDir/spark/external/kafka-0-10-sql" as File
project(':snappy-spark:snappy-spark-examples_' + scalaBinaryVersion).projectDir = "$rootDir/spark/examples" as File
project(':snappy-spark:snappy-spark-repl_' + scalaBinaryVersion).projectDir = "$rootDir/spark/repl" as File
project(':snappy-spark:snappy-spark-launcher_' + scalaBinaryVersion).projectDir = "$rootDir/spark/launcher" as File
Expand Down
2 changes: 1 addition & 1 deletion spark
Submodule spark updated from 4c69ea to 93b80a
2 changes: 1 addition & 1 deletion store
Submodule store updated from b51e70 to 9996d5

0 comments on commit f2999c2

Please sign in to comment.