diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 82827c2bf8..28d3af2bc1 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -70,6 +70,33 @@ jobs:
path: |
**/TestResults/*
**/logs/*
+ test-cassandra:
+ name: Cassandra provider tests
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ provider: ["Cassandra"]
+ dbversion: ["3.11", "4.0", "4.1", "5.0"]
+ steps:
+ - uses: actions/checkout@v2
+ - name: Setup .NET
+ uses: actions/setup-dotnet@v3
+ with:
+ dotnet-version: |
+ 8.0.x
+ - name: Test
+ run: dotnet test --filter "Category=${{ matrix.provider }}&(Category=BVT|Category=SlowBVT|Category=Functional)" --blame-hang-timeout 10m --logger "trx" -- -parallel none -noshadow
+ env:
+ CASSANDRAVERSION: ${{ matrix.dbversion }}
+ - name: Archive Test Results
+ if: always()
+ uses: actions/upload-artifact@v3
+ with:
+ name: test_output
+ retention-days: 1
+ path: |
+ **/TestResults/*
+ **/logs/*
test-postgres:
name: PostgreSQL provider tests
runs-on: ubuntu-latest
diff --git a/Directory.Packages.props b/Directory.Packages.props
index 334880af6d..d8ca234e88 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -61,6 +61,7 @@
+
@@ -95,6 +96,7 @@
+
diff --git a/Orleans.sln b/Orleans.sln
index 2b92cc4246..5def748ca4 100644
--- a/Orleans.sln
+++ b/Orleans.sln
@@ -219,6 +219,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tester.Redis", "test\Extens
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Serialization.Protobuf", "src\Serializers\Orleans.Serialization.Protobuf\Orleans.Serialization.Protobuf.csproj", "{A073C0EE-8732-42F9-A22E-D47034E25076}"
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Cassandra", "Cassandra", "{42C80201-3AC6-4C10-9809-3315A9FDD7A1}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Clustering.Cassandra", "src\Cassandra\Orleans.Clustering.Cassandra\Orleans.Clustering.Cassandra.csproj", "{7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tester.Cassandra", "test\Extensions\Tester.Cassandra\Tester.Cassandra.csproj", "{15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -581,6 +587,14 @@ Global
{A073C0EE-8732-42F9-A22E-D47034E25076}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.Build.0 = Release|Any CPU
+ {7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}.Release|Any CPU.Build.0 = Release|Any CPU
+ {15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -688,6 +702,9 @@ Global
{8FC6457C-6273-4338-AD2A-ECAA8FE2C5D7} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
{F13247A0-70C9-4200-9CB1-2002CB8105E0} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
{A073C0EE-8732-42F9-A22E-D47034E25076} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
+ {42C80201-3AC6-4C10-9809-3315A9FDD7A1} = {FE2E08C6-9C3B-4AEE-AE07-CCA387580D7A}
+ {7A1A2ECE-DC4B-4DE7-AEF7-855C50895171} = {42C80201-3AC6-4C10-9809-3315A9FDD7A1}
+ {15A1777E-D1B6-4DC8-81D4-998A4CBA63FE} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952}
diff --git a/src/Cassandra/Orleans.Clustering.Cassandra/CassandraClusteringTable.cs b/src/Cassandra/Orleans.Clustering.Cassandra/CassandraClusteringTable.cs
new file mode 100644
index 0000000000..dfc7eb7b9c
--- /dev/null
+++ b/src/Cassandra/Orleans.Clustering.Cassandra/CassandraClusteringTable.cs
@@ -0,0 +1,153 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Threading.Tasks;
+using Cassandra;
+using Microsoft.Extensions.Options;
+using Orleans.Configuration;
+using Orleans.Runtime;
+
+namespace Orleans.Clustering.Cassandra;
+
+public class CassandraClusteringTable : IMembershipTable
+{
+ private readonly ClusterOptions _options;
+ private readonly ISession _session;
+ private OrleansQueries? _queries;
+ private readonly string _identifier;
+
+
+ public CassandraClusteringTable(IOptions options, ISession session)
+ {
+ _options = options.Value;
+ _identifier = $"{_options.ServiceId}-{_options.ClusterId}";
+ _session = session;
+ }
+
+ async Task IMembershipTable.InitializeMembershipTable(bool tryInitTableVersion)
+ {
+ _queries = await OrleansQueries.CreateInstance(_session);
+
+ await _session.ExecuteAsync(_queries.EnsureTableExists());
+ await _session.ExecuteAsync(_queries.EnsureIndexExists());
+
+ if (!tryInitTableVersion)
+ return;
+
+ await _session.ExecuteAsync(await _queries.InsertMembershipVersion(_identifier));
+ }
+
+ async Task IMembershipTable.DeleteMembershipTableEntries(string clusterId)
+ {
+ if (string.Compare(clusterId, _options.ClusterId,
+ StringComparison.InvariantCultureIgnoreCase) != 0)
+ {
+ throw new ArgumentException(
+ $"cluster id {clusterId} does not match CassandraClusteringTable value of {_options.ClusterId}",
+ nameof(clusterId));
+ }
+ await _session.ExecuteAsync(await _queries!.DeleteMembershipTableEntries(_identifier));
+ }
+
+ async Task IMembershipTable.InsertRow(MembershipEntry entry, TableVersion tableVersion)
+ {
+ var query = await _session.ExecuteAsync(await _queries!.InsertMembership(_identifier, entry, tableVersion.Version - 1));
+ return (bool)query.First()["[applied]"];
+ }
+
+ async Task IMembershipTable.UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion)
+ {
+ var query = await _session.ExecuteAsync(await _queries!.UpdateMembership(_identifier, entry, tableVersion.Version - 1));
+ return (bool)query.First()["[applied]"];
+ }
+
+ private static MembershipEntry? GetMembershipEntry(Row row, SiloAddress? forAddress = null)
+ {
+ if (row["start_time"] == null)
+ return null;
+
+ var result = new MembershipEntry
+ {
+ SiloAddress = forAddress ?? SiloAddress.New(new IPEndPoint(IPAddress.Parse((string)row["address"]), (int)row["port"]), (int)row["generation"]),
+ SiloName = (string)row["silo_name"],
+ HostName = (string)row["host_name"],
+ Status = (SiloStatus)(int)row["status"],
+ ProxyPort = (int)row["proxy_port"],
+ StartTime = ((DateTimeOffset)row["start_time"]).UtcDateTime,
+ IAmAliveTime = ((DateTimeOffset)row["i_am_alive_time"]).UtcDateTime
+ };
+
+ var suspectingSilos = (string)row["suspect_times"];
+ if (!string.IsNullOrWhiteSpace(suspectingSilos))
+ {
+ result.SuspectTimes = new List>();
+ result.SuspectTimes.AddRange(suspectingSilos.Split('|').Select(s =>
+ {
+ var split = s.Split(',');
+ return new Tuple(SiloAddress.FromParsableString(split[0]), LogFormatter.ParseDate(split[1]));
+ }));
+ }
+
+ return result;
+ }
+
+ private async Task GetMembershipTableData(RowSet rows, SiloAddress? forAddress = null)
+ {
+ int version;
+
+ var firstRow = rows.FirstOrDefault();
+ if (firstRow != null)
+ {
+ version = (int)firstRow["version"];
+
+ var entries = new List>();
+ foreach (var row in new[] { firstRow }.Concat(rows))
+ {
+ var entry = GetMembershipEntry(row, forAddress);
+ if (entry != null)
+ entries.Add(new Tuple(entry, string.Empty));
+ }
+
+ return new MembershipTableData(entries, new TableVersion(version, version.ToString()));
+ }
+ else
+ {
+ var result = (await _session.ExecuteAsync(await _queries!.MembershipReadVersion(_identifier))).FirstOrDefault();
+ if (result is null)
+ {
+ return new MembershipTableData(new List>(), new TableVersion(0, "0"));
+ }
+ version = (int)result["version"];
+ return new MembershipTableData(new List>(), new TableVersion(version, version.ToString()));
+ }
+ }
+
+ async Task IMembershipTable.ReadAll()
+ {
+ return await GetMembershipTableData(await _session.ExecuteAsync(await _queries!.MembershipReadAll(_identifier)));
+ }
+
+ async Task IMembershipTable.ReadRow(SiloAddress key)
+ {
+ return await GetMembershipTableData(await _session.ExecuteAsync(await _queries!.MembershipReadRow(_identifier, key)), key);
+ }
+
+ async Task IMembershipTable.UpdateIAmAlive(MembershipEntry entry)
+ {
+ await _session.ExecuteAsync(await _queries!.UpdateIAmAliveTime(_identifier, entry));
+ }
+
+ public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
+ {
+ var allEntries =
+ (await _session.ExecuteAsync(await _queries!.MembershipReadAll(_identifier)))
+ .Select(r => GetMembershipEntry(r))
+ .Where(e => e is not null)
+ .Cast();
+
+ foreach (var e in allEntries)
+ if (e is not { Status: SiloStatus.Active } && new DateTime(Math.Max(e.IAmAliveTime.Ticks, e.StartTime.Ticks)) < beforeDate)
+ await _session.ExecuteAsync(await _queries.DeleteMembershipEntry(_identifier, e));
+ }
+}
\ No newline at end of file
diff --git a/src/Cassandra/Orleans.Clustering.Cassandra/CassandraGatewayListProvider.cs b/src/Cassandra/Orleans.Clustering.Cassandra/CassandraGatewayListProvider.cs
new file mode 100644
index 0000000000..20ba2c6545
--- /dev/null
+++ b/src/Cassandra/Orleans.Clustering.Cassandra/CassandraGatewayListProvider.cs
@@ -0,0 +1,64 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Threading.Tasks;
+using Cassandra;
+using Microsoft.Extensions.Options;
+using Orleans.Configuration;
+using Orleans.Messaging;
+using Orleans.Runtime;
+
+namespace Orleans.Clustering.Cassandra;
+
+public class CassandraGatewayListProvider : IGatewayListProvider
+{
+ private readonly ClusterOptions _options;
+ private readonly TimeSpan _maxStaleness;
+ private readonly ISession _session;
+ private OrleansQueries? _queries;
+ private DateTime _cacheUntil;
+ private List? _cachedResult;
+ private readonly string _identifier;
+
+
+ TimeSpan IGatewayListProvider.MaxStaleness => _maxStaleness;
+
+ bool IGatewayListProvider.IsUpdatable => true;
+
+
+ public CassandraGatewayListProvider(IOptions options, IOptions gatewayOptions, ISession session)
+ {
+ _options = options.Value;
+ _identifier = $"{_options.ServiceId}-{_options.ClusterId}";
+
+ _maxStaleness = gatewayOptions.Value.GatewayListRefreshPeriod;
+ _session = session;
+ }
+
+ async Task IGatewayListProvider.InitializeGatewayListProvider()
+ {
+ _queries = await OrleansQueries.CreateInstance(_session);
+
+ await _session.ExecuteAsync(_queries.EnsureTableExists());
+ await _session.ExecuteAsync(_queries.EnsureIndexExists());
+ }
+
+ async Task> IGatewayListProvider.GetGateways()
+ {
+ if (_cachedResult is not null && _cacheUntil > DateTime.UtcNow)
+ {
+ return _cachedResult.ToList();
+ }
+
+ var rows = await _session.ExecuteAsync(await _queries!.GatewaysQuery(_identifier, (int)SiloStatus.Active));
+ var result = new List();
+
+ foreach (var row in rows)
+ result.Add(SiloAddress.New(new IPEndPoint(IPAddress.Parse((string)row["address"]), (int)row["proxy_port"]), (int)row["generation"]).ToGatewayUri());
+
+ _cacheUntil = DateTime.UtcNow + _maxStaleness;
+ _cachedResult = result;
+ return result.ToList();
+ }
+}
\ No newline at end of file
diff --git a/src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraClusteringOptions.cs b/src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraClusteringOptions.cs
new file mode 100644
index 0000000000..5356f89db4
--- /dev/null
+++ b/src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraClusteringOptions.cs
@@ -0,0 +1,7 @@
+namespace Orleans.Clustering.Cassandra.Hosting;
+
+public class CassandraClusteringOptions
+{
+ public required string ConnectionString { get; set; }
+ public string Keyspace { get; set; } = "orleans";
+}
\ No newline at end of file
diff --git a/src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraMembershipHostingExtensions.cs b/src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraMembershipHostingExtensions.cs
new file mode 100644
index 0000000000..c195d718cf
--- /dev/null
+++ b/src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraMembershipHostingExtensions.cs
@@ -0,0 +1,62 @@
+using System;
+using Cassandra;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using Orleans.Configuration;
+using Orleans.Hosting;
+
+namespace Orleans.Clustering.Cassandra.Hosting;
+
+public static class CassandraMembershipHostingExtensions
+{
+ ///
+ /// Configures Orleans clustering using Cassandra
+ ///
+ ///
+ ///
+ /// Pulls of type and from the DI container
+ public static ISiloBuilder UseCassandraClustering(this ISiloBuilder builder) =>
+ builder.ConfigureServices(services =>
+ {
+ services.AddOptions().ValidateOnStart();
+ services.AddSingleton();
+ });
+
+ ///
+ /// Configures Orleans clustering using Cassandra
+ ///
+ ///
+ /// Configuration for the
+ /// Resolving method for
+ /// A newly created created with the provided and the resolved
+ public static ISiloBuilder UseCassandraClustering(this ISiloBuilder builder, ClusterOptions clusterOptions, Func sessionProvider) =>
+ builder.ConfigureServices(services =>
+ {
+ services.AddSingleton(provider =>
+ {
+ var session = sessionProvider(provider);
+ return new CassandraClusteringTable(Options.Create(clusterOptions), session);
+ });
+ });
+
+
+ ///
+ /// Configures Orleans clustering using Cassandra
+ ///
+ ///
+ /// Configuration for the
+ /// Configuration used to create a new
+ ///
+ public static ISiloBuilder UseCassandraClustering(this ISiloBuilder builder, ClusterOptions clusterOptions, CassandraClusteringOptions cassandraOptions) =>
+ builder.ConfigureServices(services =>
+ {
+ services.AddSingleton(_ =>
+ {
+ var c = Cluster.Builder().WithConnectionString(cassandraOptions.ConnectionString)
+ .Build();
+
+ var session = c.Connect(cassandraOptions.Keyspace);
+ return new CassandraClusteringTable(Options.Create(clusterOptions), session);
+ });
+ });
+}
\ No newline at end of file
diff --git a/src/Cassandra/Orleans.Clustering.Cassandra/Orleans.Clustering.Cassandra.csproj b/src/Cassandra/Orleans.Clustering.Cassandra/Orleans.Clustering.Cassandra.csproj
new file mode 100644
index 0000000000..14f851c5cf
--- /dev/null
+++ b/src/Cassandra/Orleans.Clustering.Cassandra/Orleans.Clustering.Cassandra.csproj
@@ -0,0 +1,21 @@
+
+
+ Microsoft.Orleans.Clustering.Cassandra
+ Microsoft Orleans Cassandra Clustering Provider
+ Microsoft Orleans clustering provider backed by Cassandra
+ $(PackageTags) Cassandra
+ $(DefaultTargetFrameworks)
+ Orleans.Clustering.Cassandra
+ Orleans.Clustering.Cassandra
+ true
+ $(DefineConstants);ORLEANS_CLUSTERING
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/src/Cassandra/Orleans.Clustering.Cassandra/OrleansQueries.cs b/src/Cassandra/Orleans.Clustering.Cassandra/OrleansQueries.cs
new file mode 100644
index 0000000000..a9e09bc4f3
--- /dev/null
+++ b/src/Cassandra/Orleans.Clustering.Cassandra/OrleansQueries.cs
@@ -0,0 +1,335 @@
+using Cassandra;
+using Orleans.Runtime;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace Orleans.Clustering.Cassandra;
+
+///
+/// This class is responsible for keeping a list of prepared queries and
+/// knowing their parameters (including type and conversion to the target
+/// type).
+///
+public class OrleansQueries
+{
+ public ISession Session { get; }
+
+ private PreparedStatement? _insertMembershipVersionPreparedStatement;
+ private PreparedStatement? _deleteMembershipTablePreparedStatement;
+ private PreparedStatement? _insertMembershipPreparedStatement;
+ private PreparedStatement? _membershipReadAllPreparedStatement;
+ private PreparedStatement? _membershipReadVersionPreparedStatement;
+ private PreparedStatement? _updateIAmAlivePreparedStatement;
+ private PreparedStatement? _deleteMembershipEntryPreparedStatement;
+ private PreparedStatement? _updateMembershipPreparedStatement;
+
+ public static Task CreateInstance(ISession session)
+ {
+ string? dc = null;
+ var isMultiDataCenter = false;
+ foreach (var dataCenter in session.Cluster.AllHosts().Where(h => h?.Datacenter is not null).Select(h => h.Datacenter))
+ {
+ dc ??= dataCenter;
+ if (dc != dataCenter)
+ {
+ isMultiDataCenter = true;
+ break;
+ }
+ }
+
+ return Task.FromResult(new OrleansQueries(session, isMultiDataCenter));
+ }
+
+ private OrleansQueries(ISession session, bool isMultiDataCenter)
+ {
+ if (isMultiDataCenter)
+ {
+ MembershipReadConsistencyLevel = ConsistencyLevel.LocalQuorum;
+ MembershipWriteConsistencyLevel = ConsistencyLevel.EachQuorum;
+ }
+ else
+ {
+ MembershipReadConsistencyLevel = ConsistencyLevel.Quorum;
+ MembershipWriteConsistencyLevel = ConsistencyLevel.Quorum;
+ }
+ Session = session;
+ }
+
+ public ConsistencyLevel MembershipWriteConsistencyLevel { get; set; }
+
+ public ConsistencyLevel MembershipReadConsistencyLevel { get; set; }
+
+ public IStatement EnsureTableExists()
+ {
+ return new SimpleStatement("""
+ CREATE TABLE IF NOT EXISTS membership
+ (
+ partition_key ascii,
+ version int static,
+ address ascii,
+ port int,
+ generation int,
+ silo_name text,
+ host_name text,
+ status int,
+ proxy_port int,
+ suspect_times ascii,
+ start_time timestamp,
+ i_am_alive_time timestamp,
+
+ PRIMARY KEY(partition_key, address, port, generation)
+ ) WITH compression = {
+ 'class' : 'LZ4Compressor',
+ 'enabled' : true
+ };
+ """);
+ }
+ public IStatement EnsureIndexExists()
+ {
+ return new SimpleStatement("""
+ CREATE INDEX IF NOT EXISTS ix_membership_status ON membership(status);
+ """);
+ }
+
+ public async ValueTask InsertMembership(string clusterIdentifier, MembershipEntry membershipEntry, int version)
+ {
+ _insertMembershipPreparedStatement ??= await PrepareStatementAsync("""
+ UPDATE membership
+ SET
+ version = :new_version,
+ status = :status,
+ start_time = :start_time,
+ silo_name = :silo_name,
+ host_name = :host_name,
+ proxy_port = :proxy_port,
+ i_am_alive_time = :i_am_alive_time
+ WHERE
+ partition_key = :partition_key
+ AND address = :address
+ AND port = :port
+ AND generation = :generation
+ IF
+ version = :expected_version;
+ """, MembershipWriteConsistencyLevel);
+ return _insertMembershipPreparedStatement.Bind(new
+ {
+ partition_key = clusterIdentifier,
+ address = membershipEntry.SiloAddress.Endpoint.Address.ToString(),
+ port = membershipEntry.SiloAddress.Endpoint.Port,
+ generation = membershipEntry.SiloAddress.Generation,
+ silo_name = membershipEntry.SiloName,
+ host_name = membershipEntry.HostName,
+ status = (int)membershipEntry.Status,
+ proxy_port = membershipEntry.ProxyPort,
+ start_time = membershipEntry.StartTime,
+ i_am_alive_time = membershipEntry.IAmAliveTime,
+ new_version = version + 1,
+ expected_version = version
+ });
+ }
+ public async ValueTask InsertMembershipVersion(string clusterIdentifier)
+ {
+ _insertMembershipVersionPreparedStatement ??= await PrepareStatementAsync("""
+ INSERT INTO membership(
+ partition_key,
+ version
+ )
+ VALUES (
+ :partition_key,
+ 0
+ )
+ IF NOT EXISTS;
+ """, MembershipWriteConsistencyLevel);
+ return _insertMembershipVersionPreparedStatement.Bind(clusterIdentifier);
+ }
+
+
+ public async ValueTask DeleteMembershipTableEntries(string clusterIdentifier)
+ {
+ _deleteMembershipTablePreparedStatement ??= await PrepareStatementAsync("""
+ DELETE FROM membership WHERE partition_key = :partition_key;
+ """,
+ MembershipWriteConsistencyLevel);
+ return _deleteMembershipTablePreparedStatement.Bind(clusterIdentifier);
+ }
+
+ public async ValueTask UpdateIAmAliveTime(string clusterIdentifier, MembershipEntry membershipEntry)
+ {
+ _updateIAmAlivePreparedStatement ??= await PrepareStatementAsync("""
+ UPDATE membership
+ SET
+ i_am_alive_time = :i_am_alive_time
+ WHERE
+ partition_key = :partition_key
+ AND address = :address
+ AND port = :port
+ AND generation = :generation;
+ """,
+ ConsistencyLevel.Any);
+
+ return _updateIAmAlivePreparedStatement.Bind(new
+ {
+ partition_key = clusterIdentifier,
+ i_am_alive_time = membershipEntry.IAmAliveTime,
+ address = membershipEntry.SiloAddress.Endpoint.Address.ToString(),
+ port = membershipEntry.SiloAddress.Endpoint.Port,
+ generation = membershipEntry.SiloAddress.Generation
+ });
+ }
+
+ public async ValueTask DeleteMembershipEntry(string clusterIdentifier, MembershipEntry membershipEntry)
+ {
+ _deleteMembershipEntryPreparedStatement ??= await PrepareStatementAsync("""
+ DELETE FROM
+ membership
+ WHERE
+ partition_key = :partition_key
+ AND address = :address
+ AND port = :port
+ AND generation = :generation;
+ """, MembershipWriteConsistencyLevel);
+ return _deleteMembershipEntryPreparedStatement.Bind(new
+ {
+ partition_key = clusterIdentifier,
+ address = membershipEntry.SiloAddress.Endpoint.Address.ToString(),
+ port = membershipEntry.SiloAddress.Endpoint.Port,
+ generation = membershipEntry.SiloAddress.Generation
+ });
+ }
+
+ public async ValueTask UpdateMembership(string clusterIdentifier, MembershipEntry membershipEntry, int version)
+ {
+ _updateMembershipPreparedStatement ??= await PrepareStatementAsync("""
+ UPDATE membership
+ SET
+ version = :new_version,
+ status = :status,
+ suspect_times = :suspect_times,
+ i_am_alive_time = :i_am_alive_time
+ WHERE
+ partition_key = :partition_key
+ AND address = :address
+ AND port = :port
+ AND generation = :generation
+ IF
+ version = :expected_version;
+ """, MembershipWriteConsistencyLevel);
+ return _updateMembershipPreparedStatement.Bind(new
+ {
+ partition_key = clusterIdentifier,
+ new_version = version + 1,
+ expected_version = version,
+ status = (int)membershipEntry.Status,
+ suspect_times =
+ membershipEntry.SuspectTimes == null
+ ? null
+ : string.Join("|",
+ membershipEntry.SuspectTimes.Select(s =>
+ $"{s.Item1.ToParsableString()},{LogFormatter.PrintDate(s.Item2)}")),
+ i_am_alive_time = membershipEntry.IAmAliveTime,
+ address = membershipEntry.SiloAddress.Endpoint.Address.ToString(),
+ port = membershipEntry.SiloAddress.Endpoint.Port,
+ generation = membershipEntry.SiloAddress.Generation
+ });
+ }
+
+ public async ValueTask MembershipReadVersion(string clusterIdentifier)
+ {
+ _membershipReadVersionPreparedStatement ??= await PrepareStatementAsync("""
+ SELECT
+ version
+ FROM
+ membership
+ WHERE
+ partition_key = :partition_key;
+ """,
+ MembershipReadConsistencyLevel);
+ return _membershipReadVersionPreparedStatement.Bind(clusterIdentifier);
+ }
+
+ public async ValueTask MembershipReadAll(string clusterIdentifier)
+ {
+ _membershipReadAllPreparedStatement ??= await PrepareStatementAsync("""
+ SELECT
+ version,
+ address,
+ port,
+ generation,
+ silo_name,
+ host_name,
+ status,
+ proxy_port,
+ suspect_times,
+ start_time,
+ i_am_alive_time
+ FROM
+ membership
+ WHERE
+ partition_key = :partition_key;
+ """,
+ MembershipReadConsistencyLevel);
+ return _membershipReadAllPreparedStatement.Bind(clusterIdentifier);
+ }
+
+ public async ValueTask MembershipReadRow(string clusterIdentifier, SiloAddress siloAddress)
+ {
+ _membershipReadAllPreparedStatement ??= await PrepareStatementAsync("""
+ SELECT
+ version,
+ silo_name,
+ host_name,
+ status,
+ proxy_port,
+ suspect_times,
+ start_time,
+ i_am_alive_time
+ FROM
+ membership
+ WHERE
+ partition_key = :partition_key
+ AND address = :address
+ AND port = :port
+ AND generation = :generation;
+ """,
+ MembershipReadConsistencyLevel);
+ return _membershipReadAllPreparedStatement.Bind(new
+ {
+ partition_key = clusterIdentifier,
+ address = siloAddress.Endpoint.Address.ToString(),
+ port = siloAddress.Endpoint.Port,
+ generation = siloAddress.Generation
+ });
+ }
+
+ public async ValueTask GatewaysQuery(string clusterIdentifier, int status)
+ {
+ // Filtering is only for the `proxy_port` filtering. We're already hitting the partition
+ // and secondary index on status which both don't need "ALLOW FILTERING"
+ _membershipReadAllPreparedStatement ??= await PrepareStatementAsync("""
+ SELECT
+ address,
+ proxy_port,
+ generation
+ FROM
+ membership
+ WHERE
+ partition_key = :partition_key
+ AND status = :status
+ AND proxy_port > 0
+ ALLOW FILTERING;
+ """,
+ MembershipReadConsistencyLevel);
+ return _membershipReadAllPreparedStatement.Bind(new
+ {
+ partition_key = clusterIdentifier,
+ status = status
+ });
+ }
+
+ private async ValueTask PrepareStatementAsync(string cql, ConsistencyLevel consistencyLevel)
+ {
+ var statement = await Session.PrepareAsync(cql);
+ statement.SetConsistencyLevel(consistencyLevel);
+ return statement;
+ }
+}
diff --git a/test/Extensions/Tester.Cassandra/Cassandra.dockerfile b/test/Extensions/Tester.Cassandra/Cassandra.dockerfile
new file mode 100644
index 0000000000..87037174c8
--- /dev/null
+++ b/test/Extensions/Tester.Cassandra/Cassandra.dockerfile
@@ -0,0 +1,20 @@
+ARG CASSANDRAVERSION=4.1
+FROM cassandra:${CASSANDRAVERSION}
+
+RUN sed -i 's/auto_snapshot: true/auto_snapshot: false/g' /etc/cassandra/cassandra.yaml
+
+# Disable virtual nodes
+RUN sed -i -e "s/num_tokens/\#num_tokens/" /etc/cassandra/cassandra.yaml
+
+# With virtual nodes disabled, we have to configure initial_token
+RUN sed -i -e "s/\# initial_token:/initial_token: 0/" /etc/cassandra/cassandra.yaml
+RUN echo "JVM_OPTS=\"\$JVM_OPTS -Dcassandra.initial_token=0\"" >> /etc/cassandra/cassandra-env.sh
+
+# set 0.0.0.0 Listens on all configured interfaces
+RUN sed -i -e "s/^rpc_address.*/rpc_address: 0.0.0.0/" /etc/cassandra/cassandra.yaml
+
+# Be your own seed
+RUN sed -i -e "s/- seeds: \"127.0.0.1\"/- seeds: \"$SEEDS\"/" /etc/cassandra/cassandra.yaml
+
+# Disable gossip, no need in one node cluster
+RUN echo "JVM_OPTS=\"\$JVM_OPTS -Dcassandra.skip_wait_for_gossip_to_settle=0\"" >> /etc/cassandra/cassandra-env.sh
\ No newline at end of file
diff --git a/test/Extensions/Tester.Cassandra/Clustering/Cassandra.cs b/test/Extensions/Tester.Cassandra/Clustering/Cassandra.cs
new file mode 100644
index 0000000000..f9b0324121
--- /dev/null
+++ b/test/Extensions/Tester.Cassandra/Clustering/Cassandra.cs
@@ -0,0 +1,695 @@
+using System.Net;
+using Cassandra;
+using Microsoft.Extensions.Options;
+using Orleans.Clustering.Cassandra;
+using Orleans.Configuration;
+using Orleans.Messaging;
+using Orleans.Runtime;
+using Tester.Cassandra.Utility;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Tester.Cassandra.Clustering;
+
+public sealed class Cassandra : IClassFixture
+{
+ private readonly CassandraContainer _cassandraContainer;
+ private readonly ITestOutputHelper _testOutputHelper;
+ private static readonly string HostName = Dns.GetHostName();
+ private static int _generation;
+
+ public Cassandra(CassandraContainer cassandraContainer, ITestOutputHelper testOutputHelper)
+ {
+ _cassandraContainer = cassandraContainer;
+ _cassandraContainer.Name = nameof(Cassandra);
+ _testOutputHelper = testOutputHelper;
+ }
+
+ [Fact]
+ [Trait("Category", nameof(Cassandra))]
+ public async Task MembershipTable_GetGateways()
+ {
+ var (membershipTable, gatewayListProvider) = await CreateNewMembershipTableAsync();
+
+ var membershipEntries = Enumerable.Range(0, 10).Select(_ => CreateMembershipEntryForTest()).ToArray();
+
+ membershipEntries[3].Status = SiloStatus.Active;
+ membershipEntries[3].ProxyPort = 0;
+ membershipEntries[5].Status = SiloStatus.Active;
+ membershipEntries[9].Status = SiloStatus.Active;
+
+ var data = await membershipTable.ReadAll();
+ Assert.NotNull(data);
+ Assert.Empty(data.Members);
+
+ var version = data.Version;
+ foreach (var membershipEntry in membershipEntries)
+ {
+ Assert.True(await membershipTable.InsertRow(membershipEntry, version.Next()));
+ version = (await membershipTable.ReadRow(membershipEntry.SiloAddress)).Version;
+ }
+
+ var gateways = await gatewayListProvider.GetGateways();
+
+ var entries = new List(gateways.Select(g => g.ToString()));
+
+ // only members with a non-zero Gateway port
+ Assert.DoesNotContain(membershipEntries[3].SiloAddress.ToGatewayUri().ToString(), entries);
+
+ // only Active members
+ Assert.Contains(membershipEntries[5].SiloAddress.ToGatewayUri().ToString(), entries);
+ Assert.Contains(membershipEntries[9].SiloAddress.ToGatewayUri().ToString(), entries);
+ Assert.Equal(2, entries.Count);
+ }
+
+ [Fact]
+ [Trait("Category", nameof(Cassandra))]
+ public async Task MembershipTable_ReadAll_EmptyTable()
+ {
+ var (membershipTable, _) = await CreateNewMembershipTableAsync();
+
+ var data = await membershipTable.ReadAll();
+ Assert.NotNull(data);
+
+ _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data);
+
+ Assert.Empty(data.Members);
+ Assert.NotNull(data.Version.VersionEtag);
+ Assert.Equal(0, data.Version.Version);
+ }
+
+ [Fact]
+ [Trait("Category", nameof(Cassandra))]
+ public async Task MembershipTable_InsertRow()
+ {
+ var (membershipTable, _) = await CreateNewMembershipTableAsync();
+
+ var membershipEntry = CreateMembershipEntryForTest();
+
+ var data = await membershipTable.ReadAll();
+ Assert.NotNull(data);
+ Assert.Empty(data.Members);
+
+ var nextTableVersion = data.Version.Next();
+
+ var ok = await membershipTable.InsertRow(membershipEntry, nextTableVersion);
+ Assert.True(ok, "InsertRow failed");
+
+ data = await membershipTable.ReadAll();
+
+ Assert.Equal(1, data.Version.Version);
+
+ Assert.Single(data.Members);
+ }
+
+ // Cassandra doesn't have the capability to prevent a duplicate insert in the way this test requires
+ /*[Fact]
+ [Trait("Category", nameof(Cassandra))]
+ public async Task MembershipTable_ReadRow_Insert_Read()
+ {
+ var (membershipTable, gatewayProvider) = await CreateNewMembershipTableAsync("Phalanx", "blu");
+
+ MembershipTableData data = await membershipTable.ReadAll();
+
+ _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data);
+
+ Assert.Empty(data.Members);
+
+ TableVersion newTableVersion = data.Version.Next();
+
+ MembershipEntry newEntry = CreateMembershipEntryForTest();
+ bool ok = await membershipTable.InsertRow(newEntry, newTableVersion);
+ Assert.True(ok, "InsertRow failed");
+
+ ok = await membershipTable.InsertRow(newEntry, newTableVersion);
+ Assert.False(ok, "InsertRow should have failed - same entry, old table version");
+
+
+ ok = await membershipTable.InsertRow(CreateMembershipEntryForTest(), newTableVersion);
+ Assert.False(ok, "InsertRow should have failed - new entry, old table version");
+
+ data = await membershipTable.ReadAll();
+
+ Assert.Equal(1, data.Version.Version);
+
+ TableVersion nextTableVersion = data.Version.Next();
+
+ ok = await membershipTable.InsertRow(newEntry, nextTableVersion);
+ Assert.False(ok, "InsertRow should have failed - duplicate entry");
+
+ data = await membershipTable.ReadAll();
+ Assert.Single(data.Members);
+
+ data = await membershipTable.ReadRow(newEntry.SiloAddress);
+ Assert.Equal(newTableVersion.Version, data.Version.Version);
+
+ _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data);
+
+ Assert.Single(data.Members);
+ Assert.NotNull(data.Version.VersionEtag);
+
+ Assert.NotEqual(newTableVersion.VersionEtag, data.Version.VersionEtag);
+ Assert.Equal(newTableVersion.Version, data.Version.Version);
+
+ var membershipEntry = data.Members[0].Item1;
+ string eTag = data.Members[0].Item2;
+ _testOutputHelper.WriteLine("Membership.ReadRow returned MembershipEntry ETag={0} Entry={1}", eTag, membershipEntry);
+
+ Assert.NotNull(eTag);
+ Assert.NotNull(membershipEntry);
+ }*/
+
+ [Fact]
+ [Trait("Category", nameof(Cassandra))]
+ public async Task MembershipTable_ReadRow_Insert_Read_modified()
+ {
+ var (membershipTable, _) = await CreateNewMembershipTableAsync();
+
+ var data = await membershipTable.ReadAll();
+
+ _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data);
+
+ Assert.Empty(data.Members);
+
+ var newTableVersion = data.Version.Next();
+
+ var newEntry = CreateMembershipEntryForTest();
+ var ok = await membershipTable.InsertRow(newEntry, newTableVersion);
+ Assert.True(ok, "InsertRow failed");
+
+ ok = await membershipTable.InsertRow(newEntry, newTableVersion);
+ Assert.False(ok, "InsertRow should have failed - same entry, old table version");
+
+
+ ok = await membershipTable.InsertRow(CreateMembershipEntryForTest(), newTableVersion);
+ Assert.False(ok, "InsertRow should have failed - new entry, old table version");
+
+ data = await membershipTable.ReadAll();
+
+ Assert.Equal(1, data.Version.Version);
+
+ var nextTableVersion = data.Version.Next();
+
+ ok = await membershipTable.InsertRow(newEntry, nextTableVersion);
+
+ //Assert.False(ok, "InsertRow should have failed - duplicate entry");
+ // Cassandra doesn't provide a way to prevent this insert
+ // adding discard assignment here to avoid "unused variable" warning
+ _ = ok;
+
+
+ data = await membershipTable.ReadAll();
+ Assert.Single(data.Members);
+
+ data = await membershipTable.ReadRow(newEntry.SiloAddress);
+
+ _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data);
+
+ Assert.Single(data.Members);
+ Assert.NotNull(data.Version.VersionEtag);
+
+ var membershipEntry = data.Members[0].Item1;
+ var eTag = data.Members[0].Item2;
+ _testOutputHelper.WriteLine("Membership.ReadRow returned MembershipEntry ETag={0} Entry={1}", eTag, membershipEntry);
+
+ Assert.NotNull(eTag);
+ Assert.NotNull(membershipEntry);
+ }
+
+ [Fact]
+ [Trait("Category", nameof(Cassandra))]
+ public async Task MembershipTable_ReadAll_Insert_ReadAll()
+ {
+ var (membershipTable, _) = await CreateNewMembershipTableAsync();
+
+ var data = await membershipTable.ReadAll();
+ _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data);
+
+ Assert.Empty(data.Members);
+
+ var newTableVersion = data.Version.Next();
+
+ var newEntry = CreateMembershipEntryForTest();
+ var ok = await membershipTable.InsertRow(newEntry, newTableVersion);
+ Assert.True(ok, "InsertRow failed");
+
+ data = await membershipTable.ReadAll();
+ _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data);
+
+ Assert.Single(data.Members);
+ Assert.NotNull(data.Version.VersionEtag);
+
+ Assert.NotEqual(newTableVersion.VersionEtag, data.Version.VersionEtag);
+ Assert.Equal(newTableVersion.Version, data.Version.Version);
+
+ var membershipEntry = data.Members[0].Item1;
+ var eTag = data.Members[0].Item2;
+ _testOutputHelper.WriteLine("Membership.ReadAll returned MembershipEntry ETag={0} Entry={1}", eTag, membershipEntry);
+
+ Assert.NotNull(eTag);
+ Assert.NotNull(membershipEntry);
+ }
+
+ [Fact]
+ [Trait("Category", nameof(Cassandra))]
+ public async Task MembershipTable_UpdateRow()
+ {
+ var (membershipTable, _) = await CreateNewMembershipTableAsync();
+
+ var tableData = await membershipTable.ReadAll();
+ Assert.NotNull(tableData.Version);
+
+ Assert.Equal(0, tableData.Version.Version);
+ Assert.Empty(tableData.Members);
+
+ for (var i = 1; i < 10; i++)
+ {
+ var siloEntry = CreateMembershipEntryForTest();
+
+ siloEntry.SuspectTimes =
+ [
+ new Tuple(CreateSiloAddressForTest(), GetUtcNowWithSecondsResolution().AddSeconds(1)),
+ new Tuple(CreateSiloAddressForTest(), GetUtcNowWithSecondsResolution().AddSeconds(2))
+ ];
+
+ var tableVersion = tableData.Version.Next();
+
+ _testOutputHelper.WriteLine("Calling InsertRow with Entry = {0} TableVersion = {1}", siloEntry, tableVersion);
+ var ok = await membershipTable.InsertRow(siloEntry, tableVersion);
+ Assert.True(ok, "InsertRow failed");
+
+ tableData = await membershipTable.ReadAll();
+
+ var etagBefore = tableData.TryGet(siloEntry.SiloAddress)?.Item2;
+
+ Assert.NotNull(etagBefore);
+
+ _testOutputHelper.WriteLine(
+ "Calling UpdateRow with Entry = {0} correct eTag = {1} old version={2}",
+ siloEntry,
+ etagBefore,
+ tableVersion?.ToString() ?? "null");
+ ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion);
+ Assert.False(ok, $"row update should have failed - Table Data = {tableData}");
+ tableData = await membershipTable.ReadAll();
+
+ tableVersion = tableData.Version.Next();
+
+ _testOutputHelper.WriteLine(
+ "Calling UpdateRow with Entry = {0} correct eTag = {1} correct version={2}",
+ siloEntry,
+ etagBefore,
+ tableVersion?.ToString() ?? "null");
+
+ ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion);
+
+ Assert.True(ok, $"UpdateRow failed - Table Data = {tableData}");
+
+ _testOutputHelper.WriteLine(
+ "Calling UpdateRow with Entry = {0} old eTag = {1} old version={2}",
+ siloEntry,
+ etagBefore,
+ tableVersion?.ToString() ?? "null");
+ ok = await membershipTable.UpdateRow(siloEntry, etagBefore, tableVersion);
+ Assert.False(ok, $"row update should have failed - Table Data = {tableData}");
+
+ tableData = await membershipTable.ReadAll();
+
+ var tuple = tableData.TryGet(siloEntry.SiloAddress);
+
+ Assert.Equal(tuple.Item1.ToFullString(), siloEntry.ToFullString());
+
+ var etagAfter = tuple.Item2;
+
+ _testOutputHelper.WriteLine(
+ "Calling UpdateRow with Entry = {0} correct eTag = {1} old version={2}",
+ siloEntry,
+ etagAfter,
+ tableVersion?.ToString() ?? "null");
+
+ ok = await membershipTable.UpdateRow(siloEntry, etagAfter, tableVersion);
+
+ Assert.False(ok, $"row update should have failed - Table Data = {tableData}");
+
+ tableData = await membershipTable.ReadAll();
+
+ etagBefore = etagAfter;
+
+ etagAfter = tableData.TryGet(siloEntry.SiloAddress)?.Item2;
+
+ Assert.Equal(etagBefore, etagAfter);
+ Assert.NotNull(tableData.Version);
+ Assert.Equal(tableVersion!.Version, tableData.Version.Version);
+
+ Assert.Equal(i, tableData.Members.Count);
+ }
+ }
+
+ [Fact]
+ [Trait("Category", nameof(Cassandra))]
+ public async Task MembershipTable_UpdateRowInParallel()
+ {
+ var (membershipTable, _) = await CreateNewMembershipTableAsync();
+
+ var tableData = await membershipTable.ReadAll();
+
+ var data = CreateMembershipEntryForTest();
+
+ var newTableVer = tableData.Version.Next();
+
+ var insertions = Task.WhenAll(Enumerable.Range(1, 20).Select(async _ => { try { return await membershipTable.InsertRow(data, newTableVer); } catch { return false; } }));
+
+ Assert.True((await insertions).Single(x => x), "InsertRow failed");
+
+ await Task.WhenAll(Enumerable.Range(1, 19).Select(async _ =>
+ {
+ var done = false;
+ do
+ {
+ var updatedTableData = await membershipTable.ReadAll();
+ var updatedRow = updatedTableData.TryGet(data.SiloAddress);
+
+ await Task.Delay(10);
+ if (updatedRow is null) continue;
+
+ var tableVersion = updatedTableData.Version.Next();
+ try
+ {
+ done = await membershipTable.UpdateRow(updatedRow.Item1, updatedRow.Item2, tableVersion);
+ }
+ catch
+ {
+ done = false;
+ }
+ } while (!done);
+ })).WithTimeout(TimeSpan.FromSeconds(30));
+
+
+ tableData = await membershipTable.ReadAll();
+ Assert.NotNull(tableData.Version);
+
+ Assert.Equal(20, tableData.Version.Version);
+
+ Assert.Single(tableData.Members);
+ }
+
+ [Fact]
+ [Trait("Category", nameof(Cassandra))]
+ public async Task MembershipTable_UpdateIAmAlive()
+ {
+ var (membershipTable, _) = await CreateNewMembershipTableAsync();
+
+ var tableData = await membershipTable.ReadAll();
+
+ var newTableVersion = tableData.Version.Next();
+ var newEntry = CreateMembershipEntryForTest();
+ var ok = await membershipTable.InsertRow(newEntry, newTableVersion);
+ Assert.True(ok);
+
+ var amAliveTime = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5));
+
+ // This mimics the arguments MembershipOracle.OnIAmAliveUpdateInTableTimer passes in
+ var entry = new MembershipEntry
+ {
+ SiloAddress = newEntry.SiloAddress,
+ IAmAliveTime = amAliveTime
+ };
+
+ await membershipTable.UpdateIAmAlive(entry);
+
+ tableData = await membershipTable.ReadAll();
+ var member = tableData.Members.First(e => e.Item1.SiloAddress.Equals(newEntry.SiloAddress));
+
+ // compare that the value is close to what we passed in, but not exactly, as the underlying store can set its own precision settings
+ // (ie: in SQL Server this is defined as datetime2(3), so we don't expect precision to account for less than 0.001s values)
+ Assert.True((amAliveTime - member.Item1.IAmAliveTime).Duration() < TimeSpan.FromSeconds(2), "Expected time around " + amAliveTime + " but got " + member.Item1.IAmAliveTime + " that is off by " + (amAliveTime - member.Item1.IAmAliveTime).Duration().ToString());
+ Assert.Equal(newTableVersion.Version, tableData.Version.Version);
+ }
+
+ [Fact]
+ [Trait("Category", nameof(Cassandra))]
+ public async Task MembershipTable_CleanupDefunctSiloEntries()
+ {
+ var (membershipTable, _) = await CreateNewMembershipTableAsync();
+
+ var data = await membershipTable.ReadAll();
+ _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data);
+
+ Assert.Empty(data.Members);
+
+ var newTableVersion = data.Version.Next();
+
+ var oldEntryDead = CreateMembershipEntryForTest();
+ oldEntryDead.IAmAliveTime = oldEntryDead.IAmAliveTime.AddDays(-10);
+ oldEntryDead.StartTime = oldEntryDead.StartTime.AddDays(-10);
+ oldEntryDead.Status = SiloStatus.Dead;
+ var ok = await membershipTable.InsertRow(oldEntryDead, newTableVersion);
+ var table = await membershipTable.ReadAll();
+
+ Assert.True(ok, "InsertRow Dead failed");
+
+ newTableVersion = table.Version.Next();
+ var oldEntryJoining = CreateMembershipEntryForTest();
+ oldEntryJoining.IAmAliveTime = oldEntryJoining.IAmAliveTime.AddDays(-10);
+ oldEntryJoining.StartTime = oldEntryJoining.StartTime.AddDays(-10);
+ oldEntryJoining.Status = SiloStatus.Joining;
+ ok = await membershipTable.InsertRow(oldEntryJoining, newTableVersion);
+ table = await membershipTable.ReadAll();
+
+ Assert.True(ok, "InsertRow Joining failed");
+
+ newTableVersion = table.Version.Next();
+ var newEntry = CreateMembershipEntryForTest();
+ ok = await membershipTable.InsertRow(newEntry, newTableVersion);
+ Assert.True(ok, "InsertRow failed");
+
+ data = await membershipTable.ReadAll();
+ newTableVersion = data.Version.Next();
+ _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data);
+
+ Assert.Equal(3, data.Members.Count);
+
+ // Every status other than Active should get cleared out if old
+ foreach (var siloStatus in Enum.GetValues())
+ {
+ var oldEntry = CreateMembershipEntryForTest();
+ oldEntry.IAmAliveTime = oldEntry.IAmAliveTime.AddDays(-10);
+ oldEntry.StartTime = oldEntry.StartTime.AddDays(-10);
+ oldEntry.Status = siloStatus;
+ ok = await membershipTable.InsertRow(oldEntry, newTableVersion);
+ table = await membershipTable.ReadAll();
+
+ Assert.True(ok, "InsertRow failed");
+
+ newTableVersion = table.Version.Next();
+ }
+
+ await membershipTable.CleanupDefunctSiloEntries(oldEntryDead.IAmAliveTime.AddDays(3));
+
+ data = await membershipTable.ReadAll();
+ _testOutputHelper.WriteLine("Membership.ReadAll returned TableVersion={0} Data={1}", data.Version, data);
+
+ Assert.Equal(2, data.Members.Count);
+ }
+
+ // Utility methods
+ private static MembershipEntry CreateMembershipEntryForTest()
+ {
+ var siloAddress = CreateSiloAddressForTest();
+
+ var membershipEntry = new MembershipEntry
+ {
+ SiloAddress = siloAddress,
+ HostName = HostName,
+ SiloName = "TestSiloName",
+ Status = SiloStatus.Joining,
+ ProxyPort = siloAddress.Endpoint.Port,
+ StartTime = GetUtcNowWithSecondsResolution(),
+ IAmAliveTime = GetUtcNowWithSecondsResolution()
+ };
+
+ return membershipEntry;
+ }
+
+ private static DateTime GetUtcNowWithSecondsResolution()
+ {
+ var now = DateTime.UtcNow;
+ return new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute, now.Second, DateTimeKind.Utc);
+ }
+
+ private static SiloAddress CreateSiloAddressForTest()
+ {
+ var siloAddress = SiloAddressUtils.NewLocalSiloAddress(Interlocked.Increment(ref _generation));
+ siloAddress.Endpoint.Port = 12345;
+ return siloAddress;
+ }
+
+ private async Task<(IMembershipTable, IGatewayListProvider)> CreateNewMembershipTableAsync(string serviceId, string clusterId)
+ {
+ var session = await CreateSession();
+
+ var cassandraClusteringOptions = new ClusterOptions { ServiceId = serviceId, ClusterId = clusterId };
+ var options = Options.Create(cassandraClusteringOptions);
+ IMembershipTable membershipTable = new CassandraClusteringTable(options, session);
+ await membershipTable.InitializeMembershipTable(true);
+
+ IGatewayListProvider gatewayProvider = new CassandraGatewayListProvider(options,
+ Options.Create(new GatewayOptions { GatewayListRefreshPeriod = TimeSpan.FromSeconds(15) }), session);
+ await gatewayProvider.InitializeGatewayListProvider();
+
+ return (membershipTable, gatewayProvider);
+ }
+
+ private async Task CreateSession()
+ {
+ var container = await _cassandraContainer.RunImage();
+
+ return container.session;
+ }
+
+ private Task<(IMembershipTable, IGatewayListProvider)> CreateNewMembershipTableAsync()
+ {
+ var serviceId = $"Service_{Guid.NewGuid()}";
+ var clusterId = $"Cluster_{Guid.NewGuid()}";
+
+ return CreateNewMembershipTableAsync(serviceId, clusterId);
+ }
+
+ [Fact]
+ [Trait("Category", nameof(Cassandra))]
+ public async Task A_Test()
+ {
+ var serviceId = $"Service_{Guid.NewGuid()}";
+ var clusterId = $"Cluster_{Guid.NewGuid()}";
+ var clusterOptions = new ClusterOptions { ServiceId = serviceId, ClusterId = clusterId + "_1" };
+ var clusterIdentifier = clusterOptions.ServiceId + "-" + clusterOptions.ClusterId;
+ var (membershipTable, gatewayProvider) = await CreateNewMembershipTableAsync(serviceId, clusterId + "_1");
+
+ var (otherMembershipTable, _) = await CreateNewMembershipTableAsync(serviceId, clusterId + "_2");
+
+ var tableData = await membershipTable.ReadAll();
+
+ await membershipTable.InsertRow(
+ new MembershipEntry
+ {
+ HostName = "host1",
+ IAmAliveTime = DateTime.UtcNow,
+ ProxyPort = 2345,
+ SiloAddress = SiloAddress.New(IPAddress.Loopback, 2345, 1),
+ SiloName = "silo1",
+ Status = SiloStatus.Created,
+ StartTime = DateTime.UtcNow
+ }, tableData.Version.Next());
+
+ tableData = await membershipTable.ReadAll();
+
+ await membershipTable.InsertRow(
+ new MembershipEntry
+ {
+ HostName = "host1",
+ IAmAliveTime = DateTime.UtcNow,
+ ProxyPort = 2345,
+ SiloAddress = SiloAddress.New(IPAddress.Loopback, 2345, 1),
+ SiloName = "silo1",
+ Status = SiloStatus.Joining,
+ StartTime = DateTime.UtcNow
+ }, tableData.Version.Next());
+
+ tableData = await otherMembershipTable.ReadAll();
+ await otherMembershipTable.InsertRow(
+ new MembershipEntry
+ {
+ HostName = "host1",
+ IAmAliveTime = DateTime.UtcNow,
+ ProxyPort = 2345,
+ SiloAddress = SiloAddress.New(IPAddress.Loopback, 2345, 1),
+ SiloName = "silo1",
+ Status = SiloStatus.Joining,
+ StartTime = DateTime.UtcNow
+ }, tableData.Version.Next());
+
+ tableData = await membershipTable.ReadAll();
+
+ var membershipEntry = new MembershipEntry
+ {
+ HostName = "host1",
+ IAmAliveTime = DateTime.UtcNow,
+ ProxyPort = 2345,
+ SiloAddress = SiloAddress.New(IPAddress.Loopback, 2346, 1),
+ SiloName = "silo1",
+ Status = SiloStatus.Active,
+ StartTime = DateTime.UtcNow
+ };
+ await membershipTable.InsertRow(membershipEntry, tableData.Version.Next());
+
+ var readAll = await membershipTable.ReadAll();
+
+ _testOutputHelper.WriteLine(readAll.Version.Version.ToString());
+ foreach (var row in readAll.Members)
+ {
+ var entry = row.Item1;
+ _testOutputHelper.WriteLine(clusterIdentifier);
+ _testOutputHelper.WriteLine(" " + entry.HostName);
+ _testOutputHelper.WriteLine(" " + entry.SiloName);
+ _testOutputHelper.WriteLine(" " + entry.StartTime);
+ _testOutputHelper.WriteLine(" " + entry.IAmAliveTime);
+ _testOutputHelper.WriteLine(" " + entry.SiloAddress);
+ _testOutputHelper.WriteLine(" " + entry.ProxyPort);
+ _testOutputHelper.WriteLine(" " + entry.Status);
+ }
+
+ membershipEntry.IAmAliveTime = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+ await membershipTable.UpdateIAmAlive(membershipEntry);
+
+ readAll = await membershipTable.ReadAll();
+
+ _testOutputHelper.WriteLine(readAll.Version.Version.ToString());
+ foreach (var row in readAll.Members)
+ {
+ var entry = row.Item1;
+ _testOutputHelper.WriteLine(clusterIdentifier);
+ _testOutputHelper.WriteLine(" " + entry.HostName);
+ _testOutputHelper.WriteLine(" " + entry.SiloName);
+ _testOutputHelper.WriteLine(" " + entry.StartTime);
+ _testOutputHelper.WriteLine(" " + entry.IAmAliveTime);
+ _testOutputHelper.WriteLine(" " + entry.SiloAddress);
+ _testOutputHelper.WriteLine(" " + entry.ProxyPort);
+ _testOutputHelper.WriteLine(" " + entry.Status);
+ }
+
+ await gatewayProvider.InitializeGatewayListProvider();
+
+ _ = await gatewayProvider.GetGateways();
+ var gateways = await gatewayProvider.GetGateways();
+
+ foreach (var gateway in gateways)
+ {
+ _testOutputHelper.WriteLine(gateway.ToString());
+ }
+
+ var queriedEntry = await membershipTable.ReadRow(membershipEntry.SiloAddress);
+ foreach (var queriedEntryMember in queriedEntry.Members)
+ {
+ _testOutputHelper.WriteLine(queriedEntryMember.Item1.SiloAddress.ToParsableString());
+ }
+
+ await membershipTable.DeleteMembershipTableEntries(clusterOptions.ClusterId);
+
+ readAll = await membershipTable.ReadAll();
+
+ _testOutputHelper.WriteLine(readAll.Version.Version.ToString());
+ foreach (var row in readAll.Members)
+ {
+ var entry = row.Item1;
+ _testOutputHelper.WriteLine(clusterIdentifier);
+ _testOutputHelper.WriteLine(" " + entry.HostName);
+ _testOutputHelper.WriteLine(" " + entry.SiloName);
+ _testOutputHelper.WriteLine(" " + entry.StartTime);
+ _testOutputHelper.WriteLine(" " + entry.IAmAliveTime);
+ _testOutputHelper.WriteLine(" " + entry.SiloAddress);
+ _testOutputHelper.WriteLine(" " + entry.ProxyPort);
+ _testOutputHelper.WriteLine(" " + entry.Status);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/test/Extensions/Tester.Cassandra/Clustering/CassandraContainer.cs b/test/Extensions/Tester.Cassandra/Clustering/CassandraContainer.cs
new file mode 100644
index 0000000000..2d3821d528
--- /dev/null
+++ b/test/Extensions/Tester.Cassandra/Clustering/CassandraContainer.cs
@@ -0,0 +1,52 @@
+using System.Net;
+using Cassandra;
+using DotNet.Testcontainers.Builders;
+using DotNet.Testcontainers.Containers;
+
+namespace Tester.Cassandra.Clustering;
+
+public class CassandraContainer
+{
+ public Task<(IContainer container, ushort exposedPort, Cluster cluster, ISession session)> RunImage() => _innerRunImage.Value;
+
+ private readonly Lazy> _innerRunImage =
+ new(async () =>
+ {
+ var cassandraImage = new ImageFromDockerfileBuilder()
+ .WithDockerfileDirectory(CommonDirectoryPath.GetProjectDirectory(), string.Empty)
+ .WithDockerfile("Cassandra.dockerfile")
+ .WithBuildArgument("CASSANDRAVERSION", Environment.GetEnvironmentVariable("CASSANDRAVERSION"))
+ .Build();
+
+ var imageTask = cassandraImage.CreateAsync();
+
+ await imageTask;
+
+ var containerPort = 9042;
+
+ var builder = new ContainerBuilder()
+ .WithImage(cassandraImage)
+ .WithPortBinding(containerPort, true)
+ .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(containerPort));
+
+ var container = builder.Build();
+
+ await container.StartAsync();
+
+ var exposedPort = container.GetMappedPublicPort(containerPort);
+
+ var cluster = Cluster.Builder()
+ .WithDefaultKeyspace("orleans")
+ .AddContactPoints(new IPEndPoint(IPAddress.Loopback, exposedPort))
+ .Build();
+
+ // Connect to the nodes using a keyspace
+ var session =
+ cluster.ConnectAndCreateDefaultKeyspaceIfNotExists(ReplicationStrategies
+ .CreateSimpleStrategyReplicationProperty(1));
+
+ return (container, exposedPort, cluster, session);
+ });
+
+ public string Name { get; set; } = string.Empty;
+}
\ No newline at end of file
diff --git a/test/Extensions/Tester.Cassandra/Clustering/SiloAddressUtils.cs b/test/Extensions/Tester.Cassandra/Clustering/SiloAddressUtils.cs
new file mode 100644
index 0000000000..96376e1fe7
--- /dev/null
+++ b/test/Extensions/Tester.Cassandra/Clustering/SiloAddressUtils.cs
@@ -0,0 +1,14 @@
+using System.Net;
+using Orleans.Runtime;
+
+namespace Tester.Cassandra.Clustering;
+
+public static class SiloAddressUtils
+{
+ private static readonly IPEndPoint s_localEndpoint = new(IPAddress.Loopback, 0);
+
+ public static SiloAddress NewLocalSiloAddress(int gen)
+ {
+ return SiloAddress.New(s_localEndpoint, gen);
+ }
+}
\ No newline at end of file
diff --git a/test/Extensions/Tester.Cassandra/Tester.Cassandra.csproj b/test/Extensions/Tester.Cassandra/Tester.Cassandra.csproj
new file mode 100644
index 0000000000..2278d6da1c
--- /dev/null
+++ b/test/Extensions/Tester.Cassandra/Tester.Cassandra.csproj
@@ -0,0 +1,19 @@
+
+
+
+ $(TestTargetFrameworks)
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/test/Extensions/Tester.Cassandra/Utility/TestExtensions.cs b/test/Extensions/Tester.Cassandra/Utility/TestExtensions.cs
new file mode 100644
index 0000000000..5a6a2ccd1d
--- /dev/null
+++ b/test/Extensions/Tester.Cassandra/Utility/TestExtensions.cs
@@ -0,0 +1,47 @@
+namespace Tester.Cassandra.Utility
+{
+ public static class TestExtensions
+ {
+ public static async Task WithTimeout(this Task taskToComplete, TimeSpan timeout)
+ {
+ if (taskToComplete.IsCompleted)
+ {
+ await taskToComplete;
+ return;
+ }
+
+ var timeoutCancellationTokenSource = new CancellationTokenSource();
+ var completedTask = await Task.WhenAny(taskToComplete, Task.Delay(timeout, timeoutCancellationTokenSource.Token));
+
+ if (taskToComplete == completedTask)
+ {
+ timeoutCancellationTokenSource.Cancel();
+ await taskToComplete;
+ return;
+ }
+
+ taskToComplete.Ignore();
+ throw new TimeoutException(string.Format("WithTimeout has timed out after {0}.", timeout));
+ }
+
+ public static async Task WithTimeout(this Task taskToComplete, TimeSpan timeout)
+ {
+ if (taskToComplete.IsCompleted)
+ {
+ return await taskToComplete;
+ }
+
+ var timeoutCancellationTokenSource = new CancellationTokenSource();
+ var completedTask = await Task.WhenAny(taskToComplete, Task.Delay(timeout, timeoutCancellationTokenSource.Token));
+
+ if (taskToComplete == completedTask)
+ {
+ timeoutCancellationTokenSource.Cancel();
+ return await taskToComplete;
+ }
+
+ taskToComplete.Ignore();
+ throw new TimeoutException(string.Format("WithTimeout has timed out after {0}.", timeout));
+ }
+ }
+}
\ No newline at end of file