Skip to content

Commit

Permalink
chore: Convert from DataFlow to Channels
Browse files Browse the repository at this point in the history
Aligns better with other parts of the project
  • Loading branch information
qdot committed Jun 25, 2023
1 parent e3ea88a commit b75f9f1
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

<ItemGroup>
<PackageReference Include="deniszykov.WebSocketListener" Version="4.2.15" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="7.0.0" />
<PackageReference Include="System.Threading.Channels" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
using Buttplug.Core;

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading.Channels;
using Buttplug.Core.Messages;
using vtortola.WebSockets;
using vtortola.WebSockets.Rfc6455;
Expand All @@ -20,7 +18,6 @@ public class ButtplugWebsocketConnector : ButtplugRemoteJSONConnector, IButtplug
private WebSocketClient _wsClient;

private WebSocket _ws;

public bool Connected => _ws?.IsConnected == true;

public event EventHandler Disconnected;
Expand All @@ -32,7 +29,7 @@ public class ButtplugWebsocketConnector : ButtplugRemoteJSONConnector, IButtplug

private readonly Uri _uri;

private readonly BufferBlock<string> _outgoingMessages = new BufferBlock<string>();
private Channel<string> _channel = Channel.CreateBounded<string>(256);

private Task _readTask;

Expand Down Expand Up @@ -116,7 +113,7 @@ public async Task DisconnectAsync(CancellationToken token = default)
public async Task<ButtplugMessage> SendAsync(ButtplugMessage msg, CancellationToken token)
{
var (msgString, msgPromise) = PrepareMessage(msg);
await _outgoingMessages.SendAsync(msgString, token).ConfigureAwait(false);
await _channel.Writer.WriteAsync(msgString);
return await msgPromise.ConfigureAwait(false);
}

Expand All @@ -125,7 +122,7 @@ private async Task RunClientLoop(CancellationToken token)
try
{
var readTask = _ws.ReadStringAsync(token);
var writeTask = _outgoingMessages.OutputAvailableAsync(token);
var writeTask = _channel.Reader.ReadAsync(token).AsTask();
while (_ws.IsConnected && !token.IsCancellationRequested)
{
var msgTasks = new Task[]
Expand All @@ -150,15 +147,13 @@ private async Task RunClientLoop(CancellationToken token)
{
try
{
IList<string> msgs = new List<string>();
_outgoingMessages.TryReceiveAll(out msgs);
var outMsgs = msgs.Aggregate(string.Empty, (current, msg) => current + msg);
var outMsgs = await ((Task<string>)msgTasks[1]).ConfigureAwait(false);
if (_ws?.IsConnected == true)
{
await _ws.WriteStringAsync(outMsgs, token).ConfigureAwait(false);
}

writeTask = _outgoingMessages.OutputAvailableAsync(token);
writeTask = _channel.Reader.ReadAsync(token).AsTask();
}
catch (WebSocketException e)
{
Expand Down

0 comments on commit b75f9f1

Please sign in to comment.