Skip to content

Commit

Permalink
[FLINK-28177][Connector/Elasticsearch][Tests] Use Testcontainers `wai…
Browse files Browse the repository at this point in the history
…tingFor ` method to check if the Elasticsearch containers are up. This closes #48

* [FLINK-28177][Connector/Elasticsearch][Tests] Use Testcontainers `waitingFor ` method to check if the Elasticsearch containers are up. This closes #48
  • Loading branch information
dingweiqings committed Dec 21, 2022
1 parent ce221bb commit 79e7c96
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,20 @@
import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
import static org.apache.flink.table.api.Expressions.row;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testcontainers.containers.wait.strategy.Wait.forHttp;

/** IT tests for {@link Elasticsearch6DynamicSink}. */
public class Elasticsearch6DynamicSinkITCase extends TestLogger {

@ClassRule
public static ElasticsearchContainer elasticsearchContainer =
new ElasticsearchContainer(DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_6));
new ElasticsearchContainer(DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_6))
.waitingFor(
forHttp("/")
.withMethod("HEAD")
.forStatusCode(200)
.forPort(9200)
.withStartupTimeout(Duration.ofMinutes(2)));

@SuppressWarnings("deprecation")
protected final RestHighLevelClient getClient() {
Expand Down Expand Up @@ -230,11 +237,8 @@ public void testWritingDocumentsFromTableApi() throws Exception {

@Test
public void testWritingDocumentsNoPrimaryKey() throws Exception {
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
settings.getConfiguration().setString("restart-strategy", "fixed-delay");
settings.getConfiguration().setInteger("restart-strategy.fixed-delay.attempts", 3);
// default fixed delay is 1 seconds
TableEnvironment tableEnvironment = TableEnvironment.create(settings);
TableEnvironment tableEnvironment =
TableEnvironment.create(EnvironmentSettings.inStreamingMode());

String index = "no-primary-key";
String myType = "MyType";
Expand Down

0 comments on commit 79e7c96

Please sign in to comment.