Skip to content

Commit

Permalink
Add support for setting replicateSubscriptionState for the subscrip…
Browse files Browse the repository at this point in the history
…tion (fsprojects#261)

* Add support for setting `replicateSubscriptionState` for the subscription

* Forward port 8080 for tests

(cherry picked from commit e0282f8)
  • Loading branch information
RobertIndie committed Apr 29, 2024
1 parent b8b4faa commit 076eecd
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 11 deletions.
2 changes: 2 additions & 0 deletions src/Pulsar.Client/Api/Configuration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type ConsumerConfiguration<'T> =
MaxPendingChunkedMessage: int
AutoAckOldestChunkedMessageOnQueueFull: bool
ExpireTimeOfIncompleteChunkedMessage: TimeSpan
ReplicateSubscriptionState: bool
}
member this.SingleTopic with get() = this.Topics |> Seq.head
static member Default =
Expand Down Expand Up @@ -112,6 +113,7 @@ type ConsumerConfiguration<'T> =
MaxPendingChunkedMessage = 10
AutoAckOldestChunkedMessageOnQueueFull = false
ExpireTimeOfIncompleteChunkedMessage = TimeSpan.FromSeconds(60.0)
ReplicateSubscriptionState = false
}

type ProducerConfiguration =
Expand Down
5 changes: 5 additions & 0 deletions src/Pulsar.Client/Api/ConsumerBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ type ConsumerBuilder<'T> private (createConsumerAsync, createProducerAsync, conf
{ config with
ConsumerCryptoFailureAction = action }
|> this.With

member this.ReplicateSubscriptionState replicateSubscriptionState =
{ config with
ReplicateSubscriptionState = replicateSubscriptionState }
|> this.With

member this.SubscribeAsync(): Task<IConsumer<'T>> =
createConsumerAsync(verify config, schema, consumerInterceptors)
Expand Down
6 changes: 4 additions & 2 deletions src/Pulsar.Client/Common/Commands.fs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ let newGetTopicsOfNamespaceRequest (ns : NamespaceName) (requestId : RequestId)
let newSubscribe (topicName: CompleteTopicName) (subscription: SubscriptionName) (consumerId: ConsumerId) (requestId: RequestId)
(consumerName: string) (subscriptionType: SubscriptionType) (subscriptionInitialPosition: SubscriptionInitialPosition)
(readCompacted: bool) (startMessageId: MessageIdData) (durable: bool) (startMessageRollbackDuration: TimeSpan)
(createTopicIfDoesNotExist: bool) (keySharedPolicy: KeySharedPolicy option) (schemaInfo: SchemaInfo) (priorityLevel: PriorityLevel) =
(createTopicIfDoesNotExist: bool) (keySharedPolicy: KeySharedPolicy option) (schemaInfo: SchemaInfo) (priorityLevel: PriorityLevel)
(replicateSubscriptionState: bool)=
let schema = getProtoSchema schemaInfo
let subType =
match subscriptionType with
Expand All @@ -267,7 +268,8 @@ let newSubscribe (topicName: CompleteTopicName) (subscription: SubscriptionName)
| _ -> failwith "Unknown initialPosition type"
let request = CommandSubscribe(Topic = %topicName, Subscription = %subscription, subType = subType, ConsumerId = %consumerId,
ConsumerName = consumerName, RequestId = %requestId, initialPosition = initialPosition, ReadCompacted = readCompacted,
StartMessageId = startMessageId, Durable = durable, ForceTopicCreation = createTopicIfDoesNotExist, PriorityLevel = %priorityLevel)
StartMessageId = startMessageId, Durable = durable, ForceTopicCreation = createTopicIfDoesNotExist, PriorityLevel = %priorityLevel,
ReplicateSubscriptionState = replicateSubscriptionState)
match keySharedPolicy with
| Some keySharedPolicy ->
let meta = KeySharedMeta()
Expand Down
2 changes: 1 addition & 1 deletion src/Pulsar.Client/Internal/ConsumerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
consumerId requestId consumerName consumerConfig.SubscriptionType
consumerConfig.SubscriptionInitialPosition consumerConfig.ReadCompacted msgIdData isDurable
startMessageRollbackDuration createTopicIfDoesNotExist consumerConfig.KeySharedPolicy
schema.SchemaInfo consumerConfig.PriorityLevel
schema.SchemaInfo consumerConfig.PriorityLevel consumerConfig.ReplicateSubscriptionState
try
let! response = clientCnx.SendAndWaitForReply requestId payload
response |> PulsarResponseType.GetEmpty
Expand Down
34 changes: 29 additions & 5 deletions tests/IntegrationTests/Basic.fs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
module Pulsar.Client.IntegrationTests.Basic

open System
open System.Text.Json
open System.Threading
open System.Diagnostics

open Expecto
open Expecto.Flip
open Expecto.Logging

