Add Namespace Field. #2704

Merged on Apr 6, 2021 · 21 commits · Changes from 3 commits

@@ -137,6 +137,10 @@ class Config:
None,
description="If the source defines the primary key, paths to the fields that will be used as a primary key. If not provided by the source, the end user will have to specify the primary key themselves.",
)
namespace: Optional[str] = Field(
None,
description="Optional Source-defined namespace. Currently only used by JDBC destinations to determine what schema to write to. Airbyte streams from the same sources should have the same namespace.",
)


class ConfiguredAirbyteStream(BaseModel):
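For context, the Java protocol model (io.airbyte.protocol.models.AirbyteStream, imported in the JDBC consumer changes below) exposes the same field, so a source that knows which schema a table lives in can attach it at discovery time. A minimal sketch, with hypothetical stream and schema names, assuming the usual generated setters on the protocol model (the tests later in this PR call setNamespace the same way):

import io.airbyte.protocol.models.AirbyteStream;

public class SourceNamespaceSketch {

  public static void main(String[] args) {
    // Hypothetical table "users" discovered in the source's "public" schema.
    final AirbyteStream usersStream = new AirbyteStream();
    usersStream.setName("users");
    // The new field: a JDBC destination will write this stream into "public"
    // instead of its configured default schema.
    usersStream.setNamespace("public");

    System.out.println(usersStream.getName() + " -> " + usersStream.getNamespace());
  }
}
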
@@ -31,12 +31,13 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.WriteConfig;
import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.OnCloseFunction;
import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.OnStartFunction;
import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.RecordWriter;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.time.Instant;
import java.util.List;
@@ -77,25 +78,45 @@ public static AirbyteMessageConsumer create(JdbcDatabase database,
private static List<WriteConfig> createWriteConfigs(NamingConventionTransformer namingResolver, JsonNode config, ConfiguredAirbyteCatalog catalog) {
Preconditions.checkState(config.has("schema"), "jdbc destinations must specify a schema.");
final Instant now = Instant.now();
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, now)).collect(Collectors.toList());
}

private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(NamingConventionTransformer namingResolver,
JsonNode config,
Instant now) {
return stream -> {
Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode");
final AirbyteStream abStream = stream.getStream();

return catalog.getStreams().stream().map(stream -> {
final String streamName = stream.getStream().getName();
final String schemaName = namingResolver.getIdentifier(config.get("schema").asText());
final String defaultSchemaName = namingResolver.getIdentifier(config.get("schema").asText());
final String outputSchema = getOutputSchema(abStream, defaultSchemaName);

final String streamName = abStream.getName();
final String tableName = Names.concatQuotedNames("_airbyte_raw_", namingResolver.getIdentifier(streamName));
final String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName);
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
}
return new WriteConfig(streamName, schemaName, tmpTableName, tableName, syncMode);
}).collect(Collectors.toList());

return new WriteConfig(streamName, outputSchema, tmpTableName, tableName, syncMode);
};
}

/**
* Defer to the {@link AirbyteStream}'s namespace. If this is not set, use the destination's default
* schema. This namespace is source-provided, and can be potentially empty.
*/
private static String getOutputSchema(AirbyteStream stream, String defaultDestSchema) {
final String sourceSchema = stream.getNamespace();
if (sourceSchema != null) {
return sourceSchema;
}
return defaultDestSchema;
}

private static OnStartFunction onStartFunction(JdbcDatabase database, SqlOperations sqlOperations, List<WriteConfig> writeConfigs) {
return () -> {
LOGGER.info("Preparing tmp tables in destination started for {} streams", writeConfigs.size());
for (final WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputNamespaceName();
final String schemaName = writeConfig.getOutputSchemaName();
final String tmpTableName = writeConfig.getTmpTableName();
LOGGER.info("Preparing tmp table in destination started for stream {}. schema {}, tmp table name: {}", writeConfig.getStreamName(),
schemaName, tmpTableName);
@@ -121,7 +142,7 @@ private static RecordWriter recordWriterFunction(JdbcDatabase database,
}

final WriteConfig writeConfig = streamNameToWriteConfig.get(streamName);
sqlOperations.insertRecords(database, recordStream, writeConfig.getOutputNamespaceName(), writeConfig.getTmpTableName());
sqlOperations.insertRecords(database, recordStream, writeConfig.getOutputSchemaName(), writeConfig.getTmpTableName());
};
}

@@ -132,7 +153,7 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati
final StringBuilder queries = new StringBuilder();
LOGGER.info("Finalizing tables in destination started for {} streams", writeConfigs.size());
for (WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputNamespaceName();
final String schemaName = writeConfig.getOutputSchemaName();
final String srcTableName = writeConfig.getTmpTableName();
final String dstTableName = writeConfig.getOutputTableName();
LOGGER.info("Finalizing stream {}. schema {}, tmp table {}, final table {}", writeConfig.getStreamName(), schemaName, srcTableName,
@@ -155,7 +176,7 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati
// clean up
LOGGER.info("Cleaning tmp tables in destination started for {} streams", writeConfigs.size());
for (WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputNamespaceName();
final String schemaName = writeConfig.getOutputSchemaName();
final String tmpTableName = writeConfig.getTmpTableName();
LOGGER.info("Cleaning tmp table in destination started for stream {}. schema {}, tmp table name: {}", writeConfig.getStreamName(), schemaName,
tmpTableName);
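The routing rule added above boils down to: use the stream's namespace when the source provided one, otherwise fall back to the connector's configured "schema". A standalone sketch of that rule, re-implementing the private getOutputSchema helper purely for illustration and using hypothetical stream and schema names:

import io.airbyte.protocol.models.AirbyteStream;

public class OutputSchemaSketch {

  // Mirrors getOutputSchema above: prefer the source-provided namespace,
  // otherwise use the destination's configured default schema.
  static String resolveOutputSchema(AirbyteStream stream, String defaultDestSchema) {
    final String sourceSchema = stream.getNamespace();
    return sourceSchema != null ? sourceSchema : defaultDestSchema;
  }

  public static void main(String[] args) {
    final AirbyteStream withNamespace = new AirbyteStream();
    withNamespace.setName("users");
    withNamespace.setNamespace("public");

    final AirbyteStream withoutNamespace = new AirbyteStream();
    withoutNamespace.setName("tasks");

    // Prints "public": the source-defined schema wins.
    System.out.println(resolveOutputSchema(withNamespace, "default_schema"));
    // Prints "default_schema": no namespace, so the configured schema is used.
    System.out.println(resolveOutputSchema(withoutNamespace, "default_schema"));
  }
}
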
@@ -22,21 +22,24 @@
* SOFTWARE.
*/

package io.airbyte.integrations.destination;
package io.airbyte.integrations.destination.jdbc;

import io.airbyte.protocol.models.DestinationSyncMode;

/**
* Write configuration POJO for all destinations extending {@link AbstractJdbcDestination}.
*/
public class WriteConfig {

private final String streamName;
private final String outputNamespaceName;
private final String outputSchemaName;
Review comment from the PR author (Contributor): Since namespace is something that doesn't appear in the destination concept space now, I felt it was clearer to rename this field.

private final String tmpTableName;
private final String outputTableName;
private final DestinationSyncMode syncMode;

public WriteConfig(String streamName, String outputNamespaceName, String tmpTableName, String outputTableName, DestinationSyncMode syncMode) {
public WriteConfig(String streamName, String outputSchemaName, String tmpTableName, String outputTableName, DestinationSyncMode syncMode) {
this.streamName = streamName;
this.outputNamespaceName = outputNamespaceName;
this.outputSchemaName = outputSchemaName;
this.tmpTableName = tmpTableName;
this.outputTableName = outputTableName;
this.syncMode = syncMode;
@@ -50,8 +53,8 @@ public String getTmpTableName() {
return tmpTableName;
}

public String getOutputNamespaceName() {
return outputNamespaceName;
public String getOutputSchemaName() {
return outputSchemaName;
}

public String getOutputTableName() {
@@ -70,6 +70,7 @@
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;

@@ -262,6 +263,35 @@ void testWriteNewSchema() throws Exception {
assertThrows(RuntimeException.class, () -> recordRetriever(NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME)));
}

@Test
@DisplayName("Should use Airbyte Stream Namespace as schema if it is present")
void testUseAirbyteStreamNamespaceIfNotNull() throws Exception {
final String srcNamespace = "source_namespace";
JsonNode newConfig = createConfig("default_schema");
final JdbcDestination destination = new JdbcDestination();
CATALOG.getStreams().forEach(stream -> stream.getStream().setNamespace(srcNamespace));

final AirbyteMessageConsumer consumer = destination.getConsumer(newConfig, CATALOG);
consumer.start();
consumer.accept(MESSAGE_USERS1);
consumer.accept(MESSAGE_TASKS1);
consumer.accept(MESSAGE_USERS2);
consumer.accept(MESSAGE_TASKS2);
consumer.accept(MESSAGE_STATE);
consumer.close();

String streamName = srcNamespace + "." + NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME);
Set<JsonNode> usersActual = recordRetriever(streamName);
final Set<JsonNode> expectedUsersJson = Sets.newHashSet(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData());
assertEquals(expectedUsersJson, usersActual);

streamName = srcNamespace + "." + NAMING_TRANSFORMER.getRawTableName(TASKS_STREAM_NAME);
Set<JsonNode> tasksActual = recordRetriever(streamName);
final Set<JsonNode> expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData());
assertEquals(expectedTasksJson, tasksActual);

}

@SuppressWarnings("ResultOfMethodCallIgnored")
@Test
void testWriteFailure() throws Exception {
@@ -69,9 +69,11 @@
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;

@DisplayName("PostgresDestination")
class PostgresDestinationTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);
@@ -272,6 +274,42 @@ void testWriteNewSchema() throws Exception {
assertThrows(RuntimeException.class, () -> recordRetriever(NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME)));
}

@Test
@DisplayName("Should use Airbyte Stream Namespace as schema if it is present")
void testUseAirbyteStreamNamespaceIfNotNull() throws Exception {
final String srcNamespace = "source_namespace";
JsonNode newConfig = Jsons.jsonNode(ImmutableMap.builder()
.put("host", container.getHost())
.put("username", container.getUsername())
.put("password", container.getPassword())
.put("schema", "default_schema")
.put("port", container.getFirstMappedPort())
.put("database", container.getDatabaseName())
.build());
final PostgresDestination destination = new PostgresDestination();
CATALOG.getStreams().forEach(stream -> stream.getStream().setNamespace(srcNamespace));

final AirbyteMessageConsumer consumer = destination.getConsumer(newConfig, CATALOG);
consumer.start();
consumer.accept(MESSAGE_USERS1);
consumer.accept(MESSAGE_TASKS1);
consumer.accept(MESSAGE_USERS2);
consumer.accept(MESSAGE_TASKS2);
consumer.accept(MESSAGE_STATE);
consumer.close();

String streamName = srcNamespace + "." + NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME);
Set<JsonNode> usersActual = recordRetriever(streamName);
final Set<JsonNode> expectedUsersJson = Sets.newHashSet(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData());
assertEquals(expectedUsersJson, usersActual);

streamName = srcNamespace + "." + NAMING_TRANSFORMER.getRawTableName(TASKS_STREAM_NAME);
Set<JsonNode> tasksActual = recordRetriever(streamName);
final Set<JsonNode> expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData());
assertEquals(expectedTasksJson, tasksActual);

}

@SuppressWarnings("ResultOfMethodCallIgnored")
@Test
void testWriteFailure() throws Exception {
@@ -151,6 +151,9 @@ definitions:
type: array
items:
type: string
namespace:
description: Optional Source-defined namespace. Currently only used by JDBC destinations to determine what schema to write to. Airbyte streams from the same sources should have the same namespace.
type: string
ConfiguredAirbyteCatalog:
description: Airbyte stream schema catalog
type: object
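To tie the spec change back to the wire format, here is a rough sketch of a discovered catalog carrying the new property once serialized. The names are hypothetical, and the Jsons and AirbyteCatalog usage assumes the existing protocol model helpers rather than anything introduced in this PR:

import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import java.util.List;

public class CatalogNamespaceSketch {

  public static void main(String[] args) {
    final AirbyteStream users = new AirbyteStream();
    users.setName("users");
    users.setNamespace("public"); // hypothetical source schema

    final AirbyteCatalog catalog = new AirbyteCatalog();
    catalog.setStreams(List.of(users));

    // Serializes to roughly: {"streams":[{"name":"users","namespace":"public"}]}
    System.out.println(Jsons.serialize(catalog));
  }
}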