Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .nuget/packages.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="NUnit.ConsoleRunner" version="3.9.0" />
<package id="NUnit.ConsoleRunner" version="3.10.0" />
<package id="NUnit.Extension.NUnitV2ResultWriter" version="3.6.0" />
<package id="JetBrains.dotCover.CommandLineTools" version="2018.2.3" />
<package id="JetBrains.dotCover.CommandLineTools" version="2019.1.2" />
</packages>
8 changes: 4 additions & 4 deletions Hazelcast.Net/Hazelcast.Client.Proxy/ClientMapProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ private ArrayList GetPartitionKeyData(ICollection<TKey> keys)
var partitionService = GetContext().GetPartitionService();
var partitionCount = partitionService.GetPartitionCount();
var initialCapacity = 2 * (keys.Count / partitionCount);
var partitionToKeyData = new ArrayList(partitionCount);
var partitionToKeyData = ArrayList.Synchronized(new ArrayList(partitionCount));
for (var i = 0; i < partitionCount; i++)
{
partitionToKeyData.Add(ArrayList.Synchronized(new ArrayList(initialCapacity)));
Expand Down Expand Up @@ -765,9 +765,9 @@ public void PutAll(IDictionary<TKey, TValue> m)
var partitions = new ArrayList(partitionCount);
for (var i = 0; i < partitionCount; i++)
{
partitions.Add(ArrayList.Synchronized(new ArrayList()));
partitions.Add(new ArrayList());
}
Parallel.ForEach(m, kvp =>
foreach (var kvp in m)
{
ValidationUtil.CheckNotNull(kvp.Key, ValidationUtil.NULL_KEY_IS_NOT_ALLOWED);
ValidationUtil.CheckNotNull(kvp.Value, ValidationUtil.NULL_VALUE_IS_NOT_ALLOWED);
Expand All @@ -777,7 +777,7 @@ public void PutAll(IDictionary<TKey, TValue> m)
var partition = (ArrayList) partitions[partitionId];
partition.Add(keyData);
partition.Add(valueData);
});
}
PutAllInternal(m, partitions);
}

Expand Down
128 changes: 76 additions & 52 deletions Hazelcast.Net/Hazelcast.Client.Spi/ClientClusterService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ internal class ClientClusterService : IClientClusterService, IConnectionListener
private readonly AtomicReference<IDictionary<Address, IMember>> _membersRef =
new AtomicReference<IDictionary<Address, IMember>>();

private readonly object _initialMembershipListenerMutex = new object();

private ClientMembershipListener _clientMembershipListener;
private ClientConnectionManager _connectionManager;
private Address _ownerConnectionAddress;
Expand All @@ -58,6 +60,7 @@ internal class ClientClusterService : IClientClusterService, IConnectionListener

public ClientClusterService(HazelcastClient client)
{
_membersRef.Set(new Dictionary<Address, IMember>());
_client = client;

var networkConfig = GetClientConfig().GetNetworkConfig();
Expand Down Expand Up @@ -92,6 +95,7 @@ public ClientClusterService(HazelcastClient client)
AddMembershipListenerWithoutInit(membershipListener);
}
}

}

private Address OwnerConnectionAddress
Expand Down Expand Up @@ -161,15 +165,13 @@ public string AddMembershipListener(IMembershipListener listener)
{
throw new ArgumentNullException("listener");
}
var id = Guid.NewGuid().ToString();
_listeners[id] = listener;
if (listener is IInitialMembershipListener)

lock (_initialMembershipListenerMutex)
{
// TODO: needs sync with membership events...
var cluster = _client.GetCluster();
((IInitialMembershipListener) listener).Init(new InitialMembershipEvent(cluster, cluster.GetMembers()));
var id = AddMembershipListenerWithoutInit(listener);
InitMembershipListener(listener);
return id;
}
return id;
}

public bool RemoveMembershipListener(string registrationId)
Expand Down Expand Up @@ -215,7 +217,6 @@ public virtual void Start()
{
Init();
ConnectToCluster();
InitMembershipListener();
}

internal int ServerVersion
Expand All @@ -232,66 +233,54 @@ internal int ServerVersion
}
}

internal virtual void FireMemberAttributeEvent(MemberAttributeEvent @event)
internal void FireMemberAttributeEvent(MemberAttributeEvent @event)
{
_client.GetClientExecutionService().Submit(() =>
foreach (var listener in _listeners.Values)
{
foreach (var listener in _listeners.Values)
{
listener.MemberAttributeChanged(@event);
}
});
listener.MemberAttributeChanged(@event);
}
}

