How to dynamically create stubs without explicit @GrpcClient("clientName") declarations #990

Closed
aeasylife opened this issue Nov 10, 2023 · 3 comments
Labels
question A question about this library or its usage

Comments

@aeasylife

The context

I encountered a scenario where I need to connect to multiple servers using the same client implementation. However, the examples show that each client must be declared explicitly:

@Service
public class GrpcClientService {

    @GrpcClient("local-grpc-server")
    private SimpleBlockingStub simpleStub;

    public String sendMessage(final String name) {
        try {
            final HelloReply response = this.simpleStub.sayHello(HelloRequest.newBuilder().setName(name).build());
            return response.getMessage();
        } catch (final StatusRuntimeException e) {
            return "FAILED with " + e.getStatus().getCode().name();
        }
    }

}

I want to be able to create stubs dynamically, like this:

private final Map<String, PushServiceGrpc.PushServiceStub> stubMap = new ConcurrentHashMap<>();

void init() {
    ....
    ManagedChannel channel = builder.build();
    PushServiceGrpc.PushServiceStub stub = createStub(channel);
    stubMap.put(name, stub);
}

void sub(String subscribeId, String serviceName, PushServiceGrpc.PushServiceStub stub) {
    SubscribeRealTimeDataRequest request = ...
    StreamObserver<SubscribeRealTimeDataResponse> streamObserver = ...
    stub.subscribeRealTimeData(request, streamObserver);
}

After debugging, I realized that I would need to modify the client map in net.devh.boot.grpc.client.config.GrpcChannelsProperties, but then found that it could not be modified:

package net.devh.boot.grpc.client.config;

@ToString
@EqualsAndHashCode
@ConfigurationProperties("grpc")
public class GrpcChannelsProperties {

    private final Map<String, GrpcChannelProperties> client = new ConcurrentHashMap<>();

    public final Map<String, GrpcChannelProperties> getClient() {
        return this.client;
    }
}

Afterwards, I wrote the following code, most of which comes from the library's source code.

@Data
@Component
@ConfigurationProperties(prefix = "service.push")
public class PushServiceGrpcProperties {

    private Map<String, GrpcChannelProperties> config;
}

@Component
public class GrpcConfig {

    @Autowired
    private PushServiceGrpcProperties grpcProperties;

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private List<StubFactory> stubFactories;

    private final Map<String, PushServiceGrpc.PushServiceStub> stubMap = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        Map<String, GrpcChannelProperties> properties = getProperties();
        properties.keySet().forEach(this::initStub);
    }

    private Map<String, GrpcChannelProperties> getProperties() {
        return grpcProperties.getConfig();
    }

    private void initStub(String name) {
        final GrpcChannelProperties properties = getProperties().get(name);
        URI address = properties.getAddress();
        NettyChannelBuilder builder = NettyChannelBuilder.forTarget(address.toString())
                        .defaultLoadBalancingPolicy(properties.getDefaultLoadBalancingPolicy());
        configureKeepAlive(builder,name);
        configureSecurity(builder, name);
        configureLimits(builder, name);
        configureCompression(builder, name);

        final ManagedChannel channel = builder.build();
        PushServiceGrpc.PushServiceStub stub = createStub(channel);
        stubMap.put(name,stub);

    }

    protected void configureKeepAlive(final NettyChannelBuilder builder, final String name) {
        final GrpcChannelProperties properties = getPropertiesFor(name);
        if (properties.isEnableKeepAlive()) {
            builder.keepAliveTime(properties.getKeepAliveTime().toNanos(), TimeUnit.NANOSECONDS)
                    .keepAliveTimeout(properties.getKeepAliveTimeout().toNanos(), TimeUnit.NANOSECONDS)
                    .keepAliveWithoutCalls(properties.isKeepAliveWithoutCalls());
        }
    }

    protected void configureLimits(final NettyChannelBuilder builder, final String name) {
        final GrpcChannelProperties properties = getPropertiesFor(name);
        final DataSize maxInboundMessageSize = properties.getMaxInboundMessageSize();
        if (maxInboundMessageSize != null) {
            builder.maxInboundMessageSize((int) maxInboundMessageSize.toBytes());
        }
    }

    protected void configureCompression(final NettyChannelBuilder builder, final String name) {
        final GrpcChannelProperties properties = getPropertiesFor(name);
        if (properties.isFullStreamDecompression()) {
            builder.enableFullStreamDecompression();
        }
    }

    protected void configureSecurity(final NettyChannelBuilder builder, final String name) {
        final GrpcChannelProperties properties = getPropertiesFor(name);

        final NegotiationType negotiationType = properties.getNegotiationType();
        builder.negotiationType(of(negotiationType));
    }

    protected static io.grpc.netty.shaded.io.grpc.netty.NegotiationType of(final NegotiationType negotiationType) {
        switch (negotiationType) {
            case PLAINTEXT:
                return io.grpc.netty.shaded.io.grpc.netty.NegotiationType.PLAINTEXT;
            case PLAINTEXT_UPGRADE:
                return io.grpc.netty.shaded.io.grpc.netty.NegotiationType.PLAINTEXT_UPGRADE;
            case TLS:
                return io.grpc.netty.shaded.io.grpc.netty.NegotiationType.TLS;
            default:
                throw new IllegalArgumentException("Unsupported NegotiationType: " + negotiationType);
        }
    }

    private GrpcChannelProperties getPropertiesFor(final String name) {
        return getProperties().get(name);
    }


    public Map<String, PushServiceGrpc.PushServiceStub> getStubMap() {
        return stubMap;
    }

    private List<StubFactory> getStubFactories() {
        if (this.stubFactories == null) {
            this.stubFactories = new ArrayList<>(this.applicationContext.getBeansOfType(StubFactory.class).values());
            this.stubFactories.add(new FallbackStubFactory());
        }
        return this.stubFactories;
    }

    private PushServiceGrpc.PushServiceStub createStub(final Channel channel) {
        final StubFactory factory = getStubFactories().stream()
                .filter(stubFactory -> stubFactory.isApplicable(PushServiceGrpc.PushServiceStub.class))
                .findFirst()
                .orElseThrow(() -> new BeanInstantiationException(PushServiceGrpc.PushServiceStub.class,
                        "Unsupported stub type: " + PushServiceGrpc.PushServiceStub.class.getName() + " -> Please report this issue."));

        try {
            return (PushServiceGrpc.PushServiceStub) factory.createStub(PushServiceGrpc.PushServiceStub.class, channel);
        } catch (final Exception exception) {
            throw new BeanInstantiationException(PushServiceGrpc.PushServiceStub.class, "Failed to create gRPC stub of type " + PushServiceGrpc.PushServiceStub.class.getName(),
                    exception);
        }
    }
}

@Service
@Slf4j
@RequiredArgsConstructor
public class RealTimeDataService {
    private final GrpcConfig grpcConfig;

    public void init() {
        Map<String, PushServiceGrpc.PushServiceStub> stubMap = grpcConfig.getStubMap();
        if (!stubMap.isEmpty()) {
            stubMap.forEach((k, v) -> {
                sub(localHost.getHostAddress() + ":" + subId, k, v);
            });
        }
    }

    public void sub(String subscribeId, String serviceName, PushServiceGrpc.PushServiceStub stub) {
        log.info("subId:{},serviceName:{}", subscribeId,serviceName);
        SubscribeRealTimeDataRequest request = SubscribeRealTimeDataRequest.newBuilder()
                .setSubscribeId(subscribeId)
                .build();
        StreamObserver<SubscribeRealTimeDataResponse> streamObserver = new StreamObserver<SubscribeRealTimeDataResponse>() {
            @Override
            public void onNext(SubscribeRealTimeDataResponse value) {
           ...
            }

            @Override
            public void onError(Throwable t) {
               ...
            }

            @Override
            public void onCompleted() {
          ...
            }
        };
        stub.subscribeRealTimeData(request, streamObserver);
    }
}

service:
  push:
    config:
      server143:
        address: 'static://192.168.60.143:57501'
        enableKeepAlive: true
        keepAliveWithoutCalls: true
        negotiationType: plaintext
        maxInboundMessageSize: 64MB
      server122:
        address: 'static://192.168.60.122:57501'
        enableKeepAlive: true
        keepAliveWithoutCalls: true
        negotiationType: plaintext
        maxInboundMessageSize: 64MB
      server150:
        address: 'static://192.168.60.150:57501'
        enableKeepAlive: true
        keepAliveWithoutCalls: true
        negotiationType: plaintext
        maxInboundMessageSize: 64MB

It runs, but it is only a demo.

The question

Is there a more elegant way to implement this feature?
Will there be any examples for this in the future?

The application's environment

Which versions do you use?

  • Spring (boot): 2.3.12.RELEASE
  • grpc-spring-boot-starter: 2.14.0.RELEASE
  • java: 1.8
@aeasylife aeasylife added the question A question about this library or its usage label Nov 10, 2023
@ST-DDT
Collaborator

ST-DDT commented Nov 10, 2023

I had planned to create a bean that can be used to get preconfigured channels/stubs by name. But I currently don't have time to work on that:

PushServiceStub stub = grpcClientInjector.getStub(PushServiceStub.class, "server143");

@ST-DDT
Collaborator

ST-DDT commented Nov 10, 2023

Currently you can only get the preconfigured Channels through GrpcChannelFactory beans.
That way you can at least omit the PushServiceGrpcProperties class.
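
A minimal sketch of how this suggestion could be wired up, written against plain JDK types so it stays self-contained. NamedStubRegistry is a hypothetical helper, not part of grpc-spring-boot-starter. The channel type C and stub type S are generic placeholders, and the two factory functions are assumptions standing in for a channel lookup by name and a stub constructor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical helper: caches one stub per client name and creates it
 * lazily on first request. C is the channel type, S is the stub type.
 */
class NamedStubRegistry<C, S> {

    private final Map<String, S> stubs = new ConcurrentHashMap<>();
    private final Function<String, C> channelFactory; // name -> channel
    private final Function<C, S> stubFactory;         // channel -> stub

    NamedStubRegistry(Function<String, C> channelFactory, Function<C, S> stubFactory) {
        this.channelFactory = channelFactory;
        this.stubFactory = stubFactory;
    }

    /** Returns the cached stub for the name, building the channel and stub once if absent. */
    S getStub(String name) {
        return stubs.computeIfAbsent(name, n -> stubFactory.apply(channelFactory.apply(n)));
    }

    /** Number of stubs created so far. */
    int size() {
        return stubs.size();
    }
}
```

In a Spring application this might be instantiated as new NamedStubRegistry<Channel, PushServiceGrpc.PushServiceStub>(grpcChannelFactory::createChannel, PushServiceGrpc::newStub), assuming an injected GrpcChannelFactory bean and client names such as server143 configured under grpc.client.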

@aeasylife
Author

Currently you can only get the preconfigured Channels through GrpcChannelFactory beans. That way you can at least omit the PushServiceGrpcProperties class.

Thank you for your reply. PushServiceGrpcProperties is just an example of the server-side configuration; in practice, the server list needs to be obtained from the service registration list. Unfortunately, I don't have the skills to help implement the planned feature. I look forward to seeing it in a future update.
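
When the server list comes from a service registry rather than static configuration, the cached clients have to follow the list as servers appear and disappear. The following is a rough sketch of that reconciliation step using only JDK types. ClientReconciler is a hypothetical name, and the factory and closer functions are assumptions; with gRPC the closer could be ManagedChannel::shutdown.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper: keeps one client per currently registered server name. */
class ClientReconciler<T> {

    private final Map<String, T> clients = new ConcurrentHashMap<>();
    private final Function<String, T> clientFactory; // name -> new client
    private final Consumer<T> closer;                // releases a stale client

    ClientReconciler(Function<String, T> clientFactory, Consumer<T> closer) {
        this.clientFactory = clientFactory;
        this.closer = closer;
    }

    /** Aligns the cached clients with the latest set of registered names. */
    void reconcile(Set<String> registeredNames) {
        // Create clients for names that have newly appeared.
        for (String name : registeredNames) {
            clients.computeIfAbsent(name, clientFactory);
        }
        // Close and drop clients whose server is no longer registered.
        for (String name : clients.keySet()) {
            if (!registeredNames.contains(name)) {
                T stale = clients.remove(name);
                if (stale != null) {
                    closer.accept(stale);
                }
            }
        }
    }

    /** Snapshot of the names that currently have a client. */
    Set<String> activeNames() {
        return new HashSet<>(clients.keySet());
    }
}
```

Calling reconcile on every registry refresh keeps the map in step with the registration list, and closing stale entries avoids leaking channels to servers that have gone away.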
