Skip to content

Commit

Permalink
Storage abstract (neo-project#1249)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikzhang authored and Luchuan committed Jan 10, 2020
1 parent 8f56564 commit f45a8b7
Show file tree
Hide file tree
Showing 81 changed files with 943 additions and 908 deletions.
12 changes: 6 additions & 6 deletions src/neo/Consensus/ConsensusContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal class ConsensusContext : IDisposable, ISerializable
/// <summary>
/// Key for saving consensus state.
/// </summary>
private static readonly byte[] ConsensusStateKey = { 0xf4 };
private const byte ConsensusStatePrefix = 0xf4;

public Block Block;
public byte ViewNumber;
Expand All @@ -42,11 +42,11 @@ internal class ConsensusContext : IDisposable, ISerializable
/// </summary>
public SendersFeeMonitor SendersFeeMonitor = new SendersFeeMonitor();

public Snapshot Snapshot { get; private set; }
public SnapshotView Snapshot { get; private set; }
private KeyPair keyPair;
private int _witnessSize;
private readonly Wallet wallet;
private readonly Store store;
private readonly IStore store;

public int F => (Validators.Length - 1) / 3;
public int M => Validators.Length - F;
Expand Down Expand Up @@ -74,7 +74,7 @@ internal class ConsensusContext : IDisposable, ISerializable

public int Size => throw new NotImplementedException();

public ConsensusContext(Wallet wallet, Store store)
public ConsensusContext(Wallet wallet, IStore store)
{
this.wallet = wallet;
this.store = store;
Expand Down Expand Up @@ -146,7 +146,7 @@ public uint GetPrimaryIndex(byte viewNumber)

public bool Load()
{
byte[] data = store.Get(ConsensusStateKey);
byte[] data = store.TryGet(ConsensusStatePrefix, null);
if (data is null || data.Length == 0) return false;
using (MemoryStream ms = new MemoryStream(data, false))
using (BinaryReader reader = new BinaryReader(ms))
Expand Down Expand Up @@ -409,7 +409,7 @@ public void Reset(byte viewNumber)

public void Save()
{
store.PutSync(ConsensusStateKey, this.ToArray());
store.PutSync(ConsensusStatePrefix, null, this.ToArray());
}

public void Serialize(BinaryWriter writer)
Expand Down
4 changes: 2 additions & 2 deletions src/neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ internal class Timer { public uint Height; public byte ViewNumber; }
/// </summary>
private bool isRecovering = false;

public ConsensusService(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet)
public ConsensusService(IActorRef localNode, IActorRef taskManager, IStore store, Wallet wallet)
: this(localNode, taskManager, new ConsensusContext(wallet, store))
{
}
Expand Down Expand Up @@ -601,7 +601,7 @@ protected override void PostStop()
base.PostStop();
}

public static Props Props(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet)
public static Props Props(IActorRef localNode, IActorRef taskManager, IStore store, Wallet wallet)
{
return Akka.Actor.Props.Create(() => new ConsensusService(localNode, taskManager, store, wallet)).WithMailbox("consensus-service-mailbox");
}
Expand Down
8 changes: 4 additions & 4 deletions src/neo/IO/Caching/CloneCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ protected override void AddInternal(TKey key, TValue value)
innerCache.Add(key, value);
}

public override void DeleteInternal(TKey key)
protected override void DeleteInternal(TKey key)
{
innerCache.Delete(key);
}

protected override IEnumerable<KeyValuePair<TKey, TValue>> FindInternal(byte[] key_prefix)
protected override IEnumerable<(TKey, TValue)> FindInternal(byte[] key_prefix)
{
foreach (KeyValuePair<TKey, TValue> pair in innerCache.Find(key_prefix))
yield return new KeyValuePair<TKey, TValue>(pair.Key, pair.Value.Clone());
foreach (var (key, value) in innerCache.Find(key_prefix))
yield return (key, value.Clone());
}

protected override TValue GetInternal(TKey key)
Expand Down
10 changes: 5 additions & 5 deletions src/neo/IO/Caching/DataCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void Delete(TKey key)
}
}

public abstract void DeleteInternal(TKey key);
protected abstract void DeleteInternal(TKey key);

