From 7da2aa7e328ff787e87bea7dfffbdbcf77feec9d Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Sat, 25 Jun 2022 15:23:36 +0200 Subject: [PATCH] Added TimeBasedUuid offset (#5995) --- ....ApprovePersistenceQuery.Core.verified.txt | 10 + ...pprovePersistenceQuery.DotNet.verified.txt | 10 + ...c.ApprovePersistenceQuery.Net.verified.txt | 10 + .../OffsetSpec.cs | 42 +++ .../Utils/TimeUuid.cs | 270 ++++++++++++++++++ src/core/Akka.Persistence.Query/Offset.cs | 54 +++- 6 files changed, 390 insertions(+), 6 deletions(-) create mode 100644 src/core/Akka.Persistence.Query.Tests/OffsetSpec.cs create mode 100644 src/core/Akka.Persistence.Query.Tests/Utils/TimeUuid.cs diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.Core.verified.txt index 3b5866285b5..3dcab8d069f 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.Core.verified.txt @@ -67,6 +67,7 @@ namespace Akka.Persistence.Query public abstract int CompareTo(Akka.Persistence.Query.Offset other); public static Akka.Persistence.Query.Offset NoOffset() { } public static Akka.Persistence.Query.Offset Sequence(long value) { } + public static Akka.Persistence.Query.Offset TimeBasedUuid(System.Guid value) { } } public sealed class PersistenceQuery : Akka.Actor.IExtension { @@ -96,4 +97,13 @@ namespace Akka.Persistence.Query public override bool Equals(object obj) { } public override int GetHashCode() { } } + public sealed class TimeBasedUuid : Akka.Persistence.Query.Offset, System.IComparable + { + public TimeBasedUuid(System.Guid value) { } + public System.Guid Value { get; } + public int CompareTo(Akka.Persistence.Query.TimeBasedUuid other) { } + public override int CompareTo(Akka.Persistence.Query.Offset other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + } } \ No newline at end of file diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.DotNet.verified.txt index a92ce6624e4..6216b35fdca 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.DotNet.verified.txt @@ -67,6 +67,7 @@ namespace Akka.Persistence.Query public abstract int CompareTo(Akka.Persistence.Query.Offset other); public static Akka.Persistence.Query.Offset NoOffset() { } public static Akka.Persistence.Query.Offset Sequence(long value) { } + public static Akka.Persistence.Query.Offset TimeBasedUuid(System.Guid value) { } } public sealed class PersistenceQuery : Akka.Actor.IExtension { @@ -96,4 +97,13 @@ namespace Akka.Persistence.Query public override bool Equals(object obj) { } public override int GetHashCode() { } } + public sealed class TimeBasedUuid : Akka.Persistence.Query.Offset, System.IComparable + { + public TimeBasedUuid(System.Guid value) { } + public System.Guid Value { get; } + public int CompareTo(Akka.Persistence.Query.TimeBasedUuid other) { } + public override int CompareTo(Akka.Persistence.Query.Offset other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + } } \ No newline at end of file diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.Net.verified.txt index 3b5866285b5..3dcab8d069f 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.Net.verified.txt @@ -67,6 +67,7 @@ namespace Akka.Persistence.Query public abstract int CompareTo(Akka.Persistence.Query.Offset other); public static Akka.Persistence.Query.Offset NoOffset() { } public static Akka.Persistence.Query.Offset Sequence(long value) { } + public static Akka.Persistence.Query.Offset TimeBasedUuid(System.Guid value) { } } public sealed class PersistenceQuery : Akka.Actor.IExtension { @@ -96,4 +97,13 @@ namespace Akka.Persistence.Query public override bool Equals(object obj) { } public override int GetHashCode() { } } + public sealed class TimeBasedUuid : Akka.Persistence.Query.Offset, System.IComparable + { + public TimeBasedUuid(System.Guid value) { } + public System.Guid Value { get; } + public int CompareTo(Akka.Persistence.Query.TimeBasedUuid other) { } + public override int CompareTo(Akka.Persistence.Query.Offset other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + } } \ No newline at end of file diff --git a/src/core/Akka.Persistence.Query.Tests/OffsetSpec.cs b/src/core/Akka.Persistence.Query.Tests/OffsetSpec.cs new file mode 100644 index 00000000000..6557c294844 --- /dev/null +++ b/src/core/Akka.Persistence.Query.Tests/OffsetSpec.cs @@ -0,0 +1,42 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Linq; +using FluentAssertions; +using Xunit; + +namespace Akka.Persistence.Query.Tests +{ + public class OffsetSpec + { + [Fact] + public void TimeBasedUuid_offset_must_be_ordered_correctly() + { + var uuid1 = new TimeBasedUuid(TimeUuid.Parse("49225740-2019-11ea-a6f0-a0a60c2ef4ff")); //2019-12-16T15:32:36.148Z[UTC] + var uuid2 = new TimeBasedUuid(TimeUuid.Parse("91be23d0-2019-11ea-a752-ffae2393b6e4")); //2019-12-16T15:34:37.965Z[UTC] + var uuid3 = new TimeBasedUuid(TimeUuid.Parse("91f95810-2019-11ea-a752-ffae2393b6e4")); //2019-12-16T15:34:38.353Z[UTC] + + ((TimeUuid)uuid1.Value).GetDate().Should().BeBefore(((TimeUuid)uuid2.Value).GetDate()); + ((TimeUuid)uuid2.Value).GetDate().Should().BeBefore(((TimeUuid)uuid3.Value).GetDate()); + + new List() { uuid2, uuid1, uuid3 }.OrderBy(_ => Guid.NewGuid()) + .Should().BeEquivalentTo(new List() { uuid1, uuid2, uuid3 }); + new List() { uuid3, uuid2, uuid1 }.OrderBy(_ => Guid.NewGuid()) + .Should().BeEquivalentTo(new List() { uuid1, uuid2, uuid3 }); + } + + [Fact] + public void Sequence_offset_must_be_ordered_correctly() + { + var sequenceBasedList = new List { 1L, 2L, 3L }.Select(l => new Sequence(l)); + var shuffledSequenceBasedList = sequenceBasedList.OrderBy(_ => Guid.NewGuid()); + shuffledSequenceBasedList.Should().BeEquivalentTo(sequenceBasedList); + } + } +} diff --git a/src/core/Akka.Persistence.Query.Tests/Utils/TimeUuid.cs b/src/core/Akka.Persistence.Query.Tests/Utils/TimeUuid.cs new file mode 100644 index 00000000000..59ecfeca75f --- /dev/null +++ b/src/core/Akka.Persistence.Query.Tests/Utils/TimeUuid.cs @@ -0,0 +1,270 @@ +// +// Copyright (C) DataStax Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; + +namespace Akka.Persistence.Query +{ + /// + /// Represents a v1 uuid + /// + internal struct TimeUuid : IEquatable, IComparable + { + private static readonly DateTimeOffset GregorianCalendarTime = new DateTimeOffset(1582, 10, 15, 0, 0, 0, TimeSpan.Zero); + //Reuse the random generator to avoid collisions + private static readonly Random RandomGenerator = new Random(); + private static readonly object RandomLock = new object(); + private static readonly byte[] MinNodeId = { 0x80, 0x80, 0x80, 0x80, 0x80, 0x80 }; + private static readonly byte[] MinClockId = { 0x80, 0x80 }; + private static readonly byte[] MaxNodeId = { 0x7f, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f }; + private static readonly byte[] MaxClockId = { 0x7f, 0x7f }; + + private readonly Guid _value; + + private TimeUuid(Guid value) + { + _value = value; + } + + /// + /// Creates a new instance of . + /// + /// 6-byte node identifier + /// 2-byte clock identifier + /// The timestamp + /// + private TimeUuid(byte[] nodeId, byte[] clockId, DateTimeOffset time) + { + if (nodeId == null || nodeId.Length != 6) + { + throw new ArgumentException("node Id should contain 6 bytes", nameof(nodeId)); + } + if (clockId == null || clockId.Length != 2) + { + throw new ArgumentException("clock Id should contain 2 bytes", nameof(clockId)); + } + var timeBytes = BitConverter.GetBytes((time - GregorianCalendarTime).Ticks); + if (!BitConverter.IsLittleEndian) + { + Array.Reverse(timeBytes); + } + var buffer = new byte[16]; + //Positions 0-7 Timestamp + Buffer.BlockCopy(timeBytes, 0, buffer, 0, 8); + //Position 8-9 Clock + Buffer.BlockCopy(clockId, 0, buffer, 8, 2); + //Positions 10-15 Node + Buffer.BlockCopy(nodeId, 0, buffer, 10, 6); + + //Version Byte: Time based + //0001xxxx + //turn off first 4 bits + buffer[7] &= 0x0f; //00001111 + //turn on fifth bit + buffer[7] |= 0x10; //00010000 + + //Variant Byte: 1.0.x + //10xxxxxx + //turn off first 2 bits + buffer[8] &= 0x3f; //00111111 + //turn on first bit + buffer[8] |= 0x80; //10000000 + + _value = new Guid(buffer); + } + + /// + /// Returns a value indicating whether this instance and a specified TimeUuid object represent the same value. + /// + public bool Equals(TimeUuid other) + { + return _value.Equals(other._value); + } + + /// + /// Returns a value indicating whether this instance and a specified TimeUuid object represent the same value. + /// + public override bool Equals(object obj) + { + var otherTimeUuid = obj as TimeUuid?; + return otherTimeUuid != null && Equals(otherTimeUuid.Value); + } + + /// + /// Gets the DateTimeOffset representation of this instance + /// + public DateTimeOffset GetDate() + { + var bytes = _value.ToByteArray(); + //Remove version bit + bytes[7] &= 0x0f; //00001111 + //Remove variant + bytes[8] &= 0x3f; //00111111 + if (!BitConverter.IsLittleEndian) + { + Array.Reverse(bytes); + } + var timestamp = BitConverter.ToInt64(bytes, 0); + var ticks = timestamp + GregorianCalendarTime.Ticks; + + return new DateTimeOffset(ticks, TimeSpan.Zero); + } + + /// + /// Returns the hash code for this instance. + /// + public override int GetHashCode() + { + return _value.GetHashCode(); + } + + /// + /// Returns a 16-element byte array that contains the value of this instance. + /// + public byte[] ToByteArray() + { + return _value.ToByteArray(); + } + + /// + /// Gets the Guid representation of the Id + /// + public Guid ToGuid() + { + return _value; + } + + /// + /// Compares the current TimeUuid with another TimeUuid based on the time representation of this instance. + /// + public int CompareTo(TimeUuid other) + { + return GetDate().CompareTo(other.GetDate()); + } + + /// + /// Returns a string representation of the value of this instance in registry format. + /// + /// + public override string ToString() + { + return _value.ToString(); + } + + /// + /// Returns a string representation + /// + public string ToString(string format, IFormatProvider provider) + { + return _value.ToString(format, provider); + } + + /// + /// Returns a string representation + /// + public string ToString(string format) + { + return _value.ToString(format); + } + + /// + /// Returns the smaller possible type 1 uuid with the provided date. + /// + public static TimeUuid Min(DateTimeOffset date) + { + return new TimeUuid(MinNodeId, MinClockId, date); + } + + /// + /// Returns the biggest possible type 1 uuid with the provided Date. + /// + public static TimeUuid Max(DateTimeOffset date) + { + return new TimeUuid(MaxNodeId, MaxClockId, date); + } + + /// + /// Initializes a new instance of the TimeUuid structure, using a random node id and clock sequence and the current date time + /// + public static TimeUuid NewId() + { + return NewId(DateTimeOffset.Now); + } + + /// + /// Initializes a new instance of the TimeUuid structure, using a random node id and clock sequence + /// + public static TimeUuid NewId(DateTimeOffset date) + { + byte[] nodeId; + byte[] clockId; + lock (RandomLock) + { + //oh yeah, thread safety + nodeId = new byte[6]; + clockId = new byte[2]; + RandomGenerator.NextBytes(nodeId); + RandomGenerator.NextBytes(clockId); + } + return new TimeUuid(nodeId, clockId, date); + } + + /// + /// Initializes a new instance of the TimeUuid structure + /// + public static TimeUuid NewId(byte[] nodeId, byte[] clockId, DateTimeOffset date) + { + return new TimeUuid(nodeId, clockId, date); + } + + /// + /// Converts the string representation of a time-based uuid (v1) to the equivalent + /// structure. + /// + /// + /// + public static TimeUuid Parse(string input) + { + return new TimeUuid(Guid.Parse(input)); + } + + /// + /// From TimeUuid to Guid + /// + public static implicit operator Guid(TimeUuid value) + { + return value.ToGuid(); + } + + /// + /// From Guid to TimeUuid + /// + public static implicit operator TimeUuid(Guid value) + { + return new TimeUuid(value); + } + + public static bool operator ==(TimeUuid id1, TimeUuid id2) + { + return id1.ToGuid() == id2.ToGuid(); + } + + public static bool operator !=(TimeUuid id1, TimeUuid id2) + { + return id1.ToGuid() != id2.ToGuid(); + } + } +} diff --git a/src/core/Akka.Persistence.Query/Offset.cs b/src/core/Akka.Persistence.Query/Offset.cs index 6e5f303485b..ac3f86c6156 100644 --- a/src/core/Akka.Persistence.Query/Offset.cs +++ b/src/core/Akka.Persistence.Query/Offset.cs @@ -23,15 +23,15 @@ public abstract class Offset : IComparable public static Offset NoOffset() => Query.NoOffset.Instance; /// - /// Corresponds to an ordered sequence number for the events.Note that the corresponding - /// offset of each event is provided in the , - /// which makes it possible to resume the stream at a later point from a given offset. - /// The `offset` is exclusive, i.e.the event with the exact same sequence number will not be included - /// in the returned stream. This means that you can use the offset that is returned in - /// as the `offset` parameter in a subsequent query. + /// Factory to create an offset of type /// public static Offset Sequence(long value) => new Sequence(value); + /// + /// Factory to create an offset of type + /// + public static Offset TimeBasedUuid(Guid value) => new TimeBasedUuid(value); + /// /// Used to compare to other implementations. /// @@ -43,9 +43,11 @@ public abstract class Offset : IComparable /// Corresponds to an ordered sequence number for the events.Note that the corresponding /// offset of each event is provided in the , /// which makes it possible to resume the stream at a later point from a given offset. + /// /// The `offset` is exclusive, i.e.the event with the exact same sequence number will not be included /// in the returned stream. This means that you can use the offset that is returned in /// as the `offset` parameter in a subsequent query. + /// /// public sealed class Sequence : Offset, IComparable { @@ -83,6 +85,46 @@ public override int CompareTo(Offset other) } } + /// + /// Corresponds to an ordered unique identifier of the events. Note that the corresponding + /// offset of each event is provided in the , which makes it + /// possible to resume the stream at a later point from a given offset. + /// + /// The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included + /// in the returned stream. This means that you can use the offset that is returned in `EventEnvelope` + /// as the `offset` parameter in a subsequent query. + /// + /// + public sealed class TimeBasedUuid : Offset, IComparable + { + public Guid Value { get; } + + /// + /// Initializes a new instance of the class. + /// + public TimeBasedUuid(Guid value) => Value = value; + + public int CompareTo(TimeBasedUuid other) => Value.CompareTo(other.Value); + + private bool Equals(TimeBasedUuid other) => Value == other.Value; + + public override bool Equals(object obj) + { + if (obj is null) return false; + if (ReferenceEquals(this, obj)) return true; + return obj is TimeBasedUuid uUID && Equals(uUID); + } + + public override int GetHashCode() => Value.GetHashCode(); + + public override int CompareTo(Offset other) + { + return other is TimeBasedUuid seq + ? CompareTo(seq) + : throw new InvalidOperationException($"Can't compare offset of type {GetType()} to offset of type {other.GetType()}"); + } + } + /// /// Used when retrieving all events. ///