diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml index 44dbb45cdcf60..0c996191199c6 100644 --- a/.github/workflows/publish-command.yml +++ b/.github/workflows/publish-command.yml @@ -156,6 +156,7 @@ jobs: ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }} PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }} DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }} + DESTINATION_AZURE_BLOB_CREDS: ${{ secrets.DESTINATION_AZURE_BLOB_CREDS }} DESTINATION_GCS_CREDS: ${{ secrets.DESTINATION_GCS_CREDS }} APIFY_INTEGRATION_TEST_CREDS: ${{ secrets.APIFY_INTEGRATION_TEST_CREDS }} DESTINATION_DYNAMODB_TEST_CREDS: ${{ secrets.DESTINATION_DYNAMODB_TEST_CREDS }} diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index 3e94c78c47584..fd2d4d63bfd19 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -156,6 +156,7 @@ jobs: ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }} PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }} DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }} + DESTINATION_AZURE_BLOB_CREDS: ${{ secrets.DESTINATION_AZURE_BLOB_CREDS }} DESTINATION_GCS_CREDS: ${{ secrets.DESTINATION_GCS_CREDS }} DESTINATION_DYNAMODB_TEST_CREDS: ${{ secrets.DESTINATION_DYNAMODB_TEST_CREDS }} APIFY_INTEGRATION_TEST_CREDS: ${{ secrets.APIFY_INTEGRATION_TEST_CREDS }} diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json new file mode 100644 index 0000000000000..39fad40923c14 --- /dev/null +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json @@ -0,0 +1,7 @@ +{ + "destinationDefinitionId": "b4c5d105-31fd-4817-96b6-cb923bfc04cb", + "name": "Azure Blob Storage", + "dockerRepository": "airbyte/destination-azure-blob-storage", + "dockerImageTag": "0.1.0", + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/azureblobstorage" +} diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 7bb143ad6db79..1d28cc37a3ad9 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -14,6 +14,11 @@ dockerImageTag: 0.3.9 documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres icon: postgresql.svg +- destinationDefinitionId: b4c5d105-31fd-4817-96b6-cb923bfc04cb + name: Azure Blob Storage + dockerRepository: airbyte/destination-azure-blob-storage + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.io/integrations/destinations/azureblobstorage - destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 name: BigQuery dockerRepository: airbyte/destination-bigquery diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index ceea61e648c09..269789bd05d5b 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -86,6 +86,7 @@ |name |status | | :--- | :--- | +| Azure Blob Storage | [![destination-azure-blob-storage](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-azure-blob-storage%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-azure-blob-storage) | | BigQuery | [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery) | | Google Cloud Storage (GCS) | [![destination-gcs](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-s3%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-gcs) | | Google PubSub | [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-pubsub) | diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/.dockerignore b/airbyte-integrations/connectors/destination-azure-blob-storage/.dockerignore new file mode 100644 index 0000000000000..65c7d0ad3e73c --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile b/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile new file mode 100644 index 0000000000000..618eb625c9efa --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile @@ -0,0 +1,11 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte +ENV APPLICATION destination-azure-blob-storage + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-azure-blob-storage diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/README.md b/airbyte-integrations/connectors/destination-azure-blob-storage/README.md new file mode 100644 index 0000000000000..c0f235239edf9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/README.md @@ -0,0 +1,26 @@ +# Azure Blob Storage Test Configuration + +In order to test the Azure Blob Storage destination, you need a Microsoft account. + +## Community Contributor + +As a community contributor, you will need access to Azure to run the integration tests. + +- Create an AzureBlobStorage account for testing. Check if it works under https://portal.azure.com/ -> "Storage explorer (preview)". +- Get your `azure_blob_storage_account_name` and `azure_blob_storage_account_key` that can read and write to the Azure Container. +- Paste the accountName and key information into the config files under [`./sample_secrets`](secrets). +- Rename the directory from `sample_secrets` to `secrets`. +- Feel free to modify the config files with different settings in the acceptance test file (e.g. `AzureBlobStorageJsonlDestinationAcceptanceTest.java`, method `getFormatConfig`), as long as they follow the schema defined in [spec.json](src/main/resources/spec.json). + +## Airbyte Employee +- Access the `Azure Blob Storage Account` secrets on Last Pass. +- Replace the `config.json` under `sample_secrets`. +- Rename the directory from `sample_secrets` to `secrets`. + +## Add New Output Format +- Add a new enum in `AzureBlobStorageFormat'. +- Modify `spec.json` to specify the configuration of this new format. +- Update `AzureBlobStorageFormatConfigs` to be able to construct a config for this new format. +- Create a new package under `io.airbyte.integrations.destination.azure_blob_storage`. +- Implement a new `AzureBlobStorageWriter`. The implementation can extend `BaseAzureBlobStorageWriter`. +- Write an acceptance test for the new output format. The test can extend `AzureBlobStorageDestinationAcceptanceTest`. diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/build.gradle b/airbyte-integrations/connectors/destination-azure-blob-storage/build.gradle new file mode 100644 index 0000000000000..24fe77e61cb73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/build.gradle @@ -0,0 +1,25 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageDestination' +} + +dependencies { + implementation project(':airbyte-config:models') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-integrations:connectors:destination-jdbc') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + + implementation 'com.azure:azure-storage-blob:12.12.0' + implementation 'org.apache.commons:commons-csv:1.4' + + testImplementation 'org.apache.commons:commons-lang3:3.11' + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-azure-blob-storage') +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/sample_secrets/config.json b/airbyte-integrations/connectors/destination-azure-blob-storage/sample_secrets/config.json new file mode 100644 index 0000000000000..878f23245dfb1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/sample_secrets/config.json @@ -0,0 +1,6 @@ +{ + "azure_blob_storage_endpoint_domain_name": "blob.core.windows.net", + "azure_blob_storage_account_name": "your_account_name_here", + "azure_blob_storage_account_key": "your_account_key_here", + "azure_blob_storage_container_name": "testcontainername" +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConnectionChecker.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConnectionChecker.java new file mode 100644 index 0000000000000..b1b982510f383 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConnectionChecker.java @@ -0,0 +1,134 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +import com.azure.core.http.rest.PagedIterable; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.specialized.AppendBlobClient; +import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AzureBlobStorageConnectionChecker { + + private static final String TEST_BLOB_NAME_PREFIX = "testConnectionBlob"; + private BlobContainerClient containerClient; // aka schema in SQL DBs + private final AppendBlobClient appendBlobClient; // aka "SQL Table" + + private static final Logger LOGGER = LoggerFactory.getLogger( + AzureBlobStorageConnectionChecker.class); + + public AzureBlobStorageConnectionChecker( + AzureBlobStorageDestinationConfig azureBlobStorageConfig) { + + StorageSharedKeyCredential credential = new StorageSharedKeyCredential( + azureBlobStorageConfig.getAccountName(), + azureBlobStorageConfig.getAccountKey()); + + this.appendBlobClient = + new SpecializedBlobClientBuilder() + .endpoint(azureBlobStorageConfig.getEndpointUrl()) + .credential(credential) + .containerName(azureBlobStorageConfig.getContainerName()) // Like schema in DB + .blobName(TEST_BLOB_NAME_PREFIX + UUID.randomUUID()) // Like table in DB + .buildAppendBlobClient(); + } + + /* + * This a kinda test method that is used in CHECK operation to make sure all works fine with the + * current config + */ + public void attemptWriteAndDelete() { + initTestContainerAndBlob(); + writeUsingAppendBlock("Some test data"); + listBlobsInContainer() + .forEach( + blobItem -> LOGGER.info( + "Blob name: " + blobItem.getName() + "Snapshot: " + blobItem.getSnapshot())); + + deleteBlob(); + } + + private void initTestContainerAndBlob() { + // create container if absent (aka SQl Schema) + this.containerClient = appendBlobClient.getContainerClient(); + if (!containerClient.exists()) { + containerClient.create(); + } + + // create a storage container if absent (aka Table is SQL BD) + if (!appendBlobClient.exists()) { + appendBlobClient.create(); + LOGGER.info("blobContainerClient created"); + } else { + LOGGER.info("blobContainerClient already exists"); + } + } + + /* + * This options may be used to write and flush right away. Note: Azure SDK fails for empty lines, + * but those are not supposed to be written here + */ + public void writeUsingAppendBlock(String data) { + LOGGER.info("Writing test data to Azure Blob storage: " + data); + InputStream dataStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + + Integer blobCommittedBlockCount = appendBlobClient.appendBlock(dataStream, data.length()) + .getBlobCommittedBlockCount(); + + LOGGER.info("blobCommittedBlockCount: " + blobCommittedBlockCount); + } + + /* + * List the blob(s) in our container. + */ + public PagedIterable listBlobsInContainer() { + return containerClient.listBlobs(); + } + + /* + * Delete the blob we created earlier. + */ + public void deleteBlob() { + LOGGER.info("Deleting blob: " + appendBlobClient.getBlobName()); + appendBlobClient.delete(); // remove aka "SQL Table" used + } + + /* + * Delete the Container. Be very careful when you ise ir. It removes thw whole bucket and supposed + * to be used in check connection ony for writing tmp data + */ + public void deleteContainer() { + LOGGER.info("Deleting blob: " + containerClient.getBlobContainerName()); + containerClient.delete(); // remove aka "SQL Schema" used + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java new file mode 100644 index 0000000000000..6d41ac69eee85 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java @@ -0,0 +1,184 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.specialized.AppendBlobClient; +import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriter; +import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriterFactory; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.SyncMode; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AzureBlobStorageConsumer extends FailureTrackingAirbyteMessageConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(AzureBlobStorageConsumer.class); + + private final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig; + private final ConfiguredAirbyteCatalog configuredCatalog; + private final AzureBlobStorageWriterFactory writerFactory; + private final Consumer outputRecordCollector; + private final Map streamNameAndNamespaceToWriters; + + private AirbyteMessage lastStateMessage = null; + + public AzureBlobStorageConsumer( + AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig, + ConfiguredAirbyteCatalog configuredCatalog, + AzureBlobStorageWriterFactory writerFactory, + Consumer outputRecordCollector) { + this.azureBlobStorageDestinationConfig = azureBlobStorageDestinationConfig; + this.configuredCatalog = configuredCatalog; + this.writerFactory = writerFactory; + this.outputRecordCollector = outputRecordCollector; + this.streamNameAndNamespaceToWriters = new HashMap<>(configuredCatalog.getStreams().size()); + } + + @Override + protected void startTracked() throws Exception { + // Init the client itself here + StorageSharedKeyCredential credential = new StorageSharedKeyCredential( + azureBlobStorageDestinationConfig.getAccountName(), + azureBlobStorageDestinationConfig.getAccountKey()); + + SpecializedBlobClientBuilder specializedBlobClientBuilder = new SpecializedBlobClientBuilder() + .endpoint(azureBlobStorageDestinationConfig.getEndpointUrl()) + .credential(credential) + .containerName( + azureBlobStorageDestinationConfig + .getContainerName());// Like schema (or even oracle user) in DB + + for (ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { + + AppendBlobClient appendBlobClient = specializedBlobClientBuilder + .blobName(configuredStream.getStream().getName()) + .buildAppendBlobClient(); + + boolean isNewlyCreatedBlob = createContainers(appendBlobClient, configuredStream); + + AzureBlobStorageWriter writer = writerFactory + .create(azureBlobStorageDestinationConfig, appendBlobClient, configuredStream, + isNewlyCreatedBlob); + + AirbyteStream stream = configuredStream.getStream(); + AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair + .fromAirbyteSteam(stream); + streamNameAndNamespaceToWriters.put(streamNamePair, writer); + } + } + + private boolean createContainers(AppendBlobClient appendBlobClient, + ConfiguredAirbyteStream configuredStream) { + // create container if absent (aka SQl Schema) + final BlobContainerClient containerClient = appendBlobClient.getContainerClient(); + if (!containerClient.exists()) { + containerClient.create(); + } + // create a storage container if absent (aka Table is SQL BD) + if (SyncMode.FULL_REFRESH.equals(configuredStream.getSyncMode())) { + // full refresh sync. Create blob and override if any + LOGGER.info("Sync mode is selected to OVERRIDE mode. New container will be automatically" + + " created or all data would be overridden (if any) for stream:" + configuredStream + .getStream().getName()); + appendBlobClient.create(true); + return true; + } else { + // incremental sync. Create new container only if still absent + if (!appendBlobClient.exists()) { + LOGGER.info("Sync mode is selected to APPEND mode. New container will be automatically" + + " created for stream:" + configuredStream.getStream().getName()); + appendBlobClient.create(false); + LOGGER.info(appendBlobClient.getBlobName() + " blob has been created"); + return true; + } else { + LOGGER.info(String.format( + "Sync mode is selected to APPEND mode. Container %s already exists. Append mode is " + + "only available for \"Append blobs\". For more details please visit" + + " https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#blobs", + configuredStream.getStream().getName())); + LOGGER.info(appendBlobClient.getBlobName() + " already exists"); + return false; + } + } + } + + @Override + protected void acceptTracked(AirbyteMessage airbyteMessage) throws Exception { + if (airbyteMessage.getType() == Type.STATE) { + this.lastStateMessage = airbyteMessage; + return; + } else if (airbyteMessage.getType() != Type.RECORD) { + return; + } + + AirbyteRecordMessage recordMessage = airbyteMessage.getRecord(); + AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair + .fromRecordMessage(recordMessage); + + if (!streamNameAndNamespaceToWriters.containsKey(pair)) { + String errMsg = String.format( + "Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s", + Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage)); + LOGGER.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + + try { + streamNameAndNamespaceToWriters.get(pair).write(UUID.randomUUID(), recordMessage); + + } catch (Exception e) { + LOGGER.error(String.format("Failed to write messagefor stream %s, details: %s", + streamNameAndNamespaceToWriters.get(pair), e.getMessage())); + throw new RuntimeException(e); + } + } + + @Override + protected void close(boolean hasFailed) throws Exception { + for (AzureBlobStorageWriter handler : streamNameAndNamespaceToWriters.values()) { + handler.close(hasFailed); + } + + if (!hasFailed) { + outputRecordCollector.accept(lastStateMessage); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestination.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestination.java new file mode 100644 index 0000000000000..373a369fb248b --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestination.java @@ -0,0 +1,77 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.BaseConnector; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriterFactory; +import io.airbyte.integrations.destination.azure_blob_storage.writer.ProductionWriterFactory; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AzureBlobStorageDestination extends BaseConnector implements Destination { + + private static final Logger LOGGER = LoggerFactory.getLogger(AzureBlobStorageDestination.class); + + public static void main(String[] args) throws Exception { + new IntegrationRunner(new AzureBlobStorageDestination()).run(args); + } + + @Override + public AirbyteConnectionStatus check(JsonNode config) { + try { + AzureBlobStorageConnectionChecker client = new AzureBlobStorageConnectionChecker( + AzureBlobStorageDestinationConfig.getAzureBlobStorageConfig(config)); + client.attemptWriteAndDelete(); + return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } catch (Exception e) { + LOGGER.error("Exception attempting to access the azure blob storage bucket: ", e); + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage( + "Could not connect to the azure blob storage with the provided configuration. \n" + e + .getMessage()); + } + } + + @Override + public AirbyteMessageConsumer getConsumer(JsonNode config, + ConfiguredAirbyteCatalog configuredCatalog, + Consumer outputRecordCollector) { + AzureBlobStorageWriterFactory formatterFactory = new ProductionWriterFactory(); + return new AzureBlobStorageConsumer( + AzureBlobStorageDestinationConfig.getAzureBlobStorageConfig(config), configuredCatalog, + formatterFactory, outputRecordCollector); + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java new file mode 100644 index 0000000000000..ad0f25f34453d --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java @@ -0,0 +1,98 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +import static io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageDestinationConstants.*; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Locale; + +public class AzureBlobStorageDestinationConfig { + + private final String endpointUrl; + private final String accountName; + private final String accountKey; + private final String containerName; + private final AzureBlobStorageFormatConfig formatConfig; + + public AzureBlobStorageDestinationConfig( + String endpointUrl, + String accountName, + String accountKey, + String containerName, + AzureBlobStorageFormatConfig formatConfig) { + this.endpointUrl = endpointUrl; + this.accountName = accountName; + this.accountKey = accountKey; + this.containerName = containerName; + this.formatConfig = formatConfig; + } + + public String getEndpointUrl() { + return endpointUrl; + } + + public String getAccountName() { + return accountName; + } + + public String getAccountKey() { + return accountKey; + } + + public String getContainerName() { + return containerName; + } + + public AzureBlobStorageFormatConfig getFormatConfig() { + return formatConfig; + } + + public static AzureBlobStorageDestinationConfig getAzureBlobStorageConfig(JsonNode config) { + final String accountNameFomConfig = config.get("azure_blob_storage_account_name").asText(); + final String accountKeyFromConfig = config.get("azure_blob_storage_account_key").asText(); + final JsonNode endpointFromConfig = config + .get("azure_blob_storage_endpoint_domain_name"); + final JsonNode containerName = config.get("azure_blob_storage_container_name"); + final JsonNode blobName = config.get("azure_blob_storage_blob_name"); // streamId + + final String endpointComputed = String.format(Locale.ROOT, DEFAULT_STORAGE_ENDPOINT_FORMAT, + DEFAULT_STORAGE_ENDPOINT_HTTP_PROTOCOL, + accountNameFomConfig, + endpointFromConfig == null ? DEFAULT_STORAGE_ENDPOINT_DOMAIN_NAME + : endpointFromConfig.asText()); + + final String containerNameComputed = + containerName == null ? DEFAULT_STORAGE_CONTAINER_NAME : containerName.asText(); + + return new AzureBlobStorageDestinationConfig( + endpointComputed, + accountNameFomConfig, + accountKeyFromConfig, + containerNameComputed, + AzureBlobStorageFormatConfigs.getAzureBlobStorageFormatConfig(config)); + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConstants.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConstants.java new file mode 100644 index 0000000000000..a577f1fa2b2a5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConstants.java @@ -0,0 +1,36 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +public final class AzureBlobStorageDestinationConstants { + + public static final String DEFAULT_STORAGE_CONTAINER_NAME = "airbytecontainer"; + public static final String DEFAULT_STORAGE_ENDPOINT_HTTP_PROTOCOL = "https"; + public static final String DEFAULT_STORAGE_ENDPOINT_DOMAIN_NAME = "blob.core.windows.net"; + public static final String DEFAULT_STORAGE_ENDPOINT_FORMAT = "%s://%s.%s"; + + private AzureBlobStorageDestinationConstants() {} + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageFormat.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageFormat.java new file mode 100644 index 0000000000000..5ac09bfe49f8a --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageFormat.java @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +public enum AzureBlobStorageFormat { + + CSV("csv"), + JSONL("jsonl"); + + private final String fileExtension; + + AzureBlobStorageFormat(String fileExtension) { + this.fileExtension = fileExtension; + } + + public String getFileExtension() { + return fileExtension; + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageFormatConfig.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageFormatConfig.java new file mode 100644 index 0000000000000..d85d0e9231a31 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageFormatConfig.java @@ -0,0 +1,57 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +import com.fasterxml.jackson.databind.JsonNode; + +public interface AzureBlobStorageFormatConfig { + + AzureBlobStorageFormat getFormat(); + + static String withDefault(JsonNode config, String property, String defaultValue) { + JsonNode value = config.get(property); + if (value == null || value.isNull()) { + return defaultValue; + } + return value.asText(); + } + + static int withDefault(JsonNode config, String property, int defaultValue) { + JsonNode value = config.get(property); + if (value == null || value.isNull()) { + return defaultValue; + } + return value.asInt(); + } + + static boolean withDefault(JsonNode config, String property, boolean defaultValue) { + JsonNode value = config.get(property); + if (value == null || value.isNull()) { + return defaultValue; + } + return value.asBoolean(); + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageFormatConfigs.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageFormatConfigs.java new file mode 100644 index 0000000000000..f93668dc4efeb --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageFormatConfigs.java @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.destination.azure_blob_storage.csv.AzureBlobStorageCsvFormatConfig; +import io.airbyte.integrations.destination.azure_blob_storage.jsonl.AzureBlobStorageJsonlFormatConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AzureBlobStorageFormatConfigs { + + protected static final Logger LOGGER = LoggerFactory + .getLogger(AzureBlobStorageFormatConfigs.class); + + public static AzureBlobStorageFormatConfig getAzureBlobStorageFormatConfig(JsonNode config) { + JsonNode formatConfig = config.get("format"); + LOGGER.info("Azure Blob Storage format config: {}", formatConfig.toString()); + AzureBlobStorageFormat formatType = AzureBlobStorageFormat + .valueOf(formatConfig.get("format_type").asText().toUpperCase()); + + switch (formatType) { + case CSV -> { + return new AzureBlobStorageCsvFormatConfig(formatConfig); + } + case JSONL -> { + return new AzureBlobStorageJsonlFormatConfig(); + } + default -> { + throw new RuntimeException("Unexpected output format: " + Jsons.serialize(config)); + } + } + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvFormatConfig.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvFormatConfig.java new file mode 100644 index 0000000000000..859bbdff49912 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvFormatConfig.java @@ -0,0 +1,84 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.csv; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageFormat; +import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageFormatConfig; + +public class AzureBlobStorageCsvFormatConfig implements AzureBlobStorageFormatConfig { + + public enum Flattening { + + // These values must match the format / csv_flattening enum values in spec.json. + NO("No flattening"), + ROOT_LEVEL("Root level flattening"); + + private final String value; + + Flattening(String value) { + this.value = value; + } + + @JsonCreator + public static Flattening fromValue(String value) { + for (Flattening f : Flattening.values()) { + if (f.value.equalsIgnoreCase(value)) { + return f; + } + } + throw new IllegalArgumentException("Unexpected value: " + value); + } + + public String getValue() { + return value; + } + + } + + private final Flattening flattening; + + public AzureBlobStorageCsvFormatConfig(JsonNode formatConfig) { + this.flattening = Flattening.fromValue(formatConfig.get("flattening").asText()); + } + + @Override + public AzureBlobStorageFormat getFormat() { + return AzureBlobStorageFormat.CSV; + } + + public Flattening getFlattening() { + return flattening; + } + + @Override + public String toString() { + return "AzureBlobStorageCsvFormatConfig{" + + "flattening=" + flattening + + '}'; + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java new file mode 100644 index 0000000000000..880b31a584fd1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java @@ -0,0 +1,99 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.csv; + +import com.azure.storage.blob.specialized.AppendBlobClient; +import com.azure.storage.blob.specialized.BlobOutputStream; +import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageDestinationConfig; +import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriter; +import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.csv.QuoteMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AzureBlobStorageCsvWriter extends BaseAzureBlobStorageWriter implements + AzureBlobStorageWriter { + + private static final Logger LOGGER = LoggerFactory.getLogger(AzureBlobStorageCsvWriter.class); + + private final CsvSheetGenerator csvSheetGenerator; + private final CSVPrinter csvPrinter; + private final BlobOutputStream blobOutputStream; + + public AzureBlobStorageCsvWriter(AzureBlobStorageDestinationConfig config, + AppendBlobClient appendBlobClient, + ConfiguredAirbyteStream configuredStream, + boolean isNewlyCreatedBlob) + throws IOException { + super(config, appendBlobClient, configuredStream); + + AzureBlobStorageCsvFormatConfig formatConfig = (AzureBlobStorageCsvFormatConfig) config + .getFormatConfig(); + + this.csvSheetGenerator = CsvSheetGenerator.Factory + .create(configuredStream.getStream().getJsonSchema(), + formatConfig); + + this.blobOutputStream = appendBlobClient.getBlobOutputStream(); + + if (isNewlyCreatedBlob) { + this.csvPrinter = new CSVPrinter( + new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8), + CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL) + .withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0]))); + } else { + // no header required for append + this.csvPrinter = new CSVPrinter( + new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8), + CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL)); + } + } + + @Override + public void write(UUID id, AirbyteRecordMessage recordMessage) throws IOException { + csvPrinter.printRecord(csvSheetGenerator.getDataRow(id, recordMessage)); + } + + @Override + protected void closeWhenSucceed() throws IOException { + LOGGER.info("Closing csvPrinter when succeed"); + csvPrinter.close(); + } + + @Override + protected void closeWhenFail() throws IOException { + LOGGER.info("Closing csvPrinter when failed"); + csvPrinter.close(); + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/BaseSheetGenerator.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/BaseSheetGenerator.java new file mode 100644 index 0000000000000..c6b887aee8c9e --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/BaseSheetGenerator.java @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.csv; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; + +/** + * CSV data row = ID column + timestamp column + record columns. This class takes care of the first + * two columns, which is shared by downstream implementations. + */ +public abstract class BaseSheetGenerator implements CsvSheetGenerator { + + public List getDataRow(UUID id, AirbyteRecordMessage recordMessage) { + List data = new LinkedList<>(); + data.add(id); + data.add(recordMessage.getEmittedAt()); + data.addAll(getRecordColumns(recordMessage.getData())); + return data; + } + + abstract List getRecordColumns(JsonNode json); + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/CsvSheetGenerator.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/CsvSheetGenerator.java new file mode 100644 index 0000000000000..e5e1f4ca56cd6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/CsvSheetGenerator.java @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.csv; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.destination.azure_blob_storage.csv.AzureBlobStorageCsvFormatConfig.Flattening; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.util.List; +import java.util.UUID; + +/** + * This class takes case of the generation of the CSV data sheet, including the header row and the + * data row. + */ +public interface CsvSheetGenerator { + + List getHeaderRow(); + + List getDataRow(UUID id, AirbyteRecordMessage recordMessage); + + final class Factory { + + public static CsvSheetGenerator create(JsonNode jsonSchema, AzureBlobStorageCsvFormatConfig formatConfig) { + if (formatConfig.getFlattening() == Flattening.NO) { + return new NoFlatteningSheetGenerator(); + } else if (formatConfig.getFlattening() == Flattening.ROOT_LEVEL) { + return new RootLevelFlatteningSheetGenerator(jsonSchema); + } else { + throw new IllegalArgumentException( + "Unexpected flattening config: " + formatConfig.getFlattening()); + } + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/NoFlatteningSheetGenerator.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/NoFlatteningSheetGenerator.java new file mode 100644 index 0000000000000..6929b67a84673 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/NoFlatteningSheetGenerator.java @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.csv; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.JavaBaseConstants; +import java.util.Collections; +import java.util.List; + +public class NoFlatteningSheetGenerator extends BaseSheetGenerator implements CsvSheetGenerator { + + @Override + public List getHeaderRow() { + return Lists.newArrayList( + JavaBaseConstants.COLUMN_NAME_AB_ID, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT, + JavaBaseConstants.COLUMN_NAME_DATA); + } + + /** + * When no flattening is needed, the record column is just one json blob. + */ + @Override + List getRecordColumns(JsonNode json) { + return Collections.singletonList(Jsons.serialize(json)); + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/RootLevelFlatteningSheetGenerator.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/RootLevelFlatteningSheetGenerator.java new file mode 100644 index 0000000000000..061ec4bc600cd --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/RootLevelFlatteningSheetGenerator.java @@ -0,0 +1,81 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.csv; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.util.MoreIterators; +import io.airbyte.integrations.base.JavaBaseConstants; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Collectors; + +public class RootLevelFlatteningSheetGenerator extends BaseSheetGenerator implements CsvSheetGenerator { + + /** + * Keep a header list to iterate the input json object with a defined order. + */ + private final List recordHeaders; + + public RootLevelFlatteningSheetGenerator(JsonNode jsonSchema) { + this.recordHeaders = MoreIterators.toList(jsonSchema.get("properties").fieldNames()) + .stream().sorted().collect(Collectors.toList());; + } + + @Override + public List getHeaderRow() { + List headers = Lists.newArrayList(JavaBaseConstants.COLUMN_NAME_AB_ID, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + headers.addAll(recordHeaders); + return headers; + } + + /** + * With root level flattening, the record columns are the first level fields of the json. + */ + @Override + List getRecordColumns(JsonNode json) { + List values = new LinkedList<>(); + for (String field : recordHeaders) { + JsonNode value = json.get(field); + if (value == null) { + values.add(""); + } else if (value.isValueNode()) { + // Call asText method on value nodes so that proper string + // representation of json values can be returned by Jackson. + // Otherwise, CSV printer will just call the toString method, + // which can be problematic (e.g. text node will have extra + // double quotation marks around its text value). + values.add(value.asText()); + } else { + values.add(Jsons.serialize(value)); + } + } + + return values; + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlFormatConfig.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlFormatConfig.java new file mode 100644 index 0000000000000..1a28c6d3133c6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlFormatConfig.java @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.jsonl; + +import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageFormat; +import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageFormatConfig; + +public class AzureBlobStorageJsonlFormatConfig implements AzureBlobStorageFormatConfig { + + @Override + public AzureBlobStorageFormat getFormat() { + return AzureBlobStorageFormat.JSONL; + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java new file mode 100644 index 0000000000000..d6fd79f2fb1c8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java @@ -0,0 +1,89 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.jsonl; + +import com.azure.storage.blob.specialized.AppendBlobClient; +import com.azure.storage.blob.specialized.BlobOutputStream; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageDestinationConfig; +import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriter; +import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AzureBlobStorageJsonlWriter extends BaseAzureBlobStorageWriter implements + AzureBlobStorageWriter { + + protected static final Logger LOGGER = LoggerFactory.getLogger(AzureBlobStorageJsonlWriter.class); + + private static final ObjectMapper MAPPER = MoreMappers.initMapper(); + private static final ObjectWriter WRITER = MAPPER.writer(); + + private final BlobOutputStream blobOutputStream; + private final PrintWriter printWriter; + + public AzureBlobStorageJsonlWriter(AzureBlobStorageDestinationConfig config, + AppendBlobClient appendBlobClient, + ConfiguredAirbyteStream configuredStream, + boolean isNewlyCreatedBlob) { + super(config, appendBlobClient, configuredStream); + // at this moment we already receive appendBlobClient initialized + this.blobOutputStream = appendBlobClient.getBlobOutputStream(); + this.printWriter = new PrintWriter(blobOutputStream, true, StandardCharsets.UTF_8); + } + + @Override + public void write(UUID id, AirbyteRecordMessage recordMessage) { + ObjectNode json = MAPPER.createObjectNode(); + json.put(JavaBaseConstants.COLUMN_NAME_AB_ID, id.toString()); + json.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt()); + json.set(JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.getData()); + printWriter.println(Jsons.serialize(json)); + } + + @Override + protected void closeWhenSucceed() throws IOException { + // this would also close the blobOutputStream + printWriter.close(); + } + + @Override + protected void closeWhenFail() throws IOException { + // this would also close the blobOutputStream + printWriter.close(); + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/AzureBlobStorageWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/AzureBlobStorageWriter.java new file mode 100644 index 0000000000000..360f86d38bd30 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/AzureBlobStorageWriter.java @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.writer; + +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.io.IOException; +import java.util.UUID; + +/** + * {@link AzureBlobStorageWriter} is responsible for writing Airbyte stream data to an + * AzureBlobStorage location in a specific format. + */ +public interface AzureBlobStorageWriter { + + /** + * Write an Airbyte record message to an AzureBlobStorage object. + */ + void write(UUID id, AirbyteRecordMessage recordMessage) throws IOException; + + /** + * Close the AzureBlobStorage writer for the stream. + */ + void close(boolean hasFailed) throws IOException; + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/AzureBlobStorageWriterFactory.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/AzureBlobStorageWriterFactory.java new file mode 100644 index 0000000000000..6534c1a1f2fa6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/AzureBlobStorageWriterFactory.java @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.writer; + +import com.azure.storage.blob.specialized.AppendBlobClient; +import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageDestinationConfig; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; + +/** + * Create different {@link AzureBlobStorageWriter} based on + * {@link AzureBlobStorageDestinationConfig}. + */ +public interface AzureBlobStorageWriterFactory { + + AzureBlobStorageWriter create(AzureBlobStorageDestinationConfig config, + AppendBlobClient appendBlobClient, + ConfiguredAirbyteStream configuredStream, + boolean isNewlyCreatedBlob) + throws Exception; + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/BaseAzureBlobStorageWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/BaseAzureBlobStorageWriter.java new file mode 100644 index 0000000000000..0a3c9aeba34e8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/BaseAzureBlobStorageWriter.java @@ -0,0 +1,90 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.writer; + +import com.azure.storage.blob.specialized.AppendBlobClient; +import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageDestinationConfig; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The base implementation takes care of the following: + *
  • Create shared instance variables.
  • + *
  • Create the bucket and prepare the bucket path.
  • + *
  • Log and close the write.
  • + */ +public abstract class BaseAzureBlobStorageWriter implements AzureBlobStorageWriter { + + private static final Logger LOGGER = LoggerFactory.getLogger(BaseAzureBlobStorageWriter.class); + + protected final AzureBlobStorageDestinationConfig config; + protected final AppendBlobClient appendBlobClient; + protected final AirbyteStream stream; + protected final DestinationSyncMode syncMode; + + protected BaseAzureBlobStorageWriter(AzureBlobStorageDestinationConfig config, + AppendBlobClient appendBlobClient, + ConfiguredAirbyteStream configuredStream) { + this.config = config; + this.appendBlobClient = appendBlobClient; + this.stream = configuredStream.getStream(); + this.syncMode = configuredStream.getDestinationSyncMode(); + } + + /** + * Log and close the write. + */ + @Override + public void close(boolean hasFailed) throws IOException { + if (hasFailed) { + LOGGER.warn("Failure detected. Aborting upload of stream '{}'...", stream.getName()); + closeWhenFail(); + LOGGER.warn("Upload of stream '{}' aborted.", stream.getName()); + } else { + LOGGER.info("Uploading remaining data for stream '{}'.", stream.getName()); + closeWhenSucceed(); + LOGGER.info("Upload completed for stream '{}'.", stream.getName()); + } + } + + /** + * Operations that will run when the write succeeds. + */ + protected void closeWhenSucceed() throws IOException { + // Do nothing by default + } + + /** + * Operations that will run when the write fails. + */ + protected void closeWhenFail() throws IOException { + // Do nothing by default + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/ProductionWriterFactory.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/ProductionWriterFactory.java new file mode 100644 index 0000000000000..4f3979d3f77a6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/ProductionWriterFactory.java @@ -0,0 +1,63 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.writer; + +import com.azure.storage.blob.specialized.AppendBlobClient; +import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageDestinationConfig; +import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageFormat; +import io.airbyte.integrations.destination.azure_blob_storage.csv.AzureBlobStorageCsvWriter; +import io.airbyte.integrations.destination.azure_blob_storage.jsonl.AzureBlobStorageJsonlWriter; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProductionWriterFactory implements AzureBlobStorageWriterFactory { + + protected static final Logger LOGGER = LoggerFactory.getLogger(ProductionWriterFactory.class); + + @Override + public AzureBlobStorageWriter create(AzureBlobStorageDestinationConfig config, + AppendBlobClient appendBlobClient, + ConfiguredAirbyteStream configuredStream, + boolean isNewlyCreatedBlob) + throws Exception { + AzureBlobStorageFormat format = config.getFormatConfig().getFormat(); + + if (format == AzureBlobStorageFormat.CSV) { + LOGGER.debug("Picked up CSV format writer"); + return new AzureBlobStorageCsvWriter(config, appendBlobClient, configuredStream, + isNewlyCreatedBlob); + } + + if (format == AzureBlobStorageFormat.JSONL) { + LOGGER.debug("Picked up JSONL format writer"); + return new AzureBlobStorageJsonlWriter(config, appendBlobClient, configuredStream, + isNewlyCreatedBlob); + } + + throw new RuntimeException("Unexpected AzureBlobStorage destination format: " + format); + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/resources/spec.json new file mode 100644 index 0000000000000..b3b7c6ea78979 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/resources/spec.json @@ -0,0 +1,81 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/azureblobstorage", + "supportsIncremental": true, + "supportsNormalization": false, + "supportsDBT": false, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "AzureBlobStorage Destination Spec", + "type": "object", + "required": [ + "azure_blob_storage_account_name", + "azure_blob_storage_account_key", + "format" + ], + "additionalProperties": false, + "properties": { + "azure_blob_storage_endpoint_domain_name": { + "title": "Endpoint Domain Name", + "type": "string", + "default": "blob.core.windows.net", + "description": "This is Azure Blob Storage endpoint domain name. Leave default value (or leave it empty if run container from command line) to use Microsoft native from example.", + "examples": ["blob.core.windows.net"] + }, + "azure_blob_storage_container_name": { + "title": "Azure blob storage container (Bucket) Name", + "type": "string", + "description": "The name of the Azure blob storage container. If not exists - will be created automatically. May be empty, then will be created automatically airbytecontainer+timestamp", + "examples": ["airbytetescontainername"] + }, + "azure_blob_storage_account_name": { + "title": "Azure Blob Storage account name", + "type": "string", + "description": "The account's name of the Azure Blob Storage.", + "examples": ["airbyte5storage"] + }, + "azure_blob_storage_account_key": { + "description": "The Azure blob storage account key.", + "airbyte_secret": true, + "type": "string", + "examples": [ + "Z8ZkZpteggFx394vm+PJHnGTvdRncaYS+JhLKdj789YNmD+iyGTnG+PV+POiuYNhBg/ACS+LKjd%4FG3FHGN12Nd==" + ] + }, + "format": { + "title": "Output Format", + "type": "object", + "description": "Output data format", + "oneOf": [ + { + "title": "CSV: Comma-Separated Values", + "required": ["format_type", "flattening"], + "properties": { + "format_type": { + "type": "string", + "const": "CSV" + }, + "flattening": { + "type": "string", + "title": "Normalization (Flattening)", + "description": "Whether the input json data should be normalized (flattened) in the output CSV. Please refer to docs for details.", + "default": "No flattening", + "enum": ["No flattening", "Root level flattening"] + } + } + }, + { + "title": "JSON Lines: newline-delimited JSON", + "required": ["format_type"], + "properties": { + "format_type": { + "type": "string", + "const": "JSONL" + } + } + } + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..98ceef883c63c --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationAcceptanceTest.java @@ -0,0 +1,107 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import java.nio.file.Path; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class AzureBlobDestinationAcceptanceTest { + + protected final String secretFilePath = "secrets/config.json"; + private JsonNode config; + + @BeforeEach + public void beforeAll() { + final JsonNode configFomSecrets = Jsons.deserialize(IOs.readFile(Path.of(secretFilePath))); + config = Jsons.jsonNode(ImmutableMap.builder() + .put("azure_blob_storage_account_name", configFomSecrets.get("azure_blob_storage_account_name")) + .put("azure_blob_storage_account_key", configFomSecrets.get("azure_blob_storage_account_key")) + .put("azure_blob_storage_endpoint_domain_name", configFomSecrets.get("azure_blob_storage_endpoint_domain_name")) + .put("format", getJsonlFormatConfig()) + .build()); + } + + @Test + public void testCheck() { + final AzureBlobStorageDestination azureBlobStorageDestination = new AzureBlobStorageDestination(); + final AirbyteConnectionStatus checkResult = azureBlobStorageDestination.check(config); + + assertEquals(Status.SUCCEEDED, checkResult.getStatus()); + } + + @Test + public void testCheckInvalidAccountName() { + final JsonNode invalidConfig = Jsons.jsonNode(ImmutableMap.builder() + .put("azure_blob_storage_account_name", "someInvalidName") + .put("azure_blob_storage_account_key", config.get("azure_blob_storage_account_key")) + .build()); + + final AzureBlobStorageDestination azureBlobStorageDestination = new AzureBlobStorageDestination(); + final AirbyteConnectionStatus checkResult = azureBlobStorageDestination.check(invalidConfig); + + assertEquals(Status.FAILED, checkResult.getStatus()); + } + + @Test + public void testCheckInvalidKey() { + final JsonNode invalidConfig = Jsons.jsonNode(ImmutableMap.builder() + .put("azure_blob_storage_account_name", config.get("azure_blob_storage_account_name")) + .put("azure_blob_storage_account_key", "someInvalidKey") + .build()); + final AzureBlobStorageDestination azureBlobStorageDestination = new AzureBlobStorageDestination(); + final AirbyteConnectionStatus checkResult = azureBlobStorageDestination.check(invalidConfig); + + assertEquals(Status.FAILED, checkResult.getStatus()); + } + + @Test + public void testCheckInvaliDomainName() { + final JsonNode invalidConfig = Jsons.jsonNode(ImmutableMap.builder() + .put("azure_blob_storage_account_name", config.get("azure_blob_storage_account_name")) + .put("azure_blob_storage_account_key", config.get("azure_blob_storage_account_key")) + .put("azure_blob_storage_endpoint_domain_name", "invalidDomain.com.invalid123") + .build()); + final AzureBlobStorageDestination azureBlobStorageDestination = new AzureBlobStorageDestination(); + final AirbyteConnectionStatus checkResult = azureBlobStorageDestination.check(invalidConfig); + + assertEquals(Status.FAILED, checkResult.getStatus()); + } + + private JsonNode getJsonlFormatConfig() { + return Jsons.deserialize("{\n" + + " \"format_type\": \"JSONL\"\n" + + "}"); + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..3258b757bec05 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java @@ -0,0 +1,126 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.JavaBaseConstants; +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.StreamSupport; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.csv.QuoteMode; + +public class AzureBlobStorageCsvDestinationAcceptanceTest extends + AzureBlobStorageDestinationAcceptanceTest { + + public AzureBlobStorageCsvDestinationAcceptanceTest() { + super(AzureBlobStorageFormat.CSV); + } + + @Override + protected JsonNode getFormatConfig() { + return Jsons.deserialize("{\n" + + " \"format_type\": \"CSV\",\n" + + " \"flattening\": \"Root level flattening\"\n" + + "}"); + } + + /** + * Convert json_schema to a map from field name to field types. + */ + private static Map getFieldTypes(JsonNode streamSchema) { + Map fieldTypes = new HashMap<>(); + JsonNode fieldDefinitions = streamSchema.get("properties"); + Iterator> iterator = fieldDefinitions.fields(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + fieldTypes.put(entry.getKey(), entry.getValue().get("type").asText()); + } + return fieldTypes; + } + + private static JsonNode getJsonNode(Map input, Map fieldTypes) { + ObjectNode json = MAPPER.createObjectNode(); + + if (input.containsKey(JavaBaseConstants.COLUMN_NAME_DATA)) { + return Jsons.deserialize(input.get(JavaBaseConstants.COLUMN_NAME_DATA)); + } + + for (Entry entry : input.entrySet()) { + String key = entry.getKey(); + if (key.equals(JavaBaseConstants.COLUMN_NAME_AB_ID) || key + .equals(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) { + continue; + } + String value = entry.getValue(); + if (value == null || value.equals("")) { + continue; + } + String type = fieldTypes.get(key); + switch (type) { + case "boolean" -> json.put(key, Boolean.valueOf(value)); + case "integer" -> json.put(key, Integer.valueOf(value)); + case "number" -> json.put(key, Double.valueOf(value)); + default -> json.put(key, value); + } + } + return json; + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws IOException { + String allSyncedObjects = getAllSyncedObjects(streamName); + + Map fieldTypes = getFieldTypes(streamSchema); + List jsonRecords = new LinkedList<>(); + + try (Reader in = new StringReader(allSyncedObjects)) { + Iterable records = CSVFormat.DEFAULT + .withQuoteMode(QuoteMode.NON_NUMERIC) + .withFirstRecordAsHeader() + .parse(in); + + StreamSupport.stream(records.spliterator(), false) + .forEach(r -> jsonRecords.add(getJsonNode(r.toMap(), fieldTypes))); + } + + return jsonRecords; + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..ba9ed11233780 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationAcceptanceTest.java @@ -0,0 +1,163 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.specialized.AppendBlobClient; +import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AzureBlobStorageDestinationAcceptanceTest extends DestinationAcceptanceTest { + + protected static final Logger LOGGER = LoggerFactory + .getLogger(AzureBlobStorageDestinationAcceptanceTest.class); + protected static final ObjectMapper MAPPER = MoreMappers.initMapper(); + + protected final String secretFilePath = "secrets/config.json"; + protected final AzureBlobStorageFormat outputFormat; + protected JsonNode configJson; + protected AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig; + protected SpecializedBlobClientBuilder specializedBlobClientBuilder; + protected StorageSharedKeyCredential credential; + + protected AzureBlobStorageDestinationAcceptanceTest(AzureBlobStorageFormat outputFormat) { + this.outputFormat = outputFormat; + } + + protected JsonNode getBaseConfigJson() { + return Jsons.deserialize(IOs.readFile(Path.of(secretFilePath))); + } + + @Override + protected String getImageName() { + return "airbyte/destination-azure-blob-storage:dev"; + } + + @Override + protected JsonNode getConfig() { + return configJson; + } + + @Override + protected JsonNode getFailCheckConfig() { + return Jsons.jsonNode(ImmutableMap.builder() + .put("azure_blob_storage_account_name", "invalidAccountName") + .put("azure_blob_storage_account_key", "invalidAccountKey") + .put("azure_blob_storage_endpoint_domain_name", "InvalidDomainName") + .put("format", getFormatConfig()) + .build()); + } + + /** + * Helper method to retrieve all synced objects inside the configured bucket path. + */ + @Deprecated + protected String getAllSyncedObjects(String streamName) { + AppendBlobClient appendBlobClient = specializedBlobClientBuilder + .blobName(streamName) + .buildAppendBlobClient(); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + appendBlobClient.download(outputStream); + String result = new String(outputStream.toByteArray(), StandardCharsets.UTF_8); + + LOGGER.info("All objects: " + result); + return result; + + } + + protected abstract JsonNode getFormatConfig(); + + /** + * This method does the following: + *
  • Construct the Azure Blob destination config.
  • + *
  • Construct the Azure Blob client.
  • + */ + @Override + protected void setup(TestDestinationEnv testEnv) { + JsonNode baseConfigJson = getBaseConfigJson(); + + configJson = Jsons.jsonNode(ImmutableMap.builder() + .put("azure_blob_storage_account_name", + baseConfigJson.get("azure_blob_storage_account_name")) + .put("azure_blob_storage_account_key", baseConfigJson.get("azure_blob_storage_account_key")) + .put("azure_blob_storage_endpoint_domain_name", + baseConfigJson.get("azure_blob_storage_endpoint_domain_name")) + .put("azure_blob_storage_container_name", + baseConfigJson.get("azure_blob_storage_container_name").asText() + + System.currentTimeMillis()) + .put("format", getFormatConfig()) + .build()); + + this.azureBlobStorageDestinationConfig = AzureBlobStorageDestinationConfig + .getAzureBlobStorageConfig(configJson); + + this.credential = new StorageSharedKeyCredential( + azureBlobStorageDestinationConfig.getAccountName(), + azureBlobStorageDestinationConfig.getAccountKey()); + + this.specializedBlobClientBuilder = new SpecializedBlobClientBuilder() + .endpoint(azureBlobStorageDestinationConfig.getEndpointUrl()) + .credential(credential) + .containerName( + azureBlobStorageDestinationConfig.getContainerName());// Like user\schema in DB + + } + + /** + * Remove all the Container output from the tests. + */ + @Override + protected void tearDown(TestDestinationEnv testEnv) { + BlobServiceClient storageClient = + new BlobServiceClientBuilder() + .endpoint(azureBlobStorageDestinationConfig.getEndpointUrl()) + .credential(credential) + .buildClient(); + + BlobContainerClient blobContainerClient = storageClient + .getBlobContainerClient(azureBlobStorageDestinationConfig.getContainerName()); + + if (blobContainerClient.exists()) { + LOGGER.info("Deleting test env: " + azureBlobStorageDestinationConfig.getContainerName()); + blobContainerClient.delete(); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageJsonlDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageJsonlDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..66f85b59b6dcd --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageJsonlDestinationAcceptanceTest.java @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.JavaBaseConstants; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +public class AzureBlobStorageJsonlDestinationAcceptanceTest extends + AzureBlobStorageDestinationAcceptanceTest { + + protected AzureBlobStorageJsonlDestinationAcceptanceTest() { + super(AzureBlobStorageFormat.JSONL); + } + + @Override + protected JsonNode getFormatConfig() { + return Jsons.deserialize("{\n" + + " \"format_type\": \"JSONL\"\n" + + "}"); + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws IOException { + + String allSyncedObjects = getAllSyncedObjects(streamName); + List jsonRecords = new LinkedList<>(); + + allSyncedObjects.lines().forEach(line -> { + jsonRecords.add(Jsons.deserialize(line).get(JavaBaseConstants.COLUMN_NAME_DATA)); + }); + return jsonRecords; + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationTest.java new file mode 100644 index 0000000000000..d56ffe69b9621 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobDestinationTest.java @@ -0,0 +1,109 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.protocol.models.ConnectorSpecification; +import org.junit.jupiter.api.Test; + +public class AzureBlobDestinationTest { + + @Test + public void testConfigObjectCustomDomainName() { + final JsonNode config = Jsons.jsonNode(ImmutableMap.builder() + .put("azure_blob_storage_account_name", "accName") + .put("azure_blob_storage_account_key", "accKey") + .put("azure_blob_storage_endpoint_domain_name", "accDomainName.com") + .put("format", getFormatConfig()) + .build()); + final AzureBlobStorageDestinationConfig azureBlobStorageConfig = AzureBlobStorageDestinationConfig + .getAzureBlobStorageConfig(config); + + assertEquals("https://accName.accDomainName.com", + azureBlobStorageConfig.getEndpointUrl()); + } + + @Test + public void testConfigObjectDefaultDomainName() { + final JsonNode config = Jsons.jsonNode(ImmutableMap.builder() + .put("azure_blob_storage_account_name", "accName") + .put("azure_blob_storage_account_key", "accKey") + .put("format", getFormatConfig()) + .build()); + final AzureBlobStorageDestinationConfig azureBlobStorageConfig = AzureBlobStorageDestinationConfig + .getAzureBlobStorageConfig(config); + + assertEquals("https://accName.blob.core.windows.net", + azureBlobStorageConfig.getEndpointUrl()); + } + + @Test + public void testConfigObjectDefaultBlobName() { + final JsonNode config = Jsons.jsonNode(ImmutableMap.builder() + .put("azure_blob_storage_account_name", "accName") + .put("azure_blob_storage_account_key", "accKey") + .put("format", getFormatConfig()) + .build()); + final AzureBlobStorageDestinationConfig azureBlobStorageConfig = + AzureBlobStorageDestinationConfig + .getAzureBlobStorageConfig(config); + + assertNotNull(azureBlobStorageConfig); + } + + @Test + public void testConfigObjectDefaultContainerName() { + final JsonNode config = Jsons.jsonNode(ImmutableMap.builder() + .put("azure_blob_storage_account_name", "accName") + .put("azure_blob_storage_account_key", "accKey") + .put("format", getFormatConfig()) + .build()); + final AzureBlobStorageDestinationConfig azureBlobStorageConfig = AzureBlobStorageDestinationConfig + .getAzureBlobStorageConfig(config); + + assertEquals("airbytecontainer", azureBlobStorageConfig.getContainerName()); + } + + @Test + public void testSpec() throws Exception { + final AzureBlobStorageDestination azureBlobStorageDestination = new AzureBlobStorageDestination(); + final ConnectorSpecification spec = azureBlobStorageDestination.spec(); + final JsonNode connectionSpecification = spec.getConnectionSpecification(); + + assertNotNull(connectionSpecification); + } + + private JsonNode getFormatConfig() { + return Jsons.deserialize("{\n" + + " \"format_type\": \"JSONL\"\n" + + "}"); + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageFormatConfigsTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageFormatConfigsTest.java new file mode 100644 index 0000000000000..507058e49c7b1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageFormatConfigsTest.java @@ -0,0 +1,60 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.integrations.destination.azure_blob_storage.csv.AzureBlobStorageCsvFormatConfig; +import io.airbyte.integrations.destination.azure_blob_storage.csv.AzureBlobStorageCsvFormatConfig.Flattening; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@DisplayName("AzureBlobStorageFormatConfigs") +public class AzureBlobStorageFormatConfigsTest { + + private static final ObjectMapper mapper = MoreMappers.initMapper(); + + @Test + @DisplayName("When CSV format is specified, it returns CSV format config") + public void testGetCsvS3FormatConfig() { + ObjectNode stubFormatConfig = mapper.createObjectNode(); + stubFormatConfig.put("format_type", AzureBlobStorageFormat.CSV.toString()); + stubFormatConfig.put("flattening", Flattening.ROOT_LEVEL.getValue()); + + ObjectNode stubConfig = mapper.createObjectNode(); + stubConfig.set("format", stubFormatConfig); + AzureBlobStorageFormatConfig formatConfig = AzureBlobStorageFormatConfigs + .getAzureBlobStorageFormatConfig(stubConfig); + assertEquals(formatConfig.getFormat(), AzureBlobStorageFormat.CSV); + assertTrue(formatConfig instanceof AzureBlobStorageCsvFormatConfig); + AzureBlobStorageCsvFormatConfig csvFormatConfig = (AzureBlobStorageCsvFormatConfig) formatConfig; + assertEquals(csvFormatConfig.getFlattening(), Flattening.ROOT_LEVEL); + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvFormatConfigTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvFormatConfigTest.java new file mode 100644 index 0000000000000..cb394443e14f6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvFormatConfigTest.java @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.csv; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.airbyte.integrations.destination.azure_blob_storage.csv.AzureBlobStorageCsvFormatConfig.Flattening; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@DisplayName("AzureBlobStorageCsvFormatConfig") +public class AzureBlobStorageCsvFormatConfigTest { + + @Test + @DisplayName("Flattening enums can be created from value string") + public void testFlatteningCreationFromString() { + assertEquals(Flattening.NO, Flattening.fromValue("no flattening")); + assertEquals(Flattening.ROOT_LEVEL, Flattening.fromValue("root level flattening")); + try { + Flattening.fromValue("invalid flattening value"); + } catch (Exception e) { + assertTrue(e instanceof IllegalArgumentException); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/csv/NoFlatteningSheetGeneratorTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/csv/NoFlatteningSheetGeneratorTest.java new file mode 100644 index 0000000000000..a29e607a77e54 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/csv/NoFlatteningSheetGeneratorTest.java @@ -0,0 +1,65 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.csv; + +import static org.junit.jupiter.api.Assertions.assertLinesMatch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.integrations.base.JavaBaseConstants; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +class NoFlatteningSheetGeneratorTest { + + private final ObjectMapper mapper = MoreMappers.initMapper(); + private final NoFlatteningSheetGenerator sheetGenerator = new NoFlatteningSheetGenerator(); + + @Test + public void testGetHeaderRow() { + assertLinesMatch( + Lists.newArrayList( + JavaBaseConstants.COLUMN_NAME_AB_ID, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT, + JavaBaseConstants.COLUMN_NAME_DATA), + sheetGenerator.getHeaderRow()); + } + + @Test + public void testGetRecordColumns() { + ObjectNode json = mapper.createObjectNode(); + json.set("Field 4", mapper.createObjectNode().put("Field 41", 15)); + json.put("Field 1", "A"); + json.put("Field 3", 71); + json.put("Field 2", true); + + assertLinesMatch( + Collections.singletonList("{\"Field 4\":{\"Field 41\":15},\"Field 1\":\"A\",\"Field 3\":71,\"Field 2\":true}"), + sheetGenerator.getRecordColumns(json)); + } + +} diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/csv/RootLevelFlatteningSheetGeneratorTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/csv/RootLevelFlatteningSheetGeneratorTest.java new file mode 100644 index 0000000000000..351c5edf09374 --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/csv/RootLevelFlatteningSheetGeneratorTest.java @@ -0,0 +1,88 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.azure_blob_storage.csv; + +import static org.junit.jupiter.api.Assertions.assertLinesMatch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.integrations.base.JavaBaseConstants; +import java.util.Collections; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class RootLevelFlatteningSheetGeneratorTest { + + private final static ObjectMapper MAPPER = MoreMappers.initMapper(); + private final static ObjectNode SCHEMA = MAPPER.createObjectNode(); + static { + List fields = Lists.newArrayList("C", "B", "A", "c", "b", "a"); + Collections.shuffle(fields); + + ObjectNode schemaProperties = MAPPER.createObjectNode(); + for (String field : fields) { + schemaProperties.set(field, MAPPER.createObjectNode()); + } + + SCHEMA.set("properties", schemaProperties); + } + + private RootLevelFlatteningSheetGenerator sheetGenerator; + + @BeforeEach + public void createGenerator() { + this.sheetGenerator = new RootLevelFlatteningSheetGenerator(SCHEMA); + } + + @Test + public void testGetHeaderRow() { + assertLinesMatch( + Lists.newArrayList( + JavaBaseConstants.COLUMN_NAME_AB_ID, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT, + "A", "B", "C", "a", "b", "c"), + sheetGenerator.getHeaderRow()); + } + + @Test + public void testGetRecordColumns() { + ObjectNode json = MAPPER.createObjectNode(); + // Field c is missing + json.put("C", 3); + json.put("B", "value B"); + json.set("A", MAPPER.createObjectNode().put("Field 41", 15)); + json.put("b", "value b"); + json.put("a", 1); + + assertLinesMatch( + // A, B, C, a, b, c + Lists.newArrayList("{\"Field 41\":15}", "value B", "3", "1", "value b", ""), + sheetGenerator.getRecordColumns(json)); + } + +} diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index ad0ac0c78dad4..72f4ce1405610 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -129,6 +129,7 @@ * [Zoom](integrations/sources/zoom.md) * [Zuora](integrations/sources/zuora.md) * [Destinations](integrations/destinations/README.md) + * [AzureBlobStorage](integrations/destinations/azureblobstorage.md) * [BigQuery](integrations/destinations/bigquery.md) * [Chargify](integrations/destinations/keen.md) * [Google Cloud Storage (GCS)](integrations/destinations/gcs.md) diff --git a/docs/integrations/README.md b/docs/integrations/README.md index dce2eb0246c8b..d43d059270963 100644 --- a/docs/integrations/README.md +++ b/docs/integrations/README.md @@ -112,6 +112,7 @@ Airbyte uses a grading system for connectors to help users understand what to ex ### Destinations | Connector | Grade | |----|----| +|[AzureBlobStorage](./destinations/azureblobstorage.md)| Alpha | |[BigQuery](./destinations/bigquery.md)| Certified | |[Chargify (Keen)](./destinations/keen.md)| Alpha | |[Google Cloud Storage (GCS)](./destinations/gcs.md)| Alpha | diff --git a/docs/integrations/destinations/azureblobstorage.md b/docs/integrations/destinations/azureblobstorage.md new file mode 100644 index 0000000000000..ae934399d3be5 --- /dev/null +++ b/docs/integrations/destinations/azureblobstorage.md @@ -0,0 +1,142 @@ +# Azure Blob Storage + +## Overview + +This destination writes data to Azure Blob Storage. + +The Airbyte Azure Blob Storage destination allows you to sync data to Azure Blob Storage. Each stream is written to its own blob under the container. + +## Sync Mode + +| Feature | Support | Notes | +| :--- | :---: | :--- | +| Full Refresh Sync | ✅ | Warning: this mode deletes all previously synced data in the configured blob. | +| Incremental - Append Sync | ✅ | The append mode would only work for "Append blobs" blobs as per Azure limitations, more details https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#blobs | + + +## Configuration + +| Parameter | Type | Notes | +| :--- | :---: | :--- | +| Endpoint Domain Name | string | This is Azure Blob Storage endpoint domain name. Leave default value (or leave it empty if run container from command line) to use Microsoft native one. | +| Azure blob storage container (Bucket) Name | string | A name of the Azure blob storage container. If not exists - will be created automatically. If leave empty, then will be created automatically airbytecontainer+timestamp. | +| Azure Blob Storage account name | string | The account's name of the Azure Blob Storage. | +| The Azure blob storage account key | string | Azure blob storage account key. Example: `abcdefghijklmnopqrstuvwxyz/0123456789+ABCDEFGHIJKLMNOPQRSTUVWXYZ/0123456789%++sampleKey==`. | +| Format | object | Format specific configuration. See below for details. | + +⚠️ Please note that under "Full Refresh Sync" mode, data in the configured blob will be wiped out before each sync. We recommend you to provision a dedicated Azure Blob Storage Container resource for this sync to prevent unexpected data deletion from misconfiguration. ⚠️ + + +## Output Schema + +Each stream will be outputted to its dedicated Blob according to the configuration. The complete datastore of each stream includes all the output files under that Blob. You can think of the Blob as equivalent of a Table in the database world. + +- Under Full Refresh Sync mode, old output files will be purged before new files are created. +- Under Incremental - Append Sync mode, new output files will be added that only contain the new data. + +### CSV + +Like most of the other Airbyte destination connectors, usually the output has three columns: a UUID, an emission timestamp, and the data blob. With the CSV output, it is possible to normalize (flatten) the data blob to multiple columns. + +| Column | Condition | Description | +| :--- | :--- | :--- | +| `_airbyte_ab_id` | Always exists | A uuid assigned by Airbyte to each processed record. | +| `_airbyte_emitted_at` | Always exists. | A timestamp representing when the event was pulled from the data source. | +| `_airbyte_data` | When no normalization (flattening) is needed, all data reside under this column as a json blob. | +| root level fields | When root level normalization (flattening) is selected, the root level fields are expanded. | + +For example, given the following json object from a source: + +```json +{ + "user_id": 123, + "name": { + "first": "John", + "last": "Doe" + } +} +``` + +With no normalization, the output CSV is: + +| `_airbyte_ab_id` | `_airbyte_emitted_at` | `_airbyte_data` | +| :--- | :--- | :--- | +| `26d73cde-7eb1-4e1e-b7db-a4c03b4cf206` | 1622135805000 | `{ "user_id": 123, name: { "first": "John", "last": "Doe" } }` | + +With root level normalization, the output CSV is: + +| `_airbyte_ab_id` | `_airbyte_emitted_at` | `user_id` | `name` | +| :--- | :--- | :--- | :--- | +| `26d73cde-7eb1-4e1e-b7db-a4c03b4cf206` | 1622135805000 | 123 | `{ "first": "John", "last": "Doe" }` | + +### JSON Lines (JSONL) + +[Json Lines](https://jsonlines.org/) is a text format with one JSON per line. Each line has a structure as follows: + +```json +{ + "_airbyte_ab_id": "", + "_airbyte_emitted_at": "", + "_airbyte_data": "" +} +``` + +For example, given the following two json objects from a source: + +```json +[ + { + "user_id": 123, + "name": { + "first": "John", + "last": "Doe" + } + }, + { + "user_id": 456, + "name": { + "first": "Jane", + "last": "Roe" + } + } +] +``` + +They will be like this in the output file: + +```jsonl +{ "_airbyte_ab_id": "26d73cde-7eb1-4e1e-b7db-a4c03b4cf206", "_airbyte_emitted_at": "1622135805000", "_airbyte_data": { "user_id": 123, "name": { "first": "John", "last": "Doe" } } } +{ "_airbyte_ab_id": "0a61de1b-9cdd-4455-a739-93572c9a5f20", "_airbyte_emitted_at": "1631948170000", "_airbyte_data": { "user_id": 456, "name": { "first": "Jane", "last": "Roe" } } } +``` + +## Getting started + +### Requirements + +1. Create an AzureBlobStorage account. +2. Check if it works under https://portal.azure.com/ -> "Storage explorer (preview)". + +### Setup guide + +* Fill up AzureBlobStorage info + * **Endpoint Domain Name** + * Leave default value (or leave it empty if run container from command line) to use Microsoft native one or use your own. + * **Azure blob storage container** + * If not exists - will be created automatically. If leave empty, then will be created automatically airbytecontainer+timestamp.. + * **Azure Blob Storage account name** + * See [this](https://docs.microsoft.com/en-us/azure/storage/common/storage-account-create?tabs=azure-portal) on how to create an account. + * **The Azure blob storage account key** + * Corresponding key to the above user. + * **Format** + * Data format that will be use for a migrated data representation in blob. +* Make sure your user has access to Azure from the machine running Airbyte. + * This depends on your networking setup. + * The easiest way to verify if Airbyte is able to connect to your Azure blob storage container is via the check connection tool in the UI. + + + +## CHANGELOG + +| Version | Date | Pull Request | Subject | +| :--- | :--- | :--- | :--- | +| 0.1.0 | 2021-08-30 | [#5332](https://github.com/airbytehq/airbyte/pull/5332) | Initial release with JSONL and CSV output. | diff --git a/tools/bin/ci_credentials.sh b/tools/bin/ci_credentials.sh index b5260b4c85e24..973cafd65a68f 100755 --- a/tools/bin/ci_credentials.sh +++ b/tools/bin/ci_credentials.sh @@ -32,6 +32,7 @@ write_standard_creds destination-redshift "$AWS_REDSHIFT_INTEGRATION_TEST_CREDS" write_standard_creds destination-dynamodb "$DESTINATION_DYNAMODB_TEST_CREDS" write_standard_creds destination-oracle "$AWS_ORACLE_INTEGRATION_TEST_CREDS" write_standard_creds destination-s3 "$DESTINATION_S3_INTEGRATION_TEST_CREDS" +write_standard_creds destination-azure-blob-storage "$DESTINATION_AZURE_BLOB_CREDS" write_standard_creds destination-snowflake "$SNOWFLAKE_GCS_COPY_INTEGRATION_TEST_CREDS" "copy_gcs_config.json" write_standard_creds destination-snowflake "$SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS" "copy_s3_config.json" write_standard_creds destination-snowflake "$SNOWFLAKE_INTEGRATION_TEST_CREDS" "insert_config.json"