From 884e6b4907b0f03d4e2c7b37e90fee4f4fb90f7a Mon Sep 17 00:00:00 2001 From: Ryan Karg Date: Mon, 25 Mar 2024 11:35:47 -0700 Subject: [PATCH] Cassandra Clustering implementation --- .github/workflows/ci.yml | 27 + Directory.Packages.props | 2 + Orleans.sln | 17 + .../CassandraClusteringTable.cs | 153 ++++ .../CassandraGatewayListProvider.cs | 64 ++ .../Hosting/CassandraClusteringOptions.cs | 7 + .../CassandraMembershipHostingExtensions.cs | 62 ++ .../Orleans.Clustering.Cassandra.csproj | 21 + .../OrleansQueries.cs | 335 +++++++++ .../Tester.Cassandra/Cassandra.dockerfile | 20 + .../Tester.Cassandra/Clustering/Cassandra.cs | 695 ++++++++++++++++++ .../Clustering/CassandraContainer.cs | 52 ++ .../Clustering/SiloAddressUtils.cs | 14 + .../Tester.Cassandra/Tester.Cassandra.csproj | 19 + .../Utility/TestExtensions.cs | 47 ++ 15 files changed, 1535 insertions(+) create mode 100644 src/Cassandra/Orleans.Clustering.Cassandra/CassandraClusteringTable.cs create mode 100644 src/Cassandra/Orleans.Clustering.Cassandra/CassandraGatewayListProvider.cs create mode 100644 src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraClusteringOptions.cs create mode 100644 src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraMembershipHostingExtensions.cs create mode 100644 src/Cassandra/Orleans.Clustering.Cassandra/Orleans.Clustering.Cassandra.csproj create mode 100644 src/Cassandra/Orleans.Clustering.Cassandra/OrleansQueries.cs create mode 100644 test/Extensions/Tester.Cassandra/Cassandra.dockerfile create mode 100644 test/Extensions/Tester.Cassandra/Clustering/Cassandra.cs create mode 100644 test/Extensions/Tester.Cassandra/Clustering/CassandraContainer.cs create mode 100644 test/Extensions/Tester.Cassandra/Clustering/SiloAddressUtils.cs create mode 100644 test/Extensions/Tester.Cassandra/Tester.Cassandra.csproj create mode 100644 test/Extensions/Tester.Cassandra/Utility/TestExtensions.cs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 82827c2bf8..28d3af2bc1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -70,6 +70,33 @@ jobs: path: | **/TestResults/* **/logs/* + test-cassandra: + name: Cassandra provider tests + runs-on: ubuntu-latest + strategy: + matrix: + provider: ["Cassandra"] + dbversion: ["3.11", "4.0", "4.1", "5.0"] + steps: + - uses: actions/checkout@v2 + - name: Setup .NET + uses: actions/setup-dotnet@v3 + with: + dotnet-version: | + 8.0.x + - name: Test + run: dotnet test --filter "Category=${{ matrix.provider }}&(Category=BVT|Category=SlowBVT|Category=Functional)" --blame-hang-timeout 10m --logger "trx" -- -parallel none -noshadow + env: + CASSANDRAVERSION: ${{ matrix.dbversion }} + - name: Archive Test Results + if: always() + uses: actions/upload-artifact@v3 + with: + name: test_output + retention-days: 1 + path: | + **/TestResults/* + **/logs/* test-postgres: name: PostgreSQL provider tests runs-on: ubuntu-latest diff --git a/Directory.Packages.props b/Directory.Packages.props index 334880af6d..d8ca234e88 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -61,6 +61,7 @@ + @@ -95,6 +96,7 @@ + diff --git a/Orleans.sln b/Orleans.sln index 2b92cc4246..5def748ca4 100644 --- a/Orleans.sln +++ b/Orleans.sln @@ -219,6 +219,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tester.Redis", "test\Extens EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Serialization.Protobuf", "src\Serializers\Orleans.Serialization.Protobuf\Orleans.Serialization.Protobuf.csproj", "{A073C0EE-8732-42F9-A22E-D47034E25076}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Cassandra", "Cassandra", "{42C80201-3AC6-4C10-9809-3315A9FDD7A1}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Clustering.Cassandra", "src\Cassandra\Orleans.Clustering.Cassandra\Orleans.Clustering.Cassandra.csproj", "{7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tester.Cassandra", "test\Extensions\Tester.Cassandra\Tester.Cassandra.csproj", "{15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -581,6 +587,14 @@ Global {A073C0EE-8732-42F9-A22E-D47034E25076}.Debug|Any CPU.Build.0 = Debug|Any CPU {A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.ActiveCfg = Release|Any CPU {A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.Build.0 = Release|Any CPU + {7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}.Release|Any CPU.Build.0 = Release|Any CPU + {15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -688,6 +702,9 @@ Global {8FC6457C-6273-4338-AD2A-ECAA8FE2C5D7} = {082D25DB-70CA-48F4-93E0-EC3455F494B8} {F13247A0-70C9-4200-9CB1-2002CB8105E0} = {082D25DB-70CA-48F4-93E0-EC3455F494B8} {A073C0EE-8732-42F9-A22E-D47034E25076} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23} + {42C80201-3AC6-4C10-9809-3315A9FDD7A1} = {FE2E08C6-9C3B-4AEE-AE07-CCA387580D7A} + {7A1A2ECE-DC4B-4DE7-AEF7-855C50895171} = {42C80201-3AC6-4C10-9809-3315A9FDD7A1} + {15A1777E-D1B6-4DC8-81D4-998A4CBA63FE} = {082D25DB-70CA-48F4-93E0-EC3455F494B8} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952} diff --git a/src/Cassandra/Orleans.Clustering.Cassandra/CassandraClusteringTable.cs b/src/Cassandra/Orleans.Clustering.Cassandra/CassandraClusteringTable.cs new file mode 100644 index 0000000000..dfc7eb7b9c --- /dev/null +++ b/src/Cassandra/Orleans.Clustering.Cassandra/CassandraClusteringTable.cs @@ -0,0 +1,153 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Threading.Tasks; +using Cassandra; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Runtime; + +namespace Orleans.Clustering.Cassandra; + +public class CassandraClusteringTable : IMembershipTable +{ + private readonly ClusterOptions _options; + private readonly ISession _session; + private OrleansQueries? _queries; + private readonly string _identifier; + + + public CassandraClusteringTable(IOptions options, ISession session) + { + _options = options.Value; + _identifier = $"{_options.ServiceId}-{_options.ClusterId}"; + _session = session; + } + + async Task IMembershipTable.InitializeMembershipTable(bool tryInitTableVersion) + { + _queries = await OrleansQueries.CreateInstance(_session); + + await _session.ExecuteAsync(_queries.EnsureTableExists()); + await _session.ExecuteAsync(_queries.EnsureIndexExists()); + + if (!tryInitTableVersion) + return; + + await _session.ExecuteAsync(await _queries.InsertMembershipVersion(_identifier)); + } + + async Task IMembershipTable.DeleteMembershipTableEntries(string clusterId) + { + if (string.Compare(clusterId, _options.ClusterId, + StringComparison.InvariantCultureIgnoreCase) != 0) + { + throw new ArgumentException( + $"cluster id {clusterId} does not match CassandraClusteringTable value of {_options.ClusterId}", + nameof(clusterId)); + } + await _session.ExecuteAsync(await _queries!.DeleteMembershipTableEntries(_identifier)); + } + + async Task IMembershipTable.InsertRow(MembershipEntry entry, TableVersion tableVersion) + { + var query = await _session.ExecuteAsync(await _queries!.InsertMembership(_identifier, entry, tableVersion.Version - 1)); + return (bool)query.First()["[applied]"]; + } + + async Task IMembershipTable.UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion) + { + var query = await _session.ExecuteAsync(await _queries!.UpdateMembership(_identifier, entry, tableVersion.Version - 1)); + return (bool)query.First()["[applied]"]; + } + + private static MembershipEntry? GetMembershipEntry(Row row, SiloAddress? forAddress = null) + { + if (row["start_time"] == null) + return null; + + var result = new MembershipEntry + { + SiloAddress = forAddress ?? SiloAddress.New(new IPEndPoint(IPAddress.Parse((string)row["address"]), (int)row["port"]), (int)row["generation"]), + SiloName = (string)row["silo_name"], + HostName = (string)row["host_name"], + Status = (SiloStatus)(int)row["status"], + ProxyPort = (int)row["proxy_port"], + StartTime = ((DateTimeOffset)row["start_time"]).UtcDateTime, + IAmAliveTime = ((DateTimeOffset)row["i_am_alive_time"]).UtcDateTime + }; + + var suspectingSilos = (string)row["suspect_times"]; + if (!string.IsNullOrWhiteSpace(suspectingSilos)) + { + result.SuspectTimes = new List>(); + result.SuspectTimes.AddRange(suspectingSilos.Split('|').Select(s => + { + var split = s.Split(','); + return new Tuple(SiloAddress.FromParsableString(split[0]), LogFormatter.ParseDate(split[1])); + })); + } + + return result; + } + + private async Task GetMembershipTableData(RowSet rows, SiloAddress? forAddress = null) + { + int version; + + var firstRow = rows.FirstOrDefault(); + if (firstRow != null) + { + version = (int)firstRow["version"]; + + var entries = new List>(); + foreach (var row in new[] { firstRow }.Concat(rows)) + { + var entry = GetMembershipEntry(row, forAddress); + if (entry != null) + entries.Add(new Tuple(entry, string.Empty)); + } + + return new MembershipTableData(entries, new TableVersion(version, version.ToString())); + } + else + { + var result = (await _session.ExecuteAsync(await _queries!.MembershipReadVersion(_identifier))).FirstOrDefault(); + if (result is null) + { + return new MembershipTableData(new List>(), new TableVersion(0, "0")); + } + version = (int)result["version"]; + return new MembershipTableData(new List>(), new TableVersion(version, version.ToString())); + } + } + + async Task IMembershipTable.ReadAll() + { + return await GetMembershipTableData(await _session.ExecuteAsync(await _queries!.MembershipReadAll(_identifier))); + } + + async Task IMembershipTable.ReadRow(SiloAddress key) + { + return await GetMembershipTableData(await _session.ExecuteAsync(await _queries!.MembershipReadRow(_identifier, key)), key); + } + + async Task IMembershipTable.UpdateIAmAlive(MembershipEntry entry) + { + await _session.ExecuteAsync(await _queries!.UpdateIAmAliveTime(_identifier, entry)); + } + + public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate) + { + var allEntries = + (await _session.ExecuteAsync(await _queries!.MembershipReadAll(_identifier))) + .Select(r => GetMembershipEntry(r)) + .Where(e => e is not null) + .Cast(); + + foreach (var e in allEntries) + if (e is not { Status: SiloStatus.Active } && new DateTime(Math.Max(e.IAmAliveTime.Ticks, e.StartTime.Ticks)) < beforeDate) + await _session.ExecuteAsync(await _queries.DeleteMembershipEntry(_identifier, e)); + } +} \ No newline at end of file diff --git a/src/Cassandra/Orleans.Clustering.Cassandra/CassandraGatewayListProvider.cs b/src/Cassandra/Orleans.Clustering.Cassandra/CassandraGatewayListProvider.cs new file mode 100644 index 0000000000..20ba2c6545 --- /dev/null +++ b/src/Cassandra/Orleans.Clustering.Cassandra/CassandraGatewayListProvider.cs @@ -0,0 +1,64 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Threading.Tasks; +using Cassandra; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Messaging; +using Orleans.Runtime; + +namespace Orleans.Clustering.Cassandra; + +public class CassandraGatewayListProvider : IGatewayListProvider +{ + private readonly ClusterOptions _options; + private readonly TimeSpan _maxStaleness; + private readonly ISession _session; + private OrleansQueries? _queries; + private DateTime _cacheUntil; + private List? _cachedResult; + private readonly string _identifier; + + + TimeSpan IGatewayListProvider.MaxStaleness => _maxStaleness; + + bool IGatewayListProvider.IsUpdatable => true; + + + public CassandraGatewayListProvider(IOptions options, IOptions gatewayOptions, ISession session) + { + _options = options.Value; + _identifier = $"{_options.ServiceId}-{_options.ClusterId}"; + + _maxStaleness = gatewayOptions.Value.GatewayListRefreshPeriod; + _session = session; + } + + async Task IGatewayListProvider.InitializeGatewayListProvider() + { + _queries = await OrleansQueries.CreateInstance(_session); + + await _session.ExecuteAsync(_queries.EnsureTableExists()); + await _session.ExecuteAsync(_queries.EnsureIndexExists()); + } + + async Task> IGatewayListProvider.GetGateways() + { + if (_cachedResult is not null && _cacheUntil > DateTime.UtcNow) + { + return _cachedResult.ToList(); + } + + var rows = await _session.ExecuteAsync(await _queries!.GatewaysQuery(_identifier, (int)SiloStatus.Active)); + var result = new List(); + + foreach (var row in rows) + result.Add(SiloAddress.New(new IPEndPoint(IPAddress.Parse((string)row["address"]), (int)row["proxy_port"]), (int)row["generation"]).ToGatewayUri()); + + _cacheUntil = DateTime.UtcNow + _maxStaleness; + _cachedResult = result; + return result.ToList(); + } +} \ No newline at end of file diff --git a/src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraClusteringOptions.cs b/src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraClusteringOptions.cs new file mode 100644 index 0000000000..5356f89db4 --- /dev/null +++ b/src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraClusteringOptions.cs @@ -0,0 +1,7 @@ +namespace Orleans.Clustering.Cassandra.Hosting; + +public class CassandraClusteringOptions +{ + public required string ConnectionString { get; set; } + public string Keyspace { get; set; } = "orleans"; +} \ No newline at end of file diff --git a/src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraMembershipHostingExtensions.cs b/src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraMembershipHostingExtensions.cs new file mode 100644 index 0000000000..c195d718cf --- /dev/null +++ b/src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraMembershipHostingExtensions.cs @@ -0,0 +1,62 @@ +using System; +using Cassandra; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Hosting; + +namespace Orleans.Clustering.Cassandra.Hosting; + +public static class CassandraMembershipHostingExtensions +{ + /// + /// Configures Orleans clustering using Cassandra + /// + /// + /// + /// Pulls of type and from the DI container + public static ISiloBuilder UseCassandraClustering(this ISiloBuilder builder) => + builder.ConfigureServices(services => + { + services.AddOptions().ValidateOnStart(); + services.AddSingleton(); + }); + + /// + /// Configures Orleans clustering using Cassandra + /// + /// + /// Configuration for the + /// Resolving method for + /// A newly created created with the provided and the resolved + public static ISiloBuilder UseCassandraClustering(this ISiloBuilder builder, ClusterOptions clusterOptions, Func sessionProvider) => + builder.ConfigureServices(services => + { + services.AddSingleton(provider => + { + var session = sessionProvider(provider); + return new CassandraClusteringTable(Options.Create(clusterOptions), session); + }); + }); + + + /// + /// Configures Orleans clustering using Cassandra + /// + /// + /// Configuration for the + /// Configuration used to create a new + /// + public static ISiloBuilder UseCassandraClustering(this ISiloBuilder builder, ClusterOptions clusterOptions, CassandraClusteringOptions cassandraOptions) => + builder.ConfigureServices(services => + { + services.AddSingleton(_ => + { + var c = Cluster.Builder().WithConnectionString(cassandraOptions.ConnectionString) + .Build(); + + var session = c.Connect(cassandraOptions.Keyspace); + return new CassandraClusteringTable(Options.Create(clusterOptions), session); + }); + }); +} \ No newline at end of file diff --git a/src/Cassandra/Orleans.Clustering.Cassandra/Orleans.Clustering.Cassandra.csproj b/src/Cassandra/Orleans.Clustering.Cassandra/Orleans.Clustering.Cassandra.csproj new file mode 100644 index 0000000000..14f851c5cf --- /dev/null +++ b/src/Cassandra/Orleans.Clustering.Cassandra/Orleans.Clustering.Cassandra.csproj @@ -0,0 +1,21 @@ + + + Microsoft.Orleans.Clustering.Cassandra + Microsoft Orleans Cassandra Clustering Provider + Microsoft Orleans clustering provider backed by Cassandra + $(PackageTags) Cassandra + $(DefaultTargetFrameworks) + Orleans.Clustering.Cassandra + Orleans.Clustering.Cassandra + true + $(DefineConstants);ORLEANS_CLUSTERING + enable + + + + + + + + + diff --git a/src/Cassandra/Orleans.Clustering.Cassandra/OrleansQueries.cs b/src/Cassandra/Orleans.Clustering.Cassandra/OrleansQueries.cs new file mode 100644 index 0000000000..a9e09bc4f3 --- /dev/null +++ b/src/Cassandra/Orleans.Clustering.Cassandra/OrleansQueries.cs @@ -0,0 +1,335 @@ +using Cassandra; +using Orleans.Runtime; +using System.Linq; +using System.Threading.Tasks; + +namespace Orleans.Clustering.Cassandra; + +/// +/// This class is responsible for keeping a list of prepared queries and +/// knowing their parameters (including type and conversion to the target +/// type). +/// +public class OrleansQueries +{ + public ISession Session { get; } + + private PreparedStatement? _insertMembershipVersionPreparedStatement; + private PreparedStatement? _deleteMembershipTablePreparedStatement; + private PreparedStatement? _insertMembershipPreparedStatement; + private PreparedStatement? _membershipReadAllPreparedStatement; + private PreparedStatement? _membershipReadVersionPreparedStatement; + private PreparedStatement? _updateIAmAlivePreparedStatement; + private PreparedStatement? _deleteMembershipEntryPreparedStatement; + private PreparedStatement? _updateMembershipPreparedStatement; + + public static Task CreateInstance(ISession session) + { + string? dc = null; + var isMultiDataCenter = false; + foreach (var dataCenter in session.Cluster.AllHosts().Where(h => h?.Datacenter is not null).Select(h => h.Datacenter)) + { + dc ??= dataCenter; + if (dc != dataCenter) + { + isMultiDataCenter = true; + break; + } + } + + return Task.FromResult(new OrleansQueries(session, isMultiDataCenter)); + } + + private OrleansQueries(ISession session, bool isMultiDataCenter) + { + if (isMultiDataCenter) + { + MembershipReadConsistencyLevel = ConsistencyLevel.LocalQuorum; + MembershipWriteConsistencyLevel = ConsistencyLevel.EachQuorum; + } + else + { + MembershipReadConsistencyLevel = ConsistencyLevel.Quorum; + MembershipWriteConsistencyLevel = ConsistencyLevel.Quorum; + } + Session = session; + } + + public ConsistencyLevel MembershipWriteConsistencyLevel { get; set; } + + public ConsistencyLevel MembershipReadConsistencyLevel { get; set; } + + public IStatement EnsureTableExists() + { + return new SimpleStatement(""" + CREATE TABLE IF NOT EXISTS membership + ( + partition_key ascii, + version int static, + address ascii, + port int, + generation int, + silo_name text, + host_name text, + status int, + proxy_port int, + suspect_times ascii, + start_time timestamp, + i_am_alive_time timestamp, + + PRIMARY KEY(partition_key, address, port, generation) + ) WITH compression = { + 'class' : 'LZ4Compressor', + 'enabled' : true + }; + """); + } + public IStatement EnsureIndexExists() + { + return new SimpleStatement(""" + CREATE INDEX IF NOT EXISTS ix_membership_status ON membership(status); + """); + } + + public async ValueTask InsertMembership(string clusterIdentifier, MembershipEntry membershipEntry, int version) + { + _insertMembershipPreparedStatement ??= await PrepareStatementAsync(""" + UPDATE membership + SET + version = :new_version, + status = :status, + start_time = :start_time, + silo_name = :silo_name, + host_name = :host_name, + proxy_port = :proxy_port, + i_am_alive_time = :i_am_alive_time + WHERE + partition_key = :partition_key + AND address = :address + AND port = :port + AND generation = :generation + IF + version = :expected_version; + """, MembershipWriteConsistencyLevel); + return _insertMembershipPreparedStatement.Bind(new + { + partition_key = clusterIdentifier, + address = membershipEntry.SiloAddress.Endpoint.Address.ToString(), + port = membershipEntry.SiloAddress.Endpoint.Port, + generation = membershipEntry.SiloAddress.Generation, + silo_name = membershipEntry.SiloName, + host_name = membershipEntry.HostName, + status = (int)membershipEntry.Status, + proxy_port = membershipEntry.ProxyPort, + start_time = membershipEntry.StartTime, + i_am_alive_time = membershipEntry.IAmAliveTime, + new_version = version + 1, + expected_version = version + }); + } + public async ValueTask InsertMembershipVersion(string clusterIdentifier) + { + _insertMembershipVersionPreparedStatement ??= await PrepareStatementAsync(""" + INSERT INTO membership( + partition_key, + version + ) + VALUES ( + :partition_key, + 0 + ) + IF NOT EXISTS; + """, MembershipWriteConsistencyLevel); + return _insertMembershipVersionPreparedStatement.Bind(clusterIdentifier); + } + + + public async ValueTask DeleteMembershipTableEntries(string clusterIdentifier) + { + _deleteMembershipTablePreparedStatement ??= await PrepareStatementAsync(""" + DELETE FROM membership WHERE partition_key = :partition_key; + """, + MembershipWriteConsistencyLevel); + return _deleteMembershipTablePreparedStatement.Bind(clusterIdentifier); + } + + public async ValueTask UpdateIAmAliveTime(string clusterIdentifier, MembershipEntry membershipEntry) + { + _updateIAmAlivePreparedStatement ??= await PrepareStatementAsync(""" + UPDATE membership + SET + i_am_alive_time = :i_am_alive_time + WHERE + partition_key = :partition_key + AND address = :address + AND port = :port + AND generation = :generation; + """, + ConsistencyLevel.Any); + + return _updateIAmAlivePreparedStatement.Bind(new + { + partition_key = clusterIdentifier, + i_am_alive_time = membershipEntry.IAmAliveTime, + address = membershipEntry.SiloAddress.Endpoint.Address.ToString(), + port = membershipEntry.SiloAddress.Endpoint.Port, + generation = membershipEntry.SiloAddress.Generation + }); + } + + public async ValueTask DeleteMembershipEntry(string clusterIdentifier, MembershipEntry membershipEntry) + { + _deleteMembershipEntryPreparedStatement ??= await PrepareStatementAsync(""" + DELETE FROM + membership + WHERE + partition_key = :partition_key + AND address = :address + AND port = :port + AND generation = :generation; + """, MembershipWriteConsistencyLevel); + return _deleteMembershipEntryPreparedStatement.Bind(new + { + partition_key = clusterIdentifier, + address = membershipEntry.SiloAddress.Endpoint.Address.ToString(), + port = membershipEntry.SiloAddress.Endpoint.Port, + generation = membershipEntry.SiloAddress.Generation + }); + } + + public async ValueTask UpdateMembership(string clusterIdentifier, MembershipEntry membershipEntry, int version) + { + _updateMembershipPreparedStatement ??= await PrepareStatementAsync(""" + UPDATE membership + SET + version = :new_version, + status = :status, + suspect_times = :suspect_times, + i_am_alive_time = :i_am_alive_time + WHERE + partition_key = :partition_key + AND address = :address + AND port = :port + AND generation = :generation + IF + version = :expected_version; + """, MembershipWriteConsistencyLevel); + return _updateMembershipPreparedStatement.Bind(new + { + partition_key = clusterIdentifier, + new_version = version + 1, + expected_version = version, + status = (int)membershipEntry.Status, + suspect_times = + membershipEntry.SuspectTimes == null + ? null + : string.Join("|", + membershipEntry.SuspectTimes.Select(s => + $"{s.Item1.ToParsableString()},{LogFormatter.PrintDate(s.Item2)}")), + i_am_alive_time = membershipEntry.IAmAliveTime, + address = membershipEntry.SiloAddress.Endpoint.Address.ToString(), + port = membershipEntry.SiloAddress.Endpoint.Port, + generation = membershipEntry.SiloAddress.Generation + }); + } + + public async ValueTask MembershipReadVersion(string clusterIdentifier) + { + _membershipReadVersionPreparedStatement ??= await PrepareStatementAsync(""" + SELECT + version + FROM + membership + WHERE + partition_key = :partition_key; + """, + MembershipReadConsistencyLevel); + return _membershipReadVersionPreparedStatement.Bind(clusterIdentifier); + } + + public async ValueTask MembershipReadAll(string clusterIdentifier) + { + _membershipReadAllPreparedStatement ??= await PrepareStatementAsync(""" + SELECT + version, + address, + port, + generation, + silo_name, + host_name, + status, + proxy_port, + suspect_times, + start_time, + i_am_alive_time + FROM + membership + WHERE + partition_key = :partition_key; + """, + MembershipReadConsistencyLevel); + return _membershipReadAllPreparedStatement.Bind(clusterIdentifier); + } + + public async ValueTask MembershipReadRow(string clusterIdentifier, SiloAddress siloAddress) + { + _membershipReadAllPreparedStatement ??= await PrepareStatementAsync(""" + SELECT + version, + silo_name, + host_name, + status, + proxy_port, + suspect_times, + start_time, + i_am_alive_time + FROM + membership + WHERE + partition_key = :partition_key + AND address = :address + AND port = :port + AND generation = :generation; + """, + MembershipReadConsistencyLevel); + return _membershipReadAllPreparedStatement.Bind(new + { + partition_key = clusterIdentifier, + address = siloAddress.Endpoint.Address.ToString(), + port = siloAddress.Endpoint.Port, + generation = siloAddress.Generation + }); + } + + public async ValueTask GatewaysQuery(string clusterIdentifier, int status) + { + // Filtering is only for the `proxy_port` filtering. We're already hitting the partition + // and secondary index on status which both don't need "ALLOW FILTERING" + _membershipReadAllPreparedStatement ??= await PrepareStatementAsync(""" + SELECT + address, + proxy_port, + generation + FROM + membership + WHERE + partition_key = :partition_key + AND status = :status + AND proxy_port > 0 + ALLOW FILTERING; + """, + MembershipReadConsistencyLevel); + return _membershipReadAllPreparedStatement.Bind(new + { + partition_key = clusterIdentifier, + status = status + }); + } + + private async ValueTask PrepareStatementAsync(string cql, ConsistencyLevel consistencyLevel) + { + var statement = await Session.PrepareAsync(cql); + statement.SetConsistencyLevel(consistencyLevel); + return statement; + } +} diff --git a/test/Extensions/Tester.Cassandra/Cassandra.dockerfile b/test/Extensions/Tester.Cassandra/Cassandra.dockerfile new file mode 100644 index 0000000000..87037174c8 --- /dev/null +++ b/test/Extensions/Tester.Cassandra/Cassandra.dockerfile @@ -0,0 +1,20 @@ +ARG CASSANDRAVERSION=4.1 +FROM cassandra:${CASSANDRAVERSION} + +RUN sed -i 's/auto_snapshot: true/auto_snapshot: false/g' /etc/cassandra/cassandra.yaml + +# Disable virtual nodes +RUN sed -i -e "s/num_tokens/\#num_tokens/" /etc/cassandra/cassandra.yaml + +# With virtual nodes disabled, we have to configure initial_token +RUN sed -i -e "s/\# initial_token:/initial_token: 0/" /etc/cassandra/cassandra.yaml +RUN echo "JVM_OPTS=\"\$JVM_OPTS -Dcassandra.initial_token=0\"" >> /etc/cassandra/cassandra-env.sh + +# set 0.0.0.0 Listens on all configured interfaces +RUN sed -i -e "s/^rpc_address.*/rpc_address: 0.0.0.0/" /etc/cassandra/cassandra.yaml + +# Be your own seed +RUN sed -i -e "s/- seeds: \"127.0.0.1\"/- seeds: \"$SEEDS\"/" /etc/cassandra/cassandra.yaml + +# Disable gossip, no need in one node cluster +RUN echo "JVM_OPTS=\"\$JVM_OPTS -Dcassandra.skip_wait_for_gossip_to_settle=0\"" >> /etc/cassandra/cassandra-env.sh \ No newline at end of file diff --git a/test/Extensions/Tester.Cassandra/Clustering/Cassandra.cs b/test/Extensions/Tester.Cassandra/Clustering/Cassandra.cs new file mode 100644 index 0000000000..f9b0324121 --- /dev/null +++ b/test/Extensions/Tester.Cassandra/Clustering/Cassandra.cs @@ -0,0 +1,695 @@ +using System.Net; +using Cassandra; +using Microsoft.Extensions.Options; +using Orleans.Clustering.Cassandra; +using Orleans.Configuration; +using Orleans.Messaging; +using Orleans.Runtime; +using Tester.Cassandra.Utility; +using Xunit; +using Xunit.Abstractions; + +namespace Tester.Cassandra.Clustering; + +public sealed class Cassandra : IClassFixture +{ + private readonly CassandraContainer _cassandraContainer; + private readonly ITestOutputHelper _testOutputHelper; + private static readonly string HostName = Dns.GetHostName(); + private static int _generation; + + public Cassandra(CassandraContainer cassandraContainer, ITestOutputHelper testOutputHelper) + { + _cassandraContainer = cassandraContainer; + _cassandraContainer.Name = nameof(Cassandra); + _testOutputHelper = testOutputHelper; + } + + [Fact] + [Trait("Category", nameof(Cassandra))] + public async Task MembershipTable_GetGateways() + { + var (membershipTable, gatewayListProvider) = await CreateNewMembershipTableAsync(); + + var membershipEntries = Enumerable.Range(0, 10).Select(_ => CreateMembershipEntryForTest()).ToArray(); + + membershipEntries[3].Status = SiloStatus.Active; + membershipEntries[3].ProxyPort = 0; + membershipEntries[5].Status = SiloStatus.Active; + membershipEntries[9].Status = SiloStatus.Active; + + var data = await membershipTable.ReadAll(); + Assert.NotNull(data); + Assert.Empty(data.Members); + + var version = data.Version; + foreach (var membershipEntry in membershipEntries) + { + Assert.True(await membershipTable.InsertRow(membershipEntry, version.Next())); + version = (await membershipTable.ReadRow(membershipEntry.SiloAddress)).Version; + } + + var gateways = await gatewayListProvider.GetGateways(); + + var entries = new List(gateways.Select(g => g.ToString())); + + // only members with a non-zero Gateway port + Assert.DoesNotContain(membershipEntries[3].SiloAddress.ToGatewayUri().ToString(), entries); + + // only Active members + Assert.Contains(membershipEntries[5].SiloAddress.ToGatewayUri().ToString(), entries); + Assert.Contains(membershipEntries[9].SiloAddress.ToGatewayUri().ToString(), entries); + Assert.Equal(2, entries.Count); + } + + [Fact] + [Trait("Category", nameof(Cassandra))] + public async Task MembershipTable_ReadAll_EmptyTable() + { + var (membershipTable, _) = await CreateNewMembershipTableAsync(); + + var data = await membershipTable.ReadAll(); + Assert.NotNull(data); + + _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data); + + Assert.Empty(data.Members); + Assert.NotNull(data.Version.VersionEtag); + Assert.Equal(0, data.Version.Version); + } + + [Fact] + [Trait("Category", nameof(Cassandra))] + public async Task MembershipTable_InsertRow() + { + var (membershipTable, _) = await CreateNewMembershipTableAsync(); + + var membershipEntry = CreateMembershipEntryForTest(); + + var data = await membershipTable.ReadAll(); + Assert.NotNull(data); + Assert.Empty(data.Members); + + var nextTableVersion = data.Version.Next(); + + var ok = await membershipTable.InsertRow(membershipEntry, nextTableVersion); + Assert.True(ok, "InsertRow failed"); + + data = await membershipTable.ReadAll(); + + Assert.Equal(1, data.Version.Version); + + Assert.Single(data.Members); + } + + // Cassandra doesn't have the capability to prevent a duplicate insert in the way this test requires + /*[Fact] + [Trait("Category", nameof(Cassandra))] + public async Task MembershipTable_ReadRow_Insert_Read() + { + var (membershipTable, gatewayProvider) = await CreateNewMembershipTableAsync("Phalanx", "blu"); + + MembershipTableData data = await membershipTable.ReadAll(); + + _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data); + + Assert.Empty(data.Members); + + TableVersion newTableVersion = data.Version.Next(); + + MembershipEntry newEntry = CreateMembershipEntryForTest(); + bool ok = await membershipTable.InsertRow(newEntry, newTableVersion); + Assert.True(ok, "InsertRow failed"); + + ok = await membershipTable.InsertRow(newEntry, newTableVersion); + Assert.False(ok, "InsertRow should have failed - same entry, old table version"); + + + ok = await membershipTable.InsertRow(CreateMembershipEntryForTest(), newTableVersion); + Assert.False(ok, "InsertRow should have failed - new entry, old table version"); + + data = await membershipTable.ReadAll(); + + Assert.Equal(1, data.Version.Version); + + TableVersion nextTableVersion = data.Version.Next(); + + ok = await membershipTable.InsertRow(newEntry, nextTableVersion); + Assert.False(ok, "InsertRow should have failed - duplicate entry"); + + data = await membershipTable.ReadAll(); + Assert.Single(data.Members); + + data = await membershipTable.ReadRow(newEntry.SiloAddress); + Assert.Equal(newTableVersion.Version, data.Version.Version); + + _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data); + + Assert.Single(data.Members); + Assert.NotNull(data.Version.VersionEtag); + + Assert.NotEqual(newTableVersion.VersionEtag, data.Version.VersionEtag); + Assert.Equal(newTableVersion.Version, data.Version.Version); + + var membershipEntry = data.Members[0].Item1; + string eTag = data.Members[0].Item2; + _testOutputHelper.WriteLine("Membership.ReadRow returned MembershipEntry ETag={0} Entry={1}", eTag, membershipEntry); + + Assert.NotNull(eTag); + Assert.NotNull(membershipEntry); + }*/ + + [Fact] + [Trait("Category", nameof(Cassandra))] + public async Task MembershipTable_ReadRow_Insert_Read_modified() + { + var (membershipTable, _) = await CreateNewMembershipTableAsync(); + + var data = await membershipTable.ReadAll(); + + _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data); + + Assert.Empty(data.Members); + + var newTableVersion = data.Version.Next(); + + var newEntry = CreateMembershipEntryForTest(); + var ok = await membershipTable.InsertRow(newEntry, newTableVersion); + Assert.True(ok, "InsertRow failed"); + + ok = await membershipTable.InsertRow(newEntry, newTableVersion); + Assert.False(ok, "InsertRow should have failed - same entry, old table version"); + + + ok = await membershipTable.InsertRow(CreateMembershipEntryForTest(), newTableVersion); + Assert.False(ok, "InsertRow should have failed - new entry, old table version"); + + data = await membershipTable.ReadAll(); + + Assert.Equal(1, data.Version.Version); + + var nextTableVersion = data.Version.Next(); + + ok = await membershipTable.InsertRow(newEntry, nextTableVersion); + + //Assert.False(ok, "InsertRow should have failed - duplicate entry"); + // Cassandra doesn't provide a way to prevent this insert + // adding discard assignment here to avoid "unused variable" warning + _ = ok; + + + data = await membershipTable.ReadAll(); + Assert.Single(data.Members); + + data = await membershipTable.ReadRow(newEntry.SiloAddress); + + _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data); + + Assert.Single(data.Members); + Assert.NotNull(data.Version.VersionEtag); + + var membershipEntry = data.Members[0].Item1; + var eTag = data.Members[0].Item2; + _testOutputHelper.WriteLine("Membership.ReadRow returned MembershipEntry ETag={0} Entry={1}", eTag, membershipEntry); + + Assert.NotNull(eTag); + Assert.NotNull(membershipEntry); + } + + [Fact] + [Trait("Category", nameof(Cassandra))] + public async Task MembershipTable_ReadAll_Insert_ReadAll() + { + var (membershipTable, _) = await CreateNewMembershipTableAsync(); + + var data = await membershipTable.ReadAll(); + _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data); + + Assert.Empty(data.Members); + + var newTableVersion = data.Version.Next(); + + var newEntry = CreateMembershipEntryForTest(); + var ok = await membershipTable.InsertRow(newEntry, newTableVersion); + Assert.True(ok, "InsertRow failed"); + + data = await membershipTable.ReadAll(); + _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data); + + Assert.Single(data.Members); + Assert.NotNull(data.Version.VersionEtag); + + Assert.NotEqual(newTableVersion.VersionEtag, data.Version.VersionEtag); + Assert.Equal(newTableVersion.Version, data.Version.Version); + + var membershipEntry = data.Members[0].Item1; + var eTag = data.Members[0].Item2; + _testOutputHelper.WriteLine("Membership.ReadAll returned MembershipEntry ETag={0} Entry={1}", eTag, membershipEntry); + + Assert.NotNull(eTag); + Assert.NotNull(membershipEntry); + } + + [Fact] + [Trait("Category", nameof(Cassandra))] + public async Task MembershipTable_UpdateRow() + { + var (membershipTable, _) = await CreateNewMembershipTableAsync(); + + var tableData = await membershipTable.ReadAll(); + Assert.NotNull(tableData.Version); + + Assert.Equal(0, tableData.Version.Version); + Assert.Empty(tableData.Members); + + for (var i = 1; i < 10; i++) + { + var siloEntry = CreateMembershipEntryForTest(); + + siloEntry.SuspectTimes = + [ + new Tuple(CreateSiloAddressForTest(), GetUtcNowWithSecondsResolution().AddSeconds(1)), + new Tuple(CreateSiloAddressForTest(), GetUtcNowWithSecondsResolution().AddSeconds(2)) + ]; + + var tableVersion = tableData.Version.Next(); + + _testOutputHelper.WriteLine("Calling InsertRow with Entry = {0} TableVersion = {1}", siloEntry, tableVersion); + var ok = await membershipTable.InsertRow(siloEntry, tableVersion); + Assert.True(ok, "InsertRow failed"); + + tableData = await membershipTable.ReadAll(); + + var etagBefore = tableData.TryGet(siloEntry.SiloAddress)?.Item2; + + Assert.NotNull(etagBefore); + + _testOutputHelper.WriteLine( + "Calling UpdateRow with Entry = {0} correct eTag = {1} old version={2}", + siloEntry, + etagBefore, + tableVersion?.ToString() ?? "null"); + ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); + Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); + tableData = await membershipTable.ReadAll(); + + tableVersion = tableData.Version.Next(); + + _testOutputHelper.WriteLine( + "Calling UpdateRow with Entry = {0} correct eTag = {1} correct version={2}", + siloEntry, + etagBefore, + tableVersion?.ToString() ?? "null"); + + ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); + + Assert.True(ok, $"UpdateRow failed - Table Data = {tableData}"); + + _testOutputHelper.WriteLine( + "Calling UpdateRow with Entry = {0} old eTag = {1} old version={2}", + siloEntry, + etagBefore, + tableVersion?.ToString() ?? "null"); + ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion); + Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); + + tableData = await membershipTable.ReadAll(); + + var tuple = tableData.TryGet(siloEntry.SiloAddress); + + Assert.Equal(tuple.Item1.ToFullString(), siloEntry.ToFullString()); + + var etagAfter = tuple.Item2; + + _testOutputHelper.WriteLine( + "Calling UpdateRow with Entry = {0} correct eTag = {1} old version={2}", + siloEntry, + etagAfter, + tableVersion?.ToString() ?? "null"); + + ok = await membershipTable.UpdateRow(siloEntry, etagAfter, tableVersion); + + Assert.False(ok, $"row update should have failed - Table Data = {tableData}"); + + tableData = await membershipTable.ReadAll(); + + etagBefore = etagAfter; + + etagAfter = tableData.TryGet(siloEntry.SiloAddress)?.Item2; + + Assert.Equal(etagBefore, etagAfter); + Assert.NotNull(tableData.Version); + Assert.Equal(tableVersion!.Version, tableData.Version.Version); + + Assert.Equal(i, tableData.Members.Count); + } + } + + [Fact] + [Trait("Category", nameof(Cassandra))] + public async Task MembershipTable_UpdateRowInParallel() + { + var (membershipTable, _) = await CreateNewMembershipTableAsync(); + + var tableData = await membershipTable.ReadAll(); + + var data = CreateMembershipEntryForTest(); + + var newTableVer = tableData.Version.Next(); + + var insertions = Task.WhenAll(Enumerable.Range(1, 20).Select(async _ => { try { return await membershipTable.InsertRow(data, newTableVer); } catch { return false; } })); + + Assert.True((await insertions).Single(x => x), "InsertRow failed"); + + await Task.WhenAll(Enumerable.Range(1, 19).Select(async _ => + { + var done = false; + do + { + var updatedTableData = await membershipTable.ReadAll(); + var updatedRow = updatedTableData.TryGet(data.SiloAddress); + + await Task.Delay(10); + if (updatedRow is null) continue; + + var tableVersion = updatedTableData.Version.Next(); + try + { + done = await membershipTable.UpdateRow(updatedRow.Item1, updatedRow.Item2, tableVersion); + } + catch + { + done = false; + } + } while (!done); + })).WithTimeout(TimeSpan.FromSeconds(30)); + + + tableData = await membershipTable.ReadAll(); + Assert.NotNull(tableData.Version); + + Assert.Equal(20, tableData.Version.Version); + + Assert.Single(tableData.Members); + } + + [Fact] + [Trait("Category", nameof(Cassandra))] + public async Task MembershipTable_UpdateIAmAlive() + { + var (membershipTable, _) = await CreateNewMembershipTableAsync(); + + var tableData = await membershipTable.ReadAll(); + + var newTableVersion = tableData.Version.Next(); + var newEntry = CreateMembershipEntryForTest(); + var ok = await membershipTable.InsertRow(newEntry, newTableVersion); + Assert.True(ok); + + var amAliveTime = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)); + + // This mimics the arguments MembershipOracle.OnIAmAliveUpdateInTableTimer passes in + var entry = new MembershipEntry + { + SiloAddress = newEntry.SiloAddress, + IAmAliveTime = amAliveTime + }; + + await membershipTable.UpdateIAmAlive(entry); + + tableData = await membershipTable.ReadAll(); + var member = tableData.Members.First(e => e.Item1.SiloAddress.Equals(newEntry.SiloAddress)); + + // compare that the value is close to what we passed in, but not exactly, as the underlying store can set its own precision settings + // (ie: in SQL Server this is defined as datetime2(3), so we don't expect precision to account for less than 0.001s values) + Assert.True((amAliveTime - member.Item1.IAmAliveTime).Duration() < TimeSpan.FromSeconds(2), "Expected time around " + amAliveTime + " but got " + member.Item1.IAmAliveTime + " that is off by " + (amAliveTime - member.Item1.IAmAliveTime).Duration().ToString()); + Assert.Equal(newTableVersion.Version, tableData.Version.Version); + } + + [Fact] + [Trait("Category", nameof(Cassandra))] + public async Task MembershipTable_CleanupDefunctSiloEntries() + { + var (membershipTable, _) = await CreateNewMembershipTableAsync(); + + var data = await membershipTable.ReadAll(); + _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data); + + Assert.Empty(data.Members); + + var newTableVersion = data.Version.Next(); + + var oldEntryDead = CreateMembershipEntryForTest(); + oldEntryDead.IAmAliveTime = oldEntryDead.IAmAliveTime.AddDays(-10); + oldEntryDead.StartTime = oldEntryDead.StartTime.AddDays(-10); + oldEntryDead.Status = SiloStatus.Dead; + var ok = await membershipTable.InsertRow(oldEntryDead, newTableVersion); + var table = await membershipTable.ReadAll(); + + Assert.True(ok, "InsertRow Dead failed"); + + newTableVersion = table.Version.Next(); + var oldEntryJoining = CreateMembershipEntryForTest(); + oldEntryJoining.IAmAliveTime = oldEntryJoining.IAmAliveTime.AddDays(-10); + oldEntryJoining.StartTime = oldEntryJoining.StartTime.AddDays(-10); + oldEntryJoining.Status = SiloStatus.Joining; + ok = await membershipTable.InsertRow(oldEntryJoining, newTableVersion); + table = await membershipTable.ReadAll(); + + Assert.True(ok, "InsertRow Joining failed"); + + newTableVersion = table.Version.Next(); + var newEntry = CreateMembershipEntryForTest(); + ok = await membershipTable.InsertRow(newEntry, newTableVersion); + Assert.True(ok, "InsertRow failed"); + + data = await membershipTable.ReadAll(); + newTableVersion = data.Version.Next(); + _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data); + + Assert.Equal(3, data.Members.Count); + + // Every status other than Active should get cleared out if old + foreach (var siloStatus in Enum.GetValues()) + { + var oldEntry = CreateMembershipEntryForTest(); + oldEntry.IAmAliveTime = oldEntry.IAmAliveTime.AddDays(-10); + oldEntry.StartTime = oldEntry.StartTime.AddDays(-10); + oldEntry.Status = siloStatus; + ok = await membershipTable.InsertRow(oldEntry, newTableVersion); + table = await membershipTable.ReadAll(); + + Assert.True(ok, "InsertRow failed"); + + newTableVersion = table.Version.Next(); + } + + await membershipTable.CleanupDefunctSiloEntries(oldEntryDead.IAmAliveTime.AddDays(3)); + + data = await membershipTable.ReadAll(); + _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data); + + Assert.Equal(2, data.Members.Count); + } + + // Utility methods + private static MembershipEntry CreateMembershipEntryForTest() + { + var siloAddress = CreateSiloAddressForTest(); + + var membershipEntry = new MembershipEntry + { + SiloAddress = siloAddress, + HostName = HostName, + SiloName = "TestSiloName", + Status = SiloStatus.Joining, + ProxyPort = siloAddress.Endpoint.Port, + StartTime = GetUtcNowWithSecondsResolution(), + IAmAliveTime = GetUtcNowWithSecondsResolution() + }; + + return membershipEntry; + } + + private static DateTime GetUtcNowWithSecondsResolution() + { + var now = DateTime.UtcNow; + return new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, now.Second, DateTimeKind.Utc); + } + + private static SiloAddress CreateSiloAddressForTest() + { + var siloAddress = SiloAddressUtils.NewLocalSiloAddress(Interlocked.Increment(ref _generation)); + siloAddress.Endpoint.Port = 12345; + return siloAddress; + } + + private async Task<(IMembershipTable, IGatewayListProvider)> CreateNewMembershipTableAsync(string serviceId, string clusterId) + { + var session = await CreateSession(); + + var cassandraClusteringOptions = new ClusterOptions { ServiceId = serviceId, ClusterId = clusterId }; + var options = Options.Create(cassandraClusteringOptions); + IMembershipTable membershipTable = new CassandraClusteringTable(options, session); + await membershipTable.InitializeMembershipTable(true); + + IGatewayListProvider gatewayProvider = new CassandraGatewayListProvider(options, + Options.Create(new GatewayOptions { GatewayListRefreshPeriod = TimeSpan.FromSeconds(15) }), session); + await gatewayProvider.InitializeGatewayListProvider(); + + return (membershipTable, gatewayProvider); + } + + private async Task CreateSession() + { + var container = await _cassandraContainer.RunImage(); + + return container.session; + } + + private Task<(IMembershipTable, IGatewayListProvider)> CreateNewMembershipTableAsync() + { + var serviceId = $"Service_{Guid.NewGuid()}"; + var clusterId = $"Cluster_{Guid.NewGuid()}"; + + return CreateNewMembershipTableAsync(serviceId, clusterId); + } + + [Fact] + [Trait("Category", nameof(Cassandra))] + public async Task A_Test() + { + var serviceId = $"Service_{Guid.NewGuid()}"; + var clusterId = $"Cluster_{Guid.NewGuid()}"; + var clusterOptions = new ClusterOptions { ServiceId = serviceId, ClusterId = clusterId + "_1" }; + var clusterIdentifier = clusterOptions.ServiceId + "-" + clusterOptions.ClusterId; + var (membershipTable, gatewayProvider) = await CreateNewMembershipTableAsync(serviceId, clusterId + "_1"); + + var (otherMembershipTable, _) = await CreateNewMembershipTableAsync(serviceId, clusterId + "_2"); + + var tableData = await membershipTable.ReadAll(); + + await membershipTable.InsertRow( + new MembershipEntry + { + HostName = "host1", + IAmAliveTime = DateTime.UtcNow, + ProxyPort = 2345, + SiloAddress = SiloAddress.New(IPAddress.Loopback, 2345, 1), + SiloName = "silo1", + Status = SiloStatus.Created, + StartTime = DateTime.UtcNow + }, tableData.Version.Next()); + + tableData = await membershipTable.ReadAll(); + + await membershipTable.InsertRow( + new MembershipEntry + { + HostName = "host1", + IAmAliveTime = DateTime.UtcNow, + ProxyPort = 2345, + SiloAddress = SiloAddress.New(IPAddress.Loopback, 2345, 1), + SiloName = "silo1", + Status = SiloStatus.Joining, + StartTime = DateTime.UtcNow + }, tableData.Version.Next()); + + tableData = await otherMembershipTable.ReadAll(); + await otherMembershipTable.InsertRow( + new MembershipEntry + { + HostName = "host1", + IAmAliveTime = DateTime.UtcNow, + ProxyPort = 2345, + SiloAddress = SiloAddress.New(IPAddress.Loopback, 2345, 1), + SiloName = "silo1", + Status = SiloStatus.Joining, + StartTime = DateTime.UtcNow + }, tableData.Version.Next()); + + tableData = await membershipTable.ReadAll(); + + var membershipEntry = new MembershipEntry + { + HostName = "host1", + IAmAliveTime = DateTime.UtcNow, + ProxyPort = 2345, + SiloAddress = SiloAddress.New(IPAddress.Loopback, 2346, 1), + SiloName = "silo1", + Status = SiloStatus.Active, + StartTime = DateTime.UtcNow + }; + await membershipTable.InsertRow(membershipEntry, tableData.Version.Next()); + + var readAll = await membershipTable.ReadAll(); + + _testOutputHelper.WriteLine(readAll.Version.Version.ToString()); + foreach (var row in readAll.Members) + { + var entry = row.Item1; + _testOutputHelper.WriteLine(clusterIdentifier); + _testOutputHelper.WriteLine(" " + entry.HostName); + _testOutputHelper.WriteLine(" " + entry.SiloName); + _testOutputHelper.WriteLine(" " + entry.StartTime); + _testOutputHelper.WriteLine(" " + entry.IAmAliveTime); + _testOutputHelper.WriteLine(" " + entry.SiloAddress); + _testOutputHelper.WriteLine(" " + entry.ProxyPort); + _testOutputHelper.WriteLine(" " + entry.Status); + } + + membershipEntry.IAmAliveTime = DateTime.UtcNow + TimeSpan.FromSeconds(10); + await membershipTable.UpdateIAmAlive(membershipEntry); + + readAll = await membershipTable.ReadAll(); + + _testOutputHelper.WriteLine(readAll.Version.Version.ToString()); + foreach (var row in readAll.Members) + { + var entry = row.Item1; + _testOutputHelper.WriteLine(clusterIdentifier); + _testOutputHelper.WriteLine(" " + entry.HostName); + _testOutputHelper.WriteLine(" " + entry.SiloName); + _testOutputHelper.WriteLine(" " + entry.StartTime); + _testOutputHelper.WriteLine(" " + entry.IAmAliveTime); + _testOutputHelper.WriteLine(" " + entry.SiloAddress); + _testOutputHelper.WriteLine(" " + entry.ProxyPort); + _testOutputHelper.WriteLine(" " + entry.Status); + } + + await gatewayProvider.InitializeGatewayListProvider(); + + _ = await gatewayProvider.GetGateways(); + var gateways = await gatewayProvider.GetGateways(); + + foreach (var gateway in gateways) + { + _testOutputHelper.WriteLine(gateway.ToString()); + } + + var queriedEntry = await membershipTable.ReadRow(membershipEntry.SiloAddress); + foreach (var queriedEntryMember in queriedEntry.Members) + { + _testOutputHelper.WriteLine(queriedEntryMember.Item1.SiloAddress.ToParsableString()); + } + + await membershipTable.DeleteMembershipTableEntries(clusterOptions.ClusterId); + + readAll = await membershipTable.ReadAll(); + + _testOutputHelper.WriteLine(readAll.Version.Version.ToString()); + foreach (var row in readAll.Members) + { + var entry = row.Item1; + _testOutputHelper.WriteLine(clusterIdentifier); + _testOutputHelper.WriteLine(" " + entry.HostName); + _testOutputHelper.WriteLine(" " + entry.SiloName); + _testOutputHelper.WriteLine(" " + entry.StartTime); + _testOutputHelper.WriteLine(" " + entry.IAmAliveTime); + _testOutputHelper.WriteLine(" " + entry.SiloAddress); + _testOutputHelper.WriteLine(" " + entry.ProxyPort); + _testOutputHelper.WriteLine(" " + entry.Status); + } + } + +} \ No newline at end of file diff --git a/test/Extensions/Tester.Cassandra/Clustering/CassandraContainer.cs b/test/Extensions/Tester.Cassandra/Clustering/CassandraContainer.cs new file mode 100644 index 0000000000..2d3821d528 --- /dev/null +++ b/test/Extensions/Tester.Cassandra/Clustering/CassandraContainer.cs @@ -0,0 +1,52 @@ +using System.Net; +using Cassandra; +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Containers; + +namespace Tester.Cassandra.Clustering; + +public class CassandraContainer +{ + public Task<(IContainer container, ushort exposedPort, Cluster cluster, ISession session)> RunImage() => _innerRunImage.Value; + + private readonly Lazy> _innerRunImage = + new(async () => + { + var cassandraImage = new ImageFromDockerfileBuilder() + .WithDockerfileDirectory(CommonDirectoryPath.GetProjectDirectory(), string.Empty) + .WithDockerfile("Cassandra.dockerfile") + .WithBuildArgument("CASSANDRAVERSION", Environment.GetEnvironmentVariable("CASSANDRAVERSION")) + .Build(); + + var imageTask = cassandraImage.CreateAsync(); + + await imageTask; + + var containerPort = 9042; + + var builder = new ContainerBuilder() + .WithImage(cassandraImage) + .WithPortBinding(containerPort, true) + .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(containerPort)); + + var container = builder.Build(); + + await container.StartAsync(); + + var exposedPort = container.GetMappedPublicPort(containerPort); + + var cluster = Cluster.Builder() + .WithDefaultKeyspace("orleans") + .AddContactPoints(new IPEndPoint(IPAddress.Loopback, exposedPort)) + .Build(); + + // Connect to the nodes using a keyspace + var session = + cluster.ConnectAndCreateDefaultKeyspaceIfNotExists(ReplicationStrategies + .CreateSimpleStrategyReplicationProperty(1)); + + return (container, exposedPort, cluster, session); + }); + + public string Name { get; set; } = string.Empty; +} \ No newline at end of file diff --git a/test/Extensions/Tester.Cassandra/Clustering/SiloAddressUtils.cs b/test/Extensions/Tester.Cassandra/Clustering/SiloAddressUtils.cs new file mode 100644 index 0000000000..96376e1fe7 --- /dev/null +++ b/test/Extensions/Tester.Cassandra/Clustering/SiloAddressUtils.cs @@ -0,0 +1,14 @@ +using System.Net; +using Orleans.Runtime; + +namespace Tester.Cassandra.Clustering; + +public static class SiloAddressUtils +{ + private static readonly IPEndPoint s_localEndpoint = new(IPAddress.Loopback, 0); + + public static SiloAddress NewLocalSiloAddress(int gen) + { + return SiloAddress.New(s_localEndpoint, gen); + } +} \ No newline at end of file diff --git a/test/Extensions/Tester.Cassandra/Tester.Cassandra.csproj b/test/Extensions/Tester.Cassandra/Tester.Cassandra.csproj new file mode 100644 index 0000000000..2278d6da1c --- /dev/null +++ b/test/Extensions/Tester.Cassandra/Tester.Cassandra.csproj @@ -0,0 +1,19 @@ + + + + $(TestTargetFrameworks) + true + + + + + + + + + + + + + + diff --git a/test/Extensions/Tester.Cassandra/Utility/TestExtensions.cs b/test/Extensions/Tester.Cassandra/Utility/TestExtensions.cs new file mode 100644 index 0000000000..5a6a2ccd1d --- /dev/null +++ b/test/Extensions/Tester.Cassandra/Utility/TestExtensions.cs @@ -0,0 +1,47 @@ +namespace Tester.Cassandra.Utility +{ + public static class TestExtensions + { + public static async Task WithTimeout(this Task taskToComplete, TimeSpan timeout) + { + if (taskToComplete.IsCompleted) + { + await taskToComplete; + return; + } + + var timeoutCancellationTokenSource = new CancellationTokenSource(); + var completedTask = await Task.WhenAny(taskToComplete, Task.Delay(timeout, timeoutCancellationTokenSource.Token)); + + if (taskToComplete == completedTask) + { + timeoutCancellationTokenSource.Cancel(); + await taskToComplete; + return; + } + + taskToComplete.Ignore(); + throw new TimeoutException(string.Format("WithTimeout has timed out after {0}.", timeout)); + } + + public static async Task WithTimeout(this Task taskToComplete, TimeSpan timeout) + { + if (taskToComplete.IsCompleted) + { + return await taskToComplete; + } + + var timeoutCancellationTokenSource = new CancellationTokenSource(); + var completedTask = await Task.WhenAny(taskToComplete, Task.Delay(timeout, timeoutCancellationTokenSource.Token)); + + if (taskToComplete == completedTask) + { + timeoutCancellationTokenSource.Cancel(); + return await taskToComplete; + } + + taskToComplete.Ignore(); + throw new TimeoutException(string.Format("WithTimeout has timed out after {0}.", timeout)); + } + } +} \ No newline at end of file