Skip to content

Commit

Permalink
Fix steam stream definition start handling (#107)
Browse files Browse the repository at this point in the history
* Fix steam stream definition start handling

Resolves #106

* Add unit tests

* Fix

* Fix formatting issues
  • Loading branch information
s-vitaliy committed Jul 4, 2024
1 parent 8624956 commit 3cd8524
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 33 deletions.
93 changes: 61 additions & 32 deletions src/Services/CommandHandlers/StreamingJobCommandHandler.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Akka;
using Akka.Util;
using Akka.Util.Extensions;
using Arcane.Operator.Extensions;
using Arcane.Operator.Models.Base;
using Arcane.Operator.Models.Commands;
using Arcane.Operator.Models.Resources.JobTemplates.Base;
using Arcane.Operator.Models.Resources.Status.V1Alpha1;
using Arcane.Operator.Models.Resources.StreamClass.Base;
using Arcane.Operator.Models.StreamDefinitions.Base;
using Arcane.Operator.Services.Base;
using Arcane.Operator.Services.Base.CommandHandlers;
using Arcane.Operator.Services.Base.Repositories.CustomResources;
using Google.Protobuf.WellKnownTypes;
using k8s.Models;
using Microsoft.Extensions.Logging;
using Snd.Sdk.Kubernetes;
Expand Down Expand Up @@ -48,7 +49,7 @@ public class StreamingJobCommandHandler : ICommandHandler<StreamingJobCommand>
{
StartJob startJob => this.streamClassRepository
.Get(startJob.streamDefinition.Namespace(), startJob.streamDefinition.Kind)
.Map(maybeSc => maybeSc switch
.TryMap(maybeSc => maybeSc switch
{
{ HasValue: true, Value: var sc } => this.StartJob(startJob.streamDefinition, startJob.IsBackfilling, sc),
{ HasValue: false } => throw new InvalidOperationException($"Stream class not found for {startJob.streamDefinition.Kind}"),
Expand All @@ -60,38 +61,66 @@ public class StreamingJobCommandHandler : ICommandHandler<StreamingJobCommand>
private Task StartJob(IStreamDefinition streamDefinition, bool isBackfilling, IStreamClass streamClass)
{
var template = streamDefinition.GetJobTemplate(isBackfilling);

return this.streamingJobTemplateRepository
.GetStreamingJobTemplate(template.Kind, streamDefinition.Namespace(), template.Name)
.Map(jobTemplate =>
.FlatMap(t => this.TryStartJobFromTemplate(t, streamDefinition, streamClass, isBackfilling, template))
.FlatMap(async command =>
{
if (!jobTemplate.HasValue)
{
var message = $"Failed to find job template with kind {template.Kind} and name {template.Name}";
var command = new SetInternalErrorStatus(streamDefinition,
V1Alpha1StreamCondition.CustomErrorCondition(message));
return this.updateStatusCommandHandler.Handle(command);
}
var job = jobTemplate
.Value
.GetJob()
.WithStreamingJobLabels(streamDefinition.StreamId, isBackfilling, streamDefinition.Kind)
.WithStreamingJobAnnotations(streamDefinition.GetConfigurationChecksum())
.WithMetadataAnnotations(streamClass)
.WithCustomEnvironment(streamDefinition.ToV1EnvFromSources(streamClass))
.WithCustomEnvironment(streamDefinition.ToEnvironment(isBackfilling, streamClass))
.WithOwnerReference(streamDefinition)
.WithName(streamDefinition.StreamId);
this.logger.LogInformation("Starting a new stream job with an id {streamId}",
streamDefinition.StreamId);
return this.kubeCluster
.SendJob(job, streamDefinition.Metadata.Namespace(), CancellationToken.None)
.TryMap(result => result.AsOption(),
exception =>
{
this.logger.LogError(exception, "Failed to send job");
return Option<V1JobStatus>.None;
});
await this.updateStatusCommandHandler.Handle(command);
return NotUsed.Instance;
});
}

private Task<UpdateStatusCommand> TryStartJobFromTemplate(Option<IStreamingJobTemplate> jobTemplate,
IStreamDefinition streamDefinition,
IStreamClass streamClass,
bool isBackfilling,
V1TypedLocalObjectReference reference)
{
if (!jobTemplate.HasValue)
{
var message = $"Failed to find job template with kind {reference.Kind} and name {reference.Name}";
var condition = V1Alpha1StreamCondition.CustomErrorCondition(message);
var command = new SetInternalErrorStatus(streamDefinition, condition);
return Task.FromResult<UpdateStatusCommand>(command);
}


try
{
var job = this.BuildJob(jobTemplate, streamDefinition, streamClass, isBackfilling);
this.logger.LogInformation("Starting a new stream job with an id {streamId}", streamDefinition.StreamId);
return this.kubeCluster
.SendJob(job, streamDefinition.Metadata.Namespace(), CancellationToken.None)
.TryMap(
_ => isBackfilling ? new Reloading(streamDefinition) : new Running(streamDefinition),
ex => this.HandleError(ex, streamDefinition));
}
catch (Exception ex)
{
var condition = V1Alpha1StreamCondition.CustomErrorCondition($"Failed to build job: {ex.Message}");
return Task.FromResult<UpdateStatusCommand>(new SetInternalErrorStatus(streamDefinition, condition));
}
}

private UpdateStatusCommand HandleError(Exception exception, IStreamDefinition streamDefinition)
{
this.logger.LogError(exception, "Failed to send job");
var condition = V1Alpha1StreamCondition.CustomErrorCondition($"Failed to start job: {exception.Message}");
return new SetInternalErrorStatus(streamDefinition, condition);
}

private V1Job BuildJob(Option<IStreamingJobTemplate> jobTemplate, IStreamDefinition streamDefinition,
IStreamClass streamClass, bool isBackfilling) =>
jobTemplate
.Value
.GetJob()
.WithStreamingJobLabels(streamDefinition.StreamId, isBackfilling, streamDefinition.Kind)
.WithStreamingJobAnnotations(streamDefinition.GetConfigurationChecksum())
.WithMetadataAnnotations(streamClass)
.WithCustomEnvironment(streamDefinition.ToV1EnvFromSources(streamClass))
.WithCustomEnvironment(streamDefinition.ToEnvironment(isBackfilling, streamClass))
.WithOwnerReference(streamDefinition)
.WithName(streamDefinition.StreamId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public class StreamingJobTemplateRepository : IStreamingJobTemplateRepository
jobTemplateResourceConfiguration.Plural,
jobNamespace,
templateName)
.Map(resource => resource.AsOption<IStreamingJobTemplate>());
.TryMap(resource => resource.AsOption<IStreamingJobTemplate>(),
exception =>
{
this.logger.LogError("Failed to get job template {templateName} for kind {kind} in namespace {jobNamespace}",
templateName, kind, jobNamespace);
return Option<IStreamingJobTemplate>.None;
});
}
}
163 changes: 163 additions & 0 deletions test/Services/StreamingJobCommandHandlerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Akka.Util.Extensions;
using Arcane.Operator.Models.Commands;
using Arcane.Operator.Models.Resources.JobTemplates.Base;
using Arcane.Operator.Models.Resources.Status.V1Alpha1;
using Arcane.Operator.Services.Base;
using Arcane.Operator.Services.Base.CommandHandlers;
using Arcane.Operator.Services.Base.Repositories.CustomResources;
using Arcane.Operator.Services.CommandHandlers;
using Arcane.Operator.Tests.Extensions;
using Arcane.Operator.Tests.Fixtures;
using Arcane.Operator.Tests.Services.TestCases;
using k8s.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Moq;
using Snd.Sdk.Kubernetes.Base;
using Xunit;
using static Arcane.Operator.Tests.Services.TestCases.StreamDefinitionTestCases;
using static Arcane.Operator.Tests.Services.TestCases.StreamClassTestCases;
using static Arcane.Operator.Tests.Services.TestCases.StreamingJobTemplateTestCases;

namespace Arcane.Operator.Tests.Services;

public class StreamingJobCommandHandlerTests(LoggerFixture loggerFixture) : IClassFixture<LoggerFixture>,
IClassFixture<AkkaFixture>
{
// Akka service and test helpers

// Mocks
private readonly Mock<IKubeCluster> kubeClusterMock = new();
private readonly Mock<IStreamClassRepository> streamClassRepositoryMock = new();
private readonly Mock<IStreamingJobTemplateRepository> streamingJobTemplateRepositoryMock = new();

[Fact]
public async Task HandleStreamStopCommand()
{
// Arrange
var command = new StopJob("job-name", "job-namespace");
var service = this.CreateServiceProvider().GetRequiredService<ICommandHandler<StreamingJobCommand>>();

// Act
await service.Handle(command);

// Assert
this.kubeClusterMock.Verify(k => k.DeleteJob(command.name,
command.nameSpace,
It.IsAny<CancellationToken>(),
It.IsAny<PropagationPolicy>()));
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task HandleStreamStartCommand(bool isBackfilling)
{
// Arrange
var command = new StartJob(StreamDefinition, isBackfilling);
var service = this.CreateServiceProvider().GetRequiredService<ICommandHandler<StreamingJobCommand>>();
this.streamClassRepositoryMock
.Setup(scr => scr.Get(It.IsAny<string>(), It.IsAny<string>()))
.ReturnsAsync(StreamClass.AsOption());
this.streamingJobTemplateRepositoryMock
.Setup(sjtr => sjtr.GetStreamingJobTemplate(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
.ReturnsAsync(((IStreamingJobTemplate)StreamingJobTemplate).AsOption());
var expectedState = isBackfilling ? StreamPhase.RELOADING.ToString() : StreamPhase.RUNNING.ToString();

// Act
await service.Handle(command);

// Assert
this.kubeClusterMock.Verify(k =>
k.SendJob(It.Is<V1Job>(job => job.IsBackfilling() == isBackfilling), It.IsAny<string>(), It.IsAny<CancellationToken>()));
this.kubeClusterMock.Verify(k => k.UpdateCustomResourceStatus(It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<string>(),
StreamDefinition.Namespace(),
StreamDefinition.Name(),
It.Is<V1Alpha1StreamStatus>(s => s.Phase == expectedState),
It.IsAny<Func<JsonElement, It.IsAnyType>>()),
Times.Once);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task HandleFailedStreamTemplate(bool isBackfilling)
{
// Arrange
var command = new StartJob(StreamDefinition, isBackfilling);
var service = this.CreateServiceProvider().GetRequiredService<ICommandHandler<StreamingJobCommand>>();
this.streamClassRepositoryMock
.Setup(scr => scr.Get(It.IsAny<string>(), It.IsAny<string>()))
.ReturnsAsync(StreamClass.AsOption());
this.streamingJobTemplateRepositoryMock
.Setup(sjtr => sjtr.GetStreamingJobTemplate(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
.ReturnsAsync(((IStreamingJobTemplate)new FailedStreamingJobTemplate(new Exception())).AsOption());

// Act
await service.Handle(command);

// Assert
this.kubeClusterMock.Verify(k => k.UpdateCustomResourceStatus(It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<string>(),
StreamDefinition.Namespace(),
StreamDefinition.Name(),
It.Is<V1Alpha1StreamStatus>(s => s.Phase == StreamPhase.FAILED.ToString()),
It.IsAny<Func<JsonElement, It.IsAnyType>>()),
Times.Once);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task HandleFailedSendJob(bool isBackfilling)
{
// Arrange
var command = new StartJob(StreamDefinition, isBackfilling);
var service = this.CreateServiceProvider().GetRequiredService<ICommandHandler<StreamingJobCommand>>();

this.streamClassRepositoryMock
.Setup(scr => scr.Get(It.IsAny<string>(), It.IsAny<string>()))
.ReturnsAsync(StreamClass.AsOption());

this.streamingJobTemplateRepositoryMock
.Setup(sjtr => sjtr.GetStreamingJobTemplate(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
.ReturnsAsync(((IStreamingJobTemplate)new FailedStreamingJobTemplate(new Exception())).AsOption());

this.kubeClusterMock
.Setup(k => k.SendJob(It.IsAny<V1Job>(), It.IsAny<string>(), It.IsAny<CancellationToken>()))
.Throws<Exception>();

// Act
await service.Handle(command);

// Assert
this.kubeClusterMock.Verify(k => k.UpdateCustomResourceStatus(It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<string>(),
StreamDefinition.Namespace(),
StreamDefinition.Name(),
It.Is<V1Alpha1StreamStatus>(s => s.Phase == StreamPhase.FAILED.ToString()),
It.IsAny<Func<JsonElement, It.IsAnyType>>()),
Times.Once);
}

private ServiceProvider CreateServiceProvider()
{
return new ServiceCollection()
.AddSingleton(this.kubeClusterMock.Object)
.AddSingleton(this.streamClassRepositoryMock.Object)
.AddSingleton(this.streamingJobTemplateRepositoryMock.Object)
.AddSingleton(loggerFixture.Factory.CreateLogger<StreamingJobCommandHandler>())
.AddSingleton(loggerFixture.Factory.CreateLogger<UpdateStatusCommandHandler>())
.AddSingleton<ICommandHandler<UpdateStatusCommand>, UpdateStatusCommandHandler>()
.AddSingleton<ICommandHandler<StreamingJobCommand>, StreamingJobCommandHandler>()
.BuildServiceProvider();
}
}
38 changes: 38 additions & 0 deletions test/Services/TestCases/FailedStreamingJobTemplate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using Arcane.Operator.Models.Resources.JobTemplates.Base;
using k8s.Models;

namespace Arcane.Operator.Tests.Services.TestCases;

/// <summary>
/// A streaming job templatethat throws an exception (for tests)
/// </summary>
public class FailedStreamingJobTemplate : IStreamingJobTemplate
{
private readonly Exception exception;

public FailedStreamingJobTemplate(Exception exception)
{
this.exception = exception;
}

public string ApiVersion
{
get => throw this.exception;
set => throw this.exception;
}

public string Kind
{
get => throw this.exception;
set => throw this.exception;
}

public V1ObjectMeta Metadata
{
get => throw this.exception;
set => throw this.exception;
}

public V1Job GetJob() => throw this.exception;
}

0 comments on commit 3cd8524

Please sign in to comment.