🐛 S3 / GCS staging: use correct staging filename (#11768)
* Return uploaded filename in staging

* Update variable names

* Parse new filename from full object key

* Add back sentry tracing
tuliren committed Apr 6, 2022
1 parent aa0ef1e commit d778ac7
Showing 3 changed files with 42 additions and 25 deletions.
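
Before the file-by-file diff, here is a minimal, self-contained sketch of the behavior the commit introduces: the upload path builds the full object key from the object path, a part id, and the extension of the serialized buffer file, and the value handed back to the staging layer is the last segment of that key rather than the local buffer filename. The class name, the fixed part id, the sample paths, and the simplified getExtension below are illustrative stand-ins, not the connector's actual code.

// Illustrative sketch only: how the staging filename is derived from the full
// S3 / GCS object key, mirroring the new S3StorageOperations#getFilename logic.
public class StagingFilenameSketch {

  // Stand-in for getPartId(objectPath); the real code determines the next part number.
  private static final String PART_ID = "0";

  // Simplified stand-in for getExtension(...): keep the suffix of the buffer file.
  static String getExtension(final String filename) {
    if (filename.endsWith(".csv.gz")) {
      return ".csv.gz";
    }
    final int dot = filename.lastIndexOf('.');
    return dot < 0 ? "" : filename.substring(dot);
  }

  // Same logic as the new getFilename: the last path segment of the object key.
  static String getFilename(final String fullPath) {
    return fullPath.substring(fullPath.lastIndexOf("/") + 1);
  }

  public static void main(final String[] args) {
    final String objectPath = "data_sync/test_stream/2022_04_06/";    // hypothetical staging prefix
    final String bufferFilename = "/tmp/airbyte_buffer_12345.csv.gz"; // local serialized buffer file

    // New behavior: the object key is objectPath + partId + extension ...
    final String fullObjectKey = objectPath + PART_ID + getExtension(bufferFilename);
    // ... and the staging layer now receives the last segment of that key.
    final String stagingFilename = getFilename(fullObjectKey);

    System.out.println(fullObjectKey);   // data_sync/test_stream/2022_04_06/0.csv.gz
    System.out.println(stagingFilename); // 0.csv.gz (previously the buffer filename was returned)
  }
}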
@@ -14,6 +14,7 @@
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
 import io.airbyte.commons.string.Strings;
 import io.airbyte.integrations.destination.NamingConventionTransformer;
 import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
@@ -97,36 +98,38 @@ public void createBucketObjectIfNotExists(final String objectPath) {
   }
 
   @Override
-  public String uploadRecordsToBucket(final SerializableBuffer recordsData, final String namespace, final String streamName, final String objectPath)
-      throws Exception {
+  public String uploadRecordsToBucket(final SerializableBuffer recordsData,
+                                      final String namespace,
+                                      final String streamName,
+                                      final String objectPath) {
     final List<Exception> exceptionsThrown = new ArrayList<>();
-    boolean succeeded = false;
-    while (exceptionsThrown.size() < UPLOAD_RETRY_LIMIT && !succeeded) {
+    while (exceptionsThrown.size() < UPLOAD_RETRY_LIMIT) {
+      if (exceptionsThrown.size() > 0) {
+        LOGGER.info("Retrying to upload records into storage {} ({}/{}})", objectPath, exceptionsThrown.size(), UPLOAD_RETRY_LIMIT);
+        // Force a reconnection before retrying in case error was due to network issues...
+        s3Client = s3Config.resetS3Client();
+      }
+
       try {
-        loadDataIntoBucket(objectPath, recordsData);
-        succeeded = true;
+        return loadDataIntoBucket(objectPath, recordsData);
       } catch (final Exception e) {
         LOGGER.error("Failed to upload records into storage {}", objectPath, e);
         exceptionsThrown.add(e);
       }
-      if (!succeeded) {
-        LOGGER.info("Retrying to upload records into storage {} ({}/{}})", objectPath, exceptionsThrown.size(), UPLOAD_RETRY_LIMIT);
-        // Force a reconnection before retrying in case error was due to network issues...
-        s3Client = s3Config.resetS3Client();
-      }
     }
-    if (!succeeded) {
-      throw new RuntimeException(String.format("Exceptions thrown while uploading records into storage: %s", Strings.join(exceptionsThrown, "\n")));
-    }
-    return recordsData.getFilename();
+    throw new RuntimeException(String.format("Exceptions thrown while uploading records into storage: %s", Strings.join(exceptionsThrown, "\n")));
   }
 
-  private void loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) throws IOException {
+  /**
+   * Upload the file from {@code recordsData} to S3 and simplify the filename as <partId>.<extension>.
+   * @return the uploaded filename, which is different from the serialized buffer filename
+   */
+  private String loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) throws IOException {
     final long partSize = s3Config.getFormatConfig() != null ? s3Config.getFormatConfig().getPartSize() : DEFAULT_PART_SIZE;
     final String bucket = s3Config.getBucketName();
-    final String objectKeyWithPartId = String.format("%s%s%s", objectPath, getPartId(objectPath), getExtension(recordsData.getFilename()));
+    final String fullObjectKey = objectPath + getPartId(objectPath) + getExtension(recordsData.getFilename());
     final StreamTransferManager uploadManager = StreamTransferManagerHelper
-        .getDefault(bucket, objectKeyWithPartId, s3Client, partSize)
+        .getDefault(bucket, fullObjectKey, s3Client, partSize)
         .checkIntegrity(true)
         .numUploadThreads(DEFAULT_UPLOAD_THREADS)
         .queueCapacity(DEFAULT_QUEUE_CAPACITY);
@@ -145,10 +148,18 @@ private void loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) throws IOException {
         uploadManager.complete();
       }
     }
-    if (!s3Client.doesObjectExist(bucket, objectKeyWithPartId)) {
-      LOGGER.error("Failed to upload data into storage, object {} not found", objectKeyWithPartId);
+    if (!s3Client.doesObjectExist(bucket, fullObjectKey)) {
+      LOGGER.error("Failed to upload data into storage, object {} not found", fullObjectKey);
       throw new RuntimeException("Upload failed");
     }
+    final String newFilename = getFilename(fullObjectKey);
+    LOGGER.info("Uploaded buffer file to storage: {} -> {} (filename: {})", recordsData.getFilename(), fullObjectKey, newFilename);
+    return newFilename;
   }
 
+  @VisibleForTesting
+  static String getFilename(final String fullPath) {
+    return fullPath.substring(fullPath.lastIndexOf("/") + 1);
+  }
+
   protected static String getExtension(final String filename) {
@@ -99,4 +99,12 @@ void testCleanUpBucketObject() {
     assertEquals(OBJECT_TO_DELETE, deleteRequest.getValue().getKeys().get(0).getKey());
   }
 
+  @Test
+  void testGetFilename() {
+    assertEquals("filename", S3StorageOperations.getFilename("filename"));
+    assertEquals("filename", S3StorageOperations.getFilename("/filename"));
+    assertEquals("filename", S3StorageOperations.getFilename("/p1/p2/filename"));
+    assertEquals("filename.csv", S3StorageOperations.getFilename("/p1/p2/filename.csv"));
+  }
+
 }
@@ -64,12 +64,10 @@ public String uploadRecordsToStage(final JdbcDatabase database,
                                      final SerializableBuffer recordsData,
                                      final String schemaName,
                                      final String stageName,
-                                     final String stagingPath)
-      throws Exception {
-    AirbyteSentry.executeWithTracing("UploadRecordsToStage",
-        () -> s3StorageOperations.uploadRecordsToBucket(recordsData, schemaName, stageName, stagingPath),
+                                     final String stagingPath) {
+    return AirbyteSentry.queryWithTracing("UploadRecordsToStage", () ->
+        s3StorageOperations.uploadRecordsToBucket(recordsData, schemaName, stageName, stagingPath),
         Map.of("stage", stageName, "path", stagingPath));
-    return recordsData.getFilename();
   }
 
   @Override
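
The staging-operations change just above follows from the same contract: the Sentry tracing call switches from a void-style execute to a value-returning query so the uploaded filename can flow back to the caller. Below is a generic sketch of that wrapper pattern using plain java.util.function types; the helper names are illustrative and are not the AirbyteSentry API.

import java.util.function.Supplier;

// Generic sketch of the tracing-wrapper pattern: a void variant for side-effect-only
// work and a value-returning variant that hands the result back to the caller.
// These helpers are illustrative; they are not the AirbyteSentry API.
public final class TracingSketch {

  // Runs an operation for its side effects only (analogous to the old void-style call).
  static void executeWithTracing(final String operationName, final Runnable operation) {
    System.out.println("start " + operationName);
    try {
      operation.run();
    } finally {
      System.out.println("finish " + operationName);
    }
  }

  // Runs an operation and returns its result (analogous to the value-returning call),
  // which is what lets uploadRecordsToStage return the uploaded filename.
  static <T> T queryWithTracing(final String operationName, final Supplier<T> operation) {
    System.out.println("start " + operationName);
    try {
      return operation.get();
    } finally {
      System.out.println("finish " + operationName);
    }
  }

  public static void main(final String[] args) {
    // Void style: the caller learns nothing about what was uploaded.
    executeWithTracing("UploadRecordsToStage", () -> System.out.println("uploading..."));

    // Value style: the staging filename produced by the upload is returned to the caller.
    final String stagingFilename = queryWithTracing("UploadRecordsToStage", () -> "0.csv.gz");
    System.out.println("staging filename: " + stagingFilename);
  }
}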
