Skip to content

Commit

Permalink
add test for unread count
Browse files Browse the repository at this point in the history
  • Loading branch information
Christoph Pröschel committed Oct 7, 2020
1 parent 226888d commit c2b08ef
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 5 deletions.
Expand Up @@ -54,7 +54,8 @@ public class Stores implements ApplicationListener<ApplicationStartedEvent>, Dis
private void startStream() {
final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, Message> messageStream = builder.<String, Message>stream(new ApplicationCommunicationMessages().name());
final KStream<String, Message> messageStream = builder.<String, Message>stream(new ApplicationCommunicationMessages().name())
.selectKey((messageId, message) -> message.getConversationId());

final KTable<String, Channel> channelTable = builder.table(new ApplicationCommunicationChannels().name());

Expand Down Expand Up @@ -100,7 +101,7 @@ private void startStream() {
// TODO send to websocket queue
});

final KGroupedStream<String, Message> messageGroupedStream = messageStream.groupBy((messageId, message) -> message.getConversationId());
final KGroupedStream<String, Message> messageGroupedStream = messageStream.groupByKey();

// messages store
messageGroupedStream
Expand Down
@@ -0,0 +1,132 @@
package co.airy.core.api.conversations;

import co.airy.avro.communication.Channel;
import co.airy.avro.communication.ChannelConnectionState;
import co.airy.avro.communication.ReadReceipt;
import co.airy.core.api.conversations.util.ConversationGenerator;
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.kafka.schema.application.ApplicationCommunicationMessages;
import co.airy.kafka.schema.application.ApplicationCommunicationMetadata;
import co.airy.kafka.schema.application.ApplicationCommunicationReadReceipts;
import co.airy.kafka.test.TestHelper;
import co.airy.kafka.test.junit.SharedKafkaTestResource;
import co.airy.spring.core.AirySpringBootApplication;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.web.servlet.MockMvc;

import java.time.Instant;
import java.util.UUID;

import static co.airy.core.api.conversations.util.ConversationGenerator.getConversationRecords;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = {
"kafka.cleanup=true",
"kafka.commit-interval-ms=100"
}, classes = AirySpringBootApplication.class)
@ExtendWith(SpringExtension.class)
@AutoConfigureMockMvc
class UnreadCountTest {


@RegisterExtension
public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource();
private static TestHelper testHelper;

@Autowired
private MockMvc mvc;

private static final ApplicationCommunicationMessages applicationCommunicationMessages = new ApplicationCommunicationMessages();
private static final ApplicationCommunicationChannels applicationCommunicationChannels = new ApplicationCommunicationChannels();
private static final ApplicationCommunicationMetadata applicationCommunicationMetadata = new ApplicationCommunicationMetadata();
private static final ApplicationCommunicationReadReceipts applicationCommunicationReadReceipts = new ApplicationCommunicationReadReceipts();

@BeforeAll
static void beforeAll() throws Exception {
testHelper = new TestHelper(sharedKafkaTestResource,
applicationCommunicationMessages,
applicationCommunicationChannels,
applicationCommunicationMetadata,
applicationCommunicationReadReceipts
);

testHelper.beforeAll();
}

@AfterAll
static void afterAll() throws Exception {
testHelper.afterAll();
}

@BeforeEach
void init() throws Exception {
testHelper.waitForCondition(
() -> mvc.perform(get("/health")).andExpect(status().isOk()),
"Application is not healthy"
);
}

@Test
void shouldResetTheUnreadCount() throws Exception {
final Channel channel = Channel.newBuilder()
.setConnectionState(ChannelConnectionState.CONNECTED)
.setId("channel-id")
.setName("channel-name")
.setSource("facebook")
.setSourceChannelId("ps-id")
.build();

testHelper.produceRecord(new ProducerRecord<>(applicationCommunicationChannels.name(), channel.getId(), channel));

final String conversationId = UUID.randomUUID().toString();

final Integer unreadMessages = 3;

testHelper.produceRecords(getConversationRecords(
ConversationGenerator.CreateConversation.builder()
.channel(channel)
.messageCount(unreadMessages.longValue())
.conversationId(conversationId)
.build()
));

testHelper.waitForCondition(
() -> mvc.perform(post("/conversations.by_id")
.headers(buildHeaders())
.content("{\"conversation_id\":\"" + conversationId + "\"}"))
.andExpect(status().isOk())
.andExpect(jsonPath("$.unread_message_count", equalTo(unreadMessages))),
"Conversation list not showing unread count"
);

testHelper.produceRecord(new ProducerRecord<>(applicationCommunicationReadReceipts.name(), conversationId,
ReadReceipt.newBuilder()
.setConversationId(conversationId)
.setLastReadDate(Instant.now().toEpochMilli())
.build()
));
}

private HttpHeaders buildHeaders() {
HttpHeaders headers = new HttpHeaders();
headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON.toString());
return headers;
}
}
6 changes: 3 additions & 3 deletions docs/api.md
Expand Up @@ -87,7 +87,7 @@ Example Response:
"id": "1e7674d7-b575-4683-8a77-d2651b9e3149-relayed",
"sent_at": "2019-01-07T09:01:44.000Z"
},
"min_unread_message_count": 1
"unread_message_count": 1,
}
],
"response_metadata": {
Expand Down Expand Up @@ -118,7 +118,7 @@ Example Response:

**Sample Response**

```
```json5
{
"id": "a688d36c-a85e-44af-bc02-4248c2c97622",
"channel": {
Expand All @@ -139,7 +139,7 @@ Example Response:
"id": "1e7674d7-b575-4683-8a77-d2651b9e3149-relayed",
"sent_at": "2019-01-07T09:01:44.000Z"
},
"min_unread_message_count": 1
"unread_message_count": 1
}
```

Expand Down

0 comments on commit c2b08ef

Please sign in to comment.