Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport layer for proxy handshake #8

Merged
merged 21 commits into from
Sep 13, 2018
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>com.microsoft.azure</groupId>
<artifactId>qpid-proton-j-extensions</artifactId>
<version>1.0.0</version>
<version>1.1.0</version>

<url>https://github.com/Azure/qpid-proton-j-extensions</url>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/

package com.microsoft.azure.proton.transport.proxy;

import org.apache.qpid.proton.engine.Transport;

import java.util.Map;

public interface Proxy {
enum ProxyState {
PN_PROXY_NOT_STARTED,
PN_PROXY_CONNECTING,
PN_PROXY_CONNECTED,
PN_PROXY_FAILED
}

void configure(
String host,
Map<String, String> headers,
ProxyHandler proxyHandler,
Transport underlyingTransport);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/

package com.microsoft.azure.proton.transport.proxy;

import java.nio.ByteBuffer;
import java.util.Map;

public interface ProxyHandler {

class ProxyResponseResult {
private Boolean isSuccess;
private String error;

public ProxyResponseResult(final Boolean isSuccess, final String error) {
this.isSuccess = isSuccess;
this.error = error;
}

public Boolean getIsSuccess() {
return isSuccess;
}

public String getError() {
return error;
}
}

String createProxyRequest(String hostName, Map<String, String> additionalHeaders);

ProxyResponseResult validateProxyResponse(ByteBuffer buffer);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/

package com.microsoft.azure.proton.transport.proxy.impl;

import com.microsoft.azure.proton.transport.proxy.ProxyHandler;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Scanner;

public class ProxyHandlerImpl implements ProxyHandler {

@Override
public String createProxyRequest(String hostName, Map<String, String> additionalHeaders) {
final String endOfLine = "\r\n";
final StringBuilder connectRequestBuilder = new StringBuilder();
connectRequestBuilder.append(
String.format(
"CONNECT %1$s HTTP/1.1%2$sHost: %1$s%2$sConnection: Keep-Alive%2$s",
hostName,
endOfLine));
if (additionalHeaders != null) {
for (Map.Entry<String, String> entry: additionalHeaders.entrySet()) {
connectRequestBuilder.append(entry.getKey());
connectRequestBuilder.append(": ");
connectRequestBuilder.append(entry.getValue());
connectRequestBuilder.append(endOfLine);
}
}
connectRequestBuilder.append(endOfLine);
return connectRequestBuilder.toString();
}

@Override
public ProxyResponseResult validateProxyResponse(ByteBuffer buffer) {
int size = buffer.remaining();
String response = null;

if (size > 0) {
byte[] responseBytes = new byte[buffer.remaining()];
buffer.get(responseBytes);
response = new String(responseBytes, StandardCharsets.UTF_8);
final Scanner responseScanner = new Scanner(response);
if (responseScanner.hasNextLine()) {
final String firstLine = responseScanner.nextLine();
if (firstLine.toLowerCase().contains("http/1.1")
&& firstLine.contains("200")
&& firstLine.toLowerCase().contains("connection established")) {
return new ProxyResponseResult(true, null);
}
}
}

return new ProxyResponseResult(false, response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/

package com.microsoft.azure.proton.transport.proxy.impl;

import com.microsoft.azure.proton.transport.proxy.Proxy;
import com.microsoft.azure.proton.transport.proxy.ProxyHandler;

import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.engine.impl.TransportInput;
import org.apache.qpid.proton.engine.impl.TransportLayer;
import org.apache.qpid.proton.engine.impl.TransportOutput;
import org.apache.qpid.proton.engine.impl.TransportWrapper;

import java.nio.ByteBuffer;
import java.util.Map;

import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newWriteableBuffer;

public class ProxyImpl implements Proxy, TransportLayer {
private final int proxyHandshakeBufferSize = 4 * 1024; // buffers used only for proxy-handshake
private final ByteBuffer inputBuffer;
private final ByteBuffer outputBuffer;

private boolean tailClosed = false;
private boolean headClosed = false;
private boolean isProxyConfigured;
private String host = "";
private Map<String, String> headers = null;
private TransportImpl underlyingTransport;
private ProxyState proxyState = ProxyState.PN_PROXY_NOT_STARTED;
SreeramGarlapati marked this conversation as resolved.
Show resolved Hide resolved

private ProxyHandler proxyHandler;

public ProxyImpl() {
inputBuffer = newWriteableBuffer(proxyHandshakeBufferSize);
outputBuffer = newWriteableBuffer(proxyHandshakeBufferSize);
isProxyConfigured = false;
}

@Override
public TransportWrapper wrap(TransportInput input, TransportOutput output) {
return new ProxyTransportWrapper(input, output);
}

@Override
public void configure(
String host,
Map<String, String> headers,
ProxyHandler proxyHandler,
Transport underlyingTransport) {
this.host = host;
this.headers = headers;
this.proxyHandler = proxyHandler;
this.underlyingTransport = (TransportImpl) underlyingTransport;
isProxyConfigured = true;
}

protected ByteBuffer getInputBuffer() {
return this.inputBuffer;
}

protected ByteBuffer getOutputBuffer() {
return this.outputBuffer;
}

protected Boolean getIsProxyConfigured() {
return this.isProxyConfigured;
}

protected ProxyHandler getProxyHandler() {
return this.proxyHandler;
}

protected Transport getUnderlyingTransport() {
return this.underlyingTransport;
}

protected void writeProxyRequest() {
outputBuffer.clear();
String request = proxyHandler.createProxyRequest(host, headers);
outputBuffer.put(request.getBytes());
}

protected boolean getIsHandshakeInProgress() {
return isProxyConfigured
&& (proxyState == ProxyState.PN_PROXY_NOT_STARTED || proxyState == ProxyState.PN_PROXY_CONNECTING);
}

protected ProxyState getProxyState() {
return this.proxyState;
}

public Map<String, String> getProxyRequestHeaders() {
return this.headers;
}

private class ProxyTransportWrapper implements TransportWrapper {
private final TransportInput underlyingInput;
private final TransportOutput underlyingOutput;
private final ByteBuffer head;

ProxyTransportWrapper(TransportInput input, TransportOutput output) {
underlyingInput = input;
underlyingOutput = output;
head = outputBuffer.asReadOnlyBuffer();
}

@Override
public int capacity() {
if (getIsHandshakeInProgress()) {
if (tailClosed) {
return Transport.END_OF_STREAM;
} else {
return inputBuffer.remaining();
}
} else {
return underlyingInput.capacity();
}
}

@Override
public int position() {
if (getIsHandshakeInProgress()) {
if (tailClosed) {
return Transport.END_OF_STREAM;
} else {
return inputBuffer.position();
}
} else {
return underlyingInput.position();
}
}

@Override
public ByteBuffer tail() throws TransportException {
if (getIsHandshakeInProgress()) {
return inputBuffer;
} else {
return underlyingInput.tail();
}
}

@Override
public void process() throws TransportException {
if (getIsHandshakeInProgress()) {
inputBuffer.flip();

switch (proxyState) {
case PN_PROXY_CONNECTING:
final ProxyHandler.ProxyResponseResult responseResult = proxyHandler
.validateProxyResponse(inputBuffer);
inputBuffer.compact();

if (responseResult.getIsSuccess()) {
proxyState = ProxyState.PN_PROXY_CONNECTED;
} else {
tailClosed = true;
underlyingTransport.closed(
new TransportException(
"proxy connect request failed with error: " +
responseResult.getError()));
}
break;
default:
underlyingInput.process();
}
} else {
underlyingInput.process();
}
}

@Override
public void close_tail() {
tailClosed = true;
if (getIsHandshakeInProgress()) {
headClosed = true;
}

underlyingInput.close_tail();
}

@Override
public int pending() {
if (getIsHandshakeInProgress()) {
switch (proxyState) {
case PN_PROXY_NOT_STARTED:
if (outputBuffer.position() == 0) {
proxyState = ProxyState.PN_PROXY_CONNECTING;
writeProxyRequest();

head.limit(outputBuffer.position());
if (headClosed) {
proxyState = ProxyState.PN_PROXY_FAILED;
return Transport.END_OF_STREAM;
} else {
return outputBuffer.position();
}
} else {
return outputBuffer.position();
}

case PN_PROXY_CONNECTING:
if (headClosed && (outputBuffer.position() == 0)) {
proxyState = ProxyState.PN_PROXY_FAILED;
return Transport.END_OF_STREAM;
} else {
return outputBuffer.position();
}

default:
return Transport.END_OF_STREAM;
}
} else {
return underlyingOutput.pending();
}
}

@Override
public ByteBuffer head() {
if (getIsHandshakeInProgress()) {
switch (proxyState) {
case PN_PROXY_CONNECTING:
return head;
default:
return underlyingOutput.head();
}
} else {
return underlyingOutput.head();
}
}

@Override
public void pop(int bytes) {
if (getIsHandshakeInProgress()) {
switch (proxyState) {
case PN_PROXY_CONNECTING:
if (outputBuffer.position() != 0) {
outputBuffer.flip();
outputBuffer.position(bytes);
outputBuffer.compact();
head.position(0);
head.limit(outputBuffer.position());
} else {
underlyingOutput.pop(bytes);
}
break;
default:
underlyingOutput.pop(bytes);
}
} else {
underlyingOutput.pop(bytes);
}
}

@Override
public void close_head() {
underlyingOutput.close_head();
}
}
}
Loading