Skip to content

Commit

Permalink
🔪 : Get rid of Kube Workspace Volume. (#5663)
Browse files Browse the repository at this point in the history
Get rid of the workspace volume on Kube. This used to contain logs and configs. Since we've moved things into the config db, this now only contains log. However on Kube, we log to external storage, which means we can get rid of this.

- Set env specific log4j2 context map keys and modify our log4j configuration to publish to cloud/local depending on those keys.
- In the process, I discovered a bug with how we were creating the Minio client - that meant Kube deployments with Minio were almost certainly all using the local workspace volume for logs instead of Minio. Fixed this.
  • Loading branch information
davinchia committed Sep 2, 2021
1 parent 9c05308 commit 8353eea
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 42 deletions.
1 change: 0 additions & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ jobs:
# - name: Get Docker Space
# run: docker run --rm busybox df -h

# make sure these always run before pushing platform docker images
- name: Run End-to-End Acceptance Tests
run: ./tools/bin/acceptance_test.sh

Expand Down
20 changes: 10 additions & 10 deletions airbyte-commons/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,19 @@
2) Routes don't support routing log output to more than Route
-->
<Routing name="LogSplitCloud">
<Routes pattern="$${ctx:job_log_path}">
<Routes pattern="$${ctx:cloud_job_log_path}">
<!-- Don't split logs if job_root isn't defined -->
<Route key="$${ctx:job_log_path}">
<Route key="$${ctx:cloud_job_log_path}">
<Null name="/dev/null"/>
</Route>
<Route>
<Log4j2Appender name="${ctx:job_log_path}-cloud"
<Log4j2Appender name="${ctx:cloud_job_log_path}"
verbose="true"
stagingBufferAge="1"
s3Bucket="${s3-bucket}" s3Path="job-logging${ctx:job_log_path}" s3Region="${s3-region}"
s3Bucket="${s3-bucket}" s3Path="job-logging${ctx:cloud_job_log_path}" s3Region="${s3-region}"
s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}"
s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}"
gcpStorageBucket="${gcp-storage-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:job_log_path}">
gcpStorageBucket="${gcp-storage-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:cloud_job_log_path}">
<PatternLayout pattern="${cloud-worker-file-pattern}"/>
</Log4j2Appender>
</Route>
Expand Down Expand Up @@ -90,18 +90,18 @@
<IdlePurgePolicy timeToLive="15" timeUnit="minutes"/>
</Routing>
<Routing name="AppLogSplitCloud">
<Routes pattern="$${ctx:workspace_app_root}">
<Routes pattern="$${ctx:cloud_workspace_app_root}">
<!-- Don't split logs if workspace_app_log_root isn't defined -->
<Route key="$${ctx:workspace_app_root}">
<Route key="$${ctx:cloud_workspace_app_root}">
<Null name="/dev/null"/>
</Route>
<Route>
<Log4j2Appender name="app-logging/${ctx:workspace_app_root}-cloud/"
<Log4j2Appender name="app-logging/${ctx:cloud_workspace_app_root}/"
stagingBufferAge="1"
s3Bucket="${s3-bucket}" s3Path="app-logging${ctx:workspace_app_root}" s3Region="${s3-region}"
s3Bucket="${s3-bucket}" s3Path="app-logging${ctx:cloud_workspace_app_root}" s3Region="${s3-region}"
s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}"
s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}"
gcpStorageBucket="${gcp-storage-bucket}" gcpStorageBlobNamePrefix="app-logging${ctx:workspace_app_root}">
gcpStorageBucket="${gcp-storage-bucket}" gcpStorageBlobNamePrefix="app-logging${ctx:cloud_workspace_app_root}">
<PatternLayout pattern="${cloud-worker-file-pattern}"/>
</Log4j2Appender>
</Route>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,16 @@ public interface CloudLogs {
* @return true if no cloud logging configuration is set;
*/
static boolean hasEmptyConfigs(LogConfigs configs) {
return !hasS3Configuration(configs) && !hasGcpConfiguration(configs);
return !hasMinioConfiguration(configs) && !hasS3Configuration(configs) && !hasGcpConfiguration(configs);
}

static CloudLogs createCloudLogClient(LogConfigs configs) {
// check if the configs exists, and pick a client.
if (hasMinioConfiguration(configs)) {
LOGGER.info("Creating Minio Log Client");
return new S3Logs();
}

if (hasS3Configuration(configs)) {
LOGGER.info("Creating AWS Log Client");
return new S3Logs();
Expand All @@ -77,6 +82,11 @@ static CloudLogs createCloudLogClient(LogConfigs configs) {
throw new RuntimeException("Error no cloud credentials configured..");
}

private static boolean hasMinioConfiguration(LogConfigs configs) {
return !configs.getS3LogBucket().isBlank() && !configs.getAwsAccessKey().isBlank()
&& !configs.getAwsSecretAccessKey().isBlank() && !configs.getS3MinioEndpoint().isBlank();
}

private static boolean hasS3Configuration(LogConfigs configs) {
return !configs.getAwsAccessKey().isBlank() &&
!configs.getAwsSecretAccessKey().isBlank() &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.EnvConfigs;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
Expand All @@ -54,7 +55,10 @@ public class LogClientSingleton {

// Any changes to the following values must also be propagated to the log4j2.xml in main/resources.
public static String WORKSPACE_MDC_KEY = "workspace_app_root";
public static String CLOUD_WORKSPACE_MDC_KEY = "cloud_workspace_app_root";

public static String JOB_LOG_PATH_MDC_KEY = "job_log_path";
public static String CLOUD_JOB_LOG_PATH_MDC_KEY = "cloud_job_log_path";

// S3/Minio
public static String S3_LOG_BUCKET = "S3_LOG_BUCKET";
Expand Down Expand Up @@ -138,7 +142,23 @@ public static void deleteLogs(Configs configs, String logPath) {
}

public static void setJobMdc(Path path) {
MDC.put(LogClientSingleton.JOB_LOG_PATH_MDC_KEY, path.resolve(LogClientSingleton.LOG_FILENAME).toString());
if (shouldUseLocalLogs(new EnvConfigs())) {
LOGGER.info("Setting docker job mdc");
MDC.put(LogClientSingleton.JOB_LOG_PATH_MDC_KEY, path.resolve(LogClientSingleton.LOG_FILENAME).toString());
} else {
LOGGER.info("Setting kube job mdc");
MDC.put(LogClientSingleton.CLOUD_JOB_LOG_PATH_MDC_KEY, path.resolve(LogClientSingleton.LOG_FILENAME).toString());
}
}

public static void setWorkspaceMdc(Path path) {
if (shouldUseLocalLogs(new EnvConfigs())) {
LOGGER.info("Setting docker workspace mdc");
MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, path.toString());
} else {
LOGGER.info("Setting kube workspace mdc");
MDC.put(LogClientSingleton.CLOUD_WORKSPACE_MDC_KEY, path.toString());
}
}

private static boolean shouldUseLocalLogs(Configs configs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,16 @@ public class S3Logs implements CloudLogs {
private static void assertValidS3Configuration(LogConfigs configs) {
Preconditions.checkNotNull(configs.getAwsAccessKey());
Preconditions.checkNotNull(configs.getAwsSecretAccessKey());
Preconditions.checkNotNull(configs.getS3LogBucketRegion());
Preconditions.checkNotNull(configs.getS3LogBucket());

// When region is set, endpoint cannot be set and vice versa.
if (configs.getS3LogBucketRegion().isBlank()) {
Preconditions.checkNotNull(configs.getS3MinioEndpoint(), "Either S3 region or endpoint needs to be configured.");
}

if (configs.getS3MinioEndpoint().isBlank()) {
Preconditions.checkNotNull(configs.getS3LogBucketRegion(), "Either S3 region or endpoint needs to be configured.");
}
}

@Override
Expand Down Expand Up @@ -151,14 +159,22 @@ public void deleteLogs(LogConfigs configs, String logPath) {
private static void createS3ClientIfNotExist(LogConfigs configs) {
if (S3 == null) {
assertValidS3Configuration(configs);

var builder = S3Client.builder();

// Pure S3 Client
var s3Region = configs.getS3LogBucketRegion();
var builder = S3Client.builder().region(Region.of(s3Region));
if (!s3Region.isBlank()) {
builder.region(Region.of(s3Region));
}

// The Minio S3 client.
var minioEndpoint = configs.getS3MinioEndpoint();
if (!minioEndpoint.isBlank()) {
try {
var minioUri = new URI(minioEndpoint);
builder.endpointOverride(minioUri);
builder.region(Region.US_EAST_1); // Although this is not used, the S3 client will error out if this is not set. Set a stub value.
} catch (URISyntaxException e) {
throw new RuntimeException("Error creating S3 log client to Minio", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,22 @@

public class CloudLogsTest {

@Test
public void createCloudLogClientTestMinio() {
var configs = Mockito.mock(LogConfigs.class);
Mockito.when(configs.getS3MinioEndpoint()).thenReturn("minio-endpoint");
Mockito.when(configs.getAwsAccessKey()).thenReturn("access-key");
Mockito.when(configs.getAwsSecretAccessKey()).thenReturn("access-key-secret");
Mockito.when(configs.getS3LogBucket()).thenReturn("test-bucket");
Mockito.when(configs.getS3LogBucketRegion()).thenReturn("");

assertEquals(S3Logs.class, CloudLogs.createCloudLogClient(configs).getClass());
}

@Test
public void createCloudLogClientTestAws() {
var configs = Mockito.mock(LogConfigs.class);
Mockito.when(configs.getS3MinioEndpoint()).thenReturn("");
Mockito.when(configs.getAwsAccessKey()).thenReturn("access-key");
Mockito.when(configs.getAwsSecretAccessKey()).thenReturn("access-key-secret");
Mockito.when(configs.getS3LogBucket()).thenReturn("test-bucket");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public static void main(String[] args) throws IOException, InterruptedException

final Configs configs = new EnvConfigs();

MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, LogClientSingleton.getSchedulerLogsRoot(configs).toString());
LogClientSingleton.setWorkspaceMdc(LogClientSingleton.getSchedulerLogsRoot(configs));

final Path workspaceRoot = configs.getWorkspaceRoot();
LOGGER.info("workspaceRoot = " + workspaceRoot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ private static void createWorkspaceIfNoneExists(final ConfigRepository configRep
public static ServerRunnable getServer(ServerFactory apiFactory) throws Exception {
final Configs configs = new EnvConfigs();

MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, LogClientSingleton.getServerLogsRoot(configs).toString());
LogClientSingleton.setWorkspaceMdc(LogClientSingleton.getServerLogsRoot(configs));

LOGGER.info("Creating config repository...");
final Database configDatabase = new ConfigsDatabaseInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,8 +754,6 @@ public void testCheckpointing() throws Exception {

@Test
@Order(14)
@DisabledIfEnvironmentVariable(named = "KUBE",
matches = "true")
public void testRedactionOfSensitiveRequestBodies() throws Exception {
// check that the source password is not present in the logs
final List<String> serverLogLines = java.nio.file.Files.readAllLines(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

package io.airbyte.workers;

import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.NormalizationInput;
import io.airbyte.workers.normalization.NormalizationRunner;
import java.nio.file.Files;
Expand Down Expand Up @@ -60,7 +62,13 @@ public Void run(NormalizationInput input, Path jobRoot) throws WorkerException {
try (normalizationRunner) {
LOGGER.info("Running normalization.");
normalizationRunner.start();
final Path normalizationRoot = Files.createDirectories(jobRoot.resolve("normalize"));

Path normalizationRoot = null;
// There are no shared volumes on Kube; only create this for Docker.
if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.DOCKER)) {
normalizationRoot = Files.createDirectories(jobRoot.resolve("normalize"));
}

if (!normalizationRunner.normalize(jobId, attempt, normalizationRoot, input.getDestinationConfiguration(), input.getCatalog(),
input.getResourceRequirements())) {
throw new WorkerException("Normalization Failed.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.commons.io.IOs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.scheduler.models.JobRunConfig;
Expand Down Expand Up @@ -108,11 +109,16 @@ public OUTPUT get() {
mdcSetter.accept(jobRoot);

LOGGER.info("Executing worker wrapper. Airbyte version: {}", new EnvConfigs().getAirbyteVersionOrWarning());
jobRootDirCreator.accept(jobRoot);

final String workflowId = workflowIdProvider.get();
final Path workflowIdFile = jobRoot.getParent().resolve(WORKFLOW_ID_FILENAME);
IOs.writeFile(workflowIdFile, workflowId);
// There are no shared volumes on Kube; only do this for Docker.
if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.DOCKER)) {
LOGGER.debug("Creating local workspace directory..");
jobRootDirCreator.accept(jobRoot);

final String workflowId = workflowIdProvider.get();
final Path workflowIdFile = jobRoot.getParent().resolve(WORKFLOW_ID_FILENAME);
IOs.writeFile(workflowIdFile, workflowId);
}

final Worker<INPUT, OUTPUT> worker = workerSupplier.get();
final CompletableFuture<OUTPUT> outputFuture = new CompletableFuture<>();
Expand Down
1 change: 0 additions & 1 deletion kube/resources/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ resources:
- temporal.yaml
- volume-configs.yaml
- volume-db.yaml
- volume-workspace.yaml
- webapp.yaml
5 changes: 0 additions & 5 deletions kube/resources/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,10 @@ spec:
volumeMounts:
- name: airbyte-volume-configs
mountPath: /configs
- name: airbyte-volume-workspace
mountPath: /workspace
- name: gcs-log-creds-volume
mountPath: /secrets/gcs-log-creds
readOnly: true
volumes:
- name: airbyte-volume-workspace
persistentVolumeClaim:
claimName: airbyte-volume-workspace
- name: airbyte-volume-configs
persistentVolumeClaim:
claimName: airbyte-volume-configs
Expand Down
12 changes: 0 additions & 12 deletions kube/resources/volume-workspace.yaml

This file was deleted.

0 comments on commit 8353eea

Please sign in to comment.