Skip to content

Commit

Permalink
Convert remote license checker to use LicensedFeature (#79876) (#79975)
Browse files Browse the repository at this point in the history
The RemoteClusterLicenseChecker pulls the license level of a remote
cluster and checks it allows a local feature to communicate with that
cluster. It does the check with a lambda, but these methods could be out
of sync with the actual licensed feature. This commit converts the
remote license checker to take in the feature object that should be
checked.
  • Loading branch information
rjernst committed Oct 28, 2021
1 parent e6d4fad commit d497933
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.core.ClientHelper;
Expand Down Expand Up @@ -242,7 +241,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
final Function<Exception, ElasticsearchStatusException> unknownLicense
) {
// we have to check the license on the remote cluster
new RemoteClusterLicenseChecker(client, XPackLicenseState::isCcrAllowedForOperationMode).checkRemoteClusterLicenses(
new RemoteClusterLicenseChecker(client, CcrConstants.CCR_FEATURE).checkRemoteClusterLicenses(
Collections.singletonList(clusterAlias),
new ActionListener<RemoteClusterLicenseChecker.LicenseCheck>() {

Expand Down Expand Up @@ -450,11 +449,7 @@ private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicen
clusterAlias,
leaderIndex,
clusterAlias,
RemoteClusterLicenseChecker.buildErrorMessage(
"ccr",
licenseCheck.remoteClusterLicenseInfo(),
RemoteClusterLicenseChecker::isAllowedByLicense
)
RemoteClusterLicenseChecker.buildErrorMessage(CcrConstants.CCR_FEATURE, licenseCheck.remoteClusterLicenseInfo())
);
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST);
}
Expand All @@ -467,11 +462,7 @@ private static ElasticsearchStatusException clusterStateNonCompliantRemoteLicens
Locale.ROOT,
"can not fetch remote cluster state as the remote cluster [%s] is not licensed for [ccr]; %s",
clusterAlias,
RemoteClusterLicenseChecker.buildErrorMessage(
"ccr",
licenseCheck.remoteClusterLicenseInfo(),
RemoteClusterLicenseChecker::isAllowedByLicense
)
RemoteClusterLicenseChecker.buildErrorMessage(CcrConstants.CCR_FEATURE, licenseCheck.remoteClusterLicenseInfo())
);
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
import org.elasticsearch.protocol.xpack.license.LicenseStatus;
Expand All @@ -27,11 +28,12 @@
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.elasticsearch.license.XPackLicenseState.isAllowedByOperationMode;

