diff --git a/src/Pulsar.Client/Api/IConsumer.fs b/src/Pulsar.Client/Api/IConsumer.fs index 84c95474..f6c61db7 100644 --- a/src/Pulsar.Client/Api/IConsumer.fs +++ b/src/Pulsar.Client/Api/IConsumer.fs @@ -76,4 +76,6 @@ type IConsumer<'T> = /// ReconsumeLater the reception of all the messages in the stream up to (and including) the provided message. abstract member ReconsumeLaterCumulativeAsync: message:Message<'T> * deliverAt:TimeStamp -> Task /// The last disconnected timestamp of the consumer abstract member LastDisconnected: DateTime - abstract member LastDisconnectedTimestamp: TimeStamp \ No newline at end of file + abstract member LastDisconnectedTimestamp: TimeStamp + /// Return true if the consumer is connected to the broker + abstract member IsConnected: bool \ No newline at end of file diff --git a/src/Pulsar.Client/Api/IProducer.fs b/src/Pulsar.Client/Api/IProducer.fs index 667aabcb..395e1236 100644 --- a/src/Pulsar.Client/Api/IProducer.fs +++ b/src/Pulsar.Client/Api/IProducer.fs @@ -73,3 +73,5 @@ type IProducer<'T> = abstract member Name: string /// The last disconnected timestamp of the producer abstract member LastDisconnectedTimestamp: TimeStamp + /// Return true if the consumer is connected to the broker + abstract member IsConnected: bool diff --git a/src/Pulsar.Client/Api/IReader.fs b/src/Pulsar.Client/Api/IReader.fs index a1af528e..c5314226 100644 --- a/src/Pulsar.Client/Api/IReader.fs +++ b/src/Pulsar.Client/Api/IReader.fs @@ -23,4 +23,6 @@ type IReader<'T> = /// Check if there is any message available to read from the current position. abstract member HasMessageAvailableAsync: unit -> Task /// Get a topic for the reader - abstract member Topic: string \ No newline at end of file + abstract member Topic: string + /// Return true if the reader is connected to the broker + abstract member IsConnected: bool \ No newline at end of file diff --git a/src/Pulsar.Client/Internal/ConsumerImpl.fs b/src/Pulsar.Client/Internal/ConsumerImpl.fs index 6b2b3e46..e8b835d2 100644 --- a/src/Pulsar.Client/Internal/ConsumerImpl.fs +++ b/src/Pulsar.Client/Internal/ConsumerImpl.fs @@ -1647,6 +1647,11 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien member this.LastDisconnectedTimestamp = connectionHandler.LastDisconnectedTimestamp + + member this.IsConnected = + match connectionHandler.ConnectionState with + | Ready _ -> true + | _ -> false interface IAsyncDisposable with diff --git a/src/Pulsar.Client/Internal/MultiTopicsConsumerImpl.fs b/src/Pulsar.Client/Internal/MultiTopicsConsumerImpl.fs index 6dca11be..e43672fa 100644 --- a/src/Pulsar.Client/Internal/MultiTopicsConsumerImpl.fs +++ b/src/Pulsar.Client/Internal/MultiTopicsConsumerImpl.fs @@ -62,6 +62,7 @@ type internal MultiTopicConsumerMessage<'T> = | CancelWaiter of Waiter<'T> | CancelBatchWaiter of BatchWaiter<'T> | LastDisconnectedTimestamp of TaskCompletionSource + | IsConnected of TaskCompletionSource | HasMessageAvailable of TaskCompletionSource type internal TopicAndConsumer<'T> = @@ -806,6 +807,13 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration |> Seq.map (fun (KeyValue(_, (consumer, _))) -> consumer.LastDisconnectedTimestamp) |> Seq.max |> channel.SetResult + + | IsConnected channel -> + + Log.Logger.LogDebug("{0} IsConnected", prefix) + consumers + |> Seq.forall (fun (KeyValue(_, (consumer, _))) -> consumer.IsConnected) + |> channel.SetResult | Seek (seekData, channel) -> @@ -1155,6 +1163,8 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration member this.LastDisconnectedTimestamp = (postAndAsyncReply mb LastDisconnectedTimestamp).Result + member this.IsConnected = + (postAndAsyncReply mb IsConnected).Result interface IAsyncDisposable with diff --git a/src/Pulsar.Client/Internal/MultiTopicsReaderImpl.fs b/src/Pulsar.Client/Internal/MultiTopicsReaderImpl.fs index ab0e24b9..01d6cebe 100644 --- a/src/Pulsar.Client/Internal/MultiTopicsReaderImpl.fs +++ b/src/Pulsar.Client/Internal/MultiTopicsReaderImpl.fs @@ -87,6 +87,9 @@ type internal MultiTopicsReaderImpl<'T> private (readerConfig: ReaderConfigurati member this.Topic with get() = castedConsumer.Topic + member this.IsConnected with get() = + castedConsumer.IsConnected + interface IAsyncDisposable with member this.DisposeAsync() = diff --git a/src/Pulsar.Client/Internal/PartitionedProducerImpl.fs b/src/Pulsar.Client/Internal/PartitionedProducerImpl.fs index f9043b38..985330f5 100644 --- a/src/Pulsar.Client/Internal/PartitionedProducerImpl.fs +++ b/src/Pulsar.Client/Internal/PartitionedProducerImpl.fs @@ -22,6 +22,7 @@ type internal PartitionedProducerMessage = | TickTime | GetStats of TaskCompletionSource | LastDisconnectedTimestamp of TaskCompletionSource + | IsConnected of TaskCompletionSource type internal PartitionedConnectionState = | Uninitialized @@ -197,6 +198,13 @@ type internal PartitionedProducerImpl<'T> private (producerConfig: ProducerConfi |> Seq.max |> channel.SetResult + | IsConnected channel -> + + Log.Logger.LogDebug("{0} IsConnected", prefix) + producers + |> Seq.forall (fun producer -> producer.IsConnected) + |> channel.SetResult + | Close channel -> match this.ConnectionState with @@ -374,6 +382,8 @@ type internal PartitionedProducerImpl<'T> private (producerConfig: ProducerConfi member this.GetStatsAsync() = postAndAsyncReply mb GetStats member this.LastDisconnectedTimestamp = (postAndAsyncReply mb LastDisconnectedTimestamp).Result + + member this.IsConnected = (postAndAsyncReply mb IsConnected).Result interface IAsyncDisposable with diff --git a/src/Pulsar.Client/Internal/ProducerImpl.fs b/src/Pulsar.Client/Internal/ProducerImpl.fs index 45bfc69f..98cd4123 100644 --- a/src/Pulsar.Client/Internal/ProducerImpl.fs +++ b/src/Pulsar.Client/Internal/ProducerImpl.fs @@ -928,6 +928,10 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c member this.LastDisconnectedTimestamp = connectionHandler.LastDisconnectedTimestamp + member this.IsConnected = + match connectionHandler.ConnectionState with + | Ready _ -> true + | _ -> false interface IAsyncDisposable with diff --git a/src/Pulsar.Client/Internal/ReaderImpl.fs b/src/Pulsar.Client/Internal/ReaderImpl.fs index b284ac0b..c7d9e26c 100644 --- a/src/Pulsar.Client/Internal/ReaderImpl.fs +++ b/src/Pulsar.Client/Internal/ReaderImpl.fs @@ -93,6 +93,9 @@ type internal ReaderImpl<'T> private (readerConfig: ReaderConfiguration, clientC member this.Topic with get() = castedConsumer.Topic + member this.IsConnected with get() = + castedConsumer.IsConnected + interface IAsyncDisposable with member this.DisposeAsync() =