Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class JsonRpcPipe : IInputPipe, IOutputPipe
/// Constant to indicate that headers are done.
/// </summary>
private const string HeaderEndConstant = "end";
private const string InputStreamClosedConstant = "closed";
private IInputPipe characterReader;
private IOutputPipe characterWriter;
private ConcurrentQueue<object> blockQueue;
Expand Down Expand Up @@ -48,7 +49,7 @@ public JsonRpcPipe(IInputPipe characterReader, IOutputPipe characterWriter)
/// Read a single valid JSON-RPC block.
/// </summary>
/// <typeparam name="T">Type of block to read and deserialize.</typeparam>
/// <returns>Block of type <typeparamref name="T"/>.</returns>
/// <returns>Block of type <typeparamref name="T"/>. Returns null if input stream has been closed.</returns>
public async Task<T> ReadBlock<T>() where T : class
{
object returnObj = null;
Expand All @@ -69,7 +70,14 @@ public async Task<T> ReadBlock<T>() where T : class
do
{
headers[header.Item1.ToLowerInvariant()] = header.Item2;
} while ((header = await ReadHeader()) != null && !header.Item1.Equals(HeaderEndConstant));
} while ((header = await ReadHeader()) != null && !header.Item1.Equals(HeaderEndConstant) && !header.Item1.Equals(InputStreamClosedConstant));

if (header.Item1.Equals(InputStreamClosedConstant))
{
// Input stream closed - return null to indicate this
return null;
}

byte[] bytes = new byte[Int32.Parse(headers["content-length"])];
for (int i = 0; i < bytes.Length; i++)
{
Expand Down Expand Up @@ -189,6 +197,9 @@ private async Task<Tuple<string, string>> ReadHeader()
break;
case '\r':
break;
case '\uffff':
// When this character is read, assume the input stream has been closed
return new Tuple<string, string>(InputStreamClosedConstant, String.Empty);
default:
if (spaceRead)
{
Expand Down
82 changes: 47 additions & 35 deletions PSSwagger.LiveTestFramework/src/PSSwagger.LTF.Lib/LiveTestServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,58 +163,70 @@ public async Task RunAsync()
LiveTestRequest msg = await this.Input.ReadBlock<LiveTestRequest>();
if (this.IsRunning)
{
if (this.parameters.Logger != null)
if (msg == null)
{
this.parameters.Logger.LogAsync("Processing message: {0}", msg);
if (this.parameters.Logger != null)
{
this.parameters.Logger.LogAsync("Input stream has been closed, stopping server.", msg);
}

this.IsRunning = false;
}
Task.Run(() =>
else
{
LiveTestResponse response = null;
IServiceTracer serviceTracer = null;
try
if (this.parameters.Logger != null)
{
this.parameters.Logger.LogAsync("Processing message: {0}", msg);
}
Task.Run(() =>
{
LiveTestResponse response = null;
IServiceTracer serviceTracer = null;
try
{
// Enable service tracing so that we can get service layer information required by test protocol
long invocationId = this.parameters.TracingManager.GetNextInvocationId();
serviceTracer = this.parameters.TracingManager.CreateTracer(invocationId, this.parameters.Logger);
this.parameters.TracingManager.EnableTracing();
serviceTracer = this.parameters.TracingManager.CreateTracer(invocationId, this.parameters.Logger);
this.parameters.TracingManager.EnableTracing();
// Process teh request
CommandExecutionResult commandResult = this.currentModule.ProcessRequest(msg, this.parameters.CredentialFactory);
if (commandResult == null)
if (commandResult == null)
{
if (this.parameters.Logger != null)
{
this.parameters.Logger.LogAsync("Command not found.");
}

response = msg.MakeResponse(null, MethodNotFound);
}
else
{
response = msg.MakeResponse(commandResult, serviceTracer, parameters.ObjectTransforms, this.parameters.Logger);
}
}
catch (Exception exRequest)
{
if (this.parameters.Logger != null)
{
this.parameters.Logger.LogAsync("Command not found.");
this.parameters.Logger.LogError("Exception processing request: " + exRequest.ToString());
}

response = msg.MakeResponse(null, MethodNotFound);
response = msg.MakeResponse(exRequest, InternalError);
}
else
finally
{
response = msg.MakeResponse(commandResult, serviceTracer, parameters.ObjectTransforms, this.parameters.Logger);
}
}
catch (Exception exRequest)
{
if (this.parameters.Logger != null)
{
this.parameters.Logger.LogError("Exception processing request: " + exRequest.ToString());
}

response = msg.MakeResponse(exRequest, InternalError);
}
finally
{
if (response != null)
{
this.Output.WriteBlock(response);
}
if (response != null)
{
this.Output.WriteBlock(response);
}

if (serviceTracer != null)
{
this.parameters.TracingManager.RemoveTracer(serviceTracer);
if (serviceTracer != null)
{
this.parameters.TracingManager.RemoveTracer(serviceTracer);
}
}
}
});
});
}
}
}
catch (Exception eRead)
Expand Down