diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index cb8440ce1afaa9..b3babb14afab92 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -252,7 +252,6 @@ jobs: # - name: Get Docker Space # run: docker run --rm busybox df -h - # make sure these always run before pushing platform docker images - name: Run End-to-End Acceptance Tests run: ./tools/bin/acceptance_test.sh diff --git a/airbyte-commons/src/main/resources/log4j2.xml b/airbyte-commons/src/main/resources/log4j2.xml index 6e1dd1a1a29785..b7e7eafe2433f0 100644 --- a/airbyte-commons/src/main/resources/log4j2.xml +++ b/airbyte-commons/src/main/resources/log4j2.xml @@ -47,19 +47,19 @@ 2) Routes don't support routing log output to more than Route --> - + - + - + gcpStorageBucket="${gcp-storage-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:cloud_job_log_path}"> @@ -90,18 +90,18 @@ - + - + - + gcpStorageBucket="${gcp-storage-bucket}" gcpStorageBlobNamePrefix="app-logging${ctx:cloud_workspace_app_root}"> diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/helpers/CloudLogs.java b/airbyte-config/models/src/main/java/io/airbyte/config/helpers/CloudLogs.java index f4958f92ea286e..5a3603153cf955 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/helpers/CloudLogs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/helpers/CloudLogs.java @@ -59,11 +59,16 @@ public interface CloudLogs { * @return true if no cloud logging configuration is set; */ static boolean hasEmptyConfigs(LogConfigs configs) { - return !hasS3Configuration(configs) && !hasGcpConfiguration(configs); + return !hasMinioConfiguration(configs) && !hasS3Configuration(configs) && !hasGcpConfiguration(configs); } static CloudLogs createCloudLogClient(LogConfigs configs) { // check if the configs exists, and pick a client. + if (hasMinioConfiguration(configs)) { + LOGGER.info("Creating Minio Log Client"); + return new S3Logs(); + } + if (hasS3Configuration(configs)) { LOGGER.info("Creating AWS Log Client"); return new S3Logs(); @@ -77,6 +82,11 @@ static CloudLogs createCloudLogClient(LogConfigs configs) { throw new RuntimeException("Error no cloud credentials configured.."); } + private static boolean hasMinioConfiguration(LogConfigs configs) { + return !configs.getS3LogBucket().isBlank() && !configs.getAwsAccessKey().isBlank() + && !configs.getAwsSecretAccessKey().isBlank() && !configs.getS3MinioEndpoint().isBlank(); + } + private static boolean hasS3Configuration(LogConfigs configs) { return !configs.getAwsAccessKey().isBlank() && !configs.getAwsSecretAccessKey().isBlank() && diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/helpers/LogClientSingleton.java b/airbyte-config/models/src/main/java/io/airbyte/config/helpers/LogClientSingleton.java index bc1599d916971e..be54877df661c1 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/helpers/LogClientSingleton.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/helpers/LogClientSingleton.java @@ -28,6 +28,7 @@ import io.airbyte.commons.io.IOs; import io.airbyte.config.Configs; import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.EnvConfigs; import java.io.File; import java.io.IOException; import java.nio.file.Path; @@ -54,7 +55,10 @@ public class LogClientSingleton { // Any changes to the following values must also be propagated to the log4j2.xml in main/resources. public static String WORKSPACE_MDC_KEY = "workspace_app_root"; + public static String CLOUD_WORKSPACE_MDC_KEY = "cloud_workspace_app_root"; + public static String JOB_LOG_PATH_MDC_KEY = "job_log_path"; + public static String CLOUD_JOB_LOG_PATH_MDC_KEY = "cloud_job_log_path"; // S3/Minio public static String S3_LOG_BUCKET = "S3_LOG_BUCKET"; @@ -138,7 +142,23 @@ public static void deleteLogs(Configs configs, String logPath) { } public static void setJobMdc(Path path) { - MDC.put(LogClientSingleton.JOB_LOG_PATH_MDC_KEY, path.resolve(LogClientSingleton.LOG_FILENAME).toString()); + if (shouldUseLocalLogs(new EnvConfigs())) { + LOGGER.info("Setting docker job mdc"); + MDC.put(LogClientSingleton.JOB_LOG_PATH_MDC_KEY, path.resolve(LogClientSingleton.LOG_FILENAME).toString()); + } else { + LOGGER.info("Setting kube job mdc"); + MDC.put(LogClientSingleton.CLOUD_JOB_LOG_PATH_MDC_KEY, path.resolve(LogClientSingleton.LOG_FILENAME).toString()); + } + } + + public static void setWorkspaceMdc(Path path) { + if (shouldUseLocalLogs(new EnvConfigs())) { + LOGGER.info("Setting docker workspace mdc"); + MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, path.toString()); + } else { + LOGGER.info("Setting kube workspace mdc"); + MDC.put(LogClientSingleton.CLOUD_WORKSPACE_MDC_KEY, path.toString()); + } } private static boolean shouldUseLocalLogs(Configs configs) { diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/helpers/S3Logs.java b/airbyte-config/models/src/main/java/io/airbyte/config/helpers/S3Logs.java index c9f36d6808a291..f21d96796cca7b 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/helpers/S3Logs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/helpers/S3Logs.java @@ -58,8 +58,16 @@ public class S3Logs implements CloudLogs { private static void assertValidS3Configuration(LogConfigs configs) { Preconditions.checkNotNull(configs.getAwsAccessKey()); Preconditions.checkNotNull(configs.getAwsSecretAccessKey()); - Preconditions.checkNotNull(configs.getS3LogBucketRegion()); Preconditions.checkNotNull(configs.getS3LogBucket()); + + // When region is set, endpoint cannot be set and vice versa. + if (configs.getS3LogBucketRegion().isBlank()) { + Preconditions.checkNotNull(configs.getS3MinioEndpoint(), "Either S3 region or endpoint needs to be configured."); + } + + if (configs.getS3MinioEndpoint().isBlank()) { + Preconditions.checkNotNull(configs.getS3LogBucketRegion(), "Either S3 region or endpoint needs to be configured."); + } } @Override @@ -151,14 +159,22 @@ public void deleteLogs(LogConfigs configs, String logPath) { private static void createS3ClientIfNotExist(LogConfigs configs) { if (S3 == null) { assertValidS3Configuration(configs); + + var builder = S3Client.builder(); + + // Pure S3 Client var s3Region = configs.getS3LogBucketRegion(); - var builder = S3Client.builder().region(Region.of(s3Region)); + if (!s3Region.isBlank()) { + builder.region(Region.of(s3Region)); + } + // The Minio S3 client. var minioEndpoint = configs.getS3MinioEndpoint(); if (!minioEndpoint.isBlank()) { try { var minioUri = new URI(minioEndpoint); builder.endpointOverride(minioUri); + builder.region(Region.US_EAST_1); // Although this is not used, the S3 client will error out if this is not set. Set a stub value. } catch (URISyntaxException e) { throw new RuntimeException("Error creating S3 log client to Minio", e); } diff --git a/airbyte-config/models/src/test/java/io/airbyte/config/helpers/CloudLogsTest.java b/airbyte-config/models/src/test/java/io/airbyte/config/helpers/CloudLogsTest.java index 9e0280a4923274..2e9a077ebb8fb8 100644 --- a/airbyte-config/models/src/test/java/io/airbyte/config/helpers/CloudLogsTest.java +++ b/airbyte-config/models/src/test/java/io/airbyte/config/helpers/CloudLogsTest.java @@ -31,9 +31,22 @@ public class CloudLogsTest { + @Test + public void createCloudLogClientTestMinio() { + var configs = Mockito.mock(LogConfigs.class); + Mockito.when(configs.getS3MinioEndpoint()).thenReturn("minio-endpoint"); + Mockito.when(configs.getAwsAccessKey()).thenReturn("access-key"); + Mockito.when(configs.getAwsSecretAccessKey()).thenReturn("access-key-secret"); + Mockito.when(configs.getS3LogBucket()).thenReturn("test-bucket"); + Mockito.when(configs.getS3LogBucketRegion()).thenReturn(""); + + assertEquals(S3Logs.class, CloudLogs.createCloudLogClient(configs).getClass()); + } + @Test public void createCloudLogClientTestAws() { var configs = Mockito.mock(LogConfigs.class); + Mockito.when(configs.getS3MinioEndpoint()).thenReturn(""); Mockito.when(configs.getAwsAccessKey()).thenReturn("access-key"); Mockito.when(configs.getAwsSecretAccessKey()).thenReturn("access-key-secret"); Mockito.when(configs.getS3LogBucket()).thenReturn("test-bucket"); diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 1dcecdaf3c8011..a1cd9f6c8eb130 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -217,7 +217,7 @@ public static void main(String[] args) throws IOException, InterruptedException final Configs configs = new EnvConfigs(); - MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, LogClientSingleton.getSchedulerLogsRoot(configs).toString()); + LogClientSingleton.setWorkspaceMdc(LogClientSingleton.getSchedulerLogsRoot(configs)); final Path workspaceRoot = configs.getWorkspaceRoot(); LOGGER.info("workspaceRoot = " + workspaceRoot); diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 2ecff8af146728..9ab9215016a2d3 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -170,7 +170,7 @@ private static void createWorkspaceIfNoneExists(final ConfigRepository configRep public static ServerRunnable getServer(ServerFactory apiFactory) throws Exception { final Configs configs = new EnvConfigs(); - MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, LogClientSingleton.getServerLogsRoot(configs).toString()); + LogClientSingleton.setWorkspaceMdc(LogClientSingleton.getServerLogsRoot(configs)); LOGGER.info("Creating config repository..."); final Database configDatabase = new ConfigsDatabaseInstance( diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 5fb9c70da8a82d..ecd95bf5331c47 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -754,8 +754,6 @@ public void testCheckpointing() throws Exception { @Test @Order(14) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") public void testRedactionOfSensitiveRequestBodies() throws Exception { // check that the source password is not present in the logs final List serverLogLines = java.nio.file.Files.readAllLines( diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java index 29c05e233e28df..fe481954c34db9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java @@ -24,6 +24,8 @@ package io.airbyte.workers; +import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.EnvConfigs; import io.airbyte.config.NormalizationInput; import io.airbyte.workers.normalization.NormalizationRunner; import java.nio.file.Files; @@ -60,7 +62,13 @@ public Void run(NormalizationInput input, Path jobRoot) throws WorkerException { try (normalizationRunner) { LOGGER.info("Running normalization."); normalizationRunner.start(); - final Path normalizationRoot = Files.createDirectories(jobRoot.resolve("normalize")); + + Path normalizationRoot = null; + // There are no shared volumes on Kube; only create this for Docker. + if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.DOCKER)) { + normalizationRoot = Files.createDirectories(jobRoot.resolve("normalize")); + } + if (!normalizationRunner.normalize(jobId, attempt, normalizationRoot, input.getDestinationConfiguration(), input.getCatalog(), input.getResourceRequirements())) { throw new WorkerException("Normalization Failed."); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java index f08dda23cbcd35..99d8e30b4e4fc5 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java @@ -28,6 +28,7 @@ import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.io.IOs; +import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.EnvConfigs; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.scheduler.models.JobRunConfig; @@ -108,11 +109,16 @@ public OUTPUT get() { mdcSetter.accept(jobRoot); LOGGER.info("Executing worker wrapper. Airbyte version: {}", new EnvConfigs().getAirbyteVersionOrWarning()); - jobRootDirCreator.accept(jobRoot); - final String workflowId = workflowIdProvider.get(); - final Path workflowIdFile = jobRoot.getParent().resolve(WORKFLOW_ID_FILENAME); - IOs.writeFile(workflowIdFile, workflowId); + // There are no shared volumes on Kube; only do this for Docker. + if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.DOCKER)) { + LOGGER.debug("Creating local workspace directory.."); + jobRootDirCreator.accept(jobRoot); + + final String workflowId = workflowIdProvider.get(); + final Path workflowIdFile = jobRoot.getParent().resolve(WORKFLOW_ID_FILENAME); + IOs.writeFile(workflowIdFile, workflowId); + } final Worker worker = workerSupplier.get(); final CompletableFuture outputFuture = new CompletableFuture<>(); diff --git a/kube/resources/kustomization.yaml b/kube/resources/kustomization.yaml index 934f54b26adb24..f6e422853a6938 100644 --- a/kube/resources/kustomization.yaml +++ b/kube/resources/kustomization.yaml @@ -12,5 +12,4 @@ resources: - temporal.yaml - volume-configs.yaml - volume-db.yaml - - volume-workspace.yaml - webapp.yaml diff --git a/kube/resources/server.yaml b/kube/resources/server.yaml index 62e31d010a5aa3..b9865c0e5346dd 100644 --- a/kube/resources/server.yaml +++ b/kube/resources/server.yaml @@ -148,15 +148,10 @@ spec: volumeMounts: - name: airbyte-volume-configs mountPath: /configs - - name: airbyte-volume-workspace - mountPath: /workspace - name: gcs-log-creds-volume mountPath: /secrets/gcs-log-creds readOnly: true volumes: - - name: airbyte-volume-workspace - persistentVolumeClaim: - claimName: airbyte-volume-workspace - name: airbyte-volume-configs persistentVolumeClaim: claimName: airbyte-volume-configs diff --git a/kube/resources/volume-workspace.yaml b/kube/resources/volume-workspace.yaml deleted file mode 100644 index 6094deef2b824c..00000000000000 --- a/kube/resources/volume-workspace.yaml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: airbyte-volume-workspace - labels: - airbyte: volume-workspace -spec: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: 500Mi