/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.redshift;
import static io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
import static io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig.getS3DestinationConfig;
import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.SSL_JDBC_PARAMETERS;
import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.getJdbcConfig;
import static io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants.UPLOADING_METHOD;
import static io.airbyte.integrations.destination.redshift.util.RedshiftUtil.findS3Options;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator;
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption;
import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType;
import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig;
import io.airbyte.cdk.integrations.destination.s3.NoEncryption;
import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks;
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations;
import io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations;
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator;
import io.airbyte.integrations.destination.redshift.util.RedshiftUtil;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import javax.sql.DataSource;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RedshiftStagingS3Destination extends AbstractJdbcDestination implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStagingS3Destination.class);
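/**
 * Wraps this destination so it can optionally connect through an SSH tunnel; the host and port
 * list keys tell the wrapper which config fields to rewrite when the tunnel is active.
 */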
public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new RedshiftStagingS3Destination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}
public RedshiftStagingS3Destination() {
super(RedshiftInsertDestination.DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations());
}
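/**
 * Returns true for the unsupported combination of an ephemeral (connector-generated, never
 * persisted) encryption key with staging-data purging disabled: the retained S3 objects could
 * never be decrypted.
 */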
private boolean isEphemeralKeysAndPurgingStagingData(final JsonNode config, final EncryptionConfig encryptionConfig) {
return !isPurgeStagingData(config) && encryptionConfig instanceof final AesCbcEnvelopeEncryption c && c.keyType() == KeyType.EPHEMERAL;
}
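/**
 * Validates the connection end to end: rejects ephemeral keys combined with disabled purging,
 * attempts an S3 write/delete against the staging bucket, attempts table operations in the
 * configured schema, and verifies that the user can read the SVV system tables the connector
 * relies on.
 */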
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
final S3DestinationConfig s3Config = getS3DestinationConfig(findS3Options(config));
final EncryptionConfig encryptionConfig =
config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
if (isEphemeralKeysAndPurgingStagingData(config, encryptionConfig)) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage(
"You cannot use ephemeral keys and disable purging your staging data. This would produce S3 objects that you cannot decrypt.");
}
S3BaseChecks.attemptS3WriteAndDelete(new S3StorageOperations(new RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), s3Config,
s3Config.getBucketPath());
final NamingConventionTransformer nameTransformer = getNamingResolver();
final RedshiftS3StagingSqlOperations redshiftS3StagingSqlOperations =
new RedshiftS3StagingSqlOperations(nameTransformer, s3Config.getS3Client(), s3Config, encryptionConfig);
final DataSource dataSource = getDataSource(config);
try {
final JdbcDatabase database = new DefaultJdbcDatabase(dataSource);
final String outputSchema = super.getNamingResolver().getIdentifier(config.get(JdbcUtils.SCHEMA_KEY).asText());
attemptTableOperations(outputSchema, database, nameTransformer, redshiftS3StagingSqlOperations, false);
RedshiftUtil.checkSvvTableAccess(database);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final ConnectionErrorException e) {
final String message = getErrorMessage(e.getStateCode(), e.getErrorCode(), e.getExceptionMessage(), e);
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, message);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage(message);
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect with provided configuration. \n" + e.getMessage());
} finally {
try {
DataSourceFactory.close(dataSource);
} catch (final Exception e) {
LOGGER.warn("Unable to close data source.", e);
}
}
}
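/** Builds the JDBC DataSource from the converted config, with a two-minute connection timeout. */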
@Override
public DataSource getDataSource(final JsonNode config) {
final var jdbcConfig = getJdbcConfig(config);
return DataSourceFactory.create(
jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText(),
jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null,
RedshiftInsertDestination.DRIVER_CLASS,
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(),
getDefaultConnectionProperties(config),
Duration.ofMinutes(2));
}
@Override
protected NamingConventionTransformer getNamingResolver() {
return new RedshiftSQLNameTransformer();
}
@Override
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
// TODO: Pull common code from RedshiftInsertDestination and RedshiftStagingS3Destination into a
// base class.
// The following properties can be overridden through jdbcUrlParameters in the config.
final Map<String, String> connectionOptions = new HashMap<>();
// Redshift properties
// https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-configuration-options.html#jdbc20-connecttimeout-option
// connectTimeout is different from the Hikari pool's connectionTimeout; the driver defaults to
// 10 seconds, so increase it to match Hikari's default.
connectionOptions.put("connectTimeout", "120");
// HikariPool properties
// https://github.com/brettwooldridge/HikariCP?tab=readme-ov-file#frequently-used
// connectionTimeout is set explicitly to 2 minutes when creating data source.
// Use an aggressive keepAlive with the minimum allowed value; this only applies to connections
// sitting idle in the pool.
connectionOptions.put("keepaliveTime", Long.toString(Duration.ofSeconds(30).toMillis()));
connectionOptions.putAll(SSL_JDBC_PARAMETERS);
return connectionOptions;
}
// This is a no-op since we override getDatabase.
@Override
public JsonNode toJdbcConfig(final JsonNode config) {
return Jsons.emptyObject();
}
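// Typing & deduping plumbing: Redshift-specific SQL generation and statement execution.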
@Override
protected JdbcSqlGenerator getSqlGenerator() {
return new RedshiftSqlGenerator(getNamingResolver());
}
@Override
protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) {
return new RedshiftDestinationHandler(databaseName, database);
}
@Override
@Deprecated
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
throw new NotImplementedException("Should use the getSerializedMessageConsumer instead");
}
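/**
 * Builds the async consumer used for writes: records are buffered into local staging files,
 * uploaded to S3, loaded into Redshift via the staging SQL operations, and then optionally typed
 * and deduped into final tables.
 */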
@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final EncryptionConfig encryptionConfig =
config.has(UPLOADING_METHOD) ? EncryptionConfig.fromJson(config.get(UPLOADING_METHOD).get(JdbcUtils.ENCRYPTION_KEY)) : new NoEncryption();
final JsonNode s3Options = findS3Options(config);
final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options);
final int numberOfFileBuffers = getNumberOfFileBuffers(s3Options);
if (numberOfFileBuffers > FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER) {
LOGGER.warn("""
Increasing the number of file buffers past {} can lead to increased performance but
leads to increased memory usage. If the number of file buffers exceeds the number
of streams {} this will create more buffers than necessary, leading to nonexistent gains
""", FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER, catalog.getStreams().size());
}
final String defaultNamespace = config.get("schema").asText();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
if (StringUtils.isEmpty(stream.getStream().getNamespace())) {
stream.getStream().setNamespace(defaultNamespace);
}
}
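// Typing & deduping setup: parse the catalog into raw/final table models, honoring any
// raw-namespace override, then pick a typer-deduper implementation below.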
final RedshiftSqlGenerator sqlGenerator = new RedshiftSqlGenerator(getNamingResolver());
final ParsedCatalog parsedCatalog;
final TyperDeduper typerDeduper;
final JdbcDatabase database = getDatabase(getDataSource(config));
final String databaseName = config.get(JdbcUtils.DATABASE_KEY).asText();
final RedshiftDestinationHandler redshiftDestinationHandler = new RedshiftDestinationHandler(databaseName, database);
final CatalogParser catalogParser;
if (TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent()) {
catalogParser = new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get());
} else {
catalogParser = new CatalogParser(sqlGenerator);
}
parsedCatalog = catalogParser.parseCatalog(catalog);
final JdbcV1V2Migrator migrator = new JdbcV1V2Migrator(getNamingResolver(), database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final int defaultThreadCount = 8;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator,
defaultThreadCount);
} else {
typerDeduper =
new DefaultTyperDeduper<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator, defaultThreadCount);
}
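// Assemble the async staging consumer from the pieces above: S3 staging operations, purge
// behavior, and the typer-deduper selected for this sync.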
return new StagingConsumerFactory().createAsync(
outputRecordCollector,
database,
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig),
getNamingResolver(),
config,
catalog,
isPurgeStagingData(s3Options),
new TypeAndDedupeOperationValve(),
typerDeduper,
parsedCatalog,
defaultNamespace,
true);
}
/**
 * Retrieves the user-configured file buffer count, clamping it so that it never exceeds the
 * maximum number of file buffers and never falls below the default minimum.
 *
 * NOTE: If Out Of Memory Exceptions (OOME) occur, this can be a likely cause, as this hard limit
 * has not been thoroughly load tested across all instance sizes.
 *
 * @param config user configurations
 * @return the number of file buffers if configured, otherwise the default
 */
@VisibleForTesting
public int getNumberOfFileBuffers(final JsonNode config) {
int numOfFileBuffers = FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER;
if (config.has(FileBuffer.FILE_BUFFER_COUNT_KEY)) {
numOfFileBuffers = Math.min(config.get(FileBuffer.FILE_BUFFER_COUNT_KEY).asInt(), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER);
}
// Only allows for values 10 <= numOfFileBuffers <= 50
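// For example, given the 10/50 bounds above: a configured value of 70 is clamped down to 50 by
// the Math.min above, and a configured value of 5 is raised back to 10 here.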
return Math.max(numOfFileBuffers, FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER);
}
private boolean isPurgeStagingData(final JsonNode config) {
return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean();
}
}
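// A minimal launch sketch (not part of the original file), assuming the standard Airbyte CDK
// IntegrationRunner entry point; connector main classes typically follow this pattern, though the
// actual main class for this connector may differ.
class RedshiftStagingS3DestinationMainSketch {

  public static void main(final String[] args) throws Exception {
    // Wrap with SSH tunneling support, then let the CDK runner dispatch the spec/check/write
    // command encoded in the CLI arguments.
    final Destination destination = RedshiftStagingS3Destination.sshWrappedDestination();
    new io.airbyte.cdk.integrations.base.IntegrationRunner(destination).run(args);
  }

}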