11package marketplace ;
22
33import io .confluent .flink .plugin .ConfluentSettings ;
4+ import org .apache .flink .core .execution .JobClient ;
45import org .apache .flink .table .api .TableEnvironment ;
6+ import org .apache .flink .table .api .TableResult ;
57import org .apache .flink .util .CloseableIterator ;
8+ import org .junit .jupiter .api .AfterEach ;
69import org .junit .jupiter .api .BeforeEach ;
710
11+ import java .util .ArrayList ;
12+ import java .util .List ;
813import java .util .stream .Stream ;
914import java .util .stream .StreamSupport ;
1015
1116public abstract class FlinkTableAPITest {
1217 protected static TableEnvironment env ;
13- public abstract void setup ();
18+ private List <TableResult > cleanupList ;
19+ public void setup () {};
20+ public void teardown () {};
1421
1522 @ BeforeEach
1623 public void mainSetup () {
24+ cleanupList = new ArrayList <>();
1725 if (env == null ) {
1826 ConfluentSettings .Builder settings = ConfluentSettings .newBuilder ("/cloud.properties" );
1927 env = TableEnvironment .create (settings .build ());
@@ -22,8 +30,23 @@ public void mainSetup() {
2230 setup ();
2331 }
2432
33+ @ AfterEach
34+ public void mainTeardown () {
35+ cleanupList .forEach (tableResult -> {
36+ JobClient client = tableResult .getJobClient ().orElseThrow ();
37+ client .cancel ().thenRun (() -> System .out .println ("Job Canceled During Cleanup" ));
38+ });
39+
40+ teardown ();
41+ }
42+
2543 protected <T > Stream <T > toStream (CloseableIterator <T > iterator ) {
2644 Iterable <T > iterable = () -> iterator ;
2745 return StreamSupport .stream (iterable .spliterator (), false );
2846 }
47+
48+ protected TableResult cleanupOnExit (TableResult tableResult ) {
49+ cleanupList .add (tableResult );
50+ return tableResult ;
51+ }
2952}
0 commit comments