Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions FoundationDB.Client/Core/IFdbTransactionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public interface IFdbTransactionHandler : IDisposable
/// </remarks>
long GetCommittedVersion();

/// <summary>Returns the <see cref="VersionStamp"/> which was used by versionstamps operations in this transaction.</summary>
Task<VersionStamp> GetVersionStampAsync(CancellationToken ct);

/// <summary>Sets the snapshot read version used by a transaction. This is not needed in simple cases.</summary>
/// <param name="version">Read version to use in this transaction</param>
/// <remarks>
Expand Down
8 changes: 7 additions & 1 deletion FoundationDB.Client/FdbMutationType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,13 @@ public enum FdbMutationType
/// If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``.
/// The smaller of the two values is then stored in the database.
/// </summary>
Min = 13
Min = 13,

//TODO: XML Comments!
VersionStampedKey = 14,

//TODO: XML Comments!
VersionStampedValue = 15,

}

Expand Down
100 changes: 100 additions & 0 deletions FoundationDB.Client/FdbTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public sealed partial class FdbTransaction : IFdbTransaction
/// <summary>CancellationToken that should be used for all async operations executing inside this transaction</summary>
private CancellationToken m_cancellation;

/// <summary>Random token (but constant per transaction retry) used to generate incomplete VersionStamps</summary>
private ulong m_versionStampToken;

#endregion

#region Constructors...
Expand Down Expand Up @@ -284,6 +287,81 @@ public void SetReadVersion(long version)
m_handler.SetReadVersion(version);
}

/// <summary>Returns the <see cref="VersionStamp"/> which was used by versionstamps operations in this transaction.</summary>
/// <remarks>
/// The Task will be ready only after the successful completion of a call to <see cref="CommitAsync"/> on this transaction.
/// Read-only transactions do not modify the database when committed and will result in the Task completing with an error.
/// Keep in mind that a transaction which reads keys and then sets them to their current values may be optimized to a read-only transaction.
/// </remarks>
public Task<VersionStamp> GetVersionStampAsync()
{
EnsureNotFailedOrDisposed();

return m_handler.GetVersionStampAsync(m_cancellation);
}

private ulong GenerateNewVersionStampToken()
{
// We need to generate a 80-bits stamp, and also need to mark it as 'incomplete' by forcing the highest bit to 1.
// Since this is supposed to be a version number with a ~1M tickrate per seconds, we will play it safe, and force the 8 highest bits to 1,
// meaning that we only reduce the database potential lifetime but 1/256th, before getting into trouble.
//
// By doing some empirical testing, it also seems that the last 16 bits are a transction batch order which is usually a low number.
// Again, we will force the 4 highest bit to 1 to reduce the change of collision with a complete version stamp.
//
// So the final token will look like: 'FF xx xx xx xx xx xx xx Fy yy', were 'x' is the random token, and 'y' will lowest 12 bits of the transaction retry count

var rnd = new Random(); //TODO: singleton? (need locking!!)
ulong x;
unsafe
{
double r = rnd.NextDouble();
x = *(ulong*) &r;
}
x |= 0xFF00000000000000UL;

lock (this)
{
ulong token = m_versionStampToken;
if (token == 0)
{
token = x;
m_versionStampToken = x;
}
return token;
}
}

/// <summary>Return a place-holder 80-bit VersionStamp, whose value is not yet known, but will be filled by the database at commit time.</summary>
/// <returns>This value can used to generate temporary keys or value, for use with the <see cref="FdbMutationType.VersionStampedKey"/> or <see cref="FdbMutationType.VersionStampedValue"/> mutations</returns>
/// <remarks>
/// The generate placeholder will use a random value that is unique per transaction (and changes at reach retry).
/// If the key contains the exact 80-bit byte signature of this token, the corresponding location will be tagged and replaced with the actual VersionStamp at commit time.
/// If another part of the key contains (by random chance) the same exact byte sequence, then an error will be triggered, and hopefully the transaction will retry with another byte sequence.
/// </remarks>
[Pure]
public VersionStamp CreateVersionStamp()
{
var token = m_versionStampToken;
if (token == 0) token = GenerateNewVersionStampToken();
return VersionStamp.Custom(token, (ushort) (m_context.Retries | 0xF000), incomplete: true);
}

