/
ReactiveGreeterGrpcService.java
76 lines (64 loc) · 2.81 KB
/
ReactiveGreeterGrpcService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package org.lognet.springboot.grpc.demo;
import io.grpc.Status;
import io.grpc.examples.reactor.ReactiveHelloRequest;
import io.grpc.examples.reactor.ReactiveHelloResponse;
import io.grpc.examples.reactor.ReactorReactiveGreeterGrpc;
import lombok.extern.slf4j.Slf4j;
import org.lognet.springboot.grpc.GRpcService;
import org.lognet.springboot.grpc.recovery.GRpcExceptionHandler;
import org.lognet.springboot.grpc.recovery.GRpcExceptionScope;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Profile;
import org.springframework.security.access.annotation.Secured;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Optional;
import java.util.stream.IntStream;
/**
 * Reactive gRPC implementation of the demo greeter service.
 *
 * <p>The bean is registered only when {@link Transactional} is on the classpath and the
 * {@code reactive-buggy-security} profile is <em>not</em> active.
 *
 * <p>Exceptions are translated to a gRPC {@link Status} by {@link #handle(Exception, GRpcExceptionScope)}.
 */
@GRpcService
@Slf4j
@ConditionalOnClass(Transactional.class)
@Profile("!reactive-buggy-security")
public class ReactiveGreeterGrpcService extends ReactorReactiveGreeterGrpc.ReactiveGreeterImplBase {

    /** Collaborator that produces the actual greeting; assigned once at construction. */
    private final ReactiveGreeterService reactiveGreeterService;

    /**
     * Creates the gRPC endpoint.
     *
     * @param reactiveGreeterService delegate used by {@link #greet(Mono)}
     */
    public ReactiveGreeterGrpcService(ReactiveGreeterService reactiveGreeterService) {
        this.reactiveGreeterService = reactiveGreeterService;
    }

    /**
     * Secured unary greeting that delegates to {@link ReactiveGreeterService}.
     *
     * <p>The authentication check below runs eagerly, on the thread that invokes this method,
     * before the returned {@link Mono} is subscribed to.
     *
     * @param request the incoming request publisher
     * @return the greeting produced by the delegate
     * @throws java.util.NoSuchElementException if the current security context holds no authentication
     */
    @Override
    @Secured({}) // NOTE(review): empty attribute list — presumably "authenticated, no specific role"; confirm
    public Mono<ReactiveHelloResponse> greet(Mono<ReactiveHelloRequest> request) {
        // Fail fast if no authentication is present in the security context of the calling thread.
        Optional.ofNullable(SecurityContextHolder.getContext().getAuthentication())
                .orElseThrow();
        return reactiveGreeterService.greet(request);
    }

    /**
     * Unary greeting taking the already-resolved request; simply forwards to the generated base class.
     *
     * @param request the incoming request message
     * @return whatever the base implementation returns
     */
    @Override
    public Mono<ReactiveHelloResponse> greet(ReactiveHelloRequest request) {
        return super.greet(request); //for tests
    }

    /**
     * Server-streaming greeting: emits one response per character position of the request name.
     *
     * <p>For a name of length {@code n} the responses carry the messages
     * {@code "Hello 0,<name> "} … {@code "Hello n-1,<name> "}; an empty name yields no responses.
     *
     * @param request the single incoming request
     * @return a stream with {@code name.length()} greetings
     */
    @Override
    public Flux<ReactiveHelloResponse> multiGreet(Mono<ReactiveHelloRequest> request) {
        return request.flatMapIterable(r ->
                IntStream.range(0, r.getName().length())
                        .mapToObj(i -> ReactiveHelloResponse.newBuilder()
                                .setMessage(String.format("Hello %d,%s ", i, r.getName()))
                                .build())
                        .toList()
        );
    }

    /**
     * Bidirectional-streaming greeting: answers every incoming request with exactly one response.
     *
     * @param request the stream of incoming requests
     * @return a stream with one greeting per request, in request order
     */
    @Override
    public Flux<ReactiveHelloResponse> streamGreet(Flux<ReactiveHelloRequest> request) {
        // Synchronous 1:1 transformation, so map() is sufficient; wrapping each element in
        // Mono.just() and flattening it again only adds allocation and merge overhead.
        return request.map(r ->
                ReactiveHelloResponse.newBuilder()
                        .setMessage(String.format("Hello ,%s ", r.getName()))
                        .build()
        );
    }

    /**
     * Translates an exception into an {@link Status#INVALID_ARGUMENT} status and logs it.
     *
     * @param ex    the exception that was raised
     * @param scope the scope in which the exception occurred (not used here)
     * @return {@code INVALID_ARGUMENT} carrying the exception's localized message as description
     *         and the exception itself as cause
     */
    @GRpcExceptionHandler
    public Status handle(Exception ex, GRpcExceptionScope scope) {
        var status = Status.INVALID_ARGUMENT.withDescription(ex.getLocalizedMessage()).withCause(ex);
        log.error("(GrpcExceptionAdvice) : ", ex);
        return status;
    }
}