Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SO-3966 fluent job wait retry api #1233

Merged
merged 10 commits into from
Oct 18, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ public R execute(ServiceProvider context) {
user = User.SYSTEM;
} else {
// if there is authentication configured, but no authorization token found prevent execution and throw UnauthorizedException
Request<?, ?> request = Iterables.getFirst(requests, null);
if (PlatformUtil.isDevVersion()) {
Request<?, ?> request = Iterables.getFirst(requests, null);
System.err.println(request);
}
throw new UnauthorizedException("Missing authorization token");
throw new UnauthorizedException("Missing authorization token")
.withDeveloperMessage("Unable to execute request '%s' without a proper authorization token. Supply one either as a standard HTTP Authorization header or via the token query parameter.", request.getType());
}
} else {
// verify authorization header value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

import com.b2international.snowowl.core.ResourceURIWithQuery;
import com.b2international.snowowl.core.domain.ListCollectionResource;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.google.common.base.MoreObjects;

/**
Expand Down Expand Up @@ -62,7 +66,11 @@
* @param fromUri - the resource URI representing the comparison baseline
* @param toUri - the resource URI representing the comparison target
*/
public AnalysisCompareResult(final List<AnalysisCompareResultItem> items, final ResourceURIWithQuery fromUri, final ResourceURIWithQuery toUri) {
@JsonCreator
public AnalysisCompareResult(
@JsonProperty("items") final List<AnalysisCompareResultItem> items,
@JsonProperty("fromUri") final ResourceURIWithQuery fromUri,
@JsonProperty("toUri") final ResourceURIWithQuery toUri) {
super(items);
this.fromUri = checkNotNull(fromUri, "Resource URI 'fromUri' may not be null.");
this.toUri = checkNotNull(toUri, "Resource URI 'toUri' may not be null.");
Expand All @@ -86,17 +94,25 @@
return counters;
}

@JsonSetter
void setCounters(List<NamedCount> counters) {
this.counters = counters;
}

Check warning on line 100 in core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/compare/AnalysisCompareResult.java

View check run for this annotation

Codecov / codecov/patch

core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/compare/AnalysisCompareResult.java#L99-L100

Added lines #L99 - L100 were not covered by tests

@JsonIgnore
public Integer getTotalChanges() {
return getCounterValue(COUNTER_TOTAL);
}

/**
* @return the number of added primary components between the two points of reference
*/
@JsonIgnore
public Integer getNewComponents() {
return getCounterValue(COUNTER_NEW_COMPONENTS);
}

@JsonIgnore
public void setNewComponents(final Integer newComponents) {
setCounterValue(COUNTER_NEW_COMPONENTS, newComponents);
setCounterValue(COUNTER_TOTAL, Optional.ofNullable(getCounterValue(COUNTER_TOTAL)).orElse(0) + newComponents);
Expand All @@ -105,10 +121,12 @@
/**
* @return the number of changed primary components between the two points of reference
*/
@JsonIgnore
public Integer getChangedComponents() {
return getCounterValue(COUNTER_CHANGED_COMPONENTS);
}

@JsonIgnore
public void setChangedComponents(final Integer changedComponents) {
setCounterValue(COUNTER_CHANGED_COMPONENTS, changedComponents);
setCounterValue(COUNTER_TOTAL, Optional.ofNullable(getCounterValue(COUNTER_TOTAL)).orElse(0) + changedComponents);
Expand All @@ -117,10 +135,12 @@
/**
* @return the number of removed primary components between the two points of reference
*/
@JsonIgnore
public Integer getDeletedComponents() {
return getCounterValue(COUNTER_DELETED_COMPONENTS);
}

@JsonIgnore
public void setDeletedComponents(final Integer deletedComponents) {
setCounterValue(COUNTER_DELETED_COMPONENTS, deletedComponents);
setCounterValue(COUNTER_TOTAL, Optional.ofNullable(getCounterValue(COUNTER_TOTAL)).orElse(0) + deletedComponents);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import com.b2international.snowowl.core.ServiceProvider;
import com.b2international.snowowl.core.api.SnowowlRuntimeException;
import com.b2international.snowowl.core.events.util.Promise;
import com.b2international.snowowl.core.identity.User;
import com.b2international.snowowl.core.jobs.JobRequests;
Expand Down Expand Up @@ -144,18 +146,27 @@
return getRequest().execute(context);
}

/**
* Helper to run a fully prepared async request inside a job and store its result.
*
* @return a {@link ScheduleJobRequestBuilder} with only the request part configured to this {@link AsyncRequest}
*/
public ScheduleJobRequestBuilder runAsJob() {
return JobRequests.prepareSchedule()
.setRequest(this);
}

/**
* Wraps the this {@link AsyncRequest}'s {@link #getRequest()} into a {@link ScheduleJobRequestBuilder} and prepares for execution.
*
* @param description - the description to use for the job
* @return the prepared {@link AsyncRequest} that will schedule the request as a job and return the job ID as a result
*/
public AsyncRequest<String> runAsJob(String description) {
return JobRequests.prepareSchedule()
return runAsJob()
.setDescription(description)
.setRequest(this)
.buildAsync();
}
}

/**
* Wraps the this {@link AsyncRequest}'s {@link #getRequest()} into a {@link ScheduleJobRequestBuilder} and prepares for execution.
Expand All @@ -165,10 +176,9 @@
* @return the prepared {@link AsyncRequest} that will schedule the request as a job and return the job ID as a result
*/
public AsyncRequest<String> runAsJob(String jobKey, String description) {
return JobRequests.prepareSchedule()
return runAsJob()

Check warning on line 179 in core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/events/AsyncRequest.java

View check run for this annotation

Codecov / codecov/patch

core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/events/AsyncRequest.java#L179

Added line #L179 was not covered by tests
.setKey(jobKey)
.setDescription(description)
.setRequest(this)
.buildAsync();
}

Expand All @@ -181,12 +191,34 @@
* @return the prepared {@link AsyncRequest} that will schedule the request as a job and return the job ID as a result
*/
public AsyncRequest<String> runAsJobWithRestart(String jobKey, String description) {
return JobRequests.prepareSchedule()
return runAsJob()
.setKey(jobKey)
.setDescription(description)
.setRequest(this)
.setRestart(true)
.buildAsync();
}

/**
* A simple poller implementation that reuses this prepared {@link AsyncRequest} until a certain condition is met in the response object.
*
* @param bus - the bus to use for request execution
* @param pollIntervalMillis - the polling interval between retries
* @param canFinish
* @return
*/
public Promise<R> retryUntil(IEventBus bus, long pollIntervalMillis, Predicate<R> canFinish) {
return execute(bus).thenWith(result -> {

Check warning on line 210 in core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/events/AsyncRequest.java

View check run for this annotation

Codecov / codecov/patch

core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/events/AsyncRequest.java#L210

Added line #L210 was not covered by tests
if (canFinish.test(result)) {
return Promise.immediate(result);

Check warning on line 212 in core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/events/AsyncRequest.java

View check run for this annotation

Codecov / codecov/patch

core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/events/AsyncRequest.java#L212

Added line #L212 was not covered by tests
} else {
try {
Thread.sleep(pollIntervalMillis);
} catch (InterruptedException e) {
throw new SnowowlRuntimeException(e);

Check warning on line 217 in core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/events/AsyncRequest.java

View check run for this annotation

Codecov / codecov/patch

core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/events/AsyncRequest.java#L215-L217

Added lines #L215 - L217 were not covered by tests
}
return retryUntil(bus, pollIntervalMillis, canFinish);

Check warning on line 219 in core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/events/AsyncRequest.java

View check run for this annotation

Codecov / codecov/patch

core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/events/AsyncRequest.java#L219

Added line #L219 was not covered by tests
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.b2international.index.*;
import com.b2international.index.BulkUpdate;
import com.b2international.index.Hits;
import com.b2international.index.Index;
import com.b2international.index.Searcher;
import com.b2international.index.aggregations.Aggregation;
import com.b2international.index.aggregations.AggregationBuilder;
import com.b2international.index.query.Expression;
Expand Down Expand Up @@ -112,6 +115,33 @@
});
}

public String schedule(RemoteJob job) {

// then register doc
final String jobId = job.getId();
LOG.trace("Scheduled job {}", jobId);
// try to convert the request to a param object
String parameters;
try {
parameters = mapper.writeValueAsString(job.getParameters(mapper));
} catch (Throwable e) {
parameters = "";

Check warning on line 128 in core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/jobs/RemoteJobTracker.java

View check run for this annotation

Codecov / codecov/patch

core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/jobs/RemoteJobTracker.java#L128

Added line #L128 was not covered by tests
}
put(RemoteJobEntry.builder()
.id(jobId)
.key(job.getKey())
.description(job.getDescription())
.user(job.getUser())
.parameters(parameters)
.scheduleDate(new Date())
.build());

// schedule the job after we successfully wrote the job doc into the index
// previously this was done through a listener which was executed synchronously before the job actually stated, see IJobChangeListener.scheduled(...)
job.schedule();

return jobId;
}

public void requestCancel(String jobId) {
final RemoteJobEntry job = get(jobId);
Expand Down Expand Up @@ -297,27 +327,7 @@

@Override
public void scheduled(IJobChangeEvent event) {
if (event.getJob() instanceof RemoteJob) {
final RemoteJob job = (RemoteJob) event.getJob();
final String jobId = job.getId();
LOG.trace("Scheduled job {}", jobId);
// try to convert the request to a param object
String parameters;
try {
parameters = mapper.writeValueAsString(job.getParameters(mapper));
} catch (Throwable e) {
parameters = "";
}
put(RemoteJobEntry.builder()
.id(jobId)
.key(job.getKey())
.description(job.getDescription())
.user(job.getUser())
.parameters(parameters)
.scheduleDate(new Date())
.build());

}
// handled by the ScheduleJobRequest.execute(...) logic which calls the scheduled(RemoteJob) method on this tracker
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 B2i Healthcare Pte Ltd, http://b2i.sg
* Copyright 2017-2023 B2i Healthcare Pte Ltd, http://b2i.sg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import org.hibernate.validator.constraints.NotEmpty;

import com.b2international.commons.exceptions.AlreadyExistsException;
import com.b2international.commons.exceptions.BadRequestException;
import com.b2international.commons.exceptions.ConflictException;
import com.b2international.snowowl.core.ServiceProvider;
import com.b2international.snowowl.core.events.Request;
Expand Down Expand Up @@ -59,22 +60,29 @@
private final boolean autoClean;

private final boolean restart;

private final boolean cached;

private final SerializableSchedulingRule schedulingRule;

ScheduleJobRequest(String key, String user, String description, Request<ServiceProvider, ?> request, SerializableSchedulingRule schedulingRule, boolean autoClean, boolean restart) {
ScheduleJobRequest(String key, String user, String description, Request<ServiceProvider, ?> request, SerializableSchedulingRule schedulingRule, boolean autoClean, boolean restart, boolean cached) {
this.key = key;
this.user = user;
this.request = request;
this.description = description;
this.schedulingRule = schedulingRule;
this.autoClean = autoClean;
this.restart = restart;
this.cached = cached;
}

@Override
public String execute(ServiceProvider context) {

if (cached && autoClean) {
throw new BadRequestException("Automatically cleaned jobs cannot be cached.");

Check warning on line 83 in core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/jobs/ScheduleJobRequest.java

View check run for this annotation

Codecov / codecov/patch

core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/jobs/ScheduleJobRequest.java#L83

Added line #L83 was not covered by tests
}

final String id = IDs.sha1(key);

try {
Expand All @@ -89,15 +97,23 @@

if (existingJob.isPresent()) {
RemoteJobEntry job = existingJob.get();

// if running, fail
if (!job.isDone()) {
throw new AlreadyExistsException(String.format("Job[%s]", request.getType()), key);
}

// if restart not requested, fail
if (!restart) {
throw new ConflictException("An existing job is present with the same '%s' key. Request 'restart' if the previous job can be safely overriden.", key);

// if cached reuse
if (cached) {
// use an existing running job entry until it is not done or when restart not requested, which kinda forms an asynchronous cache for any request
if (!job.isDeleted() && (!job.isDone() || !restart)) {
return id;

Check warning on line 105 in core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/jobs/ScheduleJobRequest.java

View check run for this annotation

Codecov / codecov/patch

core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/jobs/ScheduleJobRequest.java#L105

Added line #L105 was not covered by tests
}
} else {
// if running, fail
if (!job.isDone()) {
throw new AlreadyExistsException(String.format("Job[%s]", request.getType()), key);

Check warning on line 110 in core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/jobs/ScheduleJobRequest.java

View check run for this annotation

Codecov / codecov/patch

core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/jobs/ScheduleJobRequest.java#L110

Added line #L110 was not covered by tests
}

// if restart not requested, fail
if (!restart) {
throw new ConflictException("An existing job is present with the same '%s' key. Request 'restart' if the previous job can be safely overriden.", key);

Check warning on line 115 in core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/jobs/ScheduleJobRequest.java

View check run for this annotation

Codecov / codecov/patch

core/com.b2international.snowowl.core/src/com/b2international/snowowl/core/jobs/ScheduleJobRequest.java#L115

Added line #L115 was not covered by tests
}
}

// otherwise delete the existing job and create a new one using the same key
Expand All @@ -113,8 +129,7 @@
if (schedulingRule != null) {
job.setRule(schedulingRule);
}
job.schedule();
return id;
return context.service(RemoteJobTracker.class).schedule(job);
} finally {
SCHEDULE_LOCK.release();
}
Expand Down