Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛Destination S3 and GCS - Fixed connector's bug that prevent writing streams with more than 50GB #5890

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.workers.DbtTransformationRunner;
import io.airbyte.workers.DefaultCheckConnectionWorker;
Expand All @@ -82,9 +84,12 @@
import java.util.Map.Entry;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
Expand Down Expand Up @@ -1156,4 +1161,125 @@ public String toString() {

}

/**
* This test MUST be disabled by default, but you may uncomment it and use when need to reproduce a
* performance issue for destination. This test helps you to emulate lot's of stream and messages in
* each simply changing the "streamsSize" args to set a number of tables\streams and the
* "messagesNumber" to a messages number that would be written in each stream. !!! Do NOT forget to
* manually remove all generated objects !!! Hint: To check the destination container output run
* "docker ps" command in console to find the container's id. Then run "docker container attach
* your_containers_id" (ex. docker container attach 18cc929f44c8) to see the container's output
*/
@Test
@Disabled
public void testStressPerformance() throws Exception {
final int streamsSize = 5; // number of generated streams
final int messagesNumber = 300; // number of msg to be written to each generated stream

// Each stream will have an id and name fields
final String USERS_STREAM_NAME = "users"; // stream's name prefix. Will get "user0", "user1", etc.
final String ID = "id";
final String NAME = "name";

// generate schema\catalogs
List<AirbyteStream> configuredAirbyteStreams = new ArrayList<>();
for (int i = 0; i < streamsSize; i++) {
configuredAirbyteStreams
.add(CatalogHelpers.createAirbyteStream(USERS_STREAM_NAME + i,
Field.of(NAME, JsonSchemaPrimitive.STRING),
Field
.of(ID, JsonSchemaPrimitive.STRING)));
}
final AirbyteCatalog testCatalog = new AirbyteCatalog().withStreams(configuredAirbyteStreams);
final ConfiguredAirbyteCatalog configuredTestCatalog = CatalogHelpers
.toDefaultConfiguredCatalog(testCatalog);

final JsonNode config = getConfig();
final WorkerDestinationConfig destinationConfig = new WorkerDestinationConfig()
.withConnectionId(UUID.randomUUID())
.withCatalog(configuredTestCatalog)
.withDestinationConnectionConfiguration(config);
final AirbyteDestination destination = getDestination();

// Start destination
destination.start(destinationConfig, jobRoot);

AtomicInteger currentStreamNumber = new AtomicInteger(0);
AtomicInteger currentRecordNumberForStream = new AtomicInteger(0);

// this is just a current state logger. Useful when running long hours tests to see the progress
Thread countPrinter = new Thread(() -> {
while (true) {
System.out.println(
"currentStreamNumber=" + currentStreamNumber + ", currentRecordNumberForStream="
+ currentRecordNumberForStream + ", " + DateTime.now());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

});
countPrinter.start();

// iterate through streams
for (int streamCounter = 0; streamCounter < streamsSize; streamCounter++) {
LOGGER.info("Started new stream processing with #" + streamCounter);
// iterate through msm inside a particular stream
// Generate messages and put it to stream
for (int msgCounter = 0; msgCounter < messagesNumber; msgCounter++) {
AirbyteMessage msg = new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME + streamCounter)
.withData(
Jsons.jsonNode(
ImmutableMap.builder().put(NAME, LOREM_IPSUM)
.put(ID, streamCounter + "_" + msgCounter)
.build()))
.withEmittedAt(Instant.now().toEpochMilli()));
try {
destination.accept(msg);
} catch (Exception e) {
LOGGER.error("Failed to write a RECORD message: " + e);
throw new RuntimeException(e);
}

currentRecordNumberForStream.set(msgCounter);
}

// send state message here, it's required
AirbyteMessage msgState = new AirbyteMessage()
.withType(AirbyteMessage.Type.STATE)
.withState(new AirbyteStateMessage()
.withData(
Jsons.jsonNode(ImmutableMap.builder().put("start_date", "2020-09-02").build())));
try {
destination.accept(msgState);
} catch (Exception e) {
LOGGER.error("Failed to write a STATE message: " + e);
throw new RuntimeException(e);
}

currentStreamNumber.set(streamCounter);
}

LOGGER.info(String
.format("Added %s messages to each of %s streams", currentRecordNumberForStream,
currentStreamNumber));
// Close destination
destination.notifyEndOfStream();
}

