Skip to content

Commit

Permalink
Disallow adding listener to completed jobs [HZ-2254] (#24197)
Browse files Browse the repository at this point in the history
  • Loading branch information
burakgok committed May 15, 2023
1 parent 4a03057 commit 9d9d7aa
Show file tree
Hide file tree
Showing 27 changed files with 445 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@
/**
* Adds a JobStatusListener to the specified job.
*/
@Generated("cc493370802f788c8333a1e7c81a676c")
@Generated("b54281a30cabeb2b34dfee722257fd03")
public final class JetAddJobStatusListenerCodec {
//hex: 0xFE1300
public static final int REQUEST_MESSAGE_TYPE = 16651008;
//hex: 0xFE1301
public static final int RESPONSE_MESSAGE_TYPE = 16651009;
private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
private static final int REQUEST_LOCAL_ONLY_FIELD_OFFSET = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
private static final int REQUEST_LIGHT_JOB_COORDINATOR_FIELD_OFFSET = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
private static final int REQUEST_LOCAL_ONLY_FIELD_OFFSET = REQUEST_LIGHT_JOB_COORDINATOR_FIELD_OFFSET + UUID_SIZE_IN_BYTES;
private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_LOCAL_ONLY_FIELD_OFFSET + BOOLEAN_SIZE_IN_BYTES;
private static final int RESPONSE_RESPONSE_FIELD_OFFSET = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_RESPONSE_FIELD_OFFSET + UUID_SIZE_IN_BYTES;
Expand All @@ -67,20 +68,26 @@ public static class RequestParameters {
*/
public long jobId;

/**
* UUID of the job coordinator for light jobs, null otherwise.
*/
public @Nullable java.util.UUID lightJobCoordinator;

/**
* If true fires events that originated from this node only, otherwise fires all events.
*/
public boolean localOnly;
}

public static ClientMessage encodeRequest(long jobId, boolean localOnly) {
public static ClientMessage encodeRequest(long jobId, @Nullable java.util.UUID lightJobCoordinator, boolean localOnly) {
ClientMessage clientMessage = ClientMessage.createForEncode();
clientMessage.setRetryable(false);
clientMessage.setOperationName("Jet.AddJobStatusListener");
ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
encodeUUID(initialFrame.content, REQUEST_LIGHT_JOB_COORDINATOR_FIELD_OFFSET, lightJobCoordinator);
encodeBoolean(initialFrame.content, REQUEST_LOCAL_ONLY_FIELD_OFFSET, localOnly);
clientMessage.add(initialFrame);
return clientMessage;
Expand All @@ -91,6 +98,7 @@ public static JetAddJobStatusListenerCodec.RequestParameters decodeRequest(Clien
RequestParameters request = new RequestParameters();
ClientMessage.Frame initialFrame = iterator.next();
request.jobId = decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
request.lightJobCoordinator = decodeUUID(initialFrame.content, REQUEST_LIGHT_JOB_COORDINATOR_FIELD_OFFSET);
request.localOnly = decodeBoolean(initialFrame.content, REQUEST_LOCAL_ONLY_FIELD_OFFSET);
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public boolean removeListener(UUID userRegistrationId) {
return false;
}
listenerRegistration.getConnectionRegistrations().forEach((connection, registration) ->
((ClientConnection) connection).removeEventHandler(registration.getCallId()));
connection.removeEventHandler(registration.getCallId()));
return true;
}).get();
} catch (Exception e) {
Expand Down
1 change: 1 addition & 0 deletions hazelcast/src/main/java/com/hazelcast/jet/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ default void join() {
*
* @return The registration id
* @throws UnsupportedOperationException if the cluster version is less than 5.3
* @throws IllegalStateException if the job is completed or failed
* @since 5.3
*/
UUID addStatusListener(@Nonnull JobStatusListener listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* @see Job#removeStatusListener(UUID)
* @since 5.3
*/
@FunctionalInterface
public interface JobStatusListener {
/**
* Invoked upon job status change. <ol>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,25 @@
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.JobStatusListener;
import com.hazelcast.jet.config.DeltaJobConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.impl.exception.CancellationByUserException;
import com.hazelcast.jet.impl.operation.AddJobStatusListenerOperation;
import com.hazelcast.jet.impl.operation.UpdateJobConfigOperation;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
import com.hazelcast.spi.exception.TargetDisconnectedException;
import com.hazelcast.spi.exception.TargetNotMemberException;
import com.hazelcast.spi.impl.eventservice.impl.Registration;
import com.hazelcast.spi.impl.eventservice.impl.operations.RegistrationOperation;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -43,6 +48,9 @@
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import static com.hazelcast.jet.core.JobStatus.COMPLETED;
import static com.hazelcast.jet.core.JobStatus.FAILED;
import static com.hazelcast.jet.core.JobStatus.RUNNING;
import static com.hazelcast.jet.impl.util.ExceptionUtil.peel;
import static com.hazelcast.jet.impl.util.ExceptionUtil.rethrow;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
Expand Down Expand Up @@ -190,10 +198,9 @@ public final JobStatus getStatus() {
if (isLightJob()) {
CompletableFuture<Void> f = getFuture();
if (!f.isDone()) {
return JobStatus.RUNNING;
return RUNNING;
}
return f.isCompletedExceptionally()
? JobStatus.FAILED : JobStatus.COMPLETED;
return f.isCompletedExceptionally() ? FAILED : COMPLETED;
} else {
return getStatus0();
}
Expand Down Expand Up @@ -294,6 +301,16 @@ private void terminate(TerminationMode mode) {
}
}

/**
 * Adds the given status listener to this job by delegating to
 * {@link #doAddStatusListener}.
 * <p>
 * A {@link JobNotFoundException} raised by the delegate is translated into the
 * {@link IllegalStateException} built by {@link #cannotAddStatusListener}. The
 * status reported in that exception is {@code FAILED} if this proxy's future
 * completed exceptionally and {@code COMPLETED} otherwise.
 *
 * @param listener the listener to register
 * @return the registration id returned by {@link #doAddStatusListener}
 * @throws IllegalStateException if the delegate reports that the job was not found
 */
@Override
public UUID addStatusListener(@Nonnull JobStatusListener listener) {
    UUID registrationId;
    try {
        registrationId = doAddStatusListener(listener);
    } catch (JobNotFoundException ignored) {
        // The original exception is intentionally dropped; only the terminal
        // status derived from the job future is reported to the caller.
        JobStatus terminalStatus = future.isCompletedExceptionally() ? FAILED : COMPLETED;
        throw cannotAddStatusListener(terminalStatus);
    }
    return registrationId;
}

@Override
public String toString() {
return "Job{id=" + getIdString()
Expand Down Expand Up @@ -329,12 +346,43 @@ public boolean isLightJob() {
protected abstract JobConfig doGetJobConfig();

/**
* Sends an {@link UpdateJobConfigOperation} to the master member. On the master member,
* if the job is SUSPENDED, the job record is updated both locally and {@linkplain
* JobRepository#JOB_RECORDS_MAP_NAME globally} (in order for {@link #getConfig()} to
* reflect the changes); otherwise, the operation fails.
* Applies the specified delta configuration to this job and returns the updated
* configuration. Synchronization with {@link #getConfig()} is handled by {@link
* #updateConfig}.
* @implNote
* Sends an {@link UpdateJobConfigOperation} to the master member. On the master
* member, if the job is SUSPENDED, the job record is updated both locally and
* {@linkplain JobRepository#JOB_RECORDS_MAP_NAME globally} (in order for {@link
* #getConfig()} to reflect the changes); otherwise, the operation fails.
*/
protected abstract JobConfig doUpdateJobConfig(DeltaJobConfig deltaConfig);
protected abstract JobConfig doUpdateJobConfig(@Nonnull DeltaJobConfig deltaConfig);

/**
* Associates the specified listener with this job.
* @throws JobNotFoundException if the job's master context is cleaned up after job
* completion/failure. This is translated to {@link IllegalStateException} by
* {@link #addStatusListener}.
* @implNote
* Listeners added to a job after it completes will not be removed automatically since
* the job has already produced a terminal event. In order to make auto-deregistration
* race-free, it is not allowed to add listeners to completed jobs. Checking the job
* status before the listener registration will not work since they are not atomic. The
* registration should be delegated to the job coordinator, but the {@code listener}
* is local. To overcome this, the following algorithm is used: <ol>
* <li> A {@link Registration} object is created with a unique registration id. The
* {@code listener} is cached locally by the registration id.
* <li> The {@link Registration} object is delivered to the job coordinator via an
* {@link AddJobStatusListenerOperation}. If the job is not completed/failed, the
* coordinator invokes a {@link RegistrationOperation} on the subscriber member
* —or all members if the registration is global. The registration operation is
* guaranteed to be executed earlier than a possible terminal event since the
* operation is executed as an event callback with the same {@code orderKey} as
* job events.
* <li> When the subscriber member receives the {@link RegistrationOperation}, the
* {@link Registration}'s {@code listener} is restored from the cache and the
* registration is completed. </ol>
*/
protected abstract UUID doAddStatusListener(@Nonnull JobStatusListener listener);

/**
* Return the ID of the coordinator - the master member for normal jobs and
Expand Down Expand Up @@ -397,6 +445,10 @@ protected void checkNotLightJob(String msg) {
}
}

/**
 * Builds (but does not throw) the exception used to reject a status listener
 * registration on a job that is in the given status.
 *
 * @param status the status to mention in the exception message
 * @return a new {@link IllegalStateException} describing the rejection
 */
public static IllegalStateException cannotAddStatusListener(JobStatus status) {
    String message = "Cannot add status listener to a " + status + " job";
    return new IllegalStateException(message);
}

private abstract class CallbackBase implements BiConsumer<Void, Throwable> {
private final NonCompletableFuture future;

Expand Down
18 changes: 11 additions & 7 deletions hazelcast/src/main/java/com/hazelcast/jet/impl/ClientJobProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ protected JobConfig doGetJobConfig() {
}

@Override
protected JobConfig doUpdateJobConfig(DeltaJobConfig deltaConfig) {
protected JobConfig doUpdateJobConfig(@Nonnull DeltaJobConfig deltaConfig) {
return callAndRetryIfTargetNotFound(() -> {
Data deltaConfigData = serializationService().toData(deltaConfig);
ClientMessage request = JetUpdateJobConfigCodec.encodeRequest(getId(), deltaConfigData);
Expand Down Expand Up @@ -256,12 +256,16 @@ protected boolean isRunning() {

@Nonnull
@Override
public UUID addStatusListener(@Nonnull JobStatusListener listener) {
protected UUID doAddStatusListener(@Nonnull JobStatusListener listener) {
requireNonNull(listener, "Listener cannot be null");
ClientJobStatusEventHandler handler = new ClientJobStatusEventHandler(listener);
handler.registrationId = container().getListenerService().registerListener(
createJobStatusListenerCodec(getId()), handler);
return handler.registrationId;
try {
ClientJobStatusEventHandler handler = new ClientJobStatusEventHandler(listener);
handler.registrationId = container().getListenerService()
.registerListener(createJobStatusListenerCodec(getId()), handler);
return handler.registrationId;
} catch (Throwable t) {
throw rethrow(t.getCause());
}
}

@Override
Expand All @@ -273,7 +277,7 @@ private ListenerMessageCodec createJobStatusListenerCodec(final long jobId) {
return new ListenerMessageCodec() {
@Override
public ClientMessage encodeAddRequest(boolean localOnly) {
return JetAddJobStatusListenerCodec.encodeRequest(jobId, localOnly);
return JetAddJobStatusListenerCodec.encodeRequest(jobId, lightJobCoordinator, localOnly);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.hazelcast.security.SecurityContext;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.eventservice.impl.Registration;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.version.Version;
Expand Down Expand Up @@ -113,6 +114,7 @@
import static com.hazelcast.jet.core.JobStatus.RUNNING;
import static com.hazelcast.jet.core.JobStatus.SUSPENDED;
import static com.hazelcast.jet.datamodel.Tuple2.tuple2;
import static com.hazelcast.jet.impl.AbstractJobProxy.cannotAddStatusListener;
import static com.hazelcast.jet.impl.JobClassLoaderService.JobPhase.COORDINATOR;
import static com.hazelcast.jet.impl.TerminationMode.CANCEL_FORCEFUL;
import static com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject.deserializeWithCustomClassLoader;
Expand Down Expand Up @@ -841,6 +843,31 @@ public CompletableFuture<JobConfig> updateJobConfig(long jobId, @Nonnull DeltaJo
);
}

/**
* Applies the specified listener registration if the job is not completed/failed.
* <p>
* For light jobs, the registration is handed to the job's {@link LightMasterContext}
* and the result is returned as an already-completed future. If no initialized
* light master context is found for {@code jobId}, a {@link JobNotFoundException}
* is thrown directly by this method, not through the returned future.
* <p>
* For other jobs, the call is dispatched through {@code callWithJob} with one
* handler per job state. The job-result handler throws the
* {@link IllegalStateException} built by {@code cannotAddStatusListener}.
* NOTE(review): this presumably surfaces through the returned future; confirm
* against {@code callWithJob}.
*
* @param jobId id of the job to attach the registration to
* @param isLightJob whether {@code jobId} denotes a light job
* @param registration the registration to apply
* @return a future yielding the registration id
* @throws JobNotFoundException synchronously, for a light job without an
*         initialized master context
*/
public CompletableFuture<UUID> addJobStatusListener(long jobId, boolean isLightJob, Registration registration) {
if (isLightJob) {
Object mc = lightMasterContexts.get(jobId);
// A missing entry and the "uninitialized" marker are both reported as "job not found".
if (mc == null || mc == UNINITIALIZED_LIGHT_JOB_MARKER) {
throw new JobNotFoundException(jobId);
} else {
// The registration is applied before the future is created, so the returned future is already completed.
return completedFuture(((LightMasterContext) mc).addStatusListener(registration));
}
}
return callWithJob(jobId,
// Handler receiving the job's master context: the context applies the registration.
masterContext -> masterContext.addStatusListener(registration),
// Handler receiving a job result: the registration is rejected with the result's status.
jobResult -> {
throw cannotAddStatusListener(jobResult.getJobStatus());
},
// Handler receiving only a job record: register directly through the JobEventService.
jobRecord -> {
JobEventService jobEventService = nodeEngine.getService(JobEventService.SERVICE_NAME);
return jobEventService.handleAllRegistrations(jobId, registration).getId();
},
null);
}

/**
* Add the given member to shutting down members. This will prevent
* submission of more executions until the member actually leaves the
Expand Down
30 changes: 19 additions & 11 deletions hazelcast/src/main/java/com/hazelcast/jet/impl/JobEventService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,33 @@

package com.hazelcast.jet.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.jet.JobStatusEvent;
import com.hazelcast.jet.JobStatusListener;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.eventservice.EventPublishingService;
import com.hazelcast.spi.impl.eventservice.EventRegistration;
import com.hazelcast.spi.impl.eventservice.EventService;
import com.hazelcast.spi.impl.eventservice.impl.EventServiceImpl;
import com.hazelcast.spi.impl.eventservice.impl.Registration;
import com.hazelcast.spi.impl.eventservice.impl.TrueEventFilter;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import static com.hazelcast.internal.util.ConcurrencyUtil.CALLER_RUNS;
import static com.hazelcast.jet.Util.idToString;

public class JobEventService implements EventPublishingService<JobStatusEvent, JobStatusListener> {
public static final String SERVICE_NAME = "hz:impl:jobEventService";

private final EventService eventService;
private final EventServiceImpl eventService;
private final Address address;

public JobEventService(NodeEngine nodeEngine) {
eventService = nodeEngine.getEventService();
eventService = (EventServiceImpl) nodeEngine.getEventService();
address = nodeEngine.getThisAddress();
}

@Override
Expand All @@ -54,17 +59,20 @@ public void publishEvent(long jobId, JobStatus oldStatus, JobStatus newStatus,
}
}

/**
 * Registers {@code listener} as a local listener of this service, using the
 * string form of {@code jobId} as the second argument of the event service call.
 *
 * @param jobId id of the job whose events the listener should receive
 * @param listener the listener to register
 * @return the id of the created registration
 */
public UUID addLocalEventListener(long jobId, JobStatusListener listener) {
    String jobIdString = idToString(jobId);
    EventRegistration registration = eventService.registerLocalListener(SERVICE_NAME, jobIdString, listener);
    return registration.getId();
}

/**
 * Registers {@code listener} with the event service for this service name,
 * using the string form of {@code jobId} as the second argument of the call.
 *
 * @param jobId id of the job whose events the listener should receive
 * @param listener the listener to register
 * @return the id of the created registration
 */
public UUID addEventListener(long jobId, JobStatusListener listener) {
    String jobIdString = idToString(jobId);
    EventRegistration registration = eventService.registerListener(SERVICE_NAME, jobIdString, listener);
    return registration.getId();
}

public CompletableFuture<UUID> addEventListenerAsync(long jobId, JobStatusListener listener) {
return eventService.registerListenerAsync(SERVICE_NAME, idToString(jobId), listener)
.thenApplyAsync(EventRegistration::getId, CALLER_RUNS);
/**
 * Creates a {@link Registration} for the given job and listener with a newly
 * generated registration id, passes it to the event service's
 * {@code cacheListener}, and returns it.
 * <p>
 * The registration is built with this service's name, the string form of
 * {@code jobId}, {@link TrueEventFilter#INSTANCE} and the {@code address}
 * held by this service.
 *
 * @param jobId id of the job the registration is for
 * @param listener the listener carried by the registration
 * @param localOnly forwarded unchanged to the {@link Registration} constructor
 * @return the prepared registration
 */
public Registration prepareRegistration(long jobId, JobStatusListener listener, boolean localOnly) {
    Registration prepared = new Registration(UuidUtil.newUnsecureUUID(), SERVICE_NAME, idToString(jobId),
            TrueEventFilter.INSTANCE, address, listener, localOnly);
    eventService.cacheListener(prepared);
    return prepared;
}

/**
 * Forwards {@code registration} to the event service's
 * {@code handleAllRegistrations}, together with {@code jobId} narrowed to an
 * {@code int}.
 *
 * @param jobId id of the job; only its low 32 bits are passed on
 * @param registration the registration to handle
 * @return the registration returned by the event service
 */
public EventRegistration handleAllRegistrations(long jobId, Registration registration) {
    // The event service takes an int here, so the long job id is deliberately narrowed.
    int narrowedJobId = (int) jobId;
    return eventService.handleAllRegistrations(registration, narrowedJobId);
}

public boolean removeEventListener(long jobId, UUID id) {
Expand Down

0 comments on commit 9d9d7aa

Please sign in to comment.