Skip to content

Commit

Permalink
Auto tuner for Reduce operations
Browse files Browse the repository at this point in the history
  • Loading branch information
configurator committed May 9, 2012
1 parent 90b59ee commit 9f8a7dd
Show file tree
Hide file tree
Showing 16 changed files with 217 additions and 40 deletions.
2 changes: 2 additions & 0 deletions Raven.Abstractions/Data/DatabaseStatistics.cs
Expand Up @@ -19,6 +19,8 @@ public class DatabaseStatistics
public string[] StaleIndexes { get; set; }

public int CurrentNumberOfItemsToIndexInSingleBatch { get; set; }

public int CurrentNumberOfItemsToReduceInSingleBatch { get; set; }

public IndexStats[] Indexes { get; set; }

Expand Down
31 changes: 29 additions & 2 deletions Raven.Database/Config/InMemoryRavenConfiguration.cs
Expand Up @@ -37,8 +37,10 @@ public InMemoryRavenConfiguration()
Settings = new NameValueCollection(StringComparer.InvariantCultureIgnoreCase);

BackgroundTasksPriority = ThreadPriority.Normal;
MaxNumberOfItemsToIndexInSingleBatch = Environment.Is64BitProcess ? 128*1024 : 64*1024;
MaxNumberOfItemsToIndexInSingleBatch = Environment.Is64BitProcess ? 128 * 1024 : 64 * 1024;
MaxNumberOfItemsToReduceInSingleBatch = MaxNumberOfItemsToIndexInSingleBatch / 2;
InitialNumberOfItemsToIndexInSingleBatch = Environment.Is64BitProcess ? 512 : 256;
InitialNumberOfItemsToReduceInSingleBatch = InitialNumberOfItemsToIndexInSingleBatch / 2;

AvailableMemoryForRaisingIndexBatchSizeLimit = Math.Min(768, MemoryStatistics.TotalPhysicalMemory/2);
MaxNumberOfParallelIndexTasks = 8;
Expand Down Expand Up @@ -110,6 +112,19 @@ public void Initialize()
InitialNumberOfItemsToIndexInSingleBatch = Math.Min(int.Parse(initialNumberOfItemsToIndexInSingleBatch),
MaxNumberOfItemsToIndexInSingleBatch);
}
var maxNumberOfItemsToReduceInSingleBatch = Settings["Raven/MaxNumberOfItemsToReduceInSingleBatch"];
if (maxNumberOfItemsToReduceInSingleBatch != null)
{
MaxNumberOfItemsToReduceInSingleBatch = Math.Max(int.Parse(maxNumberOfItemsToReduceInSingleBatch), 128);
InitialNumberOfItemsToReduceInSingleBatch = Math.Min(MaxNumberOfItemsToReduceInSingleBatch,
InitialNumberOfItemsToReduceInSingleBatch);
}
var initialNumberOfItemsToReduceInSingleBatch = Settings["Raven/InitialNumberOfItemsToReduceInSingleBatch"];
if (initialNumberOfItemsToReduceInSingleBatch != null)
{
InitialNumberOfItemsToReduceInSingleBatch = Math.Min(int.Parse(initialNumberOfItemsToReduceInSingleBatch),
MaxNumberOfItemsToReduceInSingleBatch);
}

var maxNumberOfParallelIndexTasks = Settings["Raven/MaxNumberOfParallelIndexTasks"];
MaxNumberOfParallelIndexTasks = maxNumberOfParallelIndexTasks != null ? int.Parse(maxNumberOfParallelIndexTasks) : Environment.ProcessorCount;
Expand Down Expand Up @@ -347,10 +362,22 @@ public string ServerUrl

/// <summary>
/// The initial number of items to take when indexing a batch
/// Default: 5,000
/// Default: 512 or 256 depending on CPU architecture
/// </summary>
public int InitialNumberOfItemsToIndexInSingleBatch { get; set; }

