Skip to content

Commit

Permalink
Support registering preJob and postJob hooks (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
yifan-c committed Mar 24, 2020
1 parent b933540 commit 8090d55
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
Expand Up @@ -22,6 +22,8 @@
import java.io.Serializable;
import java.math.BigInteger;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -62,6 +64,19 @@ public static void main(String ... args) {
spark.stop();
}

// optional code block to run before a job starts
private Runnable preJobHook;
// optional code block to run after a job completes successfully; otherwise, it is not executed.
private Consumer<Map<String, RangeStats>> postJobHook;

public void addPreJobHook(Runnable preJobHook) {
this.preJobHook = preJobHook;
}

public void addPostJobHook(Consumer<Map<String, RangeStats>> postJobHook) {
this.postJobHook = postJobHook;
}

public void run(JobConfiguration configuration, JavaSparkContext sc) {
SparkConf conf = sc.getConf();
// get partitioner from both clusters and verify that they match
Expand Down Expand Up @@ -124,6 +139,9 @@ public void run(JobConfiguration configuration, JavaSparkContext sc) {
sourceProvider,
targetProvider);

if (null != preJobHook)
preJobHook.run();

// Run the distributed diff and collate results
Map<String, RangeStats> diffStats = sc.parallelize(splits, slices)
.map((split) -> new Differ(configuration,
Expand All @@ -140,6 +158,8 @@ public void run(JobConfiguration configuration, JavaSparkContext sc) {
// Publish results. This also removes the job from the currently running list
job.finalizeJob(params.jobId, diffStats);
logger.info("FINISHED: {}", diffStats);
if (null != postJobHook)
postJobHook.accept(diffStats);
} catch (Exception e) {
// If the job errors out, try and mark the job as not running, so it can be restarted.
// If the error was thrown from JobMetadataDb.finalizeJob *after* the job had already
Expand Down

0 comments on commit 8090d55

Please sign in to comment.