Skip to content

Commit

Permalink
Add Parallel shard access using .NET 4.0 Tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
webpaul committed Apr 7, 2010
1 parent b9e7f41 commit 095d9e6
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 0 deletions.
1 change: 1 addition & 0 deletions Raven.Client.Tests/Raven.Client.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
<ItemGroup>
<Compile Include="BaseTest.cs" />
<Compile Include="Document\When_Using_Multiple_Unsharded_Servers.cs" />
<Compile Include="Shard\When_Using_Parallel_Access_Strategy.cs" />
<Compile Include="Shard\When_Using_Sharded_Servers.cs" />
<Compile Include="Document\DocumentStoreEmbeddedTests.cs" />
<Compile Include="Document\DocumentStoreServerTests.cs" />
Expand Down
58 changes: 58 additions & 0 deletions Raven.Client.Tests/Shard/When_Using_Parallel_Access_Strategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System;
using System.IO;
using System.Reflection;
using System.Threading;
using Raven.Database;
using Raven.Server;
using Xunit;
using System.Collections.Generic;
using Raven.Client.Shard;
using Rhino.Mocks;
using Raven.Client.Shard.ShardStrategy.ShardAccess;

namespace Raven.Client.Tests
{
public class When_Using_Parallel_Access_Strategy : BaseTest
{
[Fact]
public void Can_get_complete_result_list()
{
var shard1 = MockRepository.GenerateStub<IDocumentSession>();
shard1.Stub(x => x.GetAll<Company>())
//.Callback(() => { Thread.Sleep(500); return true; })
.Return(new[] { new Company { Name = "Company1" } });

var shard2 = MockRepository.GenerateStub<IDocumentSession>();
shard2.Stub(x => x.GetAll<Company>())
//.Callback(() => { Thread.Sleep(100); return true; })
.Return(new[] { new Company { Name = "Company2" } });

var results = new ParallelShardAccessStrategy().Apply(new[] { shard1, shard2 }, x => x.GetAll<Company>());

Assert.Equal(2, results.Count);
}

[Fact]
public void Null_result_is_not_an_exception()
{
var shard1 = MockRepository.GenerateStub<IDocumentSession>();
shard1.Stub(x => x.GetAll<Company>()).Return(null);

var results = new ParallelShardAccessStrategy().Apply(new[] { shard1 }, x => x.GetAll<Company>());

Assert.Equal(0, results.Count);
}

[Fact]
public void Execution_exceptions_are_rethrown()
{
var shard1 = MockRepository.GenerateStub<IDocumentSession>();
shard1.Stub(x => x.GetAll<Company>()).Throw(new ApplicationException("Oh noes!"));

Assert.Throws(typeof(ApplicationException), () =>
{
new ParallelShardAccessStrategy().Apply(new[] { shard1 }, x => x.GetAll<Company>());
});
}
}
}
1 change: 1 addition & 0 deletions Raven.Client/Raven.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
<Compile Include="Document\IDocumentSession.cs" />
<Compile Include="Document\IDocumentStore.cs" />
<Compile Include="Shard\ShardStrategy\ShardAccess\IShardAccessStrategy.cs" />
<Compile Include="Shard\ShardStrategy\ShardAccess\ParallelShardAccessStrategy.cs" />
<Compile Include="Shard\ShardStrategy\ShardResolution\IShardResolutionStrategy.cs" />
<Compile Include="Shard\ShardStrategy\ShardSelection\IShardSelectionStrategy.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Raven.Client.Shard.ShardStrategy.ShardAccess
{
public class ParallelShardAccessStrategy: IShardAccessStrategy
{
public IList<T> Apply<T>(IList<IDocumentSession> shardSessions, Func<IDocumentSession, IList<T>> operation)
{
var returnList = new List<T>();

//List.AddRange not threadsafe, make sure addrange calls don't happen concurrently
object lockObject = new object();

shardSessions
.Select(shardSession =>
Task.Factory
.StartNew(() => operation(shardSession))
.AddToListOnComplete(lockObject, returnList)
)
.WaitAll()
;

return returnList;
}
}

internal static class ParallelExtensions
{
public static Task AddToListOnComplete<T>(this Task<IList<T>> task, object lockObject, List<T> returnList)
{
return task.ContinueWith(x => {
lock (lockObject)
{
if (x.Result != null)
returnList.AddRange(x.Result);
}
});
}

public static void WaitAll(this IEnumerable<Task> tasks)
{
try
{
Task.WaitAll(tasks.ToArray());
}
catch (Exception ex)
{
//when task takes exception it wraps in aggregate exception, if in continuation
//then could be double wrapped, etc. This should always get us the original
while (true)
{
if (ex.InnerException == null || !(ex is AggregateException))
throw ex;
else
ex = ex.InnerException;
}
}
}
}
}

0 comments on commit 095d9e6

Please sign in to comment.