diff --git a/src/Test/TestCases.Runtime/ParallelBranchTests.cs b/src/Test/TestCases.Runtime/ParallelBranchTests.cs index 0aced170..15e93eef 100644 --- a/src/Test/TestCases.Runtime/ParallelBranchTests.cs +++ b/src/Test/TestCases.Runtime/ParallelBranchTests.cs @@ -4,10 +4,17 @@ using System.Activities.ParallelTracking; using System.Linq; using WorkflowApplicationTestExtensions; +using WorkflowApplicationTestExtensions.Persistence; using Xunit; namespace TestCases.Runtime; + +public class ParallelBranchTestsJson : ParallelBranchTests +{ + protected override IWorkflowSerializer Serializer => new JsonWorkflowSerializer(); +} + public class ParallelBranchTests { public class TestCodeActivity(Action onExecute) : CodeActivity @@ -18,12 +25,16 @@ protected override void Execute(CodeActivityContext context) } } + protected virtual IWorkflowSerializer Serializer => new DataContractWorkflowSerializer(); ParallelBranch parentLevel = default; private void Run(params Action[] onExecute) { var execs = new Action[] { new(SetParent) } .Concat(onExecute); - new WorkflowApplication(new SuspendingWrapper(execs.Select(c => new TestCodeActivity(c)))) + new WorkflowApplication(new SuspendingWrapper(execs.Select(c => new TestCodeActivity(c)))) + { + InstanceStore = new MemoryInstanceStore(Serializer) + } .RunUntilCompletion(); void SetParent(CodeActivityContext context) diff --git a/src/Test/TestCases.Runtime/ParallelTrackingExtensionsTests.cs b/src/Test/TestCases.Runtime/ParallelTrackingExtensionsTests.cs index 8cc4bab9..9de59402 100644 --- a/src/Test/TestCases.Runtime/ParallelTrackingExtensionsTests.cs +++ b/src/Test/TestCases.Runtime/ParallelTrackingExtensionsTests.cs @@ -7,11 +7,19 @@ using System.Linq; using WorkflowApplicationTestExtensions; using Xunit; +using WorkflowApplicationTestExtensions.Persistence; namespace TestCases.Runtime { + public class ParallelTrackingExtensionsTestsJson : ParallelTrackingExtensionsTests + { + protected override IWorkflowSerializer Serializer => new JsonWorkflowSerializer(); + } + public class ParallelTrackingExtensionsTests { + protected virtual IWorkflowSerializer Serializer => new DataContractWorkflowSerializer(); + [Fact] public void ParallelActivity() { @@ -89,8 +97,12 @@ public void PickActivity() trigger1Id.ShouldNotBe(trigger2Id); } - private static void Run(Activity activity) => - new WorkflowApplication(activity).RunUntilCompletion(); + private void Run(Activity activity) => + new WorkflowApplication(activity) + { + InstanceStore = new MemoryInstanceStore(Serializer) + } + .RunUntilCompletion(); private static void ValidateId(string id, int expectedNesting, string shouldStartWith = null) { diff --git a/src/Test/WorkflowApplicationTestExtensions/Persistence/AbstractInstanceStore.cs b/src/Test/WorkflowApplicationTestExtensions/Persistence/AbstractInstanceStore.cs index 52586dc5..dd34b541 100644 --- a/src/Test/WorkflowApplicationTestExtensions/Persistence/AbstractInstanceStore.cs +++ b/src/Test/WorkflowApplicationTestExtensions/Persistence/AbstractInstanceStore.cs @@ -41,55 +41,67 @@ public static XInstanceDictionary ToNameDictionary(object source) => ((InstanceD public abstract class AbstractInstanceStore(IWorkflowSerializer instanceSerializer) : InstanceStore { - private Guid _storageInstanceId = Guid.NewGuid(); - private Guid _lockId = Guid.NewGuid(); + private readonly Guid _storageInstanceId = Guid.NewGuid(); + private readonly Guid _lockId = Guid.NewGuid(); private readonly IWorkflowSerializer _instanceSerializer = instanceSerializer; - private class StreamWrapperWithDiposeEvent(Stream stream, Action onDispose) : Stream + private class StreamWrapperWithDisposeEvent : Stream { + private readonly Stream _stream; + public Action OnDispose { get; init; } + private readonly Guid _instanceId; + + public StreamWrapperWithDisposeEvent(Stream stream, Guid instanceId) + { + _stream = stream; + _instanceId = instanceId; + } + protected override void Dispose(bool disposing) { base.Dispose(disposing); - onDispose?.Invoke(); + _stream.Dispose(); + OnDispose?.Invoke(_instanceId, _stream); } - public override bool CanRead => stream.CanRead; + public override bool CanRead => _stream.CanRead; - public override bool CanSeek => stream.CanSeek; + public override bool CanSeek => _stream.CanSeek; - public override bool CanWrite => stream.CanWrite; + public override bool CanWrite => _stream.CanWrite; - public override long Length => stream.Length; + public override long Length => _stream.Length; - public override long Position { get => stream.Position; set => stream.Position = value; } + public override long Position { get => _stream.Position; set => _stream.Position = value; } public override void Flush() { - stream.Flush(); + _stream.Flush(); } public override int Read(byte[] buffer, int offset, int count) { - return stream.Read(buffer, offset, count); + return _stream.Read(buffer, offset, count); } public override long Seek(long offset, SeekOrigin origin) { - return stream.Seek(offset, origin); + return _stream.Seek(offset, origin); } public override void SetLength(long value) { - stream.SetLength(value); + _stream.SetLength(value); } public override void Write(byte[] buffer, int offset, int count) { - stream.Write(buffer, offset, count); + _stream.Write(buffer, offset, count); } } - protected Stream OnStreamDispose(Stream stream, Action onDispose) => new StreamWrapperWithDiposeEvent(stream, onDispose); + protected virtual void OnReadStreamDisposed(Guid instanceId, Stream stream) { } + protected virtual void OnWriteStreamDisposed(Guid instanceId, Stream stream) { } protected abstract Task GetReadStream(Guid instanceId); protected abstract Task GetWriteStream(Guid instanceId); @@ -155,13 +167,21 @@ private async Task TryCommandAsync(InstancePersistenceContext context, Ins private async Task LoadWorkflow(InstancePersistenceContext context) { - using var stream = await GetReadStream(context.InstanceView.InstanceId); + var originalStream = await GetReadStream(context.InstanceView.InstanceId); + using var stream = new StreamWrapperWithDisposeEvent(originalStream, context.InstanceView.InstanceId) + { + OnDispose = OnReadStreamDisposed + }; context.LoadedInstance(InstanceState.Initialized, _instanceSerializer.LoadWorkflowInstance(stream), null, null, null); } private async Task SaveWorkflow(InstancePersistenceContext context, SaveWorkflowCommand command) { - using var stream = await GetWriteStream(context.InstanceView.InstanceId); + var originalStream = await GetWriteStream(context.InstanceView.InstanceId); + using var stream = new StreamWrapperWithDisposeEvent(originalStream, context.InstanceView.InstanceId) + { + OnDispose = OnWriteStreamDisposed + }; _instanceSerializer.SaveWorkflowInstance(command.InstanceData, stream); } diff --git a/src/Test/WorkflowApplicationTestExtensions/Persistence/FileInstanceStore.cs b/src/Test/WorkflowApplicationTestExtensions/Persistence/FileInstanceStore.cs index d8a325b3..c7301e3a 100644 --- a/src/Test/WorkflowApplicationTestExtensions/Persistence/FileInstanceStore.cs +++ b/src/Test/WorkflowApplicationTestExtensions/Persistence/FileInstanceStore.cs @@ -18,14 +18,18 @@ public FileInstanceStore(IWorkflowSerializer workflowSerializer, string storeDir protected override Task GetReadStream(Guid instanceId) { - return Task.FromResult(File.OpenRead(_storeDirectoryPath + "\\" + instanceId + "-InstanceData")); + return Task.FromResult(File.OpenRead(GetFilePath(instanceId))); } protected override Task GetWriteStream(Guid instanceId) { - var filePath = _storeDirectoryPath + "\\" + instanceId + "-InstanceData"; + string filePath = GetFilePath(instanceId); File.Delete(filePath); return Task.FromResult(File.OpenWrite(filePath)); } + private string GetFilePath(Guid instanceId) + { + return _storeDirectoryPath + "\\" + instanceId + "-InstanceData"; + } } diff --git a/src/Test/WorkflowApplicationTestExtensions/Persistence/MemoryInstanceStore.cs b/src/Test/WorkflowApplicationTestExtensions/Persistence/MemoryInstanceStore.cs new file mode 100644 index 00000000..06bc27bc --- /dev/null +++ b/src/Test/WorkflowApplicationTestExtensions/Persistence/MemoryInstanceStore.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; + +namespace WorkflowApplicationTestExtensions.Persistence; + +public class MemoryInstanceStore(IWorkflowSerializer workflowSerializer) : AbstractInstanceStore(workflowSerializer) +{ + private readonly ConcurrentDictionary _cache = new(); + + public MemoryInstanceStore() : this(new JsonWorkflowSerializer()) { } + + protected override void OnReadStreamDisposed(Guid instanceId, Stream stream) + => _cache.Remove(instanceId, out _); + + protected override Task GetReadStream(Guid instanceId) + { + _cache[instanceId].TryGetBuffer(out var buffer); + return Task.FromResult(new MemoryStream(buffer.Array, buffer.Offset, buffer.Count)); + } + + protected override Task GetWriteStream(Guid instanceId) + => Task.FromResult(_cache[instanceId] = new MemoryStream()); +} + diff --git a/src/Test/WorkflowApplicationTestExtensions/WorkflowApplicationTestExtensions.cs b/src/Test/WorkflowApplicationTestExtensions/WorkflowApplicationTestExtensions.cs index 825a096e..10781c5b 100644 --- a/src/Test/WorkflowApplicationTestExtensions/WorkflowApplicationTestExtensions.cs +++ b/src/Test/WorkflowApplicationTestExtensions/WorkflowApplicationTestExtensions.cs @@ -43,7 +43,7 @@ public static WorkflowApplicationResult RunUntilCompletion(this WorkflowApplicat output.TrySetException(args.Reason); }; - application.InstanceStore ??= new FileInstanceStore(Environment.CurrentDirectory); + application.InstanceStore ??= new MemoryInstanceStore(new DataContractWorkflowSerializer()); application.Unloaded += uargs => { Debug.WriteLine("Unloaded");