Skip to content

Commit

Permalink
expose the connection state to consumer and producer interface #239 (#…
Browse files Browse the repository at this point in the history
…240)

* expose the connection state to consumer and producer interface #239

* fix issues from code review and add reader interface #239
  • Loading branch information
baumerik authored Nov 18, 2023
1 parent b1e64f0 commit 6834cff
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 2 deletions.
4 changes: 3 additions & 1 deletion src/Pulsar.Client/Api/IConsumer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,6 @@ type IConsumer<'T> =
/// ReconsumeLater the reception of all the messages in the stream up to (and including) the provided message.
abstract member ReconsumeLaterCumulativeAsync: message:Message<'T> * deliverAt:TimeStamp -> Task<unit>
/// The last disconnected timestamp of the consumer abstract member LastDisconnected: DateTime
abstract member LastDisconnectedTimestamp: TimeStamp
abstract member LastDisconnectedTimestamp: TimeStamp
/// Return true if the consumer is connected to the broker
abstract member IsConnected: bool
2 changes: 2 additions & 0 deletions src/Pulsar.Client/Api/IProducer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,5 @@ type IProducer<'T> =
abstract member Name: string
/// The last disconnected timestamp of the producer
abstract member LastDisconnectedTimestamp: TimeStamp
/// Return true if the consumer is connected to the broker
abstract member IsConnected: bool
4 changes: 3 additions & 1 deletion src/Pulsar.Client/Api/IReader.fs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ type IReader<'T> =
/// Check if there is any message available to read from the current position.
abstract member HasMessageAvailableAsync: unit -> Task<bool>
/// Get a topic for the reader
abstract member Topic: string
abstract member Topic: string
/// Return true if the reader is connected to the broker
abstract member IsConnected: bool
5 changes: 5 additions & 0 deletions src/Pulsar.Client/Internal/ConsumerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1664,6 +1664,11 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien

member this.LastDisconnectedTimestamp =
connectionHandler.LastDisconnectedTimestamp

member this.IsConnected =
match connectionHandler.ConnectionState with
| Ready _ -> true
| _ -> false


interface IAsyncDisposable with
Expand Down
10 changes: 10 additions & 0 deletions src/Pulsar.Client/Internal/MultiTopicsConsumerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type internal MultiTopicConsumerMessage<'T> =
| CancelWaiter of Waiter<'T>
| CancelBatchWaiter of BatchWaiter<'T>
| LastDisconnectedTimestamp of TaskCompletionSource<TimeStamp>
| IsConnected of TaskCompletionSource<bool>
| HasMessageAvailable of TaskCompletionSource<bool>

type internal TopicAndConsumer<'T> =
Expand Down Expand Up @@ -806,6 +807,13 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration
|> Seq.map (fun (KeyValue(_, (consumer, _))) -> consumer.LastDisconnectedTimestamp)
|> Seq.max
|> channel.SetResult

| IsConnected channel ->

Log.Logger.LogDebug("{0} IsConnected", prefix)
consumers
|> Seq.forall (fun (KeyValue(_, (consumer, _))) -> consumer.IsConnected)
|> channel.SetResult

| Seek (seekData, channel) ->

Expand Down Expand Up @@ -1155,6 +1163,8 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration

member this.LastDisconnectedTimestamp =
(postAndAsyncReply mb LastDisconnectedTimestamp).Result
member this.IsConnected =
(postAndAsyncReply mb IsConnected).Result

interface IAsyncDisposable with

Expand Down
3 changes: 3 additions & 0 deletions src/Pulsar.Client/Internal/MultiTopicsReaderImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type internal MultiTopicsReaderImpl<'T> private (readerConfig: ReaderConfigurati
member this.Topic with get() =
castedConsumer.Topic

member this.IsConnected with get() =
castedConsumer.IsConnected

interface IAsyncDisposable with

member this.DisposeAsync() =
Expand Down
10 changes: 10 additions & 0 deletions src/Pulsar.Client/Internal/PartitionedProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type internal PartitionedProducerMessage =
| TickTime
| GetStats of TaskCompletionSource<ProducerStats>
| LastDisconnectedTimestamp of TaskCompletionSource<TimeStamp>
| IsConnected of TaskCompletionSource<bool>

type internal PartitionedConnectionState =
| Uninitialized
Expand Down Expand Up @@ -197,6 +198,13 @@ type internal PartitionedProducerImpl<'T> private (producerConfig: ProducerConfi
|> Seq.max
|> channel.SetResult

| IsConnected channel ->

Log.Logger.LogDebug("{0} IsConnected", prefix)
producers
|> Seq.forall (fun producer -> producer.IsConnected)
|> channel.SetResult

| Close channel ->

match this.ConnectionState with
Expand Down Expand Up @@ -374,6 +382,8 @@ type internal PartitionedProducerImpl<'T> private (producerConfig: ProducerConfi
member this.GetStatsAsync() = postAndAsyncReply mb GetStats

member this.LastDisconnectedTimestamp = (postAndAsyncReply mb LastDisconnectedTimestamp).Result

member this.IsConnected = (postAndAsyncReply mb IsConnected).Result


interface IAsyncDisposable with
Expand Down
4 changes: 4 additions & 0 deletions src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,10 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c

member this.LastDisconnectedTimestamp =
connectionHandler.LastDisconnectedTimestamp
member this.IsConnected =
match connectionHandler.ConnectionState with
| Ready _ -> true
| _ -> false

interface IAsyncDisposable with

Expand Down
3 changes: 3 additions & 0 deletions src/Pulsar.Client/Internal/ReaderImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ type internal ReaderImpl<'T> private (readerConfig: ReaderConfiguration, clientC
member this.Topic with get() =
castedConsumer.Topic

member this.IsConnected with get() =
castedConsumer.IsConnected

interface IAsyncDisposable with

member this.DisposeAsync() =
Expand Down

0 comments on commit 6834cff

Please sign in to comment.