Skip to content

Commit

Permalink
Logging to GCS. (#4501)
Browse files Browse the repository at this point in the history
Add the ability to log to GCS.
  • Loading branch information
davinchia committed Jul 7, 2021
1 parent 3e37e90 commit e2074a4
Show file tree
Hide file tree
Showing 35 changed files with 686 additions and 141 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ API_URL=/api/v1/
INTERNAL_API_HOST=airbyte-server:8001
LOG_LEVEL=INFO

# Cloud log backups. Don't use this unless you know what you're doing.
# Cloud log backups. Don't use this unless you know what you're doing. Mainly for Airbyte devs.
# If you just want to capture Docker logs, you probably want to use something like this instead:
# https://docs.docker.com/config/containers/logging/configure/
S3_LOG_BUCKET=
Expand Down
9 changes: 5 additions & 4 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ jobs:
run: ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates --scan

- name: Format
run: ./gradlew --no-daemon format --scan --info --stacktrace
run: ./gradlew format --scan --info --stacktrace

- name: Ensure no file change
run: git status --porcelain && test -z "$(git status --porcelain)"

- name: Build
run: CORE_ONLY=true ./gradlew --no-daemon build --scan
run: CORE_ONLY=true ./gradlew build --scan

- name: Ensure no file change
run: git status --porcelain && test -z "$(git status --porcelain)"
Expand All @@ -111,7 +111,7 @@ jobs:

- name: Build Core Docker Images
if: success() && github.ref == 'refs/heads/master'
run: ./gradlew --no-daemon composeBuild --scan
run: ./gradlew composeBuild --scan
env:
GIT_REVISION: ${{ github.sha }}

Expand All @@ -122,7 +122,7 @@ jobs:
run: ./tools/bin/acceptance_test.sh

- name: Automatic Migration Acceptance Test
run: MIGRATION_TEST_VERSION=$(grep VERSION .env | tr -d "VERSION=") ./gradlew --no-daemon :airbyte-tests:automaticMigrationAcceptanceTest --rerun-tasks --scan -i
run: MIGRATION_TEST_VERSION=$(grep VERSION .env | tr -d "VERSION=") ./gradlew :airbyte-tests:automaticMigrationAcceptanceTest --scan

- name: Slack Notification - Failure
if: failure() && github.ref == 'refs/heads/master'
Expand Down Expand Up @@ -320,6 +320,7 @@ jobs:
run: ./tools/bin/cloud_storage_logging_test.sh
env:
AWS_S3_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_S3_INTEGRATION_TEST_CREDS }}
GOOGLE_CLOUD_STORAGE_TEST_CREDS: ${{ secrets.GOOGLE_CLOUD_STORAGE_TEST_CREDS }}

- name: Run Kubernetes End-to-End Acceptance Tests
env:
Expand Down
9 changes: 7 additions & 2 deletions airbyte-commons/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
<Property name="default-worker-file-pattern">%d{yyyy-MM-dd HH:mm:ss}{GMT+0} %p (%X{job_root}) %C{1}(%M):%L - %m%n</Property>
<!-- Always log INFO by default. -->
<Property name="log-level">$${env:LOG_LEVEL:-INFO}</Property>

<Property name="s3-bucket">$${env:S3_LOG_BUCKET}</Property>
<Property name="s3-region">$${env:S3_LOG_BUCKET_REGION}</Property>
<Property name="s3-aws-key">$${env:AWS_ACCESS_KEY_ID}</Property>
<Property name="s3-aws-secret">$${env:AWS_SECRET_ACCESS_KEY}</Property>
<Property name="s3-minio-endpoint">$${env:S3_MINIO_ENDPOINT}</Property>
<Property name="s3-path-style-access">$${env:S3_PATH_STYLE_ACCESS}</Property>

<Property name="gcp-storage-bucket">$${env:GCP_STORAGE_BUCKET}</Property>
</Properties>

<Appenders>
Expand Down Expand Up @@ -50,7 +53,8 @@
stagingBufferAge="1"
s3Bucket="${s3-bucket}" s3Path="job-logging${ctx:job_log_path}" s3SigningRegion="${s3-region}"
s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}"
s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}">
s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}"
gcpStorageBucket="${gcp-storage-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:job_log_path}">
<PatternLayout>
<Pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %m%n</Pattern>
</PatternLayout>
Expand Down Expand Up @@ -95,7 +99,8 @@
stagingBufferAge="1"
s3Bucket="${s3-bucket}" s3Path="app-logging${ctx:workspace_app_root}" s3SigningRegion="${s3-region}"
s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}"
s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}">
s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}"
gcpStorageBucket="${gcp-storage-bucket}" gcpStorageBlobNamePrefix="app-logging${ctx:workspace_app_root}">
<PatternLayout>
<Pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %m%n</Pattern>
</PatternLayout>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public interface Configs {

String getKubeNamespace();

// Resources
String getCpuRequest();

String getCpuLimit();
Expand All @@ -75,7 +76,7 @@ public interface Configs {

String getMemoryLimit();

// The following methods retrieve logging related information.
// Logging
String getS3LogBucket();

String getS3LogBucketRegion();
Expand All @@ -86,6 +87,10 @@ public interface Configs {

String getS3MinioEndpoint();

String getGcpStorageBucket();

String getGoogleApplicationCredentials();

enum TrackingStrategy {
SEGMENT,
LOGGING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
package io.airbyte.config;

import com.google.common.base.Preconditions;
import io.airbyte.config.helpers.LogHelpers;
import io.airbyte.config.helpers.LogClientSingleton;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;
Expand Down Expand Up @@ -212,27 +212,37 @@ public String getMemoryLimit() {

@Override
public String getS3LogBucket() {
return getEnsureEnv(LogHelpers.S3_LOG_BUCKET);
return getEnvOrDefault(LogClientSingleton.S3_LOG_BUCKET, "");
}

@Override
public String getS3LogBucketRegion() {
return getEnsureEnv(LogHelpers.S3_LOG_BUCKET_REGION);
return getEnvOrDefault(LogClientSingleton.S3_LOG_BUCKET_REGION, "");
}

@Override
public String getAwsAccessKey() {
return getEnsureEnv(LogHelpers.AWS_ACCESS_KEY_ID);
return getEnvOrDefault(LogClientSingleton.AWS_ACCESS_KEY_ID, "");
}

@Override
public String getAwsSecretAccessKey() {
return getEnsureEnv(LogHelpers.AWS_SECRET_ACCESS_KEY);
return getEnvOrDefault(LogClientSingleton.AWS_SECRET_ACCESS_KEY, "");
}

@Override
public String getS3MinioEndpoint() {
return getEnv(LogHelpers.S3_MINIO_ENDPOINT);
return getEnvOrDefault(LogClientSingleton.S3_MINIO_ENDPOINT, "");
}

@Override
public String getGcpStorageBucket() {
return getEnvOrDefault(LogClientSingleton.GCP_STORAGE_BUCKET, "");
}

@Override
public String getGoogleApplicationCredentials() {
return getEnvOrDefault(LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS, "");
}

private String getEnvOrDefault(String key, String defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@

package io.airbyte.config.helpers;

import io.airbyte.config.Configs;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Interface for various Cloud Storage clients supporting Cloud log retrieval.
Expand All @@ -38,21 +39,52 @@
*/
public interface CloudLogs {

Logger LOGGER = LoggerFactory.getLogger(CloudLogs.class);

/**
* Retrieve all objects at the given path in lexicographical order, and return their contents as one
* file.
*/
File downloadCloudLog(Configs configs, String logPath) throws IOException;
File downloadCloudLog(LogConfigs configs, String logPath) throws IOException;

/**
* Assume all the lexicographically ordered objects at the given path form one giant log file,
* return the last numLines lines.
*/
List<String> tailCloudLog(Configs configs, String logPath, int numLines) throws IOException;
List<String> tailCloudLog(LogConfigs configs, String logPath, int numLines) throws IOException;

/**
* @return true if configuration is not set
* @return true if no cloud logging configuration is set.
*/
boolean hasEmptyConfigs(Configs configs);
static boolean hasEmptyConfigs(LogConfigs configs) {
return !hasS3Configuration(configs) && !hasGcpConfiguration(configs);
}

static CloudLogs createCloudLogClient(LogConfigs configs) {
// Check which cloud configuration exists and pick the matching client.
if (hasS3Configuration(configs)) {
LOGGER.info("Creating AWS Log Client");
return new S3Logs();
}

if (hasGcpConfiguration(configs)) {
LOGGER.info("Creating GCS Log Client");
return new GcsLogs();
}

throw new RuntimeException("Error no cloud credentials configured..");
}

private static boolean hasS3Configuration(LogConfigs configs) {
return !configs.getAwsAccessKey().isBlank() &&
!configs.getAwsSecretAccessKey().isBlank() &&
!configs.getS3LogBucketRegion().isBlank() &&
!configs.getS3LogBucket().isBlank();
}

private static boolean hasGcpConfiguration(LogConfigs configs) {
return !configs.getGcpStorageBucket().isBlank() &&
!configs.getGoogleApplicationCredentials().isBlank();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.config.helpers;

import com.google.api.client.util.Preconditions;
import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.Lists;
import io.airbyte.commons.string.Strings;
import io.airbyte.config.EnvConfigs;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GcsLogs implements CloudLogs {

  private static final Logger LOGGER = LoggerFactory.getLogger(GcsLogs.class);

  // Shared client, created lazily on first use. See createGcsClientIfNotExists.
  private static Storage GCS;

  @Override
  public File downloadCloudLog(LogConfigs configs, String logPath) throws IOException {
    return getFile(configs, logPath, LogClientSingleton.DEFAULT_PAGE_SIZE);
  }

  /**
   * Downloads all objects under the given path in lexicographical order and concatenates their
   * contents into a single temporary file.
   *
   * @param configs log configuration providing the GCS bucket
   * @param logPath blob name prefix identifying the log
   * @param pageSize list request page size; exposed for testing
   * @return temporary file containing the concatenated log contents
   * @throws IOException if the temporary file cannot be written
   */
  static File getFile(LogConfigs configs, String logPath, int pageSize) throws IOException {
    LOGGER.debug("Retrieving logs from GCS path: {}", logPath);
    createGcsClientIfNotExists(configs);

    LOGGER.debug("Start GCS list request.");
    Page<Blob> blobs = GCS.list(
        configs.getGcpStorageBucket(),
        Storage.BlobListOption.prefix(logPath),
        Storage.BlobListOption.pageSize(pageSize));

    var randomName = Strings.addRandomSuffix("logs", "-", 5);
    var tmpOutputFile = new File("/tmp/" + randomName);
    // try-with-resources guarantees the stream is closed even if a download fails midway.
    try (var os = new FileOutputStream(tmpOutputFile)) {
      LOGGER.debug("Start getting GCS objects.");
      // Objects are returned in lexicographical order.
      for (Blob blob : blobs.iterateAll()) {
        blob.downloadTo(os);
      }
    }
    LOGGER.debug("Done retrieving GCS logs: {}.", logPath);
    return tmpOutputFile;
  }

  /**
   * Treats the lexicographically ordered objects under the given path as one log file and returns
   * its last numLines lines. Blobs are downloaded newest-first so only as many blobs as needed to
   * satisfy numLines are fetched.
   */
  @Override
  public List<String> tailCloudLog(LogConfigs configs, String logPath, int numLines) throws IOException {
    LOGGER.debug("Tailing logs from GCS path: {}", logPath);
    createGcsClientIfNotExists(configs);

    LOGGER.debug("Start GCS list request.");
    Page<Blob> blobs = GCS.list(
        configs.getGcpStorageBucket(),
        Storage.BlobListOption.prefix(logPath));

    var ascendingTimestampBlobs = new ArrayList<Blob>();
    for (Blob blob : blobs.iterateAll()) {
      ascendingTimestampBlobs.add(blob);
    }
    var descendingTimestampBlobs = Lists.reverse(ascendingTimestampBlobs);

    var lines = new ArrayList<String>();
    int linesRead = 0;

    LOGGER.debug("Start getting GCS objects.");
    // Strict < (not <=) avoids downloading one extra blob after numLines lines are collected.
    while (linesRead < numLines && !descendingTimestampBlobs.isEmpty()) {
      var poppedBlob = descendingTimestampBlobs.remove(0);
      try (var inMemoryData = new ByteArrayOutputStream()) {
        poppedBlob.downloadTo(inMemoryData);
        // Decode as UTF-8 explicitly; the no-arg toString() uses the platform default charset.
        var currFileLines = inMemoryData.toString(StandardCharsets.UTF_8).split("\n");
        List<String> currFileLinesReversed = Lists.reverse(List.of(currFileLines));
        for (var line : currFileLinesReversed) {
          if (linesRead == numLines) {
            break;
          }
          // Prepend so the returned list stays in ascending (chronological) order.
          lines.add(0, line);
          linesRead++;
        }
      }
    }

    LOGGER.debug("Done retrieving GCS logs: {}.", logPath);
    return lines;
  }

  // Synchronized so concurrent first calls cannot race and create more than one client or observe
  // a partially published one.
  private static synchronized void createGcsClientIfNotExists(LogConfigs configs) {
    if (GCS == null) {
      Preconditions.checkNotNull(configs.getGcpStorageBucket());
      Preconditions.checkNotNull(configs.getGoogleApplicationCredentials());

      // Uses Application Default Credentials (GOOGLE_APPLICATION_CREDENTIALS env var).
      GCS = StorageOptions.getDefaultInstance().getService();
    }
  }

  // TODO(review): dev scratch entry point against a hard-coded personal bucket; looks like it was
  // committed for manual testing and should be removed once GCS logging is verified.
  public static void main(String[] args) throws IOException {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    var bucket = "davin-kube-logging-test";
    Page<Blob> blobs =
        storage.list(
            bucket,
            Storage.BlobListOption.prefix("app-logging/workspace/server/logs"),
            Storage.BlobListOption.pageSize(1));

    var randomName = Strings.addRandomSuffix("logs", "-", 5);
    var tmpOutputFile = new File("/tmp/" + randomName);
    try (var os = new FileOutputStream(tmpOutputFile)) {
      for (Blob blob : blobs.iterateAll()) {
        System.out.println(blob.getName());
        blob.downloadTo(os);
      }
    }
    var data = new GcsLogs().tailCloudLog(new LogConfigDelegator(new EnvConfigs()), "tail", 6);
    System.out.println(data);
  }

}

0 comments on commit e2074a4

Please sign in to comment.