Skip to content

Commit

Permalink
RavenDB-22294 Fix include cmpXchg & atomic-guard
Browse files Browse the repository at this point in the history
  • Loading branch information
haludi committed May 10, 2024
1 parent 43d123e commit c203c43
Show file tree
Hide file tree
Showing 20 changed files with 640 additions and 186 deletions.
10 changes: 7 additions & 3 deletions src/Raven.Client/Documents/Session/ClusterTransactionSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,15 @@ internal void RegisterCompareExchangeValues(BlittableJsonReaderObject values, bo

var val = CompareExchangeValueResultParser<BlittableJsonReaderObject>.GetSingleValue(value, materializeMetadata: false, _session.Conventions);
if(includingMissingAtomicGuards &&
val.Key.StartsWith(Constants.CompareExchange.RvnAtomicPrefix, StringComparison.OrdinalIgnoreCase) &&
ClusterWideTransactionHelper.IsAtomicGuardKey(val.Key) &&
val.ChangeVector != null)
{
_missingDocumentsToAtomicGuardIndex ??= new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
_missingDocumentsToAtomicGuardIndex.Add(val.Key.Substring(Constants.CompareExchange.RvnAtomicPrefix.Length), val.ChangeVector);
_missingDocumentsToAtomicGuardIndex.Add(ClusterWideTransactionHelper.ExtractDocumentIdFromAtomicGuard(val.Key), val.ChangeVector);
}
else if (val.Index < 0)
{
RegisterMissingCompareExchangeValue(val.Key);
}
else
{
Expand All @@ -265,7 +269,7 @@ internal CompareExchangeSessionValue RegisterCompareExchangeValue(CompareExchang
{
Debug.Assert(value != null, "value != null");

if (value.Key.StartsWith(Constants.CompareExchange.RvnAtomicPrefix, StringComparison.InvariantCultureIgnoreCase))
if (ClusterWideTransactionHelper.IsAtomicGuardKey(value.Key))
throw new InvalidOperationException($"'{value.Key}' is an atomic guard and you cannot load it via the session");

if (_session.NoTracking)
Expand Down
25 changes: 25 additions & 0 deletions src/Raven.Client/Util/ClusterWideTransactionHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using Sparrow.Json;

namespace Raven.Client.Util;

/// <summary>
/// Helpers for recognizing and building atomic guard compare-exchange keys,
/// i.e. keys that start with <see cref="Constants.CompareExchange.RvnAtomicPrefix"/>.
/// </summary>
public static class ClusterWideTransactionHelper
{
    /// <summary>
    /// Checks whether <paramref name="id"/> is an atomic guard key and, if so, extracts the document id from it.
    /// </summary>
    /// <param name="id">The compare-exchange key to inspect.</param>
    /// <param name="docId">
    /// The key with the atomic guard prefix removed when the method returns <c>true</c>; otherwise <c>null</c>.
    /// </param>
    /// <returns><c>true</c> when <paramref name="id"/> starts with the atomic guard prefix; otherwise <c>false</c>.</returns>
    public static bool IsAtomicGuardKey(string id, out string docId)
    {
        // Only keys that carry the prefix may be stripped; anything else is reported as "not a guard".
        if (IsAtomicGuardKey(id) == false)
        {
            docId = null;
            return false;
        }

        docId = ExtractDocumentIdFromAtomicGuard(id);
        return true;
    }

    /// <summary>
    /// Returns <c>true</c> when <paramref name="key"/> starts with the atomic guard prefix (ordinal, case-insensitive).
    /// </summary>
    public static bool IsAtomicGuardKey(string key) => key.StartsWith(Constants.CompareExchange.RvnAtomicPrefix, StringComparison.OrdinalIgnoreCase);

    /// <summary>
    /// Builds the atomic guard key for <paramref name="docId"/> by prepending the atomic guard prefix.
    /// </summary>
    public static string GetAtomicGuardKey(string docId) => Constants.CompareExchange.RvnAtomicPrefix + docId;

    /// <summary>
    /// Removes the first prefix-length characters from <paramref name="key"/>.
    /// The prefix itself is not validated here, so call this only for keys already known to be atomic guards.
    /// </summary>
    public static string ExtractDocumentIdFromAtomicGuard(string key) => key.Substring(Constants.CompareExchange.RvnAtomicPrefix.Length);
}
48 changes: 48 additions & 0 deletions src/Raven.Server/Documents/ClusterTransactionIndexWaiter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Sparrow.Server;
using Sparrow.Utils;

namespace Raven.Server.Documents;

/// <summary>
/// Tracks the highest completed cluster transaction index that was reported to it
/// and lets callers asynchronously wait until a given index has been reached.
/// </summary>
public class ClusterTransactionIndexWaiter
{
    private long _lastCompletedIndex;
    private readonly AsyncManualResetEvent _notifiedListeners = new AsyncManualResetEvent();

    /// <summary>
    /// The most recently stored index, read atomically.
    /// </summary>
    public long LastIndex
    {
        get { return Interlocked.Read(ref _lastCompletedIndex); }
    }

    /// <summary>
    /// Offers <paramref name="newIndex"/> as the new stored index and signals the waiters
    /// when <c>ThreadingHelper.InterlockedExchangeMax</c> returns <c>true</c>
    /// (presumably meaning the stored value was raised - confirm against the helper).
    /// </summary>
    public void SetAndNotifyListenersIfHigher(long newIndex)
    {
        var updated = ThreadingHelper.InterlockedExchangeMax(ref _lastCompletedIndex, newIndex);
        if (updated == false)
            return;

        _notifiedListeners.SetAndResetAtomically();
    }

    /// <summary>
    /// Completes once <see cref="LastIndex"/> is greater than or equal to <paramref name="index"/>.
    /// </summary>
    /// <param name="index">The index to wait for.</param>
    /// <param name="token">Token passed to the underlying event wait.</param>
    /// <exception cref="OperationCanceledException">
    /// Thrown when the underlying wait fails with a <see cref="TaskCanceledException"/>.
    /// </exception>
    public async Task WaitAsync(long index, CancellationToken token)
    {
        // The wait task is always obtained BEFORE the index is read, so a notification that
        // arrives between the read and the await is not missed.
        Task pendingNotification = _notifiedListeners.WaitAsync(token);
        long observedIndex = LastIndex;

        while (index > observedIndex)
        {
            try
            {
                await pendingNotification;
            }
            catch (TaskCanceledException)
            {
                ThrowCanceledException(index, observedIndex);
            }

            pendingNotification = _notifiedListeners.WaitAsync(token);
            observedIndex = LastIndex;
        }
    }

    // Kept as a separate method so the throw stays out of the async state machine body.
    private static void ThrowCanceledException(long index, long lastModifiedIndex)
    {
        var message = $"Cancelled while waiting to get an index notification for {index}. lastModifiedIndex {lastModifiedIndex}";
        throw new OperationCanceledException(message);
    }
}
6 changes: 2 additions & 4 deletions src/Raven.Server/Documents/DatabasesLandlord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private async Task HandleClusterDatabaseChanged(string databaseName, long index,
await database.StateChangedAsync(index);
if (type == ClusterStateMachine.SnapshotInstalled)
{
database.NotifyOnPendingClusterTransaction(index, changeType);
database.NotifyOnPendingClusterTransaction();
}
break;

Expand All @@ -175,13 +175,12 @@ private async Task HandleClusterDatabaseChanged(string databaseName, long index,
break;

case ClusterDatabaseChangeType.PendingClusterTransactions:
case ClusterDatabaseChangeType.ClusterTransactionCompleted:
if (ForTestingPurposes?.BeforeHandleClusterTransactionOnDatabaseChanged != null)
await ForTestingPurposes.BeforeHandleClusterTransactionOnDatabaseChanged.Invoke(_serverStore);

database.DatabaseGroupId = topology.DatabaseTopologyIdBase64;
database.ClusterTransactionId = topology.ClusterTransactionIdBase64;
database.NotifyOnPendingClusterTransaction(index, changeType);
database.NotifyOnPendingClusterTransaction();
break;

default:
Expand Down Expand Up @@ -1003,7 +1002,6 @@ public enum ClusterDatabaseChangeType
RecordRestored,
ValueChanged,
PendingClusterTransactions,
ClusterTransactionCompleted
}

public bool UnloadDirectly(StringSegment databaseName, DateTime? wakeup = null, [CallerMemberName] string caller = null)
Expand Down
55 changes: 30 additions & 25 deletions src/Raven.Server/Documents/DocumentDatabase.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Raven.Client.Documents.Operations.Configuration;
Expand All @@ -31,7 +28,6 @@
using Raven.Server.Documents.Subscriptions;
using Raven.Server.Documents.TcpHandlers;
using Raven.Server.Documents.TimeSeries;
using Raven.Server.Json;
using Raven.Server.NotificationCenter.Notifications;
using Raven.Server.NotificationCenter.Notifications.Details;
using Raven.Server.Routing;
Expand All @@ -46,7 +42,6 @@
using Sparrow;
using Sparrow.Backups;
using Sparrow.Collections;
using Sparrow.Json;
using Sparrow.Json.Parsing;
using Sparrow.Json.Sync;
using Sparrow.Logging;
Expand All @@ -55,12 +50,10 @@
using Sparrow.Server.Meters;
using Sparrow.Server.Utils;
using Sparrow.Threading;
using Sparrow.Utils;
using Voron;
using Voron.Data.Tables;
using Voron.Exceptions;
using Voron.Impl.Backup;
using static Raven.Server.Documents.DatabasesLandlord;
using Constants = Raven.Client.Constants;
using DatabaseInfo = Raven.Client.ServerWide.Operations.DatabaseInfo;
using DatabaseSmuggler = Raven.Server.Smuggler.Documents.DatabaseSmuggler;
Expand Down Expand Up @@ -198,7 +191,6 @@ public DocumentDatabase(string name, RavenConfiguration configuration, ServerSto
}
}


public ServerStore ServerStore => _serverStore;

public DateTime LastIdleTime => new DateTime(_lastIdleTicks);
Expand Down Expand Up @@ -372,7 +364,7 @@ record = _serverStore.Cluster.ReadDatabase(context, Name, out index);
using (ctx.OpenReadTransaction())
{
var lastCompletedClusterTransactionIndex = DocumentsStorage.ReadLastCompletedClusterTransactionIndex(ctx.Transaction.InnerTransaction);
Interlocked.Exchange(ref LastCompletedClusterTransactionIndex, lastCompletedClusterTransactionIndex);
ClusterTransactionIndexWaiter.SetAndNotifyListenersIfHigher(lastCompletedClusterTransactionIndex);
}

_ = Task.Run(async () =>
Expand Down Expand Up @@ -452,22 +444,16 @@ internal DatabaseDisabledException CreateDatabaseShutdownException(Exception e =
private readonly ManualResetEventSlim _hasClusterTransaction;
public readonly DatabaseMetricCacher MetricCacher;

public void NotifyOnPendingClusterTransaction(long index, DatabasesLandlord.ClusterDatabaseChangeType changeType)
public void NotifyOnPendingClusterTransaction()
{
if (changeType == DatabasesLandlord.ClusterDatabaseChangeType.ClusterTransactionCompleted)
{
RachisLogIndexNotifications.NotifyListenersAbout(index, e: null);
return;
}

_hasClusterTransaction.Set();
}

private long? _nextClusterCommand;
private long _lastCompletedClusterTransaction;
public long LastCompletedClusterTransaction => _lastCompletedClusterTransaction;

public long LastCompletedClusterTransactionIndex;
public readonly ClusterTransactionIndexWaiter ClusterTransactionIndexWaiter = new ClusterTransactionIndexWaiter();
public bool IsEncrypted => MasterKey != null;

private PoolOfThreads.LongRunningWork _clusterTransactionsThread;
Expand Down Expand Up @@ -531,10 +517,8 @@ private void ExecuteClusterTransaction()

public List<ClusterTransactionCommand.SingleClusterDatabaseCommand> ExecuteClusterTransaction(TransactionOperationContext context, int batchSize)
{
var batch = new List<ClusterTransactionCommand.SingleClusterDatabaseCommand>(
ClusterTransactionCommand.ReadCommandsBatch(context, Name, fromCount: _nextClusterCommand,
lastCompletedClusterTransactionIndex: LastCompletedClusterTransactionIndex, take: batchSize));

var (batch, maxIndex) = ReadCommandBatch(context, batchSize);

ServerStore.ForTestingPurposes?.BeforeExecuteClusterTransactionBatch?.Invoke(Name, batch);

Stopwatch stopwatch = null;
Expand All @@ -544,9 +528,12 @@ public List<ClusterTransactionCommand.SingleClusterDatabaseCommand> ExecuteClust
//_nextClusterCommand refers to each individual put/delete while batch size refers to number of transaction (each contains multiple commands)
_logger.Info($"Read {batch.Count:#,#;;0} cluster transaction commands - fromCount: {_nextClusterCommand}, take: {batchSize}");
}

if (batch.Count == 0)
{
if(maxIndex != -1)
ClusterTransactionIndexWaiter.SetAndNotifyListenersIfHigher(maxIndex);

var index = _serverStore.Cluster.GetLastCompareExchangeIndexForDatabase(context, Name);

if (RachisLogIndexNotifications.LastModifiedIndex != index)
Expand Down Expand Up @@ -574,9 +561,11 @@ public List<ClusterTransactionCommand.SingleClusterDatabaseCommand> ExecuteClust
}

ExecuteClusterTransactionOneByOne(batch);
ClusterTransactionIndexWaiter.SetAndNotifyListenersIfHigher(maxIndex);
return batch;
}

ClusterTransactionIndexWaiter.SetAndNotifyListenersIfHigher(maxIndex);
foreach (var command in batch)
{
OnClusterTransactionCompletion(command, mergedCommands);
Expand Down Expand Up @@ -607,6 +596,20 @@ public List<ClusterTransactionCommand.SingleClusterDatabaseCommand> ExecuteClust
return batch;
}

/// <summary>
/// Reads up to <paramref name="batchSize"/> cluster transaction commands for this database, starting from
/// <c>_nextClusterCommand</c>, and keeps only those that contain at least one inner command.
/// </summary>
/// <param name="context">The context passed to <c>ClusterTransactionCommand.ReadCommandsBatch</c>.</param>
/// <param name="batchSize">Passed as the <c>take</c> argument of the read.</param>
/// <returns>
/// <c>Batch</c>: the commands whose <c>Commands.Count</c> is greater than zero.
/// <c>MaxIndex</c>: the <c>Index</c> of the last command that was read (including ones left out of the batch),
/// or <c>-1</c> when nothing was read.
/// </returns>
private (List<ClusterTransactionCommand.SingleClusterDatabaseCommand> Batch, long MaxIndex) ReadCommandBatch(TransactionOperationContext context, int batchSize)
{
    long lastReadIndex = -1L;
    var commandsWithWork = new List<ClusterTransactionCommand.SingleClusterDatabaseCommand>();

    var readCommands = ClusterTransactionCommand.ReadCommandsBatch(context, Name, fromCount: _nextClusterCommand, take: batchSize);
    foreach (var current in readCommands)
    {
        // Commands with no inner commands are skipped from the batch, but their index still counts.
        if (current.Commands.Count != 0)
        {
            commandsWithWork.Add(current);
        }

        lastReadIndex = current.Index;
    }

    return (commandsWithWork, lastReadIndex);
}

private void ExecuteClusterTransactionOneByOne(List<ClusterTransactionCommand.SingleClusterDatabaseCommand> batch)
{
foreach (var command in batch)
Expand All @@ -618,9 +621,13 @@ private void ExecuteClusterTransactionOneByOne(List<ClusterTransactionCommand.Si
var mergedCommand = new BatchHandler.ClusterTransactionMergedCommand(this, singleCommand);
try
{
// The batch list only includes commands that contain database commands.
// We also want to advance the index for cluster transaction commands that have no document commands (only cmpXchg commands),
// so here we advance it up to the index just before the current command (and before executing it).
ClusterTransactionIndexWaiter.SetAndNotifyListenersIfHigher(command.Index - 1);
TxMerger.EnqueueSync(mergedCommand);
OnClusterTransactionCompletion(command, mergedCommand);

_clusterTransactionDelayOnFailure = 1000;
command.Processed = true;
}
Expand Down Expand Up @@ -653,8 +660,6 @@ private void OnClusterTransactionCompletion(ClusterTransactionCommand.SingleClus
var index = command.Index;
var options = mergedCommands.Options[index];

ThreadingHelper.InterlockedExchangeMax(ref LastCompletedClusterTransactionIndex, index);

ClusterTransactionWaiter.TrySetResult(options.TaskId, index, mergedCommands.ModifiedCollections);

_nextClusterCommand = command.PreviousCount + command.Commands.Count;
Expand Down
6 changes: 3 additions & 3 deletions src/Raven.Server/Documents/DocumentPutAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
using System.Linq;
using Raven.Client.Exceptions;
using Raven.Client.Exceptions.Documents;
using Raven.Client.Util;
using Sparrow.Server;
using static Raven.Server.Documents.DocumentsStorage;
using Constants = Raven.Client.Constants;
using Raven.Server.ServerWide;
using Raven.Server.ServerWide.Commands;

namespace Raven.Server.Documents
{
Expand Down Expand Up @@ -79,13 +79,13 @@ public void ValidateAtomicGuard(string id, NonPersistentDocumentFlags nonPersist
using (_serverStore.ContextPool.AllocateOperationContext(out TransactionOperationContext clusterContext))
using (clusterContext.OpenReadTransaction())
{
var guardId = CompareExchangeKey.GetStorageKey(_parent._documentDatabase.Name, ClusterTransactionCommand.GetAtomicGuardKey(id));
var guardId = CompareExchangeKey.GetStorageKey(_parent._documentDatabase.Name, ClusterWideTransactionHelper.GetAtomicGuardKey(id));
var (indexFromCluster, val) = _serverStore.Cluster.GetCompareExchangeValue(clusterContext, guardId);
if (indexFromChangeVector != indexFromCluster)
{
throw new ConcurrencyException(
$"Cannot PUT document '{id}' because its change vector's cluster transaction index is set to {indexFromChangeVector} " +
$"but the compare exchange guard ('{ClusterTransactionCommand.GetAtomicGuardKey(id)}') is {(val == null ? "missing" : $"set to {indexFromCluster}")}")
$"but the compare exchange guard ('{ClusterWideTransactionHelper.GetAtomicGuardKey(id)}') is {(val == null ? "missing" : $"set to {indexFromCluster}")}")
{
Id = id
};
Expand Down
8 changes: 3 additions & 5 deletions src/Raven.Server/Documents/Handlers/BatchHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
using Raven.Client.Documents.Changes;
using Raven.Client.Documents.Commands.Batches;
using Raven.Client.Documents.Indexes;
using Raven.Client.Documents.Linq;
using Raven.Client.Documents.Operations.Attachments;
using Raven.Client.Documents.Operations.CompareExchange;
using Raven.Client.Documents.Operations.Counters;
Expand All @@ -25,10 +24,10 @@
using Raven.Client.Extensions;
using Raven.Client.Json;
using Raven.Client.ServerWide;
using Raven.Client.Util;
using Raven.Server.Documents.Indexes;
using Raven.Server.Documents.Patch;
using Raven.Server.Documents.PeriodicBackup;
using Raven.Server.Documents.Replication;
using Raven.Server.Documents.TimeSeries;
using Raven.Server.Json;
using Raven.Server.Rachis;
Expand All @@ -39,7 +38,6 @@
using Raven.Server.Smuggler;
using Raven.Server.TrafficWatch;
using Raven.Server.Utils;
using Raven.Server.Web;
using Sparrow.Json;
using Sparrow.Json.Parsing;
using Sparrow.Server;
Expand Down Expand Up @@ -173,7 +171,7 @@ private void ValidateCommandForClusterWideTransaction(ArraySegment<BatchRequestP

if (disableAtomicDocumentWrites == false)
{
if (ClusterTransactionCommand.IsAtomicGuardKey(commandData.Id, out _))
if (ClusterWideTransactionHelper.IsAtomicGuardKey(commandData.Id))
throw new CompareExchangeInvalidKeyException($"You cannot manipulate the atomic guard '{commandData.Id}' via the cluster-wide session");
}

Expand Down Expand Up @@ -330,7 +328,7 @@ private void ThrowClusterTransactionConcurrencyException(List<ClusterTransaction

private async Task WaitForDatabaseCompletion(Task<HashSet<string>> onDatabaseCompletionTask, long index, ClusterTransactionOptions options, CancellationToken token)
{
var lastCompleted = Interlocked.Read(ref Database.LastCompletedClusterTransactionIndex);
var lastCompleted = Database.ClusterTransactionIndexWaiter.LastIndex;
HashSet<string> modifiedCollections = null;
if (lastCompleted < index)
modifiedCollections = await onDatabaseCompletionTask; // already registered to the token
Expand Down
Loading

0 comments on commit c203c43

Please sign in to comment.