internal virtual void FireMembershipEvent(MembershipEvent @event)
private void FireMembershipEvent(MembershipEvent @event)
{
_client.GetClientExecutionService().Submit((() =>
foreach (var listener in _listeners.Values)
{
foreach (var listener in _listeners.Values)
if (@event.GetEventType() == MembershipEvent.MemberAdded)
{
if (@event.GetEventType() == MembershipEvent.MemberAdded)
{
listener.MemberAdded(@event);
}
else
{
listener.MemberRemoved(@event);
}
listener.MemberAdded(@event);
}
}));
}

internal virtual IDictionary<Address, IMember> GetMembersRef()
{
return _membersRef.Get();
else
{
listener.MemberRemoved(@event);
}
}
}

internal virtual string MembersString()
{
var sb = new StringBuilder("\n\nMembers [");
var members = GetMemberList();
sb.Append(members != null ? members.Count : 0);
sb.Append("] {");
if (members != null)
private void FireInitialMembershipEvent(InitialMembershipEvent @event) {
foreach (var listener in _listeners.Values)
{
foreach (var member in members)
if (listener is IInitialMembershipListener)
{
sb.Append("\n\t").Append(member);
((IInitialMembershipListener) listener).Init(@event);
}
}
sb.Append("\n}\n");
return sb.ToString();
}

internal virtual IDictionary<Address, IMember> GetMembersRef()
{
return _membersRef.Get();
}

internal virtual void SetMembersRef(IDictionary<Address, IMember> map)
{
_membersRef.Set(map);
}

private void AddMembershipListenerWithoutInit(IMembershipListener listener)
private string AddMembershipListenerWithoutInit(IMembershipListener listener)
{
var id = Guid.NewGuid().ToString();
_listeners[id] = listener;
_listeners.TryAdd(id, listener);
return id;
}

/// <exception cref="System.Exception" />
Expand Down Expand Up @@ -426,18 +415,53 @@ private void Init()
_connectionManager.AddConnectionListener(this);
}

private void InitMembershipListener()
private void InitMembershipListener(IMembershipListener listener) {
if (listener is IInitialMembershipListener) {
var cluster = _client.GetCluster();
var memberCollection = _membersRef.Get().Values;
if (memberCollection.Count == 0)
{
//if members are empty,it means initial event did not arrive yet
//it will be redirected to listeners when it arrives see #handleInitialMembershipEvent
return;
}
var members = new HashSet<IMember>(memberCollection);
var @event = new InitialMembershipEvent(cluster, members);
((IInitialMembershipListener) listener).Init(@event);
}
}

internal void HandleInitialMembershipEvent(InitialMembershipEvent @event)
{
foreach (var membershipListener in _listeners.Values)
lock (_initialMembershipListenerMutex)
{
if (membershipListener is IInitialMembershipListener)
var initialMembers = @event.GetMembers();
var newMap = new Dictionary<Address, IMember>();
foreach (var initialMember in initialMembers)
{
// TODO: needs sync with membership events...
var cluster = _client.GetCluster();
var @event = new InitialMembershipEvent(cluster, cluster.GetMembers());
((IInitialMembershipListener) membershipListener).Init(@event);
newMap.Add(initialMember.GetAddress(), initialMember);
}
_membersRef.Set(newMap);
FireInitialMembershipEvent(@event);
}
}

internal void HandleMembershipEvent(MembershipEvent @event)
{
lock (_initialMembershipListenerMutex)
{
var member = @event.GetMember();
var dictionary = _membersRef.Get();
var newMap = new Dictionary<Address, IMember>(dictionary);
if (@event.GetEventType() == MembershipEvent.MemberAdded) {
newMap.Add(member.GetAddress(), member);
} else {
newMap.Remove(member.GetAddress());
}
_membersRef.Set(newMap);
FireMembershipEvent(@event);
}
}

}
}
4 changes: 2 additions & 2 deletions Hazelcast.Net/Hazelcast.Client.Spi/ClientListenerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ public bool DeregisterListener(string userRegistrationId)

private bool DeregisterListenerInternal(string userRegistrationId)
{
//This method should not be called from registrationExecutor
Debug.Assert(Thread.CurrentThread.Name == null || !Thread.CurrentThread.Name.Contains("eventRegistration"));
//This method should only be called from registrationExecutor
Debug.Assert(Thread.CurrentThread.Name != null && Thread.CurrentThread.Name.Contains("eventRegistration"));
ListenerRegistration listenerRegistration;
if (!_registrations.TryGetValue(userRegistrationId, out listenerRegistration))
{
Expand Down
Loading