Skip to content

Commit

Permalink
Refactor/columned db (#6214)
Browse files Browse the repository at this point in the history
* Column db refactor

* Add tests

* Whitespace

* Minor cleanup

* Fix build
  • Loading branch information
asdacap authored Oct 24, 2023
1 parent 61d6bbd commit fe2b14e
Show file tree
Hide file tree
Showing 32 changed files with 454 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class PersistentReceiptStorage : IReceiptStorage
private readonly IReceiptsRecovery _receiptsRecovery;
private long? _lowestInsertedReceiptBlock;
private readonly IDbWithSpan _blocksDb;
private readonly IDbWithSpan _defaultColumn;
private readonly IDb _transactionDb;
private static readonly Keccak MigrationBlockNumberKey = Keccak.Compute(nameof(MigratedBlockNumber));
private long _migratedBlockNumber;
Expand All @@ -48,9 +49,10 @@ public PersistentReceiptStorage(
ReceiptArrayStorageDecoder? storageDecoder = null
)
{
long Get(Keccak key, long defaultValue) => _database.Get(key)?.ToLongFromBigEndianByteArrayWithoutLeadingZeros() ?? defaultValue;

_database = receiptsDb ?? throw new ArgumentNullException(nameof(receiptsDb));
_defaultColumn = _database.GetColumnDb(ReceiptsColumns.Default);
long Get(Keccak key, long defaultValue) => _defaultColumn.Get(key)?.ToLongFromBigEndianByteArrayWithoutLeadingZeros() ?? defaultValue;

_specProvider = specProvider ?? throw new ArgumentNullException(nameof(specProvider));
_receiptsRecovery = receiptsRecovery ?? throw new ArgumentNullException(nameof(receiptsRecovery));
_blocksDb = _database.GetColumnDb(ReceiptsColumns.Blocks);
Expand All @@ -60,7 +62,7 @@ public PersistentReceiptStorage(
_storageDecoder = storageDecoder ?? ReceiptArrayStorageDecoder.Instance;
_receiptConfig = receiptConfig ?? throw new ArgumentNullException(nameof(receiptConfig));

byte[] lowestBytes = _database.Get(Keccak.Zero);
byte[] lowestBytes = _defaultColumn.Get(Keccak.Zero);
_lowestInsertedReceiptBlock = lowestBytes is null ? (long?)null : new RlpStream(lowestBytes).DecodeLong();
_migratedBlockNumber = Get(MigrationBlockNumberKey, long.MaxValue);

Expand Down Expand Up @@ -109,14 +111,14 @@ public Keccak FindBlockHash(Keccak txHash)
// Find receipt stored with old - obsolete format.
private TxReceipt FindReceiptObsolete(Keccak hash)
{
var receiptData = _database.GetSpan(hash);
var receiptData = _defaultColumn.GetSpan(hash);
try
{
return DeserializeReceiptObsolete(hash, receiptData);
}
finally
{
_database.DangerousReleaseMemory(receiptData);
_defaultColumn.DangerousReleaseMemory(receiptData);
}
}

Expand Down Expand Up @@ -300,7 +302,7 @@ public long? LowestInsertedReceiptBlockNumber
_lowestInsertedReceiptBlock = value;
if (value.HasValue)
{
_database.Set(Keccak.Zero, Rlp.Encode(value.Value).Bytes);
_defaultColumn.Set(Keccak.Zero, Rlp.Encode(value.Value).Bytes);
}
}
}
Expand All @@ -311,7 +313,7 @@ public long MigratedBlockNumber
set
{
_migratedBlockNumber = value;
_database.Set(MigrationBlockNumberKey, MigratedBlockNumber.ToBigEndianByteArrayWithoutLeadingZeros());
_defaultColumn.Set(MigrationBlockNumberKey, MigratedBlockNumber.ToBigEndianByteArrayWithoutLeadingZeros());
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/Nethermind/Nethermind.Core.Test/TestMemColumnDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace Nethermind.Core.Test;

public class TestMemColumnsDb<TKey> : TestMemDb, IColumnsDb<TKey>
public class TestMemColumnsDb<TKey> : IColumnsDb<TKey>
where TKey : notnull
{
private readonly IDictionary<TKey, IDbWithSpan> _columnDbs = new Dictionary<TKey, IDbWithSpan>();
Expand All @@ -26,8 +26,8 @@ public TestMemColumnsDb(params TKey[] keys)
public IDbWithSpan GetColumnDb(TKey key) => !_columnDbs.TryGetValue(key, out var db) ? _columnDbs[key] = new TestMemDb() : db;
public IEnumerable<TKey> ColumnKeys => _columnDbs.Keys;

public IReadOnlyDb CreateReadOnly(bool createInMemWriteStore)
public IColumnsBatch<TKey> StartBatch()
{
return new ReadOnlyColumnsDb<TKey>(this, createInMemWriteStore);
return new InMemoryColumnBatch<TKey>(this);
}
}
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public ColumnDb(RocksDb rocksDb, DbOnTheRocks mainDb, string name)
{
_rocksDb = rocksDb;
_mainDb = mainDb;
if (name == "Default") name = "default";
_columnFamily = _rocksDb.GetColumnFamily(name);
Name = name;
}
Expand Down
57 changes: 56 additions & 1 deletion src/Nethermind/Nethermind.Db.Rocks/ColumnsDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Linq;
using FastEnumUtility;
using Nethermind.Core;
using Nethermind.Db.Rocks.Config;
using Nethermind.Logging;
using RocksDbSharp;
Expand Down Expand Up @@ -46,11 +47,16 @@ protected override void BuildOptions<O>(PerTableDbConfig dbConfig, Options<O> op

public IEnumerable<T> ColumnKeys => _columnDbs.Keys;

public IReadOnlyDb CreateReadOnly(bool createInMemWriteStore)
public IReadOnlyColumnDb<T> CreateReadOnly(bool createInMemWriteStore)
{
return new ReadOnlyColumnsDb<T>(this, createInMemWriteStore);
}

public new IColumnsBatch<T> StartBatch()
{
return new RocksColumnsBatch(this);
}

protected override void ApplyOptions(IDictionary<string, string> options)
{
string[] keys = options.Select<KeyValuePair<string, string>, string>(e => e.Key).ToArray();
Expand All @@ -61,4 +67,53 @@ protected override void ApplyOptions(IDictionary<string, string> options)
}
base.ApplyOptions(options);
}

private class RocksColumnsBatch : IColumnsBatch<T>
{
internal RocksDbBatch _batch;
private ColumnsDb<T> _columnsDb;

public RocksColumnsBatch(ColumnsDb<T> columnsDb)
{
_batch = new RocksDbBatch(columnsDb);
_columnsDb = columnsDb;
}

public IBatch GetColumnBatch(T key)
{
return new RocksColumnBatch(_columnsDb._columnDbs[key], this);
}

public void Dispose()
{
_batch.Dispose();
}
}

private class RocksColumnBatch : IBatch
{
private readonly ColumnDb _column;
private readonly RocksColumnsBatch _batch;

public RocksColumnBatch(ColumnDb column, RocksColumnsBatch batch)
{
_column = column;
_batch = batch;
}

public void Dispose()
{
_batch.Dispose();
}

public byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None)
{
return _column.Get(key, flags);
}

public void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None)
{
_batch._batch.Set(key, value, _column._columnFamily, flags);
}
}
}
9 changes: 6 additions & 3 deletions src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
using System.Collections.Generic;
using System.IO;
using System.IO.Abstractions;
using System.Linq;
using System.Reflection;
using System.Threading;
using ConcurrentCollections;
using Nethermind.Config;
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Exceptions;
using Nethermind.Core.Extensions;
Expand Down Expand Up @@ -133,8 +131,13 @@ private RocksDb Init(string basePath, string dbPath, IDbConfig dbConfig, ILogMan
if (columnNames != null)
{
columnFamilies = new ColumnFamilies();
foreach (string columnFamily in columnNames)
foreach (string enumColumnName in columnNames)
{
string columnFamily = enumColumnName;

// "default" is a special column name with rocksdb, which is what previously not specifying column goes to
if (columnFamily == "Default") columnFamily = "default";

ColumnFamilyOptions options = new();
BuildOptions(new PerTableDbConfig(dbConfig, _settings, columnFamily), options, sharedCache);
columnFamilies.Add(columnFamily, options);
Expand Down
41 changes: 27 additions & 14 deletions src/Nethermind/Nethermind.Db.Rpc/RpcColumnsDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,47 @@

using System;
using System.Collections.Generic;
using Nethermind.Core.Attributes;
using Nethermind.JsonRpc.Client;
using Nethermind.Logging;
using Nethermind.Serialization.Json;

namespace Nethermind.Db.Rpc
{
public class RpcColumnsDb<T> : RpcDb, IColumnsDb<T>
public class RpcColumnsDb<T> : IColumnsDb<T> where T : struct, Enum
{
public RpcColumnsDb(string dbName, IJsonSerializer jsonSerializer, IJsonRpcClient rpcClient, ILogManager logManager, IDb recordDb) : base(dbName, jsonSerializer, rpcClient, logManager, recordDb)
private readonly string _dbName;
private readonly IJsonSerializer _jsonSerializer;
private readonly IJsonRpcClient _rpcClient;
private readonly ILogManager _logManager;
private readonly IColumnsDb<T> _recordDb;

public RpcColumnsDb(
string dbName,
IJsonSerializer jsonSerializer,
IJsonRpcClient rpcClient,
ILogManager logManager,
IColumnsDb<T> recordDb
)
{
_dbName = dbName;
_jsonSerializer = jsonSerializer;
_rpcClient = rpcClient;
_logManager = logManager;
_recordDb = recordDb;
}

[Todo(Improve.MissingFunctionality, "Need to implement RPC method for column DB's")]
public IDbWithSpan GetColumnDb(T key) => this;

[Todo(Improve.MissingFunctionality, "Need to implement RPC method for column DB's")]
public IEnumerable<T> ColumnKeys { get; } = Array.Empty<T>();

public Span<byte> GetSpan(ReadOnlySpan<byte> key) => this[key].AsSpan();
public void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value)
public IDbWithSpan GetColumnDb(T key)
{
this[key] = value.ToArray();
string dbName = _dbName + key;
IDbWithSpan column = _recordDb.GetColumnDb(key);
return new RpcDb(dbName, _jsonSerializer, _rpcClient, _logManager, column);
}

public void DangerousReleaseMemory(in Span<byte> span)
{
public IEnumerable<T> ColumnKeys => Enum.GetValues<T>();

public IColumnsBatch<T> StartBatch()
{
return new InMemoryColumnBatch<T>(this);
}
}
}
16 changes: 15 additions & 1 deletion src/Nethermind/Nethermind.Db.Rpc/RpcDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace Nethermind.Db.Rpc
{
public class RpcDb : IDb
public class RpcDb : IDb, IDbWithSpan
{
private readonly string _dbName;
private readonly IJsonSerializer _jsonSerializer;
Expand Down Expand Up @@ -101,5 +101,19 @@ private byte[] GetThroughRpc(ReadOnlySpan<byte> key)

return value;
}

public Span<byte> GetSpan(ReadOnlySpan<byte> key)
{
return Get(key);
}

public void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value)
{
Set(key, value.ToArray());
}

public void DangerousReleaseMemory(in Span<byte> span)
{
}
}
}
10 changes: 6 additions & 4 deletions src/Nethermind/Nethermind.Db.Rpc/RpcDbFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ public RpcDbFactory(

public IColumnsDb<T> CreateColumnsDb<T>(RocksDbSettings rocksDbSettings) where T : struct, Enum
{
var rocksDb = _wrappedRocksDbFactory.CreateColumnsDb<T>(rocksDbSettings);
return new ReadOnlyColumnsDb<T>(new RpcColumnsDb<T>(rocksDbSettings.DbName, _jsonSerializer, _jsonRpcClient, _logManager, rocksDb), true);
IColumnsDb<T> rocksDb = _wrappedRocksDbFactory.CreateColumnsDb<T>(rocksDbSettings);
return new ReadOnlyColumnsDb<T>(
new RpcColumnsDb<T>(rocksDbSettings.DbName, _jsonSerializer, _jsonRpcClient, _logManager, rocksDb),
true);
}

public IColumnsDb<T> CreateColumnsDb<T>(string dbName)
public IColumnsDb<T> CreateColumnsDb<T>(string dbName) where T : struct, Enum
{
var memDb = _wrappedMemDbFactory.CreateColumnsDb<T>(dbName);
IColumnsDb<T> memDb = _wrappedMemDbFactory.CreateColumnsDb<T>(dbName);
return new ReadOnlyColumnsDb<T>(new RpcColumnsDb<T>(dbName, _jsonSerializer, _jsonRpcClient, _logManager, memDb), true);
}

Expand Down
Loading

0 comments on commit fe2b14e

Please sign in to comment.