diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java index aa8cc079ecbd..a70d25d81abe 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java @@ -42,6 +42,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver; @@ -263,8 +264,8 @@ public void testSQLLimit() throws Exception { return result.build(); }); - // wait one minute to allow subscription creation. - Thread.sleep(60 * 1000); + eventsTopic.checkIfAnySubscriptionExists( + pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(1)); eventsTopic.publish(messages); assertThat(queryResult.get(2, TimeUnit.MINUTES).size(), equalTo(3)); pool.shutdown();