-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
Destination.java
172 lines (149 loc) · 6.36 KB
/
Destination.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.integrations.base;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.Optional;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
public interface Destination extends Integration {
/**
* Return a consumer that writes messages to the destination.
*
* @param config - integration-specific configuration object as json. e.g. { "username": "airbyte",
* "password": "super secure" }
* @param catalog - schema of the incoming messages.
* @return Consumer that accepts message. The {@link AirbyteMessageConsumer#accept(AirbyteMessage)}
* will be called n times where n is the number of messages.
* {@link AirbyteMessageConsumer#close()} will always be called once regardless of success
* or failure.
* @throws Exception - any exception.
*/
AirbyteMessageConsumer getConsumer(JsonNode config,
ConfiguredAirbyteCatalog catalog,
Consumer<AirbyteMessage> outputRecordCollector)
throws Exception;
/**
* Default implementation allows us to not have to touch existing destinations while avoiding a lot
* of conditional statements in {@link IntegrationRunner}. This is preferred over #getConsumer and
* is the default Async Framework method.
*
* @param config config
* @param catalog catalog
* @param outputRecordCollector outputRecordCollector
* @return AirbyteMessageConsumer wrapped in SerializedAirbyteMessageConsumer to maintain legacy
* behavior.
* @throws Exception exception
*/
default SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
return new ShimToSerializedAirbyteMessageConsumer(getConsumer(config, catalog, outputRecordCollector));
}
static void defaultOutputRecordCollector(final AirbyteMessage message) {
System.out.println(Jsons.serialize(message));
}
/**
* Backwards-compatibility wrapper for an AirbyteMessageConsumer. Strips the sizeInBytes argument
* away from the .accept call.
*/
@Slf4j
class ShimToSerializedAirbyteMessageConsumer implements SerializedAirbyteMessageConsumer {
private final AirbyteMessageConsumer consumer;
public ShimToSerializedAirbyteMessageConsumer(final AirbyteMessageConsumer consumer) {
this.consumer = consumer;
}
@Override
public void start() throws Exception {
consumer.start();
}
/**
* Consumes an {@link AirbyteMessage} for processing.
* <p>
* If the provided JSON string is invalid AND represents a {@link AirbyteMessage.Type#STATE}
* message, processing is halted. Otherwise, the invalid message is logged and execution continues.
*
* @param inputString JSON representation of an {@link AirbyteMessage}.
* @throws Exception if an invalid state message is provided or the consumer is unable to accept the
* provided message.
*/
@Override
public void accept(final String inputString, final Integer sizeInBytes) throws Exception {
consumeMessage(consumer, inputString);
}
@Override
public void close() throws Exception {
consumer.close();
}
/**
* Consumes an {@link AirbyteMessage} for processing.
* <p>
* If the provided JSON string is invalid AND represents a {@link AirbyteMessage.Type#STATE}
* message, processing is halted. Otherwise, the invalid message is logged and execution continues.
*
* @param consumer An {@link AirbyteMessageConsumer} that can handle the provided message.
* @param inputString JSON representation of an {@link AirbyteMessage}.
* @throws Exception if an invalid state message is provided or the consumer is unable to accept the
* provided message.
*/
@VisibleForTesting
static void consumeMessage(final AirbyteMessageConsumer consumer, final String inputString) throws Exception {
final Optional<AirbyteMessage> messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
if (messageOptional.isPresent()) {
consumer.accept(messageOptional.get());
} else {
if (isStateMessage(inputString)) {
throw new IllegalStateException("Invalid state message: " + inputString);
} else {
log.error("Received invalid message: " + inputString);
}
}
}
/**
* Tests whether the provided JSON string represents a state message.
*
* @param input a JSON string that represents an {@link AirbyteMessage}.
* @return {@code true} if the message is a state message, {@code false} otherwise.
*/
@SuppressWarnings("OptionalIsPresent")
private static boolean isStateMessage(final String input) {
final Optional<AirbyteTypeMessage> deserialized = Jsons.tryDeserialize(input, AirbyteTypeMessage.class);
if (deserialized.isPresent()) {
return deserialized.get().getType() == Type.STATE;
} else {
return false;
}
}
/**
* Custom class for parsing a JSON message to determine the type of the represented
* {@link AirbyteMessage}. Do the bare minimum deserialisation by reading only the type field.
*/
private static class AirbyteTypeMessage {
@JsonProperty("type")
@JsonPropertyDescription("Message type")
private AirbyteMessage.Type type;
@JsonProperty("type")
public AirbyteMessage.Type getType() {
return type;
}
@JsonProperty("type")
public void setType(final AirbyteMessage.Type type) {
this.type = type;
}
}
}
/**
* Denotes if the destination fully supports Destinations V2.
*/
default Boolean isV2Destination() {
return false;
}
}