Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unread count #42

Merged
merged 4 commits into from Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/api/conversations/BUILD
Expand Up @@ -6,12 +6,14 @@ app_deps = [
"//backend/avro/communication:message",
"//backend/avro/communication:channel",
"//backend/avro/communication:metadata-action",
"//backend/avro/communication:read-receipt",
"//backend/lib/payload",
"//backend/lib/pagination",
"//backend/lib/kafka/schema:kafka-schema",
"//backend/lib/kafka/schema:application-communication-messages",
"//backend/lib/kafka/schema:application-communication-channels",
"//backend/lib/kafka/schema:application-communication-metadata",
"//backend/lib/kafka/schema:application-communication-read-receipts",
"//backend/lib/spring/kafka/core:spring-kafka-core",
"//backend/lib/spring/kafka/streams:spring-kafka-streams",
]
Expand Down
Expand Up @@ -44,6 +44,7 @@ public static ConversationResponsePayload fromConversation(Conversation conversa
.name(conversation.getChannel().getName())
.build())
.id(conversation.getId())
.unreadMessageCount(conversation.getUnreadCount())
.createdAt(ISO_FROM_MILLIS(conversation.getCreatedAt()))
.contact(ContactResponsePayload.builder()
.avatarUrl(metadata.get(MetadataKeys.SOURCE.CONTACT.AVATAR_URL))
Expand Down
Expand Up @@ -2,20 +2,26 @@

import co.airy.avro.communication.Channel;
import co.airy.avro.communication.Message;
import co.airy.avro.communication.ReadReceipt;
import co.airy.avro.communication.MetadataAction;
import co.airy.avro.communication.MetadataActionType;
import co.airy.avro.communication.SenderType;
import co.airy.core.api.conversations.dto.Conversation;
import co.airy.core.api.conversations.dto.CountAction;
import co.airy.core.api.conversations.dto.MessagesTreeSet;
import co.airy.core.api.conversations.dto.UnreadCountState;
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.kafka.schema.application.ApplicationCommunicationMessages;
import co.airy.kafka.schema.application.ApplicationCommunicationMetadata;
import co.airy.kafka.schema.application.ApplicationCommunicationReadReceipts;
import co.airy.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.javatuples.Pair;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
Expand All @@ -27,9 +33,12 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import static java.util.stream.Collectors.toCollection;

@Component
@RestController
public class Stores implements ApplicationListener<ApplicationStartedEvent>, DisposableBean {
Expand Down Expand Up @@ -62,8 +71,40 @@ private void startStream() {
return aggregate;
});

messageStream
final KStream<String, Pair<CountAction, Long>> resetStream = builder.<String, ReadReceipt>stream(new ApplicationCommunicationReadReceipts().name())
.mapValues((readReceipt -> Pair.with(CountAction.RESET, readReceipt.getLastReadDate())));

// unread counts
final KTable<String, UnreadCountState> unreadCountTable = messageStream
.mapValues((message -> Pair.with(CountAction.INCREMENT, message.getSentAt())))
.merge(resetStream)
.groupByKey()
.aggregate(UnreadCountState::new, (conversationId, pair, unreadCountState) -> {
final CountAction countAction = pair.getValue0();
final Long actionDate = pair.getValue1();

if (countAction.equals(CountAction.INCREMENT)) {
unreadCountState.getMessageSentDates().add(actionDate);
} else {
unreadCountState.setMessageSentDates(
unreadCountState.getMessageSentDates().stream()
.filter((timestamp) -> timestamp > actionDate)
.collect(toCollection(HashSet::new))
);
}

return unreadCountState;
});

unreadCountTable.toStream()
.peek((conversationId, unreadCountState) -> {
// TODO send to websocket queue
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pending the websocket PR

});

final KGroupedStream<String, Message> messageGroupedStream = messageStream.groupByKey();

// messages store
messageGroupedStream
.aggregate(MessagesTreeSet::new,
((key, value, aggregate) -> {
aggregate.add(value);
Expand All @@ -72,8 +113,8 @@ private void startStream() {
Materialized.as(MESSAGES_STORE)
);

messageStream
.groupByKey()
// conversations store
messageGroupedStream
.aggregate(Conversation::new,
(conversationId, message, aggregate) -> {
if (aggregate.getLastMessage() == null) {
Expand All @@ -99,7 +140,15 @@ private void startStream() {
return conversation;
})
.leftJoin(metadataTable, (conversation, metadataMap) -> {
conversation.setMetadata(metadataMap);
if (metadataMap != null) {
lucapette marked this conversation as resolved.
Show resolved Hide resolved
conversation.setMetadata(metadataMap);
}
return conversation;
})
.leftJoin(unreadCountTable, (conversation, unreadCountState) -> {
if (unreadCountState != null) {
conversation.setUnreadCount(unreadCountState.getUnreadCount());
}
return conversation;
}, Materialized.as(CONVERSATIONS_STORE));

Expand Down
Expand Up @@ -23,17 +23,20 @@ public class Conversation implements Serializable {
private String sourceConversationId;
private Channel channel;

Integer unreadCount;

@Builder.Default
Map<String, String> metadata = new HashMap<>();

public String getDisplayName() {
if (this.metadata == null) {
final String firstName = this.metadata.get(MetadataKeys.SOURCE.CONTACT.FIRST_NAME);
final String lastName = this.metadata.get(MetadataKeys.SOURCE.CONTACT.LAST_NAME);

if (firstName == null && lastName == null) {
return null;
}

return String.format("%s %s", this.metadata.get(MetadataKeys.SOURCE.CONTACT.FIRST_NAME),
this.metadata.get(MetadataKeys.SOURCE.CONTACT.LAST_NAME)
);
return String.format("%s %s", firstName, lastName).trim();
}

public String getId() {
Expand Down
@@ -0,0 +1,6 @@
package co.airy.core.api.conversations.dto;

public enum CountAction {
INCREMENT,
RESET
}
@@ -0,0 +1,19 @@
package co.airy.core.api.conversations.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.HashSet;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UnreadCountState implements Serializable {
HashSet<Long> messageSentDates = new HashSet<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set<Long> for the declaration


public Integer getUnreadCount() {
return messageSentDates.size();
}
}
Expand Up @@ -6,6 +6,7 @@
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.kafka.schema.application.ApplicationCommunicationMessages;
import co.airy.kafka.schema.application.ApplicationCommunicationMetadata;
import co.airy.kafka.schema.application.ApplicationCommunicationReadReceipts;
import co.airy.kafka.test.TestHelper;
import co.airy.kafka.test.junit.SharedKafkaTestResource;
import co.airy.spring.core.AirySpringBootApplication;
Expand Down Expand Up @@ -52,13 +53,15 @@ class ConversationsByIdTest {
private static final ApplicationCommunicationMessages applicationCommunicationMessages = new ApplicationCommunicationMessages();
private static final ApplicationCommunicationChannels applicationCommunicationChannels = new ApplicationCommunicationChannels();
private static final ApplicationCommunicationMetadata applicationCommunicationMetadata = new ApplicationCommunicationMetadata();
private static final ApplicationCommunicationReadReceipts applicationCommunicationReadReceipts = new ApplicationCommunicationReadReceipts();

@BeforeAll
static void beforeAll() throws Exception {
testHelper = new TestHelper(sharedKafkaTestResource,
applicationCommunicationMessages,
applicationCommunicationChannels,
applicationCommunicationMetadata
applicationCommunicationMetadata,
applicationCommunicationReadReceipts
);

testHelper.beforeAll();
Expand Down
Expand Up @@ -6,6 +6,7 @@
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.kafka.schema.application.ApplicationCommunicationMessages;
import co.airy.kafka.schema.application.ApplicationCommunicationMetadata;
import co.airy.kafka.schema.application.ApplicationCommunicationReadReceipts;
import co.airy.kafka.test.TestHelper;
import co.airy.kafka.test.junit.SharedKafkaTestResource;
import co.airy.spring.core.AirySpringBootApplication;
Expand Down Expand Up @@ -56,13 +57,15 @@ class ConversationsFilterTest {
private static final ApplicationCommunicationMessages applicationCommunicationMessages = new ApplicationCommunicationMessages();
private static final ApplicationCommunicationChannels applicationCommunicationChannels = new ApplicationCommunicationChannels();
private static final ApplicationCommunicationMetadata applicationCommunicationMetadata = new ApplicationCommunicationMetadata();
private static final ApplicationCommunicationReadReceipts applicationCommunicationReadReceipts = new ApplicationCommunicationReadReceipts();

@BeforeAll
static void beforeAll() throws Exception {
testHelper = new TestHelper(sharedKafkaTestResource,
applicationCommunicationMessages,
applicationCommunicationChannels,
applicationCommunicationMetadata
applicationCommunicationMetadata,
applicationCommunicationReadReceipts
);

testHelper.beforeAll();
Expand Down
Expand Up @@ -6,6 +6,7 @@
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.kafka.schema.application.ApplicationCommunicationMessages;
import co.airy.kafka.schema.application.ApplicationCommunicationMetadata;
import co.airy.kafka.schema.application.ApplicationCommunicationReadReceipts;
import co.airy.kafka.test.TestHelper;
import co.airy.kafka.test.junit.SharedKafkaTestResource;
import co.airy.payload.format.DateFormat;
Expand All @@ -30,9 +31,7 @@
import java.util.UUID;

import static co.airy.core.api.conversations.util.ConversationGenerator.getMessages;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInRelativeOrder;
import static org.hamcrest.Matchers.hasSize;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
Expand All @@ -58,13 +57,15 @@ public class MessagesTest {
private static final ApplicationCommunicationMessages applicationCommunicationMessages = new ApplicationCommunicationMessages();
private static final ApplicationCommunicationChannels applicationCommunicationChannels = new ApplicationCommunicationChannels();
private static final ApplicationCommunicationMetadata applicationCommunicationMetadata = new ApplicationCommunicationMetadata();
private static final ApplicationCommunicationReadReceipts applicationCommunicationReadReceipts = new ApplicationCommunicationReadReceipts();

@BeforeAll
static void beforeAll() throws Exception {
testHelper = new TestHelper(sharedKafkaTestResource,
applicationCommunicationMessages,
applicationCommunicationChannels,
applicationCommunicationMetadata
applicationCommunicationMetadata,
applicationCommunicationReadReceipts
);

testHelper.beforeAll();
Expand Down
Expand Up @@ -8,6 +8,7 @@
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.kafka.schema.application.ApplicationCommunicationMessages;
import co.airy.kafka.schema.application.ApplicationCommunicationMetadata;
import co.airy.kafka.schema.application.ApplicationCommunicationReadReceipts;
import co.airy.kafka.test.TestHelper;
import co.airy.kafka.test.junit.SharedKafkaTestResource;
import co.airy.spring.core.AirySpringBootApplication;
Expand Down Expand Up @@ -43,12 +44,9 @@
}, classes = AirySpringBootApplication.class)
@ExtendWith(SpringExtension.class)
@AutoConfigureMockMvc
public class SendMessageRequestControllerIntegrationTest {
public class SendMessageRequestControllerTest {
@RegisterExtension
public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource();
private static final ApplicationCommunicationChannels applicationCommunicationChannels = new ApplicationCommunicationChannels();
private static final ApplicationCommunicationMessages applicationCommunicationMessages = new ApplicationCommunicationMessages();
private static final ApplicationCommunicationMetadata applicationCommunicationMetadata = new ApplicationCommunicationMetadata();
private static TestHelper testHelper;
private static String facebookConversationId = "facebook-conversation-id";
private static boolean testDataInitialized = false;
Expand All @@ -69,17 +67,24 @@ public class SendMessageRequestControllerIntegrationTest {
@Autowired
private MockMvc mvc;

private static final ApplicationCommunicationMessages applicationCommunicationMessages = new ApplicationCommunicationMessages();
private static final ApplicationCommunicationChannels applicationCommunicationChannels = new ApplicationCommunicationChannels();
private static final ApplicationCommunicationMetadata applicationCommunicationMetadata = new ApplicationCommunicationMetadata();
private static final ApplicationCommunicationReadReceipts applicationCommunicationReadReceipts = new ApplicationCommunicationReadReceipts();

@BeforeAll
static void beforeAll() throws Exception {
testHelper = new TestHelper(sharedKafkaTestResource,
applicationCommunicationMetadata,
applicationCommunicationMessages,
applicationCommunicationChannels
applicationCommunicationChannels,
applicationCommunicationMetadata,
applicationCommunicationReadReceipts
);

testHelper.beforeAll();
}


@AfterAll
static void afterAll() throws Exception {
testHelper.afterAll();
Expand Down