Skip to content

Commit

Permalink
[Transform] Make use of delegateFailureAndWrap (elastic#106034)
Browse files Browse the repository at this point in the history
* [Transform] Make use of delegateFailureAndWrap

Refactoring to a later pattern of ActionListener, reducing memory
footprint and removing some redudant lines of code.

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
2 people authored and fang-xing-esql committed Mar 8, 2024
1 parent 155a04d commit 1a94033
Showing 1 changed file with 20 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,10 @@
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.AuthorizationStatePersistenceUtils;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.Function;
import org.elasticsearch.xpack.transform.transforms.FunctionFactory;

import java.time.Instant;
import java.util.List;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.getSecurityHeadersPreferringSecondary;

public class TransportPutTransformAction extends AcknowledgedTransportMasterNodeAction<Request> {
Expand Down Expand Up @@ -108,21 +105,19 @@ protected void masterOperation(Task task, Request request, ClusterState clusterS
}

// <3> Create the transform
ActionListener<ValidateTransformAction.Response> validateTransformListener = ActionListener.wrap(
unusedValidationResponse -> putTransform(request, listener),
listener::onFailure
ActionListener<ValidateTransformAction.Response> validateTransformListener = listener.delegateFailureAndWrap(
(l, unused) -> putTransform(request, l)
);

// <2> Validate source and destination indices
ActionListener<Void> checkPrivilegesListener = ActionListener.wrap(
aVoid -> ClientHelper.executeAsyncWithOrigin(
ActionListener<Void> checkPrivilegesListener = validateTransformListener.delegateFailureAndWrap(
(l, aVoid) -> ClientHelper.executeAsyncWithOrigin(
client,
ClientHelper.TRANSFORM_ORIGIN,
ValidateTransformAction.INSTANCE,
new ValidateTransformAction.Request(config, request.isDeferValidation(), request.timeout()),
validateTransformListener
),
listener::onFailure
l
)
);

// <1> Early check to verify that the user can create the destination index and can read from the source
Expand Down Expand Up @@ -170,24 +165,19 @@ protected ClusterBlockException checkBlock(PutTransformAction.Request request, C
}

private void putTransform(Request request, ActionListener<AcknowledgedResponse> listener) {

final TransformConfig config = request.getConfig();
// create the function for validation
final Function function = FunctionFactory.create(config);

// <2> Return to the listener
ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(putTransformConfigurationResult -> {
logger.debug("[{}] created transform", config.getId());
auditor.info(config.getId(), "Created transform.");
List<String> warnings = TransformConfigLinter.getWarnings(function, config.getSource(), config.getSyncConfig());
for (String warning : warnings) {
logger.warn(() -> format("[%s] %s", config.getId(), warning));
auditor.warning(config.getId(), warning);
}
listener.onResponse(AcknowledgedResponse.TRUE);
}, listener::onFailure);

// <1> Put our transform
transformConfigManager.putTransformConfiguration(config, putTransformConfigurationListener);
var config = request.getConfig();
transformConfigManager.putTransformConfiguration(config, listener.delegateFailureAndWrap((l, unused) -> {
var transformId = config.getId();
logger.debug("[{}] created transform", transformId);
auditor.info(transformId, "Created transform.");

var validationFunc = FunctionFactory.create(config);
TransformConfigLinter.getWarnings(validationFunc, config.getSource(), config.getSyncConfig()).forEach(warning -> {
logger.warn("[{}] {}", transformId, warning);
auditor.warning(transformId, warning);
});

l.onResponse(AcknowledgedResponse.TRUE);
}));
}
}

0 comments on commit 1a94033

Please sign in to comment.