Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ internal class ManagedResourceController<TEntity> : IManagedResourceController
private readonly OperatorSettings _settings;
private readonly IFinalizerManager<TEntity> _finalizerManager;

private readonly Subject<(TEntity Resource, TimeSpan Delay)>
private readonly Subject<RequeuedEvent>
_requeuedEvents = new();

private readonly Subject<QueuedEvent>
Expand Down Expand Up @@ -85,9 +85,17 @@ public ManagedResourceController(
private IObservable<Unit> RequeuedEvents => _requeuedEvents
.Do(_ => _metrics.RequeuedEvents.Inc())
.Select(
data => Observable.Return(data.Resource).Delay(data.Delay))
data => Observable.Return(data).Delay(data.Delay))
.Switch()
.Select(data => Observable.FromAsync(() => UpdateResourceData(data)))
.Select(data =>
Observable.FromAsync(async () =>
{
var queuedEvent = await UpdateResourceData(data.Resource);

return data.ResourceEvent.HasValue && queuedEvent != null
? queuedEvent with { ResourceEvent = data.ResourceEvent.Value }
: queuedEvent;
}))
.Switch()
.Where(data => data != null)
.Do(
Expand Down Expand Up @@ -269,13 +277,32 @@ protected async Task HandleResourceEvent(QueuedEvent? data)
resource.Name());
return;
case RequeueEventResult requeue:
_logger.LogInformation(
@"Event type ""{eventType}"" on resource ""{kind}/{name}"" successfully reconciled. Requeue requested with delay ""{requeue}"".",
@event,
resource.Kind,
resource.Name(),
requeue.RequeueIn);
_requeuedEvents.OnNext((resource, requeue.RequeueIn));
if (_settings.DefaultRequeueAsSameType)
{
requeue = new RequeueEventResult(requeue.RequeueIn, @event);
}

if (requeue.EventType.HasValue)
{
_logger.LogInformation(
@"Event type ""{eventType}"" on resource ""{kind}/{name}"" successfully reconciled. Requeue requested as type ""{requeueType}"" with delay ""{requeue}"".",
@event,
resource.Kind,
resource.Name(),
requeue.EventType,
requeue.RequeueIn);
}
else
{
_logger.LogInformation(
@"Event type ""{eventType}"" on resource ""{kind}/{name}"" successfully reconciled. Requeue requested with delay ""{requeue}"".",
@event,
resource.Kind,
resource.Name(),
requeue.RequeueIn);
}

_requeuedEvents.OnNext(new RequeuedEvent(requeue.EventType, resource, requeue.RequeueIn));
break;
}
}
Expand Down Expand Up @@ -417,5 +444,7 @@ private TimeSpan ExponentialBackoff(int retryCount) => TimeSpan
.Add(TimeSpan.FromMilliseconds(_rnd.Next(0, 1000)));

internal record QueuedEvent(ResourceEventType ResourceEvent, TEntity Resource, int RetryCount = 0);

private record RequeuedEvent(ResourceEventType? ResourceEvent, TEntity Resource, TimeSpan Delay);
}
}
6 changes: 6 additions & 0 deletions src/KubeOps/Operator/Controller/Results/RequeueEventResult.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using KubeOps.Operator.Kubernetes;

namespace KubeOps.Operator.Controller.Results
{
Expand All @@ -8,5 +9,10 @@ public RequeueEventResult(TimeSpan requeueIn)
: base(requeueIn)
{
}

public RequeueEventResult(TimeSpan requeueIn, ResourceEventType eventType)
: base(requeueIn, eventType)
{
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using KubeOps.Operator.Events;
using KubeOps.Operator.Kubernetes;

namespace KubeOps.Operator.Controller.Results
Expand All @@ -10,13 +11,27 @@ namespace KubeOps.Operator.Controller.Results
/// </summary>
public abstract class ResourceControllerResult
{
internal ResourceControllerResult(TimeSpan delay) => RequeueIn = delay;
internal ResourceControllerResult(TimeSpan delay)
{
RequeueIn = delay;
}

internal ResourceControllerResult(TimeSpan delay, ResourceEventType eventType)
{
RequeueIn = delay;
EventType = eventType;
}

/// <summary>
/// Time that should be waited for a requeue.
/// </summary>
public TimeSpan RequeueIn { get; }

/// <summary>
/// Type of the event to be queued.
/// </summary>
public ResourceEventType? EventType { get; }

/// <summary>
/// Create a <see cref="ResourceControllerResult"/> that requeues a resource
/// with a given delay. When the event fires (after the delay) the resource
Expand All @@ -31,6 +46,21 @@ public abstract class ResourceControllerResult
public static ResourceControllerResult RequeueEvent(TimeSpan delay)
=> new RequeueEventResult(delay);

// TODO: Requeue with forced event method
/// <summary>
/// Create a <see cref="ResourceControllerResult"/> that requeues a resource
/// with a given delay. When the event fires (after the delay) the resource
/// cache is ignored in favor the specified <see cref="ResourceEventType"/>.
/// Based on the specified type, the new event triggers the according function.
/// </summary>
/// <param name="delay">
/// The delay. Please note, that a delay of <see cref="TimeSpan.Zero"/>
/// will result in an immediate trigger of the function. This can lead to infinite circles.
/// </param>
/// <param name="eventType">
/// The event type to queue.
/// </param>
/// <returns>The <see cref="ResourceControllerResult"/> with the configured delay and event type.</returns>
public static ResourceControllerResult RequeueEvent(TimeSpan delay, ResourceEventType eventType)
=> new RequeueEventResult(delay, eventType);
}
}
12 changes: 12 additions & 0 deletions src/KubeOps/Operator/OperatorSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,17 @@ public sealed class OperatorSettings
/// <para>The search will be performed on each "Start" of the controller.</para>
/// </summary>
public bool PreloadCache { get; set; }

/// <summary>
/// <para>
/// If set to true, returning `ResourceControllerResult.RequeueEvent` will
/// automatically requeue the event as the same type.
/// </para>
/// <para>
/// For example, if done from a "Created" event, the event will be queued
/// again as "Created" instead of (for example) "NotModified".
/// </para>
/// </summary>
public bool DefaultRequeueAsSameType { get; set; } = false;
}
}