Skip to content

Commit

Permalink
Merge pull request #218 from PHOENIXCONTACT/fix/notification-race-con…
Browse files Browse the repository at this point in the history
…dition

Add missing union on lists in NotificationAdapter
  • Loading branch information
Toxantron committed Aug 17, 2023
2 parents 5c1069c + a87fbbf commit d6afbee
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 113 deletions.
203 changes: 97 additions & 106 deletions src/Moryx.Notifications/Adapter/NotificationAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ namespace Moryx.Notifications
/// </summary>
public class NotificationAdapter : INotificationAdapter, INotificationSourceAdapter
{
private readonly List<NotificationMap> _published = new List<NotificationMap>();
private readonly List<NotificationMap> _pendingAcks = new List<NotificationMap>();
private readonly List<NotificationMap> _pendingPubs = new List<NotificationMap>();
private readonly List<NotificationMap> _published = new List<NotificationMap>(16);
private readonly List<NotificationMap> _pendingAcks = new List<NotificationMap>(16);
private readonly List<NotificationMap> _pendingPubs = new List<NotificationMap>(16);

private readonly ReaderWriterLockSlim _listLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
private readonly object _listLock = new ();

/// <summary>
/// Logger for the Notification Adapter
Expand All @@ -42,13 +42,13 @@ public IReadOnlyList<INotification> GetPublished(INotificationSender sender, obj

private IReadOnlyList<INotification> GetPublished(Func<NotificationMap, bool> filter)
{
_listLock.EnterReadLock();

var notifications = _published.Union(_pendingPubs).Where(filter)
IReadOnlyList<INotification> notifications;
lock (_listLock)
{
notifications = _published.Union(_pendingPubs).Where(filter)
.Select(map => map.Notification)
.ToArray();

_listLock.ExitReadLock();
}

return notifications;
}
Expand All @@ -68,29 +68,23 @@ public void Publish(INotificationSender sender, INotification notification, obje
if (notification == null)
throw new ArgumentNullException(nameof(notification), "Notification must be set");

_listLock.EnterUpgradeableReadLock();

// Lets check if the notification was already published
var isPending = _pendingPubs.Union(_pendingAcks).Union(_published)
.Any(n => n.Notification == notification);

if (isPending)

lock (_listLock)
{
_listLock.ExitUpgradeableReadLock();
throw new InvalidOperationException("Notification cannot be published twice!");
}
// Lets check if the notification was already published
var isPending = _pendingPubs.Union(_pendingAcks).Union(_published)
.Any(n => n.Notification == notification);

_listLock.EnterWriteLock();
if (isPending)
throw new InvalidOperationException("Notification cannot be published twice!");

var managed = (IManagedNotification)notification;
managed.Identifier = Guid.NewGuid();
managed.Created = DateTime.Now;
managed.Sender = sender.Identifier;

_pendingPubs.Add(new NotificationMap(sender, notification, tag));
var managed = (IManagedNotification)notification;
managed.Identifier = Guid.NewGuid();
managed.Created = DateTime.Now;
managed.Sender = sender.Identifier;

_listLock.ExitWriteLock();
_listLock.ExitUpgradeableReadLock();
_pendingPubs.Add(new NotificationMap(sender, notification, tag));
}

Published?.Invoke(this, notification);
}
Expand All @@ -108,32 +102,28 @@ public void Acknowledge(INotificationSender sender, INotification notification)
managed.Acknowledged = DateTime.Now;
managed.Acknowledger = sender.Identifier;

_listLock.EnterWriteLock();

var published = _published.SingleOrDefault(n => n.Notification.Identifier == notification.Identifier);
if (published != null)
{
_published.Remove(published);
}
else
NotificationMap published;
lock (_listLock)
{
published = _pendingPubs.SingleOrDefault(n => n.Notification.Identifier == notification.Identifier);

if (published == null)
published = _published.SingleOrDefault(n => n.Notification.Identifier == notification.Identifier);
if (published is not null)
_published.Remove(published);
else
{
_listLock.ExitWriteLock();
throw new InvalidOperationException("Notification was not managed by the adapter. " +
"The sender was not registered on the adapter");
}
published = _pendingPubs.SingleOrDefault(n => n.Notification.Identifier == notification.Identifier);

_pendingPubs.Remove(published);
}
if (published is null)
throw new InvalidOperationException("Notification was not managed by the adapter. " +
"The sender was not registered on the adapter");

_pendingAcks.Add(published);
_pendingPubs.Remove(published);
}