/// <summary>
/// Max number of items to take for reducing in a batch
/// Minimum: 128
/// </summary>
public int MaxNumberOfItemsToReduceInSingleBatch { get; set; }

/// <summary>
/// The initial number of items to take when reducing a batch
/// Default: 256 or 128 depending on CPU architecture
/// </summary>
public int InitialNumberOfItemsToReduceInSingleBatch { get; set; }

/// <summary>
/// The maximum number of indexing tasks allowed to run in parallel
/// Default: The number of processors in the current machine
Expand Down
1 change: 1 addition & 0 deletions Raven.Database/DocumentDatabase.cs
Expand Up @@ -246,6 +246,7 @@ public DatabaseStatistics Statistics
var result = new DatabaseStatistics
{
CurrentNumberOfItemsToIndexInSingleBatch = workContext.CurrentNumberOfItemsToIndexInSingleBatch,
CurrentNumberOfItemsToReduceInSingleBatch = workContext.CurrentNumberOfItemsToReduceInSingleBatch,
CountOfIndexes = IndexStorage.Indexes.Length,
Errors = workContext.Errors,
Triggers = PutTriggers.Select(x => new DatabaseStatistics.TriggerInfo {Name = x.ToString(), Type = "Put"})
Expand Down
4 changes: 1 addition & 3 deletions Raven.Database/Indexing/AbstractIndexingExecuter.cs
Expand Up @@ -19,15 +19,13 @@ public abstract class AbstractIndexingExecuter
protected ITransactionalStorage transactionalStorage;
protected int workCounter;
protected int lastFlushedWorkCounter;
protected IndexBatchSizeAutoTuner autoTuner;
protected BaseBatchSizeAutoTuner autoTuner;

protected AbstractIndexingExecuter(ITransactionalStorage transactionalStorage, WorkContext context, TaskScheduler scheduler)
{
this.transactionalStorage = transactionalStorage;
this.context = context;
this.scheduler = scheduler;

autoTuner = new IndexBatchSizeAutoTuner(context);
}

public void Execute()
Expand Down
40 changes: 25 additions & 15 deletions Raven.Database/Indexing/BaseBatchSizeAutoTuner.cs
@@ -1,26 +1,28 @@
using System;
using Raven.Database.Config;
using System.Linq;
using System.Collections.Generic;

namespace Raven.Database.Indexing
{
public class BaseBatchSizeAutoTuner
public abstract class BaseBatchSizeAutoTuner
{
private readonly WorkContext context;
private int numberOfItemsToIndexInSingleBatch;
protected readonly WorkContext context;

private int currentNumber;

public BaseBatchSizeAutoTuner(WorkContext context)
{
this.context = context;
NumberOfItemsToIndexInSingleBatch = context.Configuration.InitialNumberOfItemsToIndexInSingleBatch;
this.NumberOfItemsToIndexInSingleBatch = InitialNumberOfItems;
}

public int NumberOfItemsToIndexInSingleBatch
{
get { return numberOfItemsToIndexInSingleBatch; }
get { return currentNumber; }
set
{
context.CurrentNumberOfItemsToIndexInSingleBatch = numberOfItemsToIndexInSingleBatch = value;
CurrentNumberOfItems = currentNumber = value;
}
}

Expand All @@ -36,7 +38,7 @@ public void AutoThrottleBatchSize(int amountOfItemsToIndex, int size)
}
finally
{
context.Configuration.IndexingScheduler.RecordAmountOfItemsToIndex(amountOfItemsToIndex);
RecordAmountOfItems(amountOfItemsToIndex);
}
}

Expand All @@ -47,7 +49,7 @@ private void ConsiderIncreasingBatchSize(int amountOfItemsToIndex, int size)
return;
}

