diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java
index 576627a3dc625..f90fc23532717 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java
@@ -28,6 +28,7 @@
 import io.airbyte.db.Databases;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.base.AirbyteMessageConsumer;
+import io.airbyte.integrations.base.IntegrationRunner;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
@@ -42,6 +43,10 @@ public class DatabricksDestination extends CopyDestination {
 
   private static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver";
 
+  public static void main(String[] args) throws Exception {
+    new IntegrationRunner(new DatabricksDestination()).run(args);
+  }
+
   @Override
   public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) {
     return CopyConsumerFactory.create(
@@ -52,7 +57,7 @@ public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCata
         S3Config.getS3Config(config),
         catalog,
         new DatabricksStreamCopierFactory(),
-        config.get("schema").asText()
+        config.has("schema") && !config.get("schema").asText().isEmpty() ? config.get("schema").asText() : "default"
     );
   }
 
@@ -69,9 +74,11 @@ public ExtendedNameTransformer getNameTransformer() {
   @Override
   public JdbcDatabase getDatabase(JsonNode databricksConfig) {
     return Databases.createJdbcDatabase(
-        databricksConfig.get("username").asText(),
-        databricksConfig.has("password") ? databricksConfig.get("password").asText() : null,
-        databricksConfig.get("jdbc_url").asText(),
+        "token",
+        databricksConfig.get("pat").asText(),
+        String.format("jdbc:spark://%s:443/default;transportMode=http;ssl=1;httpPath=%s",
+            databricksConfig.get("serverHostname").asText(),
+            databricksConfig.get("httpPath").asText()),
        DRIVER_CLASS
     );
   }
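To make the new connection logic concrete, here is a minimal sketch of the JDBC URL that `getDatabase` now assembles. The hostname and HTTP path below are hypothetical placeholders, not values from this change; authentication uses the literal user `token` with the personal access token as the password.

```java
public class JdbcUrlSketch {

  public static void main(String[] args) {
    // Hypothetical placeholders -- substitute the JDBC/ODBC connection
    // details of your own Databricks cluster.
    String serverHostname = "abc-12345678-90ab.cloud.databricks.com";
    String httpPath = "sql/protocolv1/o/1234567890123456/0123-456789-abcdefgh";

    // Same format string the connector uses in getDatabase().
    String jdbcUrl = String.format(
        "jdbc:spark://%s:443/default;transportMode=http;ssl=1;httpPath=%s",
        serverHostname, httpPath);

    // Prints: jdbc:spark://abc-12345678-90ab.cloud.databricks.com:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/...
    System.out.println(jdbcUrl);
  }

}
```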
databricksConfig.get("password").asText() : null, - databricksConfig.get("jdbc_url").asText(), + "token", + databricksConfig.get("pat").asText(), + String.format("jdbc:spark://%s:443/default;transportMode=http;ssl=1;httpPath=%s", + databricksConfig.get("serverHostname").asText(), + databricksConfig.get("httpPath").asText()), DRIVER_CLASS ); } diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java index 0c706d0686593..3ae154c4bc1d8 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java @@ -14,6 +14,7 @@ import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; import java.sql.Timestamp; import java.util.UUID; import org.slf4j.Logger; @@ -31,15 +32,20 @@ public class DatabricksStreamCopier implements StreamCopier { private final AmazonS3 s3Client; private final S3Config s3Config; private final String tmpTableName; + private final DestinationSyncMode syncMode; private final AirbyteStream stream; private final JdbcDatabase db; + private final String database; + private final String streamName; private final ExtendedNameTransformer nameTransformer; - private final SqlOperations sqlOperations; + private final DatabricksSqlOperations sqlOperations; private final S3ParquetWriter parquetWriter; public DatabricksStreamCopier(String stagingFolder, + DestinationSyncMode syncMode, String schema, ConfiguredAirbyteStream configuredStream, + String streamName, AmazonS3 s3Client, JdbcDatabase db, S3Config s3Config, @@ -48,14 +54,18 @@ public DatabricksStreamCopier(String stagingFolder, S3WriterFactory writerFactory, Timestamp uploadTime) throws Exception { this.stream = configuredStream.getStream(); + this.syncMode = syncMode; this.db = db; + this.database = schema; + this.streamName = streamName; this.nameTransformer = nameTransformer; - this.sqlOperations = sqlOperations; - this.tmpTableName = nameTransformer.getTmpTableName(stream.getName()); + this.sqlOperations = (DatabricksSqlOperations) sqlOperations; + this.tmpTableName = nameTransformer.getTmpTableName(streamName); this.s3Client = s3Client; this.s3Config = s3Config; this.parquetWriter = (S3ParquetWriter) writerFactory .create(getS3DestinationConfig(s3Config, stagingFolder), s3Client, configuredStream, uploadTime); + LOGGER.info(parquetWriter.parquetSchema.toString()); } @Override @@ -69,28 +79,46 @@ public void closeStagingUploader(boolean hasFailed) throws Exception { } @Override - public void createTemporaryTable() throws Exception { - + public void createDestinationSchema() throws Exception { + LOGGER.info("Creating database in destination if it doesn't exist: {}", database); + sqlOperations.createSchemaIfNotExists(db, database); } @Override - public void copyStagingFileToTemporaryTable() throws Exception { - + public void createTemporaryTable() throws Exception { + LOGGER.info("Preparing tmp table in destination for stream: {}, database: {}, tmp table name: {}.", streamName, database, 
diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java
index 3f73f5f0e7efe..096e134bd2233 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java
@@ -13,6 +13,7 @@
 import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
 import io.airbyte.protocol.models.AirbyteStream;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.DestinationSyncMode;
 import java.sql.Timestamp;
 
 public class DatabricksStreamCopierFactory implements StreamCopierFactory {
@@ -27,13 +28,14 @@ public StreamCopier create(String configuredSchema,
                              SqlOperations sqlOperations) {
     try {
       AirbyteStream stream = configuredStream.getStream();
+      DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
       String schema = StreamCopierFactory.getSchema(stream, configuredSchema, nameTransformer);
       AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config);
       S3WriterFactory writerFactory = new ProductionWriterFactory();
       Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());
 
       return new DatabricksStreamCopier(
-          stagingFolder, schema, configuredStream, s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
+          stagingFolder, syncMode, schema, configuredStream, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json
index f5264cb4312c5..f9a10ca18b85c 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json
@@ -9,22 +9,93 @@
     "title": "Databricks Destination Spec",
     "type": "object",
     "required": [
-      "jdbcUrl"
+      "serverHostname",
+      "httpPath",
+      "pat"
     ],
     "additionalProperties": false,
     "properties": {
-      "jdbcUrl": {
-        "title": "JDBC URL",
+      "serverHostname": {
+        "title": "Server Hostname",
+        "type": "string",
+        "description": "",
+        "examples": [""]
+      },
+      "httpPath": {
+        "title": "HTTP Path",
+        "type": "string",
+        "description": "",
+        "examples": [""]
+      },
+      "pat": {
+        "title": "Personal Access Token",
         "type": "string",
         "description": "",
         "examples": [""],
         "airbyte_secret": true
       },
-      "database": {
+      "schema": {
         "title": "Database",
         "type": "string",
-        "description": "",
-        "examples": [""]
+        "description": ""
+      },
+      "s3_bucket_name": {
+        "title": "S3 Bucket Name",
+        "type": "string",
+        "description": "The name of the S3 bucket to use for intermediate staging of the data.",
+        "examples": ["airbyte.staging"]
+      },
+      "s3_bucket_region": {
+        "title": "S3 Bucket Region",
+        "type": "string",
+        "default": "",
+        "description": "The region of the S3 staging bucket to use if utilising a copy strategy.",
+        "enum": [
+          "",
+          "us-east-1",
+          "us-east-2",
+          "us-west-1",
+          "us-west-2",
+          "af-south-1",
+          "ap-east-1",
+          "ap-south-1",
+          "ap-northeast-1",
+          "ap-northeast-2",
+          "ap-northeast-3",
+          "ap-southeast-1",
+          "ap-southeast-2",
+          "ca-central-1",
+          "cn-north-1",
+          "cn-northwest-1",
+          "eu-central-1",
+          "eu-north-1",
+          "eu-south-1",
+          "eu-west-1",
+          "eu-west-2",
+          "eu-west-3",
+          "sa-east-1",
+          "me-south-1"
+        ]
+      },
+      "access_key_id": {
+        "type": "string",
+        "description": "The Access Key Id granting access to the above S3 staging bucket. Airbyte requires Read and Write permissions to the given bucket.",
+        "title": "S3 Key Id",
+        "airbyte_secret": true
+      },
+      "secret_access_key": {
+        "type": "string",
+        "description": "The corresponding secret to the above access key id.",
+        "title": "S3 Access Key",
+        "airbyte_secret": true
+      },
+      "part_size": {
+        "type": "integer",
+        "minimum": 10,
+        "maximum": 100,
+        "examples": [10],
+        "description": "Optional. Increase this if syncing tables larger than 100GB. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the maximum table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note, a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care.",
+        "title": "Stream Part Size"
+      }
     }
   }
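For illustration, a config document satisfying the new spec could look like the sketch below. Every value is a hypothetical placeholder (the AWS keys are the documented AWS example credentials), not anything taken from this change.

```java
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;

public class SampleConfigSketch {

  public static void main(String[] args) {
    // All values are hypothetical placeholders.
    JsonNode config = Jsons.deserialize("{"
        + "\"serverHostname\": \"abc-12345678-90ab.cloud.databricks.com\","
        + "\"httpPath\": \"sql/protocolv1/o/1234567890123456/0123-456789-abcdefgh\","
        + "\"pat\": \"dapi0123456789abcdef\","
        + "\"schema\": \"default\","
        + "\"s3_bucket_name\": \"airbyte.staging\","
        + "\"s3_bucket_region\": \"us-east-1\","
        + "\"access_key_id\": \"AKIAIOSFODNN7EXAMPLE\","
        + "\"secret_access_key\": \"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY\","
        + "\"part_size\": 10"
        + "}");
    System.out.println(config.get("serverHostname").asText());
  }

}
```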
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java
index 806852411c920..d64b2362c2fb7 100644
--- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java
+++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java
@@ -56,6 +56,7 @@ public class S3ParquetWriter extends BaseS3Writer implements S3Writer {
 
   private final ParquetWriter<Record> parquetWriter;
   private final AvroRecordFactory avroRecordFactory;
+  public final Schema parquetSchema;
 
   public S3ParquetWriter(S3DestinationConfig config,
                          AmazonS3 s3Client,
@@ -88,6 +89,7 @@ public S3ParquetWriter(S3DestinationConfig config,
         .withDictionaryEncoding(formatConfig.isDictionaryEncoding())
         .build();
     this.avroRecordFactory = new AvroRecordFactory(schema, nameUpdater);
+    this.parquetSchema = schema;
   }
 
   public static Configuration getHadoopConfig(S3DestinationConfig config) {