Skip to content
Browse files

Merge branch 'master' of github.com:ayende/ravendb

  • Loading branch information...
2 parents 9d220f1 + 3780a65 commit 5763d2b8236efe58c62b98941e2b2b0a44227eba @fitzchak committed
View
35 Raven.Storage.Esent/StorageActions/MappedResults.cs
@@ -141,29 +141,34 @@ public IEnumerable<MappedResultInfo> GetMappedResultsReduceKeysAfter(string inde
if (Api.TrySeek(session, MappedResults, SeekGrbit.SeekLE) == false)
return Enumerable.Empty<MappedResultInfo>();
- var optimizer = new OptimizedIndexReader(Session, MappedResults, take);
+ var results = new Dictionary<string, MappedResultInfo>();
while (
- optimizer.Count < take &&
+ results.Count < take &&
Api.RetrieveColumnAsString(session, MappedResults, tableColumnsCache.MappedResultsColumns["view"], Encoding.Unicode, RetrieveColumnGrbit.RetrieveFromIndex) == indexName)
{
-
- optimizer.Add();
-
- // the index is view ascending and etag descending
+ var mappedResultInfo = new MappedResultInfo
+ {
+ ReduceKey =
+ Api.RetrieveColumnAsString(session, MappedResults, tableColumnsCache.MappedResultsColumns["reduce_key"]),
+ Etag = new Guid(Api.RetrieveColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["etag"])),
+ Timestamp =
+ Api.RetrieveColumnAsDateTime(session, MappedResults, tableColumnsCache.MappedResultsColumns["timestamp"]).
+ Value,
+ Data = loadData
+ ? Api.RetrieveColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["data"]).
+ ToJObject()
+ : null,
+ };
+
+ results[mappedResultInfo.ReduceKey] = mappedResultInfo;
+
+ // the index is view ascending and etag descending
// that means that we are going backward to go up
if (Api.TryMovePrevious(session, MappedResults) == false)
break;
}
- return optimizer.Select(() => new MappedResultInfo
- {
- ReduceKey = Api.RetrieveColumnAsString(session, MappedResults, tableColumnsCache.MappedResultsColumns["reduce_key"]),
- Etag = new Guid(Api.RetrieveColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["etag"])),
- Timestamp = Api.RetrieveColumnAsDateTime(session, MappedResults, tableColumnsCache.MappedResultsColumns["timestamp"]).Value,
- Data = loadData
- ? Api.RetrieveColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["data"]).ToJObject()
- : null,
- });
+ return results.Values;
}
}
}
View
15 Raven.Storage.Managed/MappedResultsStorageAction.cs
@@ -92,7 +92,7 @@ public void DeleteMappedResultsForView(string view)
public IEnumerable<MappedResultInfo> GetMappedResultsReduceKeysAfter(string indexName, Guid lastReducedEtag, bool loadData, int take)
{
- return storage.MappedResults["ByViewAndEtagDesc"]
+ var mappedResultInfos = storage.MappedResults["ByViewAndEtagDesc"]
// the index is sorted view ascending and then etag descending
// we index before this index, then backward toward the last one.
.SkipBefore(new RavenJObject { { "view", indexName }, {"etag", lastReducedEtag.ToByteArray()}})
@@ -112,7 +112,18 @@ public IEnumerable<MappedResultInfo> GetMappedResultsReduceKeysAfter(string inde
mappedResultInfo.Data = readResult.Data().ToJObject();
}
return mappedResultInfo;
- }).Take(take);
+ });
+
+ var results = new Dictionary<string, MappedResultInfo>();
+
+ foreach (var mappedResultInfo in mappedResultInfos)
+ {
+ results[mappedResultInfo.ReduceKey] = mappedResultInfo;
+ if (results.Count == take)
+ break;
+ }
+
+ return results.Values;
}
}
}
View
388 Raven.Tests/Storage/ReduceStaleness.cs
@@ -1,198 +1,198 @@
-using System;
-using Raven.Database.Config;
-using Raven.Database.Extensions;
-using Raven.Database.Impl;
-using Raven.Database.Indexing;
-using Raven.Database.Storage;
-using Raven.Json.Linq;
-using EsentTransactionalStorage = Raven.Storage.Esent.TransactionalStorage;
-using MuninTransactionalStorage = Raven.Storage.Managed.TransactionalStorage;
-using Xunit;
-using System.Linq;
-
-namespace Raven.Tests.Storage
-{
- public class ReduceStaleness : IDisposable
- {
- public ReduceStaleness()
- {
- IOExtensions.DeleteDirectory("Test");
- }
-
- [Fact]
- public void Esent_when_there_are_multiple_map_results_for_multiple_indexes()
- {
- using(var transactionalStorage = new EsentTransactionalStorage(new RavenConfiguration
- {
- DataDirectory = "Test"
- }, () => { }))
- {
- when_there_are_multiple_map_results_for_multiple_indexes(transactionalStorage);
- }
-
- }
-
-
- [Fact]
- public void Munin_when_there_are_multiple_map_results_for_multiple_indexes()
- {
- using (var transactionalStorage = new MuninTransactionalStorage(new RavenConfiguration
- {
- DataDirectory = "Test"
- }, () => { }))
- {
- when_there_are_multiple_map_results_for_multiple_indexes(transactionalStorage);
- }
-
- }
-
-
-
-
- private static void when_there_are_multiple_map_results_for_multiple_indexes(ITransactionalStorage transactionalStorage)
- {
- transactionalStorage.Initialize(new DummyUuidGenerator());
-
- transactionalStorage.Batch(accessor =>
- {
- accessor.Indexing.AddIndex("a",true);
- accessor.Indexing.AddIndex("b", true);
- accessor.Indexing.AddIndex("c", true);
-
- accessor.MappedResults.PutMappedResult("a", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("a", "a"));
- accessor.MappedResults.PutMappedResult("a", "a/2", "a", new RavenJObject(), MapReduceIndex.ComputeHash("a", "a"));
- accessor.MappedResults.PutMappedResult("b", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("b", "a"));
- accessor.MappedResults.PutMappedResult("b", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("b", "a"));
- accessor.MappedResults.PutMappedResult("c", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("c", "a"));
- accessor.MappedResults.PutMappedResult("c", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("c", "a"));
- });
-
- transactionalStorage.Batch(actionsAccessor =>
- {
- Assert.True(actionsAccessor.Staleness.IsReduceStale("a"));
- Assert.True(actionsAccessor.Staleness.IsReduceStale("b"));
- Assert.True(actionsAccessor.Staleness.IsReduceStale("c"));
- });
- }
-
- [Fact]
- public void Esent_when_there_are_multiple_map_results_and_we_ask_for_results()
- {
- using (var transactionalStorage = new EsentTransactionalStorage(new RavenConfiguration
- {
- DataDirectory = "Test"
- }, () => { }))
- {
- when_there_are_multiple_map_results_and_we_ask_for_results(transactionalStorage);
- }
-
- }
-
-
- [Fact]
- public void Munin_when_there_are_multiple_map_results_and_we_ask_for_results()
- {
- using (var transactionalStorage = new MuninTransactionalStorage(new RavenConfiguration
- {
- DataDirectory = "Test"
- }, () => { }))
- {
- when_there_are_multiple_map_results_and_we_ask_for_results(transactionalStorage);
- }
- }
-
- private static void when_there_are_multiple_map_results_and_we_ask_for_results(ITransactionalStorage transactionalStorage)
- {
- transactionalStorage.Initialize(new DummyUuidGenerator());
-
- transactionalStorage.Batch(accessor =>
- {
- accessor.Indexing.AddIndex("a", true);
- accessor.Indexing.AddIndex("b", true);
- accessor.Indexing.AddIndex("c", true);
-
- accessor.MappedResults.PutMappedResult("a", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("a", "a"));
- accessor.MappedResults.PutMappedResult("a", "a/2", "a", new RavenJObject(), MapReduceIndex.ComputeHash("a", "a"));
- accessor.MappedResults.PutMappedResult("b", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("b", "a"));
- accessor.MappedResults.PutMappedResult("b", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("b", "a"));
- accessor.MappedResults.PutMappedResult("c", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("c", "a"));
- accessor.MappedResults.PutMappedResult("c", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("c", "a"));
- });
-
- transactionalStorage.Batch(actionsAccessor =>
- {
- Assert.Equal(2, actionsAccessor.MappedResults.GetMappedResultsReduceKeysAfter("a", Guid.Empty, false, 100).Count());
- Assert.Equal(2, actionsAccessor.MappedResults.GetMappedResultsReduceKeysAfter("b", Guid.Empty, false, 100).Count());
- Assert.Equal(2, actionsAccessor.MappedResults.GetMappedResultsReduceKeysAfter("c", Guid.Empty, false, 100).Count());
- });
- }
-
- [Fact]
- public void Esent_when_there_are_updates_to_map_reduce_results()
- {
- using (var transactionalStorage = new EsentTransactionalStorage(new RavenConfiguration
- {
- DataDirectory = "Test"
- }, () => { }))
- {
- when_there_are_updates_to_map_reduce_results(transactionalStorage);
- }
-
- }
-
-
- [Fact]
- public void Munin_when_there_are_updates_to_map_reduce_results()
- {
- using (var transactionalStorage = new MuninTransactionalStorage(new RavenConfiguration
- {
- DataDirectory = "Test"
- }, () => { }))
- {
- when_there_are_updates_to_map_reduce_results(transactionalStorage);
- }
- }
-
-
- private static void when_there_are_updates_to_map_reduce_results(ITransactionalStorage transactionalStorage)
- {
- var dummyUuidGenerator = new DummyUuidGenerator();
- transactionalStorage.Initialize(dummyUuidGenerator);
- Guid a = Guid.Empty;
- Guid b = Guid.Empty;
- Guid c = Guid.Empty;
- transactionalStorage.Batch(accessor =>
- {
- accessor.Indexing.AddIndex("a", true);
- accessor.Indexing.AddIndex("b", true);
- accessor.Indexing.AddIndex("c", true);
-
- accessor.MappedResults.PutMappedResult("a", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("a", "a"));
- a = dummyUuidGenerator.CreateSequentialUuid();
- accessor.MappedResults.PutMappedResult("a", "a/2", "a", new RavenJObject(), MapReduceIndex.ComputeHash("a", "a"));
- accessor.MappedResults.PutMappedResult("b", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("b", "a"));
- b = dummyUuidGenerator.CreateSequentialUuid();
- accessor.MappedResults.PutMappedResult("b", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("b", "a"));
- accessor.MappedResults.PutMappedResult("c", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("c", "a"));
- c = dummyUuidGenerator.CreateSequentialUuid();
- accessor.MappedResults.PutMappedResult("c", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("c", "a"));
- });
-
- transactionalStorage.Batch(actionsAccessor =>
+using System;
+using Raven.Database.Config;
+using Raven.Database.Extensions;
+using Raven.Database.Impl;
+using Raven.Database.Indexing;
+using Raven.Database.Storage;
+using Raven.Json.Linq;
+using EsentTransactionalStorage = Raven.Storage.Esent.TransactionalStorage;
+using MuninTransactionalStorage = Raven.Storage.Managed.TransactionalStorage;
+using Xunit;
+using System.Linq;
+
+namespace Raven.Tests.Storage
+{
+ public class ReduceStaleness : IDisposable
+ {
+ public ReduceStaleness()
+ {
+ IOExtensions.DeleteDirectory("Test");
+ }
+
+ [Fact]
+ public void Esent_when_there_are_multiple_map_results_for_multiple_indexes()
+ {
+ using(var transactionalStorage = new EsentTransactionalStorage(new RavenConfiguration
+ {
+ DataDirectory = "Test"
+ }, () => { }))
+ {
+ when_there_are_multiple_map_results_for_multiple_indexes(transactionalStorage);
+ }
+
+ }
+
+
+ [Fact]
+ public void Munin_when_there_are_multiple_map_results_for_multiple_indexes()
+ {
+ using (var transactionalStorage = new MuninTransactionalStorage(new RavenConfiguration
+ {
+ DataDirectory = "Test"
+ }, () => { }))
+ {
+ when_there_are_multiple_map_results_for_multiple_indexes(transactionalStorage);
+ }
+
+ }
+
+
+
+
+ private static void when_there_are_multiple_map_results_for_multiple_indexes(ITransactionalStorage transactionalStorage)
+ {
+ transactionalStorage.Initialize(new DummyUuidGenerator());
+
+ transactionalStorage.Batch(accessor =>
+ {
+ accessor.Indexing.AddIndex("a",true);
+ accessor.Indexing.AddIndex("b", true);
+ accessor.Indexing.AddIndex("c", true);
+
+ accessor.MappedResults.PutMappedResult("a", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("a", "a"));
+ accessor.MappedResults.PutMappedResult("a", "a/2", "a", new RavenJObject(), MapReduceIndex.ComputeHash("a", "a"));
+ accessor.MappedResults.PutMappedResult("b", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("b", "a"));
+ accessor.MappedResults.PutMappedResult("b", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("b", "a"));
+ accessor.MappedResults.PutMappedResult("c", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("c", "a"));
+ accessor.MappedResults.PutMappedResult("c", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("c", "a"));
+ });
+
+ transactionalStorage.Batch(actionsAccessor =>
+ {
+ Assert.True(actionsAccessor.Staleness.IsReduceStale("a"));
+ Assert.True(actionsAccessor.Staleness.IsReduceStale("b"));
+ Assert.True(actionsAccessor.Staleness.IsReduceStale("c"));
+ });
+ }
+
+ [Fact]
+ public void Esent_when_there_are_multiple_map_results_and_we_ask_for_results()
+ {
+ using (var transactionalStorage = new EsentTransactionalStorage(new RavenConfiguration
+ {
+ DataDirectory = "Test"
+ }, () => { }))
+ {
+ when_there_are_multiple_map_results_and_we_ask_for_results(transactionalStorage);
+ }
+
+ }
+
+
+ [Fact]
+ public void Munin_when_there_are_multiple_map_results_and_we_ask_for_results()
+ {
+ using (var transactionalStorage = new MuninTransactionalStorage(new RavenConfiguration
+ {
+ DataDirectory = "Test"
+ }, () => { }))
+ {
+ when_there_are_multiple_map_results_and_we_ask_for_results(transactionalStorage);
+ }
+ }
+
+ private static void when_there_are_multiple_map_results_and_we_ask_for_results(ITransactionalStorage transactionalStorage)
+ {
+ transactionalStorage.Initialize(new DummyUuidGenerator());
+
+ transactionalStorage.Batch(accessor =>
+ {
+ accessor.Indexing.AddIndex("a", true);
+ accessor.Indexing.AddIndex("b", true);
+ accessor.Indexing.AddIndex("c", true);
+
+ accessor.MappedResults.PutMappedResult("a", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("a", "a"));
+ accessor.MappedResults.PutMappedResult("a", "a/2", "a", new RavenJObject(), MapReduceIndex.ComputeHash("a", "a"));
+ accessor.MappedResults.PutMappedResult("b", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("b", "a"));
+ accessor.MappedResults.PutMappedResult("b", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("b", "a"));
+ accessor.MappedResults.PutMappedResult("c", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("c", "a"));
+ accessor.MappedResults.PutMappedResult("c", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("c", "a"));
+ });
+
+ transactionalStorage.Batch(actionsAccessor =>
+ {
+ Assert.Equal(1, actionsAccessor.MappedResults.GetMappedResultsReduceKeysAfter("a", Guid.Empty, false, 100).Count());
+ Assert.Equal(1, actionsAccessor.MappedResults.GetMappedResultsReduceKeysAfter("b", Guid.Empty, false, 100).Count());
+ Assert.Equal(1, actionsAccessor.MappedResults.GetMappedResultsReduceKeysAfter("c", Guid.Empty, false, 100).Count());
+ });
+ }
+
+ [Fact]
+ public void Esent_when_there_are_updates_to_map_reduce_results()
+ {
+ using (var transactionalStorage = new EsentTransactionalStorage(new RavenConfiguration
+ {
+ DataDirectory = "Test"
+ }, () => { }))
+ {
+ when_there_are_updates_to_map_reduce_results(transactionalStorage);
+ }
+
+ }
+
+
+ [Fact]
+ public void Munin_when_there_are_updates_to_map_reduce_results()
+ {
+ using (var transactionalStorage = new MuninTransactionalStorage(new RavenConfiguration
+ {
+ DataDirectory = "Test"
+ }, () => { }))
+ {
+ when_there_are_updates_to_map_reduce_results(transactionalStorage);
+ }
+ }
+
+
+ private static void when_there_are_updates_to_map_reduce_results(ITransactionalStorage transactionalStorage)
+ {
+ var dummyUuidGenerator = new DummyUuidGenerator();
+ transactionalStorage.Initialize(dummyUuidGenerator);
+ Guid a = Guid.Empty;
+ Guid b = Guid.Empty;
+ Guid c = Guid.Empty;
+ transactionalStorage.Batch(accessor =>
+ {
+ accessor.Indexing.AddIndex("a", true);
+ accessor.Indexing.AddIndex("b", true);
+ accessor.Indexing.AddIndex("c", true);
+
+ accessor.MappedResults.PutMappedResult("a", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("a", "a"));
+ a = dummyUuidGenerator.CreateSequentialUuid();
+ accessor.MappedResults.PutMappedResult("a", "a/2", "a", new RavenJObject(), MapReduceIndex.ComputeHash("a", "a"));
+ accessor.MappedResults.PutMappedResult("b", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("b", "a"));
+ b = dummyUuidGenerator.CreateSequentialUuid();
+ accessor.MappedResults.PutMappedResult("b", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("b", "a"));
+ accessor.MappedResults.PutMappedResult("c", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("c", "a"));
+ c = dummyUuidGenerator.CreateSequentialUuid();
+ accessor.MappedResults.PutMappedResult("c", "a/1", "a", new RavenJObject(), MapReduceIndex.ComputeHash("c", "a"));
+ });
+
+ transactionalStorage.Batch(actionsAccessor =>
{
Assert.Equal(1, actionsAccessor.MappedResults.GetMappedResultsReduceKeysAfter("a", a, false, 100).Count());
Assert.Equal(1, actionsAccessor.MappedResults.GetMappedResultsReduceKeysAfter("b", b, false, 100).Count());
- Assert.Equal(1, actionsAccessor.MappedResults.GetMappedResultsReduceKeysAfter("c", c, false, 100).Count());
- });
- }
-
- /// <summary>
- /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
- /// </summary>
- /// <filterpriority>2</filterpriority>
- public void Dispose()
- {
- IOExtensions.DeleteDirectory("Test");
- }
- }
+ Assert.Equal(1, actionsAccessor.MappedResults.GetMappedResultsReduceKeysAfter("c", c, false, 100).Count());
+ });
+ }
+
+ /// <summary>
+ /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
+ /// </summary>
+ /// <filterpriority>2</filterpriority>
+ public void Dispose()
+ {
+ IOExtensions.DeleteDirectory("Test");
+ }
+ }
}
View
85 Raven.Tryouts/Program.cs
@@ -1,10 +1,93 @@
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using Raven.Client.Document;
+using Raven.Client.Embedded;
+using Raven.Client.Indexes;
+
namespace Raven.Tryouts
{
+ public class Foo
+ {
+ public string Bar { get; set; }
+ }
+
+ public class SimpleReduceResult
+ {
+ public string Bar { get; set; }
+ public long Count { get; set; }
+ }
+
+ public class SimpleMRIndex : AbstractIndexCreationTask<Foo,SimpleReduceResult>
+ {
+ public SimpleMRIndex()
+ {
+ Map = foos => from foo in foos
+ select new
+ {
+ Bar = foo.Bar,
+ Count = 1L
+ };
+
+ Reduce = results => from result in results
+ group result by result.Bar
+ into g
+ select new
+ {
+ Bar = g.Key,
+ Count = g.Sum(c => c.Count)
+ };
+ }
+ }
+
class Program
{
- private static void Main()
+ static void Main(string[] args)
{
+ using (var store = new EmbeddableDocumentStore
+ {
+ DataDirectory = @"~\data",
+ UseEmbeddedHttpServer = true
+ }.Initialize())
+ {
+
+ var sp = Stopwatch.StartNew();
+ for (int i = 0; i < 5000; i++)
+ {
+ using (var session = store.OpenSession())
+ {
+ for (int j = 0; j < 100; j++)
+ {
+ session.Store(new Foo { Bar = "IamBar" });
+ }
+ session.SaveChanges();
+ }
+ if(i % 100 == 0)
+ {
+ Console.Write(".");
+ }
+ }
+
+ Console.Clear();
+ Console.WriteLine("Wrote 500,000 docs in " + sp.Elapsed);
+ Console.WriteLine("Done inserting data");
+
+ sp.Restart();
+ new SimpleMRIndex().Execute(store);
+ Console.WriteLine("indexing...");
+
+ while (store.DatabaseCommands.GetStatistics().StaleIndexes.Length > 0)
+ {
+ Console.Write("\r{0:#,#} - {1:#,#}",store.DatabaseCommands.GetStatistics().Indexes[0].IndexingAttempts, store.DatabaseCommands.GetStatistics().Indexes[0].ReduceIndexingAttempts);
+ Thread.Sleep(100);
+ }
+ Console.WriteLine();
+ Console.WriteLine("Indexed 500,000 docs in " + sp.Elapsed);
+
+ Console.ReadLine();
+ }
}
}
}

0 comments on commit 5763d2b

Please sign in to comment.
Something went wrong with that request. Please try again.