Skip to content

Commit

Permalink
Membase 1.6.4 compatibility fix for authenticated buckets.
Browse files Browse the repository at this point in the history
  • Loading branch information
enyim committed Dec 28, 2010
1 parent e399cc8 commit fa13a23
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 160 deletions.
181 changes: 95 additions & 86 deletions Membase/BucketConfigListener.cs
Expand Up @@ -48,7 +48,77 @@ public BucketConfigListener(Uri[] poolUrls, string bucketName, NetworkCredential
/// </summary>
public int DeadTimeout { get; set; }

#region listener cache
/// <summary>
/// Raised when the pool's configuration changes.
/// </summary>
public event Action<ClusterConfig> ClusterConfigChanged;

/// <summary>
/// Starts listening for configuration data. This method blocks until the initial configuration is received. (Or until all pool urls fail.)
/// </summary>
public void Start()
{
var reset = this.mre = new ManualResetEvent(false);

// subscribe to the config url
this.listener = this.GetPooledListener();

// this will be signaled by the config changed event handler
reset.WaitOne();

// set to null, then dispose, so RaiseConfigChanged will not
// fail at Set when the config changes while we're cleaning up here
this.mre = null;
((IDisposable)reset).Dispose();
}

public void Stop()
{
this.ReleaseListener(this.listener);
this.listener = null;
}

private void HandleMessage(string message)
{
// everything failed
if (String.IsNullOrEmpty(message))
{
this.lastHash = null;
this.RaiseConfigChanged(null);
return;
}

// deserialize the buckets
var jss = new JavaScriptSerializer();
var config = jss.Deserialize<ClusterConfig>(message);

// check if the config is the same as the previous
// we cannot compare the messages because they have more information than we deserialize from them
var configHash = config.GetHashCode();

if (lastHash != configHash)
{
lastHash = configHash;
this.RaiseConfigChanged(config);
}
else if (log.IsDebugEnabled)
log.Debug("Last message was the same as current, ignoring.");
}

private void RaiseConfigChanged(ClusterConfig config)
{
var ccc = this.ClusterConfigChanged;

// we got a new config, notify the pool to reload itself
if (ccc != null)
ccc(config);

// trigger the event so Start stops blocking
if (this.mre != null)
this.mre.Set();
}

#region [ message listener pooling ]
private static readonly object ListenerSync = new Object();

// we pool and refcount the listeners here so we can safely dispose them when all clients are destroyed
Expand Down Expand Up @@ -87,6 +157,13 @@ private void ReleaseListener(MessageStreamListener listener)
}
}

/// <summary>
/// Returns a MessageStreamListener instance based on this instance's configuratino (timeout, bucket name etc.)
///
/// When multiple listeners are requested with the exact same parameters (usually when multiple clients are instantiated from the same configuration),
/// the same listener will be returned each time.
/// </summary>
/// <returns></returns>
private MessageStreamListener GetPooledListener()
{
// create a unique key based on the parameters
Expand Down Expand Up @@ -114,11 +191,17 @@ private MessageStreamListener GetPooledListener()
MessageStreamListener retval;

lock (ListenerSync)
{
if (!listeners.TryGetValue(hash, out retval))
if (listeners.TryGetValue(hash, out retval))
{
listeners[hash] = retval = new MessageStreamListener(poolUrls, this.ResolveBucketUri);
listenerRefs[retval] = new ListenerInfo { RefCount = 1, HashKey = hash };
listenerRefs[retval].RefCount++;
retval.Subscribe(this.HandleMessage);
}
else
{
var name = this.bucketName;

// create a new listener for the pool urls
retval = new MessageStreamListener(poolUrls, (client, root) => ResolveBucketUri(client, root, name));

retval.Timeout = this.Timeout;
retval.DeadTimeout = this.DeadTimeout;
Expand All @@ -128,38 +211,25 @@ private MessageStreamListener GetPooledListener()

retval.Subscribe(this.HandleMessage);

listeners[hash] = retval;
listenerRefs[retval] = new ListenerInfo { RefCount = 1, HashKey = hash };

retval.Start();
}
else
{
listenerRefs[retval].RefCount++;
retval.Subscribe(this.HandleMessage);
}
}

return retval;
}

#endregion

private Uri ResolveBucketUri(WebClientWithTimeout client, Uri uri)
private static Uri ResolveBucketUri(WebClientWithTimeout client, Uri root, string bucketName)
{
try
{
var helper = new ConfigHelper(client);
var bucket = helper.ResolveBucket(uri, this.bucketName);
var bucket = helper.ResolveBucket(root, bucketName);
if (bucket == null)
return null;

var streamingUri = bucket.streamingUri;

var node = bucket.nodes.FirstOrDefault();

// beta 2 hack, will be phased out after b3 is released for a while
if (node != null && node.version == "1.6.0beta2")
streamingUri = streamingUri.Replace("/bucketsStreaming/", "/bucketsStreamingConfig/");

return new Uri(uri, streamingUri);
return new Uri(root, bucket.streamingUri);
}
catch (Exception e)
{
Expand All @@ -169,68 +239,7 @@ private Uri ResolveBucketUri(WebClientWithTimeout client, Uri uri)
}
}

public event Action<ClusterConfig> ClusterConfigChanged;

private void HandleMessage(string message)
{
// everything failed
if (String.IsNullOrEmpty(message))
{
this.lastHash = null;
this.RaiseConfigChanged(null);
return;
}

// deserialize the buckets
var jss = new JavaScriptSerializer();
var config = jss.Deserialize<ClusterConfig>(message);

// check if the config is the same as the previous
// we cannot compare the messages because they have more information than we deserialize from them
var configHash = config.GetHashCode();

if (lastHash != configHash)
{
lastHash = configHash;
this.RaiseConfigChanged(config);
}
else if (log.IsDebugEnabled)
log.Debug("Last message was the same as current, ignoring.");
}

public void Start()
{
var reset = this.mre = new ManualResetEvent(false);

// subscribe to the config url
this.listener = this.GetPooledListener();

reset.WaitOne();

// set to null, then dispose, so RaiseConfigChanged will not
// fail at Set when the config changes while we're cleaning up here
this.mre = null;
((IDisposable)reset).Dispose();
}

public void Stop()
{
this.ReleaseListener(this.listener);
this.listener = null;
}

private void RaiseConfigChanged(ClusterConfig config)
{
var ccc = this.ClusterConfigChanged;

// we got a new config, notify the pool to reload itself
if (ccc != null)
ccc(config);

// trigger the event so Start stops blocking
if (this.mre != null)
this.mre.Set();
}
#endregion
}
}

Expand Down

0 comments on commit fa13a23

Please sign in to comment.