From 076eecd5db1ab1cd30728ef35806a38930b6927a Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Sat, 20 Apr 2024 00:24:55 +0800 Subject: [PATCH] Add support for setting `replicateSubscriptionState` for the subscription (#261) * Add support for setting `replicateSubscriptionState` for the subscription * Forward port 8080 for tests (cherry picked from commit e0282f86536956ed95d0758ac9ef7aacdaa1d048) --- src/Pulsar.Client/Api/Configuration.fs | 2 ++ src/Pulsar.Client/Api/ConsumerBuilder.fs | 5 +++ src/Pulsar.Client/Common/Commands.fs | 6 ++-- src/Pulsar.Client/Internal/ConsumerImpl.fs | 2 +- tests/IntegrationTests/Basic.fs | 34 ++++++++++++++++++--- tests/IntegrationTests/Common.fs | 6 ++++ tests/UnitTests/Common/CommandTests.fs | 7 +++-- tests/compose/standalone/docker-compose.yml | 1 + 8 files changed, 52 insertions(+), 11 deletions(-) diff --git a/src/Pulsar.Client/Api/Configuration.fs b/src/Pulsar.Client/Api/Configuration.fs index b8c1782a..3c7cf295 100644 --- a/src/Pulsar.Client/Api/Configuration.fs +++ b/src/Pulsar.Client/Api/Configuration.fs @@ -78,6 +78,7 @@ type ConsumerConfiguration<'T> = MaxPendingChunkedMessage: int AutoAckOldestChunkedMessageOnQueueFull: bool ExpireTimeOfIncompleteChunkedMessage: TimeSpan + ReplicateSubscriptionState: bool } member this.SingleTopic with get() = this.Topics |> Seq.head static member Default = @@ -112,6 +113,7 @@ type ConsumerConfiguration<'T> = MaxPendingChunkedMessage = 10 AutoAckOldestChunkedMessageOnQueueFull = false ExpireTimeOfIncompleteChunkedMessage = TimeSpan.FromSeconds(60.0) + ReplicateSubscriptionState = false } type ProducerConfiguration = diff --git a/src/Pulsar.Client/Api/ConsumerBuilder.fs b/src/Pulsar.Client/Api/ConsumerBuilder.fs index 6f2b40b2..1034e0aa 100644 --- a/src/Pulsar.Client/Api/ConsumerBuilder.fs +++ b/src/Pulsar.Client/Api/ConsumerBuilder.fs @@ -261,6 +261,11 @@ type ConsumerBuilder<'T> private (createConsumerAsync, createProducerAsync, conf { config with ConsumerCryptoFailureAction = action } |> this.With + + member this.ReplicateSubscriptionState replicateSubscriptionState = + { config with + ReplicateSubscriptionState = replicateSubscriptionState } + |> this.With member this.SubscribeAsync(): Task> = createConsumerAsync(verify config, schema, consumerInterceptors) diff --git a/src/Pulsar.Client/Common/Commands.fs b/src/Pulsar.Client/Common/Commands.fs index 639d8e83..81c3a380 100644 --- a/src/Pulsar.Client/Common/Commands.fs +++ b/src/Pulsar.Client/Common/Commands.fs @@ -251,7 +251,8 @@ let newGetTopicsOfNamespaceRequest (ns : NamespaceName) (requestId : RequestId) let newSubscribe (topicName: CompleteTopicName) (subscription: SubscriptionName) (consumerId: ConsumerId) (requestId: RequestId) (consumerName: string) (subscriptionType: SubscriptionType) (subscriptionInitialPosition: SubscriptionInitialPosition) (readCompacted: bool) (startMessageId: MessageIdData) (durable: bool) (startMessageRollbackDuration: TimeSpan) - (createTopicIfDoesNotExist: bool) (keySharedPolicy: KeySharedPolicy option) (schemaInfo: SchemaInfo) (priorityLevel: PriorityLevel) = + (createTopicIfDoesNotExist: bool) (keySharedPolicy: KeySharedPolicy option) (schemaInfo: SchemaInfo) (priorityLevel: PriorityLevel) + (replicateSubscriptionState: bool)= let schema = getProtoSchema schemaInfo let subType = match subscriptionType with @@ -267,7 +268,8 @@ let newSubscribe (topicName: CompleteTopicName) (subscription: SubscriptionName) | _ -> failwith "Unknown initialPosition type" let request = CommandSubscribe(Topic = %topicName, Subscription = %subscription, subType = subType, ConsumerId = %consumerId, ConsumerName = consumerName, RequestId = %requestId, initialPosition = initialPosition, ReadCompacted = readCompacted, - StartMessageId = startMessageId, Durable = durable, ForceTopicCreation = createTopicIfDoesNotExist, PriorityLevel = %priorityLevel) + StartMessageId = startMessageId, Durable = durable, ForceTopicCreation = createTopicIfDoesNotExist, PriorityLevel = %priorityLevel, + ReplicateSubscriptionState = replicateSubscriptionState) match keySharedPolicy with | Some keySharedPolicy -> let meta = KeySharedMeta() diff --git a/src/Pulsar.Client/Internal/ConsumerImpl.fs b/src/Pulsar.Client/Internal/ConsumerImpl.fs index 0da634ab..78d3b00b 100644 --- a/src/Pulsar.Client/Internal/ConsumerImpl.fs +++ b/src/Pulsar.Client/Internal/ConsumerImpl.fs @@ -856,7 +856,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien consumerId requestId consumerName consumerConfig.SubscriptionType consumerConfig.SubscriptionInitialPosition consumerConfig.ReadCompacted msgIdData isDurable startMessageRollbackDuration createTopicIfDoesNotExist consumerConfig.KeySharedPolicy - schema.SchemaInfo consumerConfig.PriorityLevel + schema.SchemaInfo consumerConfig.PriorityLevel consumerConfig.ReplicateSubscriptionState try let! response = clientCnx.SendAndWaitForReply requestId payload response |> PulsarResponseType.GetEmpty diff --git a/tests/IntegrationTests/Basic.fs b/tests/IntegrationTests/Basic.fs index e42b9ba9..25bb7046 100644 --- a/tests/IntegrationTests/Basic.fs +++ b/tests/IntegrationTests/Basic.fs @@ -1,12 +1,12 @@ module Pulsar.Client.IntegrationTests.Basic open System +open System.Text.Json open System.Threading open System.Diagnostics open Expecto open Expecto.Flip -open Expecto.Logging open System.Text open System.Threading.Tasks @@ -279,11 +279,11 @@ let tests = do! consumer1.DisposeAsync().AsTask() Expect.throwsT2 (fun () -> consumer1.ReceiveAsync().Result |> ignore) |> ignore do! producer1.DisposeAsync().AsTask() - Expect.throwsT2 (fun () -> producer1.SendAndForgetAsync([||]).Result |> ignore) |> ignore + Expect.throwsT2 (fun () -> producer1.SendAndForgetAsync([||]).Result) |> ignore do! client.CloseAsync() - Expect.throwsT2 (fun () -> consumer2.UnsubscribeAsync().Result |> ignore) |> ignore - Expect.throwsT2 (fun () -> producer2.SendAndForgetAsync([||]).Result |> ignore) |> ignore - Expect.throwsT2 (fun () -> client.CloseAsync().Result |> ignore) |> ignore + Expect.throwsT2 (fun () -> consumer2.UnsubscribeAsync().Result) |> ignore + Expect.throwsT2 (fun () -> producer2.SendAndForgetAsync([||]).Result) |> ignore + Expect.throwsT2 (fun () -> client.CloseAsync().Result) |> ignore Log.Debug("Finished 'Client, producer and consumer can't be accessed after close'") } @@ -350,6 +350,30 @@ let tests = Log.Debug("Finished 'Scheduled message should be delivered at requested time'") } + testTask "Create the replicated subscription should be successful" { + Log.Debug("Started 'Create the replicated subscription should be successful'") + let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N") + let consumerName = "replicated-consumer" + let client = getClient() + let! (_ : IConsumer) = + client.NewConsumer() + .Topic(topicName) + .ConsumerName(consumerName) + .SubscriptionName("replicate") + .SubscriptionType(SubscriptionType.Shared) + .ReplicateSubscriptionState(true) + .SubscribeAsync() + + do! Task.Delay 1000 + + let url = $"{pulsarHttpAddress}/admin/v2/persistent/" + topicName + "/stats" + let! (response: string) = commonHttpClient.GetStringAsync(url) + let json = JsonDocument.Parse(response) + let isReplicated = json.RootElement.GetProperty("subscriptions").GetProperty("replicate").GetProperty("isReplicated").GetBoolean() + Expect.isTrue "" isReplicated + Log.Debug("Finished 'Create the replicated subscription should be successful'") + } + #if !NOTLS // Before running this test set 'maxMessageSize' for broker and 'nettyMaxFrameSizeBytes' for bookkeeper testTask "Send large message works fine" { diff --git a/tests/IntegrationTests/Common.fs b/tests/IntegrationTests/Common.fs index 84c68e1b..8eeeb4f0 100644 --- a/tests/IntegrationTests/Common.fs +++ b/tests/IntegrationTests/Common.fs @@ -1,6 +1,7 @@ module Pulsar.Client.IntegrationTests.Common open System +open System.Net.Http open Pulsar.Client.Api open System.Text @@ -21,6 +22,9 @@ let pulsarAddress = "pulsar://127.0.0.1:6650" [] let pulsarSslAddress = "pulsar+ssl://127.0.0.1:6651" +[] +let pulsarHttpAddress = "http://127.0.0.1:8080" + // ssl folder copied by from https://github.com/apache/pulsar/tree/master/tests/docker-images/latest-version-image/ssl // generate pfx file from pem, leave the password blank // openssl pkcs12 -in admin.cert.pem -inkey admin.key-pk8.pem -export -out admin.pfx @@ -50,6 +54,8 @@ let commonClient = .ServiceUrl(pulsarAddress) .BuildAsync().Result +let commonHttpClient = new HttpClient() + let getClient() = commonClient let extractTimeStamp (date: DateTime) : TimeStamp = diff --git a/tests/UnitTests/Common/CommandTests.fs b/tests/UnitTests/Common/CommandTests.fs index 58ecb7da..d261eb08 100644 --- a/tests/UnitTests/Common/CommandTests.fs +++ b/tests/UnitTests/Common/CommandTests.fs @@ -148,15 +148,16 @@ module CommandsTests = let totalSize, commandSize, command = serializeDeserializeSimpleCommand (newSubscribe topicName %"test-subscription" consumerId requestId consumerName - SubscriptionType.Exclusive SubscriptionInitialPosition.Earliest false null true TimeSpan.Zero true None (Schema.BYTES().SchemaInfo) priorityLevel) + SubscriptionType.Exclusive SubscriptionInitialPosition.Earliest false null true TimeSpan.Zero true None (Schema.BYTES().SchemaInfo) priorityLevel false) - totalSize |> Expect.equal "" 70 - commandSize |> Expect.equal "" 66 + totalSize |> Expect.equal "" 72 + commandSize |> Expect.equal "" 68 command.``type`` |> Expect.equal "" CommandType.Subscribe command.Subscribe.Topic |> Expect.equal "" %topicName command.Subscribe.RequestId |> Expect.equal "" %requestId command.Subscribe.ConsumerId |> Expect.equal "" %consumerId command.Subscribe.ConsumerName |> Expect.equal "" %consumerName + command.Subscribe.ReplicateSubscriptionState |> Expect.equal "" %false } test "newFlow should return correct frame" { diff --git a/tests/compose/standalone/docker-compose.yml b/tests/compose/standalone/docker-compose.yml index 87ad34f3..5929dc28 100644 --- a/tests/compose/standalone/docker-compose.yml +++ b/tests/compose/standalone/docker-compose.yml @@ -10,6 +10,7 @@ services: ports: - "6650:6650" - "2181:2181" + - "8080:8080" command: > bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone"