Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up Akka.Stream file stream #4874

Merged
merged 5 commits into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 212 additions & 94 deletions src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,22 @@ public void SynchronousFileSink_should_write_lines_to_a_file()
{
TargetFile(f =>
{
var completion = Source.From(_testByteStrings).RunWith(FileIO.ToFile(f), _materializer);
var (killSwitch, completion) = Source.From(_testByteStrings)
.ViaMaterialized(KillSwitches.Single<ByteString>(), Keep.Right)
.ToMaterialized(FileIO.ToFile(f), Keep.Both)
.Run(_materializer);

completion.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var result = completion.Result;
result.Count.Should().Be(6006);
CheckFileContent(f, _testLines.Aggregate((s, s1) => s + s1));
try
{
completion.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var result = completion.Result;
result.Count.Should().Be(6006);
CheckFileContent(f, _testLines.Aggregate((s, s1) => s + s1));
}
finally
{
killSwitch.Shutdown();
}
});
}, _materializer);
}
Expand All @@ -76,11 +86,23 @@ public void SynchronousFileSink_should_create_new_file_if_not_exists()
{
TargetFile(f =>
{
var completion = Source.From(_testByteStrings).RunWith(FileIO.ToFile(f), _materializer);
completion.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var result = completion.Result;
result.Count.Should().Be(6006);
CheckFileContent(f, _testLines.Aggregate((s, s1) => s + s1));
var (killSwitch, completion) = Source.From(_testByteStrings)
.ViaMaterialized(KillSwitches.Single<ByteString>(), Keep.Right)
.ToMaterialized(FileIO.ToFile(f), Keep.Both)
.Run(_materializer);

try
{
completion.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var result = completion.Result;
result.Count.Should().Be(6006);
CheckFileContent(f, _testLines.Aggregate((s, s1) => s + s1));
}
catch (Exception)
{
killSwitch.Shutdown();
throw;
}
}, false);
}, _materializer);
}
Expand All @@ -92,25 +114,43 @@ public void SynchronousFileSink_should_write_into_existing_file_without_wiping_e
{
TargetFile(f =>
{
Task<IOResult> Write(IEnumerable<string> lines) => Source.From(lines)
(UniqueKillSwitch, Task<IOResult>) Write(IEnumerable<string> lines) => Source.From(lines)
.Select(ByteString.FromString)
.RunWith(FileIO.ToFile(f, FileMode.OpenOrCreate), _materializer);

var completion1 = Write(_testLines);
completion1.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();

var lastWrite = new string[100];
for (var i = 0; i < 100; i++)
lastWrite[i] = "x";

var completion2 = Write(lastWrite);
completion2.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var result = completion2.Result;
.ViaMaterialized(KillSwitches.Single<ByteString>(), Keep.Right)
.ToMaterialized(FileIO.ToFile(f, FileMode.OpenOrCreate), Keep.Both)
.Run(_materializer);

var lastWriteString = new string(lastWrite.SelectMany(x => x).ToArray());
result.Count.Should().Be(lastWriteString.Length);
var testLinesString = new string(_testLines.SelectMany(x => x).ToArray());
CheckFileContent(f, lastWriteString + testLinesString.Substring(100));
var (killSwitch1, completion1) = Write(_testLines);
try
{
completion1.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();

var lastWrite = new string[100];
for (var i = 0; i < 100; i++)
lastWrite[i] = "x";

var (killSwitch2, completion2) = Write(lastWrite);
try
{
completion2.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var result = completion2.Result;

var lastWriteString = new string(lastWrite.SelectMany(x => x).ToArray());
result.Count.Should().Be(lastWriteString.Length);
var testLinesString = new string(_testLines.SelectMany(x => x).ToArray());
CheckFileContent(f, lastWriteString + testLinesString.Substring(100));
}
catch (Exception)
{
killSwitch2.Shutdown();
throw;
}
}
catch (Exception)
{
killSwitch1.Shutdown();
throw;
}
});
}, _materializer);
}
Expand All @@ -122,17 +162,36 @@ public void SynchronousFileSink_should_by_default_replace_the_existing_file()
{
TargetFile(f =>
{
Task<IOResult> Write(List<string> lines) =>
(UniqueKillSwitch, Task<IOResult>) Write(List<string> lines) =>
Source.From(lines).Select(ByteString.FromString)
.RunWith(FileIO.ToFile(f), _materializer);
.ViaMaterialized(KillSwitches.Single<ByteString>(), Keep.Right)
.ToMaterialized(FileIO.ToFile(f), Keep.Both)
.Run(_materializer);

Write(_testLines).AwaitResult();

var lastWrite = Enumerable.Range(0, 100).Select(_ => "x").ToList();
var result = Write(lastWrite).AwaitResult();

result.Count.Should().Be(lastWrite.Count);
CheckFileContent(f, string.Join("", lastWrite));
var (killSwitch1, task1) = Write(_testLines);
try
{
task1.AwaitResult();
var lastWrite = Enumerable.Range(0, 100).Select(_ => "x").ToList();
var (killSwitch2, task2) = Write(lastWrite);
try
{
var result = task2.AwaitResult();

result.Count.Should().Be(lastWrite.Count);
CheckFileContent(f, string.Join("", lastWrite));
}
catch (Exception)
{
killSwitch2.Shutdown();
throw;
}
}
catch (Exception)
{
killSwitch1.Shutdown();
throw;
}
});
}, _materializer);
}
Expand All @@ -144,29 +203,47 @@ public void SynchronousFileSink_should_allow_appending_to_file()
{
TargetFile(f =>
{
Task<IOResult> Write(List<string> lines) => Source.From(lines)
(UniqueKillSwitch, Task<IOResult>) Write(List<string> lines) => Source.From(lines)
.Select(ByteString.FromString)
.RunWith(FileIO.ToFile(f, fileMode: FileMode.Append), _materializer);

var completion1 = Write(_testLines);
completion1.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var result1 = completion1.Result;

var lastWrite = new List<string>();
for (var i = 0; i < 100; i++)
lastWrite.Add("x");

var completion2 = Write(lastWrite);
completion2.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var result2 = completion2.Result;

var lastWriteString = new string(lastWrite.SelectMany(x => x).ToArray());
var testLinesString = new string(_testLines.SelectMany(x => x).ToArray());

f.Length.Should().Be(result1.Count + result2.Count);
.ViaMaterialized(KillSwitches.Single<ByteString>(), Keep.Right)
.ToMaterialized(FileIO.ToFile(f, fileMode: FileMode.Append), Keep.Both)
.Run(_materializer);

//NOTE: no new line at the end of the file - does JVM/linux appends new line at the end of the file in append mode?
CheckFileContent(f, testLinesString + lastWriteString);
var (killSwitch1, completion1) = Write(_testLines);
try
{
completion1.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var result1 = completion1.Result;

var lastWrite = new List<string>();
for (var i = 0; i < 100; i++)
lastWrite.Add("x");

var (killSwitch2, completion2) = Write(lastWrite);
try
{
completion2.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var result2 = completion2.Result;

var lastWriteString = new string(lastWrite.SelectMany(x => x).ToArray());
var testLinesString = new string(_testLines.SelectMany(x => x).ToArray());

f.Length.Should().Be(result1.Count + result2.Count);

//NOTE: no new line at the end of the file - does JVM/linux appends new line at the end of the file in append mode?
CheckFileContent(f, testLinesString + lastWriteString);
}
catch (Exception)
{
killSwitch2.Shutdown();
throw;
}
}
catch (Exception)
{
killSwitch1.Shutdown();
throw;
}
});
}, _materializer);
}
Expand Down Expand Up @@ -195,18 +272,29 @@ public void SynchronousFileSink_should_allow_writing_from_specific_position_to_t
new string('x', 1000) + "\n",
};

