Remove schema from stream name. (#2807)
Last step (besides documentation) of the namespace changes. This is a follow-up to #2767.

After this change, the affected JDBC sources will switch to the behaviour described in the document above.

Namely, instead of streamName = schema.tableName, this becomes streamName = tableName and namespace = schema. This means that, when replicating from these sources, data is replicated into a form matching the source, e.g. public.users (Postgres source) -> public.users (Postgres destination) instead of the current behaviour of public.public_users. Since MySQL does not have schemas, the MySQL source uses the database as its namespace.
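
For illustration, here is a minimal sketch of the new stream shape, using the createConfiguredAirbyteStream helper whose updated signature appears in the diff below (the stream, namespace, and field names are hypothetical):

// Before: the schema was baked into the stream name -> name "public.users", namespace unset.
// After: the schema travels in the namespace field instead.
CatalogHelpers.createConfiguredAirbyteStream(
    "users",   // streamName = tableName
    "public",  // namespace = schema
    Field.of("id", JsonSchemaPrimitive.STRING));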

I cleaned up some bits of CatalogHelpers. This affected the destinations, so I'm also running the destination tests.
davinchia committed Apr 12, 2021
1 parent 08621d8 commit 6e9d6fc
Showing 29 changed files with 378 additions and 199 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
@@ -90,6 +90,7 @@ jobs:
id: publish
env:
DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}
# Oracle expects this variable to be set. Although usually present, this is not set by default on GitHub virtual runners.
TZ: UTC
- name: Add Success Comment
if: github.event.inputs.comment-id && success()
2 changes: 2 additions & 0 deletions .github/workflows/test-command.yml
@@ -89,6 +89,8 @@ jobs:
id: test
env:
ACTION_RUN_ID: ${{github.run_id}}
# Oracle expects this variable to be set. Although usually present, this is not set by default on GitHub virtual runners.
TZ: UTC
- name: Report Status
if: github.ref == 'refs/heads/master' && always()
run: ./tools/status/report.sh ${{ github.event.inputs.connector }} ${{github.repository}} ${{github.run_id}} ${{steps.test.outcome}}
43 changes: 43 additions & 0 deletions airbyte-commons/src/main/java/io/airbyte/commons/type/Types.java
@@ -0,0 +1,43 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.type;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class Types {

/**
* Convenience method converting a list into a list of singleton lists of the same type. Each
* non-null item in the original list is wrapped in its own list; null entries are filtered out.
*/
public static <T> List<List<T>> boxToListofList(List<T> list) {
var nonNullEntries = list.stream().filter(Objects::nonNull);
return nonNullEntries.map(Collections::singletonList).collect(Collectors.toList());
}

}
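
A brief usage sketch (hypothetical values; the null entry only demonstrates the filtering):

import io.airbyte.commons.type.Types;
import java.util.Arrays;
import java.util.List;

// e.g. boxing a flat list of primary-key columns into per-column key paths
List<List<String>> paths = Types.boxToListofList(Arrays.asList("id", null, "email"));
// paths -> [["id"], ["email"]]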
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -19,5 +19,5 @@ WORKDIR /airbyte

ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.name=airbyte/normalization
@@ -53,14 +53,14 @@ def __init__(self, output_directory: str, destination_type: DestinationType):
self.destination_type: DestinationType = destination_type
self.name_transformer: DestinationNameTransformer = DestinationNameTransformer(destination_type)

def process(self, catalog_file: str, json_column_name: str, target_schema: str):
def process(self, catalog_file: str, json_column_name: str, default_schema: str):
"""
This method first parses and builds models to handle top-level streams.
A second loop then goes over the nested substreams in a breadth-first traversal manner.
@param catalog_file input AirbyteCatalog file in JSON Schema describing the structure of the raw data
@param json_column_name is the column name containing the JSON Blob with the raw data
@param target_schema is the schema to output the final transformed data to
@param default_schema is the schema to output the final transformed data to
"""
# Registry of all tables in all schemas
tables_registry: Set[str] = set()
@@ -73,7 +73,7 @@ def process(self, catalog_file: str, json_column_name: str, target_schema: str):
for stream_processor in self.build_stream_processor(
catalog=catalog,
json_column_name=json_column_name,
target_schema=target_schema,
default_schema=default_schema,
name_transformer=self.name_transformer,
destination_type=self.destination_type,
tables_registry=tables_registry,
@@ -98,16 +98,22 @@ def process(self, catalog_file: str, json_column_name: str, target_schema: str):
def build_stream_processor(
catalog: Dict,
json_column_name: str,
target_schema: str,
default_schema: str,
name_transformer: DestinationNameTransformer,
destination_type: DestinationType,
tables_registry: Set[str],
) -> List[StreamProcessor]:
result = []
for configured_stream in get_field(catalog, "streams", "Invalid Catalog: 'streams' is not defined in Catalog"):
stream_config = get_field(configured_stream, "stream", "Invalid Stream: 'stream' is not defined in Catalog streams")
schema_name = name_transformer.normalize_schema_name(target_schema)
raw_schema_name = name_transformer.normalize_schema_name(f"_airbyte_{target_schema}", truncate=False)

# The logic here matches the logic in JdbcBufferedConsumerFactory.java. Any modifications need to be reflected there and vice versa.
schema = default_schema
if "namespace" in stream_config:
schema = stream_config["namespace"]

schema_name = name_transformer.normalize_schema_name(schema)
raw_schema_name = name_transformer.normalize_schema_name(f"_airbyte_{schema}", truncate=False)
stream_name = get_field(stream_config, "name", f"Invalid Stream: 'name' is not defined in stream: {str(stream_config)}")
raw_table_name = name_transformer.normalize_table_name(f"_airbyte_raw_{stream_name}", truncate=False)

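For example, with the hypothetical "public"/"users" stream from the commit message above, this hunk resolves schema to "public", schema_name to "public", raw_schema_name to "_airbyte_public", and raw_table_name to "_airbyte_raw_users" (before any destination-specific name normalization).
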
@@ -78,7 +78,7 @@ def process_catalog(self) -> None:
processor = CatalogProcessor(output_directory=output, destination_type=destination_type)
for catalog_file in self.config["catalog"]:
print(f"Processing {catalog_file}...")
processor.process(catalog_file=catalog_file, json_column_name=json_col, target_schema=schema)
processor.process(catalog_file=catalog_file, json_column_name=json_col, default_schema=schema)


def read_profiles_yml(profile_dir: str) -> Any:
@@ -69,7 +69,7 @@ def test_stream_processor_tables_naming(integration_type: str, catalog_file: str
for stream_processor in CatalogProcessor.build_stream_processor(
catalog=catalog,
json_column_name="'json_column_name_test'",
target_schema="schema_test",
default_schema="schema_test",
name_transformer=DestinationNameTransformer(destination_type),
destination_type=destination_type,
tables_registry=tables_registry,
@@ -257,7 +257,8 @@ public void testDiscover() throws Exception {
*/
@Test
public void testFullRefreshRead() throws Exception {
final List<AirbyteMessage> allMessages = runRead(withFullRefreshSyncModes(getConfiguredCatalog()));
ConfiguredAirbyteCatalog catalog = withFullRefreshSyncModes(getConfiguredCatalog());
final List<AirbyteMessage> allMessages = runRead(catalog);
final List<AirbyteMessage> recordMessages = allMessages.stream().filter(m -> m.getType() == Type.RECORD).collect(Collectors.toList());
// the worker validates the message formats, so we just validate the message content
// We don't need to validate message format as long as we use the worker, which we will not want to
@@ -104,18 +104,13 @@ class BigQueryDestinationTest {
private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build())));

private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, io.airbyte.protocol.models.Field.of("name", JsonSchemaPrimitive.STRING),
io.airbyte.protocol.models.Field
.of("id", JsonSchemaPrimitive.STRING)),
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING))));

private static final NamingConventionTransformer NAMING_RESOLVER = new StandardNameTransformer();

private JsonNode config;

private BigQuery bigquery;
private Dataset dataset;
private ConfiguredAirbyteCatalog catalog;

private boolean tornDown = true;

@@ -142,6 +137,13 @@ void setup(TestInfo info) throws IOException {

final String datasetId = "airbyte_tests_" + RandomStringUtils.randomAlphanumeric(8);

catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, datasetId,
io.airbyte.protocol.models.Field.of("name", JsonSchemaPrimitive.STRING),
io.airbyte.protocol.models.Field
.of("id", JsonSchemaPrimitive.STRING)),
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaPrimitive.STRING))));

final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).build();
dataset = bigquery.create(datasetInfo);

@@ -216,7 +218,7 @@ void testCheckFailure() {
@Test
void testWriteSuccess() throws Exception {
final BigQueryDestination destination = new BigQueryDestination();
final AirbyteMessageConsumer consumer = destination.getConsumer(config, CATALOG);
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog);

consumer.accept(MESSAGE_USERS1);
consumer.accept(MESSAGE_TASKS1);
@@ -235,7 +237,7 @@ void testWriteSuccess() throws Exception {
assertEquals(expectedTasksJson.size(), tasksActual.size());
assertTrue(expectedTasksJson.containsAll(tasksActual) && tasksActual.containsAll(expectedTasksJson));

assertTmpTablesNotPresent(CATALOG.getStreams()
assertTmpTablesNotPresent(catalog.getStreams()
.stream()
.map(ConfiguredAirbyteStream::getStream)
.map(AirbyteStream::getName)
@@ -248,18 +250,18 @@ void testWriteFailure() throws Exception {
final AirbyteMessage spiedMessage = spy(MESSAGE_USERS1);
doThrow(new RuntimeException()).when(spiedMessage).getRecord();

final AirbyteMessageConsumer consumer = spy(new BigQueryDestination().getConsumer(config, CATALOG));
final AirbyteMessageConsumer consumer = spy(new BigQueryDestination().getConsumer(config, catalog));

assertThrows(RuntimeException.class, () -> consumer.accept(spiedMessage));
consumer.accept(MESSAGE_USERS2);
consumer.close();

final List<String> tableNames = CATALOG.getStreams()
final List<String> tableNames = catalog.getStreams()
.stream()
.map(ConfiguredAirbyteStream::getStream)
.map(AirbyteStream::getName)
.collect(toList());
assertTmpTablesNotPresent(CATALOG.getStreams()
assertTmpTablesNotPresent(catalog.getStreams()
.stream()
.map(ConfiguredAirbyteStream::getStream)
.map(AirbyteStream::getName)
@@ -97,9 +97,9 @@ class CsvDestinationTest {
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build())));

private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, Field.of("name", JsonSchemaPrimitive.STRING),
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, null, Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("id", JsonSchemaPrimitive.STRING)),
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING))));
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, null, Field.of("goal", JsonSchemaPrimitive.STRING))));

private Path destinationPath;
private JsonNode config;
@@ -103,6 +103,9 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(Nami
/**
* Defer to the {@link AirbyteStream}'s namespace. If this is not set, use the destination's default
* schema. This namespace is source-provided, and can be potentially empty.
*
* The logic here matches the logic in catalog_process.py for Normalization. Any modifications
* need to be reflected there and vice versa.
*/
private static String getOutputSchema(AirbyteStream stream, String defaultDestSchema) {
final String sourceSchema = stream.getNamespace();
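The remainder of getOutputSchema is collapsed in this diff; a minimal sketch of the fallback rule the comment above describes (an assumed body, not necessarily the verbatim implementation):

private static String getOutputSchema(AirbyteStream stream, String defaultDestSchema) {
  // Prefer the source-provided namespace; otherwise fall back to the destination's default schema.
  final String sourceSchema = stream.getNamespace();
  return sourceSchema == null || sourceSchema.isBlank() ? defaultDestSchema : sourceSchema;
}
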
@@ -93,9 +93,9 @@ class LocalJsonDestinationTest {
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build())));

private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, Field.of("name", JsonSchemaPrimitive.STRING),
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, null, Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("id", JsonSchemaPrimitive.STRING)),
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING))));
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, null, Field.of("goal", JsonSchemaPrimitive.STRING))));

private Path destinationPath;
private JsonNode config;
