Skip to content

Commit

Permalink
Implemented Proto cluster provider for Azure Container Apps (#1889)
Browse files Browse the repository at this point in the history
* Implemented Proto Cluster for Azure Container Apps

* Updated props and references

* Fixed issues found during testing

- Manage tags on a container app resource level as replicas cannot have tags
- Find correct ip address of the replica

Co-authored-by: Gürkan Güran <gurkanguran@gurkanguransMBP.domain_not_set.invalid>
  • Loading branch information
gurkanguran and Gürkan Güran committed Dec 22, 2022
1 parent abc58cf commit aaf264a
Show file tree
Hide file tree
Showing 6 changed files with 409 additions and 0 deletions.
118 changes: 118 additions & 0 deletions Proto.Cluster.AzureContainerApps/ArmClientUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure;
using Azure.ResourceManager;
using Azure.ResourceManager.AppContainers;
using Azure.ResourceManager.Resources;
using Azure.ResourceManager.Resources.Models;
using Microsoft.Extensions.Logging;

namespace Proto.Cluster.AzureContainerApps;

public static class ArmClientUtils
{
private static readonly ILogger Logger = Log.CreateLogger(nameof(ArmClientUtils));

public static async Task<Member[]> GetClusterMembers(this ArmClient client, string resourceGroupName, string containerAppName)
{
var members = new List<Member>();

var containerApp = await (await client.GetResourceGroupByName(resourceGroupName)).Value.GetContainerAppAsync(containerAppName);

if (containerApp is null || !containerApp.HasValue)
{
Logger.LogError("Container App: {ContainerApp} in resource group: {ResourceGroup} is not found", containerApp, resourceGroupName);
return members.ToArray();
}

var containerAppRevisions = GetActiveRevisionsWithTraffic(containerApp).ToList();
if (!containerAppRevisions.Any())
{
Logger.LogError("Container App: {ContainerApp} in resource group: {ResourceGroup} does not contain any active revisions with traffic", containerAppName, resourceGroupName);
return members.ToArray();
}

var replicasWithTraffic = containerAppRevisions.SelectMany(r => r.GetContainerAppReplicas());

var allTags = (await containerApp.Value.GetTagResource().GetAsync()).Value.Data.TagValues;

foreach (var replica in replicasWithTraffic)
{
var replicaNameTag = allTags.FirstOrDefault(kvp => kvp.Value == replica.Data.Name);
if (replicaNameTag.Key == null)
{
Logger.LogWarning("Skipping Replica with name: {Name}, no Proto Tags found", replica.Data.Name);
continue;
}

var replicaNameTagPrefix = replicaNameTag.Key.Replace(ResourceTagLabels.LabelReplicaNameWithoutPrefix, string.Empty);
var currentReplicaTags = allTags.Where(kvp => kvp.Key.StartsWith(replicaNameTagPrefix)).ToDictionary(x => x.Key, x => x.Value);

var memberId = currentReplicaTags.FirstOrDefault(kvp => kvp.Key.ToString().Contains(ResourceTagLabels.LabelMemberIdWithoutPrefix)).Value;

var kinds = currentReplicaTags
.Where(kvp => kvp.Key.StartsWith(ResourceTagLabels.LabelKind(memberId)))
.Select(kvp => kvp.Key[(ResourceTagLabels.LabelKind(memberId).Length + 1)..])
.ToArray();

var member = new Member
{
Id = currentReplicaTags[ResourceTagLabels.LabelMemberId(memberId)],
Port = int.Parse(currentReplicaTags[ResourceTagLabels.LabelPort(memberId)]),
Host = currentReplicaTags[ResourceTagLabels.LabelHost(memberId)],
Kinds = { kinds }
};

members.Add(member);
}

return members.ToArray();
}

public static async Task AddMemberTags(this ArmClient client, string resourceGroupName, string containerAppName, Dictionary<string, string> newTags)
{
var resourceTag = new Tag();
foreach (var tag in newTags)
{
resourceTag.TagValues.Add(tag);
}

var resourceGroup = await client.GetResourceGroupByName(resourceGroupName);
var containerApp = await resourceGroup.Value.GetContainerAppAsync(containerAppName);
var tagResource = containerApp.Value.GetTagResource();

var existingTags = (await tagResource.GetAsync()).Value.Data.TagValues;
foreach (var tag in existingTags)
{
resourceTag.TagValues.Add(tag);
}
await tagResource.CreateOrUpdateAsync(WaitUntil.Completed, new TagResourceData(resourceTag));
}

public static async Task ClearMemberTags(this ArmClient client, string resourceGroupName, string containerAppName, string memberId)
{
var resourceGroup = await client.GetResourceGroupByName(resourceGroupName);
var containerApp = await resourceGroup.Value.GetContainerAppAsync(containerAppName);
var tagResource = containerApp.Value.GetTagResource();

var resourceTag = new Tag();
var existingTags = (await tagResource.GetAsync()).Value.Data.TagValues;

foreach (var tag in existingTags)
{
if (!tag.Key.StartsWith(ResourceTagLabels.LabelPrefix(memberId)))
{
resourceTag.TagValues.Add(tag);
}
}

await tagResource.CreateOrUpdateAsync(WaitUntil.Completed, new TagResourceData(resourceTag));
}

public static async Task<Response<ResourceGroupResource>> GetResourceGroupByName(this ArmClient client, string resourceGroupName) =>
await (await client.GetDefaultSubscriptionAsync()).GetResourceGroups().GetAsync(resourceGroupName);

private static IEnumerable<ContainerAppRevisionResource> GetActiveRevisionsWithTraffic(ContainerAppResource containerApp) =>
containerApp.GetContainerAppRevisions().Where(r => r.HasData && r.Data.Active.GetValueOrDefault(false) && r.Data.TrafficWeight > 0);
}
193 changes: 193 additions & 0 deletions Proto.Cluster.AzureContainerApps/AzureContainerAppsProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.ResourceManager;
using Azure.ResourceManager.AppContainers;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Proto.Utils;

namespace Proto.Cluster.AzureContainerApps;

public class AzureContainerAppsProvider : IClusterProvider
{
public readonly string AdvertisedHost;

private readonly ArmClient _client;
private readonly string _resourceGroup;
private readonly string _containerAppName;
private readonly string _revisionName;
private readonly string _replicaName;

private string _memberId = null!;
private string _address = null!;
private Cluster _cluster = null!;
private string _clusterName = null!;
private string[] _kinds = null!;
private string _host = null!;
private int _port;

private readonly IConfiguration _configuration;
private static readonly ILogger Logger = Log.CreateLogger<AzureContainerAppsProvider>();
private static readonly TimeSpan PollIntervalInSeconds = TimeSpan.FromSeconds(5);

public AzureContainerAppsProvider(
IConfiguration configuration,
ArmClient client,
string resourceGroup,
string containerAppName,
string revisionName,
string replicaName,
string advertisedHost = default)
{
_configuration = configuration;
_client = client;
_resourceGroup = resourceGroup;
_containerAppName = containerAppName;
_revisionName = revisionName;
_replicaName = replicaName;
AdvertisedHost = advertisedHost;

if (string.IsNullOrEmpty(AdvertisedHost))
{
AdvertisedHost = ConfigUtils.FindIpAddress().ToString();
}
}

public async Task StartMemberAsync(Cluster cluster)
{
var clusterName = cluster.Config.ClusterName;
var (host, port) = cluster.System.GetAddress();
var kinds = cluster.GetClusterKinds();
_cluster = cluster;
_clusterName = clusterName;
_memberId = cluster.System.Id;
_port = port;
_host = host;
_kinds = kinds;
_address = $"{host}:{port}";

await RegisterMemberAsync();
StartClusterMonitor();
}

public Task StartClientAsync(Cluster cluster)
{
var clusterName = cluster.Config.ClusterName;
var (host, port) = cluster.System.GetAddress();
_cluster = cluster;
_clusterName = clusterName;
_memberId = cluster.System.Id;
_port = port;
_host = host;
_kinds = Array.Empty<string>();

StartClusterMonitor();
return Task.CompletedTask;
}

public async Task ShutdownAsync(bool graceful) => await DeregisterMemberAsync();

private async Task RegisterMemberAsync()
{
await Retry.Try(RegisterMemberInner, onError: OnError, onFailed: OnFailed, retryCount: Retry.Forever);

static void OnError(int attempt, Exception exception) =>
Logger.LogWarning(exception, "Failed to register service");

static void OnFailed(Exception exception) => Logger.LogError(exception, "Failed to register service");
}

private async Task RegisterMemberInner()
{
var resourceGroup = await _client.GetResourceGroupByName(_resourceGroup);
var containerApp = await resourceGroup.Value.GetContainerAppAsync(_containerAppName);
var revision = await containerApp.Value.GetContainerAppRevisionAsync(_revisionName);

if (revision.Value.Data.TrafficWeight.GetValueOrDefault(0) == 0)
{
return;
}

Logger.LogInformation(
"[Cluster][AzureContainerAppsProvider] Registering service {ReplicaName} on {IpAddress}",
_replicaName,
_address);

var tags = new Dictionary<string, string>
{
[ResourceTagLabels.LabelCluster(_memberId)] = _clusterName,
[ResourceTagLabels.LabelHost(_memberId)] = AdvertisedHost,
[ResourceTagLabels.LabelPort(_memberId)] = _port.ToString(),
[ResourceTagLabels.LabelMemberId(_memberId)] = _memberId,
[ResourceTagLabels.LabelReplicaName(_memberId)] = _replicaName
};

foreach (var kind in _kinds)
{
var labelKey = $"{ResourceTagLabels.LabelKind(_memberId)}-{kind}";
tags.TryAdd(labelKey, "true");
}

try
{
await _client.AddMemberTags(_resourceGroup, _containerAppName, tags);
}
catch (Exception x)
{
Logger.LogError(x, "Failed to update metadata");
}
}

private void StartClusterMonitor() =>
_ = SafeTask.Run(async () =>
{
while (!_cluster.System.Shutdown.IsCancellationRequested)
{
Logger.LogInformation("Calling ECS API");
try
{
var members = await _client.GetClusterMembers(_resourceGroup, _containerAppName);
if (members.Any())
{
Logger.LogInformation("Got members {Members}", members.Length);
_cluster.MemberList.UpdateClusterTopology(members);
}
else
{
Logger.LogWarning("Failed to get members from Azure Container Apps");
}
}
catch (Exception x)
{
Logger.LogError(x, "Failed to get members from Azure Container Apps");
}
await Task.Delay(PollIntervalInSeconds);
}
}
);

private async Task DeregisterMemberAsync()
{
await Retry.Try(DeregisterMemberInner, onError: OnError, onFailed: OnFailed);

static void OnError(int attempt, Exception exception) =>
Logger.LogWarning(exception, "Failed to deregister service");

static void OnFailed(Exception exception) => Logger.LogError(exception, "Failed to deregister service");
}

private async Task DeregisterMemberInner()
{
Logger.LogInformation(
"[Cluster][AzureContainerAppsProvider] Unregistering member {ReplicaName} on {IpAddress}",
_replicaName,
_address);

await _client.ClearMemberTags(_resourceGroup, _containerAppName, _memberId);
}
}
52 changes: 52 additions & 0 deletions Proto.Cluster.AzureContainerApps/ConfigUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.NetworkInformation;
using System.Net.Sockets;

namespace Proto.Cluster.AzureContainerApps;

public static class ConfigUtils
{
internal static IPAddress FindIpAddress(AddressFamily family = AddressFamily.InterNetwork)
{
var addressCandidates = NetworkInterface.GetAllNetworkInterfaces()
.Where(nif => nif.OperationalStatus == OperationalStatus.Up)
.SelectMany(nif => nif.GetIPProperties().UnicastAddresses.Select(a => a.Address))
.Where(addr => addr.AddressFamily == family && !IPAddress.IsLoopback(addr))
.ToList();

return PickSmallestIpAddress(addressCandidates);
}

private static IPAddress PickSmallestIpAddress(IEnumerable<IPAddress> candidates)
{
IPAddress result = null!;
foreach (var addr in candidates)
{
if (CompareIpAddresses(addr, result))
result = addr;
}
return result;

static bool CompareIpAddresses(IPAddress lhs, IPAddress rhs)
{
if (rhs == null)
return true;

var lbytes = lhs.GetAddressBytes();
var rbytes = rhs.GetAddressBytes();

if (lbytes.Length != rbytes.Length) return lbytes.Length < rbytes.Length;

for (var i = 0; i < lbytes.Length; i++)
{
if (lbytes[i] != rbytes[i])
{
return lbytes[i] < rbytes[i];
}
}
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<LangVersion>10</LangVersion>
<TargetFrameworks>netcoreapp3.1;net6.0;net7.0</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.ResourceManager.AppContainers" Version="1.0.0-beta.1" />
<PackageReference Include="Azure.ResourceManager.Resources" Version="1.3.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\src\Proto.Cluster\Proto.Cluster.csproj" />
</ItemGroup>

</Project>
Loading

0 comments on commit aaf264a

Please sign in to comment.