Skip to content

Commit

Permalink
Support for GCS proxies everywhere in the GCS API (#92192)
Browse files Browse the repository at this point in the history
Currently, if the user set proxy setting for the GCS repository, they are only used for interactions with the GCS API, but not the other network interactions that the the module performs (refreshing OAuth tokens, fetching metadata) outside of the SDK.

Fix that by making sure that all network interactions go via proxy if it's configured.

Fixes #91952
  • Loading branch information
arteam committed Feb 14, 2023
1 parent f2f34c9 commit b3ff847
Show file tree
Hide file tree
Showing 10 changed files with 555 additions and 38 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/92192.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 92192
summary: Support for GCS proxies everywhere in the GCS API
area: Snapshot/Restore
type: bug
issues:
- 91952
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.repositories.gcs;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.elasticsearch.core.SuppressForbidden;

import java.io.IOException;

@SuppressForbidden(reason = "Tests that all requests come via a proxy")
class ForwardedViaProxyHandler implements HttpHandler {

private final HttpHandler delegateHandler;

ForwardedViaProxyHandler(HttpHandler delegateHandler) {
this.delegateHandler = delegateHandler;
}

@Override
public void handle(HttpExchange exchange) throws IOException {
assert "test-web-proxy-server".equals(exchange.getRequestHeaders().getFirst("X-Via"));
delegateHandler.handle(exchange);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.repositories.gcs;

import fixture.gcs.FakeOAuth2HttpHandler;
import fixture.gcs.GoogleCloudStorageHttpHandler;
import fixture.gcs.TestUtils;

import com.sun.net.httpserver.HttpServer;

import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;

import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROXY_HOST_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROXY_PORT_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROXY_TYPE_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME;

@SuppressForbidden(reason = "We start an HTTP proxy server to test proxy support for GCS")
public class GcsProxyIntegrationTests extends ESBlobStoreRepositoryIntegTestCase {

private static HttpServer upstreamServer;
private static WebProxyServer proxyServer;

@BeforeClass
public static void startServers() throws Exception {
upstreamServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
upstreamServer.start();
proxyServer = new WebProxyServer();
}

@AfterClass
public static void stopServers() throws IOException {
upstreamServer.stop(0);
proxyServer.close();
}

@Before
public void setUpHttpServer() {
upstreamServer.createContext("/", new ForwardedViaProxyHandler(new GoogleCloudStorageHttpHandler("bucket")));
upstreamServer.createContext("/token", new ForwardedViaProxyHandler(new FakeOAuth2HttpHandler()));
}

@After
public void tearDownHttpServer() {
upstreamServer.removeContext("/");
upstreamServer.removeContext("/token");
}

@Override
protected String repositoryType() {
return GoogleCloudStorageRepository.TYPE;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(GoogleCloudStoragePlugin.class);
}

@Override
protected Settings repositorySettings(String repoName) {
return Settings.builder()
.put(super.repositorySettings(repoName))
.put(BUCKET.getKey(), "bucket")
.put(CLIENT_NAME.getKey(), "test")
.build();
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
var secureSettings = new MockSecureSettings();
secureSettings.setFile(
CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace("test").getKey(),
TestUtils.createServiceAccount(random())
);
String upstreamServerUrl = "http://" + upstreamServer.getAddress().getHostString() + ":" + upstreamServer.getAddress().getPort();
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), upstreamServerUrl)
.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace("test").getKey(), upstreamServerUrl + "/token")
.put(PROXY_HOST_SETTING.getConcreteSettingForNamespace("test").getKey(), proxyServer.getHost())
.put(PROXY_PORT_SETTING.getConcreteSettingForNamespace("test").getKey(), proxyServer.getPort())
.put(PROXY_TYPE_SETTING.getConcreteSettingForNamespace("test").getKey(), "http")
.setSecureSettings(secureSettings)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.repositories.gcs;

import org.elasticsearch.core.Strings;
import org.elasticsearch.core.SuppressForbidden;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.nio.charset.StandardCharsets.ISO_8859_1;

/**
* Emulates a <a href="https://en.wikipedia.org/wiki/Proxy_server#Web_proxy_servers">Web Proxy Server</a>
*/
class WebProxyServer extends MockHttpProxyServer {

private static final Set<String> BLOCKED_HEADERS = Stream.of("Host", "Proxy-Connection", "Proxy-Authenticate")
.collect(Collectors.toCollection(() -> new TreeSet<>(String.CASE_INSENSITIVE_ORDER)));

WebProxyServer() throws IOException {
super(WebProxyServer::handle);
}

@SuppressForbidden(reason = "Proxy makes requests to the upstream HTTP server")
private static void handle(InputStream is, OutputStream os) throws IOException {
// We can't make a com.sun.net.httpserver act as an HTTP proxy, so we have to do work with
// raw sockets and do HTTP parsing ourselves
String requestLine = readLine(is);
String[] parts = requestLine.split(" ");
String requestMethod = parts[0];
String originUrl = parts[1];

var upstreamHttpConnection = (HttpURLConnection) URI.create(originUrl).toURL().openConnection();
upstreamHttpConnection.setRequestMethod(requestMethod);
upstreamHttpConnection.setRequestProperty("X-Via", "test-web-proxy-server");

int requestContentLength = -1;
boolean chunkedRequest = false;
while (true) {
String requestHeader = readLine(is);
if (requestHeader.isEmpty()) {
// End of the headers block
break;
}
String[] headerParts = requestHeader.split(":");
String headerName = headerParts[0].trim();
String headerValue = headerParts[1].trim();
if (headerName.equalsIgnoreCase("Content-Length")) {
requestContentLength = Integer.parseInt(headerValue);
} else if (headerName.equalsIgnoreCase("Transfer-Encoding") && headerValue.equalsIgnoreCase("chunked")) {
chunkedRequest = true;
}
if (BLOCKED_HEADERS.contains(headerName) == false) {
upstreamHttpConnection.setRequestProperty(headerName, headerValue);
}
}
if (requestContentLength > 0) {
upstreamHttpConnection.setDoOutput(true);
try (var uos = upstreamHttpConnection.getOutputStream()) {
uos.write(is.readNBytes(requestContentLength));
}
} else if (chunkedRequest) {
upstreamHttpConnection.setDoOutput(true);
upstreamHttpConnection.setChunkedStreamingMode(0);
try (var uos = upstreamHttpConnection.getOutputStream()) {
while (true) {
int chunkSize = Integer.parseInt(readLine(is), 16);
if (chunkSize == 0) {
// End of the chunked body
break;
}
uos.write(is.readNBytes(chunkSize));
if (is.read() != '\r' || is.read() != '\n') {
throw new IllegalStateException("Not CRLF");
}
}
}
}
upstreamHttpConnection.connect();

String upstreamStatusLine = Strings.format(
"HTTP/1.1 %s %s\r\n",
upstreamHttpConnection.getResponseCode(),
upstreamHttpConnection.getResponseMessage()
);
os.write(upstreamStatusLine.getBytes(ISO_8859_1));
StringBuilder responseHeaders = new StringBuilder();
for (var upstreamHeader : upstreamHttpConnection.getHeaderFields().entrySet()) {
if (upstreamHeader.getKey() == null) {
continue;
}
responseHeaders.append(upstreamHeader.getKey()).append(": ");
for (int i = 0; i < upstreamHeader.getValue().size(); i++) {
responseHeaders.append(upstreamHeader.getValue().get(i));
if (i < upstreamHeader.getValue().size() - 1) {
responseHeaders.append(",");
}
}
responseHeaders.append("\r\n");
}
responseHeaders.append("\r\n");
os.write(responseHeaders.toString().getBytes(ISO_8859_1));
// HttpURLConnection handles chunked and fixed-length responses transparently
try (var uis = upstreamHttpConnection.getInputStream()) {
uis.transferTo(os);
}
}

private static String readLine(InputStream is) throws IOException {
ByteArrayOutputStream os = new ByteArrayOutputStream();
while (true) {
int b = is.read();
if (b == -1) {
break;
}
if (b == '\r') {
if (is.read() != '\n') {
throw new IllegalStateException("Not CRLF");
}
break;
}
os.write(b);
}
return os.toString(ISO_8859_1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.elasticsearch.repositories.gcs;

import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.services.storage.StorageScopes;
import com.google.auth.oauth2.ServiceAccountCredentials;

Expand Down Expand Up @@ -150,9 +151,7 @@ public class GoogleCloudStorageClientSettings {
final TimeValue readTimeout,
final String applicationName,
final URI tokenUri,
final Proxy.Type proxyType,
final String proxyHost,
final Integer proxyPort
final Proxy proxy
) {
this.credential = credential;
this.endpoint = endpoint;
Expand All @@ -161,13 +160,7 @@ public class GoogleCloudStorageClientSettings {
this.readTimeout = readTimeout;
this.applicationName = applicationName;
this.tokenUri = tokenUri;
try {
proxy = proxyType.equals(Proxy.Type.DIRECT)
? null
: new Proxy(proxyType, new InetSocketAddress(InetAddress.getByName(proxyHost), proxyPort));
} catch (UnknownHostException e) {
throw new SettingsException("GCS proxy host is unknown.", e);
}
this.proxy = proxy;
}

public ServiceAccountCredentials getCredential() {
Expand Down Expand Up @@ -217,17 +210,26 @@ public static Map<String, GoogleCloudStorageClientSettings> load(final Settings
}

static GoogleCloudStorageClientSettings getClientSettings(final Settings settings, final String clientName) {
Proxy.Type proxyType = getConfigValue(settings, clientName, PROXY_TYPE_SETTING);
String proxyHost = getConfigValue(settings, clientName, PROXY_HOST_SETTING);
Integer proxyPort = getConfigValue(settings, clientName, PROXY_PORT_SETTING);
Proxy proxy;
try {
proxy = proxyType.equals(Proxy.Type.DIRECT)
? null
: new Proxy(proxyType, new InetSocketAddress(InetAddress.getByName(proxyHost), proxyPort));
} catch (UnknownHostException e) {
throw new SettingsException("GCS proxy host is unknown.", e);
}
return new GoogleCloudStorageClientSettings(
loadCredential(settings, clientName),
loadCredential(settings, clientName, proxy),
getConfigValue(settings, clientName, ENDPOINT_SETTING),
getConfigValue(settings, clientName, PROJECT_ID_SETTING),
getConfigValue(settings, clientName, CONNECT_TIMEOUT_SETTING),
getConfigValue(settings, clientName, READ_TIMEOUT_SETTING),
getConfigValue(settings, clientName, APPLICATION_NAME_SETTING),
getConfigValue(settings, clientName, TOKEN_URI_SETTING),
getConfigValue(settings, clientName, PROXY_TYPE_SETTING),
getConfigValue(settings, clientName, PROXY_HOST_SETTING),
getConfigValue(settings, clientName, PROXY_PORT_SETTING)
proxy
);
}

Expand All @@ -243,7 +245,7 @@ static GoogleCloudStorageClientSettings getClientSettings(final Settings setting
* @return the {@link ServiceAccountCredentials} to use for the given client,
* {@code null} if no service account is defined.
*/
static ServiceAccountCredentials loadCredential(final Settings settings, final String clientName) {
static ServiceAccountCredentials loadCredential(final Settings settings, final String clientName, @Nullable Proxy proxy) {
final var credentialsFileSetting = CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace(clientName);
try {
if (credentialsFileSetting.exists(settings) == false) {
Expand All @@ -254,7 +256,8 @@ static ServiceAccountCredentials loadCredential(final Settings settings, final S
try (InputStream credStream = credentialsFileSetting.get(settings)) {
final Collection<String> scopes = Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL);
return SocketAccess.doPrivilegedIOException(() -> {
final ServiceAccountCredentials credentials = ServiceAccountCredentials.fromStream(credStream);
NetHttpTransport netHttpTransport = new NetHttpTransport.Builder().setProxy(proxy).build();
final ServiceAccountCredentials credentials = ServiceAccountCredentials.fromStream(credStream, () -> netHttpTransport);
if (credentials.createScopedRequired()) {
return (ServiceAccountCredentials) credentials.createScoped(scopes);
}
Expand Down

0 comments on commit b3ff847

Please sign in to comment.