DatabricksDestination.java
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.databricks;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;

public class DatabricksDestination extends CopyDestination {

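  // The parent CopyDestination reads the default schema from the "database_schema" config field.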
  public DatabricksDestination() {
    super("database_schema");
  }
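
  // Entry point: delegates to Airbyte's IntegrationRunner, which dispatches the spec, check,
  // and write commands to this destination.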
  public static void main(final String[] args) throws Exception {
    new IntegrationRunner(new DatabricksDestination()).run(args);
  }
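
  // Builds the per-sync consumer: CopyConsumerFactory combines the JDBC database, SQL
  // operations, name transformer, and DatabricksStreamCopierFactory so that records are
  // staged and then copied into the configured database schema.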
  @Override
  public AirbyteMessageConsumer getConsumer(final JsonNode config,
                                            final ConfiguredAirbyteCatalog catalog,
                                            final Consumer<AirbyteMessage> outputRecordCollector) {
    final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
    return CopyConsumerFactory.create(
        outputRecordCollector,
        getDatabase(config),
        getSqlOperations(),
        getNameTransformer(),
        databricksConfig,
        catalog,
        new DatabricksStreamCopierFactory(),
        databricksConfig.getDatabaseSchema());
  }
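
  // Connection check for the staging area: attempts a test write and delete against the
  // configured S3 bucket before any data is synced.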
  @Override
  public void checkPersistence(final JsonNode config) {
    final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
    S3Destination.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig(), "");
  }

  @Override
  public ExtendedNameTransformer getNameTransformer() {
    return new DatabricksNameTransformer();
  }

  @Override
  public JdbcDatabase getDatabase(final JsonNode jsonConfig) {
    return getDatabase(DatabricksDestinationConfig.get(jsonConfig));
  }

  @Override
  public SqlOperations getSqlOperations() {
    return new DatabricksSqlOperations();
  }
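
  // Assembles the Spark JDBC URL from the configured hostname, port, and HTTP path, using
  // HTTP transport with SSL and an Airbyte user-agent entry.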
  static String getDatabricksConnectionString(final DatabricksDestinationConfig databricksConfig) {
    return String.format("jdbc:spark://%s:%s/default;transportMode=http;ssl=1;httpPath=%s;UserAgentEntry=Airbyte",
        databricksConfig.getDatabricksServerHostname(),
        databricksConfig.getDatabricksPort(),
        databricksConfig.getDatabricksHttpPath());
  }
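
  // Opens the JDBC connection using the shared Databricks username constant, the personal
  // access token from the config, and the Databricks driver class.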
  static JdbcDatabase getDatabase(final DatabricksDestinationConfig databricksConfig) {
    return Databases.createJdbcDatabase(
        DatabricksConstants.DATABRICKS_USERNAME,
        databricksConfig.getDatabricksPersonalAccessToken(),
        getDatabricksConnectionString(databricksConfig),
        DatabricksConstants.DATABRICKS_DRIVER_CLASS);
  }
}