From 20cf9e2f4b75c919319e27143334520ff2e0b060 Mon Sep 17 00:00:00 2001
From: George Claireaux
Date: Wed, 1 Sep 2021 10:54:36 +0100
Subject: [PATCH 1/4] changes toward creating database/tables through jdbc

---
 .../databricks/DatabricksDestination.java     |  15 +-
 .../databricks/DatabricksSqlOperations.java   | 231 +++++++++---------
 .../databricks/DatabricksStreamCopier.java    |  54 +++-
 .../DatabricksStreamCopierFactory.java        |   4 +-
 .../src/main/resources/spec.json              |  83 ++++++-
 .../s3/parquet/S3ParquetWriter.java           |   2 +
 6 files changed, 251 insertions(+), 138 deletions(-)

diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java
index 576627a3dc6254..f90fc235327170 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java
@@ -28,6 +28,7 @@
 import io.airbyte.db.Databases;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.base.AirbyteMessageConsumer;
+import io.airbyte.integrations.base.IntegrationRunner;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
@@ -42,6 +43,10 @@ public class DatabricksDestination extends CopyDestination {

   private static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver";

+  public static void main(String[] args) throws Exception {
+    new IntegrationRunner(new DatabricksDestination()).run(args);
+  }
+
   @Override
   public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) {
     return CopyConsumerFactory.create(
@@ -52,7 +57,7 @@ public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCata
         S3Config.getS3Config(config),
         catalog,
         new DatabricksStreamCopierFactory(),
-        config.get("schema").asText()
+        config.has("schema") && !config.get("schema").asText().isEmpty() ? config.get("schema").asText() : "default"
     );
   }

@@ -69,9 +74,11 @@ public ExtendedNameTransformer getNameTransformer() {

   @Override
   public JdbcDatabase getDatabase(JsonNode databricksConfig) {
     return Databases.createJdbcDatabase(
-        databricksConfig.get("username").asText(),
-        databricksConfig.has("password") ? 
databricksConfig.get("password").asText() : null, - databricksConfig.get("jdbc_url").asText(), + "token", + databricksConfig.get("pat").asText(), + String.format("jdbc:spark://%s:443/default;transportMode=http;ssl=1;httpPath=%s", + databricksConfig.get("serverHostname").asText(), + databricksConfig.get("httpPath").asText()), DRIVER_CLASS ); } diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java index 31ba481c8f670c..40a3ed9b1ac610 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java @@ -40,139 +40,140 @@ public class DatabricksSqlOperations extends JdbcSqlOperations { - private boolean isLocalFileEnabled = false; @Override public void executeTransaction(JdbcDatabase database, List queries) throws Exception { database.executeWithinTransaction(queries); } - @Override - public void insertRecordsInternal(JdbcDatabase database, - List records, - String schemaName, - String tmpTableName) - throws SQLException { - if (records.isEmpty()) { - return; - } - - verifyLocalFileEnabled(database); - try { - File tmpFile = Files.createTempFile(tmpTableName + "-", ".tmp").toFile(); - - loadDataIntoTable(database, records, schemaName, tmpTableName, tmpFile); - - Files.delete(tmpFile.toPath()); - } catch (IOException e) { - throw new SQLException(e); - } - } - - private void loadDataIntoTable(JdbcDatabase database, - List records, - String schemaName, - String tmpTableName, - File tmpFile) - throws SQLException { - database.execute(connection -> { - try { - writeBatchToFile(tmpFile, records); - - String absoluteFile = "'" + tmpFile.getAbsolutePath() + "'"; - - String query = String.format( - "LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '\\\"' LINES TERMINATED BY '\\r\\n'", - absoluteFile, schemaName, tmpTableName); - - try (Statement stmt = connection.createStatement()) { - stmt.execute(query); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - @Override - protected JsonNode formatData(JsonNode data) { - return StandardNameTransformer.formatJsonPath(data); - } - - void verifyLocalFileEnabled(JdbcDatabase database) throws SQLException { - boolean localFileEnabled = isLocalFileEnabled || checkIfLocalFileIsEnabled(database); - if (!localFileEnabled) { - tryEnableLocalFile(database); - } - isLocalFileEnabled = true; - } - - private void tryEnableLocalFile(JdbcDatabase database) throws SQLException { - database.execute(connection -> { - try (Statement statement = connection.createStatement()) { - statement.execute("set global local_infile=true"); - } catch (Exception e) { - throw new RuntimeException( - "The DB user provided to airbyte was unable to switch on the local_infile attribute on the MySQL server. 
As an admin user, you will need to run \"SET GLOBAL local_infile = true\" before syncing data with Airbyte.", - e); - } - }); - } - - private double getVersion(JdbcDatabase database) throws SQLException { - List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("select version()"), - resultSet -> resultSet.getString("version()")).collect(Collectors.toList()); - return Double.parseDouble(value.get(0).substring(0, 3)); - } - - VersionCompatibility isCompatibleVersion(JdbcDatabase database) throws SQLException { - double version = getVersion(database); - return new VersionCompatibility(version, version >= 5.7); - } - - @Override - public boolean isSchemaRequired() { - return false; - } - - private boolean checkIfLocalFileIsEnabled(JdbcDatabase database) throws SQLException { - List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), - resultSet -> resultSet.getString("Value")).collect(Collectors.toList()); - - return value.get(0).equalsIgnoreCase("on"); - } - @Override public String createTableQuery(JdbcDatabase database, String schemaName, String tableName) { - // MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column, - // 256 is enough + // Since Databricks uses Hive, there isn't an enforcement of primary keys or ability to set default column values return String.format( "CREATE TABLE IF NOT EXISTS %s.%s ( \n" - + "%s VARCHAR(256) PRIMARY KEY,\n" + + "%s STRING,\n" + "%s JSON,\n" - + "%s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6)\n" + + "%s TIMESTAMP\n" + ");\n", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); } - public static class VersionCompatibility { - - private final double version; - private final boolean isCompatible; - - public VersionCompatibility(double version, boolean isCompatible) { - this.version = version; - this.isCompatible = isCompatible; + // THIS IS A DUMMY RIGHT NOW + @Override + public void insertRecordsInternal(JdbcDatabase database, + List records, + String schemaName, + String tmpTableName) + throws SQLException { + if (records.isEmpty()) { + return; } + throw new SQLException(); - public double getVersion() { - return version; - } + } - public boolean isCompatible() { - return isCompatible; - } - } +// @Override +// public void insertRecordsInternal(JdbcDatabase database, +// List records, +// String schemaName, +// String tmpTableName) +// throws SQLException { +// if (records.isEmpty()) { +// return; +// } +// +// verifyLocalFileEnabled(database); +// try { +// File tmpFile = Files.createTempFile(tmpTableName + "-", ".tmp").toFile(); +// +// loadDataIntoTable(database, records, schemaName, tmpTableName, tmpFile); +// +// Files.delete(tmpFile.toPath()); +// } catch (IOException e) { +// throw new SQLException(e); +// } +// } +// +// private void loadDataIntoTable(JdbcDatabase database, +// List records, +// String schemaName, +// String tmpTableName, +// File tmpFile) +// throws SQLException { +// database.execute(connection -> { +// try { +// writeBatchToFile(tmpFile, records); +// +// String absoluteFile = "'" + tmpFile.getAbsolutePath() + "'"; +// +// String query = String.format( +// "LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '\\\"' LINES TERMINATED BY '\\r\\n'", +// absoluteFile, schemaName, tmpTableName); +// +// try (Statement stmt = 
connection.createStatement()) { +// stmt.execute(query); +// } +// } catch (Exception e) { +// throw new RuntimeException(e); +// } +// }); +// } + +// private double getVersion(JdbcDatabase database) throws SQLException { +// List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("select version()"), +// resultSet -> resultSet.getString("version()")).collect(Collectors.toList()); +// return Double.parseDouble(value.get(0).substring(0, 3)); +// } +// +// VersionCompatibility isCompatibleVersion(JdbcDatabase database) throws SQLException { +// double version = getVersion(database); +// return new VersionCompatibility(version, version >= 5.7); +// } +// +// @Override +// public boolean isSchemaRequired() { +// return false; +// } +// +// private boolean checkIfLocalFileIsEnabled(JdbcDatabase database) throws SQLException { +// List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), +// resultSet -> resultSet.getString("Value")).collect(Collectors.toList()); +// +// return value.get(0).equalsIgnoreCase("on"); +// } +// +// @Override +// public String createTableQuery(JdbcDatabase database, String schemaName, String tableName) { +// // MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column, +// // 256 is enough +// return String.format( +// "CREATE TABLE IF NOT EXISTS %s.%s ( \n" +// + "%s VARCHAR(256) PRIMARY KEY,\n" +// + "%s JSON,\n" +// + "%s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6)\n" +// + ");\n", +// schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); +// } + +// public static class VersionCompatibility { +// +// private final double version; +// private final boolean isCompatible; +// +// public VersionCompatibility(double version, boolean isCompatible) { +// this.version = version; +// this.isCompatible = isCompatible; +// } +// +// public double getVersion() { +// return version; +// } +// +// public boolean isCompatible() { +// return isCompatible; +// } +// +// } } diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java index 0c706d0686593f..1203f348e232ef 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java @@ -14,6 +14,7 @@ import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; import java.sql.Timestamp; import java.util.UUID; import org.slf4j.Logger; @@ -31,15 +32,20 @@ public class DatabricksStreamCopier implements StreamCopier { private final AmazonS3 s3Client; private final S3Config s3Config; private final String tmpTableName; + private final DestinationSyncMode syncMode; private final AirbyteStream stream; private final JdbcDatabase db; + private final String database; + private final String streamName; private final ExtendedNameTransformer nameTransformer; - private final 
SqlOperations sqlOperations;
+  private final DatabricksSqlOperations sqlOperations;
   private final S3ParquetWriter parquetWriter;

   public DatabricksStreamCopier(String stagingFolder,
+                                DestinationSyncMode syncMode,
                                 String schema,
                                 ConfiguredAirbyteStream configuredStream,
+                                String streamName,
                                 AmazonS3 s3Client,
                                 JdbcDatabase db,
                                 S3Config s3Config,
@@ -48,14 +54,18 @@ public DatabricksStreamCopier(String stagingFolder,
                                 S3WriterFactory writerFactory,
                                 Timestamp uploadTime)
       throws Exception {
     this.stream = configuredStream.getStream();
+    this.syncMode = syncMode;
     this.db = db;
+    this.database = schema;
+    this.streamName = streamName;
     this.nameTransformer = nameTransformer;
-    this.sqlOperations = sqlOperations;
-    this.tmpTableName = nameTransformer.getTmpTableName(stream.getName());
+    this.sqlOperations = (DatabricksSqlOperations) sqlOperations;
+    this.tmpTableName = nameTransformer.getTmpTableName(streamName);
     this.s3Client = s3Client;
     this.s3Config = s3Config;
     this.parquetWriter = (S3ParquetWriter) writerFactory
        .create(getS3DestinationConfig(s3Config, stagingFolder), s3Client, configuredStream, uploadTime);
+    LOGGER.info(parquetWriter.parquetSchema.toString());
   }

   @Override
@@ -69,28 +79,48 @@ public void closeStagingUploader(boolean hasFailed) throws Exception {
   }

   @Override
-  public void createTemporaryTable() throws Exception {
-
+  public void createDestinationSchema() throws Exception {
+    LOGGER.info("Creating database in destination if it doesn't exist: {}", database);
+    sqlOperations.createSchemaIfNotExists(db, database);
   }

+  // @Override
-  public void copyStagingFileToTemporaryTable() throws Exception {
-
+  public void createTemporaryTable() throws Exception {
+    LOGGER.info("Preparing tmp table in destination for stream: {}, database: {}, tmp table name: {}.", streamName, database, tmpTableName);
+    LOGGER.info(parquetWriter.parquetSchema.toString());
+//    sqlOperations.createTableIfNotExists(db, database, tmpTableName);
   }

+  // DUMMY for now, throwing exception
   @Override
-  public void createDestinationSchema() throws Exception {
-
+  public void copyStagingFileToTemporaryTable() throws Exception {
+    LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, database: {}.", tmpTableName, streamName, database);
+    throw new Exception();
+//    LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
   }

+  @Override
   public String createDestinationTable() throws Exception {
-    return null;
+    var destTableName = nameTransformer.getRawTableName(streamName);
+    LOGGER.info("Preparing table {} in destination.", destTableName);
+    sqlOperations.createTableIfNotExists(db, database, destTableName);
+    LOGGER.info("Table {} in destination prepared.", destTableName);
+
+    return destTableName;
   }

   @Override
-  public String generateMergeStatement(String destTableName) throws Exception {
-    return null;
+  public String generateMergeStatement(String destTableName) {
+    LOGGER.info("Preparing to merge tmp table {} into dest table: {}, database: {}.", tmpTableName, destTableName, database);
+    var queries = new StringBuilder();
+    if (syncMode.equals(DestinationSyncMode.OVERWRITE)) {
+      queries.append(sqlOperations.truncateTableQuery(db, database, destTableName));
+      LOGGER.info("Destination OVERWRITE mode detected; queueing truncate of 
dest table: {}, database: {}.", destTableName, database);
+    }
+    queries.append(sqlOperations.copyTableQuery(db, database, tmpTableName, destTableName));
+    return queries.toString();
   }

   @Override
diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java
index 3f73f5f0e7efee..096e134bd22339 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java
@@ -13,6 +13,7 @@
 import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
 import io.airbyte.protocol.models.AirbyteStream;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.DestinationSyncMode;
 import java.sql.Timestamp;

 public class DatabricksStreamCopierFactory implements StreamCopierFactory {

@@ -27,13 +28,14 @@ public StreamCopier create(String configuredSchema,
                            SqlOperations sqlOperations) {
     try {
       AirbyteStream stream = configuredStream.getStream();
+      DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
       String schema = StreamCopierFactory.getSchema(stream, configuredSchema, nameTransformer);
       AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config);
       S3WriterFactory writerFactory = new ProductionWriterFactory();
       Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());

       return new DatabricksStreamCopier(
-          stagingFolder, schema, configuredStream, s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
+          stagingFolder, syncMode, schema, configuredStream, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json
index f5264cb4312c52..f9a10ca18b85ca 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json
@@ -9,22 +9,93 @@
   "title": "Databricks Destination Spec",
   "type": "object",
   "required": [
-    "jdbcUrl"
+    "serverHostname",
+    "httpPath",
+    "pat"
   ],
   "additionalProperties": false,
   "properties": {
-    "jdbcUrl": {
-      "title": "JDBC URL",
+    "serverHostname": {
+      "title": "Server Hostname",
+      "type": "string",
+      "description": "Hostname of the Databricks cluster's JDBC endpoint.",
+      "examples": [""]
+    },
+    "httpPath": {
+      "title": "HTTP Path",
+      "type": "string",
+      "description": "HTTP path of the Databricks cluster's JDBC endpoint.",
+      "examples": [""]
+    },
+    "pat": {
+      "title": "Personal Access Token",
       "type": "string",
       "description": "",
       "examples": [""],
       "airbyte_secret": true
     },
-    "database": {
+    "schema": {
       "title": "Database",
       "type": "string",
-      "description": "",
-      "examples": [""]
+      "description": "Destination database. Defaults to \"default\" when left empty."
+    },
+    "s3_bucket_name": {
+      "title": "S3 Bucket Name",
+      "type": "string",
+      "description": "The name of the S3 bucket to use for intermediate staging of the data.",
+      "examples": ["airbyte.staging"]
+    },
+    "s3_bucket_region": {
+      "title": "S3 Bucket Region",
+      "type": "string",
+      "default": "", 
+ "description": "The region of the S3 staging bucket to use if utilising a copy strategy.", + "enum": [ + "", + "us-east-1", + "us-east-2", + "us-west-1", + "us-west-2", + "af-south-1", + "ap-east-1", + "ap-south-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-southeast-1", + "ap-southeast-2", + "ca-central-1", + "cn-north-1", + "cn-northwest-1", + "eu-central-1", + "eu-north-1", + "eu-south-1", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "sa-east-1", + "me-south-1" + ] + }, + "access_key_id": { + "type": "string", + "description": "The Access Key Id granting allow one to access the above S3 staging bucket. Airbyte requires Read and Write permissions to the given bucket.", + "title": "S3 Key Id", + "airbyte_secret": true + }, + "secret_access_key": { + "type": "string", + "description": "The corresponding secret to the above access key id.", + "title": "S3 Access Key", + "airbyte_secret": true + }, + "part_size": { + "type": "integer", + "minimum": 10, + "maximum": 100, + "examples": ["10"], + "description": "Optional. Increase this if syncing tables larger than 100GB. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note, a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care.", + "title": "Stream Part Size" } } } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java index 806852411c9205..d64b2362c2fb74 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java @@ -56,6 +56,7 @@ public class S3ParquetWriter extends BaseS3Writer implements S3Writer { private final ParquetWriter parquetWriter; private final AvroRecordFactory avroRecordFactory; + public final Schema parquetSchema; public S3ParquetWriter(S3DestinationConfig config, AmazonS3 s3Client, @@ -88,6 +89,7 @@ public S3ParquetWriter(S3DestinationConfig config, .withDictionaryEncoding(formatConfig.isDictionaryEncoding()) .build(); this.avroRecordFactory = new AvroRecordFactory(schema, nameUpdater); + this.parquetSchema = schema; } public static Configuration getHadoopConfig(S3DestinationConfig config) { From f9eab6c3852719f5ea38120decf6581ec0fa57fc Mon Sep 17 00:00:00 2001 From: George Claireaux Date: Wed, 1 Sep 2021 18:55:49 +0100 Subject: [PATCH 2/4] Delete DatabricksSqlOperations.java --- .../databricks/DatabricksSqlOperations.java | 179 ------------------ 1 file changed, 179 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java deleted file mode 100644 index 
40a3ed9b1ac610..00000000000000 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.destination.databricks; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.integrations.base.JavaBaseConstants; -import io.airbyte.integrations.destination.StandardNameTransformer; -import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.List; -import java.util.stream.Collectors; - -public class DatabricksSqlOperations extends JdbcSqlOperations { - - - @Override - public void executeTransaction(JdbcDatabase database, List queries) throws Exception { - database.executeWithinTransaction(queries); - } - - @Override - public String createTableQuery(JdbcDatabase database, String schemaName, String tableName) { - // Since Databricks uses Hive, there isn't an enforcement of primary keys or ability to set default column values - return String.format( - "CREATE TABLE IF NOT EXISTS %s.%s ( \n" - + "%s STRING,\n" - + "%s JSON,\n" - + "%s TIMESTAMP\n" - + ");\n", - schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); - } - - // THIS IS A DUMMY RIGHT NOW - @Override - public void insertRecordsInternal(JdbcDatabase database, - List records, - String schemaName, - String tmpTableName) - throws SQLException { - if (records.isEmpty()) { - return; - } - throw new SQLException(); - - } - - -// @Override -// public void insertRecordsInternal(JdbcDatabase database, -// List records, -// String schemaName, -// String tmpTableName) -// throws SQLException { -// if (records.isEmpty()) { -// return; -// } -// -// verifyLocalFileEnabled(database); -// try { -// File tmpFile = Files.createTempFile(tmpTableName + "-", ".tmp").toFile(); -// -// loadDataIntoTable(database, records, schemaName, tmpTableName, tmpFile); -// -// Files.delete(tmpFile.toPath()); -// } catch (IOException e) { -// throw new SQLException(e); -// } -// } -// -// private void loadDataIntoTable(JdbcDatabase 
database, -// List records, -// String schemaName, -// String tmpTableName, -// File tmpFile) -// throws SQLException { -// database.execute(connection -> { -// try { -// writeBatchToFile(tmpFile, records); -// -// String absoluteFile = "'" + tmpFile.getAbsolutePath() + "'"; -// -// String query = String.format( -// "LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '\\\"' LINES TERMINATED BY '\\r\\n'", -// absoluteFile, schemaName, tmpTableName); -// -// try (Statement stmt = connection.createStatement()) { -// stmt.execute(query); -// } -// } catch (Exception e) { -// throw new RuntimeException(e); -// } -// }); -// } - -// private double getVersion(JdbcDatabase database) throws SQLException { -// List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("select version()"), -// resultSet -> resultSet.getString("version()")).collect(Collectors.toList()); -// return Double.parseDouble(value.get(0).substring(0, 3)); -// } -// -// VersionCompatibility isCompatibleVersion(JdbcDatabase database) throws SQLException { -// double version = getVersion(database); -// return new VersionCompatibility(version, version >= 5.7); -// } -// -// @Override -// public boolean isSchemaRequired() { -// return false; -// } -// -// private boolean checkIfLocalFileIsEnabled(JdbcDatabase database) throws SQLException { -// List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), -// resultSet -> resultSet.getString("Value")).collect(Collectors.toList()); -// -// return value.get(0).equalsIgnoreCase("on"); -// } -// -// @Override -// public String createTableQuery(JdbcDatabase database, String schemaName, String tableName) { -// // MySQL requires byte information with VARCHAR. 
Since we are using uuid as value for the column, -// // 256 is enough -// return String.format( -// "CREATE TABLE IF NOT EXISTS %s.%s ( \n" -// + "%s VARCHAR(256) PRIMARY KEY,\n" -// + "%s JSON,\n" -// + "%s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6)\n" -// + ");\n", -// schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); -// } - -// public static class VersionCompatibility { -// -// private final double version; -// private final boolean isCompatible; -// -// public VersionCompatibility(double version, boolean isCompatible) { -// this.version = version; -// this.isCompatible = isCompatible; -// } -// -// public double getVersion() { -// return version; -// } -// -// public boolean isCompatible() { -// return isCompatible; -// } -// -// } - -} From 23278acb2b951b8eba6cacadf4ad90b2b07ee9e8 Mon Sep 17 00:00:00 2001 From: George Claireaux Date: Wed, 1 Sep 2021 19:02:13 +0100 Subject: [PATCH 3/4] revert sqlops --- .../databricks/DatabricksSqlOperations.java | 178 ++++++++++++++++++ 1 file changed, 178 insertions(+) create mode 100644 airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java new file mode 100644 index 00000000000000..31ba481c8f670c --- /dev/null +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java @@ -0,0 +1,178 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.databricks; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.stream.Collectors; + +public class DatabricksSqlOperations extends JdbcSqlOperations { + + private boolean isLocalFileEnabled = false; + + @Override + public void executeTransaction(JdbcDatabase database, List queries) throws Exception { + database.executeWithinTransaction(queries); + } + + @Override + public void insertRecordsInternal(JdbcDatabase database, + List records, + String schemaName, + String tmpTableName) + throws SQLException { + if (records.isEmpty()) { + return; + } + + verifyLocalFileEnabled(database); + try { + File tmpFile = Files.createTempFile(tmpTableName + "-", ".tmp").toFile(); + + loadDataIntoTable(database, records, schemaName, tmpTableName, tmpFile); + + Files.delete(tmpFile.toPath()); + } catch (IOException e) { + throw new SQLException(e); + } + } + + private void loadDataIntoTable(JdbcDatabase database, + List records, + String schemaName, + String tmpTableName, + File tmpFile) + throws SQLException { + database.execute(connection -> { + try { + writeBatchToFile(tmpFile, records); + + String absoluteFile = "'" + tmpFile.getAbsolutePath() + "'"; + + String query = String.format( + "LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '\\\"' LINES TERMINATED BY '\\r\\n'", + absoluteFile, schemaName, tmpTableName); + + try (Statement stmt = connection.createStatement()) { + stmt.execute(query); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + protected JsonNode formatData(JsonNode data) { + return StandardNameTransformer.formatJsonPath(data); + } + + void verifyLocalFileEnabled(JdbcDatabase database) throws SQLException { + boolean localFileEnabled = isLocalFileEnabled || checkIfLocalFileIsEnabled(database); + if (!localFileEnabled) { + tryEnableLocalFile(database); + } + isLocalFileEnabled = true; + } + + private void tryEnableLocalFile(JdbcDatabase database) throws SQLException { + database.execute(connection -> { + try (Statement statement = connection.createStatement()) { + statement.execute("set global local_infile=true"); + } catch (Exception e) { + throw new RuntimeException( + "The DB user provided to airbyte was unable to switch on the local_infile attribute on the MySQL server. 
As an admin user, you will need to run \"SET GLOBAL local_infile = true\" before syncing data with Airbyte.", + e); + } + }); + } + + private double getVersion(JdbcDatabase database) throws SQLException { + List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("select version()"), + resultSet -> resultSet.getString("version()")).collect(Collectors.toList()); + return Double.parseDouble(value.get(0).substring(0, 3)); + } + + VersionCompatibility isCompatibleVersion(JdbcDatabase database) throws SQLException { + double version = getVersion(database); + return new VersionCompatibility(version, version >= 5.7); + } + + @Override + public boolean isSchemaRequired() { + return false; + } + + private boolean checkIfLocalFileIsEnabled(JdbcDatabase database) throws SQLException { + List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), + resultSet -> resultSet.getString("Value")).collect(Collectors.toList()); + + return value.get(0).equalsIgnoreCase("on"); + } + + @Override + public String createTableQuery(JdbcDatabase database, String schemaName, String tableName) { + // MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column, + // 256 is enough + return String.format( + "CREATE TABLE IF NOT EXISTS %s.%s ( \n" + + "%s VARCHAR(256) PRIMARY KEY,\n" + + "%s JSON,\n" + + "%s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6)\n" + + ");\n", + schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + } + + public static class VersionCompatibility { + + private final double version; + private final boolean isCompatible; + + public VersionCompatibility(double version, boolean isCompatible) { + this.version = version; + this.isCompatible = isCompatible; + } + + public double getVersion() { + return version; + } + + public boolean isCompatible() { + return isCompatible; + } + + } + +} From 73e77abeeb65969854a5902c6bea78b86f70ef5c Mon Sep 17 00:00:00 2001 From: George Claireaux Date: Wed, 1 Sep 2021 19:05:54 +0100 Subject: [PATCH 4/4] minor changes --- .../destination/databricks/DatabricksStreamCopier.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java index 1203f348e232ef..3ae154c4bc1d85 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java @@ -84,20 +84,18 @@ public void createDestinationSchema() throws Exception { sqlOperations.createSchemaIfNotExists(db, database); } - // @Override public void createTemporaryTable() throws Exception { LOGGER.info("Preparing tmp table in destination for stream: {}, database: {}, tmp table name: {}.", streamName, database, tmpTableName); LOGGER.info(parquetWriter.parquetSchema.toString()); -// sqlOperations.createTableIfNotExists(db, database, tmpTableName); + sqlOperations.createTableIfNotExists(db, database, tmpTableName); } - // DUMMY for now, throwing exception @Override public 
void copyStagingFileToTemporaryTable() throws Exception {
     LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, database: {}.", tmpTableName, streamName, database);
-    throw new Exception();
-//    LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
+    // TODO: load data sql operation
+    LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
   }
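
Reviewer sketch (not part of the patches): pulling the pieces above together, the JDBC flow this series works toward is to open a connection from the new spec fields, then create the database and raw table. The standalone snippet below is a hedged illustration, not the connector code: the hostname, HTTP path, and token are placeholder values; it assumes the Simba Spark driver named in DRIVER_CLASS is on the classpath; the column names are the usual Airbyte raw-table constants; and it uses STRING for the record blob because Spark SQL has no JSON column type, so patch 1's createTableQuery would likely need the same substitution.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class DatabricksJdbcSketch {

  public static void main(String[] args) throws Exception {
    // Placeholder values for the serverHostname / httpPath / pat fields from spec.json.
    String serverHostname = "abc-12345678-90ab.cloud.databricks.com";
    String httpPath = "sql/protocolv1/o/0/0000-000000-abcdefgh";
    String personalAccessToken = "dapi-REDACTED";

    // Same URL shape that DatabricksDestination.getDatabase builds.
    String jdbcUrl = String.format(
        "jdbc:spark://%s:443/default;transportMode=http;ssl=1;httpPath=%s",
        serverHostname, httpPath);

    // Load the Simba Spark driver; with PAT auth the JDBC username is the literal string "token".
    Class.forName("com.simba.spark.jdbc.Driver");
    try (Connection connection = DriverManager.getConnection(jdbcUrl, "token", personalAccessToken);
        Statement statement = connection.createStatement()) {
      // Roughly what createSchemaIfNotExists and createTableQuery issue for a stream named "users".
      statement.execute("CREATE DATABASE IF NOT EXISTS default");
      statement.execute("CREATE TABLE IF NOT EXISTS default._airbyte_raw_users ("
          + "_airbyte_ab_id STRING, "
          + "_airbyte_data STRING, "
          + "_airbyte_emitted_at TIMESTAMP)");
    }
  }
}

For an OVERWRITE sync, generateMergeStatement would then queue something like "TRUNCATE TABLE default._airbyte_raw_users;" followed by "INSERT INTO ... SELECT * FROM <tmp table>;" (the rough shape of the truncateTableQuery and copyTableQuery calls it makes), which executeTransaction runs as a single transaction.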