Skip to content

Commit

Permalink
Merge branch 'release/0.39.0' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
ljupcovangelski committed Feb 1, 2022
2 parents c56de16 + b614cc7 commit da45e58
Show file tree
Hide file tree
Showing 138 changed files with 3,586 additions and 3,452 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ package-lock.json

# Testing
cypress

# Airy config files
airy.yaml
cli.yaml
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.38.1
0.39.0
4 changes: 2 additions & 2 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository")
# Airy Bazel tools
git_repository(
name = "com_github_airyhq_bazel_tools",
commit = "2f3dc965c8ad367d10d62d754f99336cd1cdd40f",
commit = "603c42f934d41c53bd93987419920cd9bc7959e7",
remote = "https://github.com/airyhq/bazel-tools.git",
shallow_since = "1634214650 +0200",
shallow_since = "1643188910 +0100",
)

load("@com_github_airyhq_bazel_tools//:repositories.bzl", "airy_bazel_tools_dependencies", "airy_jvm_deps")
Expand Down
14 changes: 14 additions & 0 deletions backend/api/admin/BUILD
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
load("@com_github_airyhq_bazel_tools//lint:buildifier.bzl", "check_pkg")
load("//tools/build:java_library.bzl", "custom_java_library")
load("//tools/build:springboot.bzl", "springboot")
load("//tools/build:junit5.bzl", "junit5")
load("//tools/build:container_release.bzl", "container_release")
Expand All @@ -13,6 +14,12 @@ app_deps = [
"//backend/model/template",
"//backend/model/event",
"//lib/java/uuid",
"//lib/java/pagination",
"//lib/java/date",
"//backend/avro:http-log",
"//backend/avro:user",
"//lib/java/kafka/schema:ops-application-logs",
"//lib/java/kafka/schema:application-communication-users",
"//lib/java/spring/auth:spring-auth",
"//lib/java/spring/web:spring-web",
"//lib/java/spring/kafka/core:spring-kafka-core",
Expand All @@ -27,13 +34,20 @@ springboot(
deps = app_deps,
)

custom_java_library(
name = "test-util",
srcs = ["src/test/java/co/airy/core/api/admin/util/Topics.java"],
deps = app_deps,
)

[
junit5(
size = "medium",
file = file,
resources = glob(["src/test/resources/**/*"]),
deps = [
":app",
":test-util",
"//backend:base_test",
"//lib/java/kafka/test:kafka-test",
"//lib/java/spring/test:spring-test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@
import co.airy.avro.communication.Metadata;
import co.airy.avro.communication.Tag;
import co.airy.avro.communication.Template;
import co.airy.avro.communication.User;
import co.airy.avro.communication.Webhook;
import co.airy.avro.ops.HttpLog;
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.kafka.schema.application.ApplicationCommunicationMetadata;
import co.airy.kafka.schema.application.ApplicationCommunicationTags;
import co.airy.kafka.schema.application.ApplicationCommunicationTemplates;
import co.airy.kafka.schema.application.ApplicationCommunicationUsers;
import co.airy.kafka.schema.application.ApplicationCommunicationWebhooks;
import co.airy.kafka.schema.ops.OpsApplicationLogs;
import co.airy.kafka.streams.KafkaStreamsWrapper;
import co.airy.model.channel.dto.ChannelContainer;
import co.airy.model.metadata.dto.MetadataMap;
Expand All @@ -32,8 +36,10 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static co.airy.core.api.admin.TimestampExtractor.timestampExtractor;
import static co.airy.model.metadata.MetadataRepository.getId;
import static co.airy.model.metadata.MetadataRepository.getSubject;

Expand All @@ -48,12 +54,15 @@ public class Stores implements HealthIndicator, ApplicationListener<ApplicationS
private final String tagsStore = "tags-store";
private final String webhooksStore = "webhooks-store";
private final String templatesStore = "templates-store";
private final String usersStore = "users-store";

private final String applicationCommunicationChannels = new ApplicationCommunicationChannels().name();
private final String applicationCommunicationWebhooks = new ApplicationCommunicationWebhooks().name();
private final String applicationCommunicationTags = new ApplicationCommunicationTags().name();
private final String applicationCommunicationMetadata = new ApplicationCommunicationMetadata().name();
private final String applicationCommunicationTemplates = new ApplicationCommunicationTemplates().name();
private final String applicationCommunicationUsers = new ApplicationCommunicationUsers().name();
private final String opsApplicationLogs = new OpsApplicationLogs().name();

public Stores(KafkaStreamsWrapper streams, KafkaProducer<String, SpecificRecordBase> producer) {
this.streams = streams;
Expand All @@ -64,7 +73,7 @@ public Stores(KafkaStreamsWrapper streams, KafkaProducer<String, SpecificRecordB
public void onApplicationEvent(ApplicationStartedEvent event) {
final StreamsBuilder builder = new StreamsBuilder();

// metadata table keyed by subject id
// Metadata table keyed by subject id
final KTable<String, MetadataMap> metadataTable = builder.<String, Metadata>table(applicationCommunicationMetadata)
.groupBy((metadataId, metadata) -> KeyValue.pair(getSubject(metadata).getIdentifier(), metadata))
.aggregate(MetadataMap::new, MetadataMap::adder, MetadataMap::subtractor);
Expand All @@ -79,6 +88,36 @@ public void onApplicationEvent(ApplicationStartedEvent event) {

builder.<String, Template>table(applicationCommunicationTemplates, Materialized.as(templatesStore));

final KTable<String, User> usersTable = builder.table(applicationCommunicationUsers, Materialized.as(usersStore));

// Extract users from the op log to the users topic
builder.<String, HttpLog>stream(opsApplicationLogs)
.filter((logId, log) -> log.getUserId() != null)
.selectKey((logId, log) -> log.getUserId())
// Extract the Kafka record timestamp header
.transform(timestampExtractor())
.leftJoin(usersTable, (logWithTimestamp, user) -> {
final HttpLog log = logWithTimestamp.getLog();
if (user == null) {
return User.newBuilder()
.setId(log.getUserId())
.setName(log.getUserName())
.setAvatarUrl(log.getUserAvatar())
.setFirstSeenAt(logWithTimestamp.getTimestamp())
.setLastSeenAt(logWithTimestamp.getTimestamp())
.build();
}

return User.newBuilder()
.setId(user.getId())
.setName(Optional.ofNullable(user.getName()).orElse(log.getUserName()))
.setAvatarUrl(Optional.ofNullable(user.getAvatarUrl()).orElse(log.getUserAvatar()))
.setFirstSeenAt(user.getFirstSeenAt())
.setLastSeenAt(logWithTimestamp.getTimestamp())
.build();
})
.to(applicationCommunicationUsers);

streams.start(builder.build(), appId);
}

Expand Down Expand Up @@ -125,6 +164,10 @@ public void deleteTemplate(Template template) {
producer.send(new ProducerRecord<>(applicationCommunicationTemplates, template.getId(), null));
}

public ReadOnlyKeyValueStore<String, User> getUsersStore() {
return streams.acquireLocalStore(usersStore);
}

public ReadOnlyKeyValueStore<String, ChannelContainer> getConnectedChannelsStore() {
return streams.acquireLocalStore(connectedChannelsStore);
}
Expand Down Expand Up @@ -175,6 +218,13 @@ public List<Webhook> getWebhooks() {
return webhooks;
}

public List<User> getUsers() {
final KeyValueIterator<String, User> iterator = getUsersStore().all();
List<User> users = new ArrayList<>();
iterator.forEachRemaining(kv -> users.add(kv.value));
return users;
}

@Override
public void destroy() {
if (streams != null) {
Expand All @@ -187,6 +237,7 @@ public Health health() {
getConnectedChannelsStore();
getWebhookStore();
getTagsStore();
getUsersStore();

return Health.up().build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package co.airy.core.api.admin;

import co.airy.avro.ops.HttpLog;
import co.airy.core.api.admin.dto.LogWithTimestamp;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

public class TimestampExtractor {
public static TransformerSupplier<String, HttpLog, KeyValue<String, LogWithTimestamp>> timestampExtractor() {
return new TransformerSupplier<>() {
@Override
public Transformer<String, HttpLog, KeyValue<String, LogWithTimestamp>> get() {
return new Transformer<>() {

private ProcessorContext context;

@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
}

@Override
public KeyValue<String, LogWithTimestamp> transform(String logId, HttpLog log) {
return KeyValue.pair(logId, new LogWithTimestamp(context.timestamp(), log));
}

@Override
public void close() {
}
};
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package co.airy.core.api.admin;

import co.airy.avro.communication.User;
import co.airy.core.api.admin.payload.ListUsersResponsePayload;
import co.airy.core.api.admin.payload.PaginationData;
import co.airy.core.api.admin.payload.UserResponsePayload;
import co.airy.core.api.admin.payload.UsersListRequestPayload;
import co.airy.pagination.Page;
import co.airy.pagination.Paginator;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import javax.validation.Valid;
import java.util.List;

import static java.util.stream.Collectors.toList;

@RestController
public class UsersController {
private final Stores stores;

public UsersController(Stores stores) {
this.stores = stores;
}

@PostMapping("/users.list")
public ResponseEntity<?> list(@RequestBody(required = false) @Valid UsersListRequestPayload payload) {
payload = payload == null ? new UsersListRequestPayload() : payload;

final List<User> users = stores.getUsers();
Paginator<User> paginator = new Paginator<>(users, User::getId)
.perPage(payload.getPageSize()).from(payload.getCursor());

Page<User> page = paginator.page();

return ResponseEntity.ok(ListUsersResponsePayload.builder()
.data(page.getData().stream().map(UserResponsePayload::fromUser).collect(toList()))
.paginationData(PaginationData.builder()
.nextCursor(page.getNextCursor())
.previousCursor(page.getPreviousCursor())
.total(users.size())
.build()).build());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package co.airy.core.api.admin.dto;

import co.airy.avro.ops.HttpLog;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class LogWithTimestamp implements Serializable {
private long timestamp;
private HttpLog log;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package co.airy.core.api.admin.payload;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ListUsersResponsePayload {
private List<UserResponsePayload> data;
private PaginationData paginationData;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package co.airy.core.api.admin.payload;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PaginationData {
private String previousCursor;
private String nextCursor;
private long total;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package co.airy.core.api.admin.payload;

import co.airy.avro.communication.User;
import lombok.Builder;
import lombok.Data;

import static co.airy.date.format.DateFormat.isoFromMillis;

@Data
@Builder
public class UserResponsePayload {
private String id;
private String name;
private String avatarUrl;
private String firstSeenAt;
private String lastSeenAt;

public static UserResponsePayload fromUser(User user) {
return UserResponsePayload.builder()
.id(user.getId())
.name(user.getName())
.avatarUrl(user.getAvatarUrl())
.firstSeenAt(isoFromMillis(user.getFirstSeenAt()))
.lastSeenAt(isoFromMillis(user.getLastSeenAt()))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package co.airy.core.api.admin.payload;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


@Data
@AllArgsConstructor
@NoArgsConstructor
public class UsersListRequestPayload {
private String cursor;
private Integer pageSize = 20;
}

0 comments on commit da45e58

Please sign in to comment.