/// <summary>Return a place-holder 96-bit VersionStamp with an attached user version, whose value is not yet known, but will be filled by the database at commit time.</summary>
/// <returns>This value can used to generate temporary keys or value, for use with the <see cref="FdbMutationType.VersionStampedKey"/> or <see cref="FdbMutationType.VersionStampedValue"/> mutations</returns>
/// <remarks>
/// The generate placeholder will use a random value that is unique per transaction (and changes at reach retry).
/// If the key contains the exact 80-bit byte signature of this token, the corresponding location will be tagged and replaced with the actual VersionStamp at commit time.
/// If another part of the key contains (by random chance) the same exact byte sequence, then an error will be triggered, and hopefully the transaction will retry with another byte sequence.
/// </remarks>
public VersionStamp CreateVersionStamp(int userVersion)
{
var token = m_versionStampToken;
if (token == 0) token = GenerateNewVersionStampToken();

return VersionStamp.Custom(token, (ushort) (m_context.Retries | 0xF000), userVersion, incomplete: true);
}

#endregion

#region Get...
Expand Down Expand Up @@ -513,6 +591,23 @@ private static void EnsureMutationTypeIsSupported(FdbMutationType mutation, int
return;
}

if (mutation == FdbMutationType.VersionStampedKey || mutation == FdbMutationType.VersionStampedValue)
{
if (selectedApiVersion < 400)
{
if (Fdb.GetMaxApiVersion() >= 400)
{
throw new FdbException(FdbError.InvalidMutationType, "Atomic mutations for VersionStamps are only supported starting from API level 400. You need to select API level 400 or more at the start of your process.");
}
else
{
throw new FdbException(FdbError.InvalidMutationType, "Atomic mutations Max and Min are only supported starting from client version 4.x. You need to update the version of the client, and select API level 400 or more at the start of your process..");
}
}
// ok!
return;
}

// this could be a new mutation type, or an invalid value.
throw new FdbException(FdbError.InvalidMutationType, "An invalid mutation type was issued. If you are attempting to call a new mutation type, you will need to update the version of this assembly, and select the latest API level.");
}
Expand Down Expand Up @@ -755,6 +850,11 @@ private void RestoreDefaultSettings()
{
this.Timeout = m_database.DefaultTimeout;
}

// if we have used a random token for versionstamps, we need to clear it (and generate a new one)
// => this ensure that if the error was due to a collision between the token and another part of the key,
// a transaction retry will hopefully use a different token that does not collide.
m_versionStampToken = 0;
}

/// <summary>Reset the transaction to its initial state.</summary>
Expand Down
84 changes: 84 additions & 0 deletions FoundationDB.Client/FdbTransactionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace FoundationDB.Client
using System.Threading.Tasks;
using Doxense.Diagnostics.Contracts;
using Doxense.Linq;
using Doxense.Memory;
using Doxense.Serialization.Encoders;
using JetBrains.Annotations;

Expand Down Expand Up @@ -425,6 +426,89 @@ public static void AtomicMin([NotNull] this IFdbTransaction trans, Slice key, Sl
trans.Atomic(key, value, FdbMutationType.Min);
}

private static int GetVersionStampOffset(Slice buffer, Slice token, string argName)
{
// the buffer MUST contain one incomplete stamp, either the random token of the current transsaction or the default token (all-FF)

int p = token.HasValue ? buffer.IndexOf(token) : -1;
if (p >= 0)
{ // found a candidate spot, we have to make sure that it is only present once in the key!

if (buffer.IndexOf(token, p + token.Count) >= 0)
{
if (argName == "key")
throw new ArgumentException("The key should only contain one occurrence of a VersionStamp.", argName);
else
throw new ArgumentException("The value should only contain one occurrence of a VersionStamp.", argName);
}
}
else
{ // not found, maybe it is using the default incomplete stamp (all FF) ?
p = buffer.IndexOf(VersionStamp.IncompleteToken);
if (p < 0)
{
if (argName == "key")
throw new ArgumentException("The key should contain at least one VersionStamp.", argName);
else
throw new ArgumentException("The value should contain at least one VersionStamp.", argName);
}
}
Contract.Assert(p >= 0 && p + token.Count <= buffer.Count);

return p;
}

