Skip to content

Commit

Permalink
move more functionality (metrics, transports) to plugins (#155)
Browse files Browse the repository at this point in the history
* move more functionality (metrics, transports) to plugins

* rename `liiklusServiceImpl` -> `liiklusService`
  • Loading branch information
bsideup committed Jul 25, 2019
1 parent c758b72 commit 95331b6
Show file tree
Hide file tree
Showing 21 changed files with 526 additions and 194 deletions.
4 changes: 1 addition & 3 deletions .gitignore
Expand Up @@ -18,8 +18,6 @@ bin/

/config/

/protocol/generated/
/examples/java/generated/

generated/

infrastructure.yaml
2 changes: 1 addition & 1 deletion Dockerfile
Expand Up @@ -20,7 +20,7 @@ WORKDIR /app

RUN java -Xshare:dump

COPY --from=workspace /root/project/app/build/libs/app.jar app.jar
COPY --from=workspace /root/project/app/build/libs/app-boot.jar app.jar
COPY --from=workspace /root/project/plugins/*/build/libs/*.jar plugins/

ENV JAVA_OPTS=""
Expand Down
12 changes: 8 additions & 4 deletions app/build.gradle
Expand Up @@ -4,6 +4,14 @@ test {
useJUnit()
}

jar {
enabled = true
}

bootJar {
archiveClassifier = 'boot'
}

dependencies {
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
Expand All @@ -14,11 +22,7 @@ dependencies {
compile 'org.springframework.boot:spring-boot-starter-webflux'
compile 'org.springframework.fu:spring-fu-autoconfigure-adapter'

compile 'io.grpc:grpc-netty'
compile 'io.grpc:grpc-services'
compile 'io.prometheus:simpleclient_common'
compile 'org.pf4j:pf4j'
compile 'io.rsocket:rsocket-transport-netty'

testCompileOnly 'org.projectlombok:lombok'
testAnnotationProcessor 'org.projectlombok:lombok'
Expand Down
50 changes: 10 additions & 40 deletions app/src/main/java/com/github/bsideup/liiklus/Application.java
@@ -1,13 +1,9 @@
package com.github.bsideup.liiklus;

import com.github.bsideup.liiklus.config.GRPCConfiguration;
import com.github.bsideup.liiklus.config.LayersConfiguration;
import com.github.bsideup.liiklus.config.MetricsConfiguration;
import com.github.bsideup.liiklus.config.RSocketConfiguration;
import com.github.bsideup.liiklus.monitoring.MetricsCollector;
import com.github.bsideup.liiklus.config.GatewayConfiguration;
import com.github.bsideup.liiklus.plugins.LiiklusPluginManager;
import io.prometheus.client.exporter.common.TextFormat;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.PluginManager;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
Expand All @@ -24,19 +20,13 @@
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.Profiles;
import org.springframework.core.env.SimpleCommandLinePropertySource;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.server.support.RouterFunctionMapping;

import java.io.IOException;
import java.io.StringWriter;
import java.nio.file.Paths;
import java.util.Collections;

@Slf4j
@SpringBootApplication
Expand Down Expand Up @@ -99,35 +89,15 @@ protected void load(ApplicationContext context, Object[] sources) {
binder.bind("spring.webflux", WebFluxProperties.class).orElseGet(WebFluxProperties::new),
new NettyReactiveWebServerFactory()
),
new GRPCConfiguration(),
new RSocketConfiguration(),
new LayersConfiguration(),
new MetricsConfiguration(),
new GatewayConfiguration(),
(GenericApplicationContext applicationContext) -> {
applicationContext.registerBean(RouterFunctionMapping.class, () -> {
var router = RouterFunctions.route();
router.GET("/health", __ -> ServerResponse.ok().syncBody("OK"));

if (environment.acceptsProfiles(Profiles.of("exporter"))) {
var metricsCollector = applicationContext.getBean(MetricsCollector.class);
router.GET("/prometheus", __ -> {
return metricsCollector.collect()
.collectList()
.flatMap(metrics -> {
try {
var writer = new StringWriter();
TextFormat.write004(writer, Collections.enumeration(metrics));
return ServerResponse.ok()
.contentType(MediaType.valueOf(TextFormat.CONTENT_TYPE_004))
.syncBody(writer.toString());
} catch (IOException e) {
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
});
});
}
return new RouterFunctionMapping(router.build());
applicationContext.registerBean("health", RouterFunction.class, () -> {
return RouterFunctions.route()
.GET("/health", __ -> ServerResponse.ok().syncBody("OK"))
.build();
});

applicationContext.registerBean(PluginManager.class, () -> pluginManager);
}
);

Expand Down
Expand Up @@ -2,6 +2,7 @@

import com.github.bsideup.liiklus.records.RecordPostProcessor;
import com.github.bsideup.liiklus.records.RecordPreProcessor;
import com.github.bsideup.liiklus.service.LiiklusService;
import lombok.Data;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.ApplicationContextInitializer;
Expand All @@ -13,11 +14,12 @@
import java.util.Map;
import java.util.stream.Collectors;

public class LayersConfiguration implements ApplicationContextInitializer<GenericApplicationContext> {
public class GatewayConfiguration implements ApplicationContextInitializer<GenericApplicationContext> {

@Override
public void initialize(GenericApplicationContext applicationContext) {
var environment = applicationContext.getEnvironment();

if (!environment.acceptsProfiles(Profiles.of("gateway"))) {
return;
}
Expand All @@ -40,6 +42,8 @@ public void initialize(GenericApplicationContext applicationContext) {
.sorted(comparator.reversed())
.collect(Collectors.toList())
));

applicationContext.registerBean(LiiklusService.class);
}

@Data
Expand Down

This file was deleted.

Expand Up @@ -15,17 +15,13 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.protobuf.Timestamp;
import com.salesforce.reactorgrpc.stub.SubscribeOnlyOnceLifter;
import io.grpc.Status;
import io.netty.buffer.ByteBuf;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;

import java.util.*;
Expand All @@ -41,7 +37,7 @@
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true)
@Slf4j
public class ReactorLiiklusServiceImpl extends ReactorLiiklusServiceGrpc.LiiklusServiceImplBase implements LiiklusService {
public class LiiklusService {

private static final NavigableMap<Integer, Map<Integer, Long>> EMPTY_ACKED_OFFSETS = Collections.unmodifiableNavigableMap(new TreeMap<>());

Expand All @@ -57,12 +53,6 @@ public class ReactorLiiklusServiceImpl extends ReactorLiiklusServiceGrpc.Liiklus

RecordPostProcessorChain recordPostProcessorChain;

@Override
public Mono<PublishReply> publish(PublishRequest message, ByteBuf metadata) {
return publish(Mono.just(message));
}

@Override
public Mono<PublishReply> publish(Mono<PublishRequest> requestMono) {
return requestMono
.map(request -> new Envelope(
Expand Down Expand Up @@ -90,16 +80,9 @@ public Mono<PublishReply> publish(Mono<PublishRequest> requestMono) {
.setOffset(it.getOffset())
.build()
)
.log("publish", Level.SEVERE, SignalType.ON_ERROR)
.onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException());
}

@Override
public Flux<SubscribeReply> subscribe(SubscribeRequest message, ByteBuf metadata) {
return subscribe(Mono.just(message));
.log("publish", Level.SEVERE, SignalType.ON_ERROR);
}

@Override
public Flux<SubscribeReply> subscribe(Mono<SubscribeRequest> requestFlux) {
return requestFlux
.flatMapMany(subscribe -> {
Expand Down Expand Up @@ -166,7 +149,7 @@ public Flux<SubscribeReply> subscribe(Mono<SubscribeRequest> requestFlux) {
Flux.from(source.getPublisher())
.log("partition-" + partition, Level.WARNING, SignalType.ON_ERROR)
.doFinally(__ -> sourcesByPartition.remove(partition))
.transform(Operators.lift(new SubscribeOnlyOnceLifter<Record>()))
// TODO .transform(Operators.lift(new SubscribeOnlyOnceLifter<Record>()))
)
);

Expand All @@ -183,16 +166,9 @@ public Flux<SubscribeReply> subscribe(Mono<SubscribeRequest> requestFlux) {
subscriptions.remove(sessionId, storedSubscription);
});
})
.log("subscribe", Level.SEVERE, SignalType.ON_ERROR)
.onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException());
}

@Override
public Flux<ReceiveReply> receive(ReceiveRequest message, ByteBuf metadata) {
return receive(Mono.just(message));
.log("subscribe", Level.SEVERE, SignalType.ON_ERROR);
}

@Override
public Flux<ReceiveReply> receive(Mono<ReceiveRequest> requestMono) {
return requestMono
.flatMapMany(request -> {
Expand Down Expand Up @@ -235,16 +211,9 @@ public Flux<ReceiveReply> receive(Mono<ReceiveRequest> requestMono) {
});
});
})
.log("receive", Level.SEVERE, SignalType.ON_ERROR)
.onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException());
.log("receive", Level.SEVERE, SignalType.ON_ERROR);
}

@Override
public Mono<Empty> ack(AckRequest message, ByteBuf metadata) {
return ack(Mono.just(message));
}

@Override
public Mono<Empty> ack(Mono<AckRequest> request) {
return request
.flatMap(ack -> {
Expand Down Expand Up @@ -277,16 +246,9 @@ public Mono<Empty> ack(Mono<AckRequest> request) {
));
})
.thenReturn(Empty.getDefaultInstance())
.log("ack", Level.SEVERE, SignalType.ON_ERROR)
.onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException());
}

@Override
public Mono<GetOffsetsReply> getOffsets(GetOffsetsRequest message, ByteBuf metadata) {
return getOffsets(Mono.just(message));
.log("ack", Level.SEVERE, SignalType.ON_ERROR);
}

@Override
public Mono<GetOffsetsReply> getOffsets(Mono<GetOffsetsRequest> request) {
return request.flatMap(getOffsets -> Mono
.fromCompletionStage(positionsStorage.findAll(
Expand All @@ -299,20 +261,13 @@ public Mono<GetOffsetsReply> getOffsets(Mono<GetOffsetsRequest> request) {
.defaultIfEmpty(emptyMap())
.map(offsets -> GetOffsetsReply.newBuilder().putAllOffsets(offsets).build())
.log("getOffsets", Level.SEVERE, SignalType.ON_ERROR)
.onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException())
);
}

@Override
public Mono<GetEndOffsetsReply> getEndOffsets(GetEndOffsetsRequest message, ByteBuf metadata) {
return getEndOffsets(Mono.just(message));
}

@Override
public Mono<GetEndOffsetsReply> getEndOffsets(Mono<GetEndOffsetsRequest> request) {
return request.flatMap(getEndOffsets -> {
if (!(recordsStorage instanceof FiniteRecordsStorage)) {
return Mono.error(Status.INTERNAL.withDescription("The record storage is not finite").asException());
return Mono.error(new IllegalStateException("The record storage is not finite"));
}

var topic = getEndOffsets.getTopic();
Expand Down
12 changes: 11 additions & 1 deletion app/src/test/java/com/github/bsideup/liiklus/AckTest.java
Expand Up @@ -40,7 +40,17 @@ public void setUpAckTest() throws Exception {
public void testManualAck() throws Exception {
Integer partition = stub.subscribe(subscribeRequest)
.take(1)
.delayUntil(it -> stub.ack(AckRequest.newBuilder().setAssignment(it.getAssignment()).setOffset(100).build()))
.delayUntil(it -> {
return stub.ack(
AckRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(subscribeRequest.getGroup())
.setGroupVersion(subscribeRequest.getGroupVersion())
.setPartition(it.getAssignment().getPartition())
.setOffset(100)
.build()
);
})
.map(it -> it.getAssignment().getPartition())
.blockFirst(Duration.ofSeconds(30));

Expand Down

0 comments on commit 95331b6

Please sign in to comment.