_listLock.ExitWriteLock();
_pendingAcks.Add(published);
}

Acknowledged?.Invoke(this, published.Notification);
if (published is not null)
Acknowledged?.Invoke(this, published.Notification);
}

/// <inheritdoc />
Expand All @@ -153,24 +143,24 @@ public void AcknowledgeAll(INotificationSender sender, object tag)
/// </summary>
private void AcknowledgeByFilter(INotificationSender sender, Predicate<NotificationMap> filter)
{
_listLock.EnterWriteLock();

var publishes = _published.Where(m => filter(m)).ToArray();
_published.RemoveAll(filter);
_pendingPubs.RemoveAll(filter);

foreach (var published in publishes)
NotificationMap[] publishes;
lock (_listLock)
{
var managed = (IManagedNotification)published.Notification;
managed.Acknowledged = DateTime.Now;
managed.Acknowledger = sender.Identifier;
publishes = _published.Union(_pendingPubs).Where(m => filter(m)).ToArray();
_published.RemoveAll(filter);
_pendingPubs.RemoveAll(filter);

_pendingAcks.Add(published);
}
foreach (var published in publishes)
{
var managed = (IManagedNotification)published.Notification;
managed.Acknowledged = DateTime.Now;
managed.Acknowledger = sender.Identifier;

_listLock.ExitWriteLock();
_pendingAcks.Add(published);
}
}

foreach (var published in publishes)
foreach (var published in publishes ?? new NotificationMap[0]) // ToDo: Change to Array.Empty in MROYX 6
Acknowledged?.Invoke(this, published.Notification);
}

Expand All @@ -181,92 +171,93 @@ private void AcknowledgeByFilter(INotificationSender sender, Predicate<Notificat
/// <inheritdoc />
IReadOnlyList<INotification> INotificationSourceAdapter.GetPublished()
{
_listLock.EnterReadLock();

var published = _published.Union(_pendingAcks).Select(map => map.Notification).ToArray();

_listLock.ExitReadLock();
IReadOnlyList<INotification> published;
lock (_listLock)
{
published = _published.Union(_pendingAcks).Select(map => map.Notification).ToArray();
}

return published;
}

/// <inheritdoc />
void INotificationSourceAdapter.Acknowledge(INotification notification)
{
_listLock.EnterReadLock();

var map = _published.Single(m => m.Notification.Identifier == notification.Identifier);

_listLock.ExitReadLock();

map.Sender.Acknowledge(map.Notification, map.Tag);
NotificationMap map;
lock (_listLock)
{
map = _published.Single(m => m.Notification.Identifier == notification.Identifier);
}
map?.Sender.Acknowledge(map.Notification, map.Tag);
}

/// <inheritdoc />
void INotificationSourceAdapter.AcknowledgeProcessed(INotification notification)
{
_listLock.EnterWriteLock();

var map = _pendingAcks.SingleOrDefault(n => n.Notification.Identifier.Equals(notification.Identifier));

// Maybe already removed from this adapter
if (map != null)
_pendingAcks.Remove(map);
lock (_listLock)
{
var map = _pendingAcks.SingleOrDefault(n => n.Notification.Identifier.Equals(notification.Identifier));

_listLock.ExitWriteLock();
// Maybe already removed from this adapter
if (map is not null)
_pendingAcks.Remove(map);
}
}

/// <inheritdoc />
void INotificationSourceAdapter.PublishProcessed(INotification notification)
{
_listLock.EnterWriteLock();

var map = _pendingPubs.SingleOrDefault(n => n.Notification.Identifier.Equals(notification.Identifier));

if (map != null)
NotificationMap map;
lock (_listLock)
{
_pendingPubs.Remove(map);
_published.Add(map);
_listLock.ExitWriteLock();
return;
}
map = _pendingPubs.SingleOrDefault(n => n.Notification.Identifier.Equals(notification.Identifier));

// Notification is maybe not pending anymore
_listLock.ExitWriteLock();
if (map != null)
{
_pendingPubs.Remove(map);
_published.Add(map);
return;
}
}

var managed = (IManagedNotification)notification;

// If necessary we can acknowledge it
if (managed.Acknowledged is null)
{
Logger.Log(LogLevel.Error, "Notification was removed from the pending publications " +
"before being published but is not acknowledged.");
Logger.Log(LogLevel.Error, "Notification {0} was removed from the pending publications " +
"before being published but is not acknowledged.", managed.Identifier);
managed.Acknowledged = DateTime.Now;
managed.Acknowledger = nameof(NotificationAdapter);
Acknowledged?.Invoke(this, notification);
}

Logger.Log(LogLevel.Warning, "Notification was removed from the pending publications. " +
"It was already acknowledged by {0} at {1}.", managed.Acknowledger, managed.Acknowledged);
Logger.Log(LogLevel.Warning, "Notification {0} was removed from the pending publications. " +
"It was already acknowledged by {1} at {2}.", managed.Identifier, managed.Acknowledger, managed.Acknowledged);
}

