Skip to content

Commit

Permalink
Fixed issued #14 - [bug] Remove reflections from in-memory maps when …
Browse files Browse the repository at this point in the history
…a namespace is deleted. (#15)
  • Loading branch information
winromulus committed Jul 12, 2019
1 parent a5c43cb commit 6543059
Showing 1 changed file with 74 additions and 25 deletions.
99 changes: 74 additions & 25 deletions ES.Kubernetes.Reflector.Core/Mirroring/ResourceMirror.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ namespace ES.Kubernetes.Reflector.Core.Mirroring
{
public abstract class ResourceMirror<T> where T : class, IKubernetesObject
{
/// <summary>
/// Keeps track of resources with auto-reflection turned on.
/// </summary>
private readonly ConcurrentDictionary<KubernetesObjectId, string> _autoReflections =
new ConcurrentDictionary<KubernetesObjectId, string>();

private readonly IKubernetes _client;
private readonly FeederQueue<WatcherEvent> _eventQueue;
private readonly ILogger _logger;
private readonly ManagedWatcher<V1Namespace> _namespaceWatcher;

/// <summary>
/// Maps reflections to sources
/// </summary>
private readonly ConcurrentDictionary<KubernetesObjectId, KubernetesObjectId> _reflections =
new ConcurrentDictionary<KubernetesObjectId, KubernetesObjectId>();

Expand Down Expand Up @@ -121,13 +125,40 @@ private async Task OnEvent(WatcherEvent e)

protected async Task OnNamespaceWatcherEvent(WatcherEvent<V1Namespace> request)
{
if (request.Type != WatchEventType.Added) return;
var sources = _autoReflections.Keys;
foreach (var source in sources)
switch (request.Type)
{
var item = await OnResourceGet(_client, source.Name, source.Namespace);
await CheckAutoReflections(item);
case WatchEventType.Added:
{
var id = _autoReflections.Keys.ToList();
foreach (var source in id)
{
var item = await OnResourceGet(_client, source.Name, source.Namespace);
await CheckAutoReflections(item);
}
}
break;
case WatchEventType.Deleted:
{
var toRemove = _reflections.Keys
.Where(s => s.Namespace.Equals(request.Item.Metadata.Name))
.ToList();
foreach (var id in toRemove)
{
_reflections.TryRemove(id, out _);
}

toRemove = _autoReflections.Keys
.Where(s => s.Namespace.Equals(request.Item.Metadata.Name))
.ToList();
foreach (var id in toRemove)
{
_autoReflections.TryRemove(id, out _);
}
}
break;
}
if (request.Type != WatchEventType.Added) return;

}

protected async Task OnResourceWatcherEvent(WatcherEvent<T> request)
Expand All @@ -141,28 +172,28 @@ protected async Task OnResourceWatcherEvent(WatcherEvent<T> request)
{
case WatchEventType.Added:
case WatchEventType.Modified:
{
//Check if the source for auto reflection is still valid.
if (await CheckAutoReflectionSource(item)) return;
{
//Check if the source for auto reflection is still valid.
if (await CheckAutoReflectionSource(item)) return;

//Ensure auto reflections
await CheckAutoReflections(item);
//Ensure auto reflections
await CheckAutoReflections(item);

//Update current item. If updated, return (Update will be picked up by watcher)
if (await ReflectSourceToSelf(item)) return;
//Update current item. If updated, return (Update will be picked up by watcher)
if (await ReflectSourceToSelf(item)) return;

//Update all child reflections
await ReflectSelfToReflections(item);
}
//Update all child reflections
await ReflectSelfToReflections(item);
}
break;
case WatchEventType.Deleted:
{
_reflections.TryRemove(id, out _);
_autoReflections.TryRemove(id, out _);
{
_reflections.TryRemove(id, out _);
_autoReflections.TryRemove(id, out _);

//Remove all auto-reflections
await UpdateAutoReflections(item, new List<string>());
}
//Remove all auto-reflections
await UpdateAutoReflections(item, new List<string>());
}
break;

case WatchEventType.Error: break;
Expand Down Expand Up @@ -311,8 +342,26 @@ private async Task ReflectSelfToReflections(T item)
var reflections = _reflections.Where(s => s.Value.Equals(id)).Select(s => s.Key).ToList();
foreach (var reflectionId in reflections)
{
var target = await OnResourceGet(_client, reflectionId.Name, reflectionId.Namespace);
await Reflect(item, target);
try
{
var target = await OnResourceGet(_client, reflectionId.Name, reflectionId.Namespace);
await Reflect(item, target);

}
catch (HttpOperationException ex) when (ex.Response.StatusCode == HttpStatusCode.NotFound)
{
_logger.LogWarning(
"Could not reflect {@sourceId} to {@targetId}. Reflection {@targetId} not found.",
id, reflectionId, reflectionId);
_reflections.TryRemove(reflectionId, out _);
}
catch (Exception exception)
{
_logger.LogError(exception,
"Could not reflect {@sourceId} to {@targetId} due to exception",
id, reflectionId, reflectionId);
}

}
}

Expand Down

0 comments on commit 6543059

Please sign in to comment.