Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class JobMasterService implements JobAutoScalerService {
private final WorkerMetricsClient workerMetricsClient;
private final AutoScaleMetricsConfig autoScaleMetricsConfig;
private final Observer<MetricData> metricObserver;
private final WorkerMetricHandler workerMetricHandler;
private final JobAutoScaler jobAutoScaler;
private final Context context;
private final Action0 observableOnCompleteCallback;
Expand All @@ -78,7 +79,8 @@ public JobMasterService(final String jobId,
this.autoScaleMetricsConfig = autoScaleMetricsConfig;
this.masterClientApi = masterClientApi;
this.jobAutoScaler = new JobAutoScaler(jobId, schedInfo, masterClientApi, context, jobAutoscalerManager);
this.metricObserver = new WorkerMetricHandler(jobId, jobAutoScaler.getObserver(), masterClientApi, autoScaleMetricsConfig, jobAutoscalerManager).initAndGetMetricDataObserver();
this.workerMetricHandler = new WorkerMetricHandler(jobId, jobAutoScaler.getObserver(), masterClientApi, autoScaleMetricsConfig, jobAutoscalerManager);
this.metricObserver = workerMetricHandler.initAndGetMetricDataObserver();
this.observableOnCompleteCallback = observableOnCompleteCallback;
this.observableOnErrorCallback = observableOnErrorCallback;
this.observableOnTerminateCallback = observableOnTerminateCallback;
Expand All @@ -95,13 +97,13 @@ public JobMasterService(final JobScalerContext jobScalerContext,
this.autoScaleMetricsConfig = jobScalerContext.getAutoScaleMetricsConfig();
this.masterClientApi = jobScalerContext.getMasterClientApi();
this.jobAutoScaler = new JobAutoScaler(jobScalerContext, activeRule);
this.metricObserver = new WorkerMetricHandler(
this.workerMetricHandler = new WorkerMetricHandler(
jobId,
jobAutoScaler.getObserver(),
masterClientApi,
autoScaleMetricsConfig,
jobScalerContext.getJobAutoscalerManager())
.initAndGetMetricDataObserver();
jobScalerContext.getJobAutoscalerManager());
this.metricObserver = workerMetricHandler.initAndGetMetricDataObserver();
this.observableOnCompleteCallback = jobScalerContext.getObservableOnCompleteCallback();
this.observableOnErrorCallback = jobScalerContext.getObservableOnErrorCallback();
this.observableOnTerminateCallback = jobScalerContext.getObservableOnTerminateCallback();
Expand Down Expand Up @@ -185,6 +187,7 @@ public void shutdown() {
if (subscription != null) {
subscription.unsubscribe();
}
this.workerMetricHandler.shutdown();
this.jobAutoScaler.shutdown();
this.metricObserver.onCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -49,6 +49,7 @@
import rx.observers.SerializedObserver;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subscriptions.CompositeSubscription;


/* package */ class WorkerMetricHandler {
Expand All @@ -72,6 +73,9 @@
}
};
private final JobAutoscalerManager jobAutoscalerManager;
private final CompositeSubscription subscriptions = new CompositeSubscription();
private final CompositeSubscription stageSubscriptions = new CompositeSubscription();
private final AtomicBoolean started = new AtomicBoolean(false);

public WorkerMetricHandler(final String jobId,
final Observer<JobAutoScaler.Event> jobAutoScaleObserver,
Expand Down Expand Up @@ -262,7 +266,24 @@ private void addSourceJobDataPoint(final MetricData datapoint) {

@Override
public Subscriber<? super MetricData> call(final Subscriber<? super Object> child) {
child.add(Schedulers.computation().createWorker().schedulePeriodically(
rx.Scheduler.Worker worker = Schedulers.computation().createWorker();

// Wrap worker to add logging on unsubscribe
rx.Subscription workerSubscription = new rx.Subscription() {
@Override
public void unsubscribe() {
logger.info("Shutting down periodic worker for stage {}", stage);
worker.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return worker.isUnsubscribed();
}
};

child.add(workerSubscription);
worker.schedulePeriodically(
new Action0() {
@Override
public void call() {
Expand Down Expand Up @@ -392,7 +413,7 @@ public void call() {
addScalerEventForSourceJobDrops(numWorkers);
}
}, metricsIntervalSeconds, metricsIntervalSeconds, TimeUnit.SECONDS
));
);
return new Subscriber<MetricData>() {
@Override
public void onCompleted() {
Expand Down Expand Up @@ -464,8 +485,12 @@ private void addScalerEventForSourceJobDrops(int numWorkers) {
}

private void start() {
final AtomicReference<List<Subscription>> ref = new AtomicReference<>(new ArrayList<>());
masterClientApi.schedulingChanges(jobId)
if (!started.compareAndSet(false, true)) {
return;
}
subscriptions.add(stageSubscriptions);

Subscription schedulingSubscription = masterClientApi.schedulingChanges(jobId)
.doOnNext(jobSchedulingInfo -> {
final Map<Integer, WorkerAssignments> workerAssignments = jobSchedulingInfo.getWorkerAssignments();
for (Map.Entry<Integer, WorkerAssignments> workerAssignmentsEntry : workerAssignments.entrySet()) {
Expand All @@ -474,10 +499,12 @@ private void start() {
numWorkersByStage.put(workerAssignment.getStage(), workerAssignment.getNumWorkers());
workerHostsByStage.put(workerAssignment.getStage(), new ArrayList<>(workerAssignment.getHosts().values()));
}
}).subscribe();
})
.subscribe();
subscriptions.add(schedulingSubscription);

logger.info("Starting worker metric handler with autoscale config {}", autoScaleMetricsConfig);
metricDataSubject
Subscription metricSubscription = metricDataSubject
.groupBy(metricData -> metricData.getStage())
.lift(new DropOperator<>(WorkerMetricHandler.class.getName()))
.doOnNext(go -> {
Expand All @@ -486,12 +513,15 @@ private void start() {
.lift(new StageMetricDataOperator(stage, lookupNumWorkersByStage, autoScaleMetricsConfig))
.subscribe();
logger.info("adding subscription for stage {} StageMetricDataOperator", stage);
ref.get().add(s);
})
.doOnUnsubscribe(() -> {
for (Subscription s : ref.get())
s.unsubscribe();
stageSubscriptions.add(s);
})
.subscribe();
subscriptions.add(metricSubscription);
}

public void shutdown() {
if (started.get()) {
subscriptions.unsubscribe();
}
}
}
Loading