Skip to content

Commit

Permalink
add json and datacontract variations with memoryInstanceStore
Browse files Browse the repository at this point in the history
  • Loading branch information
mihainradu committed May 3, 2024
1 parent 9a01688 commit 6df2a97
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 23 deletions.
13 changes: 12 additions & 1 deletion src/Test/TestCases.Runtime/ParallelBranchTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@
using System.Activities.ParallelTracking;
using System.Linq;
using WorkflowApplicationTestExtensions;
using WorkflowApplicationTestExtensions.Persistence;
using Xunit;

namespace TestCases.Runtime;


public class ParallelBranchTestsJson : ParallelBranchTests
{
protected override IWorkflowSerializer Serializer => new JsonWorkflowSerializer();
}

public class ParallelBranchTests
{
public class TestCodeActivity(Action<CodeActivityContext> onExecute) : CodeActivity
Expand All @@ -18,12 +25,16 @@ protected override void Execute(CodeActivityContext context)
}
}

protected virtual IWorkflowSerializer Serializer => new DataContractWorkflowSerializer();
ParallelBranch parentLevel = default;
private void Run(params Action<CodeActivityContext>[] onExecute)
{
var execs = new Action<CodeActivityContext>[] { new(SetParent) }
.Concat(onExecute);
new WorkflowApplication(new SuspendingWrapper(execs.Select(c => new TestCodeActivity(c))))
new WorkflowApplication(new SuspendingWrapper(execs.Select(c => new TestCodeActivity(c))))
{
InstanceStore = new MemoryInstanceStore(Serializer)
}
.RunUntilCompletion();

void SetParent(CodeActivityContext context)
Expand Down
16 changes: 14 additions & 2 deletions src/Test/TestCases.Runtime/ParallelTrackingExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,19 @@
using System.Linq;
using WorkflowApplicationTestExtensions;
using Xunit;
using WorkflowApplicationTestExtensions.Persistence;

namespace TestCases.Runtime
{
public class ParallelTrackingExtensionsTestsJson : ParallelTrackingExtensionsTests
{
protected override IWorkflowSerializer Serializer => new JsonWorkflowSerializer();
}

public class ParallelTrackingExtensionsTests
{
protected virtual IWorkflowSerializer Serializer => new DataContractWorkflowSerializer();

[Fact]
public void ParallelActivity()
{
Expand Down Expand Up @@ -89,8 +97,12 @@ public void PickActivity()
trigger1Id.ShouldNotBe(trigger2Id);
}

private static void Run(Activity activity) =>
new WorkflowApplication(activity).RunUntilCompletion();
private void Run(Activity activity) =>
new WorkflowApplication(activity)
{
InstanceStore = new MemoryInstanceStore(Serializer)
}
.RunUntilCompletion();

private static void ValidateId(string id, int expectedNesting, string shouldStartWith = null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,55 +41,67 @@ public static XInstanceDictionary ToNameDictionary(object source) => ((InstanceD

public abstract class AbstractInstanceStore(IWorkflowSerializer instanceSerializer) : InstanceStore
{
private Guid _storageInstanceId = Guid.NewGuid();
private Guid _lockId = Guid.NewGuid();
private readonly Guid _storageInstanceId = Guid.NewGuid();
private readonly Guid _lockId = Guid.NewGuid();
private readonly IWorkflowSerializer _instanceSerializer = instanceSerializer;

private class StreamWrapperWithDiposeEvent(Stream stream, Action onDispose) : Stream
private class StreamWrapperWithDisposeEvent : Stream
{
private readonly Stream _stream;
public Action<Guid, Stream> OnDispose { get; init; }
private readonly Guid _instanceId;

public StreamWrapperWithDisposeEvent(Stream stream, Guid instanceId)
{
_stream = stream;
_instanceId = instanceId;
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
onDispose?.Invoke();
_stream.Dispose();
OnDispose?.Invoke(_instanceId, _stream);
}

public override bool CanRead => stream.CanRead;
public override bool CanRead => _stream.CanRead;

public override bool CanSeek => stream.CanSeek;
public override bool CanSeek => _stream.CanSeek;

public override bool CanWrite => stream.CanWrite;
public override bool CanWrite => _stream.CanWrite;

public override long Length => stream.Length;
public override long Length => _stream.Length;

public override long Position { get => stream.Position; set => stream.Position = value; }
public override long Position { get => _stream.Position; set => _stream.Position = value; }

public override void Flush()
{
stream.Flush();
_stream.Flush();
}

public override int Read(byte[] buffer, int offset, int count)
{
return stream.Read(buffer, offset, count);
return _stream.Read(buffer, offset, count);
}

public override long Seek(long offset, SeekOrigin origin)
{
return stream.Seek(offset, origin);
return _stream.Seek(offset, origin);
}

public override void SetLength(long value)
{
stream.SetLength(value);
_stream.SetLength(value);
}

public override void Write(byte[] buffer, int offset, int count)
{
stream.Write(buffer, offset, count);
_stream.Write(buffer, offset, count);
}
}

protected Stream OnStreamDispose(Stream stream, Action onDispose) => new StreamWrapperWithDiposeEvent(stream, onDispose);
protected virtual void OnReadStreamDisposed(Guid instanceId, Stream stream) { }
protected virtual void OnWriteStreamDisposed(Guid instanceId, Stream stream) { }

protected abstract Task<Stream> GetReadStream(Guid instanceId);
protected abstract Task<Stream> GetWriteStream(Guid instanceId);
Expand Down Expand Up @@ -155,13 +167,21 @@ private async Task<bool> TryCommandAsync(InstancePersistenceContext context, Ins

private async Task LoadWorkflow(InstancePersistenceContext context)
{
using var stream = await GetReadStream(context.InstanceView.InstanceId);
var originalStream = await GetReadStream(context.InstanceView.InstanceId);
using var stream = new StreamWrapperWithDisposeEvent(originalStream, context.InstanceView.InstanceId)
{
OnDispose = OnReadStreamDisposed
};
context.LoadedInstance(InstanceState.Initialized, _instanceSerializer.LoadWorkflowInstance(stream), null, null, null);
}

private async Task SaveWorkflow(InstancePersistenceContext context, SaveWorkflowCommand command)
{
using var stream = await GetWriteStream(context.InstanceView.InstanceId);
var originalStream = await GetWriteStream(context.InstanceView.InstanceId);
using var stream = new StreamWrapperWithDisposeEvent(originalStream, context.InstanceView.InstanceId)
{
OnDispose = OnWriteStreamDisposed
};
_instanceSerializer.SaveWorkflowInstance(command.InstanceData, stream);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ public FileInstanceStore(IWorkflowSerializer workflowSerializer, string storeDir

protected override Task<Stream> GetReadStream(Guid instanceId)
{
return Task.FromResult<Stream>(File.OpenRead(_storeDirectoryPath + "\\" + instanceId + "-InstanceData"));
return Task.FromResult<Stream>(File.OpenRead(GetFilePath(instanceId)));
}

protected override Task<Stream> GetWriteStream(Guid instanceId)
{
var filePath = _storeDirectoryPath + "\\" + instanceId + "-InstanceData";
string filePath = GetFilePath(instanceId);
File.Delete(filePath);
return Task.FromResult<Stream>(File.OpenWrite(filePath));
}

private string GetFilePath(Guid instanceId)
{
return _storeDirectoryPath + "\\" + instanceId + "-InstanceData";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

namespace WorkflowApplicationTestExtensions.Persistence;

public class MemoryInstanceStore(IWorkflowSerializer workflowSerializer) : AbstractInstanceStore(workflowSerializer)
{
private readonly ConcurrentDictionary<Guid, MemoryStream> _cache = new();

public MemoryInstanceStore() : this(new JsonWorkflowSerializer()) { }

protected override void OnReadStreamDisposed(Guid instanceId, Stream stream)
=> _cache.Remove(instanceId, out _);

protected override Task<Stream> GetReadStream(Guid instanceId)
{
_cache[instanceId].TryGetBuffer(out var buffer);
return Task.FromResult<Stream>(new MemoryStream(buffer.Array, buffer.Offset, buffer.Count));
}

protected override Task<Stream> GetWriteStream(Guid instanceId)
=> Task.FromResult<Stream>(_cache[instanceId] = new MemoryStream());
}

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static WorkflowApplicationResult RunUntilCompletion(this WorkflowApplicat
output.TrySetException(args.Reason);
};

application.InstanceStore ??= new FileInstanceStore(Environment.CurrentDirectory);
application.InstanceStore ??= new MemoryInstanceStore(new DataContractWorkflowSerializer());
application.Unloaded += uargs =>
{
Debug.WriteLine("Unloaded");
Expand Down

0 comments on commit 6df2a97

Please sign in to comment.