diff --git a/dhis-2/dhis-api/src/main/java/org/hisp/dhis/scheduling/JobProgress.java b/dhis-2/dhis-api/src/main/java/org/hisp/dhis/scheduling/JobProgress.java index a953046eb56d..dfdb6f694075 100644 --- a/dhis-2/dhis-api/src/main/java/org/hisp/dhis/scheduling/JobProgress.java +++ b/dhis-2/dhis-api/src/main/java/org/hisp/dhis/scheduling/JobProgress.java @@ -83,16 +83,14 @@ * form of a loop. * * - * For each of the three levels a new node is announced up front by calling the corresponding {@link - * #startingProcess(String)}, {@link #startingStage(String)} or {@link #startingWorkItem(String)} - * method. + * For each of the three levels a new node is announced up front by calling the corresponding {@code + * startingProcess}, {@code startingStage} or {@code startingWorkItem} method. * - *

The process will now expect a corresponding completion, for example {@link - * #completedWorkItem(String)} in case of success or {@link #failedWorkItem(String)} in case of an - * error. The different {@link #runStage(Stream, Function, Consumer)} or {@link - * #runStageInParallel(int, Collection, Function, Consumer)} helpers can be used to do the error - * handling correctly and make sure the work items are completed in both success and failure - * scenarios. + *

The process will now expect a corresponding completion, for example {@code completedWorkItem} + * in case of success or {@code failedWorkItem} in case of an error. The different {@link + * #runStage(Stream, Function, Consumer)} or {@link #runStageInParallel(int, Collection, Function, + * Consumer)} helpers can be used to do the error handling correctly and make sure the work items + * are completed in both success and failure scenarios. * *

For stages that do not have work items {@link #runStage(Runnable)} and {@link * #runStage(Object, Callable)} can be used to make sure completion is handled correctly. @@ -115,7 +113,7 @@ * cooperatively by not starting any further work. * *

When a stage is cancelled the run-methods usually return false to give caller a chance to - * react if needed. The next call to {@link #startingStage(String)} will then throw a {@link + * react if needed. The next call to {@code startingStage} will then throw a {@link * CancellationException} and thereby short-circuit the rest of the process. * * @author Jan Bernitt @@ -206,17 +204,17 @@ default void addError( * Tracking API: */ - void startingProcess(String description, Object... args); + void startingProcess(@CheckForNull String description, Object... args); default void startingProcess() { startingProcess(null); } - void completedProcess(String summary, Object... args); + void completedProcess(@CheckForNull String summary, Object... args); - void failedProcess(String error, Object... args); + void failedProcess(@CheckForNull String error, Object... args); - default void failedProcess(Exception cause) { + default void failedProcess(@Nonnull Exception cause) { failedProcess("Process failed: {}", getMessage(cause)); } @@ -237,44 +235,44 @@ default void endingProcess(boolean success) { * @throws CancellationException in case cancellation has been requested before this stage had * started */ - void startingStage(String description, int workItems, FailurePolicy onFailure) + void startingStage(@Nonnull String description, int workItems, @Nonnull FailurePolicy onFailure) throws CancellationException; - default void startingStage(String description, int workItems) { + default void startingStage(@Nonnull String description, int workItems) { startingStage(description, workItems, FailurePolicy.PARENT); } - default void startingStage(String description, FailurePolicy onFailure) { + default void startingStage(@Nonnull String description, @Nonnull FailurePolicy onFailure) { startingStage(description, 0, onFailure); } - default void startingStage(String description, Object... args) { + default void startingStage(@Nonnull String description, Object... args) { startingStage(format(description, args), FailurePolicy.PARENT); } - void completedStage(String summary, Object... args); + void completedStage(@CheckForNull String summary, Object... args); - void failedStage(String error, Object... args); + void failedStage(@Nonnull String error, Object... args); - default void failedStage(Exception cause) { + default void failedStage(@Nonnull Exception cause) { failedStage(getMessage(cause)); } - default void startingWorkItem(String description, Object... args) { + default void startingWorkItem(@Nonnull String description, Object... args) { startingWorkItem(format(description, args), FailurePolicy.PARENT); } - void startingWorkItem(String description, FailurePolicy onFailure); + void startingWorkItem(@Nonnull String description, @Nonnull FailurePolicy onFailure); default void startingWorkItem(int i) { startingWorkItem("#" + (i + 1)); } - void completedWorkItem(String summary, Object... args); + void completedWorkItem(@CheckForNull String summary, Object... args); - void failedWorkItem(String error, Object... args); + void failedWorkItem(@Nonnull String error, Object... args); - default void failedWorkItem(Exception cause) { + default void failedWorkItem(@Nonnull Exception cause) { failedWorkItem(getMessage(cause)); } @@ -288,7 +286,7 @@ default void failedWorkItem(Exception cause) { * @param items the work items to run in the sequence to run them * @see #runStage(Collection, Function, Consumer) */ - default void runStage(Collection items) { + default void runStage(@Nonnull Collection items) { runStage(items, item -> null, Runnable::run); } @@ -299,7 +297,7 @@ default void runStage(Collection items) { * description. Items are processed in map iteration order. * @see #runStage(Collection, Function, Consumer) */ - default void runStage(Map items) { + default void runStage(@Nonnull Map items) { runStage(items.entrySet(), Entry::getKey, entry -> entry.getValue().run()); } @@ -314,7 +312,9 @@ default void runStage(Map items) { * @see #runStage(Collection, Function, Consumer) */ default void runStage( - Collection items, Function description, Consumer work) { + @Nonnull Collection items, + @Nonnull Function description, + @Nonnull Consumer work) { runStage(items.stream(), description, work); } @@ -324,7 +324,10 @@ default void runStage( * * @see #runStage(Stream, Function, Consumer,BiFunction) */ - default void runStage(Stream items, Function description, Consumer work) { + default void runStage( + @Nonnull Stream items, + @Nonnull Function description, + @Nonnull Consumer work) { runStage( items, description, @@ -346,10 +349,10 @@ default void runStage(Stream items, Function description, Cons * @param type of work item input */ default void runStage( - Stream items, - Function description, - Consumer work, - BiFunction summary) { + @Nonnull Stream items, + @Nonnull Function description, + @Nonnull Consumer work, + @CheckForNull BiFunction summary) { runStage( items, description, @@ -377,11 +380,11 @@ default void runStage( * @param type of work item input */ default void runStage( - Stream items, - Function description, - Function result, - Function work, - BiFunction summary) { + @Nonnull Stream items, + @Nonnull Function description, + @CheckForNull Function result, + @Nonnull Function work, + @CheckForNull BiFunction summary) { int i = 0; int failed = 0; for (Iterator it = items.iterator(); it.hasNext(); ) { @@ -423,7 +426,7 @@ default void runStage( * @return true, if stage is/was skipped (complected as failed), false otherwise */ default boolean autoSkipStage( - BiFunction summary, int success, int failed) { + @CheckForNull BiFunction summary, int success, int failed) { if (isSkipCurrentStage()) { String text = summary == null ? "" : summary.apply(success, failed); if (isCancelled()) { @@ -439,7 +442,7 @@ default boolean autoSkipStage( /** * @see #runStage(Function, Runnable) */ - default boolean runStage(Runnable work) { + default boolean runStage(@Nonnull Runnable work) { return runStage(null, work); } @@ -454,7 +457,8 @@ default boolean runStage(Runnable work) { * @param work work for the entire stage * @return true, if completed successful, false if completed exceptionally */ - default boolean runStage(Function summary, Runnable work) { + default boolean runStage( + @CheckForNull Function summary, @Nonnull Runnable work) { return runStage( false, summary, @@ -464,14 +468,15 @@ default boolean runStage(Function summary, Runnable work) { }); } - default T runStage(Callable work) { + @CheckForNull + default T runStage(@Nonnull Callable work) { return runStage(null, work); } /** * @see #runStage(Object, Function, Callable) */ - default T runStage(T errorValue, Callable work) { + default T runStage(@CheckForNull T errorValue, @Nonnull Callable work) { return runStage(errorValue, null, work); } @@ -488,7 +493,10 @@ default T runStage(T errorValue, Callable work) { * @return the value returned by work task when successful or the errorValue in case the task * threw an {@link Exception} */ - default T runStage(T errorValue, Function summary, Callable work) { + default T runStage( + @CheckForNull T errorValue, + @CheckForNull Function summary, + @Nonnull Callable work) { try { T res = work.call(); completedStage(summary == null ? null : summary.apply(res)); @@ -518,7 +526,10 @@ default T runStage(T errorValue, Function summary, Callable wo * @param type of work item input */ default void runStageInParallel( - int parallelism, Collection items, Function description, Consumer work) { + int parallelism, + @Nonnull Collection items, + @Nonnull Function description, + @Nonnull Consumer work) { if (parallelism <= 1) { runStage(items, description, work); return; @@ -580,7 +591,7 @@ default void runStageInParallel( * @param args the pattern arguments. * @return a formatted message string. */ - default String format(String pattern, Object... args) { + default String format(@CheckForNull String pattern, Object... args) { return pattern != null ? MessageFormatter.arrayFormat(pattern, args).getMessage() : null; } @@ -602,8 +613,7 @@ enum Status { * *

The implementation of {@link FailurePolicy} is done by affecting {@link * #isSkipCurrentStage()} and {@link #isCancelled()} accordingly after the failure occurred and - * has been tracked using one of the {@link #failedStage(String)} or {@link - * #failedWorkItem(String)} methods. + * has been tracked using one of the {@code failedStage} or {@code failedWorkItem} methods. */ enum FailurePolicy { /** @@ -880,7 +890,8 @@ public Item( } } - static String getMessage(Exception cause) { + @Nonnull + static String getMessage(@Nonnull Exception cause) { String msg = cause.getMessage(); return msg == null || msg.isBlank() ? cause.getClass().getName() : msg; } diff --git a/dhis-2/dhis-api/src/main/java/org/hisp/dhis/scheduling/NoopJobProgress.java b/dhis-2/dhis-api/src/main/java/org/hisp/dhis/scheduling/NoopJobProgress.java index 5a04442993d1..f57f9a9dc979 100644 --- a/dhis-2/dhis-api/src/main/java/org/hisp/dhis/scheduling/NoopJobProgress.java +++ b/dhis-2/dhis-api/src/main/java/org/hisp/dhis/scheduling/NoopJobProgress.java @@ -27,6 +27,9 @@ */ package org.hisp.dhis.scheduling; +import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; + /** * The {@link NoopJobProgress} can be used as a {@link JobProgress} instance when no actual flow * control and tracking is wanted. For example in test context or in manual runs of operations that @@ -52,12 +55,13 @@ public void completedProcess(String summary, Object... args) { } @Override - public void failedProcess(String error, Object... args) { + public void failedProcess(@CheckForNull String error, Object... args) { // as the name said we do nothing } @Override - public void startingStage(String description, int workItems, FailurePolicy onFailure) { + public void startingStage( + @Nonnull String description, int workItems, @Nonnull FailurePolicy onFailure) { // as the name said we do nothing } @@ -67,12 +71,12 @@ public void completedStage(String summary, Object... args) { } @Override - public void failedStage(String error, Object... args) { + public void failedStage(@Nonnull String error, Object... args) { // as the name said we do nothing } @Override - public void startingWorkItem(String description, FailurePolicy onFailure) { + public void startingWorkItem(@Nonnull String description, @Nonnull FailurePolicy onFailure) { // as the name said we do nothing } @@ -82,7 +86,7 @@ public void completedWorkItem(String summary, Object... args) { } @Override - public void failedWorkItem(String error, Object... args) { + public void failedWorkItem(@Nonnull String error, Object... args) { // as the name said we do nothing } } diff --git a/dhis-2/dhis-services/dhis-service-core/src/main/java/org/hisp/dhis/scheduling/NotifierJobProgress.java b/dhis-2/dhis-services/dhis-service-core/src/main/java/org/hisp/dhis/scheduling/NotifierJobProgress.java index d31f5e720c0a..8b4aa3bfe85e 100644 --- a/dhis-2/dhis-services/dhis-service-core/src/main/java/org/hisp/dhis/scheduling/NotifierJobProgress.java +++ b/dhis-2/dhis-services/dhis-service-core/src/main/java/org/hisp/dhis/scheduling/NotifierJobProgress.java @@ -32,6 +32,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; import lombok.RequiredArgsConstructor; import org.hisp.dhis.system.notification.NotificationDataType; import org.hisp.dhis.system.notification.NotificationLevel; @@ -79,12 +81,13 @@ public void completedProcess(String summary, Object... args) { } @Override - public void failedProcess(String error, Object... args) { + public void failedProcess(@CheckForNull String error, Object... args) { notifier.notify(jobId, NotificationLevel.ERROR, format(error, args), true); } @Override - public void startingStage(String description, int workItems, FailurePolicy onFailure) { + public void startingStage( + @Nonnull String description, int workItems, @Nonnull FailurePolicy onFailure) { stageItems = workItems; stageItem = 0; if (isNotEmpty(description)) { @@ -100,14 +103,14 @@ public void completedStage(String summary, Object... args) { } @Override - public void failedStage(String error, Object... args) { + public void failedStage(@Nonnull String error, Object... args) { if (isNotEmpty(error)) { notifier.notify(jobId, NotificationLevel.ERROR, format(error, args), false); } } @Override - public void startingWorkItem(String description, FailurePolicy onFailure) { + public void startingWorkItem(@Nonnull String description, @Nonnull FailurePolicy onFailure) { if (isNotEmpty(description)) { String nOf = "[" + (stageItems > 0 ? stageItem + "/" + stageItems : "" + stageItem) + "] "; notifier.notify(jobId, NotificationLevel.LOOP, nOf + description, false); @@ -124,7 +127,7 @@ public void completedWorkItem(String summary, Object... args) { } @Override - public void failedWorkItem(String error, Object... args) { + public void failedWorkItem(@Nonnull String error, Object... args) { if (isNotEmpty(error)) { notifier.notify(jobId, NotificationLevel.ERROR, format(error, args), false); } diff --git a/dhis-2/dhis-services/dhis-service-core/src/main/java/org/hisp/dhis/scheduling/RecordingJobProgress.java b/dhis-2/dhis-services/dhis-service-core/src/main/java/org/hisp/dhis/scheduling/RecordingJobProgress.java index 2dba2ef60380..a4b501dad374 100644 --- a/dhis-2/dhis-services/dhis-service-core/src/main/java/org/hisp/dhis/scheduling/RecordingJobProgress.java +++ b/dhis-2/dhis-services/dhis-service-core/src/main/java/org/hisp/dhis/scheduling/RecordingJobProgress.java @@ -231,7 +231,7 @@ public void completedProcess(String summary, Object... args) { } @Override - public void failedProcess(String error, Object... args) { + public void failedProcess(@CheckForNull String error, Object... args) { observer.run(); String message = format(error, args); @@ -250,7 +250,7 @@ public void failedProcess(String error, Object... args) { } @Override - public void failedProcess(Exception cause) { + public void failedProcess(@Nonnull Exception cause) { observer.run(); tracker.failedProcess(cause); @@ -271,7 +271,8 @@ public void failedProcess(Exception cause) { } @Override - public void startingStage(String description, int workItems, FailurePolicy onFailure) { + public void startingStage( + @Nonnull String description, int workItems, @Nonnull FailurePolicy onFailure) { observer.run(); if (isCancelled()) { @@ -296,7 +297,7 @@ public void completedStage(String summary, Object... args) { } @Override - public void failedStage(String error, Object... args) { + public void failedStage(@Nonnull String error, Object... args) { observer.run(); String message = format(error, args); @@ -310,7 +311,7 @@ public void failedStage(String error, Object... args) { } @Override - public void failedStage(Exception cause) { + public void failedStage(@Nonnull Exception cause) { observer.run(); cause = cancellationAsAbort(cause); @@ -326,7 +327,7 @@ public void failedStage(Exception cause) { } @Override - public void startingWorkItem(String description, FailurePolicy onFailure) { + public void startingWorkItem(@Nonnull String description, @Nonnull FailurePolicy onFailure) { observer.run(); tracker.startingWorkItem(description, onFailure); @@ -346,7 +347,7 @@ public void completedWorkItem(String summary, Object... args) { } @Override - public void failedWorkItem(String error, Object... args) { + public void failedWorkItem(@Nonnull String error, Object... args) { observer.run(); String message = format(error, args); @@ -360,7 +361,7 @@ public void failedWorkItem(String error, Object... args) { } @Override - public void failedWorkItem(Exception cause) { + public void failedWorkItem(@Nonnull Exception cause) { observer.run(); tracker.failedWorkItem(cause);