/**
* Checks remote clusters for license compatibility with a specified license predicate.
* Checks remote clusters for license compatibility with a specified licensed feature.
*/
public final class RemoteClusterLicenseChecker {

Expand Down Expand Up @@ -125,23 +127,19 @@ private LicenseCheck(final RemoteClusterLicenseInfo remoteClusterLicenseInfo) {

private static final ClusterNameExpressionResolver clusterNameExpressionResolver = new ClusterNameExpressionResolver();
private final Client client;
private final Predicate<License.OperationMode> predicate;
private final LicensedFeature feature;

/**
* Constructs a remote cluster license checker with the specified license predicate for checking license compatibility. The predicate
* does not need to check for the active license state as this is handled by the remote cluster license checker.
* Constructs a remote cluster license checker with the specified licensed feature for checking license compatibility. The feature
* does not need to check for the active license state as this is handled by the remote cluster license checker. If the feature
* is {@code null} a check is only performed on whether the license is active.
*
* @param client the client
* @param predicate the license predicate
* @param feature the licensed feature
*/
public RemoteClusterLicenseChecker(final Client client, final Predicate<License.OperationMode> predicate) {
public RemoteClusterLicenseChecker(final Client client, @Nullable final LicensedFeature feature) {
this.client = client;
this.predicate = predicate;
}

public static boolean isAllowedByLicense(final XPackInfoResponse.LicenseInfo licenseInfo) {
final License.OperationMode mode = License.OperationMode.parse(licenseInfo.getMode());
return XPackLicenseState.isAllowedByOperationMode(mode, License.OperationMode.PLATINUM);
this.feature = feature;
}

/**
Expand Down Expand Up @@ -169,8 +167,8 @@ public void onResponse(final XPackInfoResponse xPackInfoResponse) {
listener.onFailure(new ResourceNotFoundException("license info is missing for cluster [" + clusterAlias.get() + "]"));
return;
}
if ((licenseInfo.getStatus() == LicenseStatus.ACTIVE) == false
|| predicate.test(License.OperationMode.parse(licenseInfo.getMode())) == false) {

if (isActive(feature, licenseInfo) == false || isAllowed(feature, licenseInfo) == false) {
listener.onResponse(LicenseCheck.failure(new RemoteClusterLicenseInfo(clusterAlias.get(), licenseInfo)));
return;
}
Expand All @@ -197,6 +195,15 @@ public void onFailure(final Exception e) {
remoteClusterLicense(clusterAlias.get(), infoListener);
}

private static boolean isActive(LicensedFeature feature, XPackInfoResponse.LicenseInfo licenseInfo) {
return feature != null && feature.isNeedsActive() == false || licenseInfo.getStatus() == LicenseStatus.ACTIVE;
}

private static boolean isAllowed(LicensedFeature feature, XPackInfoResponse.LicenseInfo licenseInfo) {
License.OperationMode mode = License.OperationMode.parse(licenseInfo.getMode());
return feature == null || isAllowedByOperationMode(mode, feature.getMinimumOperationMode());
}

private void remoteClusterLicense(final String clusterAlias, final ActionListener<XPackInfoResponse> listener) {
final ThreadContext threadContext = client.threadPool().getThreadContext();
final ContextPreservingActionListener<XPackInfoResponse> contextPreservingActionListener = new ContextPreservingActionListener<>(
Expand Down Expand Up @@ -274,22 +281,19 @@ public static List<String> remoteClusterAliases(final Set<String> remoteClusters
* @param remoteClusterLicenseInfo the remote cluster license info of the cluster that failed the license check
* @return an error message representing license incompatibility
*/
public static String buildErrorMessage(
final String feature,
final RemoteClusterLicenseInfo remoteClusterLicenseInfo,
final Predicate<XPackInfoResponse.LicenseInfo> predicate
) {
public static String buildErrorMessage(final LicensedFeature feature, final RemoteClusterLicenseInfo remoteClusterLicenseInfo) {
final StringBuilder error = new StringBuilder();
if (remoteClusterLicenseInfo.licenseInfo().getStatus() != LicenseStatus.ACTIVE) {
if (isActive(feature, remoteClusterLicenseInfo.licenseInfo()) == false) {
error.append(String.format(Locale.ROOT, "the license on cluster [%s] is not active", remoteClusterLicenseInfo.clusterAlias()));
} else {
assert predicate.test(remoteClusterLicenseInfo.licenseInfo()) == false : "license must be incompatible to build error message";
assert isAllowed(feature, remoteClusterLicenseInfo.licenseInfo()) == false
: "license must be incompatible to build error message";
final String message = String.format(
Locale.ROOT,
"the license mode [%s] on cluster [%s] does not enable [%s]",
License.OperationMode.parse(remoteClusterLicenseInfo.licenseInfo().getMode()),
remoteClusterLicenseInfo.clusterAlias(),
feature
feature.getName()
);
error.append(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,19 +440,11 @@ public Map<FeatureUsage, Long> getLastUsed() {
return usage.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> timeConverter.apply(e.getValue())));
}

public static boolean isMachineLearningAllowedForOperationMode(final OperationMode operationMode) {
return isAllowedByOperationMode(operationMode, OperationMode.PLATINUM);
}

public static boolean isFipsAllowedForOperationMode(final OperationMode operationMode) {
return isAllowedByOperationMode(operationMode, OperationMode.PLATINUM);
}

public static boolean isCcrAllowedForOperationMode(final OperationMode operationMode) {
return isAllowedByOperationMode(operationMode, OperationMode.PLATINUM);
}

public static boolean isAllowedByOperationMode(final OperationMode operationMode, final OperationMode minimumMode) {
static boolean isAllowedByOperationMode(final OperationMode operationMode, final OperationMode minimumMode) {
if (OperationMode.TRIAL == operationMode) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,8 @@ public void testCheckRemoteClusterLicensesGivenCompatibleLicenses() {
responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null));
responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null));

final RemoteClusterLicenseChecker licenseChecker = new RemoteClusterLicenseChecker(
client,
operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)
);
LicensedFeature.Momentary feature = LicensedFeature.momentary(null, "feature", License.OperationMode.PLATINUM);
final RemoteClusterLicenseChecker licenseChecker = new RemoteClusterLicenseChecker(client, feature);
final AtomicReference<RemoteClusterLicenseChecker.LicenseCheck> licenseCheck = new AtomicReference<>();

licenseChecker.checkRemoteClusterLicenses(
Expand Down Expand Up @@ -202,10 +200,8 @@ public void testCheckRemoteClusterLicensesGivenIncompatibleLicense() {
return null;
}).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any());

final RemoteClusterLicenseChecker licenseChecker = new RemoteClusterLicenseChecker(
client,
operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)
);
LicensedFeature.Momentary feature = LicensedFeature.momentary(null, "feature", License.OperationMode.PLATINUM);
final RemoteClusterLicenseChecker licenseChecker = new RemoteClusterLicenseChecker(client, feature);
final AtomicReference<RemoteClusterLicenseChecker.LicenseCheck> licenseCheck = new AtomicReference<>();

licenseChecker.checkRemoteClusterLicenses(
Expand Down Expand Up @@ -251,10 +247,8 @@ public void testCheckRemoteClusterLicencesGivenNonExistentCluster() {
responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null));
responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null));

