Skip to content

Commit

Permalink
:tada Namespace support. Supported source-destination pairs will now …
Browse files Browse the repository at this point in the history
…sync data into the same namespace as the source. (#2862)

This PR introduces the following behavior for JDBC sources:
Instead of streamName = schema.tableName, streams are now identified by streamName = tableName and namespace = schema. This means that, when replicating from these sources, data will be replicated into a form matching the source, e.g. public.users (postgres source) -> public.users (postgres destination) instead of the current behaviour of public.public_users. Since MySQL does not have schemas, the MySQL source uses the database as its namespace.

To do so:
- Make namespace a first-class concept in the Airbyte Protocol. This allows sources to propagate a namespace and destinations to write to a source-defined namespace. It also sets us up for future namespace-related configurability.
- Add an optional namespace field to the AirbyteRecordMessage. This field will be set by sources that support namespace.
- Introduce AirbyteStreamNameNamespacePair as a type-safe manner of identifying streams throughout our code base.
- Modify base_normalisation to better support source defined namespace, specifically allowing normalisation of tables with the same name to different schemas.
  • Loading branch information
davinchia committed Apr 17, 2021
1 parent 737e70c commit b9014ac
Show file tree
Hide file tree
Showing 52 changed files with 1,054 additions and 450 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ jobs:
id: publish
env:
DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}
# Oracle expects this variable to be set. Although usually present, this is not set by default on Github virtual runners.
TZ: UTC
- name: Add Success Comment
if: github.event.inputs.comment-id && success()
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ jobs:
id: test
env:
ACTION_RUN_ID: ${{github.run_id}}
# Oracle expects this variable to be set. Although usually present, this is not set by default on Github virtual runners.
TZ: UTC
- name: Report Status
if: github.ref == 'refs/heads/master' && always()
run: ./tools/status/report.sh ${{ github.event.inputs.connector }} ${{github.repository}} ${{github.run_id}} ${{steps.test.outcome}}
Expand Down
43 changes: 43 additions & 0 deletions airbyte-commons/src/main/java/io/airbyte/commons/type/Types.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.type;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class Types {

  /**
   * Boxes each element of a list into its own single-element list.
   *
   * <p>
   * Elements are processed in the order of the input list. Null elements are filtered out, so the
   * result may be shorter than the input.
   *
   * @param list the elements to box; a null list results in a {@link NullPointerException}
   * @param <T> the element type
   * @return a list containing one singleton list per non-null element of {@code list}
   */
  public static <T> List<List<T>> boxToListofList(List<T> list) {
    return list.stream()
        .filter(Objects::nonNull)
        .map(element -> Collections.singletonList(element))
        .collect(Collectors.toList());
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ class AirbyteRecordMessage(BaseModel):
class Config:
extra = Extra.allow

stream: str = Field(..., description="the name of the stream for this record")
stream: str = Field(..., description="the name of this record's stream")
data: Dict[str, Any] = Field(..., description="the record data")
emitted_at: int = Field(
...,
description="when the data was emitted from the source. epoch in millisecond.",
)
namespace: Optional[str] = Field(None, description="the namespace of this record's stream")


class AirbyteStateMessage(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.base;

import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/**
 * Wraps an {@link io.airbyte.protocol.models.AirbyteStream}'s name and namespace fields to simplify
 * comparison checks. This is helpful since these two fields are often used as an Airbyte Stream's
 * unique identifiers.
 *
 * <p>
 * Instances are immutable. Both fields may be null; {@link #equals(Object)}, {@link #hashCode()}
 * and {@link #compareTo(AirbyteStreamNameNamespacePair)} all tolerate null values.
 */
public class AirbyteStreamNameNamespacePair implements Comparable<AirbyteStreamNameNamespacePair> {

  private final String name;
  private final String namespace;

  /**
   * @param name the stream name
   * @param namespace the stream namespace; null when the stream has no namespace
   */
  public AirbyteStreamNameNamespacePair(String name, String namespace) {
    this.name = name;
    this.namespace = namespace;
  }

  /**
   * @return the stream name
   */
  public String getName() {
    return name;
  }

  /**
   * @return the stream namespace, possibly null
   */
  public String getNamespace() {
    return namespace;
  }

  @Override
  public String toString() {
    return "AirbyteStreamNameNamespacePair{" +
        "name='" + name + '\'' +
        ", namespace='" + namespace + '\'' +
        '}';
  }

  /**
   * Two pairs are equal when both their names and their namespaces are equal (nulls compare equal to
   * each other).
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    AirbyteStreamNameNamespacePair that = (AirbyteStreamNameNamespacePair) o;
    return Objects.equals(name, that.name) && Objects.equals(namespace, that.namespace);
  }

  @Override
  public int hashCode() {
    return Objects.hash(name, namespace);
  }

  /**
   * Orders pairs by name first and by namespace second. In both fields a null value sorts before any
   * non-null value, which keeps the ordering consistent with {@link #equals(Object)}.
   *
   * <p>
   * Unlike the general {@link Comparable} contract, a null argument does not raise an exception:
   * every pair is considered greater than null. This is kept for backward compatibility.
   *
   * @param o the pair to compare against; may be null
   * @return a negative integer, zero, or a positive integer as this pair is less than, equal to, or
   *         greater than {@code o}
   */
  @Override
  public int compareTo(AirbyteStreamNameNamespacePair o) {
    if (o == null) {
      return 1;
    }

    // first sort by name; null-safe so a pair with a null name cannot blow up a sort
    final int nameCheck = compareNullsFirst(name, o.getName());
    if (nameCheck != 0) {
      return nameCheck;
    }

    // then sort by namespace
    return compareNullsFirst(namespace, o.getNamespace());
  }

  /**
   * Compares two possibly-null strings, ordering null before every non-null value.
   */
  private static int compareNullsFirst(String left, String right) {
    if (left == null) {
      return right == null ? 0 : -1;
    }
    if (right == null) {
      return 1;
    }
    return left.compareTo(right);
  }

  /**
   * NOTE(review): this looks like a leftover debugging entry point rather than production code.
   * {@code String.compareTo(null)} throws a {@link NullPointerException}, so running it is expected
   * to fail. Kept only to leave the class's public surface unchanged; consider removing it.
   */
  public static void main(String[] args) {
    System.out.println("test".compareTo(null));
  }

  /**
   * Builds a pair from the stream name and namespace carried by a record message.
   */
  public static AirbyteStreamNameNamespacePair fromRecordMessage(AirbyteRecordMessage msg) {
    return new AirbyteStreamNameNamespacePair(msg.getStream(), msg.getNamespace());
  }

  /**
   * Builds a pair from a stream's name and namespace.
   */
  public static AirbyteStreamNameNamespacePair fromAirbyteSteam(AirbyteStream stream) {
    return new AirbyteStreamNameNamespacePair(stream.getName(), stream.getNamespace());
  }

  /**
   * Builds a pair from the stream wrapped by a configured stream.
   */
  public static AirbyteStreamNameNamespacePair fromConfiguredAirbyteSteam(ConfiguredAirbyteStream stream) {
    return fromAirbyteSteam(stream.getStream());
  }

  /**
   * Collects the pairs of every stream in a configured catalog.
   *
   * @param catalog the catalog whose streams are converted
   * @return a new mutable set with one pair per distinct name/namespace combination in the catalog
   */
  public static Set<AirbyteStreamNameNamespacePair> fromConfiguredCatalog(ConfiguredAirbyteCatalog catalog) {
    final Set<AirbyteStreamNameNamespacePair> pairs = new HashSet<>();

    for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
      pairs.add(fromConfiguredAirbyteSteam(stream));
    }

    return pairs;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.CloseableQueue;
import io.airbyte.commons.lang.Queues;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -80,31 +81,31 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume
private static final int BATCH_SIZE = 10000;

private final VoidCallable onStart;
private final CheckedBiConsumer<String, Stream<AirbyteRecordMessage>, Exception> recordWriter;
private final RecordWriter recordWriter;
private final CheckedConsumer<Boolean, Exception> onClose;
private final Set<String> streamNames;
private final Map<String, CloseableQueue<byte[]>> writeBuffers;
private final Set<AirbyteStreamNameNamespacePair> pairs;
private final Map<AirbyteStreamNameNamespacePair, CloseableQueue<byte[]>> pairToWriteBuffer;
private final ScheduledExecutorService writerPool;
private final ConfiguredAirbyteCatalog catalog;

private boolean hasStarted;

public BufferedStreamConsumer(VoidCallable onStart,
CheckedBiConsumer<String, Stream<AirbyteRecordMessage>, Exception> recordWriter,
RecordWriter recordWriter,
CheckedConsumer<Boolean, Exception> onClose,
ConfiguredAirbyteCatalog catalog,
Set<String> streamNames) {
Set<AirbyteStreamNameNamespacePair> pairs) {
this.hasStarted = false;
this.onStart = onStart;
this.recordWriter = recordWriter;
this.onClose = onClose;
this.catalog = catalog;
this.streamNames = streamNames;
this.pairs = pairs;

this.writerPool = Executors.newSingleThreadScheduledExecutor();
Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofMinutes(GRACEFUL_SHUTDOWN_MINUTES), writerPool));

this.writeBuffers = new HashMap<>();
this.pairToWriteBuffer = new HashMap<>();
}

@Override
Expand All @@ -115,19 +116,23 @@ protected void startTracked() throws Exception {

LOGGER.info("{} started.", BufferedStreamConsumer.class);

LOGGER.info("Buffer creation started for {} streams.", streamNames.size());
LOGGER.info("Buffer creation started for {} streams.", pairs.size());
final Path queueRoot = Files.createTempDirectory("queues");
for (String streamName : streamNames) {
LOGGER.info("Buffer creation for stream {}.", streamName);
final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(streamName), streamName);
writeBuffers.put(streamName, writeBuffer);
for (AirbyteStreamNameNamespacePair pair : pairs) {
LOGGER.info("Buffer creation for stream {}.", pair);
try {
final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(pair.getName()), pair.getName());
pairToWriteBuffer.put(pair, writeBuffer);
} catch (Exception e) {
LOGGER.error("Error creating buffer: ", e);
}
}
LOGGER.info("Buffer creation completed.");

onStart.call();

LOGGER.info("write buffers: {}", pairToWriteBuffer);
writerPool.scheduleWithFixedDelay(
() -> writeStreamsWithNRecords(MIN_RECORDS, streamNames, writeBuffers, recordWriter),
() -> writeStreamsWithNRecords(MIN_RECORDS, pairToWriteBuffer, recordWriter),
THREAD_DELAY_MILLIS,
THREAD_DELAY_MILLIS,
TimeUnit.MILLISECONDS);
Expand All @@ -139,12 +144,13 @@ protected void acceptTracked(AirbyteRecordMessage message) {

// ignore other message types.
final String streamName = message.getStream();
if (!streamNames.contains(streamName)) {
final AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair.fromRecordMessage(message);
if (!pairs.contains(pair)) {
throw new IllegalArgumentException(
String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
Jsons.serialize(catalog), Jsons.serialize(message)));
}
writeBuffers.get(streamName).offer(Jsons.serialize(message).getBytes(Charsets.UTF_8));
pairToWriteBuffer.get(pair).offer(Jsons.serialize(message).getBytes(Charsets.UTF_8));
}

@SuppressWarnings("ResultOfMethodCallIgnored")
Expand All @@ -164,22 +170,21 @@ protected void close(boolean hasFailed) throws Exception {
writerPool.awaitTermination(GRACEFUL_SHUTDOWN_MINUTES, TimeUnit.MINUTES);

// write anything that is left in the buffers.
writeStreamsWithNRecords(0, streamNames, writeBuffers, recordWriter);
writeStreamsWithNRecords(0, pairToWriteBuffer, recordWriter);
}

onClose.accept(hasFailed);

for (CloseableQueue<byte[]> writeBuffer : writeBuffers.values()) {
for (CloseableQueue<byte[]> writeBuffer : pairToWriteBuffer.values()) {
writeBuffer.close();
}
}

private static void writeStreamsWithNRecords(int minRecords,
Set<String> streamNames,
Map<String, CloseableQueue<byte[]>> writeBuffers,
CheckedBiConsumer<String, Stream<AirbyteRecordMessage>, Exception> recordWriter) {
for (final String streamName : streamNames) {
final CloseableQueue<byte[]> writeBuffer = writeBuffers.get(streamName);
Map<AirbyteStreamNameNamespacePair, CloseableQueue<byte[]>> pairToWriteBuffers,
RecordWriter recordWriter) {
for (final AirbyteStreamNameNamespacePair pair : pairToWriteBuffers.keySet()) {
final CloseableQueue<byte[]> writeBuffer = pairToWriteBuffers.get(pair);
while (writeBuffer.size() > minRecords) {
try {
final List<AirbyteRecordMessage> records = Queues.toStream(writeBuffer)
Expand All @@ -188,8 +193,8 @@ private static void writeStreamsWithNRecords(int minRecords,
.collect(Collectors.toList());

LOGGER.info("Writing stream {}. Max batch size: {}, Actual batch size: {}, Remaining buffered records: {}",
streamName, BufferedStreamConsumer.BATCH_SIZE, records.size(), writeBuffer.size());
recordWriter.accept(streamName, records.stream());
pair, BufferedStreamConsumer.BATCH_SIZE, records.size(), writeBuffer.size());
recordWriter.accept(pair, records.stream());
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -199,10 +204,10 @@ private static void writeStreamsWithNRecords(int minRecords,

public interface OnStartFunction extends VoidCallable {}

public interface RecordWriter extends CheckedBiConsumer<String, Stream<AirbyteRecordMessage>, Exception> {
public interface RecordWriter extends CheckedBiConsumer<AirbyteStreamNameNamespacePair, Stream<AirbyteRecordMessage>, Exception> {

@Override
void accept(String streamName, Stream<AirbyteRecordMessage> recordStream) throws Exception;
void accept(AirbyteStreamNameNamespacePair pair, Stream<AirbyteRecordMessage> recordStream) throws Exception;

}

Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ WORKDIR /airbyte

ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.name=airbyte/normalization
Loading

0 comments on commit b9014ac

Please sign in to comment.