Skip to content

Commit

Permalink
Changing endpoint cache to use async, and a single thread for Get req…
Browse files Browse the repository at this point in the history
…uests
  • Loading branch information
phatboyg committed May 19, 2016
1 parent 1d8a07e commit 8fc2b80
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 31 deletions.
Expand Up @@ -41,7 +41,7 @@ public ILog Get(string name)
if (name == null)
throw new ArgumentNullException(nameof(name));

return _logs.Get(name).Value.Result;
return _logs.Get(name).Result.Value.Result;
}

public void Shutdown()
Expand Down
Expand Up @@ -65,7 +65,7 @@ public IPublishEndpoint CreatePublishEndpoint(Uri sourceAddress, Guid? correlati

public async Task<ISendEndpoint> GetPublishSendEndpoint(Type messageType)
{
Cached<ISendEndpoint> cached = _cache.Get(messageType);
Cached<ISendEndpoint> cached = await _cache.Get(messageType).ConfigureAwait(false);

var endpoint = await cached.Value.ConfigureAwait(false);

Expand Down
6 changes: 3 additions & 3 deletions src/MassTransit.Tests/LazyMemoryCache_Specs.cs
Expand Up @@ -27,17 +27,17 @@ public async Task Should_store_a_cached_item()
using (var cache = new LazyMemoryCache<Uri, Data>("endpoints", key => Task.FromResult(new Data {Value = $"The Key: {key}"}),
x => x.SlidingWindow(TimeSpan.FromSeconds(5))))
{
var endpoint = await cache.Get(new Uri("loopback://localhost")).Value;
var endpoint = await (await cache.Get(new Uri("loopback://localhost"))).Value;
Console.WriteLine("Endpoint: {0}", endpoint.Created);

await Task.Delay(TimeSpan.FromSeconds(2));

endpoint = await cache.Get(new Uri("loopback://localhost")).Value;
endpoint = await (await cache.Get(new Uri("loopback://localhost"))).Value;
Console.WriteLine("Endpoint: {0}", endpoint.Created);

await Task.Delay(TimeSpan.FromSeconds(10));

endpoint = await cache.Get(new Uri("loopback://localhost")).Value;
endpoint = await (await cache.Get(new Uri("loopback://localhost"))).Value;
Console.WriteLine("Endpoint: {0}", endpoint.Created);
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/MassTransit.Tests/MessageContext_Specs.cs
Expand Up @@ -171,6 +171,7 @@ public void Setup()
});
x.Timeout = TestTimeout;
});
Await(() => _request);
}

protected override void ConfigureInputQueueEndpoint(IInMemoryReceiveEndpointConfigurator configurator)
Expand Down Expand Up @@ -226,6 +227,8 @@ public void Setup()
{
});
});

Await(() => _request);
}

protected override void ConfigureInputQueueEndpoint(IInMemoryReceiveEndpointConfigurator configurator)
Expand Down Expand Up @@ -264,6 +267,7 @@ public void Setup()
});
});
Await(() => _request);
}

protected override void ConfigureInputQueueEndpoint(IInMemoryReceiveEndpointConfigurator configurator)
Expand Down Expand Up @@ -308,6 +312,7 @@ public void Setup()
{
});
});
Await(() => _request);
}
}
}
2 changes: 1 addition & 1 deletion src/MassTransit/Transports/SendEndpointCache.cs
Expand Up @@ -52,7 +52,7 @@ public void Dispose()

public async Task<ISendEndpoint> GetSendEndpoint(Uri address)
{
Cached<ISendEndpoint> cached = _cache.Get(address);
Cached<ISendEndpoint> cached = await _cache.Get(address).ConfigureAwait(false);

var endpoint = await cached.Value.ConfigureAwait(false);

Expand Down
50 changes: 25 additions & 25 deletions src/MassTransit/Util/Caching/LazyMemoryCache.cs
Expand Up @@ -59,6 +59,8 @@ public class LazyMemoryCache<TKey, TValue> :
readonly MemoryCache _cache;
readonly KeyFormatter _keyFormatter;
readonly PolicyProvider _policyProvider;

readonly LimitedConcurrencyLevelTaskScheduler _scheduler;
readonly ValueFactory _valueFactory;
readonly ValueRemoved _valueRemoved;

Expand All @@ -71,11 +73,12 @@ public class LazyMemoryCache<TKey, TValue> :
_valueRemoved = valueRemoved ?? DefaultValueRemoved;

_cache = new MemoryCache(name);
_scheduler = new LimitedConcurrencyLevelTaskScheduler(1);
}

public void Dispose()
{
_cache.Dispose();
Task.Factory.StartNew(() => _cache.Dispose(), CancellationToken.None, TaskCreationOptions.HideScheduler, _scheduler);
}

Task DefaultValueRemoved(string key, TValue value, string reason)
Expand All @@ -95,40 +98,37 @@ static ICacheExpiration DefaultPolicyProvider(ICacheExpirationSelector selector)

void Touch(string textKey)
{
_cache.Get(textKey);
Task.Factory.StartNew(() => _cache.Get(textKey), CancellationToken.None, TaskCreationOptions.HideScheduler, _scheduler);
}

public Cached<TValue> Get(TKey key)
public Task<Cached<TValue>> Get(TKey key)
{
var textKey = _keyFormatter(key);

var result = _cache.Get(textKey) as Cached<TValue>;
if (result != null)
return Task.Factory.StartNew(() =>
{
if (!result.Value.IsFaulted && !result.Value.IsCanceled)
return result;

_cache.Remove(textKey);
}

var cacheItemValue = new CachedValue(_valueFactory, key, () => Touch(textKey));
var cacheItem = new CacheItem(textKey, cacheItemValue);
var cacheItemPolicy = _policyProvider(new CacheExpirationSelector(key)).Policy;
cacheItemPolicy.RemovedCallback = OnCacheItemRemoved;
var textKey = _keyFormatter(key);
var existingItem = _cache.AddOrGetExisting(cacheItem, cacheItemPolicy);
if (existingItem != cacheItem)
{
result = existingItem.Value as CachedValue;
var result = _cache.Get(textKey) as Cached<TValue>;
if (result != null)
{
return result;
if (!result.Value.IsFaulted && !result.Value.IsCanceled)
return result;
_cache.Remove(textKey);
}
_cache.Set(cacheItem, cacheItemPolicy);
}
var cacheItemValue = new CachedValue(_valueFactory, key, () => Touch(textKey));
var cacheItem = new CacheItem(textKey, cacheItemValue);
var cacheItemPolicy = _policyProvider(new CacheExpirationSelector(key)).Policy;
cacheItemPolicy.RemovedCallback = OnCacheItemRemoved;
var added = _cache.Add(cacheItem, cacheItemPolicy);
if (!added)
{
throw new InvalidOperationException($"The item was not added to the cache: {key}");
}
return cacheItemValue;
return cacheItemValue;
}, CancellationToken.None, TaskCreationOptions.HideScheduler, _scheduler);
}

void OnCacheItemRemoved(CacheEntryRemovedArguments arguments)
Expand Down

0 comments on commit 8fc2b80

Please sign in to comment.