Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Remove catalog activations locking #5526

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/Orleans.Runtime/Catalog/ActivationDirectory.cs
Expand Up @@ -83,13 +83,17 @@ private CounterStatistic FindSystemTargetCounter(string systemTargetTypeName)
return ctr;
}

public void RecordNewTarget(ActivationData target)
public bool RecordNewTarget(ActivationData target)
{
if (!activations.TryAdd(target.ActivationId, target))
return;
{
return false;
}

grainToActivationsMap.AddOrUpdate(target.Grain,
g => new List<ActivationData> { target },
(g, list) => { lock (list) { list.Add(target); } return list; });
return true;
}

public void RecordNewSystemTarget(SystemTarget target)
Expand Down
148 changes: 69 additions & 79 deletions src/Orleans.Runtime/Catalog/Catalog.cs
Expand Up @@ -161,13 +161,10 @@ public override void GetObjectData(SerializationInfo info, StreamingContext cont
inProcessRequests = IntValueStatistic.FindOrCreate(StatisticNames.MESSAGING_PROCESSING_ACTIVATION_DATA_ALL, () =>
{
long counter = 0;
lock (activations)
foreach (var activation in activations)
{
foreach (var activation in activations)
{
ActivationData data = activation.Value;
counter += data.GetRequestCount();
}
ActivationData data = activation.Value;
counter += data.GetRequestCount();
}
return counter;
});
Expand Down Expand Up @@ -265,27 +262,24 @@ private async Task CollectActivationsImpl(bool scanStale, TimeSpan ageLimit = de
public List<Tuple<GrainId, string, int>> GetGrainStatistics()
{
var counts = new Dictionary<string, Dictionary<GrainId, int>>();
lock (activations)
foreach (var activation in activations)
{
foreach (var activation in activations)
{
ActivationData data = activation.Value;
if (data == null || data.GrainInstance == null) continue;
ActivationData data = activation.Value;
if (data == null || data.GrainInstance == null) continue;

// TODO: generic type expansion
var grainTypeName = TypeUtils.GetFullName(data.GrainInstanceType);
// TODO: generic type expansion
var grainTypeName = TypeUtils.GetFullName(data.GrainInstanceType);

Dictionary<GrainId, int> grains;
int n;
if (!counts.TryGetValue(grainTypeName, out grains))
{
counts.Add(grainTypeName, new Dictionary<GrainId, int> { { data.Grain, 1 } });
}
else if (!grains.TryGetValue(data.Grain, out n))
grains[data.Grain] = 1;
else
grains[data.Grain] = n + 1;
Dictionary<GrainId, int> grains;
int n;
if (!counts.TryGetValue(grainTypeName, out grains))
{
counts.Add(grainTypeName, new Dictionary<GrainId, int> { { data.Grain, 1 } });
}
else if (!grains.TryGetValue(data.Grain, out n))
grains[data.Grain] = 1;
else
grains[data.Grain] = n + 1;
}
return counts
.SelectMany(p => p.Value.Select(p2 => Tuple.Create(p2.Key, p.Key, p2.Value)))
Expand All @@ -295,23 +289,20 @@ private async Task CollectActivationsImpl(bool scanStale, TimeSpan ageLimit = de
public List<DetailedGrainStatistic> GetDetailedGrainStatistics(string[] types=null)
{
var stats = new List<DetailedGrainStatistic>();
lock (activations)
foreach (var activation in activations)
{
foreach (var activation in activations)
{
ActivationData data = activation.Value;
if (data == null || data.GrainInstance == null) continue;
ActivationData data = activation.Value;
if (data == null || data.GrainInstance == null) continue;

if (types==null || types.Contains(TypeUtils.GetFullName(data.GrainInstanceType)))
if (types==null || types.Contains(TypeUtils.GetFullName(data.GrainInstanceType)))
{
stats.Add(new DetailedGrainStatistic()
{
stats.Add(new DetailedGrainStatistic()
{
GrainType = TypeUtils.GetFullName(data.GrainInstanceType),
GrainIdentity = data.Grain,
SiloAddress = data.Silo,
Category = data.Grain.Category.ToString()
});
}
GrainType = TypeUtils.GetFullName(data.GrainInstanceType),
GrainIdentity = data.Grain,
SiloAddress = data.Silo,
Category = data.Grain.Category.ToString()
});
}
}
return stats;
Expand Down Expand Up @@ -357,11 +348,16 @@ public DetailedGrainReport GetDetailedGrainReport(GrainId grain)
/// Register a new object to which messages can be delivered with the local lookup table and scheduler.
/// </summary>
/// <param name="activation"></param>
public void RegisterMessageTarget(ActivationData activation)
public bool TryRegisterMessageTarget(ActivationData activation)
{
scheduler.RegisterWorkContext(activation.SchedulingContext);
activations.RecordNewTarget(activation);
lock (activation)
{
if (scheduler.RegisterWorkContext(activation.SchedulingContext) == null) return false;
if (!activations.RecordNewTarget(activation)) return false;
}

activationsCreated.Increment();
return true;
}

/// <summary>
Expand Down Expand Up @@ -443,20 +439,19 @@ public void GetGrainTypeInfo(int typeCode, out string grainClass, out PlacementS
activatedPromise = Task.CompletedTask;
PlacementStrategy placement;

lock (activations)
while (true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this smells bad. why are we adding infinite retry here while the original logic didn't?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a standard lock-free approach when dealing with optimistic concurrency conflicts


In reply to: 274565380 [](ancestors = 274565380)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Thanks for explaining !

{
if (TryGetActivationData(address.Activation, out result))
{
return result;
}

int typeCode = address.Grain.TypeCode;
string actualGrainType = null;
MultiClusterRegistrationStrategy activationStrategy;

if (typeCode != 0)
{
GetGrainTypeInfo(typeCode, out actualGrainType, out placement, out activationStrategy, genericArguments);
GetGrainTypeInfo(typeCode, out var actualGrainType, out placement, out activationStrategy, genericArguments);
if (string.IsNullOrEmpty(grainType))
{
grainType = actualGrainType;
Expand Down Expand Up @@ -489,9 +484,13 @@ public void GetGrainTypeInfo(int typeCode, out string grainClass, out PlacementS
this.maxRequestProcessingTime,
this.RuntimeClient,
this.loggerFactory);
RegisterMessageTarget(result);

// Loop until an activation has been registered either by this thread or another.
if (!TryRegisterMessageTarget(result)) continue;
}
} // End lock

break;
}

// Did not find and did not start placing new
if (result == null)
Expand Down Expand Up @@ -1277,29 +1276,23 @@ private async Task<ActivationRegistrationResult> RegisterActivationInGrainDirect
{
// Stateless workers are not registered in the directory and can have multiple local activations.
int maxNumLocalActivations = stPlacement.MaxLocal;
lock (activations)
{
List<ActivationData> local;
if (!LocalLookup(address.Grain, out local) || local.Count <= maxNumLocalActivations)
return ActivationRegistrationResult.Success;
List<ActivationData> local;
if (!LocalLookup(address.Grain, out local) || local.Count <= maxNumLocalActivations)
return ActivationRegistrationResult.Success;

var id = StatelessWorkerDirector.PickRandom(local).Address;
return new ActivationRegistrationResult(existingActivationAddress: id);
}
var id = StatelessWorkerDirector.PickRandom(local).Address;
return new ActivationRegistrationResult(existingActivationAddress: id);
}
else
{
// Some other non-directory, single-activation placement.
lock (activations)
var exists = LocalLookup(address.Grain, out var local);
if (exists && local.Count == 1 && local[0].ActivationId.Equals(activation.ActivationId))
{
var exists = LocalLookup(address.Grain, out var local);
if (exists && local.Count == 1 && local[0].ActivationId.Equals(activation.ActivationId))
{
return ActivationRegistrationResult.Success;
}

return new ActivationRegistrationResult(existingActivationAddress: local[0].Address);
return ActivationRegistrationResult.Success;
}

return new ActivationRegistrationResult(existingActivationAddress: local[0].Address);
}

// We currently don't have any other case for multiple activations except for StatelessWorker.
Expand Down Expand Up @@ -1406,29 +1399,26 @@ private void OnSiloStatusChange(SiloAddress updatedSilo, SiloStatus status)
try
{
// scan all activations in activation directory and deactivate the ones that the removed silo is their primary partition owner.
lock (activations)
foreach (var activation in activations)
{
foreach (var activation in activations)
try
{
try
{
var activationData = activation.Value;
if (!activationData.IsUsingGrainDirectory) continue;
if (!updatedSilo.Equals(directory.GetPrimaryForGrain(activationData.Grain))) continue;
var activationData = activation.Value;
if (!activationData.IsUsingGrainDirectory) continue;
if (!updatedSilo.Equals(directory.GetPrimaryForGrain(activationData.Grain))) continue;

lock (activationData)
{
// adapted from InsideGrainClient.DeactivateOnIdle().
activationData.ResetKeepAliveRequest();
activationsToShutdown.Add(activationData);
}
}
catch (Exception exc)
lock (activationData)
{
logger.Error(ErrorCode.Catalog_SiloStatusChangeNotification_Exception,
String.Format("Catalog has thrown an exception while executing OnSiloStatusChange of silo {0}.", updatedSilo.ToStringWithHashCode()), exc);
// adapted from InsideGrainClient.DeactivateOnIdle().
activationData.ResetKeepAliveRequest();
activationsToShutdown.Add(activationData);
}
}
catch (Exception exc)
{
logger.Error(ErrorCode.Catalog_SiloStatusChangeNotification_Exception,
String.Format("Catalog has thrown an exception while executing OnSiloStatusChange of silo {0}.", updatedSilo.ToStringWithHashCode()), exc);
}
}
logger.Info(ErrorCode.Catalog_SiloStatusChangeNotification,
String.Format("Catalog is deactivating {0} activations due to a failure of silo {1}, since it is a primary directory partition to these grain ids.",
Expand Down
9 changes: 8 additions & 1 deletion src/Orleans.Runtime/Scheduler/OrleansTaskScheduler.cs
Expand Up @@ -277,7 +277,14 @@ public WorkItemGroup RegisterWorkContext(ISchedulingContext context)
this.cancellationTokenSource.Token,
this.schedulerStatistics,
this.statisticsOptions);
workgroupDirectory.TryAdd(context, wg);
if (!workgroupDirectory.TryAdd(context, wg))
{
// A workgroup for that context has already been added.
// Stop the newly created workgroup to ensure tracking statistics are consistent.
wg.Stop();
return null;
}

return wg;
}

Expand Down
6 changes: 5 additions & 1 deletion src/Orleans.Runtime/Silo/SiloProviderRuntime.cs
Expand Up @@ -64,7 +64,11 @@ public void RegisterSystemTarget(ISystemTarget target)
var systemTarget = target as SystemTarget;
if (systemTarget == null) throw new ArgumentException($"Parameter must be of type {typeof(SystemTarget)}", nameof(target));
systemTarget.RuntimeClient = this.runtimeClient;
scheduler.RegisterWorkContext(systemTarget.SchedulingContext);
if (scheduler.RegisterWorkContext(systemTarget.SchedulingContext) == null)
{
throw new InvalidOperationException($"Unable to register duplicate work context for SystemTarget {target} (context: {systemTarget.SchedulingContext})");
}

activationDirectory.RecordNewSystemTarget(systemTarget);
}

Expand Down