diff --git a/pulsar/reader.go b/pulsar/reader.go index c45b8ff145..f1cb575eac 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -79,6 +79,9 @@ type ReaderOptions struct { // Decryption represents the encryption related fields required by the reader to decrypt a message. Decryption *MessageDecryptionInfo + + // Schema represents the schema implementation. + Schema Schema } // Reader can be used to scan through all the messages currently available in a topic. diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 0fed80c4dc..596884aa48 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -102,6 +102,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { nackRedeliveryDelay: defaultNackRedeliveryDelay, replicateSubscriptionState: false, decryption: options.Decryption, + schema: options.Schema, } reader := &reader{ diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index bdafea086f..7a71fa624d 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -710,3 +710,45 @@ func TestProducerReaderRSAEncryption(t *testing.T) { assert.Equal(t, []byte(expectMsg), msg.Payload()) } } +func TestReaderWithSchema(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + schema := NewStringSchema(nil) + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Schema: schema, + }) + assert.Nil(t, err) + defer producer.Close() + + value := "hello pulsar" + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: value, + }) + assert.Nil(t, err) + + // create reader + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + Schema: schema, + }) + assert.Nil(t, err) + defer reader.Close() + + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) + + var res *string + err = msg.GetSchemaValue(&res) + assert.Nil(t, err) + assert.Equal(t, *res, value) +}