Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions hudi-spark/src/test/java/HoodieJavaStreamingApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -141,23 +140,19 @@ public void run() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);

// thread for spark structured streaming
Future<Void> streamFuture = executor.submit(new Callable<Void>() {
public Void call() throws Exception {
logger.info("===== Streaming Starting =====");
stream(streamingInput);
logger.info("===== Streaming Ends =====");
return null;
}
Future<Void> streamFuture = executor.submit(() -> {
logger.info("===== Streaming Starting =====");
stream(streamingInput);
logger.info("===== Streaming Ends =====");
return null;
});

// thread for adding data to the streaming source and showing results over time
Future<Void> showFuture = executor.submit(new Callable<Void>() {
public Void call() throws Exception {
logger.info("===== Showing Starting =====");
show(spark, fs, inputDF1, inputDF2);
logger.info("===== Showing Ends =====");
return null;
}
Future<Void> showFuture = executor.submit(() -> {
logger.info("===== Showing Starting =====");
show(spark, fs, inputDF1, inputDF2);
logger.info("===== Showing Ends =====");
return null;
});

// let the threads run
Expand Down Expand Up @@ -187,7 +182,7 @@ public void show(SparkSession spark, FileSystem fs, Dataset<Row> inputDF1, Datas
// wait for spark streaming to process one microbatch
Thread.sleep(3000);
String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
logger.info("Second commit at instant time :" + commitInstantTime1);
logger.info("Second commit at instant time :" + commitInstantTime2);

/**
* Read & do some queries
Expand Down