Skip to content

Commit

Permalink
Merge pull request #777 from lscpike/pinned
Browse files Browse the repository at this point in the history
Pinned consumer strategy
  • Loading branch information
gregoryyoung committed Dec 10, 2015
2 parents 56c0378 + 97a84b0 commit f752a96
Show file tree
Hide file tree
Showing 31 changed files with 1,397 additions and 192 deletions.
6 changes: 6 additions & 0 deletions src/EventStore.ClientAPI/Common/SystemNames.cs
Expand Up @@ -139,6 +139,12 @@ public static class SystemConsumerStrategies
/// Distribute events to each client in a round robin fashion.
/// </summary>
public const string RoundRobin = "RoundRobin";

/// <summary>
/// Distribute events of the same streamId to the same client until it disconnects on a best efforts basis.
/// Designed to be used with indexes such as the category projection.
/// </summary>
public const string Pinned = "Pinned";
}

}
66 changes: 47 additions & 19 deletions src/EventStore.ClusterNode/Program.cs
Expand Up @@ -21,6 +21,8 @@
using EventStore.Core.Util;
using System.Net.NetworkInformation;
using EventStore.Core.Data;
using EventStore.Core.Services.PersistentSubscription;
using EventStore.Core.Services.PersistentSubscription.ConsumerStrategy;

namespace EventStore.ClusterNode
{
Expand Down Expand Up @@ -249,8 +251,9 @@ private static ClusterVNodeSettings GetClusterVNodeSettings(ClusterNodeOptions o
}

var authenticationConfig = String.IsNullOrEmpty(options.AuthenticationConfig) ? options.Config : options.AuthenticationConfig;
var authenticationProviderFactory = GetAuthenticationProviderFactory(options.AuthenticationType, authenticationConfig);

var plugInContainer = FindPlugins();
var authenticationProviderFactory = GetAuthenticationProviderFactory(options.AuthenticationType, authenticationConfig, plugInContainer);
var consumerStrategyFactories = GetPlugInConsumerStrategyFactories(plugInContainer);
return new ClusterVNodeSettings(Guid.NewGuid(), 0,
intTcp, intSecTcp, extTcp, extSecTcp, intHttp, extHttp, gossipAdvertiseInfo,
intHttpPrefixes, extHttpPrefixes, options.EnableTrustedAuth,
Expand All @@ -277,7 +280,8 @@ private static ClusterVNodeSettings GetClusterVNodeSettings(ClusterNodeOptions o
options.DisableHTTPCaching,
options.Index,
options.EnableHistograms,
options.IndexCacheDepth);
options.IndexCacheDepth,
consumerStrategyFactories);
}

