Kube Logs are stored and retrieved from Cloud Storage. #4053

Merged Jun 19, 2021 · 44 commits (changes from all commits shown below)

Commits
a0827fa
Set Log4j2 debug mode. Successfully route job and app logs to S3 while…
davinchia Jun 11, 2021
814449f
Turn everything into environment variables. Successfully use property…
davinchia Jun 13, 2021
aafa5bd
Checkpoint: Get GCS working.
davinchia Jun 14, 2021
d4e611f
Checkpoint: Get all objects out of a directory and write it to a file.
davinchia Jun 15, 2021
ff959a3
Checkpoint: Get scheduler and server logs from Cloud working using th…
davinchia Jun 16, 2021
09c6d1d
Setup to test S3LoggingClient.
davinchia Jun 16, 2021
98bca11
One initial test in S3Logs class.
davinchia Jun 16, 2021
329bfd1
Fill in normal test.
davinchia Jun 16, 2021
7d97575
Look at a specific folder.
davinchia Jun 16, 2021
e9bd71d
Correct error.
davinchia Jun 16, 2021
35d1abf
Get logs tail working.
davinchia Jun 16, 2021
6415d55
Format.
davinchia Jun 16, 2021
3ff34b7
Pass test.
davinchia Jun 16, 2021
c9e89cb
Make logging test script executable. Disable most of build for faster…
davinchia Jun 16, 2021
0299f44
Mount S3 secret for logging test.
davinchia Jun 16, 2021
746a593
Strip quotes from jq output.
davinchia Jun 16, 2021
b3320c3
Uncomment rest of build.
davinchia Jun 16, 2021
23c8c93
Add comments.
davinchia Jun 16, 2021
afbe9d2
Checkpoint: Kube modifications. Scheduler, Server and Job logs are wo…
davinchia Jun 17, 2021
19db827
Remove affinity.
davinchia Jun 17, 2021
858090b
Revert "Remove affinity."
davinchia Jun 17, 2021
0dcb92b
Checkpoint: Stop emitting job id with MDC as we are not actually usin…
davinchia Jun 17, 2021
742d780
Checkpoint: Remove Temporal PVC. Clean up logging functions and MDC key.
davinchia Jun 17, 2021
d08269a
Merge remote-tracking branch 'origin' into davinchia/s3-logging-exper…
davinchia Jun 18, 2021
06f6981
Push code that is failing on Davin's machine so Jared can test.
davinchia Jun 18, 2021
cbca608
Remove random changes.
davinchia Jun 18, 2021
e259fa3
Remove unused plugin.
davinchia Jun 18, 2021
9801789
Remove GCP configuration for now.
davinchia Jun 18, 2021
e062f71
Remove logic added for GKE bug.
davinchia Jun 18, 2021
af615e1
Checkpoint: Job and app logs work. Get S3LogClient integration test w…
davinchia Jun 18, 2021
0d8ccbc
Fix worker log test.
davinchia Jun 18, 2021
35e998a
Fix job scheduler test.
davinchia Jun 18, 2021
ede5e42
Correct cloud storage test path. Fix format.
davinchia Jun 18, 2021
2cb5730
Stash to check if master acceptance tests work.
davinchia Jun 18, 2021
b49c7c9
Does registering all modules fix test?
davinchia Jun 18, 2021
8be0a6e
Add S3 credentials information to Kube acceptance tests.
davinchia Jun 18, 2021
786d594
Add comments to explain Log4j config.
davinchia Jun 18, 2021
9858624
Remove unneeded package.
davinchia Jun 18, 2021
d0043b3
Remove unneeded comments.
davinchia Jun 18, 2021
31dd280
Respond to PR feedback.
davinchia Jun 19, 2021
7014ea3
Explicitly require Jackson date time module.
davinchia Jun 19, 2021
d6b949d
Format.
davinchia Jun 19, 2021
22eb741
Migrate all mappers to use injected function.
davinchia Jun 19, 2021
2c4bd4c
More format.
davinchia Jun 19, 2021
5 changes: 5 additions & 0 deletions .env
@@ -21,3 +21,8 @@ WEBAPP_URL=http://localhost:8000/
API_URL=/api/v1/
TEMPORAL_HOST=airbyte-temporal:7233
INTERNAL_API_HOST=airbyte-server:8001

S3_LOG_BUCKET=
S3_LOG_BUCKET_REGION=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
6 changes: 6 additions & 0 deletions .github/workflows/gradle.yml
@@ -355,10 +355,16 @@ jobs:
- name: Build Core Docker Images and Run Tests
run: CORE_ONLY=true ./gradlew --no-daemon build --scan --rerun-tasks

- name: Run Logging Tests
run: ./tools/bin/cloud_storage_logging_test.sh
env:
AWS_S3_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_S3_INTEGRATION_TEST_CREDS }}

- name: Run Kubernetes End-to-End Acceptance Tests
env:
USER: root
HOME: /home/runner
AWS_S3_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_S3_INTEGRATION_TEST_CREDS }}
run: |
IS_MINIKUBE=true ./tools/bin/acceptance_test_kube.sh
# In case of self-hosted EC2 errors, remove this block.
3 changes: 1 addition & 2 deletions airbyte-commons/build.gradle
@@ -3,6 +3,5 @@ plugins {
}

dependencies {
implementation 'org.apache.commons:commons-compress:1.20'
implementation 'org.apache.commons:commons-lang3:3.11'
// Dependencies for this module should be specified in the top-level build.gradle.
Contributor: why?

Contributor Author: Because of https://github.com/airbytehq/airbyte/blob/master/build.gradle#L181, if everything depends on airbyte-commons, it seems cleaner to have all of commons' dependencies in the top-level build.gradle as well.

}
50 changes: 50 additions & 0 deletions airbyte-commons/src/main/java/io/airbyte/commons/jackson/MoreMappers.java
@@ -0,0 +1,50 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.jackson;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

/**
* The {@link JavaTimeModule} allows mappers to accommodate different varieties of serialised date
* time strings.
*
* All Jackson mapper creation should use the following methods for instantiation.
*/
public class MoreMappers {

public static ObjectMapper initMapper() {
final ObjectMapper result = new ObjectMapper().registerModule(new JavaTimeModule());
result.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return result;
}

public static ObjectMapper initYamlMapper(YAMLFactory factory) {
return new ObjectMapper(factory).registerModule(new JavaTimeModule());
}

}
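
A quick aside, not part of this diff: a minimal sketch of why the JavaTimeModule registration matters. The class name below is hypothetical; without the module, a bare ObjectMapper rejects java.time types.

import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.commons.jackson.MoreMappers;
import java.time.Instant;

class MoreMappersSketch {

  public static void main(String[] args) throws Exception {
    final ObjectMapper mapper = MoreMappers.initMapper();
    // With the JavaTimeModule registered, ISO-8601 strings deserialize into
    // java.time types; a plain new ObjectMapper() throws InvalidDefinitionException here.
    final Instant ts = mapper.readValue("\"2021-06-19T10:15:30Z\"", Instant.class);
    System.out.println(ts); // prints 2021-06-19T10:15:30Z
  }

}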
10 changes: 2 additions & 8 deletions airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
@@ -28,11 +28,11 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.core.util.Separators;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Charsets;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.stream.MoreStreams;
import java.io.IOException;
import java.util.Collections;
@@ -46,15 +46,9 @@
public class Jsons {

// Object Mapper is thread-safe
private static final ObjectMapper OBJECT_MAPPER = initMapper();
private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper();
private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writer(new JsonPrettyPrinter());

private static ObjectMapper initMapper() {
final ObjectMapper result = new ObjectMapper();
result.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return result;
}

public static <T> String serialize(T object) {
try {
return OBJECT_MAPPER.writeValueAsString(object);
airbyte-commons/src/main/java/io/airbyte/commons/yaml/Yamls.java
@@ -32,6 +32,7 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLParser;
import com.google.common.collect.AbstractIterator;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.lang.CloseableConsumer;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.util.AutoCloseableIterator;
@@ -43,9 +44,8 @@

public class Yamls {

public static final YAMLFactory YAML_FACTORY = new YAMLFactory();
// Object Mapper is thread-safe
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(YAML_FACTORY);
private static final YAMLFactory YAML_FACTORY = new YAMLFactory();
private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initYamlMapper(YAML_FACTORY);

public static <T> String serialize(T object) {
try {
63 changes: 59 additions & 4 deletions airbyte-commons/src/main/resources/log4j2.xml
@@ -3,26 +3,60 @@
<Properties>
<Property name="default-pattern">%d{yyyy-MM-dd HH:mm:ss}{GMT+0} %highlight{%p} %C{1.}(%M):%L - %X - %m%n</Property>
<Property name="default-worker-file-pattern">%d{yyyy-MM-dd HH:mm:ss}{GMT+0} %p (%X{job_root}) %C{1}(%M):%L - %m%n</Property>

<Property name="s3-bucket">$${env:S3_LOG_BUCKET}</Property>
<Property name="s3-region">$${env:S3_LOG_BUCKET_REGION}</Property>
<Property name="s3-aws-key">$${env:AWS_ACCESS_KEY_ID}</Property>
<Property name="s3-aws-secret">$${env:AWS_SECRET_ACCESS_KEY}</Property>

</Properties>

<Appenders>
<Console name="Default" target="SYSTEM_OUT">
<PatternLayout pattern="${default-pattern}"/>
</Console>

<Routing name="LogSplit">
<Routes pattern="$${ctx:job_root}">
<Routes pattern="$${ctx:job_log_path}">
<!-- Don't split logs if job_log_path isn't defined -->
<Route key="$${ctx:job_root}">
<Route key="$${ctx:job_log_path}">
<Null name="/dev/null"/>
</Route>
<Route>
<File name="${ctx:job_root}" fileName="${ctx:job_root}/${ctx:job_log_filename}">
<File name="${ctx:job_log_path}-local" fileName="${ctx:job_log_path}">
<PatternLayout pattern="${default-worker-file-pattern}"/>
</File>
</Route>
</Routes>
<IdlePurgePolicy timeToLive="15" timeUnit="minutes"/>
</Routing>

<!--
Separate routers are created for each cloud logger because
1) a Route only accepts one appender, and
2) a Routes element doesn't support routing log output to more than one Route.
-->
<Routing name="LogSplitCloud">
Contributor Author: will comment on why we had to configure it this way - essentially a Route only accepts one appender.

<Routes pattern="$${ctx:job_log_path}">
<!-- Don't split logs if job_log_path isn't defined -->
<Route key="$${ctx:job_log_path}">
<Null name="/dev/null"/>
</Route>
<Route>
<Log4j2Appender name="${ctx:job_log_path}-cloud"
verbose="true"
stagingBufferAge="1"
s3Bucket="${s3-bucket}" s3Path="job-logging${ctx:job_log_path}" s3Region="${s3-region}"
s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}">
<PatternLayout>
<Pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %m%n</Pattern>
</PatternLayout>
</Log4j2Appender>
</Route>
</Routes>
<IdlePurgePolicy timeToLive="15" timeUnit="minutes"/>
</Routing>

<Routing name="AppLogSplit">
<Routes pattern="$${ctx:workspace_app_root}">
<!-- Don't split logs if workspace_app_root isn't defined -->
@@ -31,7 +65,7 @@
</Route>
<Route>
<RollingFile
name="${ctx:workspace_app_root}"
name="${ctx:workspace_app_root}-local"
fileName="${ctx:workspace_app_root}/logs.log"
filePattern="${ctx:workspace_app_root}/logs.%i.log.gz"
ignoreExceptions="false">
@@ -47,13 +81,34 @@
</Routes>
<IdlePurgePolicy timeToLive="15" timeUnit="minutes"/>
</Routing>
<Routing name="AppLogSplitCloud">
<Routes pattern="$${ctx:workspace_app_root}">
<!-- Don't split logs if workspace_app_root isn't defined -->
<Route key="$${ctx:workspace_app_root}">
<Null name="/dev/null"/>
</Route>
<Route>
<Log4j2Appender name="app-logging/${ctx:workspace_app_root}-cloud/"
stagingBufferAge="1"
s3Bucket="${s3-bucket}" s3Path="app-logging${ctx:workspace_app_root}" s3Region="${s3-region}"
s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}">
<PatternLayout>
<Pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %m%n</Pattern>
</PatternLayout>
</Log4j2Appender>
</Route>
</Routes>
<IdlePurgePolicy timeToLive="15" timeUnit="minutes"/>
</Routing>
</Appenders>

<Loggers>
<Root level="INFO">
<AppenderRef ref="Default"/>
<AppenderRef ref="LogSplit"/>
<AppenderRef ref="LogSplitCloud"/>
<AppenderRef ref="AppLogSplit"/>
<AppenderRef ref="AppLogSplitCloud"/>
</Root>

<Logger name="org.eclipse.jetty" level="INFO" />
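
As a rough sketch of how these routes are driven from application code (the class name and path below are hypothetical), a worker sets the job_log_path MDC key before logging, exactly as the updated tests below do:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

class LogRoutingSketch {

  private static final Logger LOGGER = LoggerFactory.getLogger(LogRoutingSketch.class);

  public static void main(String[] args) {
    // While job_log_path is set, LogSplit writes this line to the local file
    // and LogSplitCloud ships it to S3; with the key unset, both routes no-op.
    MDC.put("job_log_path", "/tmp/workspace/1/0/logs.log");
    LOGGER.info("written locally and to S3");
    MDC.clear();
  }

}

The IdlePurgePolicy in the config above then tears down each per-path appender after 15 minutes of inactivity.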
@@ -60,9 +60,7 @@ void testWorkerDispatch() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(() -> {
MDC.put("context", "worker");
MDC.put("job_root", root.toString());
MDC.put("job_log_filename", filename);
MDC.put("job_id", "1");
MDC.put("job_log_path", root + "/" + filename);
logger.error("random message testWorkerDispatch");
MDC.clear();
});
@@ -83,16 +81,12 @@ void testLogSeparateFiles() throws InterruptedException {

ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
MDC.put("job_root", root1.toString());
MDC.put("job_log_filename", filename);
MDC.put("job_id", "1");
MDC.put("job_log_path", root1 + "/" + filename);
logger.error("random message 1");
});

executor.submit(() -> {
MDC.put("job_root", root2.toString());
MDC.put("job_log_filename", filename);
MDC.put("job_id", "2");
MDC.put("job_log_path", root2 + "/" + filename);
logger.error("random message 2");
});

3 changes: 3 additions & 0 deletions airbyte-config/models/build.gradle
@@ -1,12 +1,15 @@
import org.jsonschema2pojo.SourceType

plugins {
id 'airbyte-integration-test-java'
id "com.github.eirnym.js2p" version "1.0"
}

dependencies {
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:models')

integrationTestJavaImplementation project(':airbyte-config:models')
}

jsonSchema2Pojo {
airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
@@ -65,6 +65,15 @@ public interface Configs {

Set<Integer> getTemporalWorkerPorts();

// The following methods retrieve logging-related information.
String getS3LogBucket();

String getS3LogBucketRegion();

String getAwsAccessKey();

String getAwsSecretAccessKey();

enum TrackingStrategy {
SEGMENT,
LOGGING
airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java
@@ -25,6 +25,7 @@
package io.airbyte.config;

import com.google.common.base.Preconditions;
import io.airbyte.config.helpers.LogHelpers;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;
@@ -177,6 +178,26 @@ public Set<Integer> getTemporalWorkerPorts() {
.collect(Collectors.toSet());
}

@Override
public String getS3LogBucket() {
return getEnsureEnv(LogHelpers.S3_LOG_BUCKET);
}

@Override
public String getS3LogBucketRegion() {
return getEnsureEnv(LogHelpers.S3_LOG_BUCKET_REGION);
}

@Override
public String getAwsAccessKey() {
return getEnsureEnv(LogHelpers.AWS_ACCESS_KEY_ID);
}

@Override
public String getAwsSecretAccessKey() {
return getEnsureEnv(LogHelpers.AWS_SECRET_ACCESS_KEY);
}

private String getEnvOrDefault(String key, String defaultValue) {
return getEnvOrDefault(key, defaultValue, Function.identity());
}
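
getEnsureEnv itself sits outside this hunk; presumably it reads the variable and fails fast when it is unset. A hypothetical sketch, assuming EnvConfigs resolves variables through a getEnv function:

// Hypothetical sketch; the real method is not shown in this diff.
private String getEnsureEnv(String name) {
  final String value = getEnv.apply(name);
  Preconditions.checkNotNull(value, "%s environment variable cannot be null", name);
  return value;
}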
53 changes: 53 additions & 0 deletions airbyte-config/models/src/main/java/io/airbyte/config/helpers/CloudLogs.java
@@ -0,0 +1,53 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.config.helpers;

import io.airbyte.config.Configs;
import java.io.File;
import java.io.IOException;
import java.util.List;

/**
* Interface for various Cloud Storage clients supporting Cloud log retrieval.
*
* The underlying assumptions are that 1) each file at the path is part of the entire log file
* represented by that path, and 2) log file names start with timestamps, making it possible to
* extract the time a file was written from its name.
*/
public interface CloudLogs {

/**
* Retrieve all objects at the given path in lexicographical order, and return their contents as one
* file.
*/
File downloadCloudLog(Configs configs, String logPath) throws IOException;

/**
* Assuming all the lexicographically ordered objects at the given path form one giant log file,
* return the last numLines lines.
*/
List<String> tailCloudLog(Configs configs, String logPath, int numLines) throws IOException;

}
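
To make the contract concrete, here is a minimal sketch of what an S3-backed tailCloudLog could look like, assuming the AWS SDK v2 and credentials supplied via the standard AWS environment variables; the class name is hypothetical and the PR's actual implementation may differ.

package io.airbyte.config.helpers;

import io.airbyte.config.Configs;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;

public class S3LogsSketch {

  public static List<String> tailCloudLog(Configs configs, String logPath, int numLines) throws IOException {
    final S3Client s3 = S3Client.builder()
        .region(Region.of(configs.getS3LogBucketRegion()))
        .build(); // picks up AWS credentials from the environment by default

    // S3 lists keys in lexicographic order; since log file names start with
    // timestamps, this is also chronological order.
    final List<String> keys = new ArrayList<>();
    s3.listObjectsV2Paginator(ListObjectsV2Request.builder()
        .bucket(configs.getS3LogBucket())
        .prefix(logPath)
        .build())
        .contents()
        .forEach(obj -> keys.add(obj.key()));

    // Stream every object in order, keeping a bounded window of the last numLines lines.
    final LinkedList<String> tail = new LinkedList<>();
    for (final String key : keys) {
      try (BufferedReader reader = new BufferedReader(new InputStreamReader(
          s3.getObject(GetObjectRequest.builder().bucket(configs.getS3LogBucket()).key(key).build()),
          StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
          tail.add(line);
          if (tail.size() > numLines) {
            tail.removeFirst();
          }
        }
      }
    }
    return new ArrayList<>(tail);
  }

}

The bounded LinkedList keeps memory proportional to numLines rather than to the full log size, which matters for long-running jobs.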