diff --git a/pom.xml b/pom.xml index f6afa21..bf92402 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.microsoft.azure qpid-proton-j-extensions - 1.0.0 + 1.1.0 https://github.com/Azure/qpid-proton-j-extensions diff --git a/src/main/java/com/microsoft/azure/proton/transport/proxy/Proxy.java b/src/main/java/com/microsoft/azure/proton/transport/proxy/Proxy.java new file mode 100644 index 0000000..a543715 --- /dev/null +++ b/src/main/java/com/microsoft/azure/proton/transport/proxy/Proxy.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ + +package com.microsoft.azure.proton.transport.proxy; + +import java.util.Map; + +import org.apache.qpid.proton.engine.Transport; + +public interface Proxy { + enum ProxyState { + PN_PROXY_NOT_STARTED, + PN_PROXY_CONNECTING, + PN_PROXY_CONNECTED, + PN_PROXY_FAILED + } + + void configure( + String host, + Map headers, + ProxyHandler proxyHandler, + Transport underlyingTransport); +} diff --git a/src/main/java/com/microsoft/azure/proton/transport/proxy/ProxyHandler.java b/src/main/java/com/microsoft/azure/proton/transport/proxy/ProxyHandler.java new file mode 100644 index 0000000..6956702 --- /dev/null +++ b/src/main/java/com/microsoft/azure/proton/transport/proxy/ProxyHandler.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ + +package com.microsoft.azure.proton.transport.proxy; + +import java.nio.ByteBuffer; +import java.util.Map; + +public interface ProxyHandler { + + class ProxyResponseResult { + private Boolean isSuccess; + private String error; + + public ProxyResponseResult(final Boolean isSuccess, final String error) { + this.isSuccess = isSuccess; + this.error = error; + } + + public Boolean getIsSuccess() { + return isSuccess; + } + + public String getError() { + return error; + } + } + + String createProxyRequest(String hostName, Map additionalHeaders); + + ProxyResponseResult validateProxyResponse(ByteBuffer buffer); + +} diff --git a/src/main/java/com/microsoft/azure/proton/transport/proxy/impl/ProxyHandlerImpl.java b/src/main/java/com/microsoft/azure/proton/transport/proxy/impl/ProxyHandlerImpl.java new file mode 100644 index 0000000..0a35d30 --- /dev/null +++ b/src/main/java/com/microsoft/azure/proton/transport/proxy/impl/ProxyHandlerImpl.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ + +package com.microsoft.azure.proton.transport.proxy.impl; + +import com.microsoft.azure.proton.transport.proxy.ProxyHandler; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Scanner; + +public class ProxyHandlerImpl implements ProxyHandler { + + @Override + public String createProxyRequest(String hostName, Map additionalHeaders) { + final String endOfLine = "\r\n"; + final StringBuilder connectRequestBuilder = new StringBuilder(); + connectRequestBuilder.append( + String.format( + "CONNECT %1$s HTTP/1.1%2$sHost: %1$s%2$sConnection: Keep-Alive%2$s", + hostName, + endOfLine)); + if (additionalHeaders != null) { + for (Map.Entry entry: additionalHeaders.entrySet()) { + connectRequestBuilder.append(entry.getKey()); + connectRequestBuilder.append(": "); + connectRequestBuilder.append(entry.getValue()); + connectRequestBuilder.append(endOfLine); + } + } + connectRequestBuilder.append(endOfLine); + return connectRequestBuilder.toString(); + } + + @Override + public ProxyResponseResult validateProxyResponse(ByteBuffer buffer) { + int size = buffer.remaining(); + String response = null; + + if (size > 0) { + byte[] responseBytes = new byte[buffer.remaining()]; + buffer.get(responseBytes); + response = new String(responseBytes, StandardCharsets.UTF_8); + final Scanner responseScanner = new Scanner(response); + if (responseScanner.hasNextLine()) { + final String firstLine = responseScanner.nextLine(); + if (firstLine.toLowerCase().contains("http/1.1") + && firstLine.contains("200") + && firstLine.toLowerCase().contains("connection established")) { + return new ProxyResponseResult(true, null); + } + } + } + + return new ProxyResponseResult(false, response); + } +} diff --git a/src/main/java/com/microsoft/azure/proton/transport/proxy/impl/ProxyImpl.java b/src/main/java/com/microsoft/azure/proton/transport/proxy/impl/ProxyImpl.java new file mode 100644 index 0000000..86b393a --- /dev/null +++ b/src/main/java/com/microsoft/azure/proton/transport/proxy/impl/ProxyImpl.java @@ -0,0 +1,278 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ + +package com.microsoft.azure.proton.transport.proxy.impl; + +import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newWriteableBuffer; + +import com.microsoft.azure.proton.transport.proxy.Proxy; +import com.microsoft.azure.proton.transport.proxy.ProxyHandler; + +import java.nio.ByteBuffer; +import java.util.Map; + +import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.TransportException; +import org.apache.qpid.proton.engine.impl.TransportImpl; +import org.apache.qpid.proton.engine.impl.TransportInput; +import org.apache.qpid.proton.engine.impl.TransportLayer; +import org.apache.qpid.proton.engine.impl.TransportOutput; +import org.apache.qpid.proton.engine.impl.TransportWrapper; + +public class ProxyImpl implements Proxy, TransportLayer { + private final int proxyHandshakeBufferSize = 4 * 1024; // buffers used only for proxy-handshake + private final ByteBuffer inputBuffer; + private final ByteBuffer outputBuffer; + + private boolean tailClosed = false; + private boolean headClosed = false; + private boolean isProxyConfigured; + private String host = ""; + private Map headers = null; + private TransportImpl underlyingTransport; + private ProxyState proxyState = ProxyState.PN_PROXY_NOT_STARTED; + + private ProxyHandler proxyHandler; + + /** + * Create proxy transport layer - which, after configuring using + * the {@link #configure(String, Map, ProxyHandler, Transport)} API + * is ready for layering in qpid-proton-j transport layers, using + * {@link org.apache.qpid.proton.engine.impl.TransportInternal#addTransportLayer(TransportLayer)} API. + */ + public ProxyImpl() { + inputBuffer = newWriteableBuffer(proxyHandshakeBufferSize); + outputBuffer = newWriteableBuffer(proxyHandshakeBufferSize); + isProxyConfigured = false; + } + + @Override + public TransportWrapper wrap(TransportInput input, TransportOutput output) { + return new ProxyTransportWrapper(input, output); + } + + @Override + public void configure( + String host, + Map headers, + ProxyHandler proxyHandler, + Transport underlyingTransport) { + this.host = host; + this.headers = headers; + this.proxyHandler = proxyHandler; + this.underlyingTransport = (TransportImpl) underlyingTransport; + isProxyConfigured = true; + } + + protected ByteBuffer getInputBuffer() { + return this.inputBuffer; + } + + protected ByteBuffer getOutputBuffer() { + return this.outputBuffer; + } + + protected Boolean getIsProxyConfigured() { + return this.isProxyConfigured; + } + + protected ProxyHandler getProxyHandler() { + return this.proxyHandler; + } + + protected Transport getUnderlyingTransport() { + return this.underlyingTransport; + } + + protected void writeProxyRequest() { + outputBuffer.clear(); + String request = proxyHandler.createProxyRequest(host, headers); + outputBuffer.put(request.getBytes()); + } + + protected boolean getIsHandshakeInProgress() { + // if handshake is in progress + // we do not engage the underlying transportInput/transportOutput. + // Only when, ProxyState == Connected - then we can start engaging + // next TransportLayers. + // So, InProgress includes - proxyState = failed as well. + // return true - from the point when proxyImpl.configure() is invoked to + // proxyState transitions to Connected. + // returns false - in all other cases + return isProxyConfigured && proxyState != ProxyState.PN_PROXY_CONNECTED; + } + + protected ProxyState getProxyState() { + return this.proxyState; + } + + public Map getProxyRequestHeaders() { + return this.headers; + } + + private class ProxyTransportWrapper implements TransportWrapper { + private final TransportInput underlyingInput; + private final TransportOutput underlyingOutput; + private final ByteBuffer head; + + ProxyTransportWrapper(TransportInput input, TransportOutput output) { + underlyingInput = input; + underlyingOutput = output; + head = outputBuffer.asReadOnlyBuffer(); + } + + @Override + public int capacity() { + if (getIsHandshakeInProgress()) { + if (tailClosed) { + return Transport.END_OF_STREAM; + } else { + return inputBuffer.remaining(); + } + } else { + return underlyingInput.capacity(); + } + } + + @Override + public int position() { + if (getIsHandshakeInProgress()) { + if (tailClosed) { + return Transport.END_OF_STREAM; + } else { + return inputBuffer.position(); + } + } else { + return underlyingInput.position(); + } + } + + @Override + public ByteBuffer tail() throws TransportException { + if (getIsHandshakeInProgress()) { + return inputBuffer; + } else { + return underlyingInput.tail(); + } + } + + @Override + public void process() throws TransportException { + if (getIsHandshakeInProgress()) { + switch (proxyState) { + case PN_PROXY_CONNECTING: + inputBuffer.flip(); + final ProxyHandler.ProxyResponseResult responseResult = proxyHandler + .validateProxyResponse(inputBuffer); + inputBuffer.compact(); + + if (responseResult.getIsSuccess()) { + proxyState = ProxyState.PN_PROXY_CONNECTED; + } else { + tailClosed = true; + underlyingTransport.closed( + new TransportException( + "proxy connect request failed with error: " + + responseResult.getError())); + } + break; + default: + underlyingInput.process(); + } + } else { + underlyingInput.process(); + } + } + + @Override + public void close_tail() { + tailClosed = true; + if (getIsHandshakeInProgress()) { + headClosed = true; + } + + underlyingInput.close_tail(); + } + + @Override + public int pending() { + if (getIsHandshakeInProgress()) { + switch (proxyState) { + case PN_PROXY_NOT_STARTED: + if (outputBuffer.position() == 0) { + proxyState = ProxyState.PN_PROXY_CONNECTING; + writeProxyRequest(); + + head.limit(outputBuffer.position()); + if (headClosed) { + proxyState = ProxyState.PN_PROXY_FAILED; + return Transport.END_OF_STREAM; + } else { + return outputBuffer.position(); + } + } else { + return outputBuffer.position(); + } + + case PN_PROXY_CONNECTING: + if (headClosed && (outputBuffer.position() == 0)) { + proxyState = ProxyState.PN_PROXY_FAILED; + return Transport.END_OF_STREAM; + } else { + return outputBuffer.position(); + } + + default: + return Transport.END_OF_STREAM; + } + } else { + return underlyingOutput.pending(); + } + } + + @Override + public ByteBuffer head() { + if (getIsHandshakeInProgress()) { + switch (proxyState) { + case PN_PROXY_CONNECTING: + return head; + default: + return underlyingOutput.head(); + } + } else { + return underlyingOutput.head(); + } + } + + @Override + public void pop(int bytes) { + if (getIsHandshakeInProgress()) { + switch (proxyState) { + case PN_PROXY_CONNECTING: + if (outputBuffer.position() != 0) { + outputBuffer.flip(); + outputBuffer.position(bytes); + outputBuffer.compact(); + head.position(0); + head.limit(outputBuffer.position()); + } else { + underlyingOutput.pop(bytes); + } + break; + default: + underlyingOutput.pop(bytes); + } + } else { + underlyingOutput.pop(bytes); + } + } + + @Override + public void close_head() { + headClosed = true; + underlyingOutput.close_head(); + } + } +} diff --git a/src/main/java/com/microsoft/azure/proton/transport/ws/impl/WebSocketImpl.java b/src/main/java/com/microsoft/azure/proton/transport/ws/impl/WebSocketImpl.java index 7ac3dd4..c7253d7 100644 --- a/src/main/java/com/microsoft/azure/proton/transport/ws/impl/WebSocketImpl.java +++ b/src/main/java/com/microsoft/azure/proton/transport/ws/impl/WebSocketImpl.java @@ -94,6 +94,7 @@ public void configure( WebSocketHandler webSocketHandler) { this.host = host; this.path = path; + this.query = query; this.port = port; this.protocol = protocol; this.additionalHeaders = additionalHeaders; diff --git a/src/test/java/com/microsoft/azure/proton/transport/proxy/impl/ProxyHandlerImplTest.java b/src/test/java/com/microsoft/azure/proton/transport/proxy/impl/ProxyHandlerImplTest.java new file mode 100644 index 0000000..dacdcca --- /dev/null +++ b/src/test/java/com/microsoft/azure/proton/transport/proxy/impl/ProxyHandlerImplTest.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ + +package com.microsoft.azure.proton.transport.proxy.impl; + +import com.microsoft.azure.proton.transport.proxy.ProxyHandler; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; + +public class ProxyHandlerImplTest { + @Test + public void testCreateProxyRequest() { + final String hostName = "testHostName"; + final HashMap headers = new HashMap<>(); + headers.put("header1", "headervalue1"); + headers.put("header2", "headervalue2"); + + final ProxyHandlerImpl proxyHandler = new ProxyHandlerImpl(); + final String actualProxyRequest = proxyHandler.createProxyRequest(hostName, headers); + + final String expectedProxyRequest = "CONNECT testHostName HTTP/1.1\r\n" + + "Host: testHostName\r\n" + + "Connection: Keep-Alive\r\n" + + "header2: headervalue2\r\n" + + "header1: headervalue1\r\n" + + "\r\n"; + + Assert.assertEquals(expectedProxyRequest, actualProxyRequest); + } + + @Test + public void testValidateProxyResponseOnSuccess() { + final String validResponse = "HTTP/1.1 200 Connection Established\r\n" + + "FiddlerGateway: Direct\r\n" + + "StartTime: 13:08:21.574\r\n" + + "Connection: close\r\n\r\n"; + final ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.put(validResponse.getBytes(StandardCharsets.UTF_8)); + buffer.flip(); + + final ProxyHandlerImpl proxyHandler = new ProxyHandlerImpl(); + ProxyHandler.ProxyResponseResult responseResult = proxyHandler.validateProxyResponse(buffer); + + Assert.assertTrue(responseResult.getIsSuccess()); + Assert.assertNull(responseResult.getError()); + + Assert.assertEquals(0, buffer.remaining()); + } + + @Test + public void testValidateProxyResponseOnFailure() { + final String failResponse = "HTTP/1.1 407 Proxy Auth Required\r\n" + + "Connection: close\r\n" + + "Proxy-Authenticate: Basic realm=\"FiddlerProxy (user: 1, pass: 1)\"\r\n" + + "Content-Type: text/html\r\n" + + "[Fiddler] Proxy Authentication Required.
\r\n\r\n"; + final ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.put(failResponse.getBytes(StandardCharsets.UTF_8)); + buffer.flip(); + + final ProxyHandlerImpl proxyHandler = new ProxyHandlerImpl(); + ProxyHandler.ProxyResponseResult responseResult = proxyHandler.validateProxyResponse(buffer); + + Assert.assertTrue(!responseResult.getIsSuccess()); + Assert.assertEquals(failResponse, responseResult.getError()); + + Assert.assertEquals(0, buffer.remaining()); + } + + @Test + public void testValidateProxyResponseOnInvalidResponse() { + final String invalidResponse = "HTTP/1.1 abc Connection Established\r\n" + + "HTTP/1.1 200 Connection Established\r\n" + + "FiddlerGateway: Direct\r\n" + + "StartTime: 13:08:21.574\r\n" + + "Connection: close\r\n\r\n"; + final ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.put(invalidResponse.getBytes(StandardCharsets.UTF_8)); + buffer.flip(); + + final ProxyHandlerImpl proxyHandler = new ProxyHandlerImpl(); + ProxyHandler.ProxyResponseResult responseResult = proxyHandler.validateProxyResponse(buffer); + + Assert.assertTrue(!responseResult.getIsSuccess()); + Assert.assertEquals(invalidResponse, responseResult.getError()); + + Assert.assertEquals(0, buffer.remaining()); + } + + @Test + public void testValidateProxyResponseOnEmptyResponse() { + final String emptyResponse = "\r\n\r\n"; + final ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.put(emptyResponse.getBytes(StandardCharsets.UTF_8)); + buffer.flip(); + + final ProxyHandlerImpl proxyHandler = new ProxyHandlerImpl(); + ProxyHandler.ProxyResponseResult responseResult = proxyHandler.validateProxyResponse(buffer); + + Assert.assertTrue(!responseResult.getIsSuccess()); + Assert.assertEquals(emptyResponse, responseResult.getError()); + + Assert.assertEquals(0, buffer.remaining()); + } +} diff --git a/src/test/java/com/microsoft/azure/proton/transport/proxy/impl/ProxyImplTest.java b/src/test/java/com/microsoft/azure/proton/transport/proxy/impl/ProxyImplTest.java new file mode 100644 index 0000000..bbf0964 --- /dev/null +++ b/src/test/java/com/microsoft/azure/proton/transport/proxy/impl/ProxyImplTest.java @@ -0,0 +1,588 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ + +package com.microsoft.azure.proton.transport.proxy.impl; + +import com.microsoft.azure.proton.transport.proxy.Proxy; +import com.microsoft.azure.proton.transport.proxy.ProxyHandler; +import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.TransportException; +import org.apache.qpid.proton.engine.impl.TransportImpl; +import org.apache.qpid.proton.engine.impl.TransportInput; +import org.apache.qpid.proton.engine.impl.TransportOutput; +import org.apache.qpid.proton.engine.impl.TransportWrapper; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.*; + +// \org\apache\qpid\proton\reactor\impl\IOHandler.java > connectionReadable and connectionWriteable +// methods are the starting point which invokes all methods of TransportInput and TransportOutput +// classes - to implement transport layering. +// Goal of this class is to test - expected outcomes of proxy transport layer +// when these methods are invoked, and how ProxyState state transitions plays along. +public class ProxyImplTest { + + private String hostName = "test.host.name"; + private int bufferSize = 4 * 1024; + private Map headers = new HashMap<>(); + private int proxyConnectRequestLength = 132; + + private void initHeaders() { + headers.put("header1", "value1"); + headers.put("header2", "value2"); + headers.put("header3", "value3"); + } + + @Test + public void testConstructor() { + ProxyImpl proxyImpl = new ProxyImpl(); + + Assert.assertEquals(bufferSize, proxyImpl.getInputBuffer().capacity()); + Assert.assertEquals(bufferSize, proxyImpl.getOutputBuffer().capacity()); + + Assert.assertFalse(proxyImpl.getIsProxyConfigured()); + } + + @Test + public void testConfigure() { + ProxyImpl proxyImpl = new ProxyImpl(); + ProxyHandlerImpl proxyHandler = mock(ProxyHandlerImpl.class); + TransportImpl transport = mock(TransportImpl.class); + + proxyImpl.configure(hostName, headers, proxyHandler, transport); + + Assert.assertTrue(proxyImpl.getIsProxyConfigured()); + Assert.assertEquals(proxyHandler, proxyImpl.getProxyHandler()); + Assert.assertEquals(transport, proxyImpl.getUnderlyingTransport()); + Assert.assertEquals(headers, proxyImpl.getProxyRequestHeaders()); + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_NOT_STARTED, proxyImpl.getProxyState()); + } + + @Test + public void testWriteProxyRequest() { + initHeaders(); + + ProxyHandlerImpl spyProxyHandler = spy(new ProxyHandlerImpl()); + TransportImpl transport = mock(TransportImpl.class); + + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, spyProxyHandler, transport); + proxyImpl.writeProxyRequest(); + + verify(spyProxyHandler, times(1)).createProxyRequest(hostName, headers); + + ByteBuffer outputBuffer = proxyImpl.getOutputBuffer(); + outputBuffer.flip(); + + Assert.assertEquals(proxyConnectRequestLength, outputBuffer.remaining()); + } + + @Test + public void testProxyHandshakeStatesBeforeConfigure() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + + Assert.assertFalse(proxyImpl.getIsHandshakeInProgress()); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_NOT_STARTED); + Assert.assertFalse(proxyImpl.getIsHandshakeInProgress()); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTING); + Assert.assertFalse(proxyImpl.getIsHandshakeInProgress()); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTED); + Assert.assertFalse(proxyImpl.getIsHandshakeInProgress()); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_FAILED); + Assert.assertFalse(proxyImpl.getIsHandshakeInProgress()); + } + + @Test + public void testProxyHandshakeStatesAfterConfigure() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + ProxyHandlerImpl proxyHandler = mock(ProxyHandlerImpl.class); + TransportImpl transport = mock(TransportImpl.class); + proxyImpl.configure(hostName, headers, proxyHandler, transport); + + Assert.assertTrue(proxyImpl.getIsHandshakeInProgress()); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTING); + Assert.assertTrue(proxyImpl.getIsHandshakeInProgress()); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTED); + Assert.assertFalse(proxyImpl.getIsHandshakeInProgress()); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_FAILED); + Assert.assertTrue(proxyImpl.getIsHandshakeInProgress()); + } + + @Test + public void testPendingWhenProxyStateIsNotStarted() { + initHeaders(); + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, new ProxyHandlerImpl(), mock(TransportImpl.class)); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_NOT_STARTED, proxyImpl.getProxyState()); + + TransportOutput mockOutput = mock(TransportOutput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mockOutput); + int bytesCount = transportWrapper.pending(); + + Assert.assertEquals(proxyConnectRequestLength, transportWrapper.head().remaining()); + + ByteBuffer outputBuffer = proxyImpl.getOutputBuffer(); + outputBuffer.flip(); + + Assert.assertEquals(proxyConnectRequestLength, transportWrapper.head().remaining()); + + Assert.assertEquals(proxyConnectRequestLength, outputBuffer.remaining()); + Assert.assertEquals(proxyConnectRequestLength, bytesCount); + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_CONNECTING, proxyImpl.getProxyState()); + + verify(mockOutput, times(0)).pending(); + } + + @Test + public void testPendingWhenProxyStateIsNotStartedAndOutputBufferIsNotEmpty() { + initHeaders(); + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, new ProxyHandlerImpl(), mock(TransportImpl.class)); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_NOT_STARTED, proxyImpl.getProxyState()); + + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + + String message = "olddata"; + ByteBuffer outputBuffer = proxyImpl.getOutputBuffer(); + outputBuffer.put(message.getBytes()); + + int bytesCount = transportWrapper.pending(); + + outputBuffer.flip(); + Assert.assertEquals(message.length(), outputBuffer.remaining()); + Assert.assertEquals(message.length(), bytesCount); + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_NOT_STARTED, proxyImpl.getProxyState()); + } + + @Test + public void testPendingWhenProxyStateIsConnecting() { + initHeaders(); + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, new ProxyHandlerImpl(), mock(TransportImpl.class)); + + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + transportWrapper.pending(); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_CONNECTING, proxyImpl.getProxyState()); + + for (int i=0; i<10; i++) { + transportWrapper.pending(); + } + + Assert.assertEquals(proxyConnectRequestLength, transportWrapper.head().remaining()); + + ByteBuffer outputBuffer = proxyImpl.getOutputBuffer(); + outputBuffer.flip(); + + Assert.assertEquals(proxyConnectRequestLength, outputBuffer.remaining()); + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_CONNECTING, proxyImpl.getProxyState()); + } + + @Test + public void testPendingWhenProxyStateIsConnected() throws Exception { + initHeaders(); + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, new ProxyHandlerImpl(), mock(TransportImpl.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTED); + + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + transportWrapper.pending(); + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_CONNECTED, proxyImpl.getProxyState()); + } + + @Test + public void testPendingWhenProxyStateIsFailed() throws Exception { + initHeaders(); + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, new ProxyHandlerImpl(), mock(TransportImpl.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_FAILED); + + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + Assert.assertEquals(-1, transportWrapper.pending()); + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_FAILED, proxyImpl.getProxyState()); + } + + @Test + public void testProcessWhenProxyStateNotStarted() { + initHeaders(); + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, new ProxyHandlerImpl(), mock(TransportImpl.class)); + TransportInput mockInput = mock(TransportInput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mockInput, mock(TransportOutput.class)); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_NOT_STARTED, proxyImpl.getProxyState()); + transportWrapper.process(); + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_NOT_STARTED, proxyImpl.getProxyState()); + + verify(mockInput, times(1)).process(); + } + + @Test + public void testProcessWhenProxyStateConnectingTransitionsToConnectedOnValidResponse() { + ProxyImpl proxyImpl = new ProxyImpl(); + ProxyHandler mockHandler = mock(ProxyHandler.class); + proxyImpl.configure(hostName, headers, mockHandler, mock(TransportImpl.class)); + TransportInput mockInput = mock(TransportInput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mockInput, mock(TransportOutput.class)); + ProxyHandler.ProxyResponseResult mockResponse = mock(ProxyHandler.ProxyResponseResult.class); + + when(mockHandler.createProxyRequest((String) any(), (Map) any())).thenReturn("proxy request"); + + when(mockResponse.getIsSuccess()).thenReturn(true); + when(mockResponse.getError()).thenReturn(null); + when(mockHandler.validateProxyResponse((ByteBuffer) any())).thenReturn(mockResponse); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_NOT_STARTED, proxyImpl.getProxyState()); + transportWrapper.pending(); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_CONNECTING, proxyImpl.getProxyState()); + transportWrapper.process(); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_CONNECTED, proxyImpl.getProxyState()); + } + + @Test + public void testProcessProxyStateConnectingFailureLeadsToUnderlyingTransportClosed() { + ProxyImpl proxyImpl = new ProxyImpl(); + ProxyHandler mockHandler = mock(ProxyHandler.class); + TransportImpl mockTransport = mock(TransportImpl.class); + proxyImpl.configure(hostName, headers, mockHandler, mockTransport); + TransportInput mockInput = mock(TransportInput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mockInput, mock(TransportOutput.class)); + ProxyHandler.ProxyResponseResult mockResponse = mock(ProxyHandler.ProxyResponseResult.class); + + when(mockHandler.createProxyRequest((String) any(), (Map) any())).thenReturn("proxy request"); + + when(mockResponse.getIsSuccess()).thenReturn(false); + when(mockResponse.getError()).thenReturn("proxy failure response"); + when(mockHandler.validateProxyResponse((ByteBuffer) any())).thenReturn(mockResponse); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_NOT_STARTED, proxyImpl.getProxyState()); + transportWrapper.pending(); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_CONNECTING, proxyImpl.getProxyState()); + transportWrapper.process(); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_CONNECTING, proxyImpl.getProxyState()); + verify(mockTransport, times(1)).closed((TransportException) any()); + } + + @Test + public void testProcessProxyStateIsConnected() throws Exception { + initHeaders(); + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTED); + + TransportInput mockInput = mock(TransportInput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mockInput, mock(TransportOutput.class)); + transportWrapper.process(); + + verify(mockInput, times(1)).process(); + } + + @Test + public void testProcessProxyStateIsFailed() throws Exception { + initHeaders(); + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_FAILED); + + TransportInput mockInput = mock(TransportInput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mockInput, mock(TransportOutput.class)); + transportWrapper.process(); + + verify(mockInput, times(1)).process(); + } + + @Test + public void testPopProxyStateIsNotStarted() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_NOT_STARTED); + + TransportOutput mockOutput = mock(TransportOutput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mockOutput); + transportWrapper.pop(20); + + verify(mockOutput, times(1)).pop(20); + } + + @Test + public void testPopProxyStateConnecting() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTING); + + ByteBuffer outputBuffer = proxyImpl.getOutputBuffer(); + byte[] outputBufferData = "test pop moves position".getBytes(); + outputBuffer.put(outputBufferData); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + + Assert.assertEquals(outputBufferData.length, outputBuffer.position()); + + transportWrapper.pop(5); + + Assert.assertEquals(outputBufferData.length - 5, outputBuffer.position()); + Assert.assertEquals(bufferSize - outputBufferData.length + 5, outputBuffer.remaining()); + + ByteBuffer head = transportWrapper.head(); + Assert.assertEquals(0, head.position()); + Assert.assertEquals(outputBufferData.length - 5, head.remaining()); + } + + @Test + public void testPopProxyStateIsConnected() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTED); + + TransportOutput mockOutput = mock(TransportOutput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mockOutput); + transportWrapper.pop(20); + + verify(mockOutput, times(1)).pop(20); + } + + @Test + public void testPopProxyStateIsFailed() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_FAILED); + + TransportOutput mockOutput = mock(TransportOutput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mockOutput); + transportWrapper.pop(20); + + verify(mockOutput, times(1)).pop(20); + } + + @Test + public void testTailReturnsCurrentInputBufferExceptProxyStateIsConnected() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportInput mockInput = mock(TransportInput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mockInput, mock(TransportOutput.class)); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_NOT_STARTED); + Assert.assertTrue(proxyImpl.getInputBuffer() == transportWrapper.tail()); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTING); + Assert.assertTrue(proxyImpl.getInputBuffer() == transportWrapper.tail()); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_FAILED); + Assert.assertTrue(proxyImpl.getInputBuffer() == transportWrapper.tail()); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTED); + Assert.assertTrue(proxyImpl.getInputBuffer() != transportWrapper.tail()); + verify(mockInput, times(1)).tail(); + } + + @Test + public void testHeadDelegatesToUnderlyingOutputWhenProxyStateIsConnected() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportOutput mockOutput = mock(TransportOutput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mockOutput); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTED); + transportWrapper.head(); + verify(mockOutput, times(1)).head(); + } + + @Test + public void testPositionWhenProxyStateIsNotStarted() { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_NOT_STARTED, proxyImpl.getProxyState()); + Assert.assertEquals(0, transportWrapper.position()); + } + + @Test + public void testPositionWhenProxyStateIsNotStartedAndTailClosed() { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_NOT_STARTED, proxyImpl.getProxyState()); + transportWrapper.close_tail(); + Assert.assertEquals(Transport.END_OF_STREAM, transportWrapper.capacity()); + } + + @Test + public void testPositionWhenProxyStateIsConnecting() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTING); + Assert.assertEquals(0, transportWrapper.position()); + } + + @Test + public void testPositionWhenProxyStateIsConnectingAndTailClosed() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTING); + transportWrapper.close_tail(); + Assert.assertEquals(Transport.END_OF_STREAM, transportWrapper.position()); + } + + @Test + public void testPositionWhenProxyStateIsConnected() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportInput mockInput = mock(TransportInput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mockInput, mock(TransportOutput.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTED); + transportWrapper.position(); + + verify(mockInput, times(1)).position(); + } + + @Test + public void testPositionWhenProxyStateIsConnectedAndTailClosed() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportInput mockInput = mock(TransportInput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mockInput, mock(TransportOutput.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTED); + transportWrapper.close_tail(); + transportWrapper.position(); + + verify(mockInput, times(1)).position(); + } + + @Test + public void testPositionWhenProxyStateIsFailed() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_FAILED); + Assert.assertEquals(0, transportWrapper.position()); + } + + @Test + public void testPositionWhenProxyStateIsFailedAndTailClosed() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_FAILED); + transportWrapper.close_tail(); + Assert.assertEquals(Transport.END_OF_STREAM, transportWrapper.position()); + } + + @Test + public void testCapacityWhenProxyStateIsNotStarted() { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_NOT_STARTED, proxyImpl.getProxyState()); + Assert.assertEquals(bufferSize, transportWrapper.capacity()); + } + + @Test + public void testCapacityWhenProxyStateIsNotStartedAndTailClosed() { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + + Assert.assertEquals(Proxy.ProxyState.PN_PROXY_NOT_STARTED, proxyImpl.getProxyState()); + transportWrapper.close_tail(); + Assert.assertEquals(Transport.END_OF_STREAM, transportWrapper.capacity()); + } + + @Test + public void testCapacityWhenProxyStateIsConnecting() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTING); + Assert.assertEquals(bufferSize, transportWrapper.capacity()); + } + + @Test + public void testCapacityWhenProxyStateIsConnectingAndTailClosed() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTING); + transportWrapper.close_tail(); + Assert.assertEquals(Transport.END_OF_STREAM, transportWrapper.capacity()); + } + + @Test + public void testCapacityWhenProxyStateIsConnected() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportInput mockInput = mock(TransportInput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mockInput, mock(TransportOutput.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTED); + transportWrapper.capacity(); + + verify(mockInput, times(1)).capacity(); + } + + @Test + public void testCapacityWhenProxyStateIsConnectedAndTailClosed() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportInput mockInput = mock(TransportInput.class); + TransportWrapper transportWrapper = proxyImpl.wrap(mockInput, mock(TransportOutput.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_CONNECTED); + transportWrapper.close_tail(); + transportWrapper.capacity(); + + verify(mockInput, times(1)).capacity(); + } + + @Test + public void testCapacityWhenProxyStateIsFailed() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_FAILED); + Assert.assertEquals(bufferSize, transportWrapper.capacity()); + } + + @Test + public void testCapacityWhenProxyStateIsFailedAndTailClosed() throws Exception { + ProxyImpl proxyImpl = new ProxyImpl(); + proxyImpl.configure(hostName, headers, mock(ProxyHandler.class), mock(TransportImpl.class)); + TransportWrapper transportWrapper = proxyImpl.wrap(mock(TransportInput.class), mock(TransportOutput.class)); + + setProxyState(proxyImpl, Proxy.ProxyState.PN_PROXY_FAILED); + transportWrapper.close_tail(); + Assert.assertEquals(Transport.END_OF_STREAM, transportWrapper.capacity()); + } + + private void setProxyState(ProxyImpl proxyImpl, Proxy.ProxyState proxyState) throws NoSuchFieldException, IllegalAccessException { + Field proxyStateField = ProxyImpl.class.getDeclaredField("proxyState"); + proxyStateField.setAccessible(true); + proxyStateField.set(proxyImpl, proxyState); + Assert.assertEquals(proxyState, proxyImpl.getProxyState()); + } +}