open System.Text
open System.Threading.Tasks
Expand Down Expand Up @@ -279,11 +279,11 @@ let tests =
do! consumer1.DisposeAsync().AsTask()
Expect.throwsT2<AlreadyClosedException> (fun () -> consumer1.ReceiveAsync().Result |> ignore) |> ignore
do! producer1.DisposeAsync().AsTask()
Expect.throwsT2<AlreadyClosedException> (fun () -> producer1.SendAndForgetAsync([||]).Result |> ignore) |> ignore
Expect.throwsT2<AlreadyClosedException> (fun () -> producer1.SendAndForgetAsync([||]).Result) |> ignore
do! client.CloseAsync()
Expect.throwsT2<AlreadyClosedException> (fun () -> consumer2.UnsubscribeAsync().Result |> ignore) |> ignore
Expect.throwsT2<AlreadyClosedException> (fun () -> producer2.SendAndForgetAsync([||]).Result |> ignore) |> ignore
Expect.throwsT2<AlreadyClosedException> (fun () -> client.CloseAsync().Result |> ignore) |> ignore
Expect.throwsT2<AlreadyClosedException> (fun () -> consumer2.UnsubscribeAsync().Result) |> ignore
Expect.throwsT2<AlreadyClosedException> (fun () -> producer2.SendAndForgetAsync([||]).Result) |> ignore
Expect.throwsT2<AlreadyClosedException> (fun () -> client.CloseAsync().Result) |> ignore

Log.Debug("Finished 'Client, producer and consumer can't be accessed after close'")
}
Expand Down Expand Up @@ -350,6 +350,30 @@ let tests =
Log.Debug("Finished 'Scheduled message should be delivered at requested time'")
}

testTask "Create the replicated subscription should be successful" {
Log.Debug("Started 'Create the replicated subscription should be successful'")
let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N")
let consumerName = "replicated-consumer"
let client = getClient()
let! (_ : IConsumer<byte[]>) =
client.NewConsumer()
.Topic(topicName)
.ConsumerName(consumerName)
.SubscriptionName("replicate")
.SubscriptionType(SubscriptionType.Shared)
.ReplicateSubscriptionState(true)
.SubscribeAsync()

do! Task.Delay 1000

let url = $"{pulsarHttpAddress}/admin/v2/persistent/" + topicName + "/stats"
let! (response: string) = commonHttpClient.GetStringAsync(url)
let json = JsonDocument.Parse(response)
let isReplicated = json.RootElement.GetProperty("subscriptions").GetProperty("replicate").GetProperty("isReplicated").GetBoolean()
Expect.isTrue "" isReplicated
Log.Debug("Finished 'Create the replicated subscription should be successful'")
}

#if !NOTLS
// Before running this test set 'maxMessageSize' for broker and 'nettyMaxFrameSizeBytes' for bookkeeper
testTask "Send large message works fine" {
Expand Down
6 changes: 6 additions & 0 deletions tests/IntegrationTests/Common.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Pulsar.Client.IntegrationTests.Common

open System
open System.Net.Http
open Pulsar.Client.Api

open System.Text
Expand All @@ -21,6 +22,9 @@ let pulsarAddress = "pulsar://127.0.0.1:6650"
[<Literal>]
let pulsarSslAddress = "pulsar+ssl://127.0.0.1:6651"

[<Literal>]
let pulsarHttpAddress = "http://127.0.0.1:8080"

// ssl folder copied by from https://github.com/apache/pulsar/tree/master/tests/docker-images/latest-version-image/ssl
// generate pfx file from pem, leave the password blank
// openssl pkcs12 -in admin.cert.pem -inkey admin.key-pk8.pem -export -out admin.pfx
Expand Down Expand Up @@ -50,6 +54,8 @@ let commonClient =
.ServiceUrl(pulsarAddress)
.BuildAsync().Result

let commonHttpClient = new HttpClient()

let getClient() = commonClient

let extractTimeStamp (date: DateTime) : TimeStamp =
Expand Down
7 changes: 4 additions & 3 deletions tests/UnitTests/Common/CommandTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,16 @@ module CommandsTests =
let totalSize, commandSize, command =
serializeDeserializeSimpleCommand
(newSubscribe topicName %"test-subscription" consumerId requestId consumerName
SubscriptionType.Exclusive SubscriptionInitialPosition.Earliest false null true TimeSpan.Zero true None (Schema.BYTES().SchemaInfo) priorityLevel)
SubscriptionType.Exclusive SubscriptionInitialPosition.Earliest false null true TimeSpan.Zero true None (Schema.BYTES().SchemaInfo) priorityLevel false)

totalSize |> Expect.equal "" 70
commandSize |> Expect.equal "" 66
totalSize |> Expect.equal "" 72
commandSize |> Expect.equal "" 68
command.``type`` |> Expect.equal "" CommandType.Subscribe
command.Subscribe.Topic |> Expect.equal "" %topicName
command.Subscribe.RequestId |> Expect.equal "" %requestId
command.Subscribe.ConsumerId |> Expect.equal "" %consumerId
command.Subscribe.ConsumerName |> Expect.equal "" %consumerName
command.Subscribe.ReplicateSubscriptionState |> Expect.equal "" %false
}

test "newFlow should return correct frame" {
Expand Down
1 change: 1 addition & 0 deletions tests/compose/standalone/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
ports:
- "6650:6650"
- "2181:2181"
- "8080:8080"
command: >
bash -c "bin/apply-config-from-env.py conf/standalone.conf &&
bin/pulsar standalone"
Expand Down

0 comments on commit 076eecd

Please sign in to comment.