Skip to content

Commit

Permalink
馃毃 Add SSL documentation and check logic for S3 Destination 馃毃 (#17340)
Browse files Browse the repository at this point in the history
* Adds logic to fail upon non-deterministic custom S3 endpoint and documentation for insecure settings

* Reused config factory settings to a single static variable

* Updated error message and example in the spec.json to match expectation of secured endpoint

* Added validation check within the base s3

* Integrated AdaptiveDestinationRunner with S3Destination

* Reduced visibility for testing and fixed AdaptiveDestinationRunner issue

* Adds speicifc secure protocol with S3 and empty endpoint check

* Bumps docker version and adds comments and clearer string methods

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
ryankfu and octavia-squidington-iii committed Oct 3, 2022
1 parent bb6dff5 commit 1d956df
Show file tree
Hide file tree
Showing 15 changed files with 175 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.3.15
dockerImageTag: 0.3.16
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4410,7 +4410,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-s3:0.3.15"
- dockerImage: "airbyte/destination-s3:0.3.16"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public abstract class BaseS3Destination extends BaseConnector implements Destina

private static final Logger LOGGER = LoggerFactory.getLogger(BaseS3Destination.class);

private final S3DestinationConfigFactory configFactory;
protected final S3DestinationConfigFactory configFactory;

private final NamingConventionTransformer nameTransformer;

Expand All @@ -38,7 +38,7 @@ public BaseS3Destination(final S3DestinationConfigFactory configFactory) {
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
public AirbyteConnectionStatus check(final JsonNode config) {
try {
final S3DestinationConfig destinationConfig = configFactory.getS3DestinationConfig(config, storageProvider());
final AmazonS3 s3Client = destinationConfig.getS3Client();
Expand All @@ -59,9 +59,9 @@ public AirbyteConnectionStatus check(JsonNode config) {
}

@Override
public AirbyteMessageConsumer getConsumer(JsonNode config,
ConfiguredAirbyteCatalog catalog,
Consumer<AirbyteMessage> outputRecordCollector) {
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final S3DestinationConfig s3Config = configFactory.getS3DestinationConfig(config, storageProvider());
return new S3ConsumerFactory().create(
outputRecordCollector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ public static void testMultipartUpload(final AmazonS3 s3Client, final String buc
LOGGER.info("Finished verification for multipart upload mode");
}

/**
* Checks that S3 custom endpoint uses a variant that only uses HTTPS
*
* @param endpoint URL string representing an accessible S3 bucket
*/
public static boolean testCustomEndpointSecured(final String endpoint) {
// if user does not use a custom endpoint, do not fail
if (endpoint == null || endpoint.length() == 0) {
return true;
} else {
return endpoint.startsWith("https://");
}
}

@VisibleForTesting
static void attemptS3WriteAndDelete(final S3StorageOperations storageOperations,
final S3DestinationConfig s3Config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static io.airbyte.integrations.destination.s3.constant.S3Constants.S_3_PATH_FORMAT;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
Expand Down Expand Up @@ -253,7 +254,7 @@ protected AmazonS3 createS3Client() {
.build();
}

final ClientConfiguration clientConfiguration = new ClientConfiguration();
final ClientConfiguration clientConfiguration = new ClientConfiguration().withProtocol(Protocol.HTTPS);
clientConfiguration.setSignerOverride("AWSS3V4SignerType");

return AmazonS3ClientBuilder.standard()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class AdaptiveDestinationRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(AdaptiveDestinationRunner.class);

private static final String DEPLOYMENT_MODE_KEY = "DEPLOYMENT_MODE";
private static final String COULD_MODE = "CLOUD";
private static final String CLOUD_MODE = "CLOUD";

public static OssDestinationBuilder baseOnEnv() {
final String mode = System.getenv(DEPLOYMENT_MODE_KEY);
Expand Down Expand Up @@ -72,7 +72,7 @@ public Runner(final String deploymentMode,

private Destination getDestination() {
LOGGER.info("Running destination under deployment mode: {}", deploymentMode);
if (deploymentMode != null && deploymentMode.equals(COULD_MODE)) {
if (deploymentMode != null && deploymentMode.equals(CLOUD_MODE)) {
return cloudDestinationSupplier.get();
}
if (deploymentMode == null) {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ RUN /bin/bash -c 'set -e && \
echo "unknown arch" ;\
fi'

LABEL io.airbyte.version=0.3.15
LABEL io.airbyte.version=0.3.16
LABEL io.airbyte.name=airbyte/destination-s3
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

application {
mainClass = 'io.airbyte.integrations.destination.s3.S3Destination'
mainClass = 'io.airbyte.integrations.destination.s3.S3DestinationRunner'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@

package io.airbyte.integrations.destination.s3;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.integrations.base.IntegrationRunner;

public class S3Destination extends BaseS3Destination {

public S3Destination() {}

public S3Destination(final S3DestinationConfigFactory s3DestinationConfigFactory) {
@VisibleForTesting
protected S3Destination(final S3DestinationConfigFactory s3DestinationConfigFactory) {
super(s3DestinationConfigFactory);
}

public static void main(String[] args) throws Exception {
public static void main(final String[] args) throws Exception {
new IntegrationRunner(new S3Destination()).run(args);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3;

import io.airbyte.integrations.base.adaptive.AdaptiveDestinationRunner;

public class S3DestinationRunner {

public static void main(final String[] args) throws Exception {
AdaptiveDestinationRunner.baseOnEnv()
.withOssDestination(S3Destination::new)
.withCloudDestination(S3DestinationStrictEncrypt::new)
.run(args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;

public class S3DestinationStrictEncrypt extends S3Destination {

public S3DestinationStrictEncrypt() {
super();
}

@VisibleForTesting
protected S3DestinationStrictEncrypt(final S3DestinationConfigFactory configFactory) {
super(configFactory);
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
final S3DestinationConfig destinationConfig = this.configFactory.getS3DestinationConfig(config, super.storageProvider());

// Fails early to avoid extraneous validations checks if custom endpoint is not secure
if (!S3BaseChecks.testCustomEndpointSecured(destinationConfig.getEndpoint())) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("Custom endpoint does not use HTTPS");
}
return super.check(config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class S3DestinationStrictEncryptTest {

private AmazonS3 s3;
private S3DestinationConfigFactory factoryConfig;

@BeforeEach
public void setup() {
s3 = mock(AmazonS3.class);
final InitiateMultipartUploadResult uploadResult = mock(InitiateMultipartUploadResult.class);
final UploadPartResult uploadPartResult = mock(UploadPartResult.class);
when(s3.uploadPart(any(UploadPartRequest.class))).thenReturn(uploadPartResult);
when(s3.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(uploadResult);

factoryConfig = new S3DestinationConfigFactory() {
public S3DestinationConfig getS3DestinationConfig(final JsonNode config, final StorageProvider storageProvider) {
return S3DestinationConfig.create("fake-bucket", "fake-bucketPath", "fake-region")
.withEndpoint("https://s3.example.com")
.withAccessKeyCredential("fake-accessKeyId", "fake-secretAccessKey")
.withS3Client(s3)
.get();
}
};
}


/**
* Test that checks if user is using a connection that is HTTPS only
*/
@Test
public void checksCustomEndpointIsHttpsOnly() {
final S3Destination destinationWithHttpsOnlyEndpoint = new S3DestinationStrictEncrypt(factoryConfig);
final AirbyteConnectionStatus status = destinationWithHttpsOnlyEndpoint.check(null);
assertEquals(Status.SUCCEEDED, status.getStatus(), "custom endpoint did not contain `s3-accesspoint`");
}

/**
* Test that checks if user is using a connection that is deemed insecure since it does not always enforce HTTPS only
* <p>https://docs.aws.amazon.com/general/latest/gr/s3.html</p>
*/
@Test
public void checksCustomEndpointIsNotHttpsOnly() {
final S3Destination destinationWithStandardUnsecuredEndpoint = new S3DestinationStrictEncrypt(new S3DestinationConfigFactory() {
public S3DestinationConfig getS3DestinationConfig(final JsonNode config, final StorageProvider storageProvider) {
return S3DestinationConfig.create("fake-bucket", "fake-bucketPath", "fake-region")
.withEndpoint("s3.us-west-1.amazonaws.com")
.withAccessKeyCredential("fake-accessKeyId", "fake-secretAccessKey")
.withS3Client(s3)
.get();
}
});
final AirbyteConnectionStatus status = destinationWithStandardUnsecuredEndpoint.check(null);
assertEquals(Status.FAILED, status.getStatus());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class S3DestinationTest {

private AmazonS3 s3;
private S3DestinationConfig config;
private S3DestinationConfigFactory factoryConfig;

@BeforeEach
public void setup() {
Expand All @@ -48,24 +49,24 @@ public void setup() {
.withAccessKeyCredential("fake-accessKeyId", "fake-secretAccessKey")
.withS3Client(s3)
.get();
}

@Test
/**
* Test that check will fail if IAM user does not have listObjects permission
*/
public void checksS3WithoutListObjectPermission() {
final S3Destination destinationFail = new S3Destination(new S3DestinationConfigFactory() {

factoryConfig = new S3DestinationConfigFactory() {
public S3DestinationConfig getS3DestinationConfig(final JsonNode config, final StorageProvider storageProvider) {
return S3DestinationConfig.create("fake-bucket", "fake-bucketPath", "fake-region")
.withEndpoint("fake-endpoint")
.withEndpoint("https://s3.example.com")
.withAccessKeyCredential("fake-accessKeyId", "fake-secretAccessKey")
.withS3Client(s3)
.get();
}
};
}

});
@Test
/**
* Test that check will fail if IAM user does not have listObjects permission
*/
public void checksS3WithoutListObjectPermission() {
final S3Destination destinationFail = new S3Destination(factoryConfig);
doThrow(new AmazonS3Exception("Access Denied")).when(s3).listObjects(any(ListObjectsRequest.class));
final AirbyteConnectionStatus status = destinationFail.check(null);
assertEquals(Status.FAILED, status.getStatus(), "Connection check should have failed");
Expand All @@ -77,17 +78,7 @@ public S3DestinationConfig getS3DestinationConfig(final JsonNode config, final S
* Test that check will succeed when IAM user has all required permissions
*/
public void checksS3WithListObjectPermission() {
final S3Destination destinationSuccess = new S3Destination(new S3DestinationConfigFactory() {

public S3DestinationConfig getS3DestinationConfig(final JsonNode config, final StorageProvider storageProvider) {
return S3DestinationConfig.create("fake-bucket", "fake-bucketPath", "fake-region")
.withEndpoint("fake-endpoint")
.withAccessKeyCredential("fake-accessKeyId", "fake-secretAccessKey")
.withS3Client(s3)
.get();
}

});
final S3Destination destinationSuccess = new S3Destination(factoryConfig);
final AirbyteConnectionStatus status = destinationSuccess.check(null);
assertEquals(Status.SUCCEEDED, status.getStatus(), "Connection check should have succeeded");
}
Expand Down
1 change: 0 additions & 1 deletion airbyte-webapp/src/core/domain/connector/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ export const getExcludedConnectorIds = (workspaceId: string) =>
"2340cbba-358e-11ec-8d3d-0242ac130203", // hide Pular Destination https://github.com/airbytehq/airbyte-cloud/issues/2614
"d4d3fef9-e319-45c2-881a-bd02ce44cc9f", // hide Redis Destination https://github.com/airbytehq/airbyte-cloud/issues/2593
"2c9d93a7-9a17-4789-9de9-f46f0097eb70", // hide Rockset Destination https://github.com/airbytehq/airbyte-cloud/issues/2615
"4816b78f-1489-44c1-9060-4b19d5fa9362", // hide S3 Destination https://github.com/airbytehq/airbyte-cloud/issues/2616
"69589781-7828-43c5-9f63-8925b1c1ccc2", // hide S3 Source https://github.com/airbytehq/airbyte-cloud/issues/2618
"2470e835-feaf-4db6-96f3-70fd645acc77", // Salesforce Singer
"3dc6f384-cd6b-4be3-ad16-a41450899bf0", // hide Scylla Destination https://github.com/airbytehq/airbyte-cloud/issues/2617
Expand Down
5 changes: 4 additions & 1 deletion docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ List of required fields:
* **S3 Bucket Region**

1. Allow connections from Airbyte server to your AWS S3/ Minio S3 cluster \(if they exist in separate VPCs\).
2. An S3 bucket with credentials or an instanceprofile with read/write permissions configured for the host (ec2, eks).
2. An S3 bucket with credentials or an instance profile with read/write permissions configured for the host (ec2, eks).
3. [Enforce encryption of data in transit](https://docs.aws.amazon.com/AmazonS3/latest/userguide/security-best-practices.html#transit)

## Step 1: Set up S3

Expand All @@ -21,6 +22,8 @@ Use an existing or create new [Access Key ID and Secret Access Key](https://docs

Prepare S3 bucket that will be used as destination, see [this](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) to create an S3 bucket.

NOTE: If the S3 cluster is not configured to use TLS, the connection to Amazon S3 silently reverts to an unencrypted connection. Airbyte recommends all connections be configured to use TLS/SSL as support for AWS's [shared responsibility model](https://aws.amazon.com/compliance/shared-responsibility-model/)

## Step 2: Set up the S3 destination connector in Airbyte

**For Airbyte Cloud:**
Expand Down

0 comments on commit 1d956df

Please sign in to comment.