Skip to content

Commit

Permalink
Perf/sorted batch (#6142)
Browse files Browse the repository at this point in the history
* Sorted keccak batch

* Using list only

* Change algo

* Remove valuekeccak key

* Whitespace
  • Loading branch information
asdacap committed Sep 29, 2023
1 parent 63070a3 commit b3bc144
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ public bool HasBlock(long blockNumber, Keccak blockHash)

public void EnsureCanonical(Block block)
{
using IBatch batch = _transactionDb.StartBatch();
using IBatch batch = _transactionDb.StartBatch().ToSortedBatch();

long headNumber = _blockTree.FindBestSuggestedHeader()?.Number ?? 0;

Expand Down
60 changes: 60 additions & 0 deletions src/Nethermind/Nethermind.Core.Test/SortedBatchTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Linq;
using FluentAssertions;
using Nethermind.Core.Extensions;
using Nethermind.Core.Test.Builders;
using NUnit.Framework;

namespace Nethermind.Core.Test;

public class SortedBatchTests
{
    /// <summary>
    /// Verifies that a sorted batch buffers writes until disposal and then flushes them
    /// to the underlying batch in ascending key order.
    /// </summary>
    [Test]
    public void Test_BatchWillSort()
    {
        TestBatch baseBatch = new TestBatch();

        IBatch sortedBatch = baseBatch.ToSortedBatch();

        IList<byte[]> expectedOrder = new List<byte[]>();
        for (int i = 0; i < 10; i++)
        {
            sortedBatch[TestItem.ValueKeccaks[i].ToByteArray()] = TestItem.ValueKeccaks[i].ToByteArray();
            expectedOrder.Add(TestItem.ValueKeccaks[i].ToByteArray());
        }

        // Nothing must reach the underlying batch before the sorted batch is disposed.
        baseBatch.DisposeCalled.Should().BeFalse();
        baseBatch.SettedValues.Count.Should().Be(0);

        sortedBatch.Dispose();

        baseBatch.DisposeCalled.Should().BeTrue();
        expectedOrder = expectedOrder.Order(Bytes.Comparer).ToList();
        // WithStrictOrdering is required here: FluentAssertions' BeEquivalentTo ignores
        // collection order by default, which would make this test pass even if the
        // batch never sorted its writes.
        baseBatch.SettedValues.Should().BeEquivalentTo(expectedOrder, options => options.WithStrictOrdering());
    }

    /// <summary>
    /// Minimal <see cref="IBatch"/> stub that records written keys and disposal for assertions.
    /// </summary>
    private class TestBatch : IBatch
    {
        // True once Dispose has been called on this batch.
        public bool DisposeCalled { get; set; }
        // Keys passed to Set, in the order they arrived.
        public List<byte[]> SettedValues { get; set; } = new();

        public void Dispose()
        {
            DisposeCalled = true;
        }

        public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
        {
            // Reads are not exercised by these tests.
            return null;
        }

        public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
        {
            SettedValues.Add(key.ToArray());
        }
    }
}
96 changes: 96 additions & 0 deletions src/Nethermind/Nethermind.Core/SortedBatch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Concurrent;
using System.Linq;
using Nethermind.Core.Collections;
using Nethermind.Core.Extensions;

namespace Nethermind.Core;

/// <summary>
/// Rocksdb's skiplist memtable have a fast path when the keys are inserted in ascending order. Otherwise, its seems
/// to be limited to 1Mil writes per second.
/// TODO: FlatDB layout need different key type to match the DB ordering.
/// </summary>
/// <summary>
/// Rocksdb's skiplist memtable has a fast path when the keys are inserted in ascending order. Otherwise, it seems
/// to be limited to 1Mil writes per second. This wrapper buffers writes in memory and flushes them to the
/// underlying batch in key-sorted order on <see cref="Dispose"/>.
/// TODO: FlatDB layout need different key type to match the DB ordering.
/// </summary>
public class SortedBatch : IBatch
{
    private const int InitialBatchSize = 300;
    private static readonly int MaxCached = Environment.ProcessorCount;

    // Pool of backing dictionaries reused across batches to avoid re-allocating storage.
    private static ConcurrentQueue<SpanDictionary<byte, byte[]?>> s_cache = new();

    private readonly IBatch _baseBatch;
    // Last write flags seen; applied to every buffered write during the flush in Dispose.
    private WriteFlags _writeFlags = WriteFlags.None;

    private SpanDictionary<byte, byte[]?> _batchData = CreateOrGetFromCache();

    /// <param name="dbOnTheRocks">Underlying batch that receives the sorted writes on dispose.</param>
    public SortedBatch(IBatch dbOnTheRocks)
    {
        _baseBatch = dbOnTheRocks;
    }

    /// <summary>
    /// Flushes all buffered writes to the base batch in ascending key order, then disposes it.
    /// </summary>
    public void Dispose()
    {
        SpanDictionary<byte, byte[]?> batchData = _batchData;
        // Clear the _batchData reference so it is null if Get is called after dispose.
        _batchData = null!;
        if (batchData.Count == 0)
        {
            // No data to write, skip empty batches
            ReturnToCache(batchData);
            _baseBatch.Dispose();
            return;
        }

        // Sort the batch by key so the underlying store receives writes in ascending order.
        foreach (var kv in batchData.OrderBy(i => i.Key, Bytes.Comparer))
        {
            _baseBatch.Set(kv.Key, kv.Value, _writeFlags);
        }
        ReturnToCache(batchData);

        _baseBatch.Dispose();
    }

    /// <summary>
    /// Reads a value, serving pending (not yet flushed) writes from the in-memory buffer first
    /// and falling through to the base batch otherwise.
    /// </summary>
    public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
    {
        // Not checking _isDisposing here as for some reason, sometimes it is read after dispose.
        // Fix: forward the caller's read flags on the fallthrough path instead of dropping them.
        return _batchData?.TryGetValue(key, out byte[]? value) ?? false ? value : _baseBatch.Get(key, flags);
    }

    /// <summary>
    /// Buffers a write; it reaches the base batch only when this batch is disposed.
    /// </summary>
    public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
    {
        _batchData[key] = value;

        _writeFlags = flags;
    }

    // Takes a pooled dictionary if one is available, otherwise allocates a fresh one.
    private static SpanDictionary<byte, byte[]?> CreateOrGetFromCache()
    {
        if (s_cache.TryDequeue(out SpanDictionary<byte, byte[]?>? batchData))
        {
            return batchData;
        }

        return new SpanDictionary<byte, byte[]?>(InitialBatchSize, Bytes.SpanEqualityComparer);
    }

    // Returns a dictionary to the pool (cleared and trimmed), dropping it once the pool is full.
    private static void ReturnToCache(SpanDictionary<byte, byte[]?> batchData)
    {
        if (s_cache.Count >= MaxCached) return;

        batchData.Clear();
        batchData.TrimExcess(capacity: InitialBatchSize);
        s_cache.Enqueue(batchData);
    }
}

/// <summary>
/// Extension helpers for <see cref="IBatch"/>.
/// </summary>
public static class BatchExtensions
{
    /// <summary>
    /// Wraps <paramref name="batch"/> in a <see cref="SortedBatch"/> so that its writes
    /// are flushed to the underlying store in key-sorted order on dispose.
    /// </summary>
    public static SortedBatch ToSortedBatch(this IBatch batch) => new(batch);
}
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteF
}
else
{
_underlyingBatch.Set(key, value, _columnDb._columnFamily);
_underlyingBatch.Set(key, value, _columnDb._columnFamily, flags);
}
}
}
Expand Down
72 changes: 12 additions & 60 deletions src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -788,20 +788,15 @@ public IBatch StartBatch()