final RemoteClusterLicenseChecker licenseChecker = new RemoteClusterLicenseChecker(
client,
operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)
);
LicensedFeature.Momentary feature = LicensedFeature.momentary(null, "feature", License.OperationMode.PLATINUM);
final RemoteClusterLicenseChecker licenseChecker = new RemoteClusterLicenseChecker(client, feature);
final AtomicReference<Exception> exception = new AtomicReference<>();

licenseChecker.checkRemoteClusterLicenses(
Expand Down Expand Up @@ -294,10 +288,8 @@ public void testRemoteClusterLicenseCallUsesSystemContext() throws InterruptedEx
return null;
}).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any());

final RemoteClusterLicenseChecker licenseChecker = new RemoteClusterLicenseChecker(
client,
operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)
);
LicensedFeature.Momentary feature = LicensedFeature.momentary(null, "feature", License.OperationMode.PLATINUM);
final RemoteClusterLicenseChecker licenseChecker = new RemoteClusterLicenseChecker(client, feature);

final List<String> remoteClusterAliases = Collections.singletonList("valid");
licenseChecker.checkRemoteClusterLicenses(
Expand Down Expand Up @@ -337,10 +329,8 @@ public void testListenerIsExecutedWithCallingContext() throws InterruptedExcepti
responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null));
responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null));

final RemoteClusterLicenseChecker licenseChecker = new RemoteClusterLicenseChecker(
client,
operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)
);
LicensedFeature.Momentary feature = LicensedFeature.momentary(null, "feature", License.OperationMode.PLATINUM);
final RemoteClusterLicenseChecker licenseChecker = new RemoteClusterLicenseChecker(client, feature);

final AtomicBoolean listenerInvoked = new AtomicBoolean();
threadPool.getThreadContext().putHeader("key", "value");
Expand Down Expand Up @@ -383,10 +373,8 @@ public void testBuildErrorMessageForActiveCompatibleLicense() {
"platinum-cluster",
platinumLicence
);
final AssertionError e = expectThrows(
AssertionError.class,
() -> RemoteClusterLicenseChecker.buildErrorMessage("", info, RemoteClusterLicenseChecker::isAllowedByLicense)
);
LicensedFeature.Momentary feature = LicensedFeature.momentary(null, "foo", License.OperationMode.PLATINUM);
final AssertionError e = expectThrows(AssertionError.class, () -> RemoteClusterLicenseChecker.buildErrorMessage(feature, info));
assertThat(e, hasToString(containsString("license must be incompatible to build error message")));
}

Expand All @@ -396,9 +384,10 @@ public void testBuildErrorMessageForIncompatibleLicense() {
"basic-cluster",
basicLicense
);
LicensedFeature.Momentary feature = LicensedFeature.momentary(null, "feature", License.OperationMode.PLATINUM);
assertThat(
RemoteClusterLicenseChecker.buildErrorMessage("Feature", info, RemoteClusterLicenseChecker::isAllowedByLicense),
equalTo("the license mode [BASIC] on cluster [basic-cluster] does not enable [Feature]")
RemoteClusterLicenseChecker.buildErrorMessage(feature, info),
equalTo("the license mode [BASIC] on cluster [basic-cluster] does not enable [feature]")
);
}

Expand All @@ -408,8 +397,9 @@ public void testBuildErrorMessageForInactiveLicense() {
"expired-cluster",
expiredLicense
);
LicensedFeature.Momentary feature = LicensedFeature.momentary(null, "foo", License.OperationMode.PLATINUM);
assertThat(
RemoteClusterLicenseChecker.buildErrorMessage("Feature", info, RemoteClusterLicenseChecker::isAllowedByLicense),
RemoteClusterLicenseChecker.buildErrorMessage(feature, info),
equalTo("the license on cluster [expired-cluster] is not active")
);
}
Expand All @@ -424,10 +414,8 @@ public void testCheckRemoteClusterLicencesNoLicenseMetadata() {
return null;
}).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any());

final RemoteClusterLicenseChecker licenseChecker = new RemoteClusterLicenseChecker(
client,
operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)
);
LicensedFeature.Momentary feature = LicensedFeature.momentary(null, "feature", License.OperationMode.PLATINUM);
final RemoteClusterLicenseChecker licenseChecker = new RemoteClusterLicenseChecker(client, feature);
final AtomicReference<Exception> exception = new AtomicReference<>();

licenseChecker.checkRemoteClusterLicenses(
Expand Down

0 comments on commit d497933

Please sign in to comment.