/// <summary>Set the <paramref name="value"/> of the <paramref name="key"/> in the database, with the <see cref="VersionStamp"/> replaced by the resolved version at commit time.</summary>
/// <param name="trans">Transaction to use for the operation</param>
/// <param name="key">Name of the key whose value is to be mutated. This key must contain a single <see cref="VersionStamp"/>, whose position will be automatically detected.</param>
/// <param name="value">New value for this key.</param>
public static void SetVersionStampedKey([NotNull] this IFdbTransaction trans, Slice key, Slice value)
{
Contract.NotNull(trans, nameof(trans));

//TODO: PERF: optimize this to not have to allocate!
var token = trans.CreateVersionStamp().ToSlice();
var offset = GetVersionStampOffset(key, token, nameof(key));

var writer = new SliceWriter(key.Count + 2);
writer.WriteBytes(key);
writer.WriteFixed16(checked((ushort) offset)); //note: currently stored as 16-bits in Little Endian

trans.Atomic(writer.ToSlice(), value, FdbMutationType.VersionStampedKey);
}

/// <summary>Set the <paramref name="value"/> of the <paramref name="key"/> in the database, with the <see cref="VersionStamp"/> replaced by the resolved version at commit time.</summary>
/// <param name="trans">Transaction to use for the operation</param>
/// <param name="key">Name of the key whose value is to be mutated. This key must contain a single <see cref="VersionStamp"/>, whose start is defined by <paramref name="stampOffset"/>.</param>
/// <param name="stampOffset">Offset within <paramref name="key"/> of the start of the 80-bit VersionStamp.</param>
/// <param name="value">New value for this key.</param>
public static void SetVersionStampedKey([NotNull] this IFdbTransaction trans, Slice key, int stampOffset, Slice value)
{
Contract.NotNull(trans, nameof(trans));

if (stampOffset > key.Count - 10) throw new ArgumentException("The VersionStamp overflows past the end of the key.", nameof(stampOffset));
if (stampOffset > 0xFFFF) throw new ArgumentException("The offset is too large to fit within 16-bits.");

var writer = new SliceWriter(key.Count + 2);
writer.WriteBytes(key);
writer.WriteFixed16(checked((ushort) stampOffset)); //note: currently stored as 16-bits in Little Endian

trans.Atomic(writer.ToSlice(), value, FdbMutationType.VersionStampedKey);
}

/// <summary>Set the <paramref name="value"/> of the <paramref name="key"/> in the database, with the first 10 bytes overwritten with the transaction's <see cref="VersionStamp"/>.</summary>
/// <param name="trans">Transaction to use for the operation</param>
/// <param name="key">Name of the key whose value is to be mutated.</param>
/// <param name="value">Value whose first 10 bytes will be overwritten by the database with the resolved VersionStamp at commit time. The rest of the value will be untouched.</param>
public static void SetVersionStampedValue([NotNull] this IFdbTransaction trans, Slice key, Slice value)
{
Contract.NotNull(trans, nameof(trans));

if (value.Count < 10) throw new ArgumentException("The value must be at least 10 bytes long.", nameof(value));

trans.Atomic(key, value, FdbMutationType.VersionStampedValue);
}

#endregion

#region GetRange...
Expand Down
18 changes: 18 additions & 0 deletions FoundationDB.Client/Filters/FdbTransactionFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,24 @@ public virtual long GetCommittedVersion()
return m_transaction.GetCommittedVersion();
}

