Skip to content

Commit

Permalink
Implemented client for IPC connection over Unix Domain Sockets.
Browse files Browse the repository at this point in the history
UnixEndPoint - copyrigths to the mono.
  • Loading branch information
Marcin Sowa committed Jul 24, 2017
1 parent 2176ecf commit bc2b619
Show file tree
Hide file tree
Showing 6 changed files with 380 additions and 3 deletions.
23 changes: 21 additions & 2 deletions Nethereum.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26228.10
VisualStudioVersion = 15.0.26430.16
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{962EC435-6130-4B75-BAC1-12323B067443}"
EndProject
Expand Down Expand Up @@ -90,10 +90,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Nethereum.Parity.Tests", "s
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Nethereum.JsonRpc.RpcClient", "src\Nethereum.JsonRpc.RpcClient\Nethereum.JsonRpc.RpcClient.csproj", "{3F8D6B13-B483-4873-8946-106D25BF6212}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Nethereum.Generator.Console", "src\Nethereum.Generator.Console\Nethereum.Generator.Console.csproj", "{5A9F1DA7-7931-4869-8E3A-B2F26DBFC303}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Nethereum.Generator.Console", "src\Nethereum.Generator.Console\Nethereum.Generator.Console.csproj", "{5A9F1DA7-7931-4869-8E3A-B2F26DBFC303}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Nethereum.Unity", "src\Nethereum.Unity\Nethereum.Unity.csproj", "{A295CBA8-DBE4-44EF-A673-6372728CEDB0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Nethereum.JsonRpc.UnixIpcClient", "src\Nethereum.JsonRpc.UnixIpcClient\Nethereum.JsonRpc.UnixIpcClient.csproj", "{F3FC9487-0A98-4180-BDED-771A1BA68F2E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -714,6 +716,22 @@ Global
{A295CBA8-DBE4-44EF-A673-6372728CEDB0}.Release|x64.Build.0 = Release|Any CPU
{A295CBA8-DBE4-44EF-A673-6372728CEDB0}.Release|x86.ActiveCfg = Release|Any CPU
{A295CBA8-DBE4-44EF-A673-6372728CEDB0}.Release|x86.Build.0 = Release|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Debug|ARM.ActiveCfg = Debug|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Debug|ARM.Build.0 = Debug|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Debug|x64.ActiveCfg = Debug|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Debug|x64.Build.0 = Debug|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Debug|x86.ActiveCfg = Debug|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Debug|x86.Build.0 = Debug|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Release|Any CPU.Build.0 = Release|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Release|ARM.ActiveCfg = Release|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Release|ARM.Build.0 = Release|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Release|x64.ActiveCfg = Release|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Release|x64.Build.0 = Release|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Release|x86.ActiveCfg = Release|Any CPU
{F3FC9487-0A98-4180-BDED-771A1BA68F2E}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -760,5 +778,6 @@ Global
{3F8D6B13-B483-4873-8946-106D25BF6212} = {962EC435-6130-4B75-BAC1-12323B067443}
{5A9F1DA7-7931-4869-8E3A-B2F26DBFC303} = {CDD3B024-0BCD-4985-9B5B-0A43D854EACC}
{A295CBA8-DBE4-44EF-A673-6372728CEDB0} = {962EC435-6130-4B75-BAC1-12323B067443}
{F3FC9487-0A98-4180-BDED-771A1BA68F2E} = {962EC435-6130-4B75-BAC1-12323B067443}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="EdjCase.JsonRpc.Client" Version="1.3.1" />
<PackageReference Include="EdjCase.JsonRpc.Core" Version="1.0.4" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Nethereum.JsonRpc.Client\Nethereum.JsonRpc.Client.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;

namespace Nethereum.JsonRpc.UnixIpcClient
{
internal class RpcConnectionNotAvailableException : Exception
{
public RpcConnectionNotAvailableException()
{
}

public RpcConnectionNotAvailableException(string message) : base(message)
{
}

public RpcConnectionNotAvailableException(string message, Exception innerException) : base(message, innerException)
{
}
}
}
213 changes: 213 additions & 0 deletions src/Nethereum.JsonRpc.UnixIpcClient/UnixDomainSocketClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
using Nethereum.JsonRpc.Client;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using EdjCase.JsonRpc.Client;
using EdjCase.JsonRpc.Core;
using RpcError = Nethereum.JsonRpc.Client.RpcError;
using RpcRequest = Nethereum.JsonRpc.Client.RpcRequest;

namespace Nethereum.JsonRpc.UnixIpcClient
{
public class UnixDomainSocketClient : IClient, IDisposable
{
private readonly string _ipcPath;
private readonly UnixEndPoint _unixEndPoint;

private readonly JsonSerializerSettings _jsonSerializationSettings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore
};

public UnixDomainSocketClient(string ipcPath)
{
_ipcPath = ipcPath;

_unixEndPoint = new UnixEndPoint(_ipcPath);
}

public RequestInterceptor OverridingRequestInterceptor { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }

public async Task<T> SendRequestAsync<T>(RpcRequest request, string route = null)
{
var response = await SendAsync<EdjCase.JsonRpc.Core.RpcRequest, RpcResponse>(new EdjCase.JsonRpc.Core.RpcRequest(request.Id, request.Method, request.RawParameters)).ConfigureAwait(false);
HandleRpcError(response);
if (response == null || response.Result == null || string.IsNullOrEmpty(response.Result.ToString()))
return default(T);
return response.GetResult<T>();
}

public async Task<T> SendRequestAsync<T>(string method, string route = null, params object[] paramList)
{
var response = await SendAsync<EdjCase.JsonRpc.Core.RpcRequest, RpcResponse>(new EdjCase.JsonRpc.Core.RpcRequest(Configuration.DefaultRequestId, method, paramList)).ConfigureAwait(false);
HandleRpcError(response);
if (response == null || response.Result == null || string.IsNullOrEmpty(response.Result.ToString()))
return default(T);
return response.GetResult<T>();
}

public async Task SendRequestAsync(RpcRequest request, string route = null)
{
var response = await SendAsync<EdjCase.JsonRpc.Core.RpcRequest, RpcResponse>(new EdjCase.JsonRpc.Core.RpcRequest(request.Id, request.Method, request.RawParameters)).ConfigureAwait(false);
HandleRpcError(response);
}

public async Task SendRequestAsync(string method, string route = null, params object[] paramList)
{
var response = await SendAsync<EdjCase.JsonRpc.Core.RpcRequest, RpcResponse>(new EdjCase.JsonRpc.Core.RpcRequest(Configuration.DefaultRequestId, method, paramList)).ConfigureAwait(false);
HandleRpcError(response);
}

private async Task<byte[]> ReadResponseStream(Socket pipeClientStream)
{
var buffer = new byte[1024];
var bufferSegment = new ArraySegment<byte>(buffer, 0, buffer.Length);

using (var ms = new MemoryStream())
{
while (true)
{
//if the total number of bytes matches 1024 for the last Read, it will wait for the next read forever as we don't have a flag for completeness
//a wait is in place for this with a 10 second (maybe too long..)
//if timesout (false returned) we have to close the pipestream and return the memory stream
var read = 0;
if (Task.Run(
async () =>
{
read = await pipeClientStream.ReceiveAsync(bufferSegment, SocketFlags.None).ConfigureAwait(false);
}
).Wait(10000))
{
ms.Write(bufferSegment.Array, 0, read);
if (read < 1024)
return ms.ToArray();
}
else
{
return ms.ToArray();
}
}
}
}

private void HandleRpcError(RpcResponse response)
{
if (response != null && response.HasError)
{
throw new RpcResponseException(new RpcError(response.Error.Code, response.Error.Message, response.Error.Data));
}
}

private async Task<TResponse> SendAsync<TRequest, TResponse>(TRequest request)
{
using (var socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.IP))
{
if (!socket.Connected)
{
try
{
await socket.ConnectAsync(_unixEndPoint).ConfigureAwait(false);
}
catch (System.Exception connEx)
{
throw new RpcConnectionNotAvailableException("IPC connection not available !!!", connEx);
}
}

try
{
var rpcRequestJson = JsonConvert.SerializeObject(request, _jsonSerializationSettings);

var requestBytes = Encoding.UTF8.GetBytes(rpcRequestJson);
var requestSegment = new ArraySegment<byte>(requestBytes, 0, requestBytes.Length);

await socket.SendAsync(requestSegment, SocketFlags.None).ConfigureAwait(false);

var responseBytes = await ReadResponseStream(socket).ConfigureAwait(false);

if (responseBytes == null)
{
throw new RpcClientUnknownException("Invalid response / null");
}

var totalMegs = responseBytes.Length / 1024f / 1024f;

if (totalMegs > 10)
{
throw new RpcClientUnknownException("Response exceeds 10MB it will be impossible to parse it");
}

var responseJson = Encoding.UTF8.GetString(responseBytes);

try
{
var result = JsonConvert.DeserializeObject<TResponse>(responseJson, _jsonSerializationSettings);
if (result == null)
{
Console.WriteLine($"{DateTime.UtcNow} Null response object. RAW request:{rpcRequestJson}{Environment.NewLine}RAW response:{responseJson}");
}
return result;
}
catch (JsonSerializationException)
{
var rpcResponse = JsonConvert.DeserializeObject<RpcResponse>(responseJson, _jsonSerializationSettings);
if (rpcResponse == null)
{
throw new RpcClientUnknownException($"Unable to parse response from the ipc server. Response Json: {responseJson}");
}

throw rpcResponse.Error.CreateException();
}
catch (NullReferenceException)
{
throw new RpcClientUnknownException($"Unable to parse response from the ipc server. Response Json: {responseJson}");
//throw;
}
}
finally
{
socket.Shutdown(SocketShutdown.Both);
}
}
}

#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls

protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// TODO: dispose managed state (managed objects).
}

// TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
// TODO: set large fields to null.

disposedValue = true;
}
}

// TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
// ~UnixDomainSocketClient() {
// // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
// Dispose(false);
// }

// This code added to correctly implement the disposable pattern.
void IDisposable.Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// TODO: uncomment the following line if the finalizer is overridden above.
// GC.SuppressFinalize(this);
}
#endregion
}
}

0 comments on commit bc2b619

Please sign in to comment.