From b399bdbd4af754b3fb7025d2d40496facee825a1 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Mon, 2 Feb 2026 13:34:25 -0500 Subject: [PATCH] Improve error handling in ClusterVNode and refactor TFChunkDb processing Changed: Added error handling in StartNode method to throw the first inner exception from AggregateException. Signed-off-by: Yordis Prieto --- src/EventStore.Core/ClusterVNode.cs | 9 ++ .../TransactionLog/Chunks/TFChunkDb.cs | 88 +++++++++---------- 2 files changed, 49 insertions(+), 48 deletions(-) diff --git a/src/EventStore.Core/ClusterVNode.cs b/src/EventStore.Core/ClusterVNode.cs index a7aa632af..f93ba0a47 100644 --- a/src/EventStore.Core/ClusterVNode.cs +++ b/src/EventStore.Core/ClusterVNode.cs @@ -1664,6 +1664,15 @@ void ConfigureNode(IApplicationBuilder app) } void StartNode(IApplicationBuilder app) { + try { + StartNodeCore(app); + } catch (AggregateException aggEx) when (aggEx.InnerException is { } innerEx) { + // We only really care that *something* is wrong - throw the first inner exception. + throw innerEx; + } + } + + void StartNodeCore(IApplicationBuilder app) { // TRUNCATE IF NECESSARY var truncPos = Db.Config.TruncateCheckpoint.Read(); if (truncPos != -1) diff --git a/src/EventStore.Core/TransactionLog/Chunks/TFChunkDb.cs b/src/EventStore.Core/TransactionLog/Chunks/TFChunkDb.cs index d3d64823f..6f9f5d630 100644 --- a/src/EventStore.Core/TransactionLog/Chunks/TFChunkDb.cs +++ b/src/EventStore.Core/TransactionLog/Chunks/TFChunkDb.cs @@ -91,62 +91,54 @@ public async ValueTask Open(bool verifyHash = true, bool readOnly = false, int t var lastChunkVersions = Config.FileNamingStrategy.GetAllVersionsFor(lastChunkNum); var chunkEnumerator = new TFChunkEnumerator(Config.FileNamingStrategy); - try - { - await Parallel.ForEachAsync( - GetAllLatestChunksExceptLast(chunkEnumerator, lastChunkNum, - token), // the last chunk is dealt with separately - new ParallelOptions { MaxDegreeOfParallelism = threads, CancellationToken = token }, - async (chunkInfo, token) => + await Parallel.ForEachAsync( + GetAllLatestChunksExceptLast(chunkEnumerator, lastChunkNum, + token), // the last chunk is dealt with separately + new ParallelOptions { MaxDegreeOfParallelism = threads, CancellationToken = token }, + async (chunkInfo, token) => + { + TFChunk.TFChunk chunk; + if (lastChunkVersions.Length == 0 && + (chunkInfo.ChunkStartNumber + 1) * (long)Config.ChunkSize == checkpoint) { - TFChunk.TFChunk chunk; - if (lastChunkVersions.Length == 0 && - (chunkInfo.ChunkStartNumber + 1) * (long)Config.ChunkSize == checkpoint) - { - // The situation where the logical data size is exactly divisible by ChunkSize, - // so it might happen that we have checkpoint indicating one more chunk should exist, - // but the actual last chunk is (lastChunkNum-1) one and it could be not completed yet -- perfectly valid situation. - var footer = await ReadChunkFooter(chunkInfo.ChunkFileName, token); - if (footer.IsCompleted) - chunk = await TFChunk.TFChunk.FromCompletedFile(chunkInfo.ChunkFileName, verifyHash: false, - unbufferedRead: Config.Unbuffered, - tracker: _tracker, - reduceFileCachePressure: Config.ReduceFileCachePressure, - getTransformFactory: TransformManager.GetFactoryForExistingChunk, - token: token); - else - { - chunk = await TFChunk.TFChunk.FromOngoingFile(chunkInfo.ChunkFileName, Config.ChunkSize, - unbuffered: Config.Unbuffered, - writethrough: Config.WriteThrough, - reduceFileCachePressure: Config.ReduceFileCachePressure, - tracker: _tracker, - getTransformFactory: TransformManager.GetFactoryForExistingChunk, - token); - // chunk is full with data, we should complete it right here - if (!readOnly) - await chunk.Complete(token); - } - } - else - { + // The situation where the logical data size is exactly divisible by ChunkSize, + // so it might happen that we have checkpoint indicating one more chunk should exist, + // but the actual last chunk is (lastChunkNum-1) one and it could be not completed yet -- perfectly valid situation. + var footer = await ReadChunkFooter(chunkInfo.ChunkFileName, token); + if (footer.IsCompleted) chunk = await TFChunk.TFChunk.FromCompletedFile(chunkInfo.ChunkFileName, verifyHash: false, unbufferedRead: Config.Unbuffered, - reduceFileCachePressure: Config.ReduceFileCachePressure, tracker: _tracker, + reduceFileCachePressure: Config.ReduceFileCachePressure, getTransformFactory: TransformManager.GetFactoryForExistingChunk, token: token); + else + { + chunk = await TFChunk.TFChunk.FromOngoingFile(chunkInfo.ChunkFileName, Config.ChunkSize, + unbuffered: Config.Unbuffered, + writethrough: Config.WriteThrough, + reduceFileCachePressure: Config.ReduceFileCachePressure, + tracker: _tracker, + getTransformFactory: TransformManager.GetFactoryForExistingChunk, + token); + // chunk is full with data, we should complete it right here + if (!readOnly) + await chunk.Complete(token); } + } + else + { + chunk = await TFChunk.TFChunk.FromCompletedFile(chunkInfo.ChunkFileName, verifyHash: false, + unbufferedRead: Config.Unbuffered, + reduceFileCachePressure: Config.ReduceFileCachePressure, + tracker: _tracker, + getTransformFactory: TransformManager.GetFactoryForExistingChunk, + token: token); + } - // This call is theadsafe. - await Manager.AddChunk(chunk, token); - }); - } - catch (AggregateException aggEx) - { - // We only really care that *something* is wrong - throw the first inner exception. - throw aggEx.InnerException; - } + // This call is theadsafe. + await Manager.AddChunk(chunk, token); + }); if (lastChunkVersions.Length == 0) {