public void DeleteWhere(Func<TKey, TValue, bool> predicate)
{
Expand All @@ -123,7 +123,7 @@ public void DeleteWhere(Func<TKey, TValue, bool> predicate)
/// </summary>
/// <param name="key_prefix">Must maintain the deserialized format of TKey</param>
/// <returns>Entries found with the desired prefix</returns>
public IEnumerable<KeyValuePair<TKey, TValue>> Find(byte[] key_prefix = null)
public IEnumerable<(TKey Key, TValue Value)> Find(byte[] key_prefix = null)
{
IEnumerable<(byte[], TKey, TValue)> cached;
lock (dictionary)
Expand Down Expand Up @@ -159,21 +159,21 @@ public void DeleteWhere(Func<TKey, TValue, bool> predicate)
{
if (!c2 || (c1 && ByteArrayComparer.Default.Compare(i1.KeyBytes, i2.KeyBytes) < 0))
{
yield return new KeyValuePair<TKey, TValue>(i1.Key, i1.Item);
yield return (i1.Key, i1.Item);
c1 = e1.MoveNext();
i1 = c1 ? e1.Current : default;
}
else
{
yield return new KeyValuePair<TKey, TValue>(i2.Key, i2.Item);
yield return (i2.Key, i2.Item);
c2 = e2.MoveNext();
i2 = c2 ? e2.Current : default;
}
}
}
}

protected abstract IEnumerable<KeyValuePair<TKey, TValue>> FindInternal(byte[] key_prefix);
protected abstract IEnumerable<(TKey Key, TValue Value)> FindInternal(byte[] key_prefix);

public IEnumerable<Trackable> GetChangeSet()
{
Expand Down
9 changes: 6 additions & 3 deletions src/neo/IO/Data/LevelDB/SliceBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,22 @@ public SliceBuilder Add(long value)

public SliceBuilder Add(IEnumerable<byte> value)
{
data.AddRange(value);
if (value != null)
data.AddRange(value);
return this;
}

public SliceBuilder Add(string value)
{
data.AddRange(Encoding.UTF8.GetBytes(value));
if (value != null)
data.AddRange(Encoding.UTF8.GetBytes(value));
return this;
}

public SliceBuilder Add(ISerializable value)
{
data.AddRange(value.ToArray());
if (value != null)
data.AddRange(value.ToArray());
return this;
}

Expand Down
70 changes: 51 additions & 19 deletions src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ public class FillCompleted { }
private readonly Dictionary<UInt256, Block> block_cache = new Dictionary<UInt256, Block>();
private readonly Dictionary<uint, LinkedList<Block>> block_cache_unverified = new Dictionary<uint, LinkedList<Block>>();
internal readonly RelayCache ConsensusRelayCache = new RelayCache(100);
private Snapshot currentSnapshot;
private SnapshotView currentSnapshot;

public Store Store { get; }
public IStore Store { get; }
public ReadOnlyView View { get; }
public MemoryPool MemPool { get; }
public uint Height => currentSnapshot.Height;
public uint HeaderHeight => currentSnapshot.HeaderHeight;
Expand Down Expand Up @@ -95,27 +96,28 @@ static Blockchain()
}
}

