Skip to content

Commit

Permalink
fixed missing buffer allocations
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed May 9, 2017
1 parent 61267a1 commit 58b2d68
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 240 deletions.
13 changes: 7 additions & 6 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Expand Up @@ -2990,8 +2990,8 @@ namespace Akka.IO.Buffers
{
void Release(System.ArraySegment<byte> buf);
void Release(System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> buf);
System.ArraySegment<byte> Take();
System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> Take(int minimumSize);
System.ArraySegment<byte> Rent();
System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> Rent(int minimumSize);
}
}
namespace Akka.IO
Expand All @@ -3004,6 +3004,7 @@ namespace Akka.IO
[System.Diagnostics.DebuggerDisplayAttribute("(Count = {_count}, Buffers = {_buffers})")]
public sealed class ByteString : System.Collections.Generic.IEnumerable<byte>, System.Collections.IEnumerable, System.IEquatable<Akka.IO.ByteString>
{
public System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> Buffers { get; }
public int Count { get; }
public static Akka.IO.ByteString Empty { get; }
public bool IsCompact { get; }
Expand Down Expand Up @@ -3513,9 +3514,9 @@ namespace Akka.IO
}
public sealed class Send : Akka.IO.Udp.Command
{
public Send(Akka.IO.ByteString payload, System.Net.EndPoint target, Akka.IO.Udp.Event ack) { }
public Send(System.Collections.Generic.IEnumerator<System.ArraySegment<byte>> payload, System.Net.EndPoint target, Akka.IO.Udp.Event ack) { }
public Akka.IO.Udp.Event Ack { get; }
public Akka.IO.ByteString Payload { get; }
public System.Collections.Generic.IEnumerator<System.ArraySegment<byte>> Payload { get; }
public System.Net.EndPoint Target { get; }
public bool WantsAck { get; }
public static Akka.IO.Udp.Send Create(Akka.IO.ByteString data, System.Net.EndPoint target) { }
Expand Down Expand Up @@ -3614,9 +3615,9 @@ namespace Akka.IO
}
public sealed class Send : Akka.IO.UdpConnected.Command
{
public Send(Akka.IO.ByteString payload, object ack) { }
public Send(System.Collections.Generic.IEnumerator<System.ArraySegment<byte>> payload, object ack) { }
public object Ack { get; }
public Akka.IO.ByteString Payload { get; }
public System.Collections.Generic.IEnumerator<System.ArraySegment<byte>> Payload { get; }
public bool WantsAck { get; }
public static Akka.IO.UdpConnected.Send Create(Akka.IO.ByteString data) { }
}
Expand Down
9 changes: 5 additions & 4 deletions src/core/Akka.Streams.Tests/IO/TcpSpec.cs
Expand Up @@ -30,6 +30,7 @@ public class TcpSpec : TcpHelper
{
public TcpSpec(ITestOutputHelper helper) : base(@"
akka.loglevel = DEBUG
#akka.io.tcp.trace-logging = true
akka.stream.materializer.subscription-timeout.timeout = 2s", helper)
{
}
Expand Down Expand Up @@ -194,7 +195,7 @@ public void Outgoing_TCP_stream_must_work_when_remote_closes_write_then_client_c
}, Materializer);
}

[Fact(Skip = "FIXME")]
[Fact]
public void Outgoing_TCP_stream_must_work_when_client_closes_read_then_client_closes_write()
{
this.AssertAllStagesStopped(() =>
Expand Down Expand Up @@ -421,12 +422,12 @@ public void Outgoing_TCP_stream_must_properly_full_close_if_requested()
var task =
Sys.TcpStream()
.Bind(serverAddress.Address.ToString(), serverAddress.Port)
.Bind(serverAddress.Address.ToString(), serverAddress.Port, halfClose: false)
.ToMaterialized(
Sink.ForEach<Tcp.IncomingConnection>(conn => conn.Flow.Join(writeButIgnoreRead).Run(Materializer)),
Keep.Left)
.Run(Materializer);
task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
task.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue();
var binding = task.Result;
var t = Source.Maybe<ByteString>()
Expand All @@ -436,7 +437,7 @@ public void Outgoing_TCP_stream_must_properly_full_close_if_requested()
var promise = t.Item1;
var result = t.Item2;
result.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
result.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue();
result.Result.ShouldBeEquivalentTo(ByteString.FromString("Early response"));
promise.SetResult(null); // close client upstream, no more data
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Tests/IO/TcpIntegrationSpec.cs
Expand Up @@ -23,7 +23,8 @@ namespace Akka.Tests.IO
public class TcpIntegrationSpec : AkkaSpec
{
public TcpIntegrationSpec(ITestOutputHelper output)
: base(@"akka.loglevel = DEBUG", output: output)
: base(@"akka.loglevel = DEBUG
akka.io.tcp.trace-logging = true", output: output)
{ }

private void VerifyActorTermination(IActorRef actor)
Expand Down
33 changes: 7 additions & 26 deletions src/core/Akka/IO/Tcp.cs
Expand Up @@ -435,10 +435,7 @@ public abstract class SimpleWriteCommand : WriteCommand
/// <summary>
/// TBD
/// </summary>
public bool WantsAck
{
get { return !(Ack is NoAck); }
}
public bool WantsAck => !(Ack is NoAck);

/// <summary>
/// TBD
Expand Down Expand Up @@ -468,7 +465,6 @@ public class Write : SimpleWriteCommand
/// </summary>
public static readonly Write Empty = new Write(ByteString.Empty, NoAck.Instance);

private readonly Event _ack;
/// <summary>
/// TBD
/// </summary>
Expand All @@ -477,14 +473,11 @@ public class Write : SimpleWriteCommand
/// <summary>
/// TBD
/// </summary>
public override Event Ack
{
get { return _ack; }
}
public override Event Ack { get; }

private Write(ByteString data, Event ack)
{
_ack = ack;
Ack = ack;
Data = data;
}

Expand Down Expand Up @@ -943,10 +936,7 @@ private Aborted()
/// <summary>
/// TBD
/// </summary>
public override bool IsAborted
{
get { return true; }
}
public override bool IsAborted => true;
}

/// <summary>
Expand All @@ -967,10 +957,7 @@ private ConfirmedClosed()
/// <summary>
/// TBD
/// </summary>
public override bool IsConfirmed
{
get { return true; }
}
public override bool IsConfirmed => true;
}

/// <summary>
Expand Down Expand Up @@ -1012,19 +999,13 @@ public ErrorClosed(string cause)
/// <summary>
/// TBD
/// </summary>
public override bool IsErrorClosed
{
get { return true; }
}
public override bool IsErrorClosed => true;

/// <summary>
/// TBD
/// </summary>
/// <returns>TBD</returns>
public override string GetErrorCause()
{
return _cause;
}
public override string GetErrorCause() => _cause;

public override string ToString() =>
$"ErrorClosed('{_cause}')";
Expand Down

0 comments on commit 58b2d68

Please sign in to comment.