public virtual Task<VersionStamp> GetVersionStampAsync()
{
ThrowIfDisposed();
return m_transaction.GetVersionStampAsync();
}

public virtual VersionStamp CreateVersionStamp()
{
ThrowIfDisposed();
return m_transaction.CreateVersionStamp();
}

public virtual VersionStamp CreateVersionStamp(int userVersion)
{
ThrowIfDisposed();
return m_transaction.CreateVersionStamp(userVersion);
}

public virtual void SetReadVersion(long version)
{
ThrowIfDisposed();
Expand Down
8 changes: 8 additions & 0 deletions FoundationDB.Client/Filters/Logging/FdbLoggedTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,14 @@ public override Task OnErrorAsync(FdbError code)
);
}

public override Task<VersionStamp> GetVersionStampAsync()
{
return ExecuteAsync(
new FdbTransactionLog.GetVersionStampCommand(),
(tr, cmd) => tr.GetVersionStampAsync()
);
}

public override void Set(Slice key, Slice value)
{
Execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,12 @@ public override string GetResult(KeyResolver resolver)

}

public sealed class GetVersionStampCommand : Command<VersionStamp>
{
public override Operation Op { get { return Operation.GetVersionStamp; } }

}

public sealed class GetReadVersionCommand : Command<long>
{
public override Operation Op { get { return Operation.GetReadVersion; } }
Expand Down
1 change: 1 addition & 0 deletions FoundationDB.Client/Filters/Logging/FdbTransactionLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ public enum Operation
Reset,
OnError,
SetOption,
GetVersionStamp,

Log,
}
Expand Down
26 changes: 26 additions & 0 deletions FoundationDB.Client/IFdbTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,32 @@ public interface IFdbTransaction : IFdbReadOnlyTransaction
/// </remarks>
long GetCommittedVersion();

/// <summary>Returns the <see cref="VersionStamp"/> which was used by versionstamps operations in this transaction.</summary>
/// <remarks>
/// The Task will be ready only after the successful completion of a call to <see cref="CommitAsync"/> on this transaction.
/// Read-only transactions do not modify the database when committed and will result in the Task completing with an error.
/// Keep in mind that a transaction which reads keys and then sets them to their current values may be optimized to a read-only transaction.
/// </remarks>
Task<VersionStamp> GetVersionStampAsync();

/// <summary>Return a place-holder 80-bit VersionStamp, whose value is not yet known, but will be filled by the database at commit time.</summary>
/// <returns>This value can used to generate temporary keys or value, for use with the <see cref="FdbMutationType.VersionStampedKey"/> or <see cref="FdbMutationType.VersionStampedValue"/> mutations</returns>
/// <remarks>
/// The generate placeholder will use a random value that is unique per transaction (and changes at reach retry).
/// If the key contains the exact 80-bit byte signature of this token, the corresponding location will be tagged and replaced with the actual VersionStamp at commit time.
/// If another part of the key contains (by random chance) the same exact byte sequence, then an error will be triggered, and hopefully the transaction will retry with another byte sequence.
/// </remarks>
VersionStamp CreateVersionStamp();

/// <summary>Return a place-holder 96-bit VersionStamp with an attached user version, whose value is not yet known, but will be filled by the database at commit time.</summary>
/// <returns>This value can used to generate temporary keys or value, for use with the <see cref="FdbMutationType.VersionStampedKey"/> or <see cref="FdbMutationType.VersionStampedValue"/> mutations</returns>
/// <remarks>
/// The generate placeholder will use a random value that is unique per transaction (and changes at reach retry).
/// If the key contains the exact 80-bit byte signature of this token, the corresponding location will be tagged and replaced with the actual VersionStamp at commit time.
/// If another part of the key contains (by random chance) the same exact byte sequence, then an error will be triggered, and hopefully the transaction will retry with another byte sequence.
/// </remarks>
VersionStamp CreateVersionStamp(int userVersion);

/// <summary>
/// Watch a key for any change in the database.
/// </summary>
Expand Down
Loading