Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[go-client] support produce flush #3469

Merged
merged 16 commits into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ typedef struct _pulsar_producer pulsar_producer_t;

typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg, void *ctx);
typedef void (*pulsar_close_callback)(pulsar_result, void *ctx);
typedef void (*pulsar_flush_callback)(pulsar_result, void *ctx);

/**
* @return the topic to which producer is publishing to
Expand Down Expand Up @@ -114,6 +115,11 @@ pulsar_result pulsar_producer_close(pulsar_producer_t *producer);
*/
void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx);

// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
pulsar_result pulsar_producer_flush(pulsar_producer_t *producer);

void pulsar_producer_flush_async(pulsar_producer_t *producer, pulsar_flush_callback callback, void *ctx);

void pulsar_producer_free(pulsar_producer_t *producer);

#pragma GCC visibility pop
Expand Down
8 changes: 8 additions & 0 deletions pulsar-client-cpp/lib/c/c_Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,11 @@ pulsar_result pulsar_producer_close(pulsar_producer_t *producer) {
void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx) {
producer->producer.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
}

pulsar_result pulsar_producer_flush(pulsar_producer_t *producer) {
return (pulsar_result)producer->producer.flush();
}

void pulsar_producer_flush_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx) {
producer->producer.flushAsync(boost::bind(handle_result_callback, _1, callback, ctx));
}
6 changes: 6 additions & 0 deletions pulsar-client-go/pulsar/c_go_pulsar.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ static inline void _pulsar_client_create_producer_async(pulsar_client_t *client,
pulsar_client_create_producer_async(client, topic, conf, pulsarCreateProducerCallbackProxy, ctx);
}

void pulsarProducerFlushCallbackProxy(pulsar_result result, void *ctx);

static inline void _pulsar_producer_flush_async(pulsar_producer_t *producer, void *ctx){
pulsar_producer_flush_async(producer, pulsarProducerFlushCallbackProxy, ctx);
}

void pulsarProducerCloseCallbackProxy(pulsar_result result, void *ctx);

static inline void _pulsar_producer_close_async(pulsar_producer_t *producer, void *ctx) {
Expand Down
25 changes: 25 additions & 0 deletions pulsar-client-go/pulsar/c_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,28 @@ func pulsarProducerCloseCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
callback(nil)
}
}

func (p *producer) Flush() error {
f := make(chan error)
p.FlushAsync(func(err error) {
f <- err
close(f)
})
return <-f
}

func (p *producer) FlushAsync(callback func(error)) {
C._pulsar_producer_flush_async(p.ptr, savePointer(callback))
}


//export pulsarProducerFlushCallbackProxy
func pulsarProducerFlushCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
callback := restorePointer(ctx).(func(error))

if res != C.pulsar_result_Ok {
callback(newError(res, "Failed to flush Producer"))
} else {
callback(nil)
}
}
4 changes: 4 additions & 0 deletions pulsar-client-go/pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ type Producer interface {
// return the last sequence id published by this producer.
LastSequenceID() int64

// Flush all the messages buffered in the client and wait until all messages have been successfully
// persisted.
Flush() error

// Close the producer and releases resources allocated
// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
// of errors, pending writes will not be retried.
Expand Down
51 changes: 51 additions & 0 deletions pulsar-client-go/pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"github.com/stretchr/testify/assert"
"testing"
"time"

log "github.com/apache/pulsar/pulsar-client-go/logutil"
"github.com/stretchr/testify/assert"
)

func TestInvalidURL(t *testing.T) {
Expand Down Expand Up @@ -205,3 +208,51 @@ func TestProducerZstd(t *testing.T) {
}
}
}

func TestProducer_Flush(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
assert.Nil(t, err)
defer client.Close()

topicName := "test-flush-in-producer"
subName := "subscription-name"

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
Properties: map[string]string{
"producer-name": "test-producer-name",
"producer-id": "test-producer-id",
},
})
assert.Nil(t, err)
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
Properties: map[string]string{
"consumer-name": "test-consumer-name",
"consumer-id": "test-consumer-id",
},
})
assert.Nil(t, err)
defer consumer.Close()

ctx := context.Background()
for i := 0; i < 10; i++ {
// Create a different message to send asynchronously
asyncMsg := ProducerMessage{
Payload: []byte(fmt.Sprintf("async-message-%d", i)),
}
// Attempt to send the message asynchronously and handle the response
producer.SendAsync(ctx, asyncMsg, func(msg ProducerMessage, err error) {
if err != nil {
log.Fatal(err)
}
fmt.Printf("Message %s successfully published", msg.Payload)
})
producer.Flush()
}
}
2 changes: 1 addition & 1 deletion pulsar-client-go/pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestReaderConnectError(t *testing.T) {
assert.Nil(t, reader)
assert.NotNil(t, err)

assert.Equal(t, err.(*Error).Result(), ConnectError);
assert.Equal(t, err.(*Error).Result(), ConnectError)
}

func TestReader(t *testing.T) {
Expand Down