diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index fb04c6d3fe596..492cc980456da 100644 --- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -19,7 +19,6 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -141,23 +140,19 @@ public void run() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(2); // thread for spark strucutured streaming - Future streamFuture = executor.submit(new Callable() { - public Void call() throws Exception { - logger.info("===== Streaming Starting ====="); - stream(streamingInput); - logger.info("===== Streaming Ends ====="); - return null; - } + Future streamFuture = executor.submit(() -> { + logger.info("===== Streaming Starting ====="); + stream(streamingInput); + logger.info("===== Streaming Ends ====="); + return null; }); // thread for adding data to the streaming source and showing results over time - Future showFuture = executor.submit(new Callable() { - public Void call() throws Exception { - logger.info("===== Showing Starting ====="); - show(spark, fs, inputDF1, inputDF2); - logger.info("===== Showing Ends ====="); - return null; - } + Future showFuture = executor.submit(() -> { + logger.info("===== Showing Starting ====="); + show(spark, fs, inputDF1, inputDF2); + logger.info("===== Showing Ends ====="); + return null; }); // let the threads run @@ -187,7 +182,7 @@ public void show(SparkSession spark, FileSystem fs, Dataset inputDF1, Datas // wait for spark streaming to process one microbatch Thread.sleep(3000); String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); - logger.info("Second commit at instant time :" + commitInstantTime1); + logger.info("Second commit at instant time :" + commitInstantTime2); /** * Read & do some queries