Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ dotnet test
## Upcoming features

- Configurable policy for ResourceChangeTracker (flag skipSameGenerationEvents)
- [Bookmark event](https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks) support
- Leader election support
- Dynamic custom resource sample
- Single-instance check
Expand Down
7 changes: 7 additions & 0 deletions src/k8s.Operators/Controller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public async Task ProcessEventAsync(CustomResourceEvent resourceEvent, Cancellat
return;
}

if (resourceEvent.Type == WatchEventType.Bookmark)
{
// Skip Bookmark events since there is nothing else to do
_logger.LogDebug($"Skip ProcessEvent, received Bookmark event, {resourceEvent.Resource}");
return;
}

// Enqueue the event
_eventManager.Enqueue(resourceEvent);

Expand Down
25 changes: 7 additions & 18 deletions tests/k8s.Operators.Tests/ControllerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public ControllerTests()
[Theory]
[InlineData(WatchEventType.Added)]
[InlineData(WatchEventType.Modified)]
[InlineData(WatchEventType.Bookmark)]
public async Task ProcessEventAsync_AddOrModifyIsCalled(WatchEventType eventType)
{
// Arrange
Expand All @@ -42,6 +41,7 @@ public async Task ProcessEventAsync_AddOrModifyIsCalled(WatchEventType eventType
[Theory]
[InlineData(WatchEventType.Error)]
[InlineData(WatchEventType.Deleted)]
[InlineData(WatchEventType.Bookmark)]
public async Task ProcessEventAsync_AddOrModifyIsNotCalled(WatchEventType eventType)
{
// Arrange
Expand All @@ -60,7 +60,6 @@ public async Task ProcessEventAsync_AddOrModifyIsNotCalled(WatchEventType eventT
[Theory]
[InlineData(WatchEventType.Added)]
[InlineData(WatchEventType.Modified)]
[InlineData(WatchEventType.Bookmark)]
public async Task ProcessEventAsync_AddOrModifyIsNotCalledIfResourceIsAlreadyProcessed(WatchEventType eventType)
{
// Arrange
Expand Down Expand Up @@ -129,7 +128,6 @@ public async Task ProcessEventAsync_UpdateStatusCallsPatchApi()
[Theory]
[InlineData(WatchEventType.Added)]
[InlineData(WatchEventType.Modified)]
[InlineData(WatchEventType.Bookmark)]
public async Task ProcessEventAsync_MissingFinalizerIsAdded(WatchEventType eventType)
{
// Arrange
Expand All @@ -146,12 +144,13 @@ public async Task ProcessEventAsync_MissingFinalizerIsAdded(WatchEventType event

[Theory]
[InlineData(WatchEventType.Error, true)]
[InlineData(WatchEventType.Deleted, true)]
[InlineData(WatchEventType.Error, false)]
[InlineData(WatchEventType.Deleted, true)]
[InlineData(WatchEventType.Deleted, false)]
[InlineData(WatchEventType.Bookmark, true)]
[InlineData(WatchEventType.Bookmark, false)]
[InlineData(WatchEventType.Added, true)]
[InlineData(WatchEventType.Modified, true)]
[InlineData(WatchEventType.Bookmark, true)]
public async Task ProcessEventAsync_FinalizerIsNotAdded(WatchEventType eventType, bool withFinalizer)
{
// Arrange
Expand Down Expand Up @@ -249,10 +248,8 @@ public void ProcessEventAsync_EventsForDifferentResourceAreProcessedConcurrently
[Theory]
[InlineData(WatchEventType.Added, true)]
[InlineData(WatchEventType.Modified, true)]
[InlineData(WatchEventType.Bookmark, true)]
[InlineData(WatchEventType.Added, false)]
[InlineData(WatchEventType.Modified, false)]
[InlineData(WatchEventType.Bookmark, false)]
public async Task ProcessEventAsync_RetryOnFailure(WatchEventType eventType, bool delete)
{
// Arrange
Expand All @@ -270,10 +267,8 @@ public async Task ProcessEventAsync_RetryOnFailure(WatchEventType eventType, boo
[Theory]
[InlineData(WatchEventType.Added, true)]
[InlineData(WatchEventType.Modified, true)]
[InlineData(WatchEventType.Bookmark, true)]
[InlineData(WatchEventType.Added, false)]
[InlineData(WatchEventType.Modified, false)]
[InlineData(WatchEventType.Bookmark, false)]
public async Task ProcessEventAsync_NoRetryAfterMaxAttempts(WatchEventType eventType, bool delete)
{
// Arrange
Expand All @@ -291,10 +286,8 @@ public async Task ProcessEventAsync_NoRetryAfterMaxAttempts(WatchEventType event
[Theory]
[InlineData(WatchEventType.Added, true)]
[InlineData(WatchEventType.Modified, true)]
[InlineData(WatchEventType.Bookmark, true)]
[InlineData(WatchEventType.Added, false)]
[InlineData(WatchEventType.Modified, false)]
[InlineData(WatchEventType.Bookmark, false)]
public void ProcessEventAsync_NoRetryIfNewEventForSameResourceIsQueued(WatchEventType eventType, bool delete)
{
// Arrange
Expand All @@ -321,17 +314,15 @@ public void ProcessEventAsync_NoRetryIfNewEventForSameResourceIsQueued(WatchEven
[Theory]
[InlineData(WatchEventType.Added, true)]
[InlineData(WatchEventType.Modified, true)]
[InlineData(WatchEventType.Bookmark, true)]
[InlineData(WatchEventType.Added, false)]
[InlineData(WatchEventType.Modified, false)]
[InlineData(WatchEventType.Bookmark, false)]
public void ProcessEventAsync_RetryIfNewEventForAnotherResourceIsQueued(WatchEventType eventType, bool delete)
{
// Arrange
var resource1 = CreateCustomResource(uid: "1", deletionTimeStamp: delete ? DateTime.Now : (DateTime?) null);
var resource2 = CreateCustomResource(uid: "2", deletionTimeStamp: delete ? DateTime.Now : (DateTime?) null);
_controller.ThrowExceptionOnNextEvents(1);
var block = _controller.BlockNextEvent();
var block1 = _controller.BlockNextEvent();

// Event #1, will block
var task1 = _controller.ProcessEventAsync(new CustomResourceEvent(eventType, resource1), DUMMY_TOKEN);
Expand All @@ -340,21 +331,19 @@ public void ProcessEventAsync_RetryIfNewEventForAnotherResourceIsQueued(WatchEve
var task2 = _controller.ProcessEventAsync(new CustomResourceEvent(eventType, resource2), DUMMY_TOKEN);

// Unblock #1
_controller.UnblockEvent(block);
_controller.UnblockEvent(block1);

Task.WaitAll(task2, task1);

// Assert
VerifyCompletedEvents(_controller, (resource2, deleteEvent: delete), (resource1, deleteEvent: delete));
VerifyCompletedEvents(_controller, (resource1, deleteEvent: delete), (resource2, deleteEvent: delete));
}

[Theory]
[InlineData(WatchEventType.Added, true)]
[InlineData(WatchEventType.Modified, true)]
[InlineData(WatchEventType.Bookmark, true)]
[InlineData(WatchEventType.Added, false)]
[InlineData(WatchEventType.Modified, false)]
[InlineData(WatchEventType.Bookmark, false)]
public void ProcessEventAsync_NoRetryIfCancelIsRequested(WatchEventType eventType, bool delete)
{
// Arrange
Expand Down
26 changes: 21 additions & 5 deletions tests/k8s.Operators.Tests/OperatorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public async Task AddControllerOfType_CreatesAndAddController()
[Theory]
[InlineData(WatchEventType.Added)]
[InlineData(WatchEventType.Modified)]
[InlineData(WatchEventType.Bookmark)]
public async Task AddController_EventIsDispatchedToGenericController(WatchEventType eventType)
{
// Arrange
Expand All @@ -120,7 +119,6 @@ public async Task AddController_EventIsDispatchedToGenericController(WatchEventT
[Theory]
[InlineData(WatchEventType.Added)]
[InlineData(WatchEventType.Modified)]
[InlineData(WatchEventType.Bookmark)]
public async Task AddController_WatchDefaultNamespaceIfNotSpecified(WatchEventType eventType)
{
// Arrange
Expand All @@ -142,7 +140,6 @@ public async Task AddController_WatchDefaultNamespaceIfNotSpecified(WatchEventTy
[Theory]
[InlineData(WatchEventType.Added)]
[InlineData(WatchEventType.Modified)]
[InlineData(WatchEventType.Bookmark)]
public async Task OnIncomingEvent_EventIsDiscardedIfNoControllerIsAssociated(WatchEventType eventType)
{
// Arrange
Expand All @@ -162,10 +159,8 @@ public async Task OnIncomingEvent_EventIsDiscardedIfNoControllerIsAssociated(Wat
[Theory]
[InlineData(WatchEventType.Added, "")]
[InlineData(WatchEventType.Modified, "")]
[InlineData(WatchEventType.Bookmark, "")]
[InlineData(WatchEventType.Added, null)]
[InlineData(WatchEventType.Modified, null)]
[InlineData(WatchEventType.Bookmark, null)]
public async Task OnIncomingEvent_EventsAreDispatchedToAssociatedControllers(WatchEventType eventType, string allNamespaceVariant)
{
// Arrange
Expand All @@ -187,5 +182,26 @@ public async Task OnIncomingEvent_EventsAreDispatchedToAssociatedControllers(Wat
VerifyAddOrModifyIsCalledWith(namespaceController, resource1);
VerifyAddOrModifyIsCalledWith(genericController, resource2);
}

[Theory]
[InlineData(WatchEventType.Error)]
[InlineData(WatchEventType.Deleted)]
[InlineData(WatchEventType.Bookmark)]
public async Task OnIncomingEvent_EventsAreDispatchedAndIgnored(WatchEventType eventType)
{
// Arrange
var resource = CreateCustomResource(ns: "namespace1");
var controller = new TestableController(_client);
_operator.AddController(controller, "namespace1");
var task =_operator.StartAsync();

// Act
_operator.Exposed_OnIncomingEvent(eventType, resource);

// Assert
_operator.Stop(); await task;
VerifyAddOrModifyIsNotCalled(controller);
VerifyDeleteIsNotCalled(controller);
}
}
}
21 changes: 11 additions & 10 deletions tests/k8s.Operators.Tests/TestableController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ public TestableController(OperatorConfiguration configuration, IKubernetes clien
public List<TestableCustomResource> Invocations_Delete = new List<TestableCustomResource>();
public List<(TestableCustomResource resource, bool deleteEvent)> Invocations = new List<(TestableCustomResource resource, bool deleteEvent)>();
public List<(TestableCustomResource resource, bool deleteEvent)> CompletedEvents = new List<(TestableCustomResource resource, bool deleteEvent)>();
private TaskCompletionSource<object> _tcs = null;

private Queue<TaskCompletionSource<object>> _signals = new Queue<TaskCompletionSource<object>>();

protected override async Task AddOrModifyAsync(TestableCustomResource resource, CancellationToken cancellationToken)
{
Invocations_AddOrModify.Add(resource);
Invocations.Add((resource, false));

if (_tcs != null)
if (_signals.TryDequeue(out var signal))
{
// Wait for UnblockEvent()
await _tcs?.Task;
await signal?.Task;
}

if (_exceptionsToThrow > 0)
Expand All @@ -53,10 +53,10 @@ protected override async Task DeleteAsync(TestableCustomResource resource, Cance
Invocations_Delete.Add(resource);
Invocations.Add((resource, true));

if (_tcs != null)
if (_signals.TryDequeue(out var signal))
{
// Wait for UnblockEvent()
await _tcs?.Task;
await signal?.Task;
}

if (_exceptionsToThrow > 0)
Expand Down Expand Up @@ -94,16 +94,17 @@ public void ThrowExceptionOnNextEvents(int count)
/// </summary>
public TaskCompletionSource<object> BlockNextEvent()
{
_tcs = new TaskCompletionSource<object>();
return _tcs;
var signal = new TaskCompletionSource<object>();
_signals.Enqueue(signal);
return signal;
}

/// <summary>
/// Unblock the next call to AddOrModifyAsync or DeleteAsync
/// </summary>
public void UnblockEvent(TaskCompletionSource<object> tcs)
public void UnblockEvent(TaskCompletionSource<object> signal)
{
tcs.SetResult(true);
signal.SetResult(true);
}
}
}