Skip to content

Commit

Permalink
Log Consistency Providers (#1854)
Browse files Browse the repository at this point in the history
Log Consistency Providers
  • Loading branch information
sebastianburckhardt authored and sergeybykov committed Jan 14, 2017
1 parent e3ee491 commit 0f4ba04
Show file tree
Hide file tree
Showing 83 changed files with 6,533 additions and 206 deletions.
56 changes: 53 additions & 3 deletions src/Orleans/Async/BatchWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ public abstract class BatchWorker
/// <summary>Implement this member in derived classes to define what constitutes a work cycle</summary>
protected abstract Task Work();

protected object lockable = new object();

/// <summary>
/// Notify the worker that there is more work.
/// </summary>
public void Notify()
{
lock (this)
lock (lockable)
{
if (currentWorkCycle != null)
{
Expand Down Expand Up @@ -62,7 +64,7 @@ private void CheckForMoreWork()
TaskCompletionSource<Task> signal = null;
Task taskToSignal = null;

lock (this)
lock (lockable)
{
if (moreWork)
{
Expand Down Expand Up @@ -109,7 +111,7 @@ public async Task WaitForCurrentWorkToBeServiced()
Task waitfortask = null;

// figure out exactly what we need to wait for
lock (this)
lock (lockable)
{
if (!moreWork)
// just wait for current work cycle
Expand All @@ -132,5 +134,53 @@ public async Task WaitForCurrentWorkToBeServiced()
else if (waitfortask != null)
await waitfortask;
}

/// <summary>
/// Notify the worker that there is more work, and wait for the current work cycle, and also the next work cycle if there is currently unserviced work.
/// </summary>
public async Task NotifyAndWaitForWorkToBeServiced()
{
Task<Task> waitForTaskTask = null;
Task waitForTask = null;

lock (lockable)
{
if (currentWorkCycle != null)
{
moreWork = true;
if (nextWorkCyclePromise == null)
nextWorkCyclePromise = new TaskCompletionSource<Task>();
waitForTaskTask = nextWorkCyclePromise.Task;
}
else
{
Start();
waitForTask = currentWorkCycle;
}
}

if (waitForTaskTask != null)
await await waitForTaskTask;

else if (waitForTask != null)
await waitForTask;
}
}

/// A convenient variant of a batch worker
/// that allows the work function to be passed as a constructor argument
public class BatchWorkerFromDelegate : BatchWorker
{
public BatchWorkerFromDelegate(Func<Task> work)
{
this.work = work;
}

private Func<Task> work;

protected override Task Work()
{
return work();
}
}
}
4 changes: 4 additions & 0 deletions src/Orleans/Async/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ internal static T GetResult<T>(this Task<T> task)
{
return task.GetAwaiter().GetResult();
}
internal static void GetResult(this Task task)
{
task.GetAwaiter().GetResult();
}
}
}

Expand Down
32 changes: 32 additions & 0 deletions src/Orleans/Configuration/GlobalConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Orleans.Providers;
using Orleans.Storage;
using Orleans.Streams;
using Orleans.LogConsistency;

namespace Orleans.Runtime.Configuration
{
Expand Down Expand Up @@ -1095,6 +1096,37 @@ public void RegisterStorageProvider(string providerTypeFullName, string provider
ProviderConfigurationUtility.RegisterProvider(ProviderConfigurations, ProviderCategoryConfiguration.STORAGE_PROVIDER_CATEGORY_NAME, providerTypeFullName, providerName, properties);
}

/// <summary>
/// Registers a given log-consistency provider.
/// </summary>
/// <param name="providerTypeFullName">Full name of the log-consistency provider type</param>
/// <param name="providerName">Name of the log-consistency provider</param>
/// <param name="properties">Properties that will be passed to the log-consistency provider upon initialization </param>
public void RegisterLogConsistencyProvider(string providerTypeFullName, string providerName, IDictionary<string, string> properties = null)
{
ProviderConfigurationUtility.RegisterProvider(ProviderConfigurations, ProviderCategoryConfiguration.LOG_CONSISTENCY_PROVIDER_CATEGORY_NAME, providerTypeFullName, providerName, properties);
}


/// <summary>
/// Registers a given type of <typeparamref name="T"/> where <typeparamref name="T"/> is a log-consistency provider
/// </summary>
/// <typeparam name="T">Non-abstract type which implements <see cref="ILogConsistencyProvider"/> a log-consistency storage interface</typeparam>
/// <param name="providerName">Name of the log-consistency provider</param>
/// <param name="properties">Properties that will be passed to log-consistency provider upon initialization</param>
public void RegisterLogConsistencyProvider<T>(string providerName, IDictionary<string, string> properties = null) where T : ILogConsistencyProvider
{
Type providerType = typeof(T);
var providerTypeInfo = providerType.GetTypeInfo();
if (providerTypeInfo.IsAbstract ||
providerTypeInfo.IsGenericType ||
!typeof(ILogConsistencyProvider).IsAssignableFrom(providerType))
throw new ArgumentException("Expected non-generic, non-abstract type which implements ILogConsistencyProvider interface", "typeof(T)");

ProviderConfigurationUtility.RegisterProvider(ProviderConfigurations, ProviderCategoryConfiguration.LOG_CONSISTENCY_PROVIDER_CATEGORY_NAME, providerType.FullName, providerName, properties);
}


/// <summary>
/// Retrieves an existing provider configuration
/// </summary>
Expand Down
12 changes: 12 additions & 0 deletions src/Orleans/Configuration/OrleansConfiguration.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,18 @@
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="LogConsistencyProviders" minOccurs="0" maxOccurs="1">
<xs:annotation>
<xs:documentation>
The optional LogConsistencyProviders element contains configuration for log-consistency provider instances.
</xs:documentation>
</xs:annotation>
<xs:complexType>
<xs:sequence>
<xs:element name="Provider" type="tns:ProviderInstanceConfig" minOccurs="0" maxOccurs="unbounded" />
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="StreamProviders" minOccurs="0" maxOccurs="1" type="tns:StreamProvidersConfig" />
<xs:element name="Application" minOccurs="0" maxOccurs="1">
<xs:complexType>
Expand Down
1 change: 1 addition & 0 deletions src/Orleans/Configuration/ProviderConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public class ProviderCategoryConfiguration
public const string BOOTSTRAP_PROVIDER_CATEGORY_NAME = "Bootstrap";
public const string STORAGE_PROVIDER_CATEGORY_NAME = "Storage";
public const string STREAM_PROVIDER_CATEGORY_NAME = "Stream";
public const string LOG_CONSISTENCY_PROVIDER_CATEGORY_NAME = "LogConsistency";

public string Name { get; set; }
public IDictionary<string, IProviderConfiguration> Providers { get; set; }
Expand Down
33 changes: 31 additions & 2 deletions src/Orleans/Core/GrainAttributes.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using Orleans.GrainDirectory;
using Orleans.Runtime;

namespace Orleans
{
namespace Concurrency
Expand Down Expand Up @@ -272,15 +274,42 @@ namespace Providers
[AttributeUsage(AttributeTargets.Class)]
public sealed class StorageProviderAttribute : Attribute
{
/// <summary>
/// The name of the provider to be used for persisting of grain state
/// </summary>
public string ProviderName { get; set; }

public StorageProviderAttribute()
{
ProviderName = Runtime.Constants.DEFAULT_STORAGE_PROVIDER_NAME;
ProviderName = Runtime.Constants.DEFAULT_STORAGE_PROVIDER_NAME;
}
}

/// <summary>
/// The [Orleans.Providers.LogConsistencyProvider] attribute is used to define which consistency provider to use for grains using the log-view state abstraction.
/// <para>
/// Specifying [Orleans.Providers.LogConsistencyProvider] property is recommended for all grains that derive
/// from ILogConsistentGrain, such as JournaledGrain.
/// If no [Orleans.Providers.LogConsistencyProvider] attribute is specified, then the runtime tries to locate
/// one as follows. First, it looks for a
/// "Default" provider in the configuration file, then it checks if the grain type defines a default.
/// If a consistency provider cannot be located for this grain, then the grain will fail to load into the Silo.
/// </para>
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public sealed class LogConsistencyProviderAttribute : Attribute
{
/// <summary>
/// The name of the storage provider to ne used for persisting state for this grain.
/// The name of the provider to be used for consistency
/// </summary>
public string ProviderName { get; set; }

public LogConsistencyProviderAttribute()
{
ProviderName = Runtime.Constants.DEFAULT_LOG_CONSISTENCY_PROVIDER_NAME;
}
}

}

[AttributeUsage(AttributeTargets.Class, AllowMultiple=true)]
Expand Down
8 changes: 6 additions & 2 deletions src/Orleans/GrainDirectory/ClusterLocalRegistration.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using Orleans.MultiCluster;

namespace Orleans.GrainDirectory
{
Expand Down Expand Up @@ -34,9 +36,11 @@ public override int GetHashCode()
return GetType().GetHashCode();
}

internal override bool IsSingleInstance()
public override IEnumerable<string> GetRemoteInstances(MultiClusterConfiguration mcConfig, string myClusterId)
{
return true;
foreach (var clusterId in mcConfig.Clusters)
if (clusterId != myClusterId)
yield return clusterId;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using Orleans.MultiCluster;

namespace Orleans.GrainDirectory
{
Expand Down Expand Up @@ -41,9 +43,11 @@ public override int GetHashCode()
return GetType().GetHashCode();
}

internal override bool IsSingleInstance()
private static List<string> emptyList = new List<string>();

public override IEnumerable<string> GetRemoteInstances(MultiClusterConfiguration mcConfig, string myClusterId)
{
return true;
return emptyList; // there is only one instance, so no remote instances
}
}
}
30 changes: 27 additions & 3 deletions src/Orleans/GrainDirectory/MultiClusterRegistrationStrategy.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,33 @@
using System;
using System.Linq;
using Orleans.Runtime.Configuration;
using Orleans.MultiCluster;
using System.Collections.Generic;

namespace Orleans.GrainDirectory
{

/// <summary>
/// Interface for multi-cluster registration strategies. Used by protocols that coordinate multiple instances.
/// </summary>
public interface IMultiClusterRegistrationStrategy {

/// <summary>
/// Determines which remote clusters have instances.
/// </summary>
/// <param name="mcConfig">The multi-cluster configuration</param>
/// <param name="myClusterId">The cluster id of this cluster</param>
/// <returns></returns>
IEnumerable<string> GetRemoteInstances(MultiClusterConfiguration mcConfig, string myClusterId);

}

/// <summary>
/// A superclass for all multi-cluster registration strategies.
/// Strategy objects are used as keys to select the proper registrar.
/// Strategy object which is used as keys to select the proper registrar.
/// </summary>
[Serializable]
internal abstract class MultiClusterRegistrationStrategy
internal abstract class MultiClusterRegistrationStrategy : IMultiClusterRegistrationStrategy
{
private static MultiClusterRegistrationStrategy defaultStrategy;

Expand All @@ -33,6 +52,11 @@ internal static MultiClusterRegistrationStrategy GetDefault()
return defaultStrategy;
}

internal abstract bool IsSingleInstance();
internal static MultiClusterRegistrationStrategy FromAttributes(IEnumerable<object> attributes)
{
return (attributes.FirstOrDefault() as RegistrationAttribute)?.RegistrationStrategy ?? defaultStrategy;
}

public abstract IEnumerable<string> GetRemoteInstances(MultiClusterConfiguration mcConfig, string myClusterId);
}
}
68 changes: 68 additions & 0 deletions src/Orleans/LogConsistency/ConnectionIssues.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Orleans.LogConsistency
{

/// <summary>
/// Represents information about connection issues encountered inside log consistency protocols.
/// It is used both inside the protocol to track retry loops, and is made visible to users
/// who want to monitor their log-consistent grains for communication issues.
/// </summary>
[Serializable]
public abstract class ConnectionIssue
{
/// <summary>
/// The UTC timestamp of the last time at which the issue was observed
/// </summary>
public DateTime TimeStamp { get; set; }

/// <summary>
/// The UTC timestamp of the first time we observed this issue
/// </summary>
public DateTime TimeOfFirstFailure { get; set; }

/// <summary>
/// The number of times we have observed this issue since the first failure
/// </summary>
public int NumberOfConsecutiveFailures { get; set; }

/// <summary>
/// The delay we are waiting before the next retry
/// </summary>
public TimeSpan RetryDelay { get; set; }

/// <summary>
/// Computes the retry delay based on the rest of the information. Is overridden by subclasses
/// that represent specific categories of issues.
/// </summary>
/// <param name="previous">The previously used retry delay</param>
/// <returns></returns>
public abstract TimeSpan ComputeRetryDelay(TimeSpan? previous);
}



/// <summary>
/// Represents information about notification failures encountered inside log consistency protocols.
/// </summary>
[Serializable]
public abstract class NotificationFailed : ConnectionIssue
{
/// <summary>
/// The clusterId of the remote cluster to which we had an issue when sending change notifications.
/// </summary>
public string RemoteClusterId { get; set; }

/// <summary>
/// The exception we caught, or null if the problem was not caused by an exception.
/// </summary>
public Exception Exception { get; set; }
}



}
Loading

0 comments on commit 0f4ba04

Please sign in to comment.