Perf/sorted batch #6142

Merged
merged 5 commits on Sep 29, 2023
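Summary (from the diff): this PR extracts the key-sorting logic that used to live inside RocksDbBatch into a reusable SortedBatch wrapper, so RocksDbBatch now writes directly into a RocksDB WriteBatch, and opts hot write paths into sorted insertion order via ToSortedBatch(): the transaction-index batch in EnsureCanonical and the node-persistence batches in TrieStore.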
@@ -342,7 +342,7 @@ public bool HasBlock(long blockNumber, Keccak blockHash)

public void EnsureCanonical(Block block)
{
using IBatch batch = _transactionDb.StartBatch();
using IBatch batch = _transactionDb.StartBatch().ToSortedBatch();

long headNumber = _blockTree.FindBestSuggestedHeader()?.Number ?? 0;

60 changes: 60 additions & 0 deletions src/Nethermind/Nethermind.Core.Test/SortedBatchTests.cs
@@ -0,0 +1,60 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Linq;
using FluentAssertions;
using Nethermind.Core.Extensions;
using Nethermind.Core.Test.Builders;
using NUnit.Framework;

namespace Nethermind.Core.Test;

public class SortedBatchTests
{
[Test]
public void Test_BatchWillSort()
{
TestBatch baseBatch = new TestBatch();

IBatch sortedBatch = baseBatch.ToSortedBatch();

IList<byte[]> expectedOrder = new List<byte[]>();
for (int i = 0; i < 10; i++)
{
sortedBatch[TestItem.ValueKeccaks[i].ToByteArray()] = TestItem.ValueKeccaks[i].ToByteArray();
expectedOrder.Add(TestItem.ValueKeccaks[i].ToByteArray());
}

baseBatch.DisposeCalled.Should().BeFalse();
baseBatch.SettedValues.Count.Should().Be(0);

sortedBatch.Dispose();

baseBatch.DisposeCalled.Should().BeTrue();
expectedOrder = expectedOrder.Order(Bytes.Comparer).ToList();
baseBatch.SettedValues.Should().BeEquivalentTo(expectedOrder);
}

private class TestBatch : IBatch
{
public bool DisposeCalled { get; set; }
public List<byte[]> SettedValues { get; set; } = new();

public void Dispose()
{
DisposeCalled = true;
}

public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
return null;
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
SettedValues.Add(key.ToArray());
}
}
}
96 changes: 96 additions & 0 deletions src/Nethermind/Nethermind.Core/SortedBatch.cs
@@ -0,0 +1,96 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Concurrent;
using System.Linq;
using Nethermind.Core.Collections;
using Nethermind.Core.Extensions;

namespace Nethermind.Core;

/// <summary>
/// RocksDB's skiplist memtable has a fast path when keys are inserted in ascending order; otherwise it seems
/// to be limited to roughly 1M writes per second.
/// TODO: The FlatDB layout needs a different key type to match the DB ordering.
/// </summary>
public class SortedBatch : IBatch
{
private const int InitialBatchSize = 300;
private static readonly int MaxCached = Environment.ProcessorCount;

private static ConcurrentQueue<SpanDictionary<byte, byte[]?>> s_cache = new();

private readonly IBatch _baseBatch;
private WriteFlags _writeFlags = WriteFlags.None;

private SpanDictionary<byte, byte[]?> _batchData = CreateOrGetFromCache();

public SortedBatch(IBatch dbOnTheRocks)
{
_baseBatch = dbOnTheRocks;
}

public void Dispose()
{
SpanDictionary<byte, byte[]?> batchData = _batchData;
// Clear the _batchData reference so it is null if Get is called.
_batchData = null!;
if (batchData.Count == 0)
{
// No data to write, skip empty batches
ReturnToCache(batchData);
_baseBatch.Dispose();
return;
}

// Sort the batch by key
foreach (var kv in batchData.OrderBy(i => i.Key, Bytes.Comparer))
{
_baseBatch.Set(kv.Key, kv.Value, _writeFlags);
}
ReturnToCache(batchData);

_baseBatch.Dispose();
}

public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
// Not checking _isDisposing here as, for some reason, it is sometimes read after dispose
return _batchData?.TryGetValue(key, out byte[]? value) ?? false ? value : _baseBatch.Get(key);
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
_batchData[key] = value;

_writeFlags = flags;
}

private static SpanDictionary<byte, byte[]?> CreateOrGetFromCache()
{
if (s_cache.TryDequeue(out SpanDictionary<byte, byte[]?>? batchData))
{
return batchData;
}

return new SpanDictionary<byte, byte[]?>(InitialBatchSize, Bytes.SpanEqualityComparer);
}

private static void ReturnToCache(SpanDictionary<byte, byte[]?> batchData)
{
if (s_cache.Count >= MaxCached) return;

batchData.Clear();
batchData.TrimExcess(capacity: InitialBatchSize);
s_cache.Enqueue(batchData);
}
}

public static class BatchExtensions
{
public static SortedBatch ToSortedBatch(this IBatch batch)
{
return new SortedBatch(batch);
}
}
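A minimal usage sketch (not part of this PR; it assumes an IKeyValueStoreWithBatching named store and illustrative one-byte keys): SortedBatch buffers writes in a SpanDictionary and only forwards them to the underlying batch, in ascending key order, when it is disposed.

using Nethermind.Core;

public static class SortedBatchUsageSketch
{
    // Hypothetical helper, not part of the PR.
    public static void WriteOutOfOrder(IKeyValueStoreWithBatching store)
    {
        using IBatch batch = store.StartBatch().ToSortedBatch();

        // Inserted out of key order; SortedBatch buffers both in a dictionary.
        batch[new byte[] { 0x02 }] = new byte[] { 42 };
        batch[new byte[] { 0x01 }] = new byte[] { 7 };

        // Read-your-writes: Get checks the buffered data before falling back
        // to the underlying batch.
        byte[]? pending = batch.Get(new byte[] { 0x02 });

        // On dispose, keys are flushed as 0x01 then 0x02 - the ascending order
        // that RocksDB's skiplist memtable handles fastest.
    }
}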
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs
@@ -92,7 +92,7 @@ public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
}
else
{
_underlyingBatch.Set(key, value, _columnDb._columnFamily);
_underlyingBatch.Set(key, value, _columnDb._columnFamily, flags);
}
}
}
72 changes: 12 additions & 60 deletions src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs
@@ -788,20 +788,15 @@ public IBatch StartBatch()

internal class RocksDbBatch : IBatch
{
private const int InitialBatchSize = 300;
private static readonly int MaxCached = Environment.ProcessorCount;

private static ConcurrentQueue<SpanDictionary<byte, (byte[]? data, ColumnFamilyHandle? cf)>> s_cache = new();

private readonly DbOnTheRocks _dbOnTheRocks;
private readonly WriteBatch _rocksBatch;
private WriteFlags _writeFlags = WriteFlags.None;
private bool _isDisposed;

private SpanDictionary<byte, (byte[]? data, ColumnFamilyHandle? cf)> _batchData = CreateOrGetFromCache();

public RocksDbBatch(DbOnTheRocks dbOnTheRocks)
{
_dbOnTheRocks = dbOnTheRocks;
_rocksBatch = new WriteBatch();

if (_dbOnTheRocks._isDisposing)
{
@@ -822,37 +817,11 @@ public void Dispose()
}
_isDisposed = true;

SpanDictionary<byte, (byte[]? data, ColumnFamilyHandle? cf)> batchData = _batchData;
// Clear the _batchData reference so is null if Get is called.
_batchData = null!;
if (batchData.Count == 0)
{
// No data to write, skip empty batches
ReturnToCache(batchData);
_dbOnTheRocks._currentBatches.TryRemove(this);
return;
}

try
{
WriteBatch rocksBatch = new();
// Sort the batch by key
foreach (var kv in batchData.OrderBy(i => i.Key, Bytes.Comparer))
{
if (kv.Value.data is null)
{
rocksBatch.Delete(kv.Key, kv.Value.cf);
}
else
{
rocksBatch.Put(kv.Key, kv.Value.data, kv.Value.cf);
}
}
ReturnToCache(batchData);

_dbOnTheRocks._db.Write(rocksBatch, _dbOnTheRocks.WriteFlagsToWriteOptions(_writeFlags));
_dbOnTheRocks._db.Write(_rocksBatch, _dbOnTheRocks.WriteFlagsToWriteOptions(_writeFlags));
_dbOnTheRocks._currentBatches.TryRemove(this);
rocksBatch.Dispose();
_rocksBatch.Dispose();
}
catch (RocksDbSharpException e)
{
@@ -864,7 +833,9 @@ public void Dispose()
public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
// Not checking _isDisposing here as, for some reason, it is sometimes read after dispose
return _batchData?.TryGetValue(key, out var value) ?? false ? value.data : _dbOnTheRocks.Get(key, flags);
// Note: The batch itself is not checked. Will need to use WriteBatchWithIndex to do that, but that will
// hurt perf.
return _dbOnTheRocks.Get(key, flags);
}

public void Delete(ReadOnlySpan<byte> key, ColumnFamilyHandle? cf = null)
@@ -874,17 +845,18 @@ public void Delete(ReadOnlySpan<byte> key, ColumnFamilyHandle? cf = null)
throw new ObjectDisposedException($"Attempted to write a disposed batch {_dbOnTheRocks.Name}");
}

_batchData[key] = (null, cf);
_rocksBatch.Delete(key, cf);
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, ColumnFamilyHandle? cf = null)
public void Set(ReadOnlySpan<byte> key, byte[]? value, ColumnFamilyHandle? cf = null, WriteFlags flags = WriteFlags.None)
{
if (_isDisposed)
{
throw new ObjectDisposedException($"Attempted to write a disposed batch {_dbOnTheRocks.Name}");
}

_batchData[key] = (value, cf);
_rocksBatch.Put(key, value, cf);
_writeFlags = flags;
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
@@ -894,29 +866,9 @@ public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
throw new ObjectDisposedException($"Attempted to write a disposed batch {_dbOnTheRocks.Name}");
}

_batchData[key] = (value, null);

_rocksBatch.Put(key, value);
_writeFlags = flags;
}

private static SpanDictionary<byte, (byte[]? data, ColumnFamilyHandle? cf)> CreateOrGetFromCache()
{
if (s_cache.TryDequeue(out SpanDictionary<byte, (byte[]? data, ColumnFamilyHandle? cf)>? batchData))
{
return batchData;
}

return new(InitialBatchSize, Bytes.SpanEqualityComparer);
}

private static void ReturnToCache(SpanDictionary<byte, (byte[]? data, ColumnFamilyHandle? cf)> batchData)
{
if (s_cache.Count >= MaxCached) return;

batchData.Clear();
batchData.TrimExcess(capacity: InitialBatchSize);
s_cache.Enqueue(batchData);
}
}

public void Flush()
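A consequence of writing straight into the WriteBatch, called out by the new comment in Get above: values set on a RocksDbBatch are no longer visible to reads through that same batch. A hedged illustration (hypothetical helper, not from this PR):

static byte[]? ReadPendingSketch(IKeyValueStoreWithBatching store, byte[] key, byte[] value)
{
    using IBatch batch = store.StartBatch();
    batch.Set(key, value);
    // Returns the database's current value for `key`, not the pending `value`;
    // checking the batch itself would require WriteBatchWithIndex, at a perf cost.
    return batch.Get(key);
}
// Wrapping with ToSortedBatch() restores read-your-writes, since SortedBatch.Get
// consults its buffered dictionary before falling back to the underlying batch.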
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.State.Test/StateTreeTests.cs
@@ -62,7 +62,7 @@ public void Minimal_writes_when_setting_on_empty_scenario_2()
tree.Set(new Keccak("eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeb1eeeeeb0"), _account0);
tree.Set(new Keccak("eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeb1eeeeeb1"), _account0);
tree.Commit(0);
Assert.That(db.WritesCount, Is.EqualTo(7), "writes"); // extension, branch, leaf, extension, branch, 2x same leaf
Assert.That(db.WritesCount, Is.EqualTo(6), "writes"); // extension, branch, leaf, extension, branch, 2x same leaf
Assert.That(Trie.Metrics.TreeNodeHashCalculations, Is.EqualTo(7), "hashes");
Assert.That(Trie.Metrics.TreeNodeRlpEncodings, Is.EqualTo(7), "encodings");
}
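The expected write count drops from 7 to 6, presumably because trie writes now go through a sorted batch that buffers values in a dictionary keyed by node hash, so the two writes of the identical leaf (the "2x same leaf" in the comment) collapse into one database write.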
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Trie.Test/TrieNodeTests.cs
@@ -908,6 +908,7 @@ public void Rlp_is_cloned_when_cloning()
leaf2.ResolveKey(trieStore, false);
leaf2.Seal();
trieStore.CommitNode(0, new NodeCommitInfo(leaf2));
trieStore.FinishBlockCommit(TrieType.State, 0, leaf2);

TrieNode trieNode = new(NodeType.Branch);
trieNode.SetChild(1, leaf1);
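The added FinishBlockCommit call is presumably needed because persisted nodes are now buffered in a batch: the committed leaf only reaches the backing store once the block commit finishes and the batch is flushed.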
6 changes: 2 additions & 4 deletions src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs
@@ -8,12 +8,10 @@
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Buffers;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Logging;
using Nethermind.Serialization.Rlp;

namespace Nethermind.Trie.Pruning
{
@@ -614,7 +612,7 @@ private void Persist(BlockCommitSet commitSet, WriteFlags writeFlags = WriteFlags.None)

try
{
_currentBatch ??= _keyValueStore.StartBatch();
_currentBatch ??= _keyValueStore.StartBatch().ToSortedBatch();
if (_logger.IsDebug) _logger.Debug($"Persisting from root {commitSet.Root} in {commitSet.BlockNumber}");

Stopwatch stopwatch = Stopwatch.StartNew();
Expand All @@ -639,7 +637,7 @@ private void Persist(BlockCommitSet commitSet, WriteFlags writeFlags = WriteFlag

private void Persist(TrieNode currentNode, long blockNumber, WriteFlags writeFlags = WriteFlags.None)
{
_currentBatch ??= _keyValueStore.StartBatch();
_currentBatch ??= _keyValueStore.StartBatch().ToSortedBatch();
if (currentNode is null)
{
throw new ArgumentNullException(nameof(currentNode));