Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/low priority write #5588

Merged
merged 4 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Blockchain.FullPruning;
using Nethermind.Core;
using Nethermind.Core.Extensions;
using Nethermind.Core.Test;
using Nethermind.Core.Test.Builders;
using Nethermind.Db;
using Nethermind.Db.FullPruning;
Expand All @@ -27,20 +31,28 @@ public class CopyTreeVisitorTests
[Timeout(Timeout.MaxTestTime)]
public void copies_state_between_dbs(int fullPruningMemoryBudgetMb, int maxDegreeOfParallelism)
{
MemDb trieDb = new();
MemDb clonedDb = new();
TestMemDb trieDb = new();
TestMemDb clonedDb = new();

VisitingOptions visitingOptions = new()
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
FullScanMemoryBudget = fullPruningMemoryBudgetMb.MiB(),
};

CopyDb(StartPruning(trieDb, clonedDb), trieDb, clonedDb, visitingOptions);
IPruningContext ctx = StartPruning(trieDb, clonedDb);
CopyDb(ctx, trieDb, clonedDb, visitingOptions);

List<byte[]> keys = trieDb.Keys.ToList();
List<byte[]> values = trieDb.Values.ToList();

ctx.Commit();

clonedDb.Count.Should().Be(132);
clonedDb.Keys.Should().BeEquivalentTo(trieDb.Keys);
clonedDb.Values.Should().BeEquivalentTo(trieDb.Values);
clonedDb.Keys.Should().BeEquivalentTo(keys);
clonedDb.Values.Should().BeEquivalentTo(values);

clonedDb.KeyWasWrittenWithFlags(keys[0], WriteFlags.LowPriority);
}

[Test, Timeout(Timeout.MaxTestTime)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,16 @@ public void Dispose()
set => _context[key] = value;
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
_context.Set(key, value, flags);
}

public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
return _context.Get(key, flags);
}

public void Commit()
{
WaitForFinish.Set();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Diagnostics;
using System.Threading;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Db.FullPruning;
using Nethermind.Logging;
Expand Down Expand Up @@ -70,7 +71,7 @@ private void PersistNode(TrieNode node)
if (node.Keccak is not null)
{
// simple copy of nodes RLP
_pruningContext[node.Keccak.Bytes] = node.FullRlp;
_pruningContext.Set(node.Keccak.Bytes, node.FullRlp, WriteFlags.LowPriority);
Interlocked.Increment(ref _persistedNodes);

// log message every 1 mln nodes
Expand Down
17 changes: 10 additions & 7 deletions src/Nethermind/Nethermind.Core.Test/InMemoryBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class InMemoryBatch : IBatch
{
private readonly IKeyValueStore _store;
private readonly ConcurrentDictionary<byte[], byte[]?> _currentItems = new();
private WriteFlags _writeFlags = WriteFlags.None;

public InMemoryBatch(IKeyValueStore storeWithNoBatchSupport)
{
Expand All @@ -21,19 +22,21 @@ public void Dispose()
{
foreach (KeyValuePair<byte[], byte[]?> keyValuePair in _currentItems)
{
_store[keyValuePair.Key] = keyValuePair.Value;
_store.Set(keyValuePair.Key, keyValuePair.Value, _writeFlags);
}

GC.SuppressFinalize(this);
}

public byte[]? this[ReadOnlySpan<byte> key]
public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
get => _store[key];
set
{
_currentItems[key.ToArray()] = value;
}
_currentItems[key.ToArray()] = value;
_writeFlags = flags;
}

public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
return _store.Get(key, flags);
}
}
}
30 changes: 14 additions & 16 deletions src/Nethermind/Nethermind.Core.Test/TestMemDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,12 @@ namespace Nethermind.Core.Test;
public class TestMemDb : MemDb
{
private List<(byte[], ReadFlags)> _readKeys = new();
private List<byte[]> _writeKeys = new();
private List<(byte[], WriteFlags)> _writeKeys = new();
private List<byte[]> _removedKeys = new();

public Func<byte[], byte[]>? ReadFunc { get; set; }
public Action<byte[]>? RemoveFunc { get; set; }

public override byte[]? this[ReadOnlySpan<byte> key]
{
get
{
return Get(key);
}
set
{
_writeKeys.Add(key.ToArray());
base[key] = value;
}
}

public override byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
_readKeys.Add((key.ToArray(), flags));
Expand All @@ -43,9 +30,15 @@ public class TestMemDb : MemDb
return base.Get(key, flags);
}

