Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static String formatForEach(
.indent(indent);
}

private String formatMethodSignature(String methodModifier) {
private String formatMethodSignature(String methodModifier, boolean withRequestOptions) {
// Examples:
// // Unary, a single request/response call.
// HelloReply sayHello(HelloRequest request);
Expand Down Expand Up @@ -97,12 +97,15 @@ private String formatMethodSignature(String methodModifier) {
if (requestStream || replyStream) {
sb.append("@NonNull final Pipeline<? super ").append(replyType).append("> replies");
}
if (withRequestOptions) {
sb.append(", ").append("@NonNull final RequestOptions requestOptions");
}
sb.append(")");

return sb.toString();
}

String formatMethodDeclaration() {
String formatMethodDeclaration(String methodModifier, boolean withRequestOptions, boolean withSemicolon) {
final StringBuilder sb = new StringBuilder();
if (javaDoc.contains("\n")) {
sb.append("/**\n");
Expand All @@ -112,33 +115,111 @@ String formatMethodDeclaration() {
sb.append("/** ").append(javaDoc).append(" */\n");
}

sb.append(formatMethodSignature(null));
sb.append(";");
sb.append(formatMethodSignature(methodModifier, withRequestOptions));
if (withSemicolon) {
sb.append(";");
}

return sb.toString();
}

/**
* Format a default implementation of a handler method.
* @param withRequestOptions if true, add the RequestOptions arg and delegate to the method w/o the arg;
* if false, the generated implementation throws.
* @return the default method implementation for a handler
*/
String formatDefaultMethod(boolean withRequestOptions) {
final StringBuilder sb = new StringBuilder();

sb.append(formatMethodDeclaration("default", withRequestOptions, false));
sb.append(" {\n");
if (!withRequestOptions) {
sb.append(" throw new UnsupportedOperationException(\"unimplemented\")");
} else {
// return type:
if (!requestStream && !replyStream) {
sb.append(" return ");
} else if (!requestStream) {
sb.append(" ");
} else {
sb.append(" return ");
}

// name and args:
sb.append(name).append('(');
if (!requestStream) {
sb.append("request");
if (replyStream) {
sb.append(", ");
}
}
if (requestStream || replyStream) {
sb.append("replies");
}
sb.append(")");
}

sb.append(";\n}\n");

return sb.toString();
}

String formatDelegateToMethodWithOptions() {
final StringBuilder sb = new StringBuilder();

// return type:
if (!requestStream && !replyStream) {
sb.append("return ");
} else if (!requestStream) {
// no-op because void return type
} else {
sb.append("return ");
}

// name and args:
sb.append(name).append('(');
if (!requestStream) {
sb.append("request");
if (replyStream) {
sb.append(", ");
}
}
if (requestStream || replyStream) {
sb.append("replies");
}
sb.append(", this.requestOptions);");

return sb.toString();
}

String formatCaseStatement() {
final String kind;
final String methodLambda;
if (!requestStream && !replyStream) {
kind = "unary";
methodLambda = "request -> " + name + "(request, options)";
} else if (requestStream && !replyStream) {
kind = "clientStreaming";
methodLambda = "typedReplies -> " + name + "(typedReplies, options)";
} else if (!requestStream && replyStream) {
kind = "serverStreaming";
methodLambda = "(request, typedReplies) -> " + name + "(request, typedReplies, options)";
} else {
kind = "bidiStreaming";
methodLambda = "typedReplies -> " + name + "(typedReplies, options)";
}

return """
case $methodName -> Pipelines.<$requestType, $replyType>$kind()
.mapRequest(bytes -> parse$simpleRequestType(bytes, options))
.method(this::$methodName)
.method($methodLambda)
.mapResponse(reply -> serialize$simpleReplyType(reply, options))
.respondTo(replies)
.build();
"""
.replace("$methodName", name)
.replace("$methodLambda", methodLambda)
.replace("$requestType", requestType)
.replace("$simpleRequestType", requestType.replace(".", ""))
.replace("$replyType", replyType)
Expand All @@ -149,7 +230,12 @@ String formatCaseStatement() {
private String formatUnaryMethodImplementation() {
return """
@Override
$methodSignature {
$methodSignatureWithoutOptions {
$delegateToMethodWithOptions
}

@Override
$methodSignatureWithOptions {
final AtomicReference<$replyType> replyRef = new AtomicReference<>();
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -181,7 +267,8 @@ public void onComplete() {
FULL_NAME + "/$methodName",
get$simpleRequestTypeCodec(requestOptions),
get$simpleReplyTypeCodec(requestOptions),
pipeline
pipeline,
requestOptions.metadata()
);
call.sendRequest(request, true);
try {
Expand All @@ -203,7 +290,9 @@ public void onComplete() {
throw new RuntimeException("Call to $methodName completed w/o receiving a reply or an error explicitly. The request was: " + request);
}
"""
.replace("$methodSignature", formatMethodSignature("public"))
.replace("$methodSignatureWithoutOptions", formatMethodSignature("public", false))
.replace("$delegateToMethodWithOptions", formatDelegateToMethodWithOptions())
.replace("$methodSignatureWithOptions", formatMethodSignature("public", true))
.replace("$requestType", requestType)
.replace("$simpleRequestType", requestType.replace(".", ""))
.replace("$replyType", replyType)
Expand All @@ -214,7 +303,12 @@ public void onComplete() {
private String formatClientStreamingMethodImplementation() {
return """
@Override
$methodSignature {
$methodSignatureWithoutOptions {
$delegateToMethodWithOptions
}

@Override
$methodSignatureWithOptions {
final AtomicReference<$replyType> replyRef = new AtomicReference<>();
final Pipeline<$replyType> pipeline = new Pipeline<>() {
@Override
Expand Down Expand Up @@ -243,7 +337,8 @@ public void onComplete() {
FULL_NAME + "/$methodName",
get$simpleRequestTypeCodec(requestOptions),
get$simpleReplyTypeCodec(requestOptions),
pipeline
pipeline,
requestOptions.metadata()
);

return new Pipeline<$requestType>() {
Expand All @@ -266,7 +361,9 @@ public void onComplete() {
};
}
"""
.replace("$methodSignature", formatMethodSignature("public"))
.replace("$methodSignatureWithoutOptions", formatMethodSignature("public", false))
.replace("$delegateToMethodWithOptions", formatDelegateToMethodWithOptions())
.replace("$methodSignatureWithOptions", formatMethodSignature("public", true))
.replace("$requestType", requestType)
.replace("$simpleRequestType", requestType.replace(".", ""))
.replace("$replyType", replyType)
Expand All @@ -277,7 +374,12 @@ public void onComplete() {
private String formatServerStreamingMethodImplementation() {
return """
@Override
$methodSignature {
$methodSignatureWithoutOptions {
$delegateToMethodWithOptions
}

@Override
$methodSignatureWithOptions {
final CountDownLatch latch = new CountDownLatch(1);
final Pipeline<$replyType> pipeline = new Pipeline<>() {
@Override
Expand All @@ -304,7 +406,8 @@ public void onComplete() {
FULL_NAME + "/$methodName",
get$simpleRequestTypeCodec(requestOptions),
get$simpleReplyTypeCodec(requestOptions),
pipeline
pipeline,
requestOptions.metadata()
);
call.sendRequest(request, true);
try {
Expand All @@ -318,7 +421,9 @@ public void onComplete() {
// Alternatively, the client could time out if the replies pipeline never saw onComplete().
}
"""
.replace("$methodSignature", formatMethodSignature("public"))
.replace("$methodSignatureWithoutOptions", formatMethodSignature("public", false))
.replace("$delegateToMethodWithOptions", formatDelegateToMethodWithOptions())
.replace("$methodSignatureWithOptions", formatMethodSignature("public", true))
.replace("$requestType", requestType)
.replace("$simpleRequestType", requestType.replace(".", ""))
.replace("$replyType", replyType)
Expand All @@ -329,7 +434,12 @@ public void onComplete() {
private String formatBidiStreamingMethodImplementation() {
return """
@Override
$methodSignature {
$methodSignatureWithoutOptions {
$delegateToMethodWithOptions
}

@Override
$methodSignatureWithOptions {
final Pipeline<$replyType> pipeline = new Pipeline<>() {
@Override
public void onSubscribe(final Flow.Subscription subscription) {
Expand All @@ -353,7 +463,8 @@ public void onComplete() {
FULL_NAME + "/$methodName",
get$simpleRequestTypeCodec(requestOptions),
get$simpleReplyTypeCodec(requestOptions),
pipeline
pipeline,
requestOptions.metadata()
);

return new Pipeline<$requestType>() {
Expand All @@ -377,7 +488,9 @@ public void onComplete() {
};
}
"""
.replace("$methodSignature", formatMethodSignature("public"))
.replace("$methodSignatureWithoutOptions", formatMethodSignature("public", false))
.replace("$delegateToMethodWithOptions", formatDelegateToMethodWithOptions())
.replace("$methodSignatureWithOptions", formatMethodSignature("public", true))
.replace("$requestType", requestType)
.replace("$simpleRequestType", requestType.replace(".", ""))
.replace("$replyType", replyType)
Expand Down Expand Up @@ -517,7 +630,7 @@ enum $serviceNameMethod implements Method {
$methodNames
}

$methodSignatures
$serviceMethods

@NonNull
default String serviceName() {
Expand Down Expand Up @@ -554,7 +667,23 @@ default Pipeline<? super Bytes> open(
$getCodecMethods
$requestReplySerdeMethods

/** A client class for $serviceName. */
/**
* A client class for $serviceName.
* <p>
* Its constructor accepts the default RequestOptions for all requests. Individual service call
* methods have versions that accept a RequestOptions argument which overrides this default value
* for that specific call.
* <p>
* In the context of a client, these RequestOptions control the contentType used to encode/decode
* requests and replies, and the contentType MUST match the contentType defined in the PbjGrpcClientConfig
* that the PbjGrpcClient was created with. The behavior is undefined if a per-request (or the per-stub)
* RequestOptions.contentType() differs from that. With the exception of the metadata, all the other
* RequestOptions values are unused in the client code.
* <p>
* However, the RequestOptions.metadata() is something that a client application may change freely
* from one request to another, and this is the primary reason for the overridden method versions
* in this client stub class.
*/
public class $serviceNameClient implements $serviceName$suffix {
private final GrpcClient grpcClient;
private final RequestOptions requestOptions;
Expand All @@ -578,7 +707,9 @@ public void close() {
.replace("$serviceName", serviceName)
.replace("$suffix", SUFFIX)
.replace("$methodNames", RPC.formatForEach(rpcList, RPC::name, ",\n", DEFAULT_INDENT * 2))
.replace("$methodSignatures", RPC.formatForEach(rpcList, RPC::formatMethodDeclaration, "\n\n", DEFAULT_INDENT))
.replace("$serviceMethods", RPC.formatForEach(rpcList, rpc ->
rpc.formatDefaultMethod(true) + "\n" + rpc.formatDefaultMethod(false)
, "\n\n", DEFAULT_INDENT))
.replace("$fullyQualifiedProtoServiceName", lookupHelper.getLookupHelper().getFullyQualifiedProtoNameForContext(serviceDef))
.replace("$methodCaseStatements", RPC.formatForEach(rpcList, RPC::formatCaseStatement, "\n", DEFAULT_INDENT * 4))
.replace("$getCodecMethods", formatGetCodecMethods(rpcList).indent(DEFAULT_INDENT))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,21 @@ public static void setNetworkBytesInspector(PbjGrpcNetworkBytesInspector network
headers.add(HeaderValues.create(
GRPC_ACCEPT_ENCODING, String.join(",", grpcClient.getConfig().acceptEncodings())));
headers.add(HeaderValues.create(GRPC_ENCODING, grpcOutgoingEncoding));

if (requestOptions.metadata() != null && !requestOptions.metadata().isEmpty()) {
for (String key : requestOptions.metadata().keySet()) {
if (key.startsWith("grpc-")) {
throw new IllegalArgumentException(
"Custom metadata key names must not start with grpc- prefix, got: " + key);
}

String value = requestOptions.metadata().get(key);
if (value != null) {
headers.add(HeaderNames.create(key), value);
}
}
}

clientStream.writeHeaders(Http2Headers.create(headers), false);

// We must start this loop only AFTER writing headers above because that operation initializes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.SocketException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

/**
Expand Down Expand Up @@ -82,17 +83,20 @@ public void close() {
* @param requestCodec a PBJ codec for requests that MUST correspond to the content type in the PbjGrpcClientConfig
* @param replyCodec a PBJ codec for replies that MUST correspond to the content type in the PbjGrpcClientConfig
* @param pipeline a pipeline for receiving replies
* @param metadata metadata to be sent to the service
*/
@Override
public <RequestT, ReplyT> GrpcCall<RequestT, ReplyT> createCall(
final String fullMethodName,
final Codec<RequestT> requestCodec,
final Codec<ReplyT> replyCodec,
final Pipeline<ReplyT> pipeline) {
final Pipeline<ReplyT> pipeline,
final Map<String, String> metadata) {
final Options options = new Options(Optional.of(resolvedAuthority), config.contentType(), metadata);
return new PbjGrpcCall(
this,
createPbjGrpcClientStream(connection, clientConnection),
new Options(Optional.of(resolvedAuthority), config.contentType()),
options,
fullMethodName,
requestCodec,
replyCodec,
Expand Down Expand Up @@ -153,7 +157,8 @@ public PbjGrpcClientConfig getConfig() {
}

/** Simple implementation of the {@link ServiceInterface.RequestOptions} interface. */
private record Options(Optional<String> authority, String contentType) implements ServiceInterface.RequestOptions {}
private record Options(Optional<String> authority, String contentType, Map<String, String> metadata)
implements ServiceInterface.RequestOptions {}

private ClientConnection createClientConnection() {
// We cannot (don't want to) establish connections when unit-testing, so we use a marker:
Expand Down
Loading
Loading