internal class RocksDbBatch : IBatch
{
private const int InitialBatchSize = 300;
private static readonly int MaxCached = Environment.ProcessorCount;

private static ConcurrentQueue<SpanDictionary<byte, (byte[]? data, ColumnFamilyHandle? cf)>> s_cache = new();

private readonly DbOnTheRocks _dbOnTheRocks;
private readonly WriteBatch _rocksBatch;
private WriteFlags _writeFlags = WriteFlags.None;
private bool _isDisposed;

private SpanDictionary<byte, (byte[]? data, ColumnFamilyHandle? cf)> _batchData = CreateOrGetFromCache();

public RocksDbBatch(DbOnTheRocks dbOnTheRocks)
{
_dbOnTheRocks = dbOnTheRocks;
_rocksBatch = new WriteBatch();

if (_dbOnTheRocks._isDisposing)
{
Expand All @@ -822,37 +817,11 @@ public void Dispose()
}
_isDisposed = true;

SpanDictionary<byte, (byte[]? data, ColumnFamilyHandle? cf)> batchData = _batchData;
// Clear the _batchData reference so is null if Get is called.
_batchData = null!;
if (batchData.Count == 0)
{
// No data to write, skip empty batches
ReturnToCache(batchData);
_dbOnTheRocks._currentBatches.TryRemove(this);
return;
}

try
{
WriteBatch rocksBatch = new();
// Sort the batch by key
foreach (var kv in batchData.OrderBy(i => i.Key, Bytes.Comparer))
{
if (kv.Value.data is null)
{
rocksBatch.Delete(kv.Key, kv.Value.cf);
}
else
{
rocksBatch.Put(kv.Key, kv.Value.data, kv.Value.cf);
}
}
ReturnToCache(batchData);

_dbOnTheRocks._db.Write(rocksBatch, _dbOnTheRocks.WriteFlagsToWriteOptions(_writeFlags));
_dbOnTheRocks._db.Write(_rocksBatch, _dbOnTheRocks.WriteFlagsToWriteOptions(_writeFlags));
_dbOnTheRocks._currentBatches.TryRemove(this);
rocksBatch.Dispose();
_rocksBatch.Dispose();
}
catch (RocksDbSharpException e)
{
Expand All @@ -864,7 +833,9 @@ public void Dispose()
public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
// Not checking _isDisposing here as for some reason, sometimes is is read after dispose
return _batchData?.TryGetValue(key, out var value) ?? false ? value.data : _dbOnTheRocks.Get(key, flags);
// Note: The batch itself is not checked. Will need to use WriteBatchWithIndex to do that, but that will
// hurt perf.
return _dbOnTheRocks.Get(key, flags);
}

public void Delete(ReadOnlySpan<byte> key, ColumnFamilyHandle? cf = null)
Expand All @@ -874,17 +845,18 @@ public void Delete(ReadOnlySpan<byte> key, ColumnFamilyHandle? cf = null)
throw new ObjectDisposedException($"Attempted to write a disposed batch {_dbOnTheRocks.Name}");
}

_batchData[key] = (null, cf);
_rocksBatch.Delete(key, cf);
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, ColumnFamilyHandle? cf = null)
public void Set(ReadOnlySpan<byte> key, byte[]? value, ColumnFamilyHandle? cf = null, WriteFlags flags = WriteFlags.None)
{
if (_isDisposed)
{
throw new ObjectDisposedException($"Attempted to write a disposed batch {_dbOnTheRocks.Name}");
}

_batchData[key] = (value, cf);
_rocksBatch.Put(key, value, cf);
_writeFlags = flags;
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
Expand All @@ -894,29 +866,9 @@ public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteF
throw new ObjectDisposedException($"Attempted to write a disposed batch {_dbOnTheRocks.Name}");
}

