diff --git a/Membase/BucketConfigListener.cs b/Membase/BucketConfigListener.cs
index 8a8c6408..267ea82f 100644
--- a/Membase/BucketConfigListener.cs
+++ b/Membase/BucketConfigListener.cs
@@ -48,7 +48,77 @@ public BucketConfigListener(Uri[] poolUrls, string bucketName, NetworkCredential
///
public int DeadTimeout { get; set; }
- #region listener cache
+ ///
+ /// Raised when the pool's configuration changes.
+ ///
+ public event Action ClusterConfigChanged;
+
+ ///
+ /// Starts listening for configuration data. This method blocks until the initial configuration is received. (Or until all pool urls fail.)
+ ///
+ public void Start()
+ {
+ var reset = this.mre = new ManualResetEvent(false);
+
+ // subscribe to the config url
+ this.listener = this.GetPooledListener();
+
+ // this will be signaled by the config changed event handler
+ reset.WaitOne();
+
+ // set to null, then dispose, so RaiseConfigChanged will not
+ // fail at Set when the config changes while we're cleaning up here
+ this.mre = null;
+ ((IDisposable)reset).Dispose();
+ }
+
+ public void Stop()
+ {
+ this.ReleaseListener(this.listener);
+ this.listener = null;
+ }
+
+ private void HandleMessage(string message)
+ {
+ // everything failed
+ if (String.IsNullOrEmpty(message))
+ {
+ this.lastHash = null;
+ this.RaiseConfigChanged(null);
+ return;
+ }
+
+ // deserialize the buckets
+ var jss = new JavaScriptSerializer();
+ var config = jss.Deserialize(message);
+
+ // check if the config is the same as the previous
+ // we cannot compare the messages because they have more information than we deserialize from them
+ var configHash = config.GetHashCode();
+
+ if (lastHash != configHash)
+ {
+ lastHash = configHash;
+ this.RaiseConfigChanged(config);
+ }
+ else if (log.IsDebugEnabled)
+ log.Debug("Last message was the same as current, ignoring.");
+ }
+
+ private void RaiseConfigChanged(ClusterConfig config)
+ {
+ var ccc = this.ClusterConfigChanged;
+
+ // we got a new config, notify the pool to reload itself
+ if (ccc != null)
+ ccc(config);
+
+ // trigger the event so Start stops blocking
+ if (this.mre != null)
+ this.mre.Set();
+ }
+
+ #region [ message listener pooling ]
private static readonly object ListenerSync = new Object();
// we pool and refcount the listeners here so we can safely dispose them when all clients are destroyed
@@ -87,6 +157,13 @@ private void ReleaseListener(MessageStreamListener listener)
}
}
+ ///
+ /// Returns a MessageStreamListener instance based on this instance's configuratino (timeout, bucket name etc.)
+ ///
+ /// When multiple listeners are requested with the exact same parameters (usually when multiple clients are instantiated from the same configuration),
+ /// the same listener will be returned each time.
+ ///
+ ///
private MessageStreamListener GetPooledListener()
{
// create a unique key based on the parameters
@@ -114,11 +191,17 @@ private MessageStreamListener GetPooledListener()
MessageStreamListener retval;
lock (ListenerSync)
- {
- if (!listeners.TryGetValue(hash, out retval))
+ if (listeners.TryGetValue(hash, out retval))
{
- listeners[hash] = retval = new MessageStreamListener(poolUrls, this.ResolveBucketUri);
- listenerRefs[retval] = new ListenerInfo { RefCount = 1, HashKey = hash };
+ listenerRefs[retval].RefCount++;
+ retval.Subscribe(this.HandleMessage);
+ }
+ else
+ {
+ var name = this.bucketName;
+
+ // create a new listener for the pool urls
+ retval = new MessageStreamListener(poolUrls, (client, root) => ResolveBucketUri(client, root, name));
retval.Timeout = this.Timeout;
retval.DeadTimeout = this.DeadTimeout;
@@ -128,38 +211,25 @@ private MessageStreamListener GetPooledListener()
retval.Subscribe(this.HandleMessage);
+ listeners[hash] = retval;
+ listenerRefs[retval] = new ListenerInfo { RefCount = 1, HashKey = hash };
+
retval.Start();
}
- else
- {
- listenerRefs[retval].RefCount++;
- retval.Subscribe(this.HandleMessage);
- }
- }
return retval;
}
- #endregion
-
- private Uri ResolveBucketUri(WebClientWithTimeout client, Uri uri)
+ private static Uri ResolveBucketUri(WebClientWithTimeout client, Uri root, string bucketName)
{
try
{
var helper = new ConfigHelper(client);
- var bucket = helper.ResolveBucket(uri, this.bucketName);
+ var bucket = helper.ResolveBucket(root, bucketName);
if (bucket == null)
return null;
- var streamingUri = bucket.streamingUri;
-
- var node = bucket.nodes.FirstOrDefault();
-
- // beta 2 hack, will be phased out after b3 is released for a while
- if (node != null && node.version == "1.6.0beta2")
- streamingUri = streamingUri.Replace("/bucketsStreaming/", "/bucketsStreamingConfig/");
-
- return new Uri(uri, streamingUri);
+ return new Uri(root, bucket.streamingUri);
}
catch (Exception e)
{
@@ -169,68 +239,7 @@ private Uri ResolveBucketUri(WebClientWithTimeout client, Uri uri)
}
}
- public event Action ClusterConfigChanged;
-
- private void HandleMessage(string message)
- {
- // everything failed
- if (String.IsNullOrEmpty(message))
- {
- this.lastHash = null;
- this.RaiseConfigChanged(null);
- return;
- }
-
- // deserialize the buckets
- var jss = new JavaScriptSerializer();
- var config = jss.Deserialize(message);
-
- // check if the config is the same as the previous
- // we cannot compare the messages because they have more information than we deserialize from them
- var configHash = config.GetHashCode();
-
- if (lastHash != configHash)
- {
- lastHash = configHash;
- this.RaiseConfigChanged(config);
- }
- else if (log.IsDebugEnabled)
- log.Debug("Last message was the same as current, ignoring.");
- }
-
- public void Start()
- {
- var reset = this.mre = new ManualResetEvent(false);
-
- // subscribe to the config url
- this.listener = this.GetPooledListener();
-
- reset.WaitOne();
-
- // set to null, then dispose, so RaiseConfigChanged will not
- // fail at Set when the config changes while we're cleaning up here
- this.mre = null;
- ((IDisposable)reset).Dispose();
- }
-
- public void Stop()
- {
- this.ReleaseListener(this.listener);
- this.listener = null;
- }
-
- private void RaiseConfigChanged(ClusterConfig config)
- {
- var ccc = this.ClusterConfigChanged;
-
- // we got a new config, notify the pool to reload itself
- if (ccc != null)
- ccc(config);
-
- // trigger the event so Start stops blocking
- if (this.mre != null)
- this.mre.Set();
- }
+ #endregion
}
}
diff --git a/Membase/ConfigHelper.cs b/Membase/ConfigHelper.cs
index c12ce7b7..361165b5 100644
--- a/Membase/ConfigHelper.cs
+++ b/Membase/ConfigHelper.cs
@@ -5,110 +5,143 @@
using System.Text;
using System.Web.Script.Serialization;
using System.IO;
+using System.Text.RegularExpressions;
namespace Membase
{
- internal class ConfigHelper : IDisposable
+ internal class ConfigHelper
{
private static readonly Enyim.Caching.ILog log = Enyim.Caching.LogManager.GetLogger(typeof(ConfigHelper));
private WebClientWithTimeout wcwt;
- public ConfigHelper() : this(new WebClientWithTimeout()) { }
-
public ConfigHelper(WebClientWithTimeout client)
{
this.wcwt = client;
}
- public ICredentials Credentials
- {
- get { return this.wcwt.Credentials; }
- set { this.wcwt.Credentials = value; }
- }
-
- public int Timeout
+ ///
+ /// Deserializes the content of an url as a json object
+ ///
+ ///
+ ///
+ ///
+ private T DeserializeUri(Uri uri, bool forceAuth)
{
- get { return this.wcwt.Timeout; }
- set
+ if (forceAuth)
{
- this.wcwt.Timeout = value;
- this.wcwt.ReadWriteTimeout = value;
+ var cred = this.wcwt.Credentials;
+
+ if (cred == null)
+ {
+ if (log.IsDebugEnabled) log.Debug("Cannot force basic auth, the client has no credentials specified.");
+ return default(T);
+ }
+
+ var nc = cred.GetCredential(uri, "Basic");
+ if (nc == null)
+ {
+ if (log.IsDebugEnabled) log.DebugFormat("Cannot force basic auth, the client did not gave us a credential for this url: {0}.", uri);
+ return default(T);
+ }
+
+ // this will send the basic auth header even though the server did not ask for it
+ // 1.6.4+ requires you to authenticate to get protected bucket info (but it does not give you a 401 error to force the auth)
+ this.wcwt.Encoding = Encoding.UTF8;
+ this.wcwt.Headers["Authorization"] = "Basic " + Convert.ToBase64String(Encoding.UTF8.GetBytes(nc.UserName + ":" + nc.Password));
}
- }
- private T DeserializeUri(Uri uri)
- {
var info = this.wcwt.DownloadString(uri);
var jss = new JavaScriptSerializer();
return jss.Deserialize(info);
}
- private Uri GetBucketsRoot(Uri poolUri)
+ private ClusterInfo GetClusterInfo(Uri clusterUrl)
{
- var poolInfo = DeserializeUri>(poolUri);
-
- object tmp;
- Dictionary dict;
+ var info = DeserializeUri(clusterUrl, false);
- // get the buckets member which will hold the url of the bucket listing REST endpoint
- if (!poolInfo.TryGetValue("buckets", out tmp)
- || (dict = tmp as Dictionary) == null)
- throw new ArgumentException("invalid pool url: " + poolUri);
+ if (info == null)
+ throw new ArgumentException("invalid pool url: " + clusterUrl);
- string bucketsUrl;
-
- // get the { uri: VALUE } part
- if (!dict.TryGetValue("uri", out tmp) || (bucketsUrl = tmp as string) == null)
+ if (info.buckets == null || String.IsNullOrEmpty(info.buckets.uri))
throw new ArgumentException("got an invalid response, missing { buckets : { uri : '' } }");
- return new Uri(poolUri, bucketsUrl);
+ return info;
}
+ ///
+ /// Asks the cluster for the specified bucket's configuration.
+ ///
+ ///
+ ///
+ ///
public ClusterConfig ResolveBucket(Uri poolUri, string name)
{
- var root = this.GetBucketsRoot(poolUri);
- var allBuckets = this.DeserializeUri(root);
- var retval = allBuckets.FirstOrDefault(b => b.name == name);
-
- if (retval == null && log.IsWarnEnabled)
- log.WarnFormat("Could not find the pool '{0}' at {1}", name, poolUri);
-
- return retval;
- }
-
- public Uri[] GetBucketStreamingUris(Uri[] pools, string name)
- {
- if (pools == null) throw new ArgumentNullException("pools");
- if (pools.Length == 0) throw new ArgumentException("must specify at least one url", "pools");
+ var info = this.GetClusterInfo(poolUri);
+ var root = new Uri(poolUri, info.buckets.uri);
- List retval = new List(pools.Length);
+ // first try the default auth mechanism: auth if 401, otherwise do nothing
+ var allBuckets = this.DeserializeUri(root, false);
+ var retval = allBuckets.FirstOrDefault(b => b.name == name);
- for (var i = 0; i < pools.Length; i++)
+ // we did not find the bucket
+ if (retval == null)
{
- try
- {
- var current = pools[i];
- var bucket = this.ResolveBucket(current, name);
+ if (log.IsDebugEnabled) log.DebugFormat("Could not find the pool '{0}' at {1}, trying with forceAuth=true", name, poolUri);
- if (bucket != null)
- retval.Add(new Uri(current, bucket.streamingUri));
+ // check if we're connecting to a 1.6.4 server
+ var node = info.nodes == null ? null : info.nodes.FirstOrDefault();
+ if (node == null)
+ {
+ if (log.IsDebugEnabled) log.Debug("No nodes are defined for the first bucket.");
}
- catch (Exception e)
+ else
{
- log.Error(e);
+ // ignore git revisino and other garbage, only take x.y.z
+ var m = Regex.Match(node.version, @"^\d+\.\d+\.\d+");
+ if (!m.Success)
+ {
+ if (log.IsDebugEnabled) log.DebugFormat("Invalid version number: {0}", node.version);
+ }
+ else
+ {
+ var version = new Version(m.Value);
+
+ // let's try to load the config with forced authentication
+ if (version >= new Version(1, 6, 4))
+ {
+ allBuckets = this.DeserializeUri(root, true);
+ retval = allBuckets.FirstOrDefault(b => b.name == name);
+ }
+ else
+ if (log.IsDebugEnabled) log.DebugFormat("This is a {0} server, skipping forceAuth.", node.version);
+ }
}
+
+ if (retval == null)
+ {
+ if (log.IsWarnEnabled) log.WarnFormat("Could not find the pool '{0}' at {1}", name, poolUri);
+ }
+ else if (log.IsDebugEnabled) log.DebugFormat("Found config for bucket {0}.", name);
}
- return retval.Count == 0 ? null : retval.ToArray();
+ return retval;
}
- public ClusterNode[] GetWorkingNodes(Uri[] pools, string name)
+ ///
+ /// Finds the comet endpoint of the specified bucket. This checks all controller urls until one returns a config or all fails.
+ ///
+ ///
+ ///
+ ///
+ public Uri[] GetBucketStreamingUris(Uri[] pools, string name)
{
if (pools == null) throw new ArgumentNullException("pools");
if (pools.Length == 0) throw new ArgumentException("must specify at least one url", "pools");
+ List retval = new List(pools.Length);
+
for (var i = 0; i < pools.Length; i++)
{
try
@@ -117,7 +150,7 @@ public ClusterNode[] GetWorkingNodes(Uri[] pools, string name)
var bucket = this.ResolveBucket(current, name);
if (bucket != null)
- return bucket.nodes.Where(b => b.status == "healthy").ToArray();
+ retval.Add(new Uri(current, bucket.streamingUri));
}
catch (Exception e)
{
@@ -125,22 +158,7 @@ public ClusterNode[] GetWorkingNodes(Uri[] pools, string name)
}
}
- return null;
- }
-
- void IDisposable.Dispose()
- {
- if (this.wcwt != null)
- {
- this.wcwt.Dispose();
- this.wcwt = null;
- GC.SuppressFinalize(this);
- }
- }
-
- ~ConfigHelper()
- {
- ((IDisposable)this).Dispose();
+ return retval.Count == 0 ? null : retval.ToArray();
}
}
}
diff --git a/Membase/Deserialization.cs b/Membase/Deserialization.cs
index 76ea71c0..3b5ab1f0 100644
--- a/Membase/Deserialization.cs
+++ b/Membase/Deserialization.cs
@@ -6,6 +6,18 @@
namespace Membase
{
#pragma warning disable 649
+ internal class ClusterInfo
+ {
+ public string name;
+ public ClusterNode[] nodes;
+ public ClusterBucketInfo buckets;
+ }
+
+ internal class ClusterBucketInfo
+ {
+ public string uri;
+ }
+
internal class ClusterConfig
{
public string name;