šŸ›Destination-gcs\destination-bigquery(gcs) - updated check() method tā€¦
Browse files Browse the repository at this point in the history
ā€¦o handle that user has both storage.objects.create and storage.multipartUploads.create roles (#9121)

* [9044] Destination-gcs\destination-bigquery(gcs) - updated check() method to handle that user has both storage.objects.create and storage.multipartUploads.create roles
etsybaev committed Jan 10, 2022
1 parent 1054e7e commit 44cb30a
Showing 17 changed files with 135 additions and 27 deletions.
5 changes: 1 addition & 4 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
@@ -240,10 +240,7 @@ class Config:
)
spec: Optional[ConnectorSpecification] = None
connectionStatus: Optional[AirbyteConnectionStatus] = None
catalog: Optional[AirbyteCatalog] = Field(
None,
description="log message: any kind of logging you want the platform to know about.",
)
catalog: Optional[AirbyteCatalog] = Field(None, description="catalog message: the calalog")
record: Optional[AirbyteRecordMessage] = Field(None, description="record message: the record")
state: Optional[AirbyteStateMessage] = Field(
None,
@@ -2,7 +2,7 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.6.1",
"dockerImageTag": "0.6.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery",
"icon": "bigquery.svg"
}
@@ -2,7 +2,7 @@
"destinationDefinitionId": "ca8f6566-e555-4b40-943a-545bf123117a",
"name": "Google Cloud Storage (GCS)",
"dockerRepository": "airbyte/destination-gcs",
"dockerImageTag": "0.1.17",
"dockerImageTag": "0.1.19",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/gcs",
"icon": "googlecloudstorage.svg"
}
@@ -13,7 +13,7 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.6.1
dockerImageTag: 0.6.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
- name: BigQuery (denormalized typed struct)
@@ -60,7 +60,7 @@
- name: Google Cloud Storage (GCS)
destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
dockerRepository: airbyte/destination-gcs
dockerImageTag: 0.1.18
dockerImageTag: 0.1.19
documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
icon: googlecloudstorage.svg
- name: Google PubSub
@@ -176,7 +176,7 @@
supportsDBT: false
supported_destination_sync_modes:
- "append"
- dockerImage: "airbyte/destination-bigquery:0.6.1"
- dockerImage: "airbyte/destination-bigquery:0.6.2"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
@@ -261,7 +261,7 @@
type: "string"
description: "When running custom transformations or Basic normalization,\
\ running queries on interactive mode can hit BQ limits, choosing batch\
\ will solve those limitss."
\ will solve those limits."
title: "Transformation Query Run Type"
default: "interactive"
enum:
@@ -311,10 +311,12 @@
title: "Block Size (MB) for GCS multipart upload"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes9\
\ more memory. Allowed values: min=5MB, max=525MB Default: 5MB."
\ to upload a bigger files and improve the speed, but consumes more\
\ memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
minimum: 5
maximum: 525
examples:
- 5
keep_files_in_gcs-bucket:
@@ -1141,7 +1143,7 @@
- "overwrite"
- "append"
supportsNamespaces: true
- dockerImage: "airbyte/destination-gcs:0.1.18"
- dockerImage: "airbyte/destination-gcs:0.1.19"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs"
connectionSpecification:
@@ -226,7 +226,7 @@ class Config:
log: Optional[AirbyteLogMessage] = Field(None, description="log message: any kind of logging you want the platform to know about.")
spec: Optional[ConnectorSpecification] = None
connectionStatus: Optional[AirbyteConnectionStatus] = None
catalog: Optional[AirbyteCatalog] = Field(None, description="log message: any kind of logging you want the platform to know about.")
catalog: Optional[AirbyteCatalog] = Field(None, description="catalog message: the calalog")
record: Optional[AirbyteRecordMessage] = Field(None, description="record message: the record")
state: Optional[AirbyteStateMessage] = Field(
None, description="schema message: the state. Must be the last message produced. The platform uses this information"
@@ -930,7 +930,7 @@ workerConfigs, new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(
.run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot);
}

private StandardCheckConnectionOutput runCheck(final JsonNode config) throws WorkerException {
protected StandardCheckConnectionOutput runCheck(final JsonNode config) throws WorkerException {
return new DefaultCheckConnectionWorker(
workerConfigs, new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null))
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot);
@@ -16,5 +16,5 @@ ENV APPLICATION destination-bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.1
LABEL io.airbyte.version=0.6.2
LABEL io.airbyte.name=airbyte/destination-bigquery
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.name=airbyte/destination-gcs
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/destination-gcs/README.md
@@ -14,7 +14,8 @@ As a community contributor, you can follow these steps to run integration tests.

## Airbyte Employee

- Access the `destination gcs creds` secrets on Last Pass, and put it in `sample_secrets/config.json`.
- Access the `SECRET_DESTINATION-GCS__CREDS` secrets on SecretManager, and put it in `sample_secrets/config.json`.
- Access the `SECRET_DESTINATION-GCS_NO_MULTIPART_ROLE_CREDS` secrets on SecretManager, and put it in `sample_secrets/insufficient_roles_config.json`.
- Rename the directory from `sample_secrets` to `secrets`.

### GCP Service Account for Testing
@@ -40,4 +40,5 @@ dependencies {

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-gcs')
integrationTestJavaImplementation project(':airbyte-workers')
}
@@ -0,0 +1,10 @@
{
"gcs_bucket_name": "<bucket-name>",
"gcs_bucket_path": "integration-test",
"gcs_bucket_region": "<region>",
"credential": {
"credential_type": "HMAC_KEY",
"hmac_key_access_id": "<access-id-for-user-with-no-multipart-role>",
"hmac_key_secret": "<secret-for-user-with-no-multipart-role>"
}
}
@@ -5,6 +5,9 @@
package io.airbyte.integrations.destination.gcs;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
@@ -16,13 +19,22 @@
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GcsDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(GcsDestination.class);
public static final String EXPECTED_ROLES = "storage.multipartUploads.abort, storage.multipartUploads.create, "
+ "storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list";

public static final String CHECK_ACTIONS_TMP_FILE_NAME = "test";
public static final String DUMMY_TEXT = "This is just a dummy text to write to test file";

public static void main(final String[] args) throws Exception {
new IntegrationRunner(new GcsDestination()).run(args);
@@ -31,26 +43,80 @@ public static void main(final String[] args) throws Exception {
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
final GcsDestinationConfig destinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config);
final GcsDestinationConfig destinationConfig = GcsDestinationConfig
.getGcsDestinationConfig(config);
final AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(destinationConfig);
s3Client.putObject(destinationConfig.getBucketName(), "test", "check-content");
s3Client.deleteObject(destinationConfig.getBucketName(), "test");

// Test single Upload (for small files) permissions
testSingleUpload(s3Client, destinationConfig);

// Test Multipart Upload permissions
testMultipartUpload(s3Client, destinationConfig);

return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception attempting to access the Gcs bucket: {}", e.getMessage());
LOGGER.error("Please make sure you account has all of these roles: " + EXPECTED_ROLES);

return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect to the Gcs bucket with the provided configuration. \n" + e
.getMessage());
}
}

private void testSingleUpload(final AmazonS3 s3Client, final GcsDestinationConfig destinationConfig) {
LOGGER.info("Started testing if all required credentials assigned to user for single file uploading");
s3Client.putObject(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME, DUMMY_TEXT);
s3Client.deleteObject(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME);
LOGGER.info("Finished checking for normal upload mode");
}

private void testMultipartUpload(final AmazonS3 s3Client, final GcsDestinationConfig destinationConfig)
throws Exception {

LOGGER.info("Started testing if all required credentials assigned to user for Multipart upload");
final TransferManager tm = TransferManagerBuilder.standard()
.withS3Client(s3Client)
// Sets the size threshold, in bytes, for when to use multipart uploads. Uploads over this size will
// automatically use a multipart upload strategy, while uploads smaller than this threshold will use
// a single connection to upload the whole object. So we need to set it as small for testing
// connection. See javadoc for more details.
.withMultipartUploadThreshold(1024L) // set 1KB as part size
.build();

try {
// TransferManager processes all transfers asynchronously,
// so this call returns immediately.
final Upload upload = tm.upload(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME, getTmpFileToUpload());
upload.waitForCompletion();
s3Client.deleteObject(destinationConfig.getBucketName(), CHECK_ACTIONS_TMP_FILE_NAME);
} finally {
tm.shutdownNow(true);
}
LOGGER.info("Finished verification for multipart upload mode");
}

private File getTmpFileToUpload() throws IOException {
final File tmpFile = File.createTempFile(CHECK_ACTIONS_TMP_FILE_NAME, ".tmp");
try (final FileWriter writer = new FileWriter(tmpFile)) {
// Text should be bigger than Threshold's size to make client use a multipart upload strategy,
// smaller than threshold will use a single connection to upload the whole object even if multipart
// upload option is ON. See {@link TransferManagerBuilder#withMultipartUploadThreshold}
// javadoc for more information.

writer.write(StringUtils.repeat(DUMMY_TEXT, 1000));
}
return tmpFile;
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog configuredCatalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final GcsWriterFactory formatterFactory = new ProductionWriterFactory();
return new GcsConsumer(GcsDestinationConfig.getGcsDestinationConfig(config), configuredCatalog, formatterFactory, outputRecordCollector);
return new GcsConsumer(GcsDestinationConfig.getGcsDestinationConfig(config), configuredCatalog,
formatterFactory, outputRecordCollector);
}

}
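
For reference, a minimal sketch of how this updated check flow can be driven end to end. Only GcsDestination#check, Jsons, IOs, and AirbyteConnectionStatus come from the code above; the standalone GcsCheckExample class and the secrets/config.json path are illustrative assumptions, not part of this change.

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.gcs.GcsDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import java.nio.file.Path;

public class GcsCheckExample {

  public static void main(final String[] args) {
    // Assumed local secrets file with HMAC credentials, as described in the connector README.
    final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")));

    // check() now probes both upload modes: a plain putObject plus a TransferManager
    // multipart upload, so it fails fast when the storage.multipartUploads.* roles are missing.
    final AirbyteConnectionStatus status = new GcsDestination().check(config);
    System.out.println(status.getStatus() + ": " + status.getMessage());
  }

}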
@@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.gcs;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.S3ObjectSummary;
@@ -13,6 +15,7 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.integrations.destination.s3.S3DestinationConstants;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.S3FormatConfig;
@@ -25,6 +28,7 @@
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -44,7 +48,8 @@ public abstract class GcsDestinationAcceptanceTest extends DestinationAcceptance
protected static final Logger LOGGER = LoggerFactory.getLogger(GcsDestinationAcceptanceTest.class);
protected static final ObjectMapper MAPPER = MoreMappers.initMapper();

protected final String secretFilePath = "secrets/config.json";
protected static final String SECRET_FILE_PATH = "secrets/config.json";
protected static final String SECRET_FILE_PATH_INSUFFICIENT_ROLES = "secrets/insufficient_roles_config.json";
protected final S3Format outputFormat;
protected JsonNode configJson;
protected GcsDestinationConfig config;
@@ -55,7 +60,7 @@ protected GcsDestinationAcceptanceTest(final S3Format outputFormat) {
}

protected JsonNode getBaseConfigJson() {
return Jsons.deserialize(IOs.readFile(Path.of(secretFilePath)));
return Jsons.deserialize(IOs.readFile(Path.of(SECRET_FILE_PATH)));
}

@Override
@@ -147,4 +152,27 @@ protected void tearDown(final TestDestinationEnv testEnv) {
}
}

/**
* Verify that when given user with no Multipart Upload Roles, that check connection returns a
* failed response. Assume that the #getInsufficientRolesFailCheckConfig() returns the service
* account has storage.objects.create permission but not storage.multipartUploads.create.
*/
@Test
public void testCheckConnectionInsufficientRoles() throws Exception {
final JsonNode baseConfigJson = Jsons.deserialize(IOs.readFile(Path.of(
SECRET_FILE_PATH_INSUFFICIENT_ROLES)));

// Set a random GCS bucket path for each integration test
final JsonNode configJson = Jsons.clone(baseConfigJson);
final String testBucketPath = String.format(
"%s_test_%s",
outputFormat.name().toLowerCase(Locale.ROOT),
RandomStringUtils.randomAlphanumeric(5));
((ObjectNode) configJson)
.put("gcs_bucket_path", testBucketPath)
.set("format", getFormatConfig());

assertEquals(Status.FAILED, runCheck(configJson).getStatus());
}

}
@@ -127,7 +127,8 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
final ResultSet tableInfo = connection.createStatement()
.executeQuery(String.format("SHOW TABLES LIKE '%s' IN SCHEMA %s;", tableName, schema));
assertTrue(tableInfo.next());
// check that we're creating permanent tables. DBT defaults to transient tables, which have `TRANSIENT` as the value for the `kind` column.
// check that we're creating permanent tables. DBT defaults to transient tables, which have
// `TRANSIENT` as the value for the `kind` column.
assertEquals("TABLE", tableInfo.getString("kind"));
return connection.createStatement()
.executeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT));
5 changes: 3 additions & 2 deletions docs/integrations/destinations/bigquery.md
@@ -153,6 +153,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.6.2 | 2022-01-10 | [\#9121](https://github.com/airbytehq/airbyte/pull/9121) | Fixed check method for GCS mode to verify if all roles assigned to user |
| 0.6.1 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
| 0.6.0 | 2021-12-17 | [\#8788](https://github.com/airbytehq/airbyte/issues/8788) | BigQuery/BiqQuery denorm Destinations : Add possibility to use different types of GCS files |
| 0.5.1 | 2021-12-16 | [\#8816](https://github.com/airbytehq/airbyte/issues/8816) | Update dataset locations |
@@ -170,8 +171,8 @@

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.2.2 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
| 0.2.1 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
| 0.2.2 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
| 0.2.1 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
| 0.2.0 | 2021-12-17 | [\#8788](https://github.com/airbytehq/airbyte/pull/8788) | BigQuery/BiqQuery denorm Destinations : Add possibility to use different types of GCS files |
| 0.1.11 | 2021-12-16 | [\#8816](https://github.com/airbytehq/airbyte/issues/8816) | Update dataset locations |
| 0.1.10 | 2021-11-09 | [\#7804](https://github.com/airbytehq/airbyte/pull/7804) | handle null values in fields described by a $ref definition |
1 change: 1 addition & 0 deletions docs/integrations/destinations/gcs.md
@@ -229,6 +229,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.19 | 2022-01-10 | [\#9121](https://github.com/airbytehq/airbyte/pull/9121) | Fixed check method for GCS mode to verify if all roles assigned to user |
| 0.1.18 | 2021-12-30 | [\#8809](https://github.com/airbytehq/airbyte/pull/8809) | Update connector fields title/description |
| 0.1.17 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
| 0.1.16 | 2021-12-20 | [\#8974](https://github.com/airbytehq/airbyte/pull/8974) | Release a new version to ensure there is no excessive logging. |
