Skip to content

Commit

Permalink
fix: use resource version in watcher to get newest events from API.
Browse files Browse the repository at this point in the history
This fixes [bug]: Deletion events seem to be missing after ResourceWatcher reconnect #675.

Signed-off-by: Christoph Bühler <cbuehler@rootd.ch>
  • Loading branch information
buehler committed Jan 17, 2024
1 parent 86b1f94 commit 7ce6a9b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 3 deletions.
11 changes: 11 additions & 0 deletions src/KubeOps.KubernetesClient/IKubernetesClient.cs
Expand Up @@ -353,6 +353,10 @@ void Delete<TEntity>(string name, string? @namespace = null)
/// If the namespace is omitted, all entities on the cluster are watched.
/// </param>
/// <param name="timeout">The timeout which the watcher has (after this timeout, the server will close the connection).</param>
/// <param name="resourceVersion">
/// When specified with a watch call, shows changes that occur after that particular version of a resource.
/// Defaults to changes from the beginning of history.
/// </param>
/// <param name="cancellationToken">Cancellation-Token.</param>
/// <param name="labelSelectors">A list of label-selectors to apply to the search.</param>
/// <returns>A entity watcher for the given entity.</returns>
Expand All @@ -362,6 +366,7 @@ void Delete<TEntity>(string name, string? @namespace = null)
Action? onClose = null,
string? @namespace = null,
TimeSpan? timeout = null,
string? resourceVersion = null,
CancellationToken cancellationToken = default,
params LabelSelector[] labelSelectors)
where TEntity : IKubernetesObject<V1ObjectMeta>
Expand All @@ -371,6 +376,7 @@ void Delete<TEntity>(string name, string? @namespace = null)
onClose,
@namespace,
timeout,
resourceVersion,
labelSelectors.ToExpression(),
cancellationToken);

Expand All @@ -388,6 +394,10 @@ void Delete<TEntity>(string name, string? @namespace = null)
/// If the namespace is omitted, all entities on the cluster are watched.
/// </param>
/// <param name="timeout">The timeout which the watcher has (after this timeout, the server will close the connection).</param>
/// <param name="resourceVersion">
/// When specified with a watch call, shows changes that occur after that particular version of a resource.
/// Defaults to changes from the beginning of history.
/// </param>
/// <param name="labelSelector">A string, representing an optional label selector for filtering watched objects.</param>
/// <param name="cancellationToken">Cancellation-Token.</param>
/// <returns>A entity watcher for the given entity.</returns>
Expand All @@ -397,6 +407,7 @@ void Delete<TEntity>(string name, string? @namespace = null)
Action? onClose = null,
string? @namespace = null,
TimeSpan? timeout = null,
string? resourceVersion = null,
string? labelSelector = null,
CancellationToken cancellationToken = default)
where TEntity : IKubernetesObject<V1ObjectMeta>;
Expand Down
3 changes: 3 additions & 0 deletions src/KubeOps.KubernetesClient/KubernetesClient.cs
Expand Up @@ -295,6 +295,7 @@ public async Task DeleteAsync<TEntity>(string name, string? @namespace = null)
Action? onClose = null,
string? @namespace = null,
TimeSpan? timeout = null,
string? resourceVersion = null,
string? labelSelector = null,
CancellationToken cancellationToken = default)
where TEntity : IKubernetesObject<V1ObjectMeta>
Expand All @@ -308,6 +309,7 @@ public async Task DeleteAsync<TEntity>(string name, string? @namespace = null)
@namespace,
metadata.PluralName,
labelSelector: labelSelector,
resourceVersion: resourceVersion,
timeoutSeconds: timeout switch
{
null => null,
Expand All @@ -320,6 +322,7 @@ public async Task DeleteAsync<TEntity>(string name, string? @namespace = null)
metadata.Version,
metadata.PluralName,
labelSelector: labelSelector,
resourceVersion: resourceVersion,
timeoutSeconds: timeout switch
{
null => null,
Expand Down
14 changes: 11 additions & 3 deletions src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
Expand Up @@ -32,6 +32,7 @@ internal class ResourceWatcher<TEntity> : IHostedService
private uint _watcherReconnectRetries;

private Watcher<TEntity>? _watcher;
private string? _lastResourceVersion;

public ResourceWatcher(
ILogger<ResourceWatcher<TEntity>> logger,
Expand Down Expand Up @@ -82,7 +83,12 @@ private void WatchResource()
}

_logger.LogDebug("""Create watcher for entity of type "{type}".""", typeof(TEntity));
_watcher = _client.Watch<TEntity>(OnEvent, OnError, OnClosed, @namespace: _settings.Namespace);
_watcher = _client.Watch<TEntity>(
OnEvent,
OnError,
OnClosed,
@namespace: _settings.Namespace,
resourceVersion: _lastResourceVersion);
}

private void StopWatching()
Expand Down Expand Up @@ -158,12 +164,14 @@ private void OnClosed()
private async void OnEvent(WatchEventType type, TEntity entity)
{
_watcherReconnectRetries = 0;
_lastResourceVersion = entity.ResourceVersion();

_logger.LogTrace(
"""Received watch event "{eventType}" for "{kind}/{name}".""",
"""Received watch event "{eventType}" for "{kind}/{name}", last observed resource version: {resourceVersion}.""",
type,
entity.Kind,
entity.Name());
entity.Name(),
_lastResourceVersion);

_queue.RemoveIfQueued(entity);

Expand Down

0 comments on commit 7ce6a9b

Please sign in to comment.