Skip to content

Commit

Permalink
remove warnings when temporal starts (#2317)
Browse files Browse the repository at this point in the history
* remove warnings when temporal starts

* decrease busy waiting loop from 5s -> 2s
  • Loading branch information
jrhizor authored Mar 5, 2021
1 parent 0442fe6 commit 4d95c5e
Showing 1 changed file with 44 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,24 @@

package io.airbyte.scheduler.temporal;

import static java.util.stream.Collectors.toSet;

import io.airbyte.scheduler.temporal.TemporalUtils.TemporalJobType;
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.temporal.api.namespace.v1.NamespaceInfo;
import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse;
import io.temporal.api.workflowservice.v1.ListNamespacesRequest;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import java.nio.file.Path;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TemporalPool implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(TemporalPool.class);

private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;

Expand All @@ -42,6 +52,8 @@ public TemporalPool(Path workspaceRoot, ProcessBuilderFactory pbf) {

@Override
public void run() {
waitForTemporalServerAndLog();

final WorkerFactory factory = WorkerFactory.newInstance(TemporalUtils.TEMPORAL_CLIENT);

final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name());
Expand All @@ -67,4 +79,36 @@ public void run() {
factory.start();
}

private static void waitForTemporalServerAndLog() {
LOGGER.info("Waiting for temporal server...");

while (!getNamespaces().contains("default")) {
LOGGER.warn("Waiting for default namespace to be initialized in temporal...");
wait(2);
}

// sometimes it takes a few additional seconds for workflow queue listening to be available
wait(5);

LOGGER.info("Found temporal default namespace!");
}

private static void wait(int seconds) {
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private static Set<String> getNamespaces() {
return TemporalUtils.TEMPORAL_SERVICE.blockingStub()
.listNamespaces(ListNamespacesRequest.newBuilder().build())
.getNamespacesList()
.stream()
.map(DescribeNamespaceResponse::getNamespaceInfo)
.map(NamespaceInfo::getName)
.collect(toSet());
}

}

0 comments on commit 4d95c5e

Please sign in to comment.