From c608bd9046f484b321758e5deed6363785e2aad0 Mon Sep 17 00:00:00 2001 From: fbalicchia Date: Sun, 24 Nov 2019 22:12:06 +0100 Subject: [PATCH 1/2] print the right commit instanttime2 --- .../src/test/java/HoodieJavaStreamingApp.java | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index fb04c6d3fe596..6ce93b63b83be 100644 --- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -141,23 +141,19 @@ public void run() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(2); // thread for spark strucutured streaming - Future streamFuture = executor.submit(new Callable() { - public Void call() throws Exception { - logger.info("===== Streaming Starting ====="); - stream(streamingInput); - logger.info("===== Streaming Ends ====="); - return null; - } + Future streamFuture = executor.submit(() -> { + logger.info("===== Streaming Starting ====="); + stream(streamingInput); + logger.info("===== Streaming Ends ====="); + return null; }); // thread for adding data to the streaming source and showing results over time - Future showFuture = executor.submit(new Callable() { - public Void call() throws Exception { - logger.info("===== Showing Starting ====="); - show(spark, fs, inputDF1, inputDF2); - logger.info("===== Showing Ends ====="); - return null; - } + Future showFuture = executor.submit(() -> { + logger.info("===== Showing Starting ====="); + show(spark, fs, inputDF1, inputDF2); + logger.info("===== Showing Ends ====="); + return null; }); // let the threads run @@ -187,7 +183,7 @@ public void show(SparkSession spark, FileSystem fs, Dataset inputDF1, Datas // wait for spark streaming to process one microbatch Thread.sleep(3000); String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); - logger.info("Second commit at instant time :" + commitInstantTime1); + logger.info("Second commit at instant time :" + commitInstantTime2); /** * Read & do some queries From 65b45bac155f23dc60a591ded04f466cb34c78d7 Mon Sep 17 00:00:00 2001 From: fbalicchia Date: Mon, 25 Nov 2019 09:22:51 +0100 Subject: [PATCH 2/2] remove unused Callable import --- hudi-spark/src/test/java/HoodieJavaStreamingApp.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index 6ce93b63b83be..492cc980456da 100644 --- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -19,7 +19,6 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;