From b75f9f17abad197b7b6ac7e78f4632dab42bb67a Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Sun, 28 May 2023 23:28:40 -0700 Subject: [PATCH] chore: Convert from DataFlow to Channels Aligns better with other parts of the project --- ....Client.Connectors.WebsocketConnector.csproj | 2 +- .../ButtplugWebsocketConnector.cs | 17 ++++++----------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/Buttplug.Client.Connectors.WebsocketConnector/Buttplug.Client.Connectors.WebsocketConnector.csproj b/Buttplug.Client.Connectors.WebsocketConnector/Buttplug.Client.Connectors.WebsocketConnector.csproj index 2fd2eda5..97ae6a0b 100644 --- a/Buttplug.Client.Connectors.WebsocketConnector/Buttplug.Client.Connectors.WebsocketConnector.csproj +++ b/Buttplug.Client.Connectors.WebsocketConnector/Buttplug.Client.Connectors.WebsocketConnector.csproj @@ -25,7 +25,7 @@ - + diff --git a/Buttplug.Client.Connectors.WebsocketConnector/ButtplugWebsocketConnector.cs b/Buttplug.Client.Connectors.WebsocketConnector/ButtplugWebsocketConnector.cs index 0cfa45a9..9d8eb3ad 100644 --- a/Buttplug.Client.Connectors.WebsocketConnector/ButtplugWebsocketConnector.cs +++ b/Buttplug.Client.Connectors.WebsocketConnector/ButtplugWebsocketConnector.cs @@ -1,11 +1,9 @@ using Buttplug.Core; using System; -using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; +using System.Threading.Channels; using Buttplug.Core.Messages; using vtortola.WebSockets; using vtortola.WebSockets.Rfc6455; @@ -20,7 +18,6 @@ public class ButtplugWebsocketConnector : ButtplugRemoteJSONConnector, IButtplug private WebSocketClient _wsClient; private WebSocket _ws; - public bool Connected => _ws?.IsConnected == true; public event EventHandler Disconnected; @@ -32,7 +29,7 @@ public class ButtplugWebsocketConnector : ButtplugRemoteJSONConnector, IButtplug private readonly Uri _uri; - private readonly BufferBlock _outgoingMessages = new BufferBlock(); + private Channel _channel = Channel.CreateBounded(256); private Task _readTask; @@ -116,7 +113,7 @@ public async Task DisconnectAsync(CancellationToken token = default) public async Task SendAsync(ButtplugMessage msg, CancellationToken token) { var (msgString, msgPromise) = PrepareMessage(msg); - await _outgoingMessages.SendAsync(msgString, token).ConfigureAwait(false); + await _channel.Writer.WriteAsync(msgString); return await msgPromise.ConfigureAwait(false); } @@ -125,7 +122,7 @@ private async Task RunClientLoop(CancellationToken token) try { var readTask = _ws.ReadStringAsync(token); - var writeTask = _outgoingMessages.OutputAvailableAsync(token); + var writeTask = _channel.Reader.ReadAsync(token).AsTask(); while (_ws.IsConnected && !token.IsCancellationRequested) { var msgTasks = new Task[] @@ -150,15 +147,13 @@ private async Task RunClientLoop(CancellationToken token) { try { - IList msgs = new List(); - _outgoingMessages.TryReceiveAll(out msgs); - var outMsgs = msgs.Aggregate(string.Empty, (current, msg) => current + msg); + var outMsgs = await ((Task)msgTasks[1]).ConfigureAwait(false); if (_ws?.IsConnected == true) { await _ws.WriteStringAsync(outMsgs, token).ConfigureAwait(false); } - writeTask = _outgoingMessages.OutputAvailableAsync(token); + writeTask = _channel.Reader.ReadAsync(token).AsTask(); } catch (WebSocketException e) {