Skip to content

Commit

Permalink
Feature/low priority write (#5588)
Browse files Browse the repository at this point in the history
* Add write flag

* Rocksdb integration

* Use low priority writes with full pruning

* Whitespace
  • Loading branch information
asdacap committed Apr 24, 2023
1 parent 3bd38ca commit db400f1
Show file tree
Hide file tree
Showing 21 changed files with 343 additions and 217 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Blockchain.FullPruning;
using Nethermind.Core;
using Nethermind.Core.Extensions;
using Nethermind.Core.Test;
using Nethermind.Core.Test.Builders;
using Nethermind.Db;
using Nethermind.Db.FullPruning;
Expand All @@ -27,20 +31,28 @@ public class CopyTreeVisitorTests
[Timeout(Timeout.MaxTestTime)]
public void copies_state_between_dbs(int fullPruningMemoryBudgetMb, int maxDegreeOfParallelism)
{
MemDb trieDb = new();
MemDb clonedDb = new();
TestMemDb trieDb = new();
TestMemDb clonedDb = new();

VisitingOptions visitingOptions = new()
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
FullScanMemoryBudget = fullPruningMemoryBudgetMb.MiB(),
};

CopyDb(StartPruning(trieDb, clonedDb), trieDb, clonedDb, visitingOptions);
IPruningContext ctx = StartPruning(trieDb, clonedDb);
CopyDb(ctx, trieDb, clonedDb, visitingOptions);

List<byte[]> keys = trieDb.Keys.ToList();
List<byte[]> values = trieDb.Values.ToList();

ctx.Commit();

clonedDb.Count.Should().Be(132);
clonedDb.Keys.Should().BeEquivalentTo(trieDb.Keys);
clonedDb.Values.Should().BeEquivalentTo(trieDb.Values);
clonedDb.Keys.Should().BeEquivalentTo(keys);
clonedDb.Values.Should().BeEquivalentTo(values);

clonedDb.KeyWasWrittenWithFlags(keys[0], WriteFlags.LowPriority);
}

[Test, Timeout(Timeout.MaxTestTime)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,16 @@ public void Dispose()
set => _context[key] = value;
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
_context.Set(key, value, flags);
}

public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
return _context.Get(key, flags);
}

public void Commit()
{
WaitForFinish.Set();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Diagnostics;
using System.Threading;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Db.FullPruning;
using Nethermind.Logging;
Expand Down Expand Up @@ -70,7 +71,7 @@ private void PersistNode(TrieNode node)
if (node.Keccak is not null)
{
// simple copy of nodes RLP
_pruningContext[node.Keccak.Bytes] = node.FullRlp;
_pruningContext.Set(node.Keccak.Bytes, node.FullRlp, WriteFlags.LowPriority);
Interlocked.Increment(ref _persistedNodes);

// log message every 1 mln nodes
Expand Down
17 changes: 10 additions & 7 deletions src/Nethermind/Nethermind.Core.Test/InMemoryBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class InMemoryBatch : IBatch
{
private readonly IKeyValueStore _store;
private readonly ConcurrentDictionary<byte[], byte[]?> _currentItems = new();
private WriteFlags _writeFlags = WriteFlags.None;

public InMemoryBatch(IKeyValueStore storeWithNoBatchSupport)
{
Expand All @@ -21,19 +22,21 @@ public void Dispose()
{
foreach (KeyValuePair<byte[], byte[]?> keyValuePair in _currentItems)
{
_store[keyValuePair.Key] = keyValuePair.Value;
_store.Set(keyValuePair.Key, keyValuePair.Value, _writeFlags);
}

GC.SuppressFinalize(this);
}

public byte[]? this[ReadOnlySpan<byte> key]
public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
get => _store[key];
set
{
_currentItems[key.ToArray()] = value;
}
_currentItems[key.ToArray()] = value;
_writeFlags = flags;
}

public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
return _store.Get(key, flags);
}
}
}
30 changes: 14 additions & 16 deletions src/Nethermind/Nethermind.Core.Test/TestMemDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,12 @@ namespace Nethermind.Core.Test;
public class TestMemDb : MemDb
{
private List<(byte[], ReadFlags)> _readKeys = new();
private List<byte[]> _writeKeys = new();
private List<(byte[], WriteFlags)> _writeKeys = new();
private List<byte[]> _removedKeys = new();

public Func<byte[], byte[]>? ReadFunc { get; set; }
public Action<byte[]>? RemoveFunc { get; set; }

public override byte[]? this[ReadOnlySpan<byte> key]
{
get
{
return Get(key);
}
set
{
_writeKeys.Add(key.ToArray());
base[key] = value;
}
}

public override byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
_readKeys.Add((key.ToArray(), flags));
Expand All @@ -43,9 +30,15 @@ public class TestMemDb : MemDb
return base.Get(key, flags);
}

