🎉 Destination Azure blob storage: introduced new connector with jsonl and csv formats (#5332)

* [3447] Added destination Azure blob storage connector (CSV and JSONL formats)
etsybaev committed Aug 29, 2021
1 parent 073a1d5 commit 61842ed
Showing 44 changed files with 2,621 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
@@ -156,6 +156,7 @@ jobs:
ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }}
PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }}
DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }}
DESTINATION_AZURE_BLOB_CREDS: ${{ secrets.DESTINATION_AZURE_BLOB_CREDS }}
DESTINATION_GCS_CREDS: ${{ secrets.DESTINATION_GCS_CREDS }}
APIFY_INTEGRATION_TEST_CREDS: ${{ secrets.APIFY_INTEGRATION_TEST_CREDS }}
DESTINATION_DYNAMODB_TEST_CREDS: ${{ secrets.DESTINATION_DYNAMODB_TEST_CREDS }}
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
@@ -156,6 +156,7 @@ jobs:
ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }}
PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }}
DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }}
DESTINATION_AZURE_BLOB_CREDS: ${{ secrets.DESTINATION_AZURE_BLOB_CREDS }}
DESTINATION_GCS_CREDS: ${{ secrets.DESTINATION_GCS_CREDS }}
DESTINATION_DYNAMODB_TEST_CREDS: ${{ secrets.DESTINATION_DYNAMODB_TEST_CREDS }}
APIFY_INTEGRATION_TEST_CREDS: ${{ secrets.APIFY_INTEGRATION_TEST_CREDS }}
@@ -0,0 +1,7 @@
{
"destinationDefinitionId": "b4c5d105-31fd-4817-96b6-cb923bfc04cb",
"name": "Azure Blob Storage",
"dockerRepository": "airbyte/destination-azure-blob-storage",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/azureblobstorage"
}
@@ -14,6 +14,11 @@
dockerImageTag: 0.3.9
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
icon: postgresql.svg
- destinationDefinitionId: b4c5d105-31fd-4817-96b6-cb923bfc04cb
name: Azure Blob Storage
dockerRepository: airbyte/destination-azure-blob-storage
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/azureblobstorage
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
1 change: 1 addition & 0 deletions airbyte-integrations/builds.md
@@ -86,6 +86,7 @@

|name |status |
| :--- | :--- |
| Azure Blob Storage | [![destination-azure-blob-storage](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-azure-blob-storage%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-azure-blob-storage) |
| BigQuery | [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery) |
| Google Cloud Storage (GCS) | [![destination-gcs](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-s3%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-gcs) |
| Google PubSub | [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-pubsub) |
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
@@ -0,0 +1,11 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte
ENV APPLICATION destination-azure-blob-storage

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-azure-blob-storage
@@ -0,0 +1,26 @@
# Azure Blob Storage Test Configuration

In order to test the Azure Blob Storage destination, you need a Microsoft Azure account.

## Community Contributor

As a community contributor, you will need access to Azure to run the integration tests.

- Create an Azure Blob Storage account for testing. Verify that it works under https://portal.azure.com/ -> "Storage explorer (preview)".
- Get the `azure_blob_storage_account_name` and `azure_blob_storage_account_key` for an account that can read and write to the Azure container.
- Paste the account name and key into the config files under [`./sample_secrets`](secrets).
- Rename the directory from `sample_secrets` to `secrets`.
- Feel free to modify the config files with different settings in the acceptance test files (e.g. the `getFormatConfig` method in `AzureBlobStorageJsonlDestinationAcceptanceTest.java`), as long as they follow the schema defined in [spec.json](src/main/resources/spec.json).

## Airbyte Employee
- Access the `Azure Blob Storage Account` secrets on LastPass.
- Replace the `config.json` under `sample_secrets`.
- Rename the directory from `sample_secrets` to `secrets`.

## Add New Output Format
- Add a new enum value in `AzureBlobStorageFormat` (see the sketch after this list).
- Modify `spec.json` to specify the configuration of this new format.
- Update `AzureBlobStorageFormatConfigs` to be able to construct a config for this new format.
- Create a new package under `io.airbyte.integrations.destination.azure_blob_storage`.
- Implement a new `AzureBlobStorageWriter`. The implementation can extend `BaseAzureBlobStorageWriter`.
- Write an acceptance test for the new output format. The test can extend `AzureBlobStorageDestinationAcceptanceTest`.
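A minimal sketch of the enum and format-config steps above, assuming the config arrives as a Jackson `JsonNode` and that `AzureBlobStorageFormatConfigs` dispatches on the format enum. The `PARQUET` value, `AzureBlobStorageParquetFormatConfig`, and the exact factory signature are assumptions for illustration, not taken from this commit.

```java
// Hypothetical sketch only: PARQUET and the Parquet config class are illustrative
// names for a new format, not part of this commit.
public enum AzureBlobStorageFormat {
  CSV,
  JSONL,
  PARQUET // new output format
}

// Inside AzureBlobStorageFormatConfigs (assumed switch-style dispatch; the real
// factory may differ):
public static AzureBlobStorageFormatConfig getAzureBlobStorageFormatConfig(final JsonNode config) {
  final AzureBlobStorageFormat format = AzureBlobStorageFormat
      .valueOf(config.get("format").get("format_type").asText().toUpperCase());
  switch (format) {
    case CSV:
      return new AzureBlobStorageCsvFormatConfig(config.get("format"));
    case JSONL:
      return new AzureBlobStorageJsonlFormatConfig();
    case PARQUET:
      return new AzureBlobStorageParquetFormatConfig(config.get("format")); // hypothetical
    default:
      throw new IllegalArgumentException("Unsupported format: " + format);
  }
}
```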
@@ -0,0 +1,25 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageDestination'
}

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:connectors:destination-jdbc')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

implementation 'com.azure:azure-storage-blob:12.12.0'
implementation 'org.apache.commons:commons-csv:1.4'

testImplementation 'org.apache.commons:commons-lang3:3.11'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-azure-blob-storage')
}
@@ -0,0 +1,6 @@
{
"azure_blob_storage_endpoint_domain_name": "blob.core.windows.net",
"azure_blob_storage_account_name": "your_account_name_here",
"azure_blob_storage_account_key": "your_account_key_here",
"azure_blob_storage_container_name": "testcontainername"
}
@@ -0,0 +1,134 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.azure_blob_storage;

import com.azure.core.http.rest.PagedIterable;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.specialized.AppendBlobClient;
import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AzureBlobStorageConnectionChecker {

private static final String TEST_BLOB_NAME_PREFIX = "testConnectionBlob";
private BlobContainerClient containerClient; // aka schema in SQL DBs
private final AppendBlobClient appendBlobClient; // aka "SQL Table"

private static final Logger LOGGER = LoggerFactory.getLogger(
AzureBlobStorageConnectionChecker.class);

public AzureBlobStorageConnectionChecker(
AzureBlobStorageDestinationConfig azureBlobStorageConfig) {

StorageSharedKeyCredential credential = new StorageSharedKeyCredential(
azureBlobStorageConfig.getAccountName(),
azureBlobStorageConfig.getAccountKey());

this.appendBlobClient =
new SpecializedBlobClientBuilder()
.endpoint(azureBlobStorageConfig.getEndpointUrl())
.credential(credential)
.containerName(azureBlobStorageConfig.getContainerName()) // Like schema in DB
.blobName(TEST_BLOB_NAME_PREFIX + UUID.randomUUID()) // Like table in DB
.buildAppendBlobClient();
}

/*
* This is a test method used by the CHECK operation to make sure everything works with the
* current config.
*/
public void attemptWriteAndDelete() {
initTestContainerAndBlob();
writeUsingAppendBlock("Some test data");
listBlobsInContainer()
.forEach(
blobItem -> LOGGER.info(
"Blob name: " + blobItem.getName() + "Snapshot: " + blobItem.getSnapshot()));

deleteBlob();
}

private void initTestContainerAndBlob() {
// create the container if absent (aka SQL Schema)
this.containerClient = appendBlobClient.getContainerClient();
if (!containerClient.exists()) {
containerClient.create();
}

// create the append blob if absent (aka Table in SQL DB)
if (!appendBlobClient.exists()) {
appendBlobClient.create();
LOGGER.info("appendBlobClient created");
} else {
LOGGER.info("appendBlobClient already exists");
}
}

/*
* This option may be used to write and flush data right away. Note: the Azure SDK fails on empty
* lines, but those are not supposed to be written here.
*/
public void writeUsingAppendBlock(String data) {
LOGGER.info("Writing test data to Azure Blob storage: " + data);
byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
InputStream dataStream = new ByteArrayInputStream(dataBytes);

Integer blobCommittedBlockCount = appendBlobClient.appendBlock(dataStream, dataBytes.length)
.getBlobCommittedBlockCount();

LOGGER.info("blobCommittedBlockCount: " + blobCommittedBlockCount);
}

/*
* List the blob(s) in our container.
*/
public PagedIterable<BlobItem> listBlobsInContainer() {
return containerClient.listBlobs();
}

/*
* Delete the blob we created earlier.
*/
public void deleteBlob() {
LOGGER.info("Deleting blob: " + appendBlobClient.getBlobName());
appendBlobClient.delete(); // remove aka "SQL Table" used
}

/*
* Delete the container. Be very careful when you use it: it removes the whole container and is
* only supposed to be used by the check-connection flow for temporary data.
*/
public void deleteContainer() {
LOGGER.info("Deleting blob: " + containerClient.getBlobContainerName());
containerClient.delete(); // remove aka "SQL Schema" used
}

}
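
Below is a hypothetical usage sketch (not part of this commit) of how a destination's check operation could use the connection checker above. The `getAzureBlobStorageConfig` factory and the surrounding method are assumptions; only the `AzureBlobStorageConnectionChecker` constructor and `attemptWriteAndDelete()` come from the class itself, and `AirbyteConnectionStatus` is Airbyte's standard protocol model.

// Hypothetical sketch, not part of this commit: how a destination's check()
// might drive AzureBlobStorageConnectionChecker. The config factory is assumed.
public AirbyteConnectionStatus check(final JsonNode config) {
  try {
    final AzureBlobStorageDestinationConfig azureConfig =
        AzureBlobStorageDestinationConfig.getAzureBlobStorageConfig(config); // assumed factory
    new AzureBlobStorageConnectionChecker(azureConfig).attemptWriteAndDelete();
    return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
  } catch (final Exception e) {
    return new AirbyteConnectionStatus()
        .withStatus(AirbyteConnectionStatus.Status.FAILED)
        .withMessage("Could not connect to Azure Blob Storage: " + e.getMessage());
  }
}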