Skip to content

Commit

Permalink
Removed notion of brokerless connection, so caching is done only by l…
Browse files Browse the repository at this point in the history
…ogical address (like in Java client)
  • Loading branch information
Lanayx committed Sep 19, 2023
1 parent db86d35 commit 46f4198
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 23 deletions.
8 changes: 4 additions & 4 deletions src/Pulsar.Client/Internal/BinaryLookupService.fs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type internal BinaryLookupService (config: PulsarClientConfiguration, connection
try
let! clientCnx =
resolveEndPoint()
|> connectionPool.GetBrokerlessConnection
|> connectionPool.GetBasicConnection
|> Async.AwaitTask
let requestId = Generators.getNextRequestId()
let payload = Commands.newPartitionMetadataRequest topicName requestId
Expand Down Expand Up @@ -64,7 +64,7 @@ type internal BinaryLookupService (config: PulsarClientConfiguration, connection
backgroundTask {
if config.MaxLookupRedirects > 0 && redirectCount > config.MaxLookupRedirects then
raise (LookupException <| "Too many redirects: " + string redirectCount)
let! clientCnx = connectionPool.GetBrokerlessConnection endpoint
let! clientCnx = connectionPool.GetBasicConnection endpoint
let requestId = Generators.getNextRequestId()
let payload = Commands.newLookup topicName requestId authoritative config.ListenerName
let! response = clientCnx.SendAndWaitForReply requestId payload
Expand Down Expand Up @@ -94,7 +94,7 @@ type internal BinaryLookupService (config: PulsarClientConfiguration, connection
try
let! clientCnx =
resolveEndPoint()
|> connectionPool.GetBrokerlessConnection
|> connectionPool.GetBasicConnection
|> Async.AwaitTask
let requestId = Generators.getNextRequestId()
let payload = Commands.newGetTopicsOfNamespaceRequest ns requestId isPersistent
Expand Down Expand Up @@ -126,7 +126,7 @@ type internal BinaryLookupService (config: PulsarClientConfiguration, connection
try
let! clientCnx =
resolveEndPoint()
|> connectionPool.GetBrokerlessConnection
|> connectionPool.GetBasicConnection
|> Async.AwaitTask
let requestId = Generators.getNextRequestId()
let payload = Commands.newGetSchema topicName requestId schemaVersion
Expand Down
7 changes: 3 additions & 4 deletions src/Pulsar.Client/Internal/ClientCnx.fs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ and internal ClientCnx (config: PulsarClientConfiguration,
broker: Broker,
connection: Connection,
maxMessageSize: int,
brokerless: bool,
initialConnectionTsc: TaskCompletionSource<ClientCnx>,
unregisterClientCnx: Broker -> unit) as this =

Expand Down Expand Up @@ -551,9 +550,9 @@ and internal ClientCnx (config: PulsarClientConfiguration,
this.WaitingForPingResponse <- false
match xcmd with
| XCommandConnected cmd ->
Log.Logger.LogInformation("{0} Connected ProtocolVersion: {1} ServerVersion: {2} MaxMessageSize: {3} Brokerless: {4}",
prefix, cmd.ProtocolVersion, cmd.ServerVersion, cmd.MaxMessageSize, brokerless)
if cmd.ShouldSerializeMaxMessageSize() && (not brokerless) && maxMessageSize <> cmd.MaxMessageSize then
Log.Logger.LogInformation("{0} Connected ProtocolVersion: {1} ServerVersion: {2} MaxMessageSize: {3}",
prefix, cmd.ProtocolVersion, cmd.ServerVersion, cmd.MaxMessageSize)
if cmd.ShouldSerializeMaxMessageSize() && maxMessageSize <> cmd.MaxMessageSize then
initialConnectionTsc.SetException(MaxMessageSizeChanged cmd.MaxMessageSize)
else
initialConnectionTsc.SetResult(this)
Expand Down
2 changes: 1 addition & 1 deletion src/Pulsar.Client/Internal/ConnectionHandler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type internal ConnectionHandler( parentPrefix: string,
try
Log.Logger.LogDebug("{0} Starting reconnect to {1}", prefix, topic)
let! broker = lookup.GetBroker(topic)
let! clientCnx = connectionPool.GetConnection(broker, maxMessageSize, false)
let! clientCnx = connectionPool.GetConnection(broker, maxMessageSize)
this.ConnectionState <- Ready clientCnx
Log.Logger.LogDebug("{0} Successfuly reconnected to {1}, {2}", prefix, topic, clientCnx)
connectionOpened epoch
Expand Down
28 changes: 14 additions & 14 deletions src/Pulsar.Client/Internal/ConnectionPool.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ open System.Security.Cryptography.X509Certificates
type internal ConnectionPool (config: PulsarClientConfiguration) =


let connections = ConcurrentDictionary<bool*LogicalAddress, Lazy<Task<ClientCnx>>>()
let connections = ConcurrentDictionary<LogicalAddress, Lazy<Task<ClientCnx>>>()

// from https://github.com/mgravell/Pipelines.Sockets.Unofficial/blob/master/src/Pipelines.Sockets.Unofficial/SocketConnection.Connect.cs
let getSocket (endpoint: DnsEndPoint) =
Expand Down Expand Up @@ -123,15 +123,15 @@ type internal ConnectionPool (config: PulsarClientConfiguration) =
Log.Logger.LogError("Unknown ssl error: {0}", error)
false

let unregisterClientCnx brokerless (broker: Broker) =
let key = (brokerless, broker.LogicalAddress)
let unregisterClientCnx (broker: Broker) =
let key = broker.LogicalAddress
match connections.TryRemove(key) with
| true, _ -> Log.Logger.LogInformation("Connection backgroundTask {0} removed", key)
| false, _ -> Log.Logger.LogDebug("Connection backgroundTask {0} was not removed", key)

let connect (broker: Broker, maxMessageSize: int, brokerless: bool) =
Log.Logger.LogInformation("Connecting to {0} with maxMessageSize: {1}, brokerless: {2}",
broker, maxMessageSize, brokerless)
let connect (broker: Broker, maxMessageSize: int) =
Log.Logger.LogInformation("Connecting to {0} with maxMessageSize: {1}",
broker, maxMessageSize)
backgroundTask {
let (PhysicalAddress physicalAddress) = broker.PhysicalAddress
let pipeOptions = PipeOptions(pauseWriterThreshold = int64 maxMessageSize )
Expand Down Expand Up @@ -167,8 +167,8 @@ type internal ConnectionPool (config: PulsarClientConfiguration) =
Log.Logger.LogDebug("Connection established for {0}", physicalAddress)
let initialConnectionTsc = TaskCompletionSource<ClientCnx>(TaskCreationOptions.RunContinuationsAsynchronously)

let clientCnx = ClientCnx(config, broker, connection, maxMessageSize, brokerless, initialConnectionTsc,
unregisterClientCnx brokerless)
let clientCnx = ClientCnx(config, broker, connection, maxMessageSize, initialConnectionTsc,
unregisterClientCnx)
let connectPayload = clientCnx.NewConnectCommand()
let! success = clientCnx.Send connectPayload
if not success then
Expand All @@ -184,19 +184,19 @@ type internal ConnectionPool (config: PulsarClientConfiguration) =
return reraize ex
}

member this.GetConnection (broker: Broker, maxMessageSize: int, brokerless: bool) =
let t = connections.GetOrAdd((brokerless, broker.LogicalAddress), fun _ ->
lazy connect(broker, maxMessageSize, brokerless)).Value
member this.GetConnection (broker: Broker, maxMessageSize: int) =
let t = connections.GetOrAdd(broker.LogicalAddress, fun _ ->
lazy connect(broker, maxMessageSize)).Value
if t.IsFaulted then
let key = (brokerless, broker.LogicalAddress)
let key = broker.LogicalAddress
match connections.TryRemove(key) with
| true, _ -> Log.Logger.LogInformation("Removed faulted connection task to {0}", key)
| false, _ -> Log.Logger.LogDebug("Faulted connection task to {0} wasn't removed", key)
t

member this.GetBrokerlessConnection (address: DnsEndPoint) =
member this.GetBasicConnection (address: DnsEndPoint) =
this.GetConnection({ LogicalAddress = LogicalAddress address; PhysicalAddress = PhysicalAddress address },
Commands.DEFAULT_MAX_MESSAGE_SIZE, true)
Commands.DEFAULT_MAX_MESSAGE_SIZE)

member this.CloseAsync() =
backgroundTask {
Expand Down

0 comments on commit 46f4198

Please sign in to comment.