Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/KubeOps/Operator/Caching/IResourceCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ internal interface IResourceCache<TEntity>

TEntity Upsert(TEntity resource, out CacheComparisonResult result);

bool Exists(TEntity resource);

void Fill(IEnumerable<TEntity> resources);

void Remove(TEntity resource);
Expand Down
4 changes: 2 additions & 2 deletions src/KubeOps/Operator/Caching/ResourceCache{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public void Clear()
_metrics.CachedItemsSummary.Observe(_cache.Count);
}

public bool Exists(TEntity resource) => _cache.ContainsKey(resource.Metadata.Uid);

private CacheComparisonResult CompareCache(TEntity resource)
{
if (!Exists(resource))
Expand Down Expand Up @@ -101,8 +103,6 @@ private CacheComparisonResult CompareCache(TEntity resource)
return CacheComparisonResult.Other;
}

private bool Exists(TEntity resource) => _cache.ContainsKey(resource.Metadata.Uid);

private void Remove(string resourceUid)
{
_cache.TryRemove(resourceUid, out _);
Expand Down
6 changes: 4 additions & 2 deletions src/KubeOps/Operator/Controller/EventQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ public EventQueue(
.GroupBy(e => e.Resource.Uid())
.Select(
group => group
.Select(ProcessDelay)
.Switch())
.GroupBy(e => e.Type)
.Select(typedGroup => typedGroup
.Select(ProcessDelay).Switch())
.Merge())
.Merge()
.Select(UpdateResourceData)
.Merge()
Expand Down
14 changes: 14 additions & 0 deletions src/KubeOps/Operator/Kubernetes/ResourceWatcher{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using k8s.Models;

using KubeOps.KubernetesClient;
using KubeOps.Operator.Caching;
using KubeOps.Operator.DevOps;

namespace KubeOps.Operator.Kubernetes;
Expand All @@ -21,6 +22,7 @@ internal class ResourceWatcher<TEntity> : IDisposable, IResourceWatcher<TEntity>
private readonly IKubernetesClient _client;
private readonly ILogger<ResourceWatcher<TEntity>> _logger;
private readonly IResourceWatcherMetrics<TEntity> _metrics;
private readonly IResourceCache<TEntity> _resourceCache;
private readonly OperatorSettings _settings;
private readonly Subject<TimeSpan> _reconnectHandler = new();
private readonly IDisposable _reconnectSubscription;
Expand All @@ -34,11 +36,13 @@ public ResourceWatcher(
IKubernetesClient client,
ILogger<ResourceWatcher<TEntity>> logger,
IResourceWatcherMetrics<TEntity> metrics,
IResourceCache<TEntity> resourceCache,
OperatorSettings settings)
{
_client = client;
_logger = logger;
_metrics = metrics;
_resourceCache = resourceCache;
_settings = settings;
_reconnectSubscription =
_reconnectHandler
Expand Down Expand Up @@ -131,6 +135,16 @@ private void OnWatcherEvent(WatchEventType type, TEntity resource)

_metrics.WatchedEvents.Inc();

if (_resourceCache.Exists(resource) && type == WatchEventType.Added)
{
_logger.LogTrace(
@"The resource ""{kind}/{name}"" binded to the watcher event already exist in cache. Skipping ""{watchEvent}"" event",
resource.Kind,
resource.Name(),
type);
return;
}

switch (type)
{
case WatchEventType.Added:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@

using KubeOps.KubernetesClient;
using KubeOps.Operator;
using KubeOps.Operator.Caching;
using KubeOps.Operator.DevOps;
using KubeOps.Operator.Kubernetes;
using KubeOps.Test.TestEntities;
using KubeOps.TestOperator.Entities;

using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Reactive.Testing;
Expand All @@ -35,6 +38,10 @@ public class TestResource : IKubernetesObject<V1ObjectMeta>

private readonly Mock<IKubernetesClient> _client = new();
private readonly Mock<IResourceWatcherMetrics<TestResource>> _metrics = new();
private readonly Mock<IResourceCache<TestResource>> _resourceCacheMock;

public ResourceWatcherTest() =>
_resourceCacheMock = new Mock<IResourceCache<TestResource>>(MockBehavior.Strict);

[Fact]
public async Task Should_Restart_Watcher_On_Exception()
Expand All @@ -50,6 +57,7 @@ public async Task Should_Restart_Watcher_On_Exception()
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

var testScheduler = new TestScheduler();
Expand Down Expand Up @@ -105,6 +113,7 @@ public async Task Should_Not_Throw_Overflow_Exception()
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

var testScheduler = new TestScheduler();
Expand Down Expand Up @@ -139,7 +148,12 @@ public async Task Should_Not_Dispose_Reconnect_Subject_Or_Throw_Exception_After_
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());

using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
using var resourceWatcher = new ResourceWatcher<TestResource>(
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

await resourceWatcher.StartAsync();

Expand All @@ -166,7 +180,12 @@ public async Task Should_Not_Restart_On_Serialization_Exception()
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());

using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
using var resourceWatcher = new ResourceWatcher<TestResource>(
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

await resourceWatcher.StartAsync();

Expand All @@ -188,7 +207,12 @@ public async Task Should_Be_Restarted_After_TaskCanceledException_IOException()

SetupResourceWatcherMetrics();

using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
using var resourceWatcher = new ResourceWatcher<TestResource>(
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

await resourceWatcher.StartAsync();

Expand All @@ -212,6 +236,9 @@ public async Task Should_Publish_On_Watcher_Event()

Action<WatchEventType, TestResource> onWatcherEvent = null!;

_resourceCacheMock.Setup(c => c.Exists(It.IsAny<TestResource>()))
.Returns(false);

_client.Setup(
c => c.Watch(
It.IsAny<TimeSpan>(),
Expand All @@ -234,6 +261,7 @@ public async Task Should_Publish_On_Watcher_Event()
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

var watchEvents = resourceWatcher.WatchEvents.Replay(1);
Expand Down Expand Up @@ -273,7 +301,12 @@ public async Task Should_Restart_Watcher_On_Close()
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
_metrics.Setup(c => c.WatcherClosed).Returns(Mock.Of<ICounter>());

using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
using var resourceWatcher = new ResourceWatcher<TestResource>(
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

await resourceWatcher.StartAsync();

Expand Down