Skip to content

Commit

Permalink
Merge branch 'release/0.27.0' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
ljupcovangelski committed Jul 27, 2021
2 parents 4893c6f + 0eab4a8 commit b14a691
Show file tree
Hide file tree
Showing 137 changed files with 3,114 additions and 2,676 deletions.
4 changes: 2 additions & 2 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ build --stamp --workspace_status_command=tools/build/bazel_status.sh

# Output test errors by default
test --test_output=errors
# Do not build production artifacts to speed up tests
# Build outputs should be validated by running build
# Do not build production artifacts like docker images to speed up tests
# Production outputs should be validated by running build
test --build_tests_only


Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ jobs:
if: ${{ startsWith(github.ref, 'refs/heads/main') }}
run: |
sudo apt-get install -y expect
bazel run //frontend/chat-plugin:publish-npm release
bazel run //frontend/chat-plugin/lib:publish-npm release
env:
DEPLOY_NPM_USERNAME: ${{ secrets.DEPLOY_NPM_USERNAME }}
DEPLOY_NPM_PASSWORD: ${{ secrets.DEPLOY_NPM_PASSWORD }}
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.26.3
0.27.0
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import co.airy.avro.communication.Webhook;
import co.airy.core.api.admin.payload.GetWebhookResponse;
import co.airy.core.api.admin.payload.WebhookSubscriptionPayload;
import co.airy.core.api.config.ServiceDiscovery;
import co.airy.core.api.config.dto.ComponentInfo;
import co.airy.spring.web.payload.RequestErrorResponsePayload;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
Expand All @@ -17,13 +20,25 @@
@RestController
public class WebhooksController {
private final Stores stores;
private final ServiceDiscovery serviceDiscovery;

public WebhooksController(Stores stores) {
public WebhooksController(Stores stores, ServiceDiscovery serviceDiscovery) {
this.stores = stores;
this.serviceDiscovery = serviceDiscovery;
}

@PostMapping("/webhooks.subscribe")
public ResponseEntity<?> subscribe(@RequestBody @Valid WebhookSubscriptionPayload payload) {
final ComponentInfo component = serviceDiscovery.getComponent("integration-webhook");
if (component == null || !component.isEnabled()) {
return ResponseEntity.status(HttpStatus.CONFLICT)
.body(new RequestErrorResponsePayload("The webhook component needs to be enabled. Learn more: https://airy.co/docs/core/api/webhook"));
}
if (!component.isHealthy()) {
return ResponseEntity.status(HttpStatus.CONFLICT)
.body(new RequestErrorResponsePayload("The webhook component is enabled, but not healthy. Check the Kubernetes cluster state."));
}

final Webhook webhook = Webhook.newBuilder()
.setId(UUID.randomUUID().toString())
.setEndpoint(payload.getUrl())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package co.airy.core.api.config;

import co.airy.core.api.config.dto.ComponentInfo;
import co.airy.core.api.config.dto.ServiceInfo;
import co.airy.core.api.config.payload.ServicesResponsePayload;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -13,6 +14,7 @@
import org.springframework.web.client.RestTemplate;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand All @@ -37,6 +39,21 @@ public Map<String, ServiceInfo> getServices() {
return services;
}

public ComponentInfo getComponent(String componentName) {
return getServices().values().stream()
.filter((serviceInfo) -> serviceInfo.getComponent().equals(componentName))
.reduce(null, (componentInfo, serviceInfo) -> {
componentInfo = Optional.ofNullable(componentInfo).orElse(new ComponentInfo(false, true));
componentInfo.setEnabled(serviceInfo.isEnabled());
// One unhealthy service means that the component is unhealthy
componentInfo.setEnabled(componentInfo.isHealthy() && serviceInfo.isHealthy());
return componentInfo;
}, (v1, v2) -> {
v1.setHealthy(v1.isHealthy() && v2.isHealthy());
return v1;
});
}

@Scheduled(fixedRate = 1_000)
public void updateComponentsStatus() {
final ResponseEntity<ServicesResponsePayload> response = restTemplate.getForEntity(String.format("http://airy-controller.%s/services", namespace),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package co.airy.core.api.config.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ComponentInfo {
private boolean enabled;
private boolean healthy;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package co.airy.core.api.admin;

import co.airy.core.api.config.ServiceDiscovery;
import co.airy.core.api.config.dto.ComponentInfo;
import co.airy.core.api.config.dto.ServiceInfo;
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.kafka.schema.application.ApplicationCommunicationMetadata;
import co.airy.kafka.schema.application.ApplicationCommunicationTags;
Expand All @@ -15,14 +18,23 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.Map;

import static co.airy.test.Timing.retryOnException;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

Expand All @@ -39,6 +51,13 @@ public class WebhooksControllerTest {
@Autowired
private WebTestHelper webTestHelper;

@Autowired
@InjectMocks
private WebhooksController webhooksController;

@MockBean
private ServiceDiscovery serviceDiscovery;

private static final ApplicationCommunicationChannels applicationCommunicationChannels = new ApplicationCommunicationChannels();
private static final ApplicationCommunicationWebhooks applicationCommunicationWebhooks = new ApplicationCommunicationWebhooks();
private static final ApplicationCommunicationMetadata applicationCommunicationMetadata = new ApplicationCommunicationMetadata();
Expand All @@ -64,6 +83,7 @@ static void afterAll() throws Exception {

@BeforeEach
void beforeEach() throws Exception {
MockitoAnnotations.openMocks(this);
webTestHelper.waitUntilHealthy();
}

Expand All @@ -76,6 +96,22 @@ public void canManageWebhook() throws Exception {

final String payload = "{\"url\":\"" + url + "\",\"headers\":{\"X-Auth\":\"" + xAuthHeader + "\"}}";

when(serviceDiscovery.getComponent(Mockito.anyString())).thenCallRealMethod();
// One service of the component is failing
doReturn(Map.of(
"webhook-consumer", new ServiceInfo(true, false, "integration-webhook"),
"webhook-publisher", new ServiceInfo(true, true, "integration-webhook")
)).when(serviceDiscovery).getServices();

webTestHelper.post("/webhooks.subscribe", payload)
.andExpect(status().isConflict());

// Component is healthy
doReturn(Map.of(
"webhook-consumer", new ServiceInfo(true, true, "integration-webhook"),
"webhook-publisher", new ServiceInfo(true, true, "integration-webhook")
)).when(serviceDiscovery).getServices();

webTestHelper.post("/webhooks.subscribe", payload)
.andExpect(status().isOk())
.andExpect(jsonPath("$.url", equalTo(url)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private List<Conversation> fetchAllConversations() {
return conversations;
}

@PostMapping("/conversations.read")
@PostMapping("/conversations.markRead")
ResponseEntity<?> conversationMarkRead(@RequestBody @Valid ConversationByIdRequestPayload requestPayload) {
final ReadOnlyKeyValueStore<String, Conversation> store = stores.getConversationsStore();
final String conversationId = requestPayload.getConversationId().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import co.airy.avro.communication.ReadReceipt;
import co.airy.core.api.communication.dto.Conversation;
import co.airy.core.api.communication.dto.CountAction;
import co.airy.core.api.communication.dto.MessagesTreeSet;
import co.airy.core.api.communication.dto.Messages;
import co.airy.core.api.communication.dto.UnreadCountState;
import co.airy.core.api.communication.lucene.IndexingProcessor;
import co.airy.core.api.communication.lucene.LuceneDiskStore;
Expand All @@ -27,7 +27,6 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
Expand Down Expand Up @@ -137,7 +136,7 @@ private void startStream() {


// messages store
messageGroupedTable.aggregate(MessagesTreeSet::new,
messageGroupedTable.aggregate(Messages::new,
(key, value, aggregate) -> {
aggregate.update(value);
return aggregate;
Expand Down Expand Up @@ -194,7 +193,7 @@ public ReadOnlyKeyValueStore<String, Conversation> getConversationsStore() {
return streams.acquireLocalStore(conversationsStore);
}

public ReadOnlyKeyValueStore<String, MessagesTreeSet> getMessagesStore() {
public ReadOnlyKeyValueStore<String, Messages> getMessagesStore() {
return streams.acquireLocalStore(messagesStore);
}

Expand Down Expand Up @@ -254,8 +253,8 @@ public List<Conversation> addChannelMetadata(List<Conversation> conversations) {
}

public List<MessageContainer> getMessages(String conversationId) {
final ReadOnlyKeyValueStore<String, MessagesTreeSet> store = getMessagesStore();
final MessagesTreeSet messagesTreeSet = store.get(conversationId);
final ReadOnlyKeyValueStore<String, Messages> store = getMessagesStore();
final Messages messagesTreeSet = store.get(conversationId);

return messagesTreeSet == null ? null : new ArrayList<>(messagesTreeSet);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package co.airy.core.api.communication.dto;

import co.airy.model.message.dto.MessageContainer;
import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Comparator;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;

import static java.util.Comparator.comparing;

public class Messages extends ConcurrentSkipListSet<MessageContainer> {
@JsonCreator
public Messages() {
super((MessageContainer c1, MessageContainer c2) -> {
if (c1.getMessage().getSentAt() == c2.getMessage().getSentAt()) {
// If messages share the same timestamp
return c1.getMessage().getId().compareTo(c2.getMessage().getId());
}

return Long.compare(c2.getMessage().getSentAt(), c1.getMessage().getSentAt());
});
}

// TreeSet does not support updating objects
public void update(MessageContainer container) {
remove(container);
add(container);
}
}

This file was deleted.

0 comments on commit b14a691

Please sign in to comment.