Skip to content

Commit

Permalink
Cassandra Clustering implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rkargMsft committed Mar 25, 2024
1 parent ff6de7e commit 884e6b4
Show file tree
Hide file tree
Showing 15 changed files with 1,535 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,33 @@ jobs:
path: |
**/TestResults/*
**/logs/*
test-cassandra:
name: Cassandra provider tests
runs-on: ubuntu-latest
strategy:
matrix:
provider: ["Cassandra"]
dbversion: ["3.11", "4.0", "4.1", "5.0"]
steps:
- uses: actions/checkout@v2
- name: Setup .NET
uses: actions/setup-dotnet@v3
with:
dotnet-version: |
8.0.x
- name: Test
run: dotnet test --filter "Category=${{ matrix.provider }}&(Category=BVT|Category=SlowBVT|Category=Functional)" --blame-hang-timeout 10m --logger "trx" -- -parallel none -noshadow
env:
CASSANDRAVERSION: ${{ matrix.dbversion }}
- name: Archive Test Results
if: always()
uses: actions/upload-artifact@v3
with:
name: test_output
retention-days: 1
path: |
**/TestResults/*
**/logs/*
test-postgres:
name: PostgreSQL provider tests
runs-on: ubuntu-latest
Expand Down
2 changes: 2 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
<PackageVersion Include="ZooKeeperNetEx" Version="3.4.12.4" />
<PackageVersion Include="StackExchange.Redis" Version="2.6.122" />
<PackageVersion Include="KubernetesClient" Version="12.1.1" />
<PackageVersion Include="CassandraCSharpDriver" Version="3.20.1" />
<!-- Test related packages -->
<PackageVersion Include="FluentAssertions" Version="6.7.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.9.0-preview-23503-02" />
Expand Down Expand Up @@ -95,6 +96,7 @@
<PackageVersion Include="SpanJson" Version="4.0.1" />
<PackageVersion Include="Hyperion" Version="0.12.2" />
<PackageVersion Include="Grpc.Tools" Version="2.58.0" />
<PackageVersion Include="Testcontainers" Version="3.7.0" />
<!-- Tooling related packages -->
<PackageVersion Include="Microsoft.SourceLink.AzureRepos.Git" Version="8.0.0" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
Expand Down
17 changes: 17 additions & 0 deletions Orleans.sln
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tester.Redis", "test\Extens
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Serialization.Protobuf", "src\Serializers\Orleans.Serialization.Protobuf\Orleans.Serialization.Protobuf.csproj", "{A073C0EE-8732-42F9-A22E-D47034E25076}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Cassandra", "Cassandra", "{42C80201-3AC6-4C10-9809-3315A9FDD7A1}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Clustering.Cassandra", "src\Cassandra\Orleans.Clustering.Cassandra\Orleans.Clustering.Cassandra.csproj", "{7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tester.Cassandra", "test\Extensions\Tester.Cassandra\Tester.Cassandra.csproj", "{15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -581,6 +587,14 @@ Global
{A073C0EE-8732-42F9-A22E-D47034E25076}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.Build.0 = Release|Any CPU
{7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7A1A2ECE-DC4B-4DE7-AEF7-855C50895171}.Release|Any CPU.Build.0 = Release|Any CPU
{15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{15A1777E-D1B6-4DC8-81D4-998A4CBA63FE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -688,6 +702,9 @@ Global
{8FC6457C-6273-4338-AD2A-ECAA8FE2C5D7} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
{F13247A0-70C9-4200-9CB1-2002CB8105E0} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
{A073C0EE-8732-42F9-A22E-D47034E25076} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
{42C80201-3AC6-4C10-9809-3315A9FDD7A1} = {FE2E08C6-9C3B-4AEE-AE07-CCA387580D7A}
{7A1A2ECE-DC4B-4DE7-AEF7-855C50895171} = {42C80201-3AC6-4C10-9809-3315A9FDD7A1}
{15A1777E-D1B6-4DC8-81D4-998A4CBA63FE} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952}
Expand Down
153 changes: 153 additions & 0 deletions src/Cassandra/Orleans.Clustering.Cassandra/CassandraClusteringTable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Cassandra;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Runtime;

namespace Orleans.Clustering.Cassandra;

public class CassandraClusteringTable : IMembershipTable
{
private readonly ClusterOptions _options;
private readonly ISession _session;
private OrleansQueries? _queries;
private readonly string _identifier;


public CassandraClusteringTable(IOptions<ClusterOptions> options, ISession session)
{
_options = options.Value;
_identifier = $"{_options.ServiceId}-{_options.ClusterId}";
_session = session;
}

async Task IMembershipTable.InitializeMembershipTable(bool tryInitTableVersion)
{
_queries = await OrleansQueries.CreateInstance(_session);

await _session.ExecuteAsync(_queries.EnsureTableExists());
await _session.ExecuteAsync(_queries.EnsureIndexExists());

if (!tryInitTableVersion)
return;

await _session.ExecuteAsync(await _queries.InsertMembershipVersion(_identifier));
}

async Task IMembershipTable.DeleteMembershipTableEntries(string clusterId)
{
if (string.Compare(clusterId, _options.ClusterId,
StringComparison.InvariantCultureIgnoreCase) != 0)
{
throw new ArgumentException(
$"cluster id {clusterId} does not match CassandraClusteringTable value of {_options.ClusterId}",
nameof(clusterId));
}
await _session.ExecuteAsync(await _queries!.DeleteMembershipTableEntries(_identifier));
}

async Task<bool> IMembershipTable.InsertRow(MembershipEntry entry, TableVersion tableVersion)
{
var query = await _session.ExecuteAsync(await _queries!.InsertMembership(_identifier, entry, tableVersion.Version - 1));
return (bool)query.First()["[applied]"];
}

async Task<bool> IMembershipTable.UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion)
{
var query = await _session.ExecuteAsync(await _queries!.UpdateMembership(_identifier, entry, tableVersion.Version - 1));
return (bool)query.First()["[applied]"];
}

private static MembershipEntry? GetMembershipEntry(Row row, SiloAddress? forAddress = null)
{
if (row["start_time"] == null)
return null;

var result = new MembershipEntry
{
SiloAddress = forAddress ?? SiloAddress.New(new IPEndPoint(IPAddress.Parse((string)row["address"]), (int)row["port"]), (int)row["generation"]),
SiloName = (string)row["silo_name"],
HostName = (string)row["host_name"],
Status = (SiloStatus)(int)row["status"],
ProxyPort = (int)row["proxy_port"],
StartTime = ((DateTimeOffset)row["start_time"]).UtcDateTime,
IAmAliveTime = ((DateTimeOffset)row["i_am_alive_time"]).UtcDateTime
};

var suspectingSilos = (string)row["suspect_times"];
if (!string.IsNullOrWhiteSpace(suspectingSilos))
{
result.SuspectTimes = new List<Tuple<SiloAddress, DateTime>>();
result.SuspectTimes.AddRange(suspectingSilos.Split('|').Select(s =>
{
var split = s.Split(',');
return new Tuple<SiloAddress, DateTime>(SiloAddress.FromParsableString(split[0]), LogFormatter.ParseDate(split[1]));
}));
}

return result;
}

private async Task<MembershipTableData> GetMembershipTableData(RowSet rows, SiloAddress? forAddress = null)
{
int version;

var firstRow = rows.FirstOrDefault();
if (firstRow != null)
{
version = (int)firstRow["version"];

var entries = new List<Tuple<MembershipEntry, string>>();
foreach (var row in new[] { firstRow }.Concat(rows))
{
var entry = GetMembershipEntry(row, forAddress);
if (entry != null)
entries.Add(new Tuple<MembershipEntry, string>(entry, string.Empty));
}

return new MembershipTableData(entries, new TableVersion(version, version.ToString()));
}
else
{
var result = (await _session.ExecuteAsync(await _queries!.MembershipReadVersion(_identifier))).FirstOrDefault();
if (result is null)
{
return new MembershipTableData(new List<Tuple<MembershipEntry, string>>(), new TableVersion(0, "0"));
}
version = (int)result["version"];
return new MembershipTableData(new List<Tuple<MembershipEntry, string>>(), new TableVersion(version, version.ToString()));
}
}

async Task<MembershipTableData> IMembershipTable.ReadAll()
{
return await GetMembershipTableData(await _session.ExecuteAsync(await _queries!.MembershipReadAll(_identifier)));
}

async Task<MembershipTableData> IMembershipTable.ReadRow(SiloAddress key)
{
return await GetMembershipTableData(await _session.ExecuteAsync(await _queries!.MembershipReadRow(_identifier, key)), key);
}

async Task IMembershipTable.UpdateIAmAlive(MembershipEntry entry)
{
await _session.ExecuteAsync(await _queries!.UpdateIAmAliveTime(_identifier, entry));
}

public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
{
var allEntries =
(await _session.ExecuteAsync(await _queries!.MembershipReadAll(_identifier)))
.Select(r => GetMembershipEntry(r))
.Where(e => e is not null)
.Cast<MembershipEntry>();

foreach (var e in allEntries)
if (e is not { Status: SiloStatus.Active } && new DateTime(Math.Max(e.IAmAliveTime.Ticks, e.StartTime.Ticks)) < beforeDate)
await _session.ExecuteAsync(await _queries.DeleteMembershipEntry(_identifier, e));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Cassandra;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Messaging;
using Orleans.Runtime;

namespace Orleans.Clustering.Cassandra;

public class CassandraGatewayListProvider : IGatewayListProvider
{
private readonly ClusterOptions _options;
private readonly TimeSpan _maxStaleness;
private readonly ISession _session;
private OrleansQueries? _queries;
private DateTime _cacheUntil;
private List<Uri>? _cachedResult;
private readonly string _identifier;


TimeSpan IGatewayListProvider.MaxStaleness => _maxStaleness;

bool IGatewayListProvider.IsUpdatable => true;


public CassandraGatewayListProvider(IOptions<ClusterOptions> options, IOptions<GatewayOptions> gatewayOptions, ISession session)
{
_options = options.Value;
_identifier = $"{_options.ServiceId}-{_options.ClusterId}";

_maxStaleness = gatewayOptions.Value.GatewayListRefreshPeriod;
_session = session;
}

async Task IGatewayListProvider.InitializeGatewayListProvider()
{
_queries = await OrleansQueries.CreateInstance(_session);

await _session.ExecuteAsync(_queries.EnsureTableExists());
await _session.ExecuteAsync(_queries.EnsureIndexExists());
}

async Task<IList<Uri>> IGatewayListProvider.GetGateways()
{
if (_cachedResult is not null && _cacheUntil > DateTime.UtcNow)
{
return _cachedResult.ToList();
}

var rows = await _session.ExecuteAsync(await _queries!.GatewaysQuery(_identifier, (int)SiloStatus.Active));
var result = new List<Uri>();

foreach (var row in rows)
result.Add(SiloAddress.New(new IPEndPoint(IPAddress.Parse((string)row["address"]), (int)row["proxy_port"]), (int)row["generation"]).ToGatewayUri());

_cacheUntil = DateTime.UtcNow + _maxStaleness;
_cachedResult = result;
return result.ToList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Orleans.Clustering.Cassandra.Hosting;

public class CassandraClusteringOptions
{
public required string ConnectionString { get; set; }
public string Keyspace { get; set; } = "orleans";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using Cassandra;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Hosting;

namespace Orleans.Clustering.Cassandra.Hosting;

public static class CassandraMembershipHostingExtensions
{
/// <summary>
/// Configures Orleans clustering using Cassandra
/// </summary>
/// <param name="builder"></param>
/// <returns></returns>
/// <remarks>Pulls <see cref="IOptions{TOptions}"/> of type <see cref="ClusterOptions"/> and <see cref="ISession"/> from the DI container</remarks>
public static ISiloBuilder UseCassandraClustering(this ISiloBuilder builder) =>
builder.ConfigureServices(services =>
{
services.AddOptions<ClusterOptions>().ValidateOnStart();
services.AddSingleton<IMembershipTable, CassandraClusteringTable>();
});

/// <summary>
/// Configures Orleans clustering using Cassandra
/// </summary>
/// <param name="builder"></param>
/// <param name="clusterOptions">Configuration for the <see cref="CassandraClusteringTable"/></param>
/// <param name="sessionProvider">Resolving method for <see cref="ISession"/></param>
/// <returns>A newly created <see cref="CassandraClusteringTable"/> created with the provided <see cref="ClusterOptions"/> and the resolved <see cref="ISession"/></returns>
public static ISiloBuilder UseCassandraClustering(this ISiloBuilder builder, ClusterOptions clusterOptions, Func<IServiceProvider, ISession> sessionProvider) =>
builder.ConfigureServices(services =>
{
services.AddSingleton<IMembershipTable>(provider =>
{
var session = sessionProvider(provider);
return new CassandraClusteringTable(Options.Create(clusterOptions), session);
});
});


/// <summary>
/// Configures Orleans clustering using Cassandra
/// </summary>
/// <param name="builder"></param>
/// <param name="clusterOptions">Configuration for the <see cref="CassandraClusteringTable"/></param>
/// <param name="cassandraOptions">Configuration used to create a new <see cref="ISession"/></param>
/// <returns></returns>
public static ISiloBuilder UseCassandraClustering(this ISiloBuilder builder, ClusterOptions clusterOptions, CassandraClusteringOptions cassandraOptions) =>
builder.ConfigureServices(services =>
{
services.AddSingleton<IMembershipTable>(_ =>
{
var c = Cluster.Builder().WithConnectionString(cassandraOptions.ConnectionString)
.Build();
var session = c.Connect(cassandraOptions.Keyspace);
return new CassandraClusteringTable(Options.Create(clusterOptions), session);
});
});
}

0 comments on commit 884e6b4

Please sign in to comment.