Skip to content
Permalink
Browse files

Rename classes

  • Loading branch information...
mccheah committed Jun 8, 2018
1 parent 03b1064 commit c1b8431524474ae710e2733836d86e1ee94df02f
@@ -26,7 +26,7 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.util.ThreadUtils

private[spark] class ExecutorPodsPollingEventSource(
private[spark] class ExecutorPodsPollingSnapshotSource(
conf: SparkConf,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -26,7 +26,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

private[spark] class ExecutorPodsWatchEventSource(
private[spark] class ExecutorPodsWatchSnapshotSource(
snapshotsStore: ExecutorPodsSnapshotsStore,
kubernetesClient: KubernetesClient) extends Logging {

@@ -80,13 +80,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
val executorPodsAllocator = new ExecutorPodsAllocator(
sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock())

val podsWatchEventSource = new ExecutorPodsWatchEventSource(
val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
snapshotsStore,
kubernetesClient)

val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"kubernetes-executor-pod-polling-sync")
val podsPollingEventSource = new ExecutorPodsPollingEventSource(
val podsPollingEventSource = new ExecutorPodsPollingSnapshotSource(
sc.conf, kubernetesClient, snapshotsStore, eventsPollingExecutor)

new KubernetesClusterSchedulerBackend(
@@ -35,8 +35,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
snapshotsStore: ExecutorPodsSnapshotsStore,
podAllocator: ExecutorPodsAllocator,
lifecycleEventHandler: ExecutorPodsLifecycleManager,
watchEvents: ExecutorPodsWatchEventSource,
pollEvents: ExecutorPodsPollingEventSource)
watchEvents: ExecutorPodsWatchSnapshotSource,
pollEvents: ExecutorPodsPollingSnapshotSource)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._

class ExecutorPodsPollingEventSourceSuite extends SparkFunSuite with BeforeAndAfter {
class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {

private val sparkConf = new SparkConf

@@ -53,12 +53,12 @@ class ExecutorPodsPollingEventSourceSuite extends SparkFunSuite with BeforeAndAf
private var eventQueue: ExecutorPodsSnapshotsStore = _

private var pollingExecutor: DeterministicScheduler = _
private var pollingSourceUnderTest: ExecutorPodsPollingEventSource = _
private var pollingSourceUnderTest: ExecutorPodsPollingSnapshotSource = _

before {
MockitoAnnotations.initMocks(this)
pollingExecutor = new DeterministicScheduler()
pollingSourceUnderTest = new ExecutorPodsPollingEventSource(
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
sparkConf,
kubernetesClient,
eventQueue,
@@ -28,7 +28,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._

class ExecutorPodsWatchEventSourceSuite extends SparkFunSuite with BeforeAndAfter {
class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {

@Mock
private var eventQueue: ExecutorPodsSnapshotsStore = _
@@ -50,7 +50,7 @@ class ExecutorPodsWatchEventSourceSuite extends SparkFunSuite with BeforeAndAfte

private var watch: ArgumentCaptor[Watcher[Pod]] = _

private var watchEventSourceUnderTest: ExecutorPodsWatchEventSource = _
private var watchSourceUnderTest: ExecutorPodsWatchSnapshotSource = _

before {
MockitoAnnotations.initMocks(this)
@@ -61,12 +61,12 @@ class ExecutorPodsWatchEventSourceSuite extends SparkFunSuite with BeforeAndAfte
when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(executorRoleLabeledPods)
when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection)
watchEventSourceUnderTest = new ExecutorPodsWatchEventSource(
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient)
watchEventSourceUnderTest.start(TEST_SPARK_APP_ID)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
}

test("Watch events should be pushed to the queue.") {
test("Watch events should be pushed to the snapshots store as snapshot updates.") {
watch.getValue.eventReceived(Action.ADDED, runningExecutor(1))
watch.getValue.eventReceived(Action.MODIFIED, runningExecutor(2))
verify(eventQueue).updatePod(runningExecutor(1))
@@ -69,10 +69,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
private var lifecycleEventHandler: ExecutorPodsLifecycleManager = _

@Mock
private var watchEvents: ExecutorPodsWatchEventSource = _
private var watchEvents: ExecutorPodsWatchSnapshotSource = _

@Mock
private var pollEvents: ExecutorPodsPollingEventSource = _
private var pollEvents: ExecutorPodsPollingSnapshotSource = _

private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _
private var schedulerBackendUnderTest: KubernetesClusterSchedulerBackend = _

0 comments on commit c1b8431

Please sign in to comment.
You can’t perform that action at this time.