Skip to content

Commit

Permalink
[#2274] Introduce the source API (#2327)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismatix committed Sep 7, 2021
1 parent 73526d0 commit 0f3ab35
Show file tree
Hide file tree
Showing 108 changed files with 2,449 additions and 277 deletions.
2 changes: 2 additions & 0 deletions .bazelignore
@@ -1,2 +1,4 @@
docs/node_modules
docs/.docusaurus
docs/build
node_modules
Expand Up @@ -8,7 +8,6 @@
import co.airy.model.channel.dto.ChannelContainer;
import co.airy.model.metadata.MetadataKeys;
import co.airy.model.metadata.dto.MetadataMap;
import co.airy.uuid.UUIDv5;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.http.HttpStatus;
Expand Down Expand Up @@ -37,18 +36,18 @@ public ChannelsController(Stores stores) {
}

@PostMapping("/channels.list")
ResponseEntity<ChannelsResponsePayload> listChannels(@RequestBody(required = false) @Valid ListChannelRequestPayload requestPayload) {
ResponseEntity<ChannelsResponsePayload> listChannels(@RequestBody(required = false) @Valid ListChannelRequestPayload payload) {
final List<ChannelContainer> channels = stores.getChannels();
final String sourceToFilter = Optional.ofNullable(requestPayload).map(ListChannelRequestPayload::getSource).orElse(null);
final String sourceToFilter = Optional.ofNullable(payload).map(ListChannelRequestPayload::getSource).orElse(null);
return ResponseEntity.ok(new ChannelsResponsePayload(channels.stream()
.filter((container) -> sourceToFilter == null || sourceToFilter.equals(container.getChannel().getSource()))
.map(ChannelPayload::fromChannelContainer)
.collect(toList())));
}

@PostMapping("/channels.info")
ResponseEntity<?> getChannel(@RequestBody @Valid GetChannelRequestPayload requestPayload) {
final ChannelContainer container = stores.getChannel(requestPayload.getChannelId().toString());
ResponseEntity<?> getChannel(@RequestBody @Valid GetChannelRequestPayload payload) {
final ChannelContainer container = stores.getChannel(payload.getChannelId().toString());
if (container == null) {
return ResponseEntity.status(HttpStatus.NOT_FOUND).build();
}
Expand All @@ -57,19 +56,19 @@ ResponseEntity<?> getChannel(@RequestBody @Valid GetChannelRequestPayload reques
}

@PostMapping("/channels.update")
ResponseEntity<?> updateChannel(@RequestBody @Valid UpdateChannelRequestPayload requestPayload) {
final String channelId = requestPayload.getChannelId().toString();
ResponseEntity<?> updateChannel(@RequestBody @Valid UpdateChannelRequestPayload payload) {
final String channelId = payload.getChannelId().toString();
final ChannelContainer container = stores.getChannel(channelId);
if (container == null) {
return ResponseEntity.status(HttpStatus.NOT_FOUND).build();
}

final MetadataMap metadataMap = container.getMetadataMap();
if (requestPayload.getName() != null) {
metadataMap.put(MetadataKeys.ChannelKeys.NAME, newChannelMetadata(channelId, MetadataKeys.ChannelKeys.NAME, requestPayload.getName()));
if (payload.getName() != null) {
metadataMap.put(MetadataKeys.ChannelKeys.NAME, newChannelMetadata(channelId, MetadataKeys.ChannelKeys.NAME, payload.getName()));
}
if (requestPayload.getImageUrl() != null) {
metadataMap.put(MetadataKeys.ChannelKeys.IMAGE_URL, newChannelMetadata(channelId, MetadataKeys.ChannelKeys.IMAGE_URL, requestPayload.getImageUrl()));
if (payload.getImageUrl() != null) {
metadataMap.put(MetadataKeys.ChannelKeys.IMAGE_URL, newChannelMetadata(channelId, MetadataKeys.ChannelKeys.IMAGE_URL, payload.getImageUrl()));
}

try {
Expand All @@ -82,17 +81,17 @@ ResponseEntity<?> updateChannel(@RequestBody @Valid UpdateChannelRequestPayload
}

@PostMapping("/channels.chatplugin.connect")
ResponseEntity<?> connect(@RequestBody @Valid ConnectChannelRequestPayload requestPayload) {
final String sourceChannelId = requestPayload.getName();
ResponseEntity<?> connect(@RequestBody @Valid ConnectChannelRequestPayload payload) {
final String sourceChannelId = payload.getName();
final String sourceIdentifier = "chatplugin";

final String channelId = UUID.randomUUID().toString();

List<Metadata> metadataList = new ArrayList<>();
metadataList.add(newChannelMetadata(channelId, MetadataKeys.ChannelKeys.NAME, requestPayload.getName()));
metadataList.add(newChannelMetadata(channelId, MetadataKeys.ChannelKeys.NAME, payload.getName()));

if (requestPayload.getImageUrl() != null) {
metadataList.add(newChannelMetadata(channelId, MetadataKeys.ChannelKeys.IMAGE_URL, requestPayload.getImageUrl()));
if (payload.getImageUrl() != null) {
metadataList.add(newChannelMetadata(channelId, MetadataKeys.ChannelKeys.IMAGE_URL, payload.getImageUrl()));
}

final ChannelContainer container = ChannelContainer.builder()
Expand All @@ -116,8 +115,8 @@ ResponseEntity<?> connect(@RequestBody @Valid ConnectChannelRequestPayload reque
}

@PostMapping("/channels.chatplugin.disconnect")
ResponseEntity<?> disconnect(@RequestBody @Valid ChannelDisconnectRequestPayload requestPayload) {
final String channelId = requestPayload.getChannelId().toString();
ResponseEntity<?> disconnect(@RequestBody @Valid ChannelDisconnectRequestPayload payload) {
final String channelId = payload.getChannelId().toString();

final ChannelContainer container = stores.getConnectedChannelsStore().get(channelId);

Expand Down
Expand Up @@ -36,7 +36,6 @@

import static co.airy.model.metadata.MetadataRepository.getId;
import static co.airy.model.metadata.MetadataRepository.getSubject;
import static co.airy.model.metadata.MetadataRepository.isChannelMetadata;

@Component
public class Stores implements HealthIndicator, ApplicationListener<ApplicationStartedEvent>, DisposableBean {
Expand Down Expand Up @@ -65,9 +64,8 @@ public Stores(KafkaStreamsWrapper streams, KafkaProducer<String, SpecificRecordB
public void onApplicationEvent(ApplicationStartedEvent event) {
final StreamsBuilder builder = new StreamsBuilder();

// metadata table keyed by channel id
// metadata table keyed by subject id
final KTable<String, MetadataMap> metadataTable = builder.<String, Metadata>table(applicationCommunicationMetadata)
.filter((metadataId, metadata) -> isChannelMetadata(metadata))
.groupBy((metadataId, metadata) -> KeyValue.pair(getSubject(metadata).getIdentifier(), metadata))
.aggregate(MetadataMap::new, MetadataMap::adder, MetadataMap::subtractor);

Expand Down
Expand Up @@ -2,16 +2,15 @@

import co.airy.avro.communication.Status;
import co.airy.avro.communication.Webhook;
import co.airy.core.api.admin.payload.WebhookInfoRequestPayload;
import co.airy.core.api.admin.payload.WebhookListResponsePayload;
import co.airy.core.api.admin.payload.WebhookResponsePayload;
import co.airy.core.api.admin.payload.WebhookInfoRequestPayload;
import co.airy.core.api.admin.payload.WebhookSubscribePayload;
import co.airy.core.api.admin.payload.WebhookUnsubscribePayload;
import co.airy.core.api.config.ServiceDiscovery;
import co.airy.core.api.config.dto.ComponentInfo;
import co.airy.model.event.payload.EventType;
import co.airy.spring.web.payload.RequestErrorResponsePayload;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
Expand Down
Expand Up @@ -6,7 +6,7 @@
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotNull;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -19,7 +19,7 @@
public class WebhookSubscribePayload {
private UUID id;
@NotNull
private URI url;
private URL url;
private Map<String, String> headers = new HashMap<>();
private List<EventType> events = new ArrayList<>();
private String signatureKey;
Expand Down
Expand Up @@ -5,8 +5,6 @@
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@AllArgsConstructor
Expand Down
Expand Up @@ -3,12 +3,8 @@
import co.airy.core.api.config.dto.ComponentInfo;
import co.airy.core.api.config.dto.ServiceInfo;
import co.airy.core.api.config.payload.ServicesResponsePayload;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
Expand Down
Expand Up @@ -3,7 +3,6 @@
import co.airy.avro.communication.Status;
import co.airy.avro.communication.Webhook;
import co.airy.core.api.config.ServiceDiscovery;
import co.airy.core.api.config.dto.ComponentInfo;
import co.airy.core.api.config.dto.ServiceInfo;
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.kafka.schema.application.ApplicationCommunicationMetadata;
Expand All @@ -23,7 +22,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -39,7 +37,6 @@
import java.util.UUID;

import static co.airy.test.Timing.retryOnException;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
Expand Down
Expand Up @@ -2,7 +2,6 @@

import co.airy.avro.communication.Metadata;
import co.airy.avro.communication.ReadReceipt;
import co.airy.model.conversation.Conversation;
import co.airy.core.api.communication.dto.LuceneQueryResult;
import co.airy.core.api.communication.lucene.AiryAnalyzer;
import co.airy.core.api.communication.lucene.ExtendedQueryParser;
Expand All @@ -15,6 +14,7 @@
import co.airy.core.api.communication.payload.ConversationTagRequestPayload;
import co.airy.core.api.communication.payload.ConversationUpdateContactRequestPayload;
import co.airy.core.api.communication.payload.PaginationData;
import co.airy.model.conversation.Conversation;
import co.airy.model.metadata.MetadataKeys;
import co.airy.model.metadata.Subject;
import co.airy.model.metadata.dto.MetadataMap;
Expand Down Expand Up @@ -119,9 +119,9 @@ private ResponseEntity<?> queryConversations(Query query, Integer cursor, int pa
}

@PostMapping("/conversations.info")
ResponseEntity<?> conversationInfo(@RequestBody @Valid ConversationByIdRequestPayload requestPayload) {
ResponseEntity<?> conversationInfo(@RequestBody @Valid ConversationByIdRequestPayload payload) {
final ReadOnlyKeyValueStore<String, Conversation> store = stores.getConversationsStore();
final Conversation conversation = store.get(requestPayload.getConversationId().toString());
final Conversation conversation = store.get(payload.getConversationId().toString());

if (conversation == null) {
return ResponseEntity.notFound().build();
Expand All @@ -145,9 +145,9 @@ private List<Conversation> fetchAllConversations() {
}

@PostMapping("/conversations.markRead")
ResponseEntity<?> conversationMarkRead(@RequestBody @Valid ConversationByIdRequestPayload requestPayload) {
ResponseEntity<?> conversationMarkRead(@RequestBody @Valid ConversationByIdRequestPayload payload) {
final ReadOnlyKeyValueStore<String, Conversation> store = stores.getConversationsStore();
final String conversationId = requestPayload.getConversationId().toString();
final String conversationId = payload.getConversationId().toString();
final Conversation conversation = store.get(conversationId);

if (conversation == null) {
Expand All @@ -169,9 +169,9 @@ ResponseEntity<?> conversationMarkRead(@RequestBody @Valid ConversationByIdReque
}

@PostMapping("/conversations.tag")
ResponseEntity<?> conversationTag(@RequestBody @Valid ConversationTagRequestPayload requestPayload) {
final String conversationId = requestPayload.getConversationId().toString();
final String tagId = requestPayload.getTagId().toString();
ResponseEntity<?> conversationTag(@RequestBody @Valid ConversationTagRequestPayload payload) {
final String conversationId = payload.getConversationId().toString();
final String tagId = payload.getTagId().toString();
final ReadOnlyKeyValueStore<String, Conversation> store = stores.getConversationsStore();
final Conversation conversation = store.get(conversationId);

Expand All @@ -191,9 +191,9 @@ ResponseEntity<?> conversationTag(@RequestBody @Valid ConversationTagRequestPayl
}

@PostMapping("/conversations.untag")
ResponseEntity<?> conversationUntag(@RequestBody @Valid ConversationTagRequestPayload requestPayload) {
final String conversationId = requestPayload.getConversationId().toString();
final String tagId = requestPayload.getTagId().toString();
ResponseEntity<?> conversationUntag(@RequestBody @Valid ConversationTagRequestPayload payload) {
final String conversationId = payload.getConversationId().toString();
final String tagId = payload.getTagId().toString();
final ReadOnlyKeyValueStore<String, Conversation> store = stores.getConversationsStore();
final Conversation conversation = store.get(conversationId);

Expand All @@ -213,9 +213,9 @@ ResponseEntity<?> conversationUntag(@RequestBody @Valid ConversationTagRequestPa
}

@PostMapping("/conversations.setState")
ResponseEntity<?> conversationSetState(@RequestBody @Valid ConversationSetStateRequestPayload requestPayload) {
final String conversationId = requestPayload.getConversationId().toString();
final String state = requestPayload.getState();
ResponseEntity<?> conversationSetState(@RequestBody @Valid ConversationSetStateRequestPayload payload) {
final String conversationId = payload.getConversationId().toString();
final String state = payload.getState();
final ReadOnlyKeyValueStore<String, Conversation> store = stores.getConversationsStore();
final Conversation conversation = store.get(conversationId);

Expand All @@ -235,8 +235,8 @@ ResponseEntity<?> conversationSetState(@RequestBody @Valid ConversationSetStateR
}

@PostMapping("/conversations.removeState")
ResponseEntity<?> conversationRemoveState(@RequestBody @Valid ConversationByIdRequestPayload requestPayload) {
final String conversationId = requestPayload.getConversationId().toString();
ResponseEntity<?> conversationRemoveState(@RequestBody @Valid ConversationByIdRequestPayload payload) {
final String conversationId = payload.getConversationId().toString();
final ReadOnlyKeyValueStore<String, Conversation> store = stores.getConversationsStore();
final Conversation conversation = store.get(conversationId);

Expand All @@ -255,9 +255,9 @@ ResponseEntity<?> conversationRemoveState(@RequestBody @Valid ConversationByIdRe
}

@PostMapping("/conversations.updateContact")
ResponseEntity<?> conversationUpdateContact(@RequestBody @Valid ConversationUpdateContactRequestPayload requestPayload) {
final String conversationId = requestPayload.getConversationId().toString();
final String displayName = requestPayload.getDisplayName();
ResponseEntity<?> conversationUpdateContact(@RequestBody @Valid ConversationUpdateContactRequestPayload payload) {
final String conversationId = payload.getConversationId().toString();
final String displayName = payload.getDisplayName();
final ReadOnlyKeyValueStore<String, Conversation> store = stores.getConversationsStore();
final Conversation conversation = store.get(conversationId);

Expand Down
Expand Up @@ -4,9 +4,9 @@
import co.airy.avro.communication.ChannelConnectionState;
import co.airy.avro.communication.DeliveryState;
import co.airy.avro.communication.Message;
import co.airy.model.conversation.Conversation;
import co.airy.core.api.communication.payload.SendMessageRequestPayload;
import co.airy.kafka.schema.application.ApplicationCommunicationMessages;
import co.airy.model.conversation.Conversation;
import co.airy.model.message.dto.MessageContainer;
import co.airy.model.message.dto.MessageResponsePayload;
import co.airy.model.metadata.dto.MetadataMap;
Expand Down
Expand Up @@ -4,7 +4,6 @@
import co.airy.avro.communication.Message;
import co.airy.avro.communication.Metadata;
import co.airy.avro.communication.ReadReceipt;
import co.airy.model.conversation.Conversation;
import co.airy.core.api.communication.dto.CountAction;
import co.airy.core.api.communication.dto.Messages;
import co.airy.core.api.communication.dto.UnreadCountState;
Expand All @@ -18,6 +17,7 @@
import co.airy.kafka.schema.application.ApplicationCommunicationReadReceipts;
import co.airy.kafka.streams.KafkaStreamsWrapper;
import co.airy.model.channel.dto.ChannelContainer;
import co.airy.model.conversation.Conversation;
import co.airy.model.message.dto.MessageContainer;
import co.airy.model.metadata.MetadataKeys;
import co.airy.model.metadata.Subject;
Expand All @@ -39,7 +39,6 @@
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -58,7 +57,6 @@
import static java.util.stream.Collectors.toCollection;

@Component
@RestController
public class Stores implements HealthIndicator, ApplicationListener<ApplicationStartedEvent>, DisposableBean {
private static final String appId = "api.CommunicationStores";

Expand Down
Expand Up @@ -3,13 +3,8 @@
import co.airy.model.message.dto.MessageContainer;
import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Comparator;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;

import static java.util.Comparator.comparing;

public class Messages extends ConcurrentSkipListSet<MessageContainer> {
@JsonCreator
public Messages() {
Expand Down
@@ -1,7 +1,7 @@
package co.airy.core.api.communication.lucene;

import co.airy.model.conversation.Conversation;
import co.airy.core.api.communication.dto.ConversationIndex;
import co.airy.model.conversation.Conversation;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
Expand Down
@@ -1,7 +1,7 @@
package co.airy.core.api.communication.payload;

import co.airy.model.conversation.Conversation;
import co.airy.model.channel.ChannelPayload;
import co.airy.model.conversation.Conversation;
import co.airy.model.message.dto.MessageResponsePayload;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.AllArgsConstructor;
Expand Down

0 comments on commit 0f3ab35

Please sign in to comment.