Publish R2 destination (#17122)
* Format code

* Update doc

* Add r2 to build doc

* Update readme

* Add r2 to destination definitions

* Move test resource to base-java-s3

* Remove redundant dependency

* Add missing dependency

* Format json

* Remove s3-destination-base-integration-test settings.gradle root

* Add missing lzo dependency

* Fix import in destination jdbc
tuliren committed Sep 26, 2022
1 parent 3494405 commit d14b442
Showing 26 changed files with 359 additions and 111 deletions.
@@ -70,6 +70,13 @@
dockerImageTag: 0.1.12
documentationUrl: https://docs.airbyte.io/integrations/destinations/clickhouse
releaseStage: alpha
- name: Cloudflare R2
destinationDefinitionId: 0fb07be9-7c3b-4336-850d-5efc006152ee
dockerRepository: airbyte/destination-r2
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/r2
icon: cloudflare-r2.svg
releaseStage: alpha
- name: Databricks Lakehouse
destinationDefinitionId: 072d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-databricks
276 changes: 276 additions & 0 deletions airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -973,6 +973,282 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-r2:0.1.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/r2"
connectionSpecification:
$schema: "http://json-schema.org/draft-07/schema#"
title: "R2 Destination Spec"
type: "object"
required:
- "account_id"
- "access_key_id"
- "secret_access_key"
- "s3_bucket_name"
- "s3_bucket_path"
- "format"
properties:
account_id:
type: "string"
description: "Cloudflare account ID"
title: "Cloudflare account ID"
examples:
- "12345678aa1a1a11111aaa1234567abc"
order: 0
access_key_id:
type: "string"
description: "The access key ID to access the R2 bucket. Airbyte requires\
\ Read and Write permissions to the given bucket. Read more <a href=\"\
https://developers.cloudflare.com/r2/platform/s3-compatibility/tokens/\"\
>here</a>."
title: "R2 Key ID *"
airbyte_secret: true
examples:
- "A012345678910EXAMPLE"
order: 1
secret_access_key:
type: "string"
description: "The corresponding secret to the access key ID. Read more <a\
\ href=\"https://developers.cloudflare.com/r2/platform/s3-compatibility/tokens/\"\
          >here</a>."
title: "R2 Access Key *"
airbyte_secret: true
examples:
- "a012345678910ABCDEFGHAbCdEfGhEXAMPLEKEY"
order: 2
s3_bucket_name:
title: "R2 Bucket Name"
type: "string"
description: "The name of the R2 bucket. Read more <a href=\"https://developers.cloudflare.com/r2/get-started/#3-create-your-bucket\"\
>here</a>."
examples:
- "r2_sync"
order: 3
s3_bucket_path:
title: "R2 Bucket Path"
description: "Directory under the R2 bucket where data will be written."
type: "string"
examples:
- "data_sync/test"
order: 4
format:
title: "Output Format *"
type: "object"
description: "Format of the data output. See <a href=\"https://docs.airbyte.com/integrations/destinations/s3/#supported-output-schema\"\
          >here</a> for more details."
oneOf:
- title: "Avro: Apache Avro"
required:
- "format_type"
- "compression_codec"
properties:
format_type:
title: "Format Type *"
type: "string"
enum:
- "Avro"
default: "Avro"
order: 0
compression_codec:
title: "Compression Codec *"
description: "The compression algorithm used to compress data. Default\
\ to no compression."
type: "object"
oneOf:
- title: "No Compression"
required:
- "codec"
properties:
codec:
type: "string"
enum:
- "no compression"
default: "no compression"
- title: "Deflate"
required:
- "codec"
- "compression_level"
properties:
codec:
type: "string"
enum:
- "Deflate"
default: "Deflate"
compression_level:
title: "Deflate Level"
description: "0: no compression & fastest, 9: best compression\
\ & slowest."
type: "integer"
default: 0
minimum: 0
maximum: 9
- title: "bzip2"
required:
- "codec"
properties:
codec:
type: "string"
enum:
- "bzip2"
default: "bzip2"
- title: "xz"
required:
- "codec"
- "compression_level"
properties:
codec:
type: "string"
enum:
- "xz"
default: "xz"
compression_level:
title: "Compression Level"
description: "See <a href=\"https://commons.apache.org/proper/commons-compress/apidocs/org/apache/commons/compress/compressors/xz/XZCompressorOutputStream.html#XZCompressorOutputStream-java.io.OutputStream-int-\"\
>here</a> for details."
type: "integer"
default: 6
minimum: 0
maximum: 9
- title: "zstandard"
required:
- "codec"
- "compression_level"
properties:
codec:
type: "string"
enum:
- "zstandard"
default: "zstandard"
compression_level:
title: "Compression Level"
description: "Negative levels are 'fast' modes akin to lz4 or\
\ snappy, levels above 9 are generally for archival purposes,\
\ and levels above 18 use a lot of memory."
type: "integer"
default: 3
minimum: -5
maximum: 22
include_checksum:
title: "Include Checksum"
description: "If true, include a checksum with each data block."
type: "boolean"
default: false
- title: "snappy"
required:
- "codec"
properties:
codec:
type: "string"
enum:
- "snappy"
default: "snappy"
order: 1
- title: "CSV: Comma-Separated Values"
required:
- "format_type"
- "flattening"
properties:
format_type:
title: "Format Type *"
type: "string"
enum:
- "CSV"
default: "CSV"
flattening:
type: "string"
title: "Normalization (Flattening)"
description: "Whether the input json data should be normalized (flattened)\
\ in the output CSV. Please refer to docs for details."
default: "No flattening"
enum:
- "No flattening"
- "Root level flattening"
compression:
title: "Compression"
type: "object"
description: "Whether the output files should be compressed. If compression\
\ is selected, the output filename will have an extra extension\
\ (GZIP: \".csv.gz\")."
oneOf:
- title: "No Compression"
            required:
- "compression_type"
properties:
compression_type:
type: "string"
enum:
- "No Compression"
default: "No Compression"
- title: "GZIP"
            required:
- "compression_type"
properties:
compression_type:
type: "string"
enum:
- "GZIP"
default: "GZIP"
- title: "JSON Lines: Newline-delimited JSON"
required:
- "format_type"
properties:
format_type:
title: "Format Type *"
type: "string"
enum:
- "JSONL"
default: "JSONL"
compression:
title: "Compression"
type: "object"
description: "Whether the output files should be compressed. If compression\
\ is selected, the output filename will have an extra extension\
\ (GZIP: \".jsonl.gz\")."
oneOf:
- title: "No Compression"
requires: "compression_type"
properties:
compression_type:
type: "string"
enum:
- "No Compression"
default: "No Compression"
- title: "GZIP"
requires: "compression_type"
properties:
compression_type:
type: "string"
enum:
- "GZIP"
default: "GZIP"
order: 5
s3_path_format:
title: "R2 Path Format (Optional)"
description: "Format string on how data will be organized inside the R2\
\ bucket directory. Read more <a href=\"https://docs.airbyte.com/integrations/destinations/r2#:~:text=The%20full%20path%20of%20the%20output%20data%20with%20the%20default%20S3%20path%20format\"\
          >here</a>."
type: "string"
examples:
- "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_"
order: 6
file_name_pattern:
type: "string"
description: "The pattern allows you to set the file-name format for the\
          \ R2 staging file(s)."
title: "R2 Filename pattern (Optional)"
examples:
- "{date}"
- "{date:yyyy_MM}"
- "{timestamp}"
- "{part_number}"
- "{sync_id}"
order: 7
supportsIncremental: true
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-databricks:0.2.6"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/databricks"
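For orientation, a configuration document satisfying the R2 spec above needs at least the six required fields. The sketch below uses the spec's own example values (placeholders, not working credentials) and Jackson only to demonstrate the shape; field names come straight from the connectionSpecification:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class R2ConfigExample {

  public static void main(String[] args) throws Exception {
    // Placeholder values copied from the spec's examples above.
    String json = """
        {
          "account_id": "12345678aa1a1a11111aaa1234567abc",
          "access_key_id": "A012345678910EXAMPLE",
          "secret_access_key": "a012345678910ABCDEFGHAbCdEfGhEXAMPLEKEY",
          "s3_bucket_name": "r2_sync",
          "s3_bucket_path": "data_sync/test",
          "format": { "format_type": "JSONL" }
        }
        """;

    JsonNode config = new ObjectMapper().readTree(json);
    // Check the required top-level fields listed in the spec.
    for (String field : new String[] {"account_id", "access_key_id",
        "secret_access_key", "s3_bucket_name", "s3_bucket_path", "format"}) {
      if (!config.hasNonNull(field)) {
        throw new IllegalArgumentException("Missing required field: " + field);
      }
    }
    System.out.println("Config has all required fields.");
  }
}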
2 changes: 2 additions & 0 deletions airbyte-integrations/bases/base-java-s3/build.gradle
@@ -13,6 +13,7 @@ dependencies {

implementation ('org.apache.parquet:parquet-avro:1.12.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
implementation ('com.github.airbytehq:json-avro-converter:1.0.1') { exclude group: 'ch.qos.logback', module: 'logback-classic'}
implementation group: 'com.hadoop.gplcompression', name: 'hadoop-lzo', version: '0.4.20'

// parquet
implementation ('org.apache.hadoop:hadoop-common:3.3.3') {
@@ -31,6 +32,7 @@

testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.xerial.snappy:snappy-java:1.1.8.4'
testImplementation "org.mockito:mockito-inline:4.1.0"

testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
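Two of these additions line up with the commit notes above: hadoop-lzo supplies the LZO compression codec referenced by the Avro/Parquet writers ("Add missing lzo dependency"), and mockito-inline lets tests mock final classes and static methods, which mockito-core alone cannot. A minimal sketch of the latter; the Clock helper is hypothetical and exists only for illustration:

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mockStatic;

import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;

class InlineMockingExample {

  // Hypothetical helper with a static method, standing in for the kind
  // of code that mockito-inline allows tests to stub.
  static class Clock {
    static long now() {
      return System.currentTimeMillis();
    }
  }

  @Test
  void staticMethodsCanBeStubbed() {
    try (MockedStatic<Clock> mocked = mockStatic(Clock.class)) {
      mocked.when(Clock::now).thenReturn(42L);
      assertEquals(42L, Clock.now());
    }
    // The real implementation is restored once the scope closes.
  }
}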
@@ -137,9 +137,9 @@ public static S3DestinationConfig getS3DestinationConfig(@Nonnull final JsonNode
builder = builder.withEndpoint(endpoint);
}
builder = builder.withCheckIntegrity(false)
        // https://developers.cloudflare.com/r2/platform/s3-compatibility/api/#implemented-object-level-operations
        // 3 or less
        .withUploadThreadsCount(S3StorageOperations.R2_UPLOAD_THREADS);
}
default -> {
if (config.has(S_3_ENDPOINT)) {
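The builder chain above disables the upload integrity check and caps upload concurrency at S3StorageOperations.R2_UPLOAD_THREADS, matching the "3 or less" note from Cloudflare's S3-compatibility documentation. Because R2 speaks the S3 API, the provider branch essentially just points a stock AWS SDK v1 client at the per-account endpoint. A hedged sketch of that idea (placeholders throughout; this is not the connector's actual factory code):

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class R2ClientExample {

  public static AmazonS3 buildR2Client(String accountId, String keyId, String secret) {
    // R2 exposes an S3-compatible API at a per-account endpoint.
    String endpoint = String.format("https://%s.r2.cloudflarestorage.com", accountId);
    return AmazonS3ClientBuilder.standard()
        // R2 ignores the region, but the SDK requires one; "auto" is what
        // Cloudflare documents for S3-compatible clients.
        .withEndpointConfiguration(new EndpointConfiguration(endpoint, "auto"))
        .withCredentials(new AWSStaticCredentialsProvider(
            new BasicAWSCredentials(keyId, secret)))
        .build();
  }
}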
@@ -99,4 +99,5 @@ public void testGetS3DestinationConfigCF_R2Provider() {
assertThat(awsCredentials.getAWSSecretKey()).isEqualTo("paste-secret-access-key-here");
assertThat(result.isCheckIntegrity()).isEqualTo(false);
}

}
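One closing note on the s3_path_format option in the spec: the ${...} placeholders are substituted per stream and per sync. The following is a rough illustration of that expansion using a hypothetical helper; the connector's real implementation may differ:

import java.time.LocalDate;
import java.util.Map;

public class PathFormatExample {

  // Naive ${...} substitution, purely to illustrate the placeholder idea.
  static String expand(String pattern, Map<String, String> values) {
    String result = pattern;
    for (Map.Entry<String, String> e : values.entrySet()) {
      result = result.replace("${" + e.getKey() + "}", e.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    LocalDate d = LocalDate.of(2022, 9, 26);
    String path = expand("${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_",
        Map.of("NAMESPACE", "public", "STREAM_NAME", "users",
            "YEAR", String.valueOf(d.getYear()),
            "MONTH", String.format("%02d", d.getMonthValue()),
            "DAY", String.format("%02d", d.getDayOfMonth()),
            "EPOCH", "1664150400"));
    System.out.println(path); // public/users/2022_09_26_1664150400_
  }
}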
