Skip to content
This repository has been archived by the owner on Nov 8, 2023. It is now read-only.

Commit

Permalink
More granular locking and fixed disconnect detection.
Browse files Browse the repository at this point in the history
  • Loading branch information
NTDLS committed Nov 30, 2022
1 parent 200e692 commit 0495d8c
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 20 deletions.
13 changes: 8 additions & 5 deletions NTDLS.MemQueue/Library/ListQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@ public class ListQueue<T> : List<T>
{
public void Enqueue(T item)
{
base.Add(item);
lock(this) base.Add(item);
}

public T Dequeue()
{
var t = base[0];
base.RemoveAt(0);
return t;
lock (this)
{
var t = base[0];
base.RemoveAt(0);
return t;
}
}

public T Peek()
{
return base[0];
lock (this) return base[0];
}
}
}
8 changes: 7 additions & 1 deletion NTDLS.MemQueue/NMQACKEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal class NMQACKEvent
/// <summary>
/// The command that we are waiting on an acknowledgement for.
/// </summary>
///
public NMQCommand Command { get; set; }
/// <summary>
/// The date/time that the event was created.
Expand All @@ -20,12 +21,17 @@ internal class NMQACKEvent

private string _key;

public static string MakeKey(Guid peerId, Guid messageId)
{
return $"{{{peerId}:{messageId}}}".Replace("-", "").ToLower();
}

public NMQACKEvent(Peer peer, NMQCommand command)
{
CreatedDate = DateTime.UtcNow;
Peer = peer;
Command = command;
_key = $"[{Peer.PeerId}:{Command.Message.MessageId}]".Replace("-", "");
_key = MakeKey(Peer.PeerId, Command.Message.MessageId);
}

/// <summary>
Expand Down
51 changes: 37 additions & 14 deletions NTDLS.MemQueue/NMQServer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using NTDLS.MemQueue.Library;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Net;
using System.Net.Sockets;
Expand Down Expand Up @@ -60,6 +61,22 @@ public class NMQServer : NMQBase
/// </summary>
public int TCPSendQueueDepth { get; private set; }

public int ListenBacklog
{
get
{
return _listenBacklog;
}
set
{
if (_listenSocket != null)
{
throw new Exception("Listen backlog cannot be changed once the server is started.");
}
_listenBacklog = value;
}
}

private BrodcastScheme _brodcastScheme = BrodcastScheme.NotSet;
public BrodcastScheme BrodcastScheme
{
Expand All @@ -84,7 +101,7 @@ public BrodcastScheme BrodcastScheme
private Dictionary<string, NMQACKEvent> _commandAckEvents = new Dictionary<string, NMQACKEvent>();
private Dictionary<string, NMQACKEvent> _processedEvents = new Dictionary<string, NMQACKEvent>();
private bool _continueRunning = false;
private readonly int _listenBacklog = NMQConstants.DEFAULT_TCPIP_LISTEN_SIZE;
private int _listenBacklog = NMQConstants.DEFAULT_TCPIP_LISTEN_SIZE;
private Socket _listenSocket;
private readonly List<Peer> _peers = new List<Peer>();
private AsyncCallback _onDataReceivedCallback;
Expand All @@ -99,11 +116,6 @@ public BrodcastScheme BrodcastScheme
public NMQServer()
{
}

public NMQServer(int listenBacklog)
{
this._listenBacklog = listenBacklog;
}

#endregion

Expand Down Expand Up @@ -436,6 +448,7 @@ private void OnDataReceived(IAsyncResult asyn)
{
peer.Packet.BufferLength = peer.Socket.EndReceive(asyn);

//Disonnect peer on the receipt of zero bytes.
if (peer.Packet.BufferLength == 0)
{
CleanupConnection(peer.Socket);
Expand All @@ -451,6 +464,10 @@ private void OnDataReceived(IAsyncResult asyn)

WaitForData(peer);
}
else
{
CleanupConnection(peer.Socket);
}
}
catch (ObjectDisposedException)
{
Expand All @@ -466,8 +483,8 @@ private void OnDataReceived(IAsyncResult asyn)
{
LogException(ex);
}
}

}

private void CleanupConnection(Socket socket)
{
try
Expand Down Expand Up @@ -692,7 +709,7 @@ private void BroadcastMessages_Scheme()
//The message has been sent to all peers, remove it from the queue.
if (peers.Select(o => o.CurrentMessage).Where(o => o.IsComplete == false).Any() == false)
{
lock (this) queue.Dequeue();
queue.Dequeue();
peers.ForEach(o => o.CurrentMessage = null);
}
}
Expand Down Expand Up @@ -730,23 +747,29 @@ private void PacketPayloadHandler(Peer peer, NMQCommand payload)
//The client is acknowledging the receipt of a command.
else if (payload.CommandType == PayloadCommandType.CommandAck)
{
var key = $"{peer.PeerId}-{payload.Message.MessageId}";
var key = NMQACKEvent.MakeKey(peer.PeerId, payload.Message.MessageId);
if (_commandAckEvents.ContainsKey(key))
{
lock (_commandAckEvents) _commandAckEvents.Remove(key);
lock (_commandAckEvents)
{
_commandAckEvents.Remove(key);
}
}
else
{
//Client... what are you ack'ing?
}
}
//The client is acknowledging the receipt of a command.
//The client is acknowledging that it has processed a message.
else if (payload.CommandType == PayloadCommandType.ProcessedAck)
{
var key = $"{peer.PeerId}-{payload.Message.MessageId}";
var key = NMQACKEvent.MakeKey(peer.PeerId, payload.Message.MessageId);
if (_processedEvents.ContainsKey(key))
{
lock (_processedEvents) _processedEvents.Remove(key);
lock (_processedEvents)
{
_processedEvents.Remove(key);
}
}
else
{
Expand Down

0 comments on commit 0495d8c

Please sign in to comment.