🎉 Destination snowflake: reduce memory consumption (#10297)
* Avoid redundant adapter construction

* Remove unused logger

* Avoid redundant creation of buffer map

* Decrease max batch byte size to 128 mb

* Format code

* Move data adapter to an instance variable

* Bump version

* Bump version in seed
tuliren committed Feb 15, 2022
1 parent ed276f4 commit 6301cfa
Showing 10 changed files with 54 additions and 77 deletions.
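
The core of the memory reduction described in the change list above is in BufferedStreamConsumer: instead of accumulating every message in a single List<AirbyteMessage> and re-grouping the whole buffer by stream at flush time, records are now bucketed per stream as they arrive, and the map is flushed once the accumulated serialized size crosses maxQueueSizeInBytes. The following standalone sketch illustrates that buffering strategy only; the String keys and payloads are simplified stand-ins for AirbyteStreamNameNamespacePair and AirbyteRecordMessage, not the connector's actual types.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerStreamBufferSketch {

  private final long maxQueueSizeInBytes;
  private Map<String, List<String>> streamBuffer = new HashMap<>();
  private long bufferSizeInBytes = 0;

  public PerStreamBufferSketch(final long maxQueueSizeInBytes) {
    this.maxQueueSizeInBytes = maxQueueSizeInBytes;
  }

  public void accept(final String stream, final String serializedRecord) {
    final long messageSizeInBytes = serializedRecord.getBytes(StandardCharsets.UTF_8).length;
    if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) {
      flush();
    }
    // Group records by stream on arrival instead of re-grouping the whole buffer at flush time.
    streamBuffer.computeIfAbsent(stream, k -> new ArrayList<>()).add(serializedRecord);
    bufferSizeInBytes += messageSizeInBytes;
  }

  private void flush() {
    for (final Map.Entry<String, List<String>> entry : streamBuffer.entrySet()) {
      // The real consumer hands each stream's records to recordWriter.accept(stream, records) here.
      System.out.printf("flushing %d record(s) for stream %s%n", entry.getValue().size(), entry.getKey());
    }
    streamBuffer = new HashMap<>();
    bufferSizeInBytes = 0;
  }

  public static void main(final String[] args) {
    final PerStreamBufferSketch buffer = new PerStreamBufferSketch(64); // tiny limit, just to force a flush
    buffer.accept("users", "{\"id\":1}");
    buffer.accept("orders", "{\"id\":2}");
    buffer.accept("users", "{\"id\":3,\"name\":\"a-long-enough-record-to-exceed-the-limit\"}");
  }
}

Grouping on arrival also removes the Collectors.groupingBy pass that the old flushQueueToDestination performed over the entire buffer, which is what the diff below shows.
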
@@ -185,7 +185,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.9
dockerImageTag: 0.4.10
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
- name: MariaDB ColumnStore
24 changes: 11 additions & 13 deletions airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -3817,7 +3817,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.9"
- dockerImage: "airbyte/destination-snowflake:0.4.10"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
connectionSpecification:
@@ -3899,32 +3899,30 @@
description: "The loading method used to send data to Snowflake."
order: 8
oneOf:
- title: "[Recommended] Internal Staging"
- title: "Select another option"
additionalProperties: false
description: "Writes large batches of records to a file, uploads the file\
\ to Snowflake, then uses <pre>COPY INTO table</pre> to upload the file.\
\ Recommended for large production workloads for better speed and scalability."
description: "Select another option"
required:
- "method"
properties:
method:
type: "string"
enum:
- "Internal Staging"
default: "Internal Staging"
- title: "Standard Inserts"
- "Standard"
default: "Standard"
- title: "[Recommended] Internal Staging"
additionalProperties: false
description: "Uses <pre>INSERT</pre> statements to send batches of records\
\ to Snowflake. Easiest (no setup) but not recommended for large production\
\ workloads due to slow speed."
description: "Writes large batches of records to a file, uploads the file\
\ to Snowflake, then uses <pre>COPY INTO table</pre> to upload the file.\
\ Recommended for large production workloads for better speed and scalability."
required:
- "method"
properties:
method:
type: "string"
enum:
- "Standard"
default: "Standard"
- "Internal Staging"
default: "Internal Staging"
- title: "AWS S3 Staging"
additionalProperties: false
description: "Writes large batches of records to a file, uploads the file\
@@ -23,10 +23,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -81,13 +79,13 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume
private final RecordWriter recordWriter;
private final CheckedConsumer<Boolean, Exception> onClose;
private final Set<AirbyteStreamNameNamespacePair> streamNames;
private final List<AirbyteMessage> buffer;
private final ConfiguredAirbyteCatalog catalog;
private final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord;
private final Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount;
private final Map<AirbyteStreamNameNamespacePair, Long> streamToIgnoredRecordCount;
private final Consumer<AirbyteMessage> outputRecordCollector;
private final long maxQueueSizeInBytes;
private long bufferSizeInBytes;
private Map<AirbyteStreamNameNamespacePair, List<AirbyteRecordMessage>> streamBuffer;

private boolean hasStarted;
private boolean hasClosed;
@@ -112,9 +110,9 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
this.catalog = catalog;
this.streamNames = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);
this.isValidRecord = isValidRecord;
this.buffer = new ArrayList<>(10_000);
this.bufferSizeInBytes = 0;
this.pairToIgnoredRecordCount = new HashMap<>();
this.streamToIgnoredRecordCount = new HashMap<>();
this.streamBuffer = new HashMap<>();
}

@Override
@@ -123,7 +121,7 @@ protected void startTracked() throws Exception {
Preconditions.checkState(!hasStarted, "Consumer has already been started.");
hasStarted = true;

pairToIgnoredRecordCount.clear();
streamToIgnoredRecordCount.clear();
LOGGER.info("{} started.", BufferedStreamConsumer.class);

onStart.call();
@@ -141,7 +139,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
}

if (!isValidRecord.apply(message.getRecord().getData())) {
pairToIgnoredRecordCount.put(stream, pairToIgnoredRecordCount.getOrDefault(stream, 0L) + 1L);
streamToIgnoredRecordCount.put(stream, streamToIgnoredRecordCount.getOrDefault(stream, 0L) + 1L);
return;
}

@@ -151,15 +149,12 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
final long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData()));
if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) {
LOGGER.info("Flushing buffer...");
AirbyteSentry.executeWithTracing("FlushBuffer",
this::flushQueueToDestination,
Map.of("stream", stream.getName(),
"namespace", Objects.requireNonNullElse(stream.getNamespace(), "null"),
"bufferSizeInBytes", bufferSizeInBytes));
flushQueueToDestination(bufferSizeInBytes);
bufferSizeInBytes = 0;
}

buffer.add(message);
final List<AirbyteRecordMessage> bufferedRecords = streamBuffer.computeIfAbsent(stream, k -> new ArrayList<>());
bufferedRecords.add(message.getRecord());
bufferSizeInBytes += messageSizeInBytes;

} else if (message.getType() == Type.STATE) {
@@ -170,16 +165,13 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {

}

private void flushQueueToDestination() throws Exception {
final Map<AirbyteStreamNameNamespacePair, List<AirbyteRecordMessage>> recordsByStream = buffer.stream()
.map(AirbyteMessage::getRecord)
.collect(Collectors.groupingBy(AirbyteStreamNameNamespacePair::fromRecordMessage));

buffer.clear();

for (final Map.Entry<AirbyteStreamNameNamespacePair, List<AirbyteRecordMessage>> entry : recordsByStream.entrySet()) {
recordWriter.accept(entry.getKey(), entry.getValue());
}
private void flushQueueToDestination(long bufferSizeInBytes) throws Exception {
AirbyteSentry.executeWithTracing("FlushBuffer", () -> {
for (final Map.Entry<AirbyteStreamNameNamespacePair, List<AirbyteRecordMessage>> entry : streamBuffer.entrySet()) {
recordWriter.accept(entry.getKey(), entry.getValue());
}
}, Map.of("bufferSizeInBytes", bufferSizeInBytes));
streamBuffer = new HashMap<>();

if (pendingState != null) {
lastFlushedState = pendingState;
@@ -199,13 +191,13 @@ protected void close(final boolean hasFailed) throws Exception {
Preconditions.checkState(!hasClosed, "Has already closed.");
hasClosed = true;

pairToIgnoredRecordCount
streamToIgnoredRecordCount
.forEach((pair, count) -> LOGGER.warn("A total of {} record(s) of data from stream {} were invalid and were ignored.", count, pair));
if (hasFailed) {
LOGGER.error("executing on failed close procedure.");
} else {
LOGGER.info("executing on success close procedure.");
flushQueueToDestination();
flushQueueToDestination(bufferSizeInBytes);
}

try {
@@ -8,13 +8,9 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.function.Function;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataAdapter {

private static final Logger LOGGER = LoggerFactory.getLogger(DataAdapter.class);

private final Predicate<JsonNode> filterValueNode;
private final Function<JsonNode, JsonNode> valueNodeAdapter;

@@ -19,18 +19,28 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public abstract class JdbcSqlOperations implements SqlOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSqlOperations.class);
protected static final String SHOW_SCHEMAS = "show schemas;";
protected static final String NAME = "name";

// this adapter modifies record message before inserting them to the destination
protected final Optional<DataAdapter> dataAdapter;

protected JdbcSqlOperations() {
this.dataAdapter = Optional.empty();
}

protected JdbcSqlOperations(final DataAdapter dataAdapter) {
this.dataAdapter = Optional.of(dataAdapter);
}

@Override
public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
if (!isSchemaExists(database, schemaName)) {
@@ -63,21 +73,14 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN
}

protected void writeBatchToFile(final File tmpFile, final List<AirbyteRecordMessage> records) throws Exception {
PrintWriter writer = null;
try {
writer = new PrintWriter(tmpFile, StandardCharsets.UTF_8);
final var csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);

try (final PrintWriter writer = new PrintWriter(tmpFile, StandardCharsets.UTF_8);
final CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT)) {
for (final AirbyteRecordMessage record : records) {
final var uuid = UUID.randomUUID().toString();
final var jsonData = Jsons.serialize(formatData(record.getData()));
final var emittedAt = Timestamp.from(Instant.ofEpochMilli(record.getEmittedAt()));
csvPrinter.printRecord(uuid, jsonData, emittedAt);
}
} finally {
if (writer != null) {
writer.close();
}
}
}

@@ -137,7 +140,7 @@ public final void insertRecords(final JdbcDatabase database,
throws Exception {
AirbyteSentry.executeWithTracing("InsertRecords",
() -> {
records.forEach(airbyteRecordMessage -> getDataAdapter().adapt(airbyteRecordMessage.getData()));
dataAdapter.ifPresent(adapter -> records.forEach(airbyteRecordMessage -> adapter.adapt(airbyteRecordMessage.getData())));
insertRecordsInternal(database, records, schemaName, tableName);
},
Map.of("schema", Objects.requireNonNullElse(schemaName, "null"), "table", tableName, "recordCount", records.size()));
@@ -149,8 +152,4 @@ protected abstract void insertRecordsInternal(JdbcDatabase database,
String tableName)
throws Exception;

protected DataAdapter getDataAdapter() {
return new DataAdapter(j -> false, c -> c);
}

}
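
The JdbcSqlOperations change above replaces the per-call getDataAdapter() override (which built a new DataAdapter on every insert) with an Optional<DataAdapter> injected once through the constructor, as PostgresSqlOperations does below with super(new PostgresDataAdapter()). The sketch that follows shows how such an adapter is built and applied, based only on what this diff exposes: a Predicate selecting the JSON value nodes to rewrite, a Function producing the replacement node, and the adapter.adapt(record.getData()) call from insertRecords. The SSN-masking rule is purely illustrative, and the sketch assumes adapt() rewrites matching values in place.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.TextNode;
import io.airbyte.integrations.destination.jdbc.DataAdapter;

public class DataAdapterSketch {

  public static void main(final String[] args) throws Exception {
    final JsonNode data = new ObjectMapper().readTree("{\"name\":\"Ada\",\"ssn\":\"123-45-6789\"}");

    // Hypothetical rule: rewrite any textual value shaped like an SSN, pass everything else through.
    final DataAdapter redactingAdapter = new DataAdapter(
        node -> node.isTextual() && node.asText().matches("\\d{3}-\\d{2}-\\d{4}"),
        node -> TextNode.valueOf("***-**-****"));

    // Applied the same way insertRecords does: adapter.adapt(record.getData()).
    redactingAdapter.adapt(data);
    System.out.println(data); // expected, assuming in-place rewriting: {"name":"Ada","ssn":"***-**-****"}
  }
}

Constructing the adapter once and holding it as an instance field avoids recreating it for every record batch, which is the "avoid redundant adapter construction" item in the commit message.
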
@@ -5,7 +5,6 @@
package io.airbyte.integrations.destination.postgres;

import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.jdbc.DataAdapter;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.BufferedReader;
@@ -17,12 +16,12 @@
import java.util.List;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresSqlOperations extends JdbcSqlOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSqlOperations.class);
public PostgresSqlOperations() {
super(new PostgresDataAdapter());
}

@Override
public void insertRecordsInternal(final JdbcDatabase database,
@@ -58,9 +57,4 @@ public void insertRecordsInternal(final JdbcDatabase database,
});
}

@Override
protected DataAdapter getDataAdapter() {
return new PostgresDataAdapter();
}

}
@@ -18,8 +18,8 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV APPLICATION_VERSION 0.4.9
ENV APPLICATION_VERSION 0.4.10
ENV ENABLE_SENTRY true

LABEL io.airbyte.version=0.4.9
LABEL io.airbyte.version=0.4.10
LABEL io.airbyte.name=airbyte/destination-snowflake
@@ -10,13 +10,9 @@
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeDestination extends SwitchingDestination<SnowflakeDestination.DestinationType> {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestination.class);

enum DestinationType {
COPY_S3,
COPY_GCS,
@@ -46,7 +46,7 @@ public class SnowflakeInternalStagingConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingConsumerFactory.class);

private static final long MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256mb
private static final long MAX_BATCH_SIZE_BYTES = 128 * 1024 * 1024; // 128mb
private final String CURRENT_SYNC_PATH = UUID.randomUUID().toString();

public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
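
The constant change above is what the changelog entry below calls halving the record buffer size; a trivial sketch of the arithmetic:

public class BatchSizeMath {

  public static void main(final String[] args) {
    final long oldMax = 1024L * 1024 * 1024 / 4; // previous limit: 268,435,456 bytes (256 MiB)
    final long newMax = 128L * 1024 * 1024;      // new limit:      134,217,728 bytes (128 MiB)
    System.out.println(newMax * 2 == oldMax);    // prints true: the in-memory batch limit is halved
  }
}
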
2 changes: 2 additions & 0 deletions docs/integrations/destinations/snowflake.md
@@ -217,6 +217,8 @@ Finally, you need to add read/write permissions to your bucket with that email.

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :----- | :------ |
| 0.4.10 | 2022-02-14 | [\#10297](https://github.com/airbytehq/airbyte/pull/10297) | Halve the record buffer size to reduce memory consumption. |
| 0.4.9 | 2022-02-14 | [\#10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `ExitOnOutOfMemoryError` JVM flag. |
| 0.4.8 | 2022-02-01 | [\#9959](https://github.com/airbytehq/airbyte/pull/9959) | Fix null pointer exception from buffered stream consumer. |
| 0.4.7 | 2022-01-29 | [\#9745](https://github.com/airbytehq/airbyte/pull/9745) | Integrate with Sentry. |
| 0.4.6 | 2022-01-28 | [#9623](https://github.com/airbytehq/airbyte/pull/9623) | Add jdbc_url_params support for optional JDBC parameters |