Task<IOResult> Write(List<string> lines, long pos) => Source.From(lines)
(UniqueKillSwitch, Task<IOResult>) Write(List<string> lines, long pos) => Source.From(lines)
.Select(ByteString.FromString)
.RunWith(FileIO.ToFile(f, fileMode: FileMode.OpenOrCreate, startPosition: pos), _materializer);
.ViaMaterialized(KillSwitches.Single<ByteString>(), Keep.Right)
.ToMaterialized(FileIO.ToFile(f, fileMode: FileMode.OpenOrCreate, startPosition: pos), Keep.Both)
.Run(_materializer);

var completion1 = Write(_testLines, 0);
var result1 = completion1.AwaitResult();
var (killSwitch1, completion1) = Write(_testLines, 0);
var (killSwitch2, completion2) = Write(testLinesPart2, startPosition);

var completion2 = Write(testLinesPart2, startPosition);
var result2 = completion2.AwaitResult();
try
{
completion1.AwaitResult();
var result2 = completion2.AwaitResult();

f.Length.ShouldBe(startPosition + result2.Count);
CheckFileContent(f, testLinesCommon.Join("") + testLinesPart2.Join(""));
f.Length.ShouldBe(startPosition + result2.Count);
CheckFileContent(f, testLinesCommon.Join("") + testLinesPart2.Join(""));
}
catch (Exception)
{
killSwitch1.Shutdown();
killSwitch2.Shutdown();
throw;
}
});
}, _materializer);
}
Expand All @@ -224,18 +312,27 @@ public void SynchronousFileSink_should_use_dedicated_blocking_io_dispatcher_by_d
try
{
//hack for Iterator.continually
Source
var killSwitch = Source
.FromEnumerator(() => Enumerable.Repeat(_testByteStrings.Head(), int.MaxValue).GetEnumerator())
.RunWith(FileIO.ToFile(f), materializer);

((ActorMaterializerImpl)materializer)
.Supervisor
.Tell(StreamSupervisor.GetChildren.Instance, TestActor);
var refs = ExpectMsg<StreamSupervisor.Children>().Refs;
var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("fileSink"));

