Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
718 changes: 718 additions & 0 deletions pcip/pcip-2.md

Large diffs are not rendered by default.

Binary file added pcip/static/img/pcip-2/img-pulsar-RPC.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
27 changes: 26 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
<org.testing.version>7.10.2</org.testing.version>
<maven-compiler-plugin.version>3.13.0</maven-compiler-plugin.version>
<google-java-format.version>1.10.0</google-java-format.version>
<commons-pool.version>2.12.0</commons-pool.version>
<awaitility.version>4.2.2</awaitility.version>
<testcontainers.version>1.20.1</testcontainers.version>
</properties>

<modules>
Expand All @@ -57,6 +60,7 @@
<module>pulsar-transaction-contrib</module>
<module>pulsar-metrics-contrib</module>
<module>pulsar-auth-contrib</module>
<module>pulsar-rpc-contrib</module>
</modules>

<dependencyManagement>
Expand All @@ -75,6 +79,22 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>${commons-pool.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<version>${testcontainers.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand All @@ -84,7 +104,6 @@
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand All @@ -100,6 +119,12 @@
<version>${org.testing.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
64 changes: 64 additions & 0 deletions pulsar-rpc-contrib/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache</groupId>
<artifactId>pulsar-java-contrib</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<inceptionYear>2024</inceptionYear>

<artifactId>pulsar-rpc-contrib</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
</dependency>
</dependencies>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.rpc.contrib.client;

import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;

/**
* Default implementation of {@link RequestCallBack} that handles callback events for Pulsar RPC communications.
*/
@Slf4j
public class DefaultRequestCallBack<V> implements RequestCallBack<V> {

@Override
public void onSendRequestSuccess(String correlationId, MessageId messageId) {

}

@Override
public void onSendRequestError(String correlationId, Throwable t,
CompletableFuture<V> replyFuture) {
replyFuture.completeExceptionally(t);
}

@Override
public void onReplySuccess(String correlationId, String subscription,
V value, CompletableFuture<V> replyFuture) {
replyFuture.complete(value);
}

@Override
public void onReplyError(String correlationId, String subscription,
String errorMessage, CompletableFuture<V> replyFuture) {
replyFuture.completeExceptionally(new Exception(errorMessage));
}

@Override
public void onTimeout(String correlationId, Throwable t) {

}

@Override
public void onReplyMessageAckFailed(String correlationId, Consumer<V> consumer, Message<V> msg, Throwable t) {
consumer.acknowledgeAsync(msg.getMessageId()).exceptionally(ex -> {
log.warn("<onReplyMessageAckFailed> [{}] [{}] Acknowledging message {} failed again.",
msg.getTopicName(), correlationId, msg.getMessageId(), ex);
return null;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.rpc.contrib.client;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.NonNull;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.rpc.contrib.common.PulsarRpcClientException;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;

/**
* Provides the functionality to send asynchronous requests and handle replies using Apache Pulsar as the
* messaging system. This client manages request-response interactions ensuring that messages are sent
* to the correct topics and handling responses through callbacks.
*
* @param <T> The type of the request messages.
* @param <V> The type of the reply messages.
*/
public interface PulsarRpcClient<T, V> extends AutoCloseable {

/**
* Creates a builder for configuring a new {@link PulsarRpcClient}.
*
* @return A new instance of {@link PulsarRpcClientBuilder}.
*/
static <T, V> PulsarRpcClientBuilder<T, V> builder(@NonNull Schema<T> requestSchema,
@NonNull Schema<V> replySchema) {
return new PulsarRpcClientBuilderImpl<>(requestSchema, replySchema);
}

/**
* Synchronously sends a request and waits for the replies.
*
* @param correlationId A unique identifier for the request.
* @param value The value used to generate the request message
* @return The reply value.
* @throws PulsarRpcClientException if an error occurs during the request or while waiting for the reply.
*/
default V request(String correlationId, T value) throws PulsarRpcClientException {
return request(correlationId, value, Collections.emptyMap());
}

/**
* Synchronously sends a request and waits for the replies.
*
* @param correlationId A unique identifier for the request.
* @param value The value used to generate the request message
* @param config Configuration map for creating a request producer,
* will call {@link TypedMessageBuilder#loadConf(Map)}
* @return The reply value.
* @throws PulsarRpcClientException if an error occurs during the request or while waiting for the reply.
*/
V request(String correlationId, T value, Map<String, Object> config) throws PulsarRpcClientException;

/**
* Asynchronously sends a request and returns a future that completes with the reply.
*
* @param correlationId A unique identifier for the request.
* @param value The value used to generate the request message
* @return A CompletableFuture that will complete with the reply value.
*/
default CompletableFuture<V> requestAsync(String correlationId, T value) {
return requestAsync(correlationId, value, Collections.emptyMap());
}

/**
* Asynchronously sends a request and returns a future that completes with the reply.
*
* @param correlationId A unique identifier for the request.
* @param value The value used to generate the request message
* @param config Configuration map for creating a request producer,
* will call {@link TypedMessageBuilder#loadConf(Map)}
* @return A CompletableFuture that will complete with the reply value.
*/
CompletableFuture<V> requestAsync(String correlationId, T value, Map<String, Object> config);

/**
* Removes a request from the tracking map based on its correlation ID.
*
* <p>When this method is executed, ReplyListener the received message will not be processed again.
* You need to make sure that this request has been processed through the callback, or you need to resend it.
*
* @param correlationId The correlation ID of the request to remove.
*/
void removeRequest(String correlationId);

@VisibleForTesting
int pendingRequestSize();

/**
* Closes this client and releases any resources associated with it. This includes closing any active
* producers and consumers and clearing pending requests.
*
* @throws PulsarRpcClientException if there is an error during the closing process.
*/
@Override
void close() throws PulsarRpcClientException;
}
Loading