_batchData[key] = (value, null);

_rocksBatch.Put(key, value);
_writeFlags = flags;
}

private static SpanDictionary<byte, (byte[]? data, ColumnFamilyHandle? cf)> CreateOrGetFromCache()
{
if (s_cache.TryDequeue(out SpanDictionary<byte, (byte[]? data, ColumnFamilyHandle? cf)>? batchData))
{
return batchData;
}

return new(InitialBatchSize, Bytes.SpanEqualityComparer);
}

private static void ReturnToCache(SpanDictionary<byte, (byte[]? data, ColumnFamilyHandle? cf)> batchData)
{
if (s_cache.Count >= MaxCached) return;

batchData.Clear();
batchData.TrimExcess(capacity: InitialBatchSize);
s_cache.Enqueue(batchData);
}
}

public void Flush()
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.State.Test/StateTreeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void Minimal_writes_when_setting_on_empty_scenario_2()
tree.Set(new Keccak("eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeb1eeeeeb0"), _account0);
tree.Set(new Keccak("eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeb1eeeeeb1"), _account0);
tree.Commit(0);
Assert.That(db.WritesCount, Is.EqualTo(7), "writes"); // extension, branch, leaf, extension, branch, 2x same leaf
Assert.That(db.WritesCount, Is.EqualTo(6), "writes"); // extension, branch, leaf, extension, branch, 2x same leaf
Assert.That(Trie.Metrics.TreeNodeHashCalculations, Is.EqualTo(7), "hashes");
Assert.That(Trie.Metrics.TreeNodeRlpEncodings, Is.EqualTo(7), "encodings");
}
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Trie.Test/TrieNodeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,7 @@ public void Rlp_is_cloned_when_cloning()
leaf2.ResolveKey(trieStore, false);
leaf2.Seal();
trieStore.CommitNode(0, new NodeCommitInfo(leaf2));
trieStore.FinishBlockCommit(TrieType.State, 0, leaf2);

TrieNode trieNode = new(NodeType.Branch);
trieNode.SetChild(1, leaf1);
Expand Down
6 changes: 2 additions & 4 deletions src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Buffers;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Logging;
using Nethermind.Serialization.Rlp;

namespace Nethermind.Trie.Pruning
{
Expand Down Expand Up @@ -614,7 +612,7 @@ private void Persist(BlockCommitSet commitSet, WriteFlags writeFlags = WriteFlag

try
{
_currentBatch ??= _keyValueStore.StartBatch();
_currentBatch ??= _keyValueStore.StartBatch().ToSortedBatch();
if (_logger.IsDebug) _logger.Debug($"Persisting from root {commitSet.Root} in {commitSet.BlockNumber}");

Stopwatch stopwatch = Stopwatch.StartNew();
Expand All @@ -639,7 +637,7 @@ private void Persist(BlockCommitSet commitSet, WriteFlags writeFlags = WriteFlag

private void Persist(TrieNode currentNode, long blockNumber, WriteFlags writeFlags = WriteFlags.None)
{
_currentBatch ??= _keyValueStore.StartBatch();
_currentBatch ??= _keyValueStore.StartBatch().ToSortedBatch();
if (currentNode is null)
{
throw new ArgumentNullException(nameof(currentNode));
Expand Down

0 comments on commit b3bc144

Please sign in to comment.