Skip to content

Commit

Permalink
chore: nullness annotations for job progress API (#17841)
Browse files Browse the repository at this point in the history
* chore: nullness annotations for job progress API

* chore: added missing nullness annotation

* fix: make annotations consistent in inheritence hierarchy
  • Loading branch information
jbee committed Jun 20, 2024
1 parent 0524427 commit 54bc995
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,14 @@
* form of a loop.
* </ol>
*
* For each of the three levels a new node is announced up front by calling the corresponding {@link
* #startingProcess(String)}, {@link #startingStage(String)} or {@link #startingWorkItem(String)}
* method.
* For each of the three levels a new node is announced up front by calling the corresponding {@code
* startingProcess}, {@code startingStage} or {@code startingWorkItem} method.
*
* <p>The process will now expect a corresponding completion, for example {@link
* #completedWorkItem(String)} in case of success or {@link #failedWorkItem(String)} in case of an
* error. The different {@link #runStage(Stream, Function, Consumer)} or {@link
* #runStageInParallel(int, Collection, Function, Consumer)} helpers can be used to do the error
* handling correctly and make sure the work items are completed in both success and failure
* scenarios.
* <p>The process will now expect a corresponding completion, for example {@code completedWorkItem}
* in case of success or {@code failedWorkItem} in case of an error. The different {@link
* #runStage(Stream, Function, Consumer)} or {@link #runStageInParallel(int, Collection, Function,
* Consumer)} helpers can be used to do the error handling correctly and make sure the work items
* are completed in both success and failure scenarios.
*
* <p>For stages that do not have work items {@link #runStage(Runnable)} and {@link
* #runStage(Object, Callable)} can be used to make sure completion is handled correctly.
Expand All @@ -115,7 +113,7 @@
* cooperatively by not starting any further work.
*
* <p>When a stage is cancelled the run-methods usually return false to give caller a chance to
* react if needed. The next call to {@link #startingStage(String)} will then throw a {@link
* react if needed. The next call to {@code startingStage} will then throw a {@link
* CancellationException} and thereby short-circuit the rest of the process.
*
* @author Jan Bernitt
Expand Down Expand Up @@ -206,17 +204,17 @@ default void addError(
* Tracking API:
*/

void startingProcess(String description, Object... args);
void startingProcess(@CheckForNull String description, Object... args);

default void startingProcess() {
startingProcess(null);
}

void completedProcess(String summary, Object... args);
void completedProcess(@CheckForNull String summary, Object... args);

void failedProcess(String error, Object... args);
void failedProcess(@CheckForNull String error, Object... args);

default void failedProcess(Exception cause) {
default void failedProcess(@Nonnull Exception cause) {
failedProcess("Process failed: {}", getMessage(cause));
}

Expand All @@ -237,44 +235,44 @@ default void endingProcess(boolean success) {
* @throws CancellationException in case cancellation has been requested before this stage had
* started
*/
void startingStage(String description, int workItems, FailurePolicy onFailure)
void startingStage(@Nonnull String description, int workItems, @Nonnull FailurePolicy onFailure)
throws CancellationException;

default void startingStage(String description, int workItems) {
default void startingStage(@Nonnull String description, int workItems) {
startingStage(description, workItems, FailurePolicy.PARENT);
}

default void startingStage(String description, FailurePolicy onFailure) {
default void startingStage(@Nonnull String description, @Nonnull FailurePolicy onFailure) {
startingStage(description, 0, onFailure);
}

default void startingStage(String description, Object... args) {
default void startingStage(@Nonnull String description, Object... args) {
startingStage(format(description, args), FailurePolicy.PARENT);
}

void completedStage(String summary, Object... args);
void completedStage(@CheckForNull String summary, Object... args);

void failedStage(String error, Object... args);
void failedStage(@Nonnull String error, Object... args);

default void failedStage(Exception cause) {
default void failedStage(@Nonnull Exception cause) {
failedStage(getMessage(cause));
}

default void startingWorkItem(String description, Object... args) {
default void startingWorkItem(@Nonnull String description, Object... args) {
startingWorkItem(format(description, args), FailurePolicy.PARENT);
}

void startingWorkItem(String description, FailurePolicy onFailure);
void startingWorkItem(@Nonnull String description, @Nonnull FailurePolicy onFailure);

default void startingWorkItem(int i) {
startingWorkItem("#" + (i + 1));
}

void completedWorkItem(String summary, Object... args);
void completedWorkItem(@CheckForNull String summary, Object... args);

void failedWorkItem(String error, Object... args);
void failedWorkItem(@Nonnull String error, Object... args);

default void failedWorkItem(Exception cause) {
default void failedWorkItem(@Nonnull Exception cause) {
failedWorkItem(getMessage(cause));
}

Expand All @@ -288,7 +286,7 @@ default void failedWorkItem(Exception cause) {
* @param items the work items to run in the sequence to run them
* @see #runStage(Collection, Function, Consumer)
*/
default void runStage(Collection<Runnable> items) {
default void runStage(@Nonnull Collection<Runnable> items) {
runStage(items, item -> null, Runnable::run);
}

Expand All @@ -299,7 +297,7 @@ default void runStage(Collection<Runnable> items) {
* description. Items are processed in map iteration order.
* @see #runStage(Collection, Function, Consumer)
*/
default void runStage(Map<String, Runnable> items) {
default void runStage(@Nonnull Map<String, Runnable> items) {
runStage(items.entrySet(), Entry::getKey, entry -> entry.getValue().run());
}

Expand All @@ -314,7 +312,9 @@ default void runStage(Map<String, Runnable> items) {
* @see #runStage(Collection, Function, Consumer)
*/
default <T> void runStage(
Collection<T> items, Function<T, String> description, Consumer<T> work) {
@Nonnull Collection<T> items,
@Nonnull Function<T, String> description,
@Nonnull Consumer<T> work) {
runStage(items.stream(), description, work);
}

Expand All @@ -324,7 +324,10 @@ default <T> void runStage(
*
* @see #runStage(Stream, Function, Consumer,BiFunction)
*/
default <T> void runStage(Stream<T> items, Function<T, String> description, Consumer<T> work) {
default <T> void runStage(
@Nonnull Stream<T> items,
@Nonnull Function<T, String> description,
@Nonnull Consumer<T> work) {
runStage(
items,
description,
Expand All @@ -346,10 +349,10 @@ default <T> void runStage(Stream<T> items, Function<T, String> description, Cons
* @param <T> type of work item input
*/
default <T> void runStage(
Stream<T> items,
Function<T, String> description,
Consumer<T> work,
BiFunction<Integer, Integer, String> summary) {
@Nonnull Stream<T> items,
@Nonnull Function<T, String> description,
@Nonnull Consumer<T> work,
@CheckForNull BiFunction<Integer, Integer, String> summary) {
runStage(
items,
description,
Expand Down Expand Up @@ -377,11 +380,11 @@ default <T> void runStage(
* @param <T> type of work item input
*/
default <T, R> void runStage(
Stream<T> items,
Function<T, String> description,
Function<R, String> result,
Function<T, R> work,
BiFunction<Integer, Integer, String> summary) {
@Nonnull Stream<T> items,
@Nonnull Function<T, String> description,
@CheckForNull Function<R, String> result,
@Nonnull Function<T, R> work,
@CheckForNull BiFunction<Integer, Integer, String> summary) {
int i = 0;
int failed = 0;
for (Iterator<T> it = items.iterator(); it.hasNext(); ) {
Expand Down Expand Up @@ -423,7 +426,7 @@ default <T, R> void runStage(
* @return true, if stage is/was skipped (complected as failed), false otherwise
*/
default boolean autoSkipStage(
BiFunction<Integer, Integer, String> summary, int success, int failed) {
@CheckForNull BiFunction<Integer, Integer, String> summary, int success, int failed) {
if (isSkipCurrentStage()) {
String text = summary == null ? "" : summary.apply(success, failed);
if (isCancelled()) {
Expand All @@ -439,7 +442,7 @@ default boolean autoSkipStage(
/**
* @see #runStage(Function, Runnable)
*/
default boolean runStage(Runnable work) {
default boolean runStage(@Nonnull Runnable work) {
return runStage(null, work);
}

Expand All @@ -454,7 +457,8 @@ default boolean runStage(Runnable work) {
* @param work work for the entire stage
* @return true, if completed successful, false if completed exceptionally
*/
default boolean runStage(Function<Boolean, String> summary, Runnable work) {
default boolean runStage(
@CheckForNull Function<Boolean, String> summary, @Nonnull Runnable work) {
return runStage(
false,
summary,
Expand All @@ -464,14 +468,15 @@ default boolean runStage(Function<Boolean, String> summary, Runnable work) {
});
}

default <T> T runStage(Callable<T> work) {
@CheckForNull
default <T> T runStage(@Nonnull Callable<T> work) {
return runStage(null, work);
}

/**
* @see #runStage(Object, Function, Callable)
*/
default <T> T runStage(T errorValue, Callable<T> work) {
default <T> T runStage(@CheckForNull T errorValue, @Nonnull Callable<T> work) {
return runStage(errorValue, null, work);
}

Expand All @@ -488,7 +493,10 @@ default <T> T runStage(T errorValue, Callable<T> work) {
* @return the value returned by work task when successful or the errorValue in case the task
* threw an {@link Exception}
*/
default <T> T runStage(T errorValue, Function<T, String> summary, Callable<T> work) {
default <T> T runStage(
@CheckForNull T errorValue,
@CheckForNull Function<T, String> summary,
@Nonnull Callable<T> work) {
try {
T res = work.call();
completedStage(summary == null ? null : summary.apply(res));
Expand Down Expand Up @@ -518,7 +526,10 @@ default <T> T runStage(T errorValue, Function<T, String> summary, Callable<T> wo
* @param <T> type of work item input
*/
default <T> void runStageInParallel(
int parallelism, Collection<T> items, Function<T, String> description, Consumer<T> work) {
int parallelism,
@Nonnull Collection<T> items,
@Nonnull Function<T, String> description,
@Nonnull Consumer<T> work) {
if (parallelism <= 1) {
runStage(items, description, work);
return;
Expand Down Expand Up @@ -580,7 +591,7 @@ default <T> void runStageInParallel(
* @param args the pattern arguments.
* @return a formatted message string.
*/
default String format(String pattern, Object... args) {
default String format(@CheckForNull String pattern, Object... args) {
return pattern != null ? MessageFormatter.arrayFormat(pattern, args).getMessage() : null;
}

Expand All @@ -602,8 +613,7 @@ enum Status {
*
* <p>The implementation of {@link FailurePolicy} is done by affecting {@link
* #isSkipCurrentStage()} and {@link #isCancelled()} accordingly after the failure occurred and
* has been tracked using one of the {@link #failedStage(String)} or {@link
* #failedWorkItem(String)} methods.
* has been tracked using one of the {@code failedStage} or {@code failedWorkItem} methods.
*/
enum FailurePolicy {
/**
Expand Down Expand Up @@ -880,7 +890,8 @@ public Item(
}
}

static String getMessage(Exception cause) {
@Nonnull
static String getMessage(@Nonnull Exception cause) {
String msg = cause.getMessage();
return msg == null || msg.isBlank() ? cause.getClass().getName() : msg;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
*/
package org.hisp.dhis.scheduling;

import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;

/**
* The {@link NoopJobProgress} can be used as a {@link JobProgress} instance when no actual flow
* control and tracking is wanted. For example in test context or in manual runs of operations that
Expand All @@ -52,12 +55,13 @@ public void completedProcess(String summary, Object... args) {
}

@Override
public void failedProcess(String error, Object... args) {
public void failedProcess(@CheckForNull String error, Object... args) {
// as the name said we do nothing
}

@Override
public void startingStage(String description, int workItems, FailurePolicy onFailure) {
public void startingStage(
@Nonnull String description, int workItems, @Nonnull FailurePolicy onFailure) {
// as the name said we do nothing
}

Expand All @@ -67,12 +71,12 @@ public void completedStage(String summary, Object... args) {
}

@Override
public void failedStage(String error, Object... args) {
public void failedStage(@Nonnull String error, Object... args) {
// as the name said we do nothing
}

@Override
public void startingWorkItem(String description, FailurePolicy onFailure) {
public void startingWorkItem(@Nonnull String description, @Nonnull FailurePolicy onFailure) {
// as the name said we do nothing
}

Expand All @@ -82,7 +86,7 @@ public void completedWorkItem(String summary, Object... args) {
}

@Override
public void failedWorkItem(String error, Object... args) {
public void failedWorkItem(@Nonnull String error, Object... args) {
// as the name said we do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import org.hisp.dhis.system.notification.NotificationDataType;
import org.hisp.dhis.system.notification.NotificationLevel;
Expand Down Expand Up @@ -79,12 +81,13 @@ public void completedProcess(String summary, Object... args) {
}

@Override
public void failedProcess(String error, Object... args) {
public void failedProcess(@CheckForNull String error, Object... args) {
notifier.notify(jobId, NotificationLevel.ERROR, format(error, args), true);
}

@Override
public void startingStage(String description, int workItems, FailurePolicy onFailure) {
public void startingStage(
@Nonnull String description, int workItems, @Nonnull FailurePolicy onFailure) {
stageItems = workItems;
stageItem = 0;
if (isNotEmpty(description)) {
Expand All @@ -100,14 +103,14 @@ public void completedStage(String summary, Object... args) {
}

@Override
public void failedStage(String error, Object... args) {
public void failedStage(@Nonnull String error, Object... args) {
if (isNotEmpty(error)) {
notifier.notify(jobId, NotificationLevel.ERROR, format(error, args), false);
}
}

@Override
public void startingWorkItem(String description, FailurePolicy onFailure) {
public void startingWorkItem(@Nonnull String description, @Nonnull FailurePolicy onFailure) {
if (isNotEmpty(description)) {
String nOf = "[" + (stageItems > 0 ? stageItem + "/" + stageItems : "" + stageItem) + "] ";
notifier.notify(jobId, NotificationLevel.LOOP, nOf + description, false);
Expand All @@ -124,7 +127,7 @@ public void completedWorkItem(String summary, Object... args) {
}

@Override
public void failedWorkItem(String error, Object... args) {
public void failedWorkItem(@Nonnull String error, Object... args) {
if (isNotEmpty(error)) {
notifier.notify(jobId, NotificationLevel.ERROR, format(error, args), false);
}
Expand Down

0 comments on commit 54bc995

Please sign in to comment.