🐛 fix s3/gcs bucket cleanup #11728
@@ -29,9 +28,11 @@
   /**
    * Remove files that were just stored in the bucket
    */
-  void cleanUpBucketObject(String streamName, List<String> stagedFiles) throws Exception;
+  void cleanUpBucketObject(String objectPath, List<String> stagedFiles) throws Exception;
this is a better name
@@ -49,15 +49,18 @@
     LOGGER.info("S3 format config: {}", formatConfig.toString());
     switch (formatConfig.getFormat()) {
       case AVRO -> {
-        final Callable<BufferStorage> createStorageFunctionWithExtension = () -> createStorageFunctionWithoutExtension.apply(AvroSerializedBuffer.DEFAULT_SUFFIX);
+        final Callable<BufferStorage> createStorageFunctionWithExtension =
do you have a different line width?
I think this change was made by ./gradlew format
(I wasn't editing this file in my IDE. By the way, my IDE is set to 150, and this line seems to be longer than 150.)
...tination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java
  @Test
  void testCleanUpBucketObject() {
    final String pathFormat = S3DestinationConstants.DEFAULT_PATH_FORMAT;
Does this test have other objects in the bucket? Should it list the bucket afterwards to make sure only the relevant objects are deleted?
This is a unit test using a mock object instead of a real S3 client / API.
Maybe the acceptance tests should also check this, though.
Yeah. I think they should, to prevent this from happening again.
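A rough idea of what such a check could look like: seed a bucket with objects from two streams, clean up one stream, then list what is left. This is only a sketch. `FakeBucket` is a hypothetical in-memory stand-in (not the S3 client, and not a class from this PR), and the object keys are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

/** Hypothetical sketch of a "list after cleanup" check; not code from this PR. */
class BucketCleanupCheckSketch {

  /** In-memory stand-in for a bucket: a sorted set of object keys. */
  static final class FakeBucket {
    private final Set<String> keys = new TreeSet<>();

    void put(final String key) {
      keys.add(key);
    }

    List<String> list() {
      return new ArrayList<>(keys);
    }

    /** Deletes only the keys that fully match the given pattern. */
    void deleteMatching(final Pattern pattern) {
      keys.removeIf(key -> pattern.matcher(key).matches());
    }
  }

  /** Two streams share the "shared/" path; only the "users" stream is cleaned up. */
  static List<String> runScenario() {
    final FakeBucket bucket = new FakeBucket();
    bucket.put("shared/users/2022_04_05_1_0.csv");
    bucket.put("shared/users/2022_04_06_2_0.csv");
    bucket.put("shared/orders/2022_04_05_1_0.csv");

    bucket.deleteMatching(Pattern.compile(Pattern.quote("shared/users/") + ".*"));

    // Listing afterwards is the actual check: the other stream must survive.
    return bucket.list();
  }

  public static void main(final String[] args) {
    final List<String> remaining = runScenario();
    if (!remaining.equals(List.of("shared/orders/2022_04_05_1_0.csv"))) {
      throw new AssertionError("unexpected objects left in bucket: " + remaining);
    }
    System.out.println("remaining: " + remaining);
  }
}
```

An acceptance test would do the same against a real bucket: write a sentinel object outside the stream's path, run the overwrite sync, and assert the sentinel is still listed.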
/test connector=connectors/destination-gcs
/test connector=connectors/destination-s3
airbyte-integrations/connectors/destination-aws-datalake/src/main/java/AthenaHelper.java
I confirmed with manual testing that, with this fix, two different connections writing to the same bucket path no longer conflict with each other in overwrite mode.
/publish connector=connectors/destination-s3
/publish connector=connectors/destination-gcs
/publish connector=connectors/destination-gcs
  protected void deserializeNestedObjects(final List<AirbyteMessage> messages, final List<AirbyteRecordMessage> actualMessages) {
    for (final AirbyteMessage message : messages) {
      if (message.getType() == Type.RECORD) {
        final var iterator = message.getRecord().getData().fieldNames();
        while (iterator.hasNext()) {
          final var fieldName = iterator.next();
          if (message.getRecord().getData().get(fieldName).isContainerNode()) {
            message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> {
              final var data = message.getRecord().getData().get(fieldName).get(f);
              final var wrappedData = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", f,
                  dateTimeFieldNames.containsKey(f) || !data.isTextual() ? data.asText() : StringUtils.wrap(data.asText(), "\""));
              try {
                ((ObjectNode) message.getRecord().getData()).set(fieldName, new ObjectMapper().readTree(wrappedData));
              } catch (final JsonProcessingException e) {
                e.printStackTrace();
              }
            });
          }
        }
      }
    }
  }
These seemed to be missing from #9816 for these tests to pass...
Same thing in all the other acceptanceTest files from this PR
What
When cleaning up a bucket in overwrite sync mode, the destination might drop too many files from the bucket path if other streams are being written to the same bucket path.
How
Restrict cleanup to objects matching the custom S3 path format string, instead of dropping everything under the bucket path.
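As an illustration of the idea (a minimal sketch, not the connector's actual implementation), a path format can be turned into a regular expression that pins the namespace and stream name and lets every other variable match any text within a path segment. The class and method names below are hypothetical, and the `${...}` variable names and format string are assumed for the example only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch: restrict cleanup to keys produced by one stream's path format. */
class PathFormatCleanupSketch {

  /**
   * Builds a pattern from a path format such as "${NAMESPACE}/${STREAM_NAME}/${YEAR}_".
   * NAMESPACE and STREAM_NAME are replaced by their quoted values; any other
   * ${VARIABLE} matches any run of characters that does not contain '/'.
   */
  static Pattern toObjectKeyPattern(final String pathFormat, final String namespace, final String streamName) {
    final StringBuilder regex = new StringBuilder();
    int pos = 0;
    while (pos < pathFormat.length()) {
      final int start = pathFormat.indexOf("${", pos);
      final int end = start < 0 ? -1 : pathFormat.indexOf('}', start);
      if (start < 0 || end < 0) {
        // No further variables: the rest is literal text.
        regex.append(Pattern.quote(pathFormat.substring(pos)));
        break;
      }
      regex.append(Pattern.quote(pathFormat.substring(pos, start)));
      final String variable = pathFormat.substring(start + 2, end);
      if (variable.equals("NAMESPACE")) {
        regex.append(Pattern.quote(namespace));
      } else if (variable.equals("STREAM_NAME")) {
        regex.append(Pattern.quote(streamName));
      } else {
        regex.append("[^/]*");
      }
      pos = end + 1;
    }
    // Anything may follow the formatted prefix (part number, file extension).
    regex.append(".*");
    return Pattern.compile(regex.toString());
  }

  /** Returns only the keys that fully match the pattern; everything else is left alone. */
  static List<String> selectKeysToDelete(final List<String> allKeys, final Pattern pattern) {
    final List<String> result = new ArrayList<>();
    for (final String key : allKeys) {
      if (pattern.matcher(key).matches()) {
        result.add(key);
      }
    }
    return result;
  }

  public static void main(final String[] args) {
    final Pattern pattern = toObjectKeyPattern(
        "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_", "public", "users");
    final List<String> keys = List.of(
        "public/users/2022_04_05_1649116800000_0.csv",
        "public/orders/2022_04_05_1649116800000_0.csv",
        "public/users_archive/2022_04_05_1649116800000_0.csv");
    System.out.println(selectKeysToDelete(keys, pattern));
  }
}
```

With the sample keys in `main`, only the `public/users/...` object is selected; the `orders` and `users_archive` objects that share the bucket path are not touched.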