Skip to content

Commit

Permalink
Autoscaling capacity off transport thread (#80499) (#81598)
Browse files Browse the repository at this point in the history
Calculating the autoscaling capacity can take considerable time and
has therefore been moved off transport threads. Also, we ensure that
only one thread calculates capacity and only compute one result if
multiple requests are pending, serving all pending requests in one go.
Finally, the capacity requests are now cancellable.

Closes #79104
  • Loading branch information
henningandersen committed Dec 14, 2021
1 parent 32cb6c3 commit f8fa41b
Show file tree
Hide file tree
Showing 17 changed files with 944 additions and 40 deletions.
4 changes: 4 additions & 0 deletions x-pack/plugin/autoscaling/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ dependencies {
testImplementation project(path: xpackModule('data-streams'))
testImplementation project(path: xpackModule('searchable-snapshots'))
testImplementation project(path: xpackModule('ilm'))

testImplementation "com.fasterxml.jackson.core:jackson-databind:2.10.4"
testImplementation project(':modules:transport-netty4') // for http
testImplementation project(':plugins:transport-nio') // for http
}

addQaCheckDependencies()
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.autoscaling;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class AutoscalingCountTestDeciderService implements AutoscalingDeciderService {

public static final String NAME = "count";

private final AtomicInteger counter = new AtomicInteger();

@Override
public String name() {
return NAME;
}

@Override
public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDeciderContext context) {
return new AutoscalingDeciderResult(null, new CountReason(counter.incrementAndGet()));
}

@Override
public List<Setting<?>> deciderSettings() {
return org.elasticsearch.core.List.of();
}

@Override
public List<DiscoveryNodeRole> roles() {
return Collections.unmodifiableList(new ArrayList<>(DiscoveryNode.getPossibleRoles()));
}

@Override
public boolean defaultOn() {
return false;
}

public static class CountReason implements AutoscalingDeciderResult.Reason {
private final int count;

public CountReason(int count) {
this.count = count;
}

public CountReason(StreamInput in) throws IOException {
this.count = in.readInt();
}

@Override
public String summary() {
return Integer.toString(count);
}

@Override
public String getWriteableName() {
return NAME;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(count);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("count", count);
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CountReason that = (CountReason) o;
return count == that.count;
}

@Override
public int hashCode() {
return Objects.hash(count);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.autoscaling;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class AutoscalingSyncTestDeciderService implements AutoscalingDeciderService {

// ! ensures this decider is always first in tests.
public static final String NAME = "!sync";

public static final Setting<Boolean> CHECK_FOR_CANCEL = Setting.boolSetting("check_for_cancel", false);

private final CyclicBarrier syncBarrier = new CyclicBarrier(2);

public AutoscalingSyncTestDeciderService() {}

@Override
public String name() {
return NAME;
}

@Override
public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDeciderContext context) {
internalSync();
internalSync();
if (CHECK_FOR_CANCEL.get(configuration)) {
context.ensureNotCancelled();
assert false;
}
return null;
}

private void internalSync() {
try {
syncBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
assert false : e;
}
}

@Override
public List<Setting<?>> deciderSettings() {
return org.elasticsearch.core.List.of(CHECK_FOR_CANCEL);
}

@Override
public List<DiscoveryNodeRole> roles() {
return Collections.unmodifiableList(new ArrayList<>(DiscoveryNode.getPossibleRoles()));
}

@Override
public boolean defaultOn() {
return false;
}

public <E extends Exception> void sync(CheckedRunnable<E> run) throws E {
internalSync();
try {
run.run();
} finally {
internalSync();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.autoscaling;

import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LocalStateAutoscaling extends LocalStateCompositeXPackPlugin {

private final AutoscalingTestPlugin testPlugin;

public LocalStateAutoscaling(final Settings settings) {
super(settings, null);
testPlugin = new AutoscalingTestPlugin();
plugins.add(testPlugin);
}

public AutoscalingTestPlugin testPlugin() {
return testPlugin;
}

public static class AutoscalingTestPlugin extends Autoscaling {
private final AutoscalingSyncTestDeciderService syncDeciderService = new AutoscalingSyncTestDeciderService();

private AutoscalingTestPlugin() {
super(new AutoscalingLicenseChecker(() -> true));
}

@Override
public Set<AutoscalingDeciderService> createDeciderServices(AllocationDeciders allocationDeciders) {
Set<AutoscalingDeciderService> deciderServices = new HashSet<>(super.createDeciderServices(allocationDeciders));
deciderServices.add(syncDeciderService);
deciderServices.add(new AutoscalingCountTestDeciderService());
return deciderServices;
}

@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return CollectionUtils.appendToCopy(
super.getNamedWriteables(),
new NamedWriteableRegistry.Entry(
AutoscalingDeciderResult.Reason.class,
AutoscalingCountTestDeciderService.NAME,
AutoscalingCountTestDeciderService.CountReason::new
)
);
}

/**
* Sync with decider service, running the runnable when we are sure that an active capacity response is blocked in the sync decider.
* @param run
*/
public <E extends Exception> void syncWithDeciderService(CheckedRunnable<E> run) throws E {
syncDeciderService.sync(run);
}
}
}

0 comments on commit f8fa41b

Please sign in to comment.