Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose creation and modification date for subscription #1158

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.hibernate.validator.constraints.NotEmpty;
import pl.allegro.tech.hermes.api.constraints.ValidContentType;

import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -19,6 +21,7 @@
import static pl.allegro.tech.hermes.api.constraints.Names.ALLOWED_NAME_REGEX;

@ValidContentType(message = "AVRO content type is not supported in BATCH delivery mode")
@JsonIgnoreProperties(value = {"createdAt", "modifiedAt"}, allowGetters = true)
public class Subscription implements Anonymizable {

@Valid
Expand Down Expand Up @@ -90,6 +93,10 @@ public class Subscription implements Anonymizable {

private boolean subscriptionIdentityHeadersEnabled;

private Instant createdAt;

private Instant modifiedAt;

public enum State {
PENDING, ACTIVE, SUSPENDED
}
Expand Down Expand Up @@ -420,6 +427,22 @@ public boolean isSubscriptionIdentityHeadersEnabled() {
return subscriptionIdentityHeadersEnabled;
}

public Instant getCreatedAt() {
return createdAt;
}

public void setCreatedAt(Long createdAt) {
this.createdAt = Instant.ofEpochMilli(createdAt);
}

public Instant getModifiedAt() {
return modifiedAt;
}

public void setModifiedAt(Long modifiedAt) {
this.modifiedAt = Instant.ofEpochMilli(modifiedAt);
}

@Override
public Subscription anonymize() {
if (getEndpoint().containsCredentials() || hasOAuthPolicy()) {
Expand Down
36 changes: 32 additions & 4 deletions hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.time.Instant;
import java.util.Objects;

@JsonIgnoreProperties(value = {"createdAt", "modifiedAt"}, allowGetters = true)
public class Topic {

@Valid
Expand Down Expand Up @@ -59,11 +62,15 @@ public enum Ack {

private final TopicDataOfflineStorage offlineStorage;

private Instant createdAt;

private Instant modifiedAt;

public Topic(TopicName name, String description, OwnerId owner, RetentionTime retentionTime,
boolean migratedFromJsonType, Ack ack, boolean trackingEnabled, ContentType contentType,
boolean jsonToAvroDryRunEnabled, boolean schemaVersionAwareSerializationEnabled,
int maxMessageSize, PublishingAuth publishingAuth, boolean subscribingRestricted,
TopicDataOfflineStorage offlineStorage) {
TopicDataOfflineStorage offlineStorage, Instant createdAt, Instant modifiedAt) {
this.name = name;
this.description = description;
this.owner = owner;
Expand All @@ -78,6 +85,8 @@ public Topic(TopicName name, String description, OwnerId owner, RetentionTime re
this.publishingAuth = publishingAuth;
this.subscribingRestricted = subscribingRestricted;
this.offlineStorage = offlineStorage;
this.createdAt = createdAt;
this.modifiedAt = modifiedAt;
}

@JsonCreator
Expand All @@ -95,14 +104,17 @@ public Topic(
@JsonProperty("maxMessageSize") Integer maxMessageSize,
@JsonProperty("auth") PublishingAuth publishingAuth,
@JsonProperty("subscribingRestricted") boolean subscribingRestricted,
@JsonProperty("offlineStorage") TopicDataOfflineStorage offlineStorage
) {
@JsonProperty("offlineStorage") TopicDataOfflineStorage offlineStorage,
@JsonProperty("createdAt") Instant createdAt,
@JsonProperty("modifiedAt") Instant modifiedAt
) {
this(TopicName.fromQualifiedName(qualifiedName), description, owner, retentionTime, migratedFromJsonType, ack,
trackingEnabled, contentType, jsonToAvroDryRunEnabled, schemaVersionAwareSerializationEnabled,
maxMessageSize == null ? DEFAULT_MAX_MESSAGE_SIZE : maxMessageSize,
publishingAuth == null ? PublishingAuth.disabled() : publishingAuth,
subscribingRestricted,
offlineStorage == null ? TopicDataOfflineStorage.defaultOfflineStorage() : offlineStorage
offlineStorage == null ? TopicDataOfflineStorage.defaultOfflineStorage() : offlineStorage,
createdAt, modifiedAt
);
}

Expand Down Expand Up @@ -223,6 +235,22 @@ public TopicDataOfflineStorage getOfflineStorage() {
return offlineStorage;
}

public Instant getCreatedAt() {
return createdAt;
}

public void setCreatedAt(Long createdAt) {
this.createdAt = Instant.ofEpochMilli(createdAt);
}

public Instant getModifiedAt() {
return modifiedAt;
}

public void setModifiedAt(Long modifiedAt) {
this.modifiedAt = Instant.ofEpochMilli(modifiedAt);
}

@Override
public String toString() {
return "Topic(" + getQualifiedName() + ")";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.Instant;
import java.util.Objects;

public class TopicWithSchema extends Topic {
Expand All @@ -16,7 +17,8 @@ public TopicWithSchema(Topic topic, String schema) {
this(schema, topic.getQualifiedName(), topic.getDescription(), topic.getOwner(), topic.getRetentionTime(),
topic.isJsonToAvroDryRunEnabled(), topic.getAck(), topic.isTrackingEnabled(), topic.wasMigratedFromJsonType(),
topic.isSchemaVersionAwareSerializationEnabled(), topic.getContentType(), topic.getMaxMessageSize(),
topic.getPublishingAuth(), topic.isSubscribingRestricted(), topic.getOfflineStorage());
topic.getPublishingAuth(), topic.isSubscribingRestricted(), topic.getOfflineStorage(), topic.getCreatedAt(),
topic.getModifiedAt());
}

@JsonCreator
Expand All @@ -34,10 +36,12 @@ public TopicWithSchema(@JsonProperty("schema") String schema,
@JsonProperty("maxMessageSize") Integer maxMessageSize,
@JsonProperty("auth") PublishingAuth publishingAuth,
@JsonProperty("subscribingRestricted") boolean subscribingRestricted,
@JsonProperty("offlineStorage") TopicDataOfflineStorage offlineStorage) {
@JsonProperty("offlineStorage") TopicDataOfflineStorage offlineStorage,
@JsonProperty("createdAt") Instant createdAt,
@JsonProperty("modifiedAt") Instant modifiedAt) {
super(qualifiedName, description, owner, retentionTime, jsonToAvroDryRunEnabled, ack, trackingEnabled,
migratedFromJsonType, schemaVersionAwareSerializationEnabled, contentType, maxMessageSize,
publishingAuth, subscribingRestricted, offlineStorage);
publishingAuth, subscribingRestricted, offlineStorage, createdAt, modifiedAt);
this.topic = convertToTopic();
this.schema = schema;
}
Expand All @@ -54,7 +58,8 @@ private Topic convertToTopic() {
return new Topic(this.getQualifiedName(), this.getDescription(), this.getOwner(), this.getRetentionTime(),
this.isJsonToAvroDryRunEnabled(), this.getAck(), this.isTrackingEnabled(), this.wasMigratedFromJsonType(),
this.isSchemaVersionAwareSerializationEnabled(), this.getContentType(), this.getMaxMessageSize(),
this.getPublishingAuth(), this.isSubscribingRestricted(), this.getOfflineStorage());
this.getPublishingAuth(), this.isSubscribingRestricted(), this.getOfflineStorage(), this.getCreatedAt(),
this.getModifiedAt());
}

public String getSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;

public abstract class ZookeeperBasedRepository {

Expand Down Expand Up @@ -87,6 +88,10 @@ protected <T> Optional<T> readFrom(String path, TypeReference<T> type, boolean q
return readFrom(path, b -> (T) mapper.readValue(b, type), quiet);
}

protected <T> Optional<T> readWithStatFrom(String path, Class<T> clazz, BiConsumer<T, Stat> statDecorator, boolean quiet) {
return readWithStatFrom(path, b -> mapper.readValue(b, clazz), statDecorator, quiet);
}

private <T> Optional<T> readFrom(String path, ThrowingReader<T> supplier, boolean quiet) {
try {
byte[] data = zookeeper.getData().forPath(path);
Expand All @@ -109,6 +114,27 @@ private <T> Optional<T> readFrom(String path, ThrowingReader<T> supplier, boolea
return Optional.empty();
}

private <T> Optional<T> readWithStatFrom(String path, ThrowingReader<T> supplier, BiConsumer<T, Stat> statDecorator, boolean quiet) {
try {
Stat stat = new Stat();
byte[] data = zookeeper.getData().storingStatIn(stat).forPath(path);
if (ArrayUtils.isNotEmpty(data)) {
T t = supplier.read(data);
statDecorator.accept(t, stat);
return Optional.of(t);
}
} catch (JsonMappingException malformedException) {
logWarnOrThrowException("Unable to read data from path " + path,
new MalformedDataException(path, malformedException), quiet);
} catch (InternalProcessingException e) {
throw e;
} catch (Exception exception) {
logWarnOrThrowException("Unable to read data from path " + path, new InternalProcessingException(exception),
quiet);
}
return Optional.empty();
}

private void logWarnOrThrowException(String message, RuntimeException e, Boolean quiet) {
if (quiet) {
logger.warn(message, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pl.allegro.tech.hermes.infrastructure.zookeeper;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.time.Instant;
import java.util.Collection;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -93,7 +95,16 @@ public void updateSubscriptionState(TopicName topicName, String subscriptionName

@Override
public Subscription getSubscriptionDetails(TopicName topicName, String subscriptionName) {
return getSubscriptionDetails(topicName, subscriptionName, false).get();
ensureSubscriptionExists(topicName, subscriptionName);
return readWithStatFrom(
paths.subscriptionPath(topicName, subscriptionName),
Subscription.class,
(sub, stat) -> {
sub.setCreatedAt(stat.getCtime());
sub.setModifiedAt(stat.getMtime());
},
false
).get();
}

private Optional<Subscription> getSubscriptionDetails(TopicName topicName, String subscriptionName, boolean quiet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,16 @@ public void touchTopic(TopicName topicName) {

@Override
public Topic getTopicDetails(TopicName topicName) {
return getTopicDetails(topicName, false).get();
ensureTopicExists(topicName);
return readWithStatFrom(
paths.topicPath(topicName),
Topic.class,
(topic, stat) -> {
topic.setCreatedAt(stat.getCtime());
topic.setModifiedAt(stat.getMtime());
},
false
).get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException
import pl.allegro.tech.hermes.infrastructure.MalformedDataException
import pl.allegro.tech.hermes.test.IntegrationTest

import java.time.Instant

import static pl.allegro.tech.hermes.api.PatchData.patchData
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription
import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic
Expand Down Expand Up @@ -82,6 +84,7 @@ class ZookeeperSubscriptionRepositoryTest extends IntegrationTest {

def "should return subscription details"() {
given:
def timestamp = Instant.now()
repository.createSubscription(subscription(TOPIC, 'details', EndpointAddress.of('hello'))
.withDescription('my description')
.build())
Expand All @@ -93,6 +96,10 @@ class ZookeeperSubscriptionRepositoryTest extends IntegrationTest {
then:
subscription.description == 'my description'
subscription.endpoint == EndpointAddress.of('hello')

and: 'createdAt and modifiedAt are greater or equal than timestamp'
!subscription.createdAt.isBefore(timestamp)
!subscription.modifiedAt.isBefore(timestamp)
}

def "should throw exception when trying to return details of unknown subscription"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException
import pl.allegro.tech.hermes.infrastructure.MalformedDataException
import pl.allegro.tech.hermes.test.IntegrationTest

import java.time.Instant

import static pl.allegro.tech.hermes.metrics.PathContext.pathContext
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription
import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic
Expand Down Expand Up @@ -107,6 +109,7 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {

def "should load topic details"() {
given:
def timestamp = Instant.now()
repository.createTopic(topic(GROUP, 'details').withDescription('description').build())
wait.untilTopicCreated(GROUP, 'details')

Expand All @@ -115,6 +118,10 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {

then:
retrievedTopic.description == 'description'

and: 'createdAt and modifiedAt are greater than or equal to timestamp'
!retrievedTopic.createdAt.isBefore(timestamp)
!retrievedTopic.modifiedAt.isBefore(timestamp)
}

def "should update topic"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pl.allegro.tech.hermes.api.TopicDataOfflineStorage;
import pl.allegro.tech.hermes.api.TopicName;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -72,7 +73,7 @@ public Topic build() {
name, description, owner, retentionTime, migratedFromJsonType, ack, trackingEnabled, contentType,
jsonToAvroDryRunEnabled, schemaVersionAwareSerialization, maxMessageSize,
new PublishingAuth(publishers, authEnabled, unauthenticatedAccessEnabled), subscribingRestricted,
offlineStorage
offlineStorage, null, null
);
}

Expand Down