public Blockchain(NeoSystem system, Store store)
public Blockchain(NeoSystem system, IStore store)
{
this.system = system;
this.MemPool = new MemoryPool(system, ProtocolSettings.Default.MemoryPoolMaxTransactions);
this.Store = store;
this.View = new ReadOnlyView(store);
lock (lockObj)
{
if (singleton != null)
throw new InvalidOperationException();
header_index.AddRange(store.GetHeaderHashList().Find().OrderBy(p => (uint)p.Key).SelectMany(p => p.Value.Hashes));
header_index.AddRange(View.HeaderHashList.Find().OrderBy(p => (uint)p.Key).SelectMany(p => p.Value.Hashes));
stored_header_count += (uint)header_index.Count;
if (stored_header_count == 0)
{
header_index.AddRange(store.GetBlocks().Find().OrderBy(p => p.Value.Index).Select(p => p.Key));
header_index.AddRange(View.Blocks.Find().OrderBy(p => p.Value.Index).Select(p => p.Key));
}
else
{
HashIndexState hashIndex = store.GetHeaderHashIndex().Get();
HashIndexState hashIndex = View.HeaderHashIndex.Get();
if (hashIndex.Index >= stored_header_count)
{
DataCache<UInt256, TrimmedBlock> cache = store.GetBlocks();
DataCache<UInt256, TrimmedBlock> cache = View.Blocks;
for (UInt256 hash = hashIndex.Hash; hash != header_index[(int)stored_header_count - 1];)
{
header_index.Insert((int)stored_header_count, hash);
Expand All @@ -139,13 +141,13 @@ public Blockchain(NeoSystem system, Store store)
public bool ContainsBlock(UInt256 hash)
{
if (block_cache.ContainsKey(hash)) return true;
return Store.ContainsBlock(hash);
return View.ContainsBlock(hash);
}

public bool ContainsTransaction(UInt256 hash)
{
if (MemPool.ContainsKey(hash)) return true;
return Store.ContainsTransaction(hash);
return View.ContainsTransaction(hash);
}

private static Transaction DeployNativeContracts()
Expand Down Expand Up @@ -175,11 +177,19 @@ private static Transaction DeployNativeContracts()
};
}

public Block GetBlock(uint index)
{
if (index == 0) return GenesisBlock;
UInt256 hash = GetBlockHash(index);
if (hash == null) return null;
return GetBlock(hash);
}

public Block GetBlock(UInt256 hash)
{
if (block_cache.TryGetValue(hash, out Block block))
return block;
return Store.GetBlock(hash);
return View.GetBlock(hash);
}

public UInt256 GetBlockHash(uint index)
Expand All @@ -193,16 +203,38 @@ public static UInt160 GetConsensusAddress(ECPoint[] validators)
return Contract.CreateMultiSigRedeemScript(validators.Length - (validators.Length - 1) / 3, validators).ToScriptHash();
}

public Snapshot GetSnapshot()
public Header GetHeader(uint index)
{
if (index == 0) return GenesisBlock.Header;
UInt256 hash = GetBlockHash(index);
if (hash == null) return null;
return GetHeader(hash);
}

public Header GetHeader(UInt256 hash)
{
if (block_cache.TryGetValue(hash, out Block block))
return block.Header;
return View.GetHeader(hash);
}

public UInt256 GetNextBlockHash(UInt256 hash)
{
Header header = GetHeader(hash);
if (header == null) return null;
return GetBlockHash(header.Index + 1);
}

public SnapshotView GetSnapshot()
{
return Store.GetSnapshot();
return new SnapshotView(Store);
}

public Transaction GetTransaction(UInt256 hash)
{
if (MemPool.TryGetValue(hash, out Transaction transaction))
return transaction;
return Store.GetTransaction(hash);
return View.GetTransaction(hash);
}

private void OnImport(IEnumerable<Block> blocks)
Expand Down Expand Up @@ -237,7 +269,7 @@ private void OnFillMemoryPool(IEnumerable<Transaction> transactions)
// Add the transactions to the memory pool
foreach (var tx in transactions)
{
if (Store.ContainsTransaction(tx.Hash))
if (View.ContainsTransaction(tx.Hash))
continue;
if (!NativeContract.Policy.CheckPolicy(tx, currentSnapshot))
continue;
Expand Down Expand Up @@ -320,7 +352,7 @@ private RelayResultReason OnNewBlock(Block block)
if (block.Index == header_index.Count)
{
header_index.Add(block.Hash);
using (Snapshot snapshot = GetSnapshot())
using (SnapshotView snapshot = GetSnapshot())
{
snapshot.Blocks.Add(block.Hash, block.Header.Trim());
snapshot.HeaderHashIndex.GetAndChange().Set(block);
Expand All @@ -344,7 +376,7 @@ private RelayResultReason OnNewConsensus(ConsensusPayload payload)

private void OnNewHeaders(Header[] headers)
{
using (Snapshot snapshot = GetSnapshot())
using (SnapshotView snapshot = GetSnapshot())
{
foreach (Header header in headers)
{
Expand Down Expand Up @@ -425,7 +457,7 @@ protected override void OnReceive(object message)

private void Persist(Block block)
{
using (Snapshot snapshot = GetSnapshot())
using (SnapshotView snapshot = GetSnapshot())
{
List<ApplicationExecuted> all_application_executed = new List<ApplicationExecuted>();
snapshot.PersistingBlock = block;
Expand Down Expand Up @@ -503,12 +535,12 @@ protected override void PostStop()
currentSnapshot?.Dispose();
}

public static Props Props(NeoSystem system, Store store)
public static Props Props(NeoSystem system, IStore store)
{
return Akka.Actor.Props.Create(() => new Blockchain(system, store)).WithMailbox("blockchain-mailbox");
}

private void SaveHeaderHashList(Snapshot snapshot = null)
private void SaveHeaderHashList(SnapshotView snapshot = null)
{
if ((header_index.Count - stored_header_count < 2000))
return;
Expand Down
8 changes: 4 additions & 4 deletions src/neo/Ledger/MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public MemoryPool(NeoSystem system, int capacity)
Capacity = capacity;
}

internal bool LoadPolicy(Snapshot snapshot)
internal bool LoadPolicy(StoreView snapshot)
{
_maxTxPerBlock = (int)NativeContract.Policy.GetMaxTransactionsPerBlock(snapshot);
long newFeePerByte = NativeContract.Policy.GetFeePerByte(snapshot);
Expand Down Expand Up @@ -348,7 +348,7 @@ internal void InvalidateVerifiedTransactions()
}

// Note: this must only be called from a single thread (the Blockchain actor)
internal void UpdatePoolForBlockPersisted(Block block, Snapshot snapshot)
internal void UpdatePoolForBlockPersisted(Block block, StoreView snapshot)
{
bool policyChanged = LoadPolicy(snapshot);

Expand Down Expand Up @@ -407,7 +407,7 @@ internal void InvalidateAllTransactions()
}

private int ReverifyTransactions(SortedSet<PoolItem> verifiedSortedTxPool,
SortedSet<PoolItem> unverifiedSortedTxPool, int count, double millisecondsTimeout, Snapshot snapshot)
SortedSet<PoolItem> unverifiedSortedTxPool, int count, double millisecondsTimeout, StoreView snapshot)
{
DateTime reverifyCutOffTimeStamp = DateTime.UtcNow.AddMilliseconds(millisecondsTimeout);
List<PoolItem> reverifiedItems = new List<PoolItem>(count);
Expand Down Expand Up @@ -483,7 +483,7 @@ internal void InvalidateAllTransactions()
/// <param name="maxToVerify">Max transactions to reverify, the value passed can be >=1</param>
/// <param name="snapshot">The snapshot to use for verifying.</param>
/// <returns>true if more unsorted messages exist, otherwise false</returns>
internal bool ReVerifyTopUnverifiedTransactionsIfNeeded(int maxToVerify, Snapshot snapshot)
internal bool ReVerifyTopUnverifiedTransactionsIfNeeded(int maxToVerify, StoreView snapshot)
{
if (Blockchain.Singleton.Height < Blockchain.Singleton.HeaderHeight)
return false;
Expand Down
6 changes: 3 additions & 3 deletions src/neo/NeoSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ public class NeoSystem : IDisposable
public IActorRef Consensus { get; private set; }
public RpcServer RpcServer { get; private set; }

private readonly Store store;
private readonly IStore store;
private ChannelsConfig start_message = null;
private bool suspend = false;

public NeoSystem(Store store)
public NeoSystem(IStore store)
{
this.store = store;
Plugin.LoadPlugins(this);
Expand Down Expand Up @@ -69,7 +69,7 @@ internal void ResumeNodeStartup()
}
}

public void StartConsensus(Wallet wallet, Store consensus_store = null, bool ignoreRecoveryLogs = false)
public void StartConsensus(Wallet wallet, IStore consensus_store = null, bool ignoreRecoveryLogs = false)
{
Consensus = ActorSystem.ActorOf(ConsensusService.Props(this.LocalNode, this.TaskManager, consensus_store ?? store, wallet));
Consensus.Tell(new ConsensusService.Start { IgnoreRecoveryLogs = ignoreRecoveryLogs }, Blockchain);
Expand Down
Loading

0 comments on commit f45a8b7

Please sign in to comment.