if (context.Configuration.IndexingScheduler.GetLastAmountOfItemsToIndex().Any(x => x < NumberOfItemsToIndexInSingleBatch))
if (GetLastAmountOfItems().Any(x => x < NumberOfItemsToIndexInSingleBatch))
{
// this is the first time we hit the limit, we will give another go before we increase
// the batch size
Expand All @@ -74,7 +76,7 @@ private void ConsiderIncreasingBatchSize(int amountOfItemsToIndex, int size)

if (remainingMemoryAfterBatchSizeIncrease >= context.Configuration.AvailableMemoryForRaisingIndexBatchSizeLimit)
{
NumberOfItemsToIndexInSingleBatch = Math.Min(context.Configuration.MaxNumberOfItemsToIndexInSingleBatch,
NumberOfItemsToIndexInSingleBatch = Math.Min(MaxNumberOfItems,
NumberOfItemsToIndexInSingleBatch * 2);
return;
}
Expand Down Expand Up @@ -110,7 +112,7 @@ private bool ReduceBatchSizeIfCloseToMemoryCeiling()

// we are still too high, let us reduce the size and see what is going on.

NumberOfItemsToIndexInSingleBatch = Math.Max(context.Configuration.InitialNumberOfItemsToIndexInSingleBatch,
NumberOfItemsToIndexInSingleBatch = Math.Max(InitialNumberOfItems,
NumberOfItemsToIndexInSingleBatch / 2);

return true;
Expand All @@ -128,18 +130,18 @@ private bool ConsiderDecreasingBatchSize(int amountOfItemsToIndex)
// we didn't have a lot of work to do, so let us see if we can reduce the batch size

// we are at the configured minimum, nothing to do
if (NumberOfItemsToIndexInSingleBatch == context.Configuration.InitialNumberOfItemsToIndexInSingleBatch)
if (NumberOfItemsToIndexInSingleBatch == InitialNumberOfItems)
return true;

// we were above the max the last times, we can't reduce the work load now
if (context.Configuration.IndexingScheduler.GetLastAmountOfItemsToIndex().Any(x => x > NumberOfItemsToIndexInSingleBatch))
if (GetLastAmountOfItems().Any(x => x > NumberOfItemsToIndexInSingleBatch))
return true;

var old = NumberOfItemsToIndexInSingleBatch;
// we have had a couple of times where we didn't get to the current max, so we can probably
// reduce the max again now, this will reduce the memory consumption eventually, and will cause
// faster indexing times in case we get a big batch again
NumberOfItemsToIndexInSingleBatch = Math.Max(context.Configuration.InitialNumberOfItemsToIndexInSingleBatch,
NumberOfItemsToIndexInSingleBatch = Math.Max(InitialNumberOfItems,
NumberOfItemsToIndexInSingleBatch / 2);

// we just reduced the batch size because we have two concurrent runs where we had
Expand All @@ -164,12 +166,20 @@ private bool ConsiderDecreasingBatchSize(int amountOfItemsToIndex)
public void OutOfMemoryExceptionHappened()
{
// first thing to do, reset the number of items per batch
NumberOfItemsToIndexInSingleBatch = context.Configuration.InitialNumberOfItemsToIndexInSingleBatch;
NumberOfItemsToIndexInSingleBatch = InitialNumberOfItems;

// now, we need to be more conservative about how we are increasing memory usage, so instead of increasing
// every time we hit the limit twice, we will increase every time we hit it three times, then 5, 9, etc

context.Configuration.IndexingScheduler.LastAmountOfItemsToIndexToRemember *= 2;
LastAmountOfItemsToRemember *= 2;
}

// The following methods and properties are wrappers around members of the context which are different for the different indexes
protected abstract int InitialNumberOfItems { get; }
protected abstract int MaxNumberOfItems { get; }
protected abstract int CurrentNumberOfItems { get; set; }
protected abstract int LastAmountOfItemsToRemember { get; set; }
protected abstract void RecordAmountOfItems(int numberOfItems);
protected abstract IEnumerable<int> GetLastAmountOfItems();
}
}
51 changes: 41 additions & 10 deletions Raven.Database/Indexing/FairIndexingSchedulerWithNewIndexesBias.cs
Expand Up @@ -15,17 +15,21 @@ public class FairIndexingSchedulerWithNewIndexesBias : IIndexingScheduler
private int currentRepeated;
private bool activeFiltering;
private List<int> lastAmountOfItemsToIndex = new List<int>();
private List<int> lastAmountOfItemsToReduce = new List<int>();

public FairIndexingSchedulerWithNewIndexesBias()
{
LastAmountOfItemsToIndexToRemember = 1;
LastAmountOfItemsToReduceToRemember = 1;
}

public int LastAmountOfItemsToIndexToRemember { get; set; }

public int LastAmountOfItemsToReduceToRemember { get; set; }

public IList<IndexToWorkOn> FilterMapIndexes(IList<IndexToWorkOn> indexes)
{
if(indexes.Count == 0)
if (indexes.Count == 0)
return indexes;

var indexesByIndexedEtag = indexes
Expand All @@ -40,7 +44,7 @@ public IList<IndexToWorkOn> FilterMapIndexes(IList<IndexToWorkOn> indexes)
activeFiltering = false;
return indexes; // they all have the same one, so there aren't any delayed / new indexes
}

activeFiltering = true;

// we have indexes that haven't all caught up with up yet, so we need to start cycling through the
Expand All @@ -66,16 +70,16 @@ public void RecordAmountOfItemsToIndex(int value)
{
var currentLastAmountOfItemsToIndex = lastAmountOfItemsToIndex;
var amountOfItemsToIndex = activeFiltering && currentLastAmountOfItemsToIndex.Count > 0
? // if we are actively filtering, we have multiple levels, so we have to assume
// that the max amount is still the current one, this prevents the different levels of indexing batch
// size from "fighting" over the batch size.
Math.Max(currentLastAmountOfItemsToIndex.Max(), value)
: value;
// if we are actively filtering, we have multiple levels, so we have to assume
// that the max amount is still the current one, this prevents the different levels of indexing batch
// size from "fighting" over the batch size.
? Math.Max(currentLastAmountOfItemsToIndex.Max(), value)
: value;

var amountToTake = currentLastAmountOfItemsToIndex.Count;
if (amountToTake + 1 >= LastAmountOfItemsToIndexToRemember)
{
amountToTake = currentLastAmountOfItemsToIndex.Count-1;
amountToTake = currentLastAmountOfItemsToIndex.Count - 1;
}
lastAmountOfItemsToIndex = new List<int>(currentLastAmountOfItemsToIndex.Take(amountToTake))
{
Expand All @@ -84,11 +88,38 @@ public void RecordAmountOfItemsToIndex(int value)

}

public void RecordAmountOfItemsToReduce(int value)
{
var currentLastAmountOfItemsToReduce = lastAmountOfItemsToReduce;
var amountOfItemsToReduce = activeFiltering && currentLastAmountOfItemsToReduce.Count > 0
// if we are actively filtering, we have multiple levels, so we have to assume
// that the max amount is still the current one, this prevents the different levels of indexing batch
// size from "fighting" over the batch size.
? Math.Max(currentLastAmountOfItemsToReduce.Max(), value)
: value;

var amountToTake = currentLastAmountOfItemsToReduce.Count;
if (amountToTake + 1 >= LastAmountOfItemsToReduceToRemember)
{
amountToTake = currentLastAmountOfItemsToReduce.Count - 1;
}
lastAmountOfItemsToReduce = new List<int>(currentLastAmountOfItemsToReduce.Take(amountToTake))
{
amountOfItemsToReduce
};

}

public IEnumerable<int> GetLastAmountOfItemsToIndex()
{
return lastAmountOfItemsToIndex;
}

public IEnumerable<int> GetLastAmountOfItemsToReduce()
{
return lastAmountOfItemsToReduce;
}

// here we compare, but only up to the last 116 bits, not the full 128 bits
// this means that we can gather documents that are within 4K docs from one another, because
// at that point, it doesn't matter much, it would be gone within one or two indexing cycles
Expand All @@ -105,10 +136,10 @@ public int GetHashCode(Guid obj)
var bytes = obj.ToByteArray();
for (var i = 0; i < 14; i++)
{
start = (start*397) ^ bytes[i];
start = (start * 397) ^ bytes[i];
}
var last4Bits = bytes[15] >> 4;
start = (start*397) ^ last4Bits;
start = (start * 397) ^ last4Bits;
return start;
}

Expand Down
3 changes: 3 additions & 0 deletions Raven.Database/Indexing/IIndexingScheduler.cs
Expand Up @@ -6,7 +6,10 @@ public interface IIndexingScheduler
{
IList<IndexToWorkOn> FilterMapIndexes(IList<IndexToWorkOn> indexes);
void RecordAmountOfItemsToIndex(int value);
void RecordAmountOfItemsToReduce(int value);
IEnumerable<int> GetLastAmountOfItemsToIndex();
IEnumerable<int> GetLastAmountOfItemsToReduce();
int LastAmountOfItemsToIndexToRemember { get; set; }
int LastAmountOfItemsToReduceToRemember { get; set; }
}
}
47 changes: 47 additions & 0 deletions Raven.Database/Indexing/IndexBatchSizeAutoTuner.cs
@@ -0,0 +1,47 @@
using System;
using Raven.Database.Config;
using System.Linq;
using System.Collections.Generic;

namespace Raven.Database.Indexing
{
	/// <summary>
	/// Batch-size auto tuner specialization for map/index batches. It wires the
	/// generic tuning logic of <see cref="BaseBatchSizeAutoTuner"/> to the
	/// index-related configuration values and the indexing scheduler's
	/// bookkeeping on the supplied <see cref="WorkContext"/>.
	/// </summary>
	public class IndexBatchSizeAutoTuner : BaseBatchSizeAutoTuner
	{
		public IndexBatchSizeAutoTuner(WorkContext context)
			: base(context)
		{
		}

		// Tells the scheduler how many items the last indexing batch processed.
		protected override void RecordAmountOfItems(int numberOfItems)
		{
			context.Configuration.IndexingScheduler.RecordAmountOfItemsToIndex(numberOfItems);
		}

		// The recent indexing batch sizes remembered by the scheduler.
		protected override IEnumerable<int> GetLastAmountOfItems()
		{
			return context.Configuration.IndexingScheduler.GetLastAmountOfItemsToIndex();
		}

		// Configured starting point for the indexing batch size.
		protected override int InitialNumberOfItems
		{
			get { return context.Configuration.InitialNumberOfItemsToIndexInSingleBatch; }
		}

		// Configured upper bound on the indexing batch size.
		protected override int MaxNumberOfItems
		{
			get { return context.Configuration.MaxNumberOfItemsToIndexInSingleBatch; }
		}

		// The tuner's current batch size, exposed through the work context
		// (surfaced to clients via database statistics).
		protected override int CurrentNumberOfItems
		{
			get { return context.CurrentNumberOfItemsToIndexInSingleBatch; }
			set { context.CurrentNumberOfItemsToIndexInSingleBatch = value; }
		}

		// How many recent batch sizes the scheduler keeps for indexing.
		protected override int LastAmountOfItemsToRemember
		{
			get { return context.Configuration.IndexingScheduler.LastAmountOfItemsToIndexToRemember; }
			set { context.Configuration.IndexingScheduler.LastAmountOfItemsToIndexToRemember = value; }
		}
	}
}
1 change: 1 addition & 0 deletions Raven.Database/Indexing/IndexingExecuter.cs
Expand Up @@ -25,6 +25,7 @@ public class IndexingExecuter : AbstractIndexingExecuter
public IndexingExecuter(ITransactionalStorage transactionalStorage, WorkContext context, TaskScheduler scheduler)
: base(transactionalStorage, context, scheduler)
{
autoTuner = new IndexBatchSizeAutoTuner(context);
}

protected override bool IsIndexStale(IndexStats indexesStat, IStorageActionsAccessor actions)
Expand Down

0 comments on commit 9f8a7dd

Please sign in to comment.