/
SnowflakeDestination.java
48 lines (39 loc) · 1.95 KB
/
SnowflakeDestination.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.snowflake;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.destination.jdbc.copy.SwitchingDestination;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
// TODO: Remove the Switching Destination from this class as part of code cleanup.
@Slf4j
public class SnowflakeDestination extends SwitchingDestination<SnowflakeDestination.DestinationType> {
public static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
private final String airbyteEnvironment;
enum DestinationType {
INTERNAL_STAGING
}
public SnowflakeDestination(final String airbyteEnvironment) {
super(DestinationType.class, SnowflakeDestinationResolver::getTypeFromConfig,
SnowflakeDestinationResolver.getTypeToDestination(airbyteEnvironment));
this.airbyteEnvironment = airbyteEnvironment;
}
@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config);
return new SnowflakeInternalStagingDestination(airbyteEnvironment).getSerializedMessageConsumer(config, catalog, outputRecordCollector);
}
@Override
public Boolean isV2Destination() {
return true;
}
}