Skip to content

Commit

Permalink
šŸ› Kubernetes: Fix server starting up before Temporal ready to operate (ā€¦
Browse files Browse the repository at this point in the history
ā€¦#4567)

* refactor waitForTemporalServerAndLog into TemporalUtils

* remove use of docker-compose-wait
  • Loading branch information
Phlair committed Jul 7, 2021
1 parent af57170 commit 2ca4f94
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 138 deletions.
7 changes: 1 addition & 6 deletions airbyte-scheduler/app/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@ RUN add-apt-repository \
stable"
RUN apt-get update && apt-get install -y docker-ce-cli jq

ENV WAIT_VERSION=2.7.2
ENV APPLICATION airbyte-scheduler

WORKDIR /app

# Install wait
ADD https://github.com/ufoscout/docker-compose-wait/releases/download/${WAIT_VERSION}/wait wait
RUN chmod +x wait

# Install kubectl
RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.17.14/bin/linux/amd64/kubectl
RUN chmod +x ./kubectl
Expand All @@ -36,4 +31,4 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1

# wait for upstream dependencies to become available before starting server
ENTRYPOINT ["/bin/bash", "-c", "./wait && bin/${APPLICATION}"]
ENTRYPOINT ["/bin/bash", "-c", "bin/${APPLICATION}"]
7 changes: 1 addition & 6 deletions airbyte-server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@ FROM openjdk:14.0.2-slim AS server

EXPOSE 8000

ENV WAIT_VERSION=2.7.2
ENV APPLICATION airbyte-server

WORKDIR /app

# Install wait
ADD https://github.com/ufoscout/docker-compose-wait/releases/download/${WAIT_VERSION}/wait wait
RUN chmod +x wait

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN mkdir latest_seeds
Expand All @@ -19,4 +14,4 @@ COPY build/config_init/resources/main/config latest_seeds
RUN tar xf ${APPLICATION}.tar --strip-components=1

# wait for upstream dependencies to become available before starting server
ENTRYPOINT ["/bin/bash", "-c", "./wait && bin/${APPLICATION}"]
ENTRYPOINT ["/bin/bash", "-c", "bin/${APPLICATION}"]
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ services:
restart: unless-stopped
environment:
- WEBAPP_URL=${WEBAPP_URL}
- WAIT_BEFORE_HOSTS=5
- WAIT_HOSTS_TIMEOUT=45
- WAIT_HOSTS=db:5432
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_URL=jdbc:postgresql://db:5432/${DATABASE_DB}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ services:
restart: unless-stopped
environment:
- WEBAPP_URL=${WEBAPP_URL}
- WAIT_BEFORE_HOSTS=5
- WAIT_HOSTS_TIMEOUT=45
- WAIT_HOSTS=db:5432
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_URL=jdbc:postgresql://db:5432/${DATABASE_DB}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,15 @@

package io.airbyte.workers.temporal;

import static java.util.stream.Collectors.toSet;

import io.airbyte.commons.lang.Exceptions;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.SyncWorkflow.DbtTransformationActivityImpl;
import io.airbyte.workers.temporal.SyncWorkflow.NormalizationActivityImpl;
import io.airbyte.workers.temporal.SyncWorkflow.ReplicationActivityImpl;
import io.temporal.api.namespace.v1.NamespaceInfo;
import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse;
import io.temporal.api.workflowservice.v1.ListNamespacesRequest;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import java.nio.file.Path;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -59,7 +52,6 @@ public TemporalPool(WorkflowServiceStubs temporalService, Path workspaceRoot, Pr

@Override
public void run() {
waitForTemporalServerAndLog();

final WorkerFactory factory = WorkerFactory.newInstance(WorkflowClient.newInstance(temporalService));

Expand All @@ -85,37 +77,4 @@ public void run() {
factory.start();
}

protected void waitForTemporalServerAndLog() {
LOGGER.info("Waiting for temporal server...");

boolean temporalStatus = false;

while (!temporalStatus) {
LOGGER.warn("Waiting for default namespace to be initialized in temporal...");
Exceptions.toRuntime(() -> Thread.sleep(2000));

try {
temporalStatus = getNamespaces(temporalService).contains("default");
} catch (Exception e) {
// Ignore the exception because this likely means that the Temporal service is still initializing.
LOGGER.warn("Ignoring exception while trying to request Temporal namespaces:", e);
}
}

// sometimes it takes a few additional seconds for workflow queue listening to be available
Exceptions.toRuntime(() -> Thread.sleep(5000));

LOGGER.info("Found temporal default namespace!");
}

protected static Set<String> getNamespaces(WorkflowServiceStubs temporalService) {
return temporalService.blockingStub()
.listNamespaces(ListNamespacesRequest.newBuilder().build())
.getNamespacesList()
.stream()
.map(DescribeNamespaceResponse::getNamespaceInfo)
.map(NamespaceInfo::getName)
.collect(toSet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@

package io.airbyte.workers.temporal;

import static java.util.stream.Collectors.toSet;

import io.airbyte.commons.lang.Exceptions;
import io.airbyte.scheduler.models.JobRunConfig;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.namespace.v1.NamespaceInfo;
import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse;
import io.temporal.api.workflowservice.v1.ListNamespacesRequest;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
Expand All @@ -34,19 +40,26 @@
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import io.temporal.workflow.Functions;
import java.io.Serializable;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TemporalUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(TemporalUtils.class);

public static WorkflowServiceStubs createTemporalService(String temporalHost) {
final WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder()
// todo move to env.
.setTarget(temporalHost)
.build();

return WorkflowServiceStubs.newInstance(options);
final WorkflowServiceStubs temporalService = WorkflowServiceStubs.newInstance(options);
waitForTemporalServerAndLog(temporalService);
return temporalService;
}

public static WorkflowClient createTemporalClient(String temporalHost) {
Expand Down Expand Up @@ -115,4 +128,37 @@ public static <STUB, A1, R> ImmutablePair<WorkflowExecution, CompletableFuture<R
return ImmutablePair.of(workflowExecution, resultAsync);
}

public static void waitForTemporalServerAndLog(WorkflowServiceStubs temporalService) {
LOGGER.info("Waiting for temporal server...");

boolean temporalStatus = false;

while (!temporalStatus) {
LOGGER.warn("Waiting for default namespace to be initialized in temporal...");
Exceptions.toRuntime(() -> Thread.sleep(2000));

try {
temporalStatus = getNamespaces(temporalService).contains("default");
} catch (Exception e) {
// Ignore the exception because this likely means that the Temporal service is still initializing.
LOGGER.warn("Ignoring exception while trying to request Temporal namespaces:", e);
}
}

// sometimes it takes a few additional seconds for workflow queue listening to be available
Exceptions.toRuntime(() -> Thread.sleep(5000));

LOGGER.info("Found temporal default namespace!");
}

protected static Set<String> getNamespaces(WorkflowServiceStubs temporalService) {
return temporalService.blockingStub()
.listNamespaces(ListNamespacesRequest.newBuilder().build())
.getNamespacesList()
.stream()
.map(DescribeNamespaceResponse::getNamespaceInfo)
.map(NamespaceInfo::getName)
.collect(toSet());
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,22 @@

package io.airbyte.workers.temporal;

import static io.airbyte.workers.temporal.TemporalUtils.waitForTemporalServerAndLog;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.airbyte.commons.concurrency.VoidCallable;
import io.temporal.activity.ActivityCancellationType;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.namespace.v1.NamespaceInfo;
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand All @@ -44,11 +49,13 @@
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -98,6 +105,20 @@ void testAsyncExecute() throws Exception {
assertEquals("completed", result);
}

@Test
public void testWaitForTemporalServerAndLogThrowsException() {
WorkflowServiceStubs workflowServiceStubs = mock(WorkflowServiceStubs.class, Mockito.RETURNS_DEEP_STUBS);
DescribeNamespaceResponse describeNamespaceResponse = mock(DescribeNamespaceResponse.class);
NamespaceInfo namespaceInfo = mock(NamespaceInfo.class);

when(namespaceInfo.getName()).thenReturn("default");
when(describeNamespaceResponse.getNamespaceInfo()).thenReturn(namespaceInfo);
when(workflowServiceStubs.blockingStub().listNamespaces(any()).getNamespacesList())
.thenThrow(RuntimeException.class)
.thenReturn(List.of(describeNamespaceResponse));
waitForTemporalServerAndLog(workflowServiceStubs);
}

@WorkflowInterface
public interface TestWorkflow {

Expand Down
6 changes: 0 additions & 6 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ services:
restart: unless-stopped
environment:
- WEBAPP_URL=${WEBAPP_URL}
- WAIT_BEFORE_HOSTS=5
- WAIT_HOSTS_TIMEOUT=45
- WAIT_HOSTS=${DATABASE_HOST}:${DATABASE_PORT}
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_URL=${DATABASE_URL}
Expand Down Expand Up @@ -78,9 +75,6 @@ services:
restart: unless-stopped
environment:
- WEBAPP_URL=${WEBAPP_URL}
- WAIT_BEFORE_HOSTS=5
- WAIT_HOSTS_TIMEOUT=45
- WAIT_HOSTS=${DATABASE_HOST}:${DATABASE_PORT}
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_URL=${DATABASE_URL}
Expand Down
7 changes: 0 additions & 7 deletions kube/resources/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,6 @@ spec:
configMapKeyRef:
name: airbyte-env
key: TRACKING_STRATEGY
- name: WAIT_BEFORE_HOSTS
value: "5"
- name: WAIT_HOSTS
#Translate value: $(DATABASE_HOST):$(DATABASE_PORT)
value: airbyte-db-svc:5432
- name: WAIT_HOSTS_TIMEOUT
value: "45"
- name: WORKSPACE_DOCKER_MOUNT
value: workspace
- name: WORKSPACE_ROOT
Expand Down
6 changes: 0 additions & 6 deletions kube/resources/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ spec:
configMapKeyRef:
name: airbyte-env
key: TRACKING_STRATEGY
- name: WAIT_BEFORE_HOSTS
value: "5"
- name: WAIT_HOSTS
value: airbyte-db-svc:5432
- name: WAIT_HOSTS_TIMEOUT
value: "45"
- name: WORKER_ENVIRONMENT
valueFrom:
configMapKeyRef:
Expand Down

0 comments on commit 2ca4f94

Please sign in to comment.