public override void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
_writeKeys.Add((key.ToArray(), flags));
base.Set(key, value, flags);
}

public override Span<byte> GetSpan(ReadOnlySpan<byte> key)
{
return this[key];
return Get(key);
}

public override void Remove(ReadOnlySpan<byte> key)
Expand All @@ -72,7 +65,12 @@ public void KeyWasReadWithFlags(byte[] key, ReadFlags flags, int times = 1)

public void KeyWasWritten(byte[] key, int times = 1)
{
_writeKeys.Count(it => Bytes.AreEqual(it, key)).Should().Be(times);
_writeKeys.Count(it => Bytes.AreEqual(it.Item1, key)).Should().Be(times);
}

public void KeyWasWrittenWithFlags(byte[] key, WriteFlags flags, int times = 1)
{
_writeKeys.Count(it => Bytes.AreEqual(it.Item1, key) && it.Item2 == flags).Should().Be(times);
}

public void KeyWasRemoved(Func<byte[], bool> cond, int times = 1)
Expand Down
10 changes: 7 additions & 3 deletions src/Nethermind/Nethermind.Core/FakeBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ public void Dispose()
_onDispose?.Invoke();
}

public byte[]? this[ReadOnlySpan<byte> key]
public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
get => _storePretendingToSupportBatches[key];
set => _storePretendingToSupportBatches[key] = value;
return _storePretendingToSupportBatches.Get(key, flags);
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
_storePretendingToSupportBatches.Set(key, value, flags);
}
}
}
23 changes: 17 additions & 6 deletions src/Nethermind/Nethermind.Core/IKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@ namespace Nethermind.Core
{
public interface IKeyValueStore : IReadOnlyKeyValueStore
{
new byte[]? this[ReadOnlySpan<byte> key] { get; set; }
new byte[]? this[ReadOnlySpan<byte> key]
{
get => Get(key, ReadFlags.None);
set => Set(key, value, WriteFlags.None);
}

void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None);
}

public interface IReadOnlyKeyValueStore
{
byte[]? this[ReadOnlySpan<byte> key] { get; }
byte[]? this[ReadOnlySpan<byte> key] => Get(key, ReadFlags.None);

byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
return this[key];
}
byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None);
}

public enum ReadFlags
Expand All @@ -28,4 +31,12 @@ public enum ReadFlags
// to reduce CPU usage
HintCacheMiss,
}

public enum WriteFlags
{
None,

// Hint that this is a low priority write
LowPriority,
}
}
49 changes: 24 additions & 25 deletions src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,22 @@ public ColumnDb(RocksDb rocksDb, DbOnTheRocks mainDb, string name)

public string Name { get; }

public byte[]? this[ReadOnlySpan<byte> key]
public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
get
UpdateReadMetrics();
return _rocksDb.Get(key, _columnFamily);
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
UpdateWriteMetrics();
if (value is null)
{
UpdateReadMetrics();
return _rocksDb.Get(key, _columnFamily);
_rocksDb.Remove(key, _columnFamily, _mainDb.WriteFlagsToWriteOptions(flags));
}
set
else
{
UpdateWriteMetrics();
if (value is null)
{
_rocksDb.Remove(key, _columnFamily, _mainDb.WriteOptions);
}
else
{
_rocksDb.Put(key, value, _columnFamily, _mainDb.WriteOptions);
}
_rocksDb.Put(key, value, _columnFamily, _mainDb.WriteFlagsToWriteOptions(flags));
}
}

Expand Down Expand Up @@ -84,19 +82,20 @@ public void Dispose()
_underlyingBatch.Dispose();
}

public byte[]? this[ReadOnlySpan<byte> key]
public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
return _underlyingBatch.Get(key, flags);
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
get => _underlyingBatch[key];
set
if (value is null)
{
_underlyingBatch._rocksBatch.Delete(key, _columnDb._columnFamily);
}
else
{
if (value is null)
{
_underlyingBatch._rocksBatch.Delete(key, _columnDb._columnFamily);
}
else
{
_underlyingBatch._rocksBatch.Put(key, value, _columnDb._columnFamily);
}
_underlyingBatch._rocksBatch.Put(key, value, _columnDb._columnFamily);
}
}
}
Expand Down
Loading

0 comments on commit db400f1

Please sign in to comment.