mainly spec/config updates #5792

Merged · 4 commits · Sep 2, 2021
@@ -28,6 +28,7 @@
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
@@ -42,6 +43,10 @@ public class DatabricksDestination extends CopyDestination {

private static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver";

public static void main(String[] args) throws Exception {
new IntegrationRunner(new DatabricksDestination()).run(args);
}

@Override
public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) {
return CopyConsumerFactory.create(
@@ -52,7 +57,7 @@ public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog
S3Config.getS3Config(config),
catalog,
new DatabricksStreamCopierFactory(),
config.get("schema").asText()
config.get("schema").asText().equals("") ? "default" : config.get("schema").asText()
);
}

@@ -69,9 +74,11 @@ public ExtendedNameTransformer getNameTransformer() {
@Override
public JdbcDatabase getDatabase(JsonNode databricksConfig) {
return Databases.createJdbcDatabase(
databricksConfig.get("username").asText(),
databricksConfig.has("password") ? databricksConfig.get("password").asText() : null,
databricksConfig.get("jdbc_url").asText(),
"token",
databricksConfig.get("pat").asText(),
String.format("jdbc:spark://%s:443/default;transportMode=http;ssl=1;httpPath=%s",
databricksConfig.get("serverHostname").asText(),
databricksConfig.get("httpPath").asText()),
DRIVER_CLASS
);
}
@@ -14,6 +14,7 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.sql.Timestamp;
import java.util.UUID;
import org.slf4j.Logger;
@@ -31,15 +32,20 @@ public class DatabricksStreamCopier implements StreamCopier {
private final AmazonS3 s3Client;
private final S3Config s3Config;
private final String tmpTableName;
private final DestinationSyncMode syncMode;
private final AirbyteStream stream;
private final JdbcDatabase db;
private final String database;
private final String streamName;
private final ExtendedNameTransformer nameTransformer;
private final SqlOperations sqlOperations;
private final DatabricksSqlOperations sqlOperations;
private final S3ParquetWriter parquetWriter;

public DatabricksStreamCopier(String stagingFolder,
DestinationSyncMode syncMode,
String schema,
ConfiguredAirbyteStream configuredStream,
String streamName,
AmazonS3 s3Client,
JdbcDatabase db,
S3Config s3Config,
@@ -48,14 +54,18 @@ public DatabricksStreamCopier(String stagingFolder,
S3WriterFactory writerFactory,
Timestamp uploadTime) throws Exception {
this.stream = configuredStream.getStream();
this.syncMode = syncMode;
this.db = db;
this.database = schema;
this.streamName = streamName;
this.nameTransformer = nameTransformer;
this.sqlOperations = sqlOperations;
this.tmpTableName = nameTransformer.getTmpTableName(stream.getName());
this.sqlOperations = (DatabricksSqlOperations) sqlOperations;
this.tmpTableName = nameTransformer.getTmpTableName(streamName);
this.s3Client = s3Client;
this.s3Config = s3Config;
this.parquetWriter = (S3ParquetWriter) writerFactory
.create(getS3DestinationConfig(s3Config, stagingFolder), s3Client, configuredStream, uploadTime);
LOGGER.info(parquetWriter.parquetSchema.toString());
}

@Override
@@ -69,28 +79,46 @@ public void closeStagingUploader(boolean hasFailed) throws Exception {
}

@Override
public void createTemporaryTable() throws Exception {

public void createDestinationSchema() throws Exception {
LOGGER.info("Creating database in destination if it doesn't exist: {}", database);
sqlOperations.createSchemaIfNotExists(db, database);
}

@Override
public void copyStagingFileToTemporaryTable() throws Exception {

public void createTemporaryTable() throws Exception {
LOGGER.info("Preparing tmp table in destination for stream: {}, database: {}, tmp table name: {}.", streamName, database, tmpTableName);
LOGGER.info(parquetWriter.parquetSchema.toString());
sqlOperations.createTableIfNotExists(db, database, tmpTableName);
}

@Override
public void createDestinationSchema() throws Exception {

public void copyStagingFileToTemporaryTable() throws Exception {
LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, database: {}, .", tmpTableName, streamName, database);
// TODO: load data sql operation
LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
}


@Override
public String createDestinationTable() throws Exception {
return null;
var destTableName = nameTransformer.getRawTableName(streamName);
LOGGER.info("Preparing table {} in destination.", destTableName);
sqlOperations.createTableIfNotExists(db, database, destTableName);
LOGGER.info("Table {} in destination prepared.", tmpTableName);

return destTableName;
}

@Override
public String generateMergeStatement(String destTableName) throws Exception {
return null;
public String generateMergeStatement(String destTableName) {
LOGGER.info("Preparing to merge tmp table {} to dest table: {}, database: {}, in destination.", tmpTableName, destTableName, database);
var queries = new StringBuilder();
if (syncMode.equals(DestinationSyncMode.OVERWRITE)) {
queries.append(sqlOperations.truncateTableQuery(db, database, destTableName));
LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, database: {}, truncated.", destTableName, database);
}
queries.append(sqlOperations.copyTableQuery(db, database, tmpTableName, destTableName));
return queries.toString();
}

@Override
@@ -13,6 +13,7 @@
import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.sql.Timestamp;

public class DatabricksStreamCopierFactory implements StreamCopierFactory<S3Config> {
@@ -27,13 +28,14 @@ public StreamCopier create(String configuredSchema,
SqlOperations sqlOperations) {
try {
AirbyteStream stream = configuredStream.getStream();
DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
String schema = StreamCopierFactory.getSchema(stream, configuredSchema, nameTransformer);
AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config);
S3WriterFactory writerFactory = new ProductionWriterFactory();
Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());

return new DatabricksStreamCopier(
stagingFolder, schema, configuredStream, s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
stagingFolder, syncMode, schema, configuredStream, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -9,22 +9,93 @@
"title": "Databricks Destination Spec",
"type": "object",
"required": [
"jdbcUrl"
"serverHostname",
"httpPath",
"pat"
],
"additionalProperties": false,
"properties": {
"jdbcUrl": {
"title": "JDBC URL",
"serverHostname": {
"title": "Server Hostname",
"type": "string",
"description": "",
"examples": [""]
},
"httpPath": {
"title": "HTTP Path",
"type": "string",
"description": "",
"examples": [""]
},
"pat": {
"title": "Personal Access Token",
"type": "string",
"description": "",
"examples": [""],
"airbyte_secret": true
},
"database": {
"schema": {
"title": "Database",
"type": "string",
"description": "",
"examples": [""]
"description": ""
},
"s3_bucket_name": {
"title": "S3 Bucket Name",
"type": "string",
"description": "The name of the S3 bucket to use for intermittent staging of the data.",
"examples": ["airbyte.staging"]
},
"s3_bucket_region": {
"title": "S3 Bucket Region",
"type": "string",
"default": "",
"description": "The region of the S3 staging bucket to use if utilising a copy strategy.",
"enum": [
"",
"us-east-1",
"us-east-2",
"us-west-1",
"us-west-2",
"af-south-1",
"ap-east-1",
"ap-south-1",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-southeast-1",
"ap-southeast-2",
"ca-central-1",
"cn-north-1",
"cn-northwest-1",
"eu-central-1",
"eu-north-1",
"eu-south-1",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"sa-east-1",
"me-south-1"
]
},
"access_key_id": {
"type": "string",
"description": "The Access Key Id granting allow one to access the above S3 staging bucket. Airbyte requires Read and Write permissions to the given bucket.",
"title": "S3 Key Id",
"airbyte_secret": true
},
"secret_access_key": {
"type": "string",
"description": "The corresponding secret to the above access key id.",
"title": "S3 Access Key",
"airbyte_secret": true
},
"part_size": {
"type": "integer",
"minimum": 10,
"maximum": 100,
"examples": ["10"],
"description": "Optional. Increase this if syncing tables larger than 100GB. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note, a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care.",
"title": "Stream Part Size"
}
}
}
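
As an aside (not code from this diff), the sizing rule in the part_size description follows from S3's 10,000-part limit on multipart uploads; a minimal sketch of the arithmetic, with the constant and method names chosen here for illustration:

// Illustrative only: the spec's part_size rule, not part of this PR.
public class PartSizeLimit {

  // S3 multipart uploads allow at most 10,000 parts per object.
  private static final long MAX_S3_PARTS = 10_000L;

  // Maximum streamed object (table) size in GB for a given part size in MB.
  public static long maxTableSizeGb(long partSizeMb) {
    return partSizeMb * MAX_S3_PARTS / 1024;
  }

  public static void main(String[] args) {
    // Default 10 MB parts -> 97 GB, i.e. the ~100 GB limit mentioned above.
    System.out.println(maxTableSizeGb(10) + " GB");
    // A larger part size raises the ceiling, e.g. 25 MB parts -> 244 GB.
    System.out.println(maxTableSizeGb(25) + " GB");
  }
}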
@@ -56,6 +56,7 @@ public class S3ParquetWriter extends BaseS3Writer implements S3Writer {

private final ParquetWriter<Record> parquetWriter;
private final AvroRecordFactory avroRecordFactory;
public final Schema parquetSchema;
Contributor (author): @tuliren is there a better way to extract the schema from the parquet writer than this?

Contributor: I think this is the right way. The schema is usually persisted at the beginning of the parquet file, but since we already have it here, we don't need to read it from the file.


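For context on the exchange above (an illustration, not part of this PR): the alternative the reply alludes to would be re-reading the schema from a written file's footer, roughly along these lines, assuming Parquet's Hadoop and Avro modules are on the classpath and the class name is hypothetical:

// Hypothetical sketch: recover the Avro schema from a Parquet file's footer,
// which keeping the parquetSchema field on the writer avoids.
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class ParquetSchemaReader {

  public static Schema readAvroSchema(String path, Configuration hadoopConfig) throws Exception {
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), hadoopConfig))) {
      // The Parquet footer carries the file's MessageType schema.
      MessageType parquetSchema = reader.getFileMetaData().getSchema();
      // Convert back to Avro for comparison with the schema used by the writer.
      return new AvroSchemaConverter().convert(parquetSchema);
    }
  }
}
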
public S3ParquetWriter(S3DestinationConfig config,
AmazonS3 s3Client,
@@ -88,6 +89,7 @@ public S3ParquetWriter(S3DestinationConfig config,
.withDictionaryEncoding(formatConfig.isDictionaryEncoding())
.build();
this.avroRecordFactory = new AvroRecordFactory(schema, nameUpdater);
this.parquetSchema = schema;
}

public static Configuration getHadoopConfig(S3DestinationConfig config) {