private static IPAddress GetNonLoopbackAddress(){
Expand All @@ -297,27 +301,32 @@ private static ClusterVNodeSettings GetClusterVNodeSettings(ClusterNodeOptions o
return null;
}

private static IAuthenticationProviderFactory GetAuthenticationProviderFactory(string authenticationType, string authenticationConfigFile)
private static IPersistentSubscriptionConsumerStrategyFactory[] GetPlugInConsumerStrategyFactories(CompositionContainer plugInContainer)
{
var catalog = new AggregateCatalog();
var allPlugins = plugInContainer.GetExports<IPersistentSubscriptionConsumerStrategyPlugin>();

var currentPath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
var pluginsPath = Path.Combine(currentPath ?? String.Empty, "plugins");
var strategyFactories = new List<IPersistentSubscriptionConsumerStrategyFactory>();

catalog.Catalogs.Add(new AssemblyCatalog(typeof(Program).Assembly));

if (Directory.Exists(pluginsPath))
foreach (var potentialPlugin in allPlugins)
{
Log.Info("Plugins path: {0}", pluginsPath);
catalog.Catalogs.Add(new DirectoryCatalog(pluginsPath));
}
else
{
Log.Info("Cannot find plugins path: {0}", pluginsPath);
try
{
var plugin = potentialPlugin.Value;
Log.Info("Loaded consumer strategy plugin: {0} version {1}.", plugin.Name, plugin.Version);
strategyFactories.Add(plugin.GetConsumerStrategyFactory());
}
catch (CompositionException ex)
{
Log.ErrorException(ex, "Error loading consumer strategy plugin.");
}
}

var compositionContainer = new CompositionContainer(catalog);
var potentialPlugins = compositionContainer.GetExports<IAuthenticationPlugin>();
return strategyFactories.ToArray();
}

private static IAuthenticationProviderFactory GetAuthenticationProviderFactory(string authenticationType, string authenticationConfigFile, CompositionContainer plugInContainer)
{
var potentialPlugins = plugInContainer.GetExports<IAuthenticationPlugin>();

var authenticationTypeToPlugin = new Dictionary<string, Func<IAuthenticationProviderFactory>> {
{ "internal", () => new InternalAuthenticationProviderFactory() }
Expand All @@ -343,12 +352,31 @@ private static IAuthenticationProviderFactory GetAuthenticationProviderFactory(s
{
throw new ApplicationInitializationException(string.Format("The authentication type {0} is not recognised. If this is supposed " +
"to be provided by an authentication plugin, confirm the plugin DLL is located in {1}.\n" +
"Valid options for authentication are: {2}.", authenticationType, pluginsPath, string.Join(", ", authenticationTypeToPlugin.Keys)));
"Valid options for authentication are: {2}.", authenticationType, Locations.PluginsDirectory, string.Join(", ", authenticationTypeToPlugin.Keys)));
}

return factory();
}

private static CompositionContainer FindPlugins()
{
var catalog = new AggregateCatalog();

catalog.Catalogs.Add(new AssemblyCatalog(typeof (Program).Assembly));

if (Directory.Exists(Locations.PluginsDirectory))
{
Log.Info("Plugins path: {0}", Locations.PluginsDirectory);
catalog.Catalogs.Add(new DirectoryCatalog(Locations.PluginsDirectory));
}
else
{
Log.Info("Cannot find plugins path: {0}", Locations.PluginsDirectory);
}

return new CompositionContainer(catalog);
}

protected override void Start()
{
_node.Start();
Expand Down
4 changes: 4 additions & 0 deletions src/EventStore.Common/Utils/Locations.cs
Expand Up @@ -10,6 +10,7 @@ public class Locations
public static readonly string WebContentDirectory;
public static readonly string ProjectionsDirectory;
public static readonly string PreludeDirectory;
public static readonly string PluginsDirectory;
public static readonly string DefaultContentDirectory;
public static readonly string DefaultConfigurationDirectory;
public static readonly string DefaultDataDirectory;
Expand All @@ -21,6 +22,8 @@ static Locations()
ApplicationDirectory = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location) ??
Path.GetFullPath(".");

PluginsDirectory = Path.Combine(ApplicationDirectory, "plugins");

switch (Platforms.GetPlatform())
{
case Platform.Linux:
Expand Down Expand Up @@ -51,6 +54,7 @@ static Locations()
Path.Combine(ApplicationDirectory, "Prelude"),
Path.Combine(DefaultContentDirectory, "Prelude")
);

}

/// <summary>
Expand Down
1 change: 1 addition & 0 deletions src/EventStore.Core.Tests/EventStore.Core.Tests.csproj
Expand Up @@ -331,6 +331,7 @@
<Compile Include="Services\PersistentSubscription\OutstandingMessageCacheTests.cs" />
<Compile Include="Services\PersistentSubscription\PersistentSubscriptionConfigPersistence.cs" />
<Compile Include="Services\PersistentSubscription\PersistentSubscriptionTests.cs" />
<Compile Include="Services\PersistentSubscription\PinnedConsumerStrategyTests.cs" />
<Compile Include="Services\PersistentSubscription\StreamBufferTests.cs" />
<Compile Include="Services\Replication\DeleteStream\when_delete_stream_gets_commit_timeout_after_commit.cs" />
<Compile Include="Authentication\when_handling_multiple_requests_with_reset_password_cache_in_between.cs" />
Expand Down
2 changes: 1 addition & 1 deletion src/EventStore.Core.Tests/Helpers/PortsHelper.cs
Expand Up @@ -48,7 +48,7 @@ public static void InitPorts(IPAddress ip)
try
{
var httpListener = new HttpListener();
httpListener.Prefixes.Add(string.Format("http://+:{0}/", port));
httpListener.Prefixes.Add(string.Format("http://127.0.0.1:{0}/", port));
httpListener.Start();

Exception httpListenerError = null;
Expand Down

0 comments on commit f752a96

Please sign in to comment.