-
Notifications
You must be signed in to change notification settings - Fork 2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
15 changed files
with
1,535 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
153 changes: 153 additions & 0 deletions
153
src/Cassandra/Orleans.Clustering.Cassandra/CassandraClusteringTable.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Net; | ||
using System.Threading.Tasks; | ||
using Cassandra; | ||
using Microsoft.Extensions.Options; | ||
using Orleans.Configuration; | ||
using Orleans.Runtime; | ||
|
||
namespace Orleans.Clustering.Cassandra; | ||
|
||
public class CassandraClusteringTable : IMembershipTable | ||
{ | ||
private readonly ClusterOptions _options; | ||
private readonly ISession _session; | ||
private OrleansQueries? _queries; | ||
private readonly string _identifier; | ||
|
||
|
||
public CassandraClusteringTable(IOptions<ClusterOptions> options, ISession session) | ||
{ | ||
_options = options.Value; | ||
_identifier = $"{_options.ServiceId}-{_options.ClusterId}"; | ||
_session = session; | ||
} | ||
|
||
async Task IMembershipTable.InitializeMembershipTable(bool tryInitTableVersion) | ||
{ | ||
_queries = await OrleansQueries.CreateInstance(_session); | ||
|
||
await _session.ExecuteAsync(_queries.EnsureTableExists()); | ||
await _session.ExecuteAsync(_queries.EnsureIndexExists()); | ||
|
||
if (!tryInitTableVersion) | ||
return; | ||
|
||
await _session.ExecuteAsync(await _queries.InsertMembershipVersion(_identifier)); | ||
} | ||
|
||
async Task IMembershipTable.DeleteMembershipTableEntries(string clusterId) | ||
{ | ||
if (string.Compare(clusterId, _options.ClusterId, | ||
StringComparison.InvariantCultureIgnoreCase) != 0) | ||
{ | ||
throw new ArgumentException( | ||
$"cluster id {clusterId} does not match CassandraClusteringTable value of {_options.ClusterId}", | ||
nameof(clusterId)); | ||
} | ||
await _session.ExecuteAsync(await _queries!.DeleteMembershipTableEntries(_identifier)); | ||
} | ||
|
||
async Task<bool> IMembershipTable.InsertRow(MembershipEntry entry, TableVersion tableVersion) | ||
{ | ||
var query = await _session.ExecuteAsync(await _queries!.InsertMembership(_identifier, entry, tableVersion.Version - 1)); | ||
return (bool)query.First()["[applied]"]; | ||
} | ||
|
||
async Task<bool> IMembershipTable.UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion) | ||
{ | ||
var query = await _session.ExecuteAsync(await _queries!.UpdateMembership(_identifier, entry, tableVersion.Version - 1)); | ||
return (bool)query.First()["[applied]"]; | ||
} | ||
|
||
private static MembershipEntry? GetMembershipEntry(Row row, SiloAddress? forAddress = null) | ||
{ | ||
if (row["start_time"] == null) | ||
return null; | ||
|
||
var result = new MembershipEntry | ||
{ | ||
SiloAddress = forAddress ?? SiloAddress.New(new IPEndPoint(IPAddress.Parse((string)row["address"]), (int)row["port"]), (int)row["generation"]), | ||
SiloName = (string)row["silo_name"], | ||
HostName = (string)row["host_name"], | ||
Status = (SiloStatus)(int)row["status"], | ||
ProxyPort = (int)row["proxy_port"], | ||
StartTime = ((DateTimeOffset)row["start_time"]).UtcDateTime, | ||
IAmAliveTime = ((DateTimeOffset)row["i_am_alive_time"]).UtcDateTime | ||
}; | ||
|
||
var suspectingSilos = (string)row["suspect_times"]; | ||
if (!string.IsNullOrWhiteSpace(suspectingSilos)) | ||
{ | ||
result.SuspectTimes = new List<Tuple<SiloAddress, DateTime>>(); | ||
result.SuspectTimes.AddRange(suspectingSilos.Split('|').Select(s => | ||
{ | ||
var split = s.Split(','); | ||
return new Tuple<SiloAddress, DateTime>(SiloAddress.FromParsableString(split[0]), LogFormatter.ParseDate(split[1])); | ||
})); | ||
} | ||
|
||
return result; | ||
} | ||
|
||
private async Task<MembershipTableData> GetMembershipTableData(RowSet rows, SiloAddress? forAddress = null) | ||
{ | ||
int version; | ||
|
||
var firstRow = rows.FirstOrDefault(); | ||
if (firstRow != null) | ||
{ | ||
version = (int)firstRow["version"]; | ||
|
||
var entries = new List<Tuple<MembershipEntry, string>>(); | ||
foreach (var row in new[] { firstRow }.Concat(rows)) | ||
{ | ||
var entry = GetMembershipEntry(row, forAddress); | ||
if (entry != null) | ||
entries.Add(new Tuple<MembershipEntry, string>(entry, string.Empty)); | ||
} | ||
|
||
return new MembershipTableData(entries, new TableVersion(version, version.ToString())); | ||
} | ||
else | ||
{ | ||
var result = (await _session.ExecuteAsync(await _queries!.MembershipReadVersion(_identifier))).FirstOrDefault(); | ||
if (result is null) | ||
{ | ||
return new MembershipTableData(new List<Tuple<MembershipEntry, string>>(), new TableVersion(0, "0")); | ||
} | ||
version = (int)result["version"]; | ||
return new MembershipTableData(new List<Tuple<MembershipEntry, string>>(), new TableVersion(version, version.ToString())); | ||
} | ||
} | ||
|
||
async Task<MembershipTableData> IMembershipTable.ReadAll() | ||
{ | ||
return await GetMembershipTableData(await _session.ExecuteAsync(await _queries!.MembershipReadAll(_identifier))); | ||
} | ||
|
||
async Task<MembershipTableData> IMembershipTable.ReadRow(SiloAddress key) | ||
{ | ||
return await GetMembershipTableData(await _session.ExecuteAsync(await _queries!.MembershipReadRow(_identifier, key)), key); | ||
} | ||
|
||
async Task IMembershipTable.UpdateIAmAlive(MembershipEntry entry) | ||
{ | ||
await _session.ExecuteAsync(await _queries!.UpdateIAmAliveTime(_identifier, entry)); | ||
} | ||
|
||
public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate) | ||
{ | ||
var allEntries = | ||
(await _session.ExecuteAsync(await _queries!.MembershipReadAll(_identifier))) | ||
.Select(r => GetMembershipEntry(r)) | ||
.Where(e => e is not null) | ||
.Cast<MembershipEntry>(); | ||
|
||
foreach (var e in allEntries) | ||
if (e is not { Status: SiloStatus.Active } && new DateTime(Math.Max(e.IAmAliveTime.Ticks, e.StartTime.Ticks)) < beforeDate) | ||
await _session.ExecuteAsync(await _queries.DeleteMembershipEntry(_identifier, e)); | ||
} | ||
} |
64 changes: 64 additions & 0 deletions
64
src/Cassandra/Orleans.Clustering.Cassandra/CassandraGatewayListProvider.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Net; | ||
using System.Threading.Tasks; | ||
using Cassandra; | ||
using Microsoft.Extensions.Options; | ||
using Orleans.Configuration; | ||
using Orleans.Messaging; | ||
using Orleans.Runtime; | ||
|
||
namespace Orleans.Clustering.Cassandra; | ||
|
||
public class CassandraGatewayListProvider : IGatewayListProvider | ||
{ | ||
private readonly ClusterOptions _options; | ||
private readonly TimeSpan _maxStaleness; | ||
private readonly ISession _session; | ||
private OrleansQueries? _queries; | ||
private DateTime _cacheUntil; | ||
private List<Uri>? _cachedResult; | ||
private readonly string _identifier; | ||
|
||
|
||
TimeSpan IGatewayListProvider.MaxStaleness => _maxStaleness; | ||
|
||
bool IGatewayListProvider.IsUpdatable => true; | ||
|
||
|
||
public CassandraGatewayListProvider(IOptions<ClusterOptions> options, IOptions<GatewayOptions> gatewayOptions, ISession session) | ||
{ | ||
_options = options.Value; | ||
_identifier = $"{_options.ServiceId}-{_options.ClusterId}"; | ||
|
||
_maxStaleness = gatewayOptions.Value.GatewayListRefreshPeriod; | ||
_session = session; | ||
} | ||
|
||
async Task IGatewayListProvider.InitializeGatewayListProvider() | ||
{ | ||
_queries = await OrleansQueries.CreateInstance(_session); | ||
|
||
await _session.ExecuteAsync(_queries.EnsureTableExists()); | ||
await _session.ExecuteAsync(_queries.EnsureIndexExists()); | ||
} | ||
|
||
async Task<IList<Uri>> IGatewayListProvider.GetGateways() | ||
{ | ||
if (_cachedResult is not null && _cacheUntil > DateTime.UtcNow) | ||
{ | ||
return _cachedResult.ToList(); | ||
} | ||
|
||
var rows = await _session.ExecuteAsync(await _queries!.GatewaysQuery(_identifier, (int)SiloStatus.Active)); | ||
var result = new List<Uri>(); | ||
|
||
foreach (var row in rows) | ||
result.Add(SiloAddress.New(new IPEndPoint(IPAddress.Parse((string)row["address"]), (int)row["proxy_port"]), (int)row["generation"]).ToGatewayUri()); | ||
|
||
_cacheUntil = DateTime.UtcNow + _maxStaleness; | ||
_cachedResult = result; | ||
return result.ToList(); | ||
} | ||
} |
7 changes: 7 additions & 0 deletions
7
src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraClusteringOptions.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
namespace Orleans.Clustering.Cassandra.Hosting; | ||
|
||
public class CassandraClusteringOptions | ||
{ | ||
public required string ConnectionString { get; set; } | ||
public string Keyspace { get; set; } = "orleans"; | ||
} |
62 changes: 62 additions & 0 deletions
62
src/Cassandra/Orleans.Clustering.Cassandra/Hosting/CassandraMembershipHostingExtensions.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
using System; | ||
using Cassandra; | ||
using Microsoft.Extensions.DependencyInjection; | ||
using Microsoft.Extensions.Options; | ||
using Orleans.Configuration; | ||
using Orleans.Hosting; | ||
|
||
namespace Orleans.Clustering.Cassandra.Hosting; | ||
|
||
public static class CassandraMembershipHostingExtensions | ||
{ | ||
/// <summary> | ||
/// Configures Orleans clustering using Cassandra | ||
/// </summary> | ||
/// <param name="builder"></param> | ||
/// <returns></returns> | ||
/// <remarks>Pulls <see cref="IOptions{TOptions}"/> of type <see cref="ClusterOptions"/> and <see cref="ISession"/> from the DI container</remarks> | ||
public static ISiloBuilder UseCassandraClustering(this ISiloBuilder builder) => | ||
builder.ConfigureServices(services => | ||
{ | ||
services.AddOptions<ClusterOptions>().ValidateOnStart(); | ||
services.AddSingleton<IMembershipTable, CassandraClusteringTable>(); | ||
}); | ||
|
||
/// <summary> | ||
/// Configures Orleans clustering using Cassandra | ||
/// </summary> | ||
/// <param name="builder"></param> | ||
/// <param name="clusterOptions">Configuration for the <see cref="CassandraClusteringTable"/></param> | ||
/// <param name="sessionProvider">Resolving method for <see cref="ISession"/></param> | ||
/// <returns>A newly created <see cref="CassandraClusteringTable"/> created with the provided <see cref="ClusterOptions"/> and the resolved <see cref="ISession"/></returns> | ||
public static ISiloBuilder UseCassandraClustering(this ISiloBuilder builder, ClusterOptions clusterOptions, Func<IServiceProvider, ISession> sessionProvider) => | ||
builder.ConfigureServices(services => | ||
{ | ||
services.AddSingleton<IMembershipTable>(provider => | ||
{ | ||
var session = sessionProvider(provider); | ||
return new CassandraClusteringTable(Options.Create(clusterOptions), session); | ||
}); | ||
}); | ||
|
||
|
||
/// <summary> | ||
/// Configures Orleans clustering using Cassandra | ||
/// </summary> | ||
/// <param name="builder"></param> | ||
/// <param name="clusterOptions">Configuration for the <see cref="CassandraClusteringTable"/></param> | ||
/// <param name="cassandraOptions">Configuration used to create a new <see cref="ISession"/></param> | ||
/// <returns></returns> | ||
public static ISiloBuilder UseCassandraClustering(this ISiloBuilder builder, ClusterOptions clusterOptions, CassandraClusteringOptions cassandraOptions) => | ||
builder.ConfigureServices(services => | ||
{ | ||
services.AddSingleton<IMembershipTable>(_ => | ||
{ | ||
var c = Cluster.Builder().WithConnectionString(cassandraOptions.ConnectionString) | ||
.Build(); | ||
var session = c.Connect(cassandraOptions.Keyspace); | ||
return new CassandraClusteringTable(Options.Create(clusterOptions), session); | ||
}); | ||
}); | ||
} |
Oops, something went wrong.