Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/EventStore.Core/ClusterVNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1664,6 +1664,15 @@ void ConfigureNode(IApplicationBuilder app)
}

void StartNode(IApplicationBuilder app) {
try {
StartNodeCore(app);
} catch (AggregateException aggEx) when (aggEx.InnerException is { } innerEx) {
// We only really care that *something* is wrong - throw the first inner exception.
throw innerEx;
}
}

void StartNodeCore(IApplicationBuilder app) {
// TRUNCATE IF NECESSARY
var truncPos = Db.Config.TruncateCheckpoint.Read();
if (truncPos != -1)
Expand Down
88 changes: 40 additions & 48 deletions src/EventStore.Core/TransactionLog/Chunks/TFChunkDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,62 +91,54 @@ public async ValueTask Open(bool verifyHash = true, bool readOnly = false, int t
var lastChunkVersions = Config.FileNamingStrategy.GetAllVersionsFor(lastChunkNum);
var chunkEnumerator = new TFChunkEnumerator(Config.FileNamingStrategy);

try
{
await Parallel.ForEachAsync(
GetAllLatestChunksExceptLast(chunkEnumerator, lastChunkNum,
token), // the last chunk is dealt with separately
new ParallelOptions { MaxDegreeOfParallelism = threads, CancellationToken = token },
async (chunkInfo, token) =>
await Parallel.ForEachAsync(
GetAllLatestChunksExceptLast(chunkEnumerator, lastChunkNum,
token), // the last chunk is dealt with separately
new ParallelOptions { MaxDegreeOfParallelism = threads, CancellationToken = token },
async (chunkInfo, token) =>
{
TFChunk.TFChunk chunk;
if (lastChunkVersions.Length == 0 &&
(chunkInfo.ChunkStartNumber + 1) * (long)Config.ChunkSize == checkpoint)
{
TFChunk.TFChunk chunk;
if (lastChunkVersions.Length == 0 &&
(chunkInfo.ChunkStartNumber + 1) * (long)Config.ChunkSize == checkpoint)
{
// The situation where the logical data size is exactly divisible by ChunkSize,
// so it might happen that we have checkpoint indicating one more chunk should exist,
// but the actual last chunk is (lastChunkNum-1) one and it could be not completed yet -- perfectly valid situation.
var footer = await ReadChunkFooter(chunkInfo.ChunkFileName, token);
if (footer.IsCompleted)
chunk = await TFChunk.TFChunk.FromCompletedFile(chunkInfo.ChunkFileName, verifyHash: false,
unbufferedRead: Config.Unbuffered,
tracker: _tracker,
reduceFileCachePressure: Config.ReduceFileCachePressure,
getTransformFactory: TransformManager.GetFactoryForExistingChunk,
token: token);
else
{
chunk = await TFChunk.TFChunk.FromOngoingFile(chunkInfo.ChunkFileName, Config.ChunkSize,
unbuffered: Config.Unbuffered,
writethrough: Config.WriteThrough,
reduceFileCachePressure: Config.ReduceFileCachePressure,
tracker: _tracker,
getTransformFactory: TransformManager.GetFactoryForExistingChunk,
token);
// chunk is full with data, we should complete it right here
if (!readOnly)
await chunk.Complete(token);
}
}
else
{
// The situation where the logical data size is exactly divisible by ChunkSize,
// so it might happen that we have checkpoint indicating one more chunk should exist,
// but the actual last chunk is (lastChunkNum-1) one and it could be not completed yet -- perfectly valid situation.
var footer = await ReadChunkFooter(chunkInfo.ChunkFileName, token);
if (footer.IsCompleted)
chunk = await TFChunk.TFChunk.FromCompletedFile(chunkInfo.ChunkFileName, verifyHash: false,
unbufferedRead: Config.Unbuffered,
reduceFileCachePressure: Config.ReduceFileCachePressure,
tracker: _tracker,
reduceFileCachePressure: Config.ReduceFileCachePressure,
getTransformFactory: TransformManager.GetFactoryForExistingChunk,
token: token);
else
{
chunk = await TFChunk.TFChunk.FromOngoingFile(chunkInfo.ChunkFileName, Config.ChunkSize,
unbuffered: Config.Unbuffered,
writethrough: Config.WriteThrough,
reduceFileCachePressure: Config.ReduceFileCachePressure,
tracker: _tracker,
getTransformFactory: TransformManager.GetFactoryForExistingChunk,
token);
// chunk is full with data, we should complete it right here
if (!readOnly)
await chunk.Complete(token);
}
}
else
{
chunk = await TFChunk.TFChunk.FromCompletedFile(chunkInfo.ChunkFileName, verifyHash: false,
unbufferedRead: Config.Unbuffered,
reduceFileCachePressure: Config.ReduceFileCachePressure,
tracker: _tracker,
getTransformFactory: TransformManager.GetFactoryForExistingChunk,
token: token);
}

// This call is theadsafe.
await Manager.AddChunk(chunk, token);
});
}
catch (AggregateException aggEx)
{
// We only really care that *something* is wrong - throw the first inner exception.
throw aggEx.InnerException;
}
// This call is theadsafe.
await Manager.AddChunk(chunk, token);
});

if (lastChunkVersions.Length == 0)
{
Expand Down