/// <inheritdoc />
void INotificationSourceAdapter.Sync()
{
// Publish pending notifications
_listLock.EnterReadLock();
var pendingPublishs = _pendingPubs.ToArray();
_listLock.ExitReadLock();

foreach (var pendingPublish in pendingPublishs)
NotificationMap[] pendingPublishes = new NotificationMap[0]; // ToDo: Replace with Array.Empty in MORYX 6
lock (_listLock)
{
pendingPublishes = _pendingPubs.ToArray();
}
foreach (var pendingPublish in pendingPublishes)
{
Published?.Invoke(this, pendingPublish.Notification);
}

// Acknowledge pending acknowledges
_listLock.EnterReadLock();
var pendingAcks = _pendingAcks.ToArray();
_listLock.ExitReadLock();
NotificationMap[] pendingAcks = new NotificationMap[0]; // ToDo: Replace with Array.Empty in MORYX 6
lock (_listLock)
{
pendingAcks = _pendingAcks.ToArray();

}
foreach (var pendingAck in pendingAcks)
{
Acknowledged?.Invoke(this, pendingAck.Notification);
Expand Down
11 changes: 4 additions & 7 deletions src/Tests/Moryx.Notifications.Tests/NotificationAdapterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void SetUp()
}

[Test(Description = "Check that publishing a notification publishes an event, and marks the notification as published. " +
"Check, that notifications can not be published twice.")]
"Check that notifications can not be published twice.")]
public void AdapterPublish()
{
// Arrange
Expand Down Expand Up @@ -80,9 +80,9 @@ public void AdapterPublishWithTag()
// Act
_adapter.Publish(_sender, notification, tag);
((INotificationSourceAdapter)_adapter).PublishProcessed(notification);

// Arrange
var published = _adapter.GetPublished(_sender, tag);

// Assert
Assert.AreEqual(1, published.Count);
}

Expand Down Expand Up @@ -119,7 +119,7 @@ public void AdapterAcknowledgeKnownNotificationWhichIsAlreadyPublishedToThePubli
Assert.AreNotEqual(_acknowledgedEventNotification.Acknowledged, default(DateTime), "Acknowledged date should have been set");
}

[Test(Description = "Check that acknowledging a notification by the adapter for a known notification throws an exception.")]
[Test(Description = "Check that acknowledging a notification by the adapter for an unknown notification throws an exception.")]
public void AdapterAcknowledgeForUnknownNotification()
{
// Arrange
Expand Down Expand Up @@ -150,9 +150,6 @@ public void AcknowledgeForKnownNotification()
Assert.AreEqual(notification, _acknowledgeCallNotification, "Acknowledged was not called for the wrong notification.");
}

/// <summary>
/// Check that acknowledging a notification by the SenderAdapter-interface for an unknown notification throws an exception.
/// </summary>
[Test(Description = "Check that acknowledging a notification by the SenderAdapter-interface for an unknown notification throws an exception.")]
public void SenderAdapterAcknowledgeForUnknownNotification()
{
Expand Down

0 comments on commit d6afbee

Please sign in to comment.