Skip to content

Commit

Permalink
Migrated Poll tests #4
Browse files Browse the repository at this point in the history
  • Loading branch information
StormHub committed Mar 2, 2017
1 parent 722ba41 commit c259b53
Show file tree
Hide file tree
Showing 31 changed files with 718 additions and 223 deletions.
7 changes: 6 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
#### 0.1.0 February 25 2016
First public preview release
First public preview release

#### 0.1.14 March 2 2016
- Removed StreamListener class and stream listen returns handles directly.
- Added a user token object to hold arbitrary objects for all handles and requests.
- Fixed an issue where FSPoll trying to get file information even error occurred.
17 changes: 8 additions & 9 deletions src/NetUV.Core/Handles/HandleContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,19 @@ protected override void CloseHandle()

internal static T GetTarget<T>(IntPtr handle)
{
if (handle == IntPtr.Zero)
{
throw new InvalidOperationException($"{nameof(handle)} cannot be empty.");
}
Contract.Requires(handle != IntPtr.Zero);

IntPtr inernalHandle = ((uv_handle_t*)handle)->data;
GCHandle gcHandle = GCHandle.FromIntPtr(inernalHandle);

if (!gcHandle.IsAllocated)
if (inernalHandle != IntPtr.Zero)
{
return default(T);
GCHandle gcHandle = GCHandle.FromIntPtr(inernalHandle);
if (gcHandle.IsAllocated)
{
return (T)gcHandle.Target;
}
}

return (T)gcHandle.Target;
return default(T);
}

static void OnCloseHandle(IntPtr handle)
Expand Down
63 changes: 14 additions & 49 deletions src/NetUV.Core/Handles/HandleExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,20 @@ namespace NetUV.Core.Handles

public static class HandleExtensions
{
internal const int DefaultBacklog = 128;

public static StreamListener<Pipe> Listen(this Pipe pipe,
Action<Pipe, Exception> connectionHandler,
int backlog = DefaultBacklog)
{
Contract.Requires(pipe != null);
Contract.Requires(connectionHandler != null);
Contract.Requires(backlog > 0);

var listener = new StreamListener<Pipe>(pipe, connectionHandler);
listener.Listen(backlog);

return listener;
}

public static StreamListener<Pipe> Listen(this Pipe pipe,
public static Pipe Listen(this Pipe pipe,
string name,
Action<Pipe, Exception> connectionHandler,
int backlog = DefaultBacklog)
Action<Pipe, Exception> onConnection,
int backlog = ServerStream.DefaultBacklog)
{
Contract.Requires(pipe != null);
Contract.Requires(!string.IsNullOrEmpty(name));
Contract.Requires(connectionHandler != null);
Contract.Requires(onConnection != null);
Contract.Requires(backlog > 0);

pipe.Bind(name);
var listener = new StreamListener<Pipe>(pipe, connectionHandler);
listener.Listen(backlog);
pipe.Listen(onConnection, backlog);

return listener;
return pipe;
}

public static Pipe ConnectTo(this Pipe pipe,
Expand All @@ -56,45 +39,29 @@ public static class HandleExtensions
{
request = new PipeConnect(pipe, remoteName, connectionHandler);
}
catch (Exception exception)
catch (Exception)
{
ScheduleHandle.Log.Error($"{pipe.HandleType} {pipe.InternalHandle} Failed to connect to {remoteName}", exception);
request?.Dispose();
throw;
}

return pipe;
}

public static StreamListener<Tcp> Listen(this Tcp tcp,
Action<Tcp, Exception> connectionHandler,
int backlog = DefaultBacklog)
{
Contract.Requires(tcp != null);
Contract.Requires(connectionHandler != null);
Contract.Requires(backlog > 0);

var listener = new StreamListener<Tcp>(tcp, connectionHandler);
listener.Listen(backlog);

return listener;
}

public static StreamListener<Tcp> Listen(this Tcp tcp,
public static Tcp Listen(this Tcp tcp,
IPEndPoint localEndPoint,
Action<Tcp, Exception> connectionHandler,
int backlog = DefaultBacklog,
Action<Tcp, Exception> onConnection,
int backlog = ServerStream.DefaultBacklog,
bool dualStack = false)
{
Contract.Requires(tcp != null);
Contract.Requires(localEndPoint != null);
Contract.Requires(connectionHandler != null);
Contract.Requires(onConnection != null);

tcp.Bind(localEndPoint, dualStack);
var listener = new StreamListener<Tcp>(tcp, connectionHandler);
listener.Listen(backlog);
tcp.Listen(onConnection, backlog);

return listener;
return tcp;
}

public static Tcp ConnectTo(this Tcp tcp,
Expand Down Expand Up @@ -128,9 +95,8 @@ public static class HandleExtensions
{
request = new TcpConnect(tcp, remoteEndPoint, connectedHandler);
}
catch (Exception exception)
catch (Exception)
{
ScheduleHandle.Log.Error($"{tcp.HandleType} {tcp.InternalHandle} Failed to connect to {remoteEndPoint}", exception);
request?.Dispose();
throw;
}
Expand Down Expand Up @@ -169,7 +135,6 @@ public static class HandleExtensions
return udp;
}


public static Udp ReceiveStart(this Udp udp,
Action<Udp, IDatagramReadCompletion> receiveAction)
{
Expand Down
24 changes: 22 additions & 2 deletions src/NetUV.Core/Handles/Loop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,30 @@ public Async CreateAsync(Action<Async> callback)
return new Async(this.handle, callback);
}

public Poll CreatePoll(int fd)
public Poll CreatePoll(int fileDescriptor)
{
if (Platform.IsWindows)
{
throw new InvalidOperationException(
"Poll handle file descriptor is not supported on Windows platform");
}
this.handle.Validate();
return new Poll(this.handle, fd);

return new Poll(this.handle, fileDescriptor);
}

public Poll CreatePoll(IntPtr socket)
{
Contract.Requires(socket != IntPtr.Zero);

if (!Platform.IsWindows)
{
throw new InvalidOperationException(
"Poll handle socket is not supported on non Windows platform");
}
this.handle.Validate();

return new Poll(this.handle, socket);
}

public Signal CreateSignal()
Expand Down
9 changes: 9 additions & 0 deletions src/NetUV.Core/Handles/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ protected override unsafe StreamHandle NewStream()
return new Pipe(loop);
}

public Pipe Listen(Action<Pipe, Exception> onConnection, int backlog = DefaultBacklog)
{
Contract.Requires(onConnection != null);
Contract.Requires(backlog > 0);

this.StreamListen((handle, exception) => onConnection((Pipe)handle, exception), backlog);
return this;
}

public void CloseHandle(Action<Pipe> callback = null) =>
base.CloseHandle(state => callback?.Invoke((Pipe)state));
}
Expand Down
10 changes: 10 additions & 0 deletions src/NetUV.Core/Handles/Poll.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ internal Poll(LoopContext loop, int fd)
: base(loop, uv_handle_type.UV_POLL, fd)
{ }