// haven't figured out why this returns the aliased id rather than the id, but the stage is going away so whatever
Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name);
.ViaMaterialized(KillSwitches.Single<ByteString>(), Keep.Right)
.ToMaterialized(FileIO.ToFile(f), Keep.Left)
.Run(materializer);
try
{
((ActorMaterializerImpl)materializer)
.Supervisor
.Tell(StreamSupervisor.GetChildren.Instance, TestActor);
var refs = ExpectMsg<StreamSupervisor.Children>().Refs;
var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("fileSink"));

// haven't figured out why this returns the aliased id rather than the id, but the stage is going away so whatever
Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name);
}
catch (Exception)
{
killSwitch.Shutdown();
throw;
}
}
finally
{
Expand All @@ -259,7 +356,7 @@ public void SynchronousFileSink_should_allow_overriding_the_dispatcher_using_Att
try
{
//hack for Iterator.continually
Source.FromEnumerator(() => Enumerable.Repeat(_testByteStrings.Head(), Int32.MaxValue).GetEnumerator())
Source.FromEnumerator(() => Enumerable.Repeat(_testByteStrings.Head(), int.MaxValue).GetEnumerator())
.To(FileIO.ToFile(f))
.WithAttributes(ActorAttributes.CreateDispatcher("akka.actor.default-dispatcher"));
//.Run(materializer);
Expand Down Expand Up @@ -288,11 +385,21 @@ public void SynchronousFileSink_should_write_single_line_to_a_file_from_lazy_sin
() => Task.FromResult(IOResult.Success(0)))
.MapMaterializedValue(t => t.AwaitResult());

var completion = Source.From(new []{_testByteStrings.Head()})
.RunWith(lazySink, _materializer);
var (killSwitch, completion) = Source.From(new []{_testByteStrings.Head()})
.ViaMaterialized(KillSwitches.Single<ByteString>(), Keep.Right)
.ToMaterialized(lazySink, Keep.Both)
.Run(_materializer);

completion.AwaitResult();
CheckFileContent(f, _testLines.Head());
try
{
completion.AwaitResult();
CheckFileContent(f, _testLines.Head());
}
catch(Exception)
{
killSwitch.Shutdown();
throw;
}
});
}, _materializer);
}
Expand All @@ -304,26 +411,37 @@ public void SynchronousFileSink_should_write_each_element_if_auto_flush_is_set()
{
TargetFile(f =>
{
var (actor, task) = Source.ActorRef<string>(64, OverflowStrategy.DropNew)
var ((actor, killSwitch), task) = Source.ActorRef<string>(64, OverflowStrategy.DropNew)
.Select(ByteString.FromString)
.ToMaterialized(FileIO.ToFile(f, fileMode: FileMode.OpenOrCreate, startPosition: 0, autoFlush:true), (a, t) => (a, t))
.ViaMaterialized(KillSwitches.Single<ByteString>(), Keep.Both)
.ToMaterialized(
FileIO.ToFile(f, fileMode: FileMode.OpenOrCreate, startPosition: 0, autoFlush:true),
(a, t) => (a, t))
.Run(_materializer);

actor.Tell("a\n");
actor.Tell("b\n");
try
{
actor.Tell("a\n");
actor.Tell("b\n");

// wait for flush
Thread.Sleep(100);
CheckFileContent(f, "a\nb\n");
// wait for flush
Thread.Sleep(100);
CheckFileContent(f, "a\nb\n");

actor.Tell("a\n");
actor.Tell("b\n");
actor.Tell("a\n");
actor.Tell("b\n");

actor.Tell(new Status.Success(NotUsed.Instance));
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
actor.Tell(new Status.Success(NotUsed.Instance));
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();

f.Length.ShouldBe(8);
CheckFileContent(f, "a\nb\na\nb\n");
f.Length.ShouldBe(8);
CheckFileContent(f, "a\nb\na\nb\n");
}
catch(Exception)
{
killSwitch.Shutdown();
throw;
}
});
}, _materializer);
}
Expand Down
Loading