public override void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
_writeKeys.Add((key.ToArray(), flags));
base.Set(key, value, flags);
}

public override Span<byte> GetSpan(ReadOnlySpan<byte> key)
{
return this[key];
return Get(key);
}

public override void Remove(ReadOnlySpan<byte> key)
Expand All @@ -72,7 +65,12 @@ public void KeyWasReadWithFlags(byte[] key, ReadFlags flags, int times = 1)

public void KeyWasWritten(byte[] key, int times = 1)
{
_writeKeys.Count(it => Bytes.AreEqual(it, key)).Should().Be(times);
_writeKeys.Count(it => Bytes.AreEqual(it.Item1, key)).Should().Be(times);
}

public void KeyWasWrittenWithFlags(byte[] key, WriteFlags flags, int times = 1)
{
_writeKeys.Count(it => Bytes.AreEqual(it.Item1, key) && it.Item2 == flags).Should().Be(times);
}

public void KeyWasRemoved(Func<byte[], bool> cond, int times = 1)
Expand Down
10 changes: 7 additions & 3 deletions src/Nethermind/Nethermind.Core/FakeBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ public void Dispose()
_onDispose?.Invoke();
}

public byte[]? this[ReadOnlySpan<byte> key]
public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
get => _storePretendingToSupportBatches[key];
set => _storePretendingToSupportBatches[key] = value;
return _storePretendingToSupportBatches.Get(key, flags);
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
_storePretendingToSupportBatches.Set(key, value, flags);
}
}
}
23 changes: 17 additions & 6 deletions src/Nethermind/Nethermind.Core/IKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@ namespace Nethermind.Core
{
public interface IKeyValueStore : IReadOnlyKeyValueStore
{
new byte[]? this[ReadOnlySpan<byte> key] { get; set; }
new byte[]? this[ReadOnlySpan<byte> key]
{
get => Get(key, ReadFlags.None);
set => Set(key, value, WriteFlags.None);
}

void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None);
}

public interface IReadOnlyKeyValueStore
{
byte[]? this[ReadOnlySpan<byte> key] { get; }
byte[]? this[ReadOnlySpan<byte> key] => Get(key, ReadFlags.None);

byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
return this[key];
}
byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None);
}

public enum ReadFlags
Expand All @@ -28,4 +31,12 @@ public enum ReadFlags
// to reduce CPU usage
HintCacheMiss,
}

public enum WriteFlags
{
None,

// Hint that this is a low priority write
LowPriority,
}
}
49 changes: 24 additions & 25 deletions src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,22 @@ public ColumnDb(RocksDb rocksDb, DbOnTheRocks mainDb, string name)

public string Name { get; }

public byte[]? this[ReadOnlySpan<byte> key]
public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
get
UpdateReadMetrics();
return _rocksDb.Get(key, _columnFamily);
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
UpdateWriteMetrics();
if (value is null)
{
UpdateReadMetrics();
return _rocksDb.Get(key, _columnFamily);
_rocksDb.Remove(key, _columnFamily, _mainDb.WriteFlagsToWriteOptions(flags));
}
set
else
{
UpdateWriteMetrics();
if (value is null)
{
_rocksDb.Remove(key, _columnFamily, _mainDb.WriteOptions);
}
else
{
_rocksDb.Put(key, value, _columnFamily, _mainDb.WriteOptions);
}
_rocksDb.Put(key, value, _columnFamily, _mainDb.WriteFlagsToWriteOptions(flags));
}
}

Expand Down Expand Up @@ -84,19 +82,20 @@ public void Dispose()
_underlyingBatch.Dispose();
}

public byte[]? this[ReadOnlySpan<byte> key]
public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
return _underlyingBatch.Get(key, flags);
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
get => _underlyingBatch[key];
set
if (value is null)
{
_underlyingBatch._rocksBatch.Delete(key, _columnDb._columnFamily);
}
else
{
if (value is null)
{
_underlyingBatch._rocksBatch.Delete(key, _columnDb._columnFamily);
}
else
{
_underlyingBatch._rocksBatch.Put(key, value, _columnDb._columnFamily);
}
_underlyingBatch._rocksBatch.Put(key, value, _columnDb._columnFamily);
}
}
}
Expand Down
Loading