diff --git a/PSSwagger.LiveTestFramework/src/PSSwagger.LTF.IO.Lib/JsonRpcPipe.cs b/PSSwagger.LiveTestFramework/src/PSSwagger.LTF.IO.Lib/JsonRpcPipe.cs index 6b6b3f0..73f8cfb 100644 --- a/PSSwagger.LiveTestFramework/src/PSSwagger.LTF.IO.Lib/JsonRpcPipe.cs +++ b/PSSwagger.LiveTestFramework/src/PSSwagger.LTF.IO.Lib/JsonRpcPipe.cs @@ -20,6 +20,7 @@ public class JsonRpcPipe : IInputPipe, IOutputPipe /// Constant to indicate that headers are done. /// private const string HeaderEndConstant = "end"; + private const string InputStreamClosedConstant = "closed"; private IInputPipe characterReader; private IOutputPipe characterWriter; private ConcurrentQueue blockQueue; @@ -48,7 +49,7 @@ public JsonRpcPipe(IInputPipe characterReader, IOutputPipe characterWriter) /// Read a single valid JSON-RPC block. /// /// Type of block to read and deserialize. - /// Block of type . + /// Block of type . Returns null if input stream has been closed. public async Task ReadBlock() where T : class { object returnObj = null; @@ -69,7 +70,14 @@ public async Task ReadBlock() where T : class do { headers[header.Item1.ToLowerInvariant()] = header.Item2; - } while ((header = await ReadHeader()) != null && !header.Item1.Equals(HeaderEndConstant)); + } while ((header = await ReadHeader()) != null && !header.Item1.Equals(HeaderEndConstant) && !header.Item1.Equals(InputStreamClosedConstant)); + + if (header.Item1.Equals(InputStreamClosedConstant)) + { + // Input stream closed - return null to indicate this + return null; + } + byte[] bytes = new byte[Int32.Parse(headers["content-length"])]; for (int i = 0; i < bytes.Length; i++) { @@ -189,6 +197,9 @@ private async Task> ReadHeader() break; case '\r': break; + case '\uffff': + // When this character is read, assume the input stream has been closed + return new Tuple(InputStreamClosedConstant, String.Empty); default: if (spaceRead) { diff --git a/PSSwagger.LiveTestFramework/src/PSSwagger.LTF.Lib/LiveTestServer.cs b/PSSwagger.LiveTestFramework/src/PSSwagger.LTF.Lib/LiveTestServer.cs index d53ffb3..2803597 100644 --- a/PSSwagger.LiveTestFramework/src/PSSwagger.LTF.Lib/LiveTestServer.cs +++ b/PSSwagger.LiveTestFramework/src/PSSwagger.LTF.Lib/LiveTestServer.cs @@ -163,58 +163,70 @@ public async Task RunAsync() LiveTestRequest msg = await this.Input.ReadBlock(); if (this.IsRunning) { - if (this.parameters.Logger != null) + if (msg == null) { - this.parameters.Logger.LogAsync("Processing message: {0}", msg); + if (this.parameters.Logger != null) + { + this.parameters.Logger.LogAsync("Input stream has been closed, stopping server.", msg); + } + + this.IsRunning = false; } - Task.Run(() => + else { - LiveTestResponse response = null; - IServiceTracer serviceTracer = null; - try + if (this.parameters.Logger != null) + { + this.parameters.Logger.LogAsync("Processing message: {0}", msg); + } + Task.Run(() => { + LiveTestResponse response = null; + IServiceTracer serviceTracer = null; + try + { // Enable service tracing so that we can get service layer information required by test protocol long invocationId = this.parameters.TracingManager.GetNextInvocationId(); - serviceTracer = this.parameters.TracingManager.CreateTracer(invocationId, this.parameters.Logger); - this.parameters.TracingManager.EnableTracing(); + serviceTracer = this.parameters.TracingManager.CreateTracer(invocationId, this.parameters.Logger); + this.parameters.TracingManager.EnableTracing(); // Process teh request CommandExecutionResult commandResult = this.currentModule.ProcessRequest(msg, this.parameters.CredentialFactory); - if (commandResult == null) + if (commandResult == null) + { + if (this.parameters.Logger != null) + { + this.parameters.Logger.LogAsync("Command not found."); + } + + response = msg.MakeResponse(null, MethodNotFound); + } + else + { + response = msg.MakeResponse(commandResult, serviceTracer, parameters.ObjectTransforms, this.parameters.Logger); + } + } + catch (Exception exRequest) { if (this.parameters.Logger != null) { - this.parameters.Logger.LogAsync("Command not found."); + this.parameters.Logger.LogError("Exception processing request: " + exRequest.ToString()); } - response = msg.MakeResponse(null, MethodNotFound); + response = msg.MakeResponse(exRequest, InternalError); } - else + finally { - response = msg.MakeResponse(commandResult, serviceTracer, parameters.ObjectTransforms, this.parameters.Logger); - } - } - catch (Exception exRequest) - { - if (this.parameters.Logger != null) - { - this.parameters.Logger.LogError("Exception processing request: " + exRequest.ToString()); - } - - response = msg.MakeResponse(exRequest, InternalError); - } - finally - { - if (response != null) - { - this.Output.WriteBlock(response); - } + if (response != null) + { + this.Output.WriteBlock(response); + } - if (serviceTracer != null) - { - this.parameters.TracingManager.RemoveTracer(serviceTracer); + if (serviceTracer != null) + { + this.parameters.TracingManager.RemoveTracer(serviceTracer); + } } - } - }); + }); + } } } catch (Exception eRead)