Skip to content

Commit

Permalink
🐛 Airbyte Kube will now error on start up if incorrect logs configura…
Browse files Browse the repository at this point in the history
…tion is passed in. Fix scheduler counting bug. (#6188)

* Move the cloud client create call so this errors out on app start up instead of when they the log file is retrieved.

* Add tests.

* Fix code that was never being called.
  • Loading branch information
davinchia committed Sep 18, 2021
1 parent fa0bd55 commit 51fadf2
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public static File getServerLogFile(Configs configs) {
}

var logConfigs = new LogConfigDelegator(configs);
createCloudClientIfNull(logConfigs);
var cloudLogPath = APP_LOGGING_CLOUD_PREFIX + logPathBase;
try {
return logClient.downloadCloudLog(logConfigs, cloudLogPath);
Expand All @@ -107,7 +106,6 @@ public static File getSchedulerLogFile(Configs configs) {
}

var logConfigs = new LogConfigDelegator(configs);
createCloudClientIfNull(logConfigs);
var cloudLogPath = APP_LOGGING_CLOUD_PREFIX + logPathBase;
try {
return logClient.downloadCloudLog(logConfigs, cloudLogPath);
Expand All @@ -122,7 +120,6 @@ public static List<String> getJobLogFile(Configs configs, Path logPath) throws I
}

var logConfigs = new LogConfigDelegator(configs);
createCloudClientIfNull(logConfigs);
var cloudLogPath = JOB_LOGGING_CLOUD_PREFIX + logPath;
return logClient.tailCloudLog(logConfigs, cloudLogPath, LOG_TAIL_SIZE);
}
Expand All @@ -136,33 +133,38 @@ public static void deleteLogs(Configs configs, String logPath) {
throw new NotImplementedException("Local log deletes not supported.");
}
var logConfigs = new LogConfigDelegator(configs);
createCloudClientIfNull(logConfigs);
var cloudLogPath = JOB_LOGGING_CLOUD_PREFIX + logPath;
logClient.deleteLogs(logConfigs, cloudLogPath);
}

public static void setJobMdc(Path path) {
if (shouldUseLocalLogs(new EnvConfigs())) {
var configs = new EnvConfigs();
if (shouldUseLocalLogs(configs)) {
LOGGER.debug("Setting docker job mdc");
MDC.put(LogClientSingleton.JOB_LOG_PATH_MDC_KEY, path.resolve(LogClientSingleton.LOG_FILENAME).toString());
} else {
LOGGER.debug("Setting kube job mdc");
var logConfigs = new LogConfigDelegator(configs);
createCloudClientIfNull(logConfigs);
MDC.put(LogClientSingleton.CLOUD_JOB_LOG_PATH_MDC_KEY, path.resolve(LogClientSingleton.LOG_FILENAME).toString());
}
}

public static void setWorkspaceMdc(Path path) {
if (shouldUseLocalLogs(new EnvConfigs())) {
var configs = new EnvConfigs();
if (shouldUseLocalLogs(configs)) {
LOGGER.debug("Setting docker workspace mdc");
MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, path.toString());
} else {
LOGGER.debug("Setting kube workspace mdc");
var logConfigs = new LogConfigDelegator(configs);
createCloudClientIfNull(logConfigs);
MDC.put(LogClientSingleton.CLOUD_WORKSPACE_MDC_KEY, path.toString());
}
}

private static boolean shouldUseLocalLogs(Configs configs) {
return configs.getWorkerEnvironment().equals(WorkerEnvironment.DOCKER) || CloudLogs.hasEmptyConfigs(new LogConfigDelegator(configs));
return configs.getWorkerEnvironment().equals(WorkerEnvironment.DOCKER);
}

private static void createCloudClientIfNull(LogConfigs configs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,59 @@
package io.airbyte.config.helpers;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class CloudLogsTest {
public class CloudLogsClientTest {

@Nested
class CloudLogClientMissingConfiguration {

@Test
public void testMinio() {
var configs = Mockito.mock(LogConfigs.class);
// Mising bucket.
Mockito.when(configs.getS3MinioEndpoint()).thenReturn("minio-endpoint");
Mockito.when(configs.getAwsAccessKey()).thenReturn("access-key");
Mockito.when(configs.getAwsSecretAccessKey()).thenReturn("access-key-secret");
Mockito.when(configs.getS3LogBucket()).thenReturn("");
Mockito.when(configs.getS3LogBucketRegion()).thenReturn("");

assertThrows(RuntimeException.class, () -> CloudLogs.createCloudLogClient(configs));
}

@Test
public void testAws() {
var configs = Mockito.mock(LogConfigs.class);
// Missing bucket and access key.
Mockito.when(configs.getS3MinioEndpoint()).thenReturn("");
Mockito.when(configs.getAwsAccessKey()).thenReturn("");
Mockito.when(configs.getAwsSecretAccessKey()).thenReturn("access-key-secret");
Mockito.when(configs.getS3LogBucket()).thenReturn("");
Mockito.when(configs.getS3LogBucketRegion()).thenReturn("");

assertThrows(RuntimeException.class, () -> CloudLogs.createCloudLogClient(configs));
}

@Test
public void testGcs() {
var configs = Mockito.mock(LogConfigs.class);
Mockito.when(configs.getAwsAccessKey()).thenReturn("");
Mockito.when(configs.getAwsSecretAccessKey()).thenReturn("");
Mockito.when(configs.getS3LogBucket()).thenReturn("");
Mockito.when(configs.getS3LogBucketRegion()).thenReturn("");

// Missing bucket.
Mockito.when(configs.getGcpStorageBucket()).thenReturn("");
Mockito.when(configs.getGoogleApplicationCredentials()).thenReturn("path/to/google/secret");

assertThrows(RuntimeException.class, () -> CloudLogs.createCloudLogClient(configs));
}

}

@Test
public void createCloudLogClientTestMinio() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -92,19 +91,20 @@ public void run() {
}

private void scheduleSyncJobs() throws IOException {
final AtomicInteger jobsScheduled = new AtomicInteger();
int jobsScheduled = 0;
final List<StandardSync> activeConnections = getAllActiveConnections();

for (StandardSync connection : activeConnections) {
final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connection.getConnectionId());

if (scheduleJobPredicate.test(previousJobOptional, connection)) {
jobFactory.create(connection.getConnectionId());
jobsScheduled++;
}
}
int jobsScheduledCount = jobsScheduled.get();
if (jobsScheduledCount > 0) {
LOGGER.info("Job-Scheduler Summary. Active connections: {}, Jobs scheduler: {}", activeConnections.size(), jobsScheduled.get());

if (jobsScheduled > 0) {
LOGGER.info("Job-Scheduler Summary. Active connections: {}, Jobs scheduled this cycle: {}", activeConnections.size(), jobsScheduled);
}
}

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def createSpotlessTarget = { pattern ->
'normalization_test_output',
'tools',
'secrets',
'charts'
'charts' // Helm charts often have injected template strings that will fail general linting. Helm linting is done separately.
]

if (System.getenv().containsKey("SUB_BUILD")) {
Expand Down

0 comments on commit 51fadf2

Please sign in to comment.