Skip to content

Commit

Permalink
make message thread safe (#1870)
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky committed Jun 12, 2024
1 parent 5587796 commit b594e7e
Showing 1 changed file with 63 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,35 @@

import static java.util.stream.Collectors.toList;

/**
* Implementation note: this class is partially mutable and may be accessed from multiple
* threads involved in message lifecycle, it must be thread safe.
*/
public class Message implements FilterableMessage {

private String id;
private PartitionOffset partitionOffset;
private final String id;
private final PartitionOffset partitionOffset;

private String topic;
private String subscription;
private boolean hasSubscriptionIdentityHeaders;
private ContentType contentType;
private Optional<CompiledSchema<Schema>> schema;
private final String topic;
private final String subscription;
private final boolean hasSubscriptionIdentityHeaders;
private final ContentType contentType;
private final Optional<CompiledSchema<Schema>> schema;

private long publishingTimestamp;
private long readingTimestamp;
private byte[] data;
private final long publishingTimestamp;
private final long readingTimestamp;
private final byte[] data;

private int retryCounter = 0;
private long partitionAssignmentTerm = -1;
private Map<String, String> externalMetadata = Collections.emptyMap();
private final long partitionAssignmentTerm;
private final Map<String, String> externalMetadata;

private List<Header> additionalHeaders = Collections.emptyList();
private final List<Header> additionalHeaders;

private Set<String> succeededUris = Sets.newHashSet();
private final Set<String> succeededUris = Sets.newHashSet();

private long currentMessageBackoff = -1;

private Message() {}

public Message(String id,
String topic,
byte[] content,
Expand Down Expand Up @@ -118,12 +120,12 @@ public boolean isTtlExceeded(long ttlMillis) {
return currentTimestamp > readingTimestamp + ttlMillis;
}

public void incrementRetryCounter(Collection<URI> succeededUris) {
public synchronized void incrementRetryCounter(Collection<URI> succeededUris) {
this.retryCounter++;
this.succeededUris.addAll(succeededUris.stream().map(URI::toString).collect(toList()));
}

public int getRetryCounter() {
public synchronized int getRetryCounter() {
return retryCounter;
}

Expand All @@ -138,14 +140,14 @@ public String getId() {

@Override
public Map<String, String> getExternalMetadata() {
return Collections.unmodifiableMap(externalMetadata);
return externalMetadata;
}

public List<Header> getAdditionalHeaders() {
return Collections.unmodifiableList(additionalHeaders);
return additionalHeaders;
}

public long updateAndGetCurrentMessageBackoff(SubscriptionPolicy subscriptionPolicy) {
public synchronized long updateAndGetCurrentMessageBackoff(SubscriptionPolicy subscriptionPolicy) {
if (currentMessageBackoff == -1) {
currentMessageBackoff = subscriptionPolicy.getMessageBackoff();
} else {
Expand Down Expand Up @@ -184,7 +186,7 @@ public PartitionOffset getPartitionOffset() {
return partitionOffset;
}

public boolean hasNotBeenSentTo(String uri) {
public synchronized boolean hasNotBeenSentTo(String uri) {
return !succeededUris.contains(uri);
}

Expand All @@ -201,64 +203,80 @@ public String getSubscription() {
}

public static class Builder {
private String id;
private PartitionOffset partitionOffset;

private String topic;
private String subscription;
private boolean hasSubscriptionIdentityHeaders;
private ContentType contentType;
private Optional<CompiledSchema<Schema>> schema;

private long publishingTimestamp;
private long readingTimestamp;
private byte[] data;

private long partitionAssignmentTerm = -1;
private Map<String, String> externalMetadata = Collections.emptyMap();

private final Message message;
private List<Header> additionalHeaders = Collections.emptyList();

public Builder() {
message = new Message();
}

public Builder fromMessage(Message message) {
this.message.id = message.getId();
this.message.data = message.getData();
this.message.contentType = message.getContentType();
this.message.topic = message.getTopic();
this.message.subscription = message.getSubscription();
this.message.hasSubscriptionIdentityHeaders = message.hasSubscriptionIdentityHeaders();
this.message.publishingTimestamp = message.getPublishingTimestamp();
this.message.readingTimestamp = message.getReadingTimestamp();
this.message.partitionOffset = message.partitionOffset;
this.message.partitionAssignmentTerm = message.partitionAssignmentTerm;
this.message.externalMetadata = message.getExternalMetadata();
this.message.additionalHeaders = message.getAdditionalHeaders();
this.message.schema = message.getSchema();
this.id = message.getId();
this.data = message.getData();
this.contentType = message.getContentType();
this.topic = message.getTopic();
this.subscription = message.getSubscription();
this.hasSubscriptionIdentityHeaders = message.hasSubscriptionIdentityHeaders();
this.publishingTimestamp = message.getPublishingTimestamp();
this.readingTimestamp = message.getReadingTimestamp();
this.partitionOffset = message.partitionOffset;
this.partitionAssignmentTerm = message.partitionAssignmentTerm;
this.externalMetadata = message.getExternalMetadata();
this.additionalHeaders = message.getAdditionalHeaders();
this.schema = message.getSchema();

return this;
}

public Builder withData(byte [] data) {
this.message.data = data;
public Builder withData(byte[] data) {
this.data = data;
return this;
}

public Builder withSchema(CompiledSchema<Schema> schema) {
this.message.schema = Optional.of(schema);
this.schema = Optional.of(schema);
return this;
}

public Builder withExternalMetadata(Map<String, String> externalMetadata) {
this.message.externalMetadata = ImmutableMap.copyOf(externalMetadata);
this.externalMetadata = ImmutableMap.copyOf(externalMetadata);
return this;
}

public Builder withAdditionalHeaders(List<Header> additionalHeaders) {
this.message.additionalHeaders = ImmutableList.copyOf(additionalHeaders);
this.additionalHeaders = ImmutableList.copyOf(additionalHeaders);
return this;
}

public Builder withContentType(ContentType contentType) {
this.message.contentType = contentType;
this.contentType = contentType;

return this;
}

public Builder withNoSchema() {
this.message.schema = Optional.empty();
this.schema = Optional.empty();
return this;
}

public Message build() {
return message;
return new Message(
id, topic, data, contentType, schema, publishingTimestamp, readingTimestamp, partitionOffset, partitionAssignmentTerm, externalMetadata, additionalHeaders, subscription, hasSubscriptionIdentityHeaders
);
}
}
}

0 comments on commit b594e7e

Please sign in to comment.