
[FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector #1

Merged

Conversation

nirtsruya
Contributor

Move flink-connector-aws-connector from the flink repository

* {@link DynamoDbWriteRequest} that may be persisted.
*/
@Internal
public class DynamoDbSinkElementConverter<InputT>
Contributor

It looks like we have created wrappers around the AWS SDK DynamoDB model within the sink. Can you please explain your reasoning behind this approach? The idea behind hiding the ElementConverter is to 1/ hide the internal implementation and 2/ enable the sink to manage de/serialisation. However, I am struggling to see the benefits here since the two models are very similar. If we are going to copy the underlying model, it might be best to use that model directly, since users might already be creating these objects.

Did you consider using a more generic ElementConverter, like the DynamoDBMapper? I am not familiar with it, so I am not sure whether there are performance/feature limitations.

Contributor
@YuriGusev YuriGusev Sep 8, 2022

We had a discussion earlier on that matter here

If we go for hiding the converter we do not leak underlying dynamodb sdk classes to the user, but we have to create a wrapper around their model. This means if the dynamodb client changes over time we will have to propagate model changes too.

I think it may be better to allow the user to define the element converter and not do any translation ourselves (like we had it before this PR: YuriGusev/flink@de83346#diff-1c18ee5b3c5b4d827778ffdf4e5d2219dbcb02fc45be0d3665357633a134995c).

Contributor

If we revert the change, we must ensure that if the dynamo client classes change over time we update our de/serialiser. Otherwise we might get in the situation where user ElementConverter generates classes that are not supported by our serialiser. The impact would be that we lose/break attributes when restoring from checkpoints. We can mitigate this by calling it out in the docs/PR template as something to check when updating the SDK.

I think it may be better to allow the user to define the element converter and not do any translation ourselves (like we had it before this PR

Given we cannot escape these wrappers, I agree with this.


Agree that having a wrapper provides limited benefits in this case and that it makes more sense to expose the DynamoDB SDK classes directly.

To mitigate the risk of not updating the de/serializers when doing an SDK update, it would be great if we could find a way to programmatically verify at compile time that the SDK is still compatible with the de/serializers. But I'm not sure how much work that would require and whether it's worth the effort.

Contributor Author

Thanks @sthm, good comment.
If I understand correctly, we are concerned that the sink uses version x of the DynamoDB SDK while the user is using version y, and the versions are incompatible for some reason? I think this would be hard to detect at compile time, as it is somewhat a dependency management issue - unless we shade the AWS SDK and only allow passing these shaded objects as input?
Otherwise, do we think it makes sense to fail during sink startup if the SDK classes used are incompatible with the ones used in the sink due to serialization issues? Similar to how Flink fails if an object passed to the operators is not serializable.

Contributor

@hlteoh37 / @YuriGusev / @nirtsruya there has been discussion regarding support for Conditional Writes and non-batch support. I note that the batch API uses PutRequest and the non-batch API uses PutItemRequest. They both internally use the same Map<String, AttributeValue>, but PutItemRequest adds additional fields.

My concern is that we would not be able to toggle batch mode without implementing a new connector, since the generic RequestItem type would change. Therefore, maybe we need some middle ground here. We could use a POJO that contains a Map<String, AttributeValue> that we can transform into either PutRequest or PutItemRequest. Then we can evolve this element converter to transparently support either API.

To summarise, I am considering whether we actually need RequestEntryT to be something like:

interface DynamoDbWriteRequest<T> {

    Map<String, AttributeValue> getItems(T record);

    // And some other method to decide if it is put or delete
}

Then later we can add support for getConditionalExpression().
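For illustration, a minimal sketch of what such a request type could look like once it becomes a concrete class (as the thread below eventually settles on); the constructor and accessor names here are assumptions for the example, not the final connector API:

import java.util.Map;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

// Sketch only: names are illustrative assumptions, not the final API.
enum DynamoDbWriteRequestType {
    PUT,
    DELETE
}

public class DynamoDbWriteRequest {

    private final DynamoDbWriteRequestType type;
    private final Map<String, AttributeValue> item;

    public DynamoDbWriteRequest(DynamoDbWriteRequestType type, Map<String, AttributeValue> item) {
        this.type = type;
        this.item = item;
    }

    public DynamoDbWriteRequestType getType() {
        return type;
    }

    public Map<String, AttributeValue> getItem() {
        return item;
    }

    // Could later grow e.g. a getConditionExpression() for non-batch mode.
}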

Contributor Author

@dannycranmer makes sense, I can go ahead with implementing that if you agree @YuriGusev.
Regarding @sthm's comment on serialization, do we want to shade the DynamoDB SDK? Is it something the community has been considering?

Contributor

I would rather avoid shading the SDK. It means that end users cannot update the SDK without building from source; this has caused issues before. For example, there is no way to add support for new AWS regions for older versions of the connector without building from source. On top of this, if you include multiple connectors, you end up with multiple versions of the SDK bloating the size of the jar.

We can think about a build-time check as a follow-up: https://issues.apache.org/jira/browse/FLINK-29688

@dannycranmer
Contributor

Apologies for the delay, I have left some comments. As commented in the main pom I will open a PR to contribute the general repo files so we can decouple from this contribution.

@dannycranmer
Contributor

Please rebase on main after #2 is merged.

@YuriGusev
Contributor

YuriGusev commented Sep 13, 2022

Thanks for the review @dannycranmer, I'm currently away from home, but I hope to be back and start working on the fixes next week

Contributor

@hlteoh37 hlteoh37 left a comment

Some general callouts on docs and tests

}
}

LOG.warn("DynamoDB Sink failed to persist {} entries", unprocessed.size());
Contributor

Nit: Might help to also mention that these entries will be retried (same below for handleFullyFailedRequest()).

.witItem("tableOne", ImmutableMap.of("pk", "2", "sk", "1"))
.witItem("tableTwo", ImmutableMap.of("pk", "1", "sk", "1"))
.withDescription(
"requests should not be deduplecated if belong to different tables")
Contributor

Suggested change
"requests should not be deduplecated if belong to different tables")
"requests should not be deduplicated if belong to different tables")

}

@Test
public void testPartitionKeysOfTwoDifferentRequestsEqual() {
Contributor

should we also test the case where config is null?

getAdvancedOption(configuration, SdkAdvancedClientOption.USER_AGENT_PREFIX))
.isEqualTo(AWSDynamoDbUtil.getFlinkUserAgentPrefix(properties));

Assertions.assertThat(configuration.retryPolicy().isPresent()).isTrue();
Contributor

nit: For a better message when tests fail

Suggested change
Assertions.assertThat(configuration.retryPolicy().isPresent()).isTrue();
Assertions.assertThat(configuration.retryPolicy()).isEmpty();

extends AsyncSinkBaseBuilder<InputT, DynamoDbWriteRequest, DynamoDbSinkBuilder<InputT>> {

private static final int DEFAULT_MAX_BATCH_SIZE = 25;
private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
Contributor

I am thinking about how this will work for users if there are multiple in-flight requests containing records that have the same PK/SK.

  • Seems like it will be a race condition where the first request to complete will take precedence
  • If there are failures/partial failures in a batch, the "latest" record in the stream might no longer take precedence. That said, this situation is more likely to occur if there are more records with the same PK/SK, which helps us, because that means eventual consistency will be attained quickly.

In the scenario above, it seems setting MAX_IN_FLIGHT_REQUESTS to 1 would be the main mitigation. I wonder if we should set this as default, or at least call this out to users?

Contributor Author

Very good point, I was thinking about it. I think we should definitely add a comment about this.
We need to be specific here: if we are using parallelism, MAX_IN_FLIGHT_REQUESTS alone will not help, and we also need to ensure keying and ordering from the source.
But in general, I think we should document that the sink has this "problem".
Maybe in the future we can allow using single PutItem requests and conditional expressions, as you mentioned in another comment.

Contributor

Yes, you're right. Users will also need to ensure that a keyBy is done before the sink so that all records with the same PK/SK go to the same subtask.

Yep, agreed that we can set the default, and create a JIRA for using single PutItem requests with conditional expressions in the future.
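As a concrete illustration of the keyBy-before-sink pattern, a minimal sketch (MyRecord and its key accessors are hypothetical, the connector package in the import is assumed, and the sink is assumed to be built elsewhere, e.g. with maxInFlightRequests set to 1):

import org.apache.flink.connector.dynamodb.sink.DynamoDbSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;

// Sketch only: routes all records with the same PK/SK to the same subtask so that
// their relative order is preserved on the way into the sink.
public static DataStreamSink<MyRecord> writeKeyed(
        DataStream<MyRecord> records, DynamoDbSink<MyRecord> sink) {
    return records
            .keyBy(record -> record.getPartitionKey() + "|" + record.getSortKey())
            .sinkTo(sink);
}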

Contributor Author

I am also thinking that if we really want to support that, we probably have to remove the deduplication we are doing when we find a duplicate key, and instead "split" the batch, to make sure we do not lose records with the same primary key.

Contributor Author

@YuriGusev I think it makes sense, what do you think? Should we split batches instead of deduplicating?

Contributor

@YuriGusev YuriGusev Oct 18, 2022

Hi,

Duplicate keys only matter when they end up in the same batch request; the DynamoDB service will then fail the request. I took a simple approach of having an accumulator deduplicate entries based on PK/SK within the same request. E.g. if we had 25 entries and 1 entry had a duplicated PK/SK, we would write 24 entries in the request.

But if two entries with the same PK/SK end up in two different in-flight batch requests, this is not a problem, as deduplication then happens on the DynamoDB side.

If the order of the duplicate/update entries matters, I think the batch sink cannot be used anyway, as the order is mixed up by partial write failures/full retries and parallelism in the sink. Then it is up to the user to dedup before the sink.

I may have misunderstood the question. :)

Contributor

@hlteoh37 hlteoh37 Oct 18, 2022

I am also thinking that if we really want to support that, we probably have to remove the deduplication we are doing when we find a duplicate key, and instead "split" the batch, to make sure we do not lose records with the same primary key.

IMO deduplication within a batch on the Flink DDB sink side is a useful feature, because it reduces writes to DDB. The current deduplication ensures the newest record takes precedence, which is the behaviour we see on consecutive BatchWriteItem requests.

The difference I can see here is that the ChangeDataCapture stream off the DDB table will be different depending on whether deduping happens in the Flink DDB Sink. This might matter if the user wants to write to DDB but also track the number of changes in the stream. This sounds like a relatively niche use case though, since that use case is more suited to a streaming destination like Kinesis.

So, I think it makes sense to keep the deduping. @nirtsruya do you have a particular use case where splitting into batches would be preferable?

But if two entries with the same PK/SK end up in two different in-flight batch requests, this is not a problem, as deduplication then happens on the DynamoDB side.
If the order of the duplicate/update entries matters, I think the batch sink cannot be used anyway, as the order is mixed up by partial write failures/full retries and parallelism in the sink. Then it is up to the user to dedup before the sink.

As far as I can tell, the second request to DDB will take precedence and overwrite the first request. I can see batching being very useful for batch jobs where deduping has already occurred, and we just want to dump a large amount of data to DDB.

For the streaming use case, this situation is mitigated if the user sets MAX_IN_FLIGHT_REQUESTS to 1, because the AsyncSinkWriter will put any failed items from requests at the front of the queue, so the "older" items will be retried first.

In the future, I was thinking we could implement an improvement to use the PutItem API, and the DDB Sink could add an additional key-value pair with the eventTime as the value. Whenever an item with an older event time attempts to be written, we can just drop the older record.
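To make that idea concrete, a rough sketch of an event-time-guarded write using the plain PutItem API (not part of this PR; the eventTime attribute, table name and expression names are made up for the example):

import java.util.HashMap;
import java.util.Map;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

public class ConditionalPutSketch {
    public static void main(String[] args) {
        // Sketch only: "eventTime" is a hypothetical attribute the sink would add.
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("pk", AttributeValue.builder().s("user-1").build());
        item.put("eventTime", AttributeValue.builder().n("1667587200000").build());

        // The write succeeds only if no item exists yet or the stored item is older;
        // otherwise DynamoDB throws ConditionalCheckFailedException and the incoming
        // (older) record is effectively dropped.
        PutItemRequest request =
                PutItemRequest.builder()
                        .tableName("your-table-name")
                        .item(item)
                        .conditionExpression(
                                "attribute_not_exists(eventTime) OR eventTime < :newEventTime")
                        .expressionAttributeValues(
                                Map.of(":newEventTime", AttributeValue.builder().n("1667587200000").build()))
                        .build();

        try (DynamoDbClient client = DynamoDbClient.create()) {
            client.putItem(request);
        }
    }
}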

Contributor

@YuriGusev YuriGusev Oct 18, 2022

Maybe we could provide a setting to turn off deduplication on the sink side. That would be on par with the boto3 client for DynamoDB (the overwrite_by_pkeys=['partition_key', 'sort_key'] config). Wdyt?

In the future, I was thinking we could implement an improvement to use the PutItem API, and the DDB Sink could add an additional key-value pair with the eventTime as the value. Whenever an item with an older event time attempts to be written, we can just drop the older record.

Yes, I agree that would be a nice feature to add. We actually had it planned for the first sink version (not based on AsyncSinkBase). There was an interface that allowed different types of producers (batching/non-batching) to extend it for this use case in the future. We can introduce something similar here and plan for adding this feature. For the non-batching sink we could also support conditional expressions for individual PutItem/DeleteItem requests (as you mentioned below).

Contributor

@hlteoh37 hlteoh37 Oct 18, 2022

Maybe we could provide a setting to turn off deduplication on the sink side

Currently we can do this via tableConfig, can't we? Or do you think a separate config would be better? But I agree, providing a setting to toggle deduplication is a good idea.

Contributor

@YuriGusev YuriGusev Oct 18, 2022

Yes, that's right, if there is no configuration it won't deduplicate anything. It's been a while, I forgot I added a test for this :)

@hlteoh37
Contributor

Question for general discussion:
Do we have any thoughts on using conditional expressions? I note that the BatchWriteItem API we are using doesn't support this. Maybe we can create a followup JIRA to track this feature.

@YuriGusev YuriGusev force-pushed the FLINK-24229-connector-aws-dynamodb branch from fd28524 to b38aa65 on October 17, 2022 18:44
Comment on lines 47 to 72
public enum BackoffStrategy {

/**
* Backoff strategy that uses a full jitter strategy for computing the next backoff delay. A
* full jitter strategy will always compute a new random delay between 0 and the computed
* exponential backoff for each subsequent request. See example: {@link
* FullJitterBackoffStrategy}
*/
FULL_JITTER,

/**
* Backoff strategy that uses equal jitter for computing the delay before the next retry. An
* equal jitter backoff strategy will first compute an exponential delay based on the
* current number of retries, base delay and max delay. The final computed delay before the
* next retry will keep half of this computed delay plus a random delay computed as a random
* number between 0 and half of the exponential delay plus one. See example: {@link
* EqualJitterBackoffStrategy}
*/
EQUAL_JITTER,

/**
* Simple backoff strategy that always uses a fixed delay for the delay before the next
* retry attempt. See example: {@link FixedDelayBackoffStrategy}
*/
FIXED_DELAY
}
Contributor

As a follow-up, we should generalise and pull this config into aws-base. Just a callout, nothing to do now.
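For context, a rough sketch of how these enum values could be mapped onto the SDK v2 backoff strategy classes referenced in the Javadoc above (the enum is repeated locally and the mapping helper is an assumption, not the connector's actual code):

import java.time.Duration;
import software.amazon.awssdk.core.retry.backoff.BackoffStrategy;
import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy;

public final class BackoffStrategySketch {

    // Mirrors the connector enum above; kept local so the sketch is self-contained.
    public enum ConnectorBackoffStrategy {
        FULL_JITTER,
        EQUAL_JITTER,
        FIXED_DELAY
    }

    // Sketch only: a hypothetical mapping helper, not part of the connector.
    public static BackoffStrategy toSdkBackoffStrategy(
            ConnectorBackoffStrategy strategy, Duration baseDelay, Duration maxDelay) {
        switch (strategy) {
            case FULL_JITTER:
                return FullJitterBackoffStrategy.builder()
                        .baseDelay(baseDelay)
                        .maxBackoffTime(maxDelay)
                        .build();
            case EQUAL_JITTER:
                return EqualJitterBackoffStrategy.builder()
                        .baseDelay(baseDelay)
                        .maxBackoffTime(maxDelay)
                        .build();
            case FIXED_DELAY:
            default:
                return FixedDelayBackoffStrategy.create(baseDelay);
        }
    }
}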

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A DynamoDB Sink that performs async requests against a destination stream using the buffering
Contributor

Suggested change
* A DynamoDB Sink that performs async requests against a destination stream using the buffering
* A DynamoDB Sink that performs async requests against a destination table using the buffering

@Override
public StatefulSinkWriter<InputT, BufferedRequestState<DynamoDbWriteRequest>> createWriter(
InitContext context) throws IOException {
return new DynamoDbSinkWriter<>(
Contributor

nit: Can simplify with:

return restoreWriter(context, Collections.emptyList());

Contributor

That's neat. Thanks, fixed.

Comment on lines 92 to 95
/**
* @param elementConverter the {@link ElementConverter} to be used for the sink
* @return {@link DynamoDbSinkBuilder} itself
*/
Contributor

nit: Since this comment does not add anything above the method signature, consider removing, as per the coding guidelines

@dannycranmer
Contributor

Question for general discussion: Do we have any thoughts on using conditional expressions? I note that the BatchWriteItem API we are using doesn't support this. Maybe we can create a followup JIRA to track this feature.

I had considered the same. I see that conditional writes are supported on PutItem, but not the Batch API. Based on this, and the discussion above about supporting non-batch mode, I would vote to proceed without it and add it later.

Contributor

@hlteoh37 hlteoh37 left a comment

Generally looks good.

Comment on lines 113 to 116
* @param overwriteByPKeys provide partition key and (optionally) sort key name if you want to
* bypass no duplication limitation of single batch write request. Batching DynamoDB sink
* will drop request items in the buffer if their primary keys(composite) values are the
* same as newly added one.
Contributor

and (optionally) sort key name

It is not clear from this comment or the method signature how I provide a sort key. If it is elements 0 and 1, could we split it into individual fields instead?

Contributor

@YuriGusev YuriGusev Oct 20, 2022

We had it as a separate DTO, I changed it here. The order/number of keys doesn't matter for the deduplication logic. I can fix the comment to make it clearer, or return to the previous approach (or have two individual fields).

I changed this to match the similar approach in the boto3 DynamoDB batch client.

Otherwise I'm fine with having two fields, however we would need to keep a nullable field in the builder for the sort key and pass both fields down the tree.

Contributor

@dannycranmer do you think it is ok to keep it this way, or would you like to change the approach?

Contributor

It's fine to use the list approach. It's more flexible, and it's similar to boto.

Comment on lines 47 to 98
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DynamoDbWriteRequest that = (DynamoDbWriteRequest) o;
return Objects.equals(writeRequest, that.writeRequest);
}

@Override
public int hashCode() {
return Objects.hash(writeRequest);
}
Contributor

Why do we need these?

Contributor

I left this class to be removed later when we migrate to the interface approach (@nirtsruya is working on that).

@YuriGusev YuriGusev force-pushed the FLINK-24229-connector-aws-dynamodb branch from e3318ce to ce7f45d on October 20, 2022 17:10
* AWS_SECRET_ACCESS_KEY} through environment variables etc.
*/
@Internal
class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, WriteRequest> {
Contributor

@nirtsruya sorry I might not have explained my idea well. I actually meant that here we would use your new type:

  • class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRequest> {

Now we can modify DynamoDbWriteRequest later to support the conditional fields for non-batch mode. Inside submitRequestEntries we would convert DynamoDbWriteRequest into WriteRequest for batch or PutItemRequest for non-batch. We would also need to convert WriteRequest back into DynamoDbWriteRequest for failed records (we need to check we have all the required info).

There will be a slight performance hit when we retry, as records are converted multiple times. But this allows us to evolve the connector to support batch/non-batch without having to duplicate all the classes, as you explained.
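A rough sketch of that conversion step (the convertToWriteRequest name matches what appears elsewhere in this review; the exact getters on DynamoDbWriteRequest are assumptions):

import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

// Sketch only: converts the connector-level request into the SDK batch type
// inside submitRequestEntries.
private static WriteRequest convertToWriteRequest(DynamoDbWriteRequest request) {
    if (request.getType() == DynamoDbWriteRequestType.PUT) {
        return WriteRequest.builder()
                .putRequest(PutRequest.builder().item(request.getItem()).build())
                .build();
    } else if (request.getType() == DynamoDbWriteRequestType.DELETE) {
        return WriteRequest.builder()
                .deleteRequest(DeleteRequest.builder().key(request.getItem()).build())
                .build();
    } else {
        throw new IllegalArgumentException(
                "Unsupported request type " + request.getType()
                        + "; convertToWriteRequest needs updating.");
    }
}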

Contributor Author

Ok, makes sense.
I have one comment on the DynamoDbWriteRequest - it seems like a DTO, do we really need to make it an interface?
It would help, for example, to have the same concrete class for de/serialization.

Contributor

I have one comment on the DynamoDbWriteRequest - it seems like a DTO, do we really need to make it an interface?

Yes I agree that an interface does not make sense here. Class is fine.

Comment on lines 37 to 55
* private static class DummyDynamoDbElementConverter implements ElementConverter<String, DynamoDbWriteRequest> {
*
* @Override
* public DynamoDbWriteRequest apply(String s) {
* final Map<String, AttributeValue> item = new HashMap<>();
* item.put("your-key", AttributeValue.builder().s(s).build());
* return new DynamoDbWriteRequest(
* WriteRequest.builder()
* .putRequest(PutRequest.builder()
* .item(item)
* .build())
* .build()
* );
* }
* }
* DynamoDbSink<String> dynamoDbSink = DynamoDbSink.<String>builder()
* .setElementConverter(new DummyDynamoDbElementConverter())
* .setDestinationTableName("your-table-name")
* .build();
Contributor

This comment is now outdated
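For reference, an updated version of this example might look roughly like the sketch below once DynamoDbWriteRequest is a builder-based class; the setType/setItem/setOverwriteByPartitionKeys methods and the two-argument apply signature are assumptions based on the rest of this diff and the AsyncSink ElementConverter interface, not the final documented API:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

// Sketch only: builder/setter names are assumptions based on the rest of this diff.
private static class DummyDynamoDbElementConverter
        implements ElementConverter<String, DynamoDbWriteRequest> {

    @Override
    public DynamoDbWriteRequest apply(String s, SinkWriter.Context context) {
        final Map<String, AttributeValue> item = new HashMap<>();
        item.put("your-key", AttributeValue.builder().s(s).build());
        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(item)
                .build();
    }
}

DynamoDbSink<String> dynamoDbSink =
        DynamoDbSink.<String>builder()
                .setElementConverter(new DummyDynamoDbElementConverter())
                .setDestinationTableName("your-table-name")
                // optional: deduplicate within a batch on these key attributes
                .setOverwriteByPartitionKeys(Arrays.asList("your-partition-key", "your-sort-key"))
                .build();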

* <li>{@code maxRecordSizeInBytes} will be 400 KB i.e. {@code 400 * 1000}
* <li>{@code failOnError} will be false
* <li>{@code destinationTableName} destination table for the sink
* <li>{@code overwriteByPKeys} will be empty meaning no records deduplication will be performed
Contributor

This comment is outdated

private final boolean failOnError;
private final String tableName;

private List<String> overwriteByPKeys;
Contributor

Can we update the name to overwriteByPartitionKeys to match the other classes?

Contributor

@YuriGusev YuriGusev Oct 24, 2022

Sorry, missed this.

DeleteRequest.builder().key(dynamoDbWriteRequest.getItem()).build())
.build();
} else {
throw new IllegalArgumentException("");
Contributor

Can we add a message to the exception? Given this should not happen, we can quite explicitly say that the convertToWriteRequest method needs updating.

.setType(DynamoDbWriteRequestType.DELETE)
.build();
} else {
throw new IllegalArgumentException("");
Contributor

Same as above


private final Map<String, WriteRequest> container;

private List<String> overwriteByPKeys;
Contributor

Please update the field name to match the other one: overwriteByPartitionKeys

Comment on lines 114 to 217
TableRequestsContainer container = new TableRequestsContainer(overwriteByPKeys);
for (DynamoDbWriteRequest requestEntry : requestEntries) {
container.put(convertToWriteRequest(requestEntry));
}

CompletableFuture<BatchWriteItemResponse> future =
client.batchWriteItem(
BatchWriteItemRequest.builder()
.requestItems(
ImmutableMap.of(tableName, container.getRequestItems()))
.build());
Contributor

As an optimization, we could completely skip PrimaryKeyBuilder.build when CollectionUtil.isNullOrEmpty(overwriteByPKeys). This would remove the UUID generation and map overhead. Something like this:

        final List<WriteRequest> writeRequests;
        
        if (CollectionUtil.isNullOrEmpty(overwriteByPKeys)) {
            writeRequests = new ArrayList<>();
            for (DynamoDbWriteRequest requestEntry : requestEntries) {
                writeRequests.add(convertToWriteRequest(requestEntry));
            }
        } else {
            TableRequestsContainer container = new TableRequestsContainer(overwriteByPKeys);
            for (DynamoDbWriteRequest requestEntry : requestEntries) {
                container.put(convertToWriteRequest(requestEntry));
            }

            writeRequests = container.getRequestItems();
        }

        CompletableFuture<BatchWriteItemResponse> future =
                client.batchWriteItem(
                        BatchWriteItemRequest.builder()
                                .requestItems(ImmutableMap.of(tableName, writeRequests))
                                .build());

Comment on lines 32 to 74
public enum DynamoDbWriteRequestType {
PUT,
DELETE,
}
Contributor

Is it worth trying to use a more general Flink type to represent this, something like https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/RowKind.java?

Maybe @MartijnVisser would be able to help?

Contributor Author

@dannycranmer I like using the byte representation for de/serialization. Will also apply it for DynamoDbType.

@YuriGusev YuriGusev force-pushed the FLINK-24229-connector-aws-dynamodb branch 2 times, most recently from 7ba4e4f to 2d1c15a on October 29, 2022 23:36
Comment on lines 64 to 69
* <li>{@code dynamoDbTablesConfig}: if provided for the table, the DynamoDb sink will attempt to
* deduplicate records with the same primary and/or secondary keys in the same batch request.
* Only the latest record with the same combination of key attributes is preserved in the
* request.
* </ul>
Contributor

This comment is out of sync with the recent changes to the class

Contributor

Fixed and other javadocs also updated

Comment on lines 160 to 166
Set<String> stringSet = new LinkedHashSet<>(stringSetSize);
for (int i = 0; i < stringSetSize; i++) {
stringSet.add(in.readUTF());
}
return AttributeValue.builder().ss(stringSet).build();
Contributor

Why use a Set here? Does this mean this is a non-symmetric transform? We could hit a situation where:
!originalObject.equals(deserialize(serialize(originalObject)))

If so, I think we should not be deduplicating here and the transform should be symmetrical

protected long getSizeInBytes(DynamoDbWriteRequest requestEntry) {
// dynamodb calculates item size as a sum of all attributes and all values, but doing so on
// every operation may be too expensive, so this is just an estimate
return requestEntry.getItem().toString().getBytes(StandardCharsets.UTF_8).length;
Contributor

As discussed offline, it is not practical to compute the sizeInBytes given that:

  • We do not know how the client serializes the request payload
  • This will be an expensive operation and we would potentially be serializing each record twice

We agree to:

  • Return a dummy constant value here
  • Follow up with an improvement to make this flush strategy and validation optional (FLINK-29854)

The failure modes are already covered:

  • Total batch size too large: this will be handled organically by AIMD
  • Single DynamoDbWriteRequest too large: if we try to send a single request that is too large it will result in a fatal validation exception

Contributor

This was fixed. I set it to 0 and updated the builder class to raise exceptions if the corresponding parameters are used.

Comment on lines +133 to +143
@Override
public DynamoDbSinkBuilder<InputT> setMaxBatchSizeInBytes(long maxBatchSizeInBytes) {
throw new InvalidConfigurationException(
"Max batch size in bytes is not supported by the DynamoDB sink implementation.");
}

@Override
public DynamoDbSinkBuilder<InputT> setMaxRecordSizeInBytes(long maxRecordSizeInBytes) {
throw new InvalidConfigurationException(
"Max record size in bytes is not supported by the DynamoDB sink implementation.");
}
Contributor

Thanks for adding this

Comment on lines 21 to 25
/** Exception is thrown when DynamoDb sink failed to write data. */
public class DynamoDbSinkException extends RuntimeException {
Contributor

Missing @PublicEvolving

Contributor

I see the annotations were recently removed, why was this? 598373a

Comment on lines 21 to 22
/** Exception is thrown if a DynamoDB configuration was invalid. */
public class InvalidConfigurationException extends RuntimeException {
Contributor

Missing @PublicEvolving

Contributor

Why is this not a subtype of DynamoDbSinkException?

Contributor

I see the annotations were recently removed, why was this? 598373a

Contributor

please see my answer below. Fixed this now

Comment on lines 21 to 22
/** Exception is thrown if a DynamoDB request was invalid. */
public class InvalidRequestException extends RuntimeException {
Contributor

Missing @PublicEvolving

Contributor

Why is this not a sub type of DynamoDbSinkException?

Contributor

I see the annotations were recently removed, why was this? 598373a

Contributor

I first started adding them for the other exceptions that we now expose, and then I saw there are no PublicEvolving annotations for exceptions in the Kinesis sink. Wasn't sure if that's right, but decided to follow the existing code. I'll add them back now.

Contributor

fixed this now

@YuriGusev YuriGusev force-pushed the FLINK-24229-connector-aws-dynamodb branch 3 times, most recently from c67f439 to 7a76f7b on November 4, 2022 20:01
Co-authored-by: Yuri Gusev <yura.gusev@gmail.com>
@dannycranmer
Contributor

As discussed offline, the build takes ~9 minutes to run due to excessive use of integration tests. We will follow up to migrate these tests to unit tests, and generally improve coverage ahead of the release.

@YuriGusev YuriGusev force-pushed the FLINK-24229-connector-aws-dynamodb branch from 7a76f7b to ec7f1e8 on November 4, 2022 20:14
@dannycranmer
Contributor

dannycranmer commented Nov 4, 2022

Seems like the checks are broken, raised a Jira to fix:

LGTM, running tests locally and waiting for @hlteoh37 to approve before merging

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Flink : Connectors : Amazon DynamoDB Parent 1.0.0-SNAPSHOT:
[INFO]
[INFO] Flink : Connectors : Amazon DynamoDB Parent ........ SUCCESS [  2.091 s]
[INFO] Flink : Connectors : Amazon DynamoDB ............... SUCCESS [09:13 min]
[INFO] Flink : Connectors : SQL : Amazon DynamoDB ......... SUCCESS [  7.273 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

@hlteoh37
Contributor

hlteoh37 commented Nov 4, 2022

LGTM

Contributor

@hlteoh37 hlteoh37 left a comment

Approved

@dannycranmer dannycranmer merged commit 11bcbf9 into apache:main Nov 4, 2022