internal Poll(LoopContext loop, IntPtr handle)
: base(loop, uv_handle_type.UV_POLL, handle)
{ }

public IntPtr GetFileDescriptor()
{
this.Validate();
return NativeMethods.GetFileDescriptor(this.InternalHandle);
}

public Poll Start(PollMask eventMask, Action<Poll, PollMask> callback)
{
Contract.Requires(callback != null);
Expand Down
3 changes: 3 additions & 0 deletions src/NetUV.Core/Handles/ScheduleHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public abstract class ScheduleHandle : IDisposable

public bool IsValid => this.handle.IsValid;

public object UserToken { get; set; }

internal IntPtr InternalHandle => this.handle.Handle;

internal uv_handle_type HandleType { get; }
Expand All @@ -57,6 +59,7 @@ internal void OnHandleClosed()
finally
{
this.closeCallback = null;
this.UserToken = null;
}
}

Expand Down
34 changes: 30 additions & 4 deletions src/NetUV.Core/Handles/ServerStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
namespace NetUV.Core.Handles
{
using System;
using System.Diagnostics.Contracts;
using NetUV.Core.Native;

public abstract class ServerStream : StreamHandle
{
internal const int DefaultBacklog = 128;

internal static readonly uv_watcher_cb ConnectionCallback = OnConnectionCallback;
Action<StreamHandle, Exception> connectionHandler;

internal ServerStream(
LoopContext loop,
Expand All @@ -17,10 +21,33 @@ public abstract class ServerStream : StreamHandle
: base(loop, handleType, args)
{ }

internal Action<StreamHandle, Exception> ConnectionHandler { get; set; }

protected abstract StreamHandle NewStream();

protected void StreamListen(Action<StreamHandle, Exception> onConnection, int backlog = DefaultBacklog)
{
Contract.Requires(this.connectionHandler != null);
Contract.Requires(backlog > 0);

this.Validate();
this.connectionHandler = onConnection;
try
{
NativeMethods.StreamListen(this.InternalHandle, backlog, ConnectionCallback);
Log.DebugFormat("Stream {0} {1} listening, backlog = {2}", this.HandleType, this.InternalHandle, backlog);
}
catch
{
this.Dispose();
throw;
}
}

protected override void Close()
{
this.connectionHandler = null;
base.Close();
}

static void OnConnectionCallback(IntPtr handle, int status)
{
var server = HandleContext.GetTarget<ServerStream>(handle);
Expand Down Expand Up @@ -48,11 +75,10 @@ static void OnConnectionCallback(IntPtr handle, int status)

NativeMethods.StreamAccept(server.InternalHandle, client.InternalHandle);
client.ReadStart();

Log.DebugFormat("{0} {1} client {2} accepted", server.HandleType, handle, client.InternalHandle);
}

server.ConnectionHandler.Invoke(client, error);
server.connectionHandler.Invoke((ServerStream)client, error);
}
catch
{
Expand Down
6 changes: 6 additions & 0 deletions src/NetUV.Core/Handles/StreamHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ protected int ReceiveBufferSize(int value)
return NativeMethods.ReceiveBufferSize(this.InternalHandle, value);
}

public IntPtr GetFileDescriptor()
{
this.Validate();
return NativeMethods.GetFileDescriptor(this.InternalHandle);
}

public unsafe long GetWriteQueueSize()
{
this.Validate();
Expand Down
Loading

0 comments on commit c259b53

Please sign in to comment.