private final static String LOREM_IPSUM =
"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Pellentesque malesuada lacinia aliquet. Nam feugiat mauris vel magna dignissim feugiat. Nam non dapibus sapien, ac mattis purus. Donec mollis libero erat, a rutrum ipsum pretium id. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Integer nec aliquam leo. Aliquam eu dictum augue, a ornare elit.\n"
+ "\n"
+ "Nulla viverra blandit neque. Nam blandit varius efficitur. Nunc at sapien blandit, malesuada lectus vel, tincidunt orci. Proin blandit metus eget libero facilisis interdum. Aenean luctus scelerisque orci, at scelerisque sem vestibulum in. Nullam ornare massa sed dui efficitur, eget volutpat lectus elementum. Orci varius natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Integer elementum mi vitae erat eleifend iaculis. Nullam eget tincidunt est, eget tempor est. Sed risus velit, iaculis vitae est in, volutpat consectetur odio. Aenean ut fringilla elit. Suspendisse non aliquet massa. Curabitur suscipit metus nunc, nec porttitor velit venenatis vel. Fusce vestibulum eleifend diam, lobortis auctor magna.\n"
+ "\n"
+ "Etiam maximus, mi feugiat pharetra mattis, nulla neque euismod metus, in congue nunc sem nec ligula. Curabitur aliquam, risus id convallis cursus, nunc orci sollicitudin enim, quis scelerisque nibh dui in ipsum. Suspendisse mollis, metus a dapibus scelerisque, sapien nulla pretium ipsum, non finibus sem orci et lectus. Aliquam dictum magna nisi, a consectetur urna euismod nec. In pulvinar facilisis nulla, id mollis libero pulvinar vel. Nam a commodo leo, eu commodo dolor. In hac habitasse platea dictumst. Curabitur auctor purus quis tortor laoreet efficitur. Quisque tincidunt, risus vel rutrum fermentum, libero urna dignissim augue, eget pulvinar nibh ligula ut tortor. Vivamus convallis non risus sed consectetur. Etiam accumsan enim ac nisl suscipit, vel congue lorem volutpat. Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce non orci quis lacus rhoncus vestibulum nec ut magna. In varius lectus nec quam posuere finibus. Vivamus quis lectus vitae tortor sollicitudin fermentum.\n"
+ "\n"
+ "Pellentesque elementum vehicula egestas. Sed volutpat velit arcu, at imperdiet sapien consectetur facilisis. Suspendisse porttitor tincidunt interdum. Morbi gravida faucibus tortor, ut rutrum magna tincidunt a. Morbi eu nisi eget dui finibus hendrerit sit amet in augue. Aenean imperdiet lacus enim, a volutpat nulla placerat at. Suspendisse nibh ipsum, venenatis vel maximus ut, fringilla nec felis. Sed risus mi, egestas quis quam ullamcorper, pharetra vestibulum diam.\n"
+ "\n"
+ "Praesent finibus scelerisque elit, accumsan condimentum risus mattis vitae. Donec tristique hendrerit facilisis. Curabitur metus purus, venenatis non elementum id, finibus eu augue. Quisque posuere rhoncus ligula, et vehicula erat pulvinar at. Pellentesque vel quam vel lectus tincidunt congue quis id sapien. Ut efficitur mauris vitae pretium iaculis. Aliquam consectetur iaculis nisi vitae laoreet. Integer vel odio quis diam mattis tempor eget nec est. Donec iaculis facilisis neque, at dictum magna vestibulum ut. Sed malesuada non nunc ac consequat. Maecenas tempus lectus a nisl congue, ac venenatis diam viverra. Nam ac justo id nulla iaculis lobortis in eu ligula. Vivamus et ligula id sapien efficitur aliquet. Curabitur est justo, tempus vitae mollis quis, tincidunt vitae felis. Vestibulum molestie laoreet justo, nec mollis purus vulputate at.";

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public GcsAvroWriter(GcsDestinationConfig config,
objectKey);

this.avroRecordFactory = new AvroRecordFactory(schema, nameUpdater);
this.uploadManager = S3StreamTransferManagerHelper.getDefault(config.getBucketName(), objectKey, s3Client);
this.uploadManager = S3StreamTransferManagerHelper.getDefault(
config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
// We only need one output stream as we only have one input stream. This is reasonably performant.
this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public GcsCsvWriter(GcsDestinationConfig config,
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
objectKey);

this.uploadManager = S3StreamTransferManagerHelper.getDefault(config.getBucketName(), objectKey, s3Client);
this.uploadManager = S3StreamTransferManagerHelper.getDefault(
config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
// We only need one output stream as we only have one input stream. This is reasonably performant.
this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);
this.csvPrinter = new CSVPrinter(new PrintWriter(outputStream, true, StandardCharsets.UTF_8),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public GcsJsonlWriter(GcsDestinationConfig config,

LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);

this.uploadManager = S3StreamTransferManagerHelper.getDefault(config.getBucketName(), objectKey, s3Client);
this.uploadManager = S3StreamTransferManagerHelper.getDefault(
config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());

// We only need one output stream as we only have one input stream. This is reasonably performant.
this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);
this.printWriter = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@
}
}
]
},
etsybaev marked this conversation as resolved.
Show resolved Hide resolved
"part_size_mb": {
"title": "Block Size (MB) for GCS multipart upload",
"description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values will allow to upload a bigger files and improve the speed, but consumes9 more memory. Allowed values: min=5MB, max=525MB Default: 5MB.",
"type": "integer",
"default": 5,
"examples": [5]
}
}
},
Expand All @@ -247,6 +254,13 @@
"description": "Whether the input json data should be normalized (flattened) in the output CSV. Please refer to docs for details.",
"default": "No flattening",
"enum": ["No flattening", "Root level flattening"]
},
"part_size_mb": {
"title": "Block Size (MB) for GCS multipart upload",
"description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values will allow to upload a bigger files and improve the speed, but consumes9 more memory. Allowed values: min=5MB, max=525MB Default: 5MB.",
"type": "integer",
"default": 5,
"examples": [5]
}
}
},
Expand All @@ -258,6 +272,13 @@
"type": "string",
"enum": ["JSONL"],
"default": "JSONL"
},
"part_size_mb": {
"title": "Block Size (MB) for GCS multipart upload",
"description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values will allow to upload a bigger files and improve the speed, but consumes9 more memory. Allowed values: min=5MB, max=525MB Default: 5MB.",
"type": "integer",
"default": 5,
"examples": [5]
}
}
},
Expand Down
Loading