Skip to content

Commit

Permalink
15310: Destination Scylla: Handle per-stream state (#15399)
Browse files Browse the repository at this point in the history
* 15310: Destination Scylla: Handle per-stream state

* 15399: test fix

* 15318: test fix

* 15318: updating version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
kimerinn and octavia-squidington-iii committed Aug 10, 2022
1 parent c724630 commit aaa3aae
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 8 deletions.
Expand Up @@ -297,7 +297,7 @@
- name: Scylla
destinationDefinitionId: 3dc6f384-cd6b-4be3-ad16-a41450899bf0
dockerRepository: airbyte/destination-scylla
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/scylla
icon: scylla.svg
releaseStage: alpha
Expand Down
Expand Up @@ -5040,7 +5040,7 @@
supported_destination_sync_modes:
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-scylla:0.1.2"
- dockerImage: "airbyte/destination-scylla:0.1.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/scylla"
connectionSpecification:
Expand Down
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-scylla

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/destination-scylla
Expand Up @@ -20,6 +20,7 @@ dependencies {

implementation "com.scylladb:scylla-driver-core:${scyllaDriver}"

testImplementation project(':airbyte-integrations:bases:standard-destination-test')
// https://mvnrepository.com/artifact/org.assertj/assertj-core
testImplementation "org.assertj:assertj-core:${assertVersion}"
// https://mvnrepository.com/artifact/org.testcontainers/testcontainers
Expand Down
Expand Up @@ -27,8 +27,6 @@ public class ScyllaMessageConsumer extends FailureTrackingAirbyteMessageConsumer

private final ScyllaCqlProvider scyllaCqlProvider;

private AirbyteMessage lastMessage = null;

public ScyllaMessageConsumer(ScyllaConfig scyllaConfig,
ConfiguredAirbyteCatalog configuredCatalog,
Consumer<AirbyteMessage> outputRecordCollector) {
Expand Down Expand Up @@ -66,7 +64,7 @@ protected void acceptTracked(AirbyteMessage message) {
var data = Jsons.serialize(messageRecord.getData());
scyllaCqlProvider.insert(streamConfig.getKeyspace(), streamConfig.getTempTableName(), data);
} else if (message.getType() == AirbyteMessage.Type.STATE) {
this.lastMessage = message;
outputRecordCollector.accept(message);
} else {
LOGGER.warn("Unsupported airbyte message type: {}", message.getType());
}
Expand All @@ -92,7 +90,6 @@ protected void close(boolean hasFailed) {
LOGGER.error("Error while copying data to table {}: ", v.getTableName(), e);
}
});
outputRecordCollector.accept(lastMessage);
}

scyllaStreams.forEach((k, v) -> {
Expand Down
@@ -0,0 +1,71 @@
package io.airbyte.integrations.destination.scylla;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.testcontainers.containers.GenericContainer;

@DisplayName("ScyllaRecordConsumer")
@ExtendWith(MockitoExtension.class)
public class ScyllaRecordConsumerTest extends PerStreamStateMessageTest {
private static ScyllaContainer scyllaContainer;

@Mock
private Consumer<AirbyteMessage> outputRecordCollector;

private ScyllaMessageConsumer consumer;

@Mock
ScyllaConfig scyllaConfig;

@Mock
private ConfiguredAirbyteCatalog configuredCatalog;

public static ScyllaContainer initContainer() {
if (scyllaContainer == null) {
scyllaContainer = new ScyllaContainer()
.withExposedPorts(9042)
// single cpu core cluster
.withCommand("--smp 1");
}
scyllaContainer.start();
return scyllaContainer;
}

@BeforeEach
public void init() {
ScyllaContainer scyllaContainer = initContainer();
JsonNode configJson = TestDataFactory.jsonConfig(
HostPortResolver.resolveHost(scyllaContainer),
HostPortResolver.resolvePort(scyllaContainer));
var scyllaConfig = new ScyllaConfig(configJson);
consumer = new ScyllaMessageConsumer(scyllaConfig, configuredCatalog, outputRecordCollector);
}

@Override
protected Consumer<AirbyteMessage> getMockedConsumer() {
return outputRecordCollector;
}

@Override
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
return consumer;
}

static class ScyllaContainer extends GenericContainer<ScyllaContainer> {

public ScyllaContainer() {
super("scylladb/scylla:4.5.0");
}

}
}
5 changes: 4 additions & 1 deletion docs/integrations/destinations/scylla.md
Expand Up @@ -42,5 +42,8 @@ and handle any amount of data from the connector.
* Replication [optional] [default: 1]

### Setup guide
## Changelog

###### TODO: more info, screenshots?, etc...
| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--- |:----------------------------------------------------------------------------------------------------|
| 0.1.3 | 2022-08-10 | [153999](https://github.com/airbytehq/airbyte/pull/15399) | handling per-stream state |

0 comments on commit aaa3aae

Please sign in to comment.