Skip to content

Commit

Permalink
bzlmod: Add an HttpDownloader#downloadAndReadOneUrl method
Browse files Browse the repository at this point in the history
(#13316)

To implement the index registry outlined in the design doc, we need to make HTTP GET requests to certain URLs and read their contents immediately (instead of downloading and writing them to local disk).

Also added a way to distinguish a 404 from other unrecoverable errors, since 404 is semantically different from, say a 401, in terms of the registry list (earlier registries take precedence and delegate to later registries only when the requested module is not found).

PiperOrigin-RevId: 382304312
  • Loading branch information
Wyverald authored and Copybara-Service committed Jun 30, 2021
1 parent 18e635c commit d1120b6
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,17 @@ URLConnection connect(URL originalUrl, Function<URL, ImmutableMap<String, String
|| code == 505) { // Server refuses to support version quoth RFC7231 § 6.6.6
// This is a permanent error so we're not going to retry.
readAllBytesAndClose(connection.getErrorStream());
if (code == 404 || code == 410) {
// For Not Found, we throw a separate unrecoverable exception so that callers can
// distinguish between the resource being not found and the server being unavailable.
throw new FileNotFoundException(describeHttpResponse(connection));
}
throw new UnrecoverableHttpException(describeHttpResponse(connection));
} else {
// However we will retry on some 5xx errors, particularly 500 and 503.
throw new IOException(describeHttpResponse(connection));
}
} catch (UnrecoverableHttpException e) {
} catch (UnrecoverableHttpException | FileNotFoundException e) {
throw e;
} catch (IllegalArgumentException e) {
throw new UnrecoverableHttpException(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.devtools.build.lib.util.JavaSleeper;
import com.google.devtools.build.lib.util.Sleeper;
import com.google.devtools.build.lib.vfs.Path;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
Expand All @@ -46,7 +47,10 @@
*/
public class HttpDownloader implements Downloader {
private static final int MAX_PARALLEL_DOWNLOADS = 8;
private static final Semaphore semaphore = new Semaphore(MAX_PARALLEL_DOWNLOADS, true);
private static final Semaphore SEMAPHORE = new Semaphore(MAX_PARALLEL_DOWNLOADS, true);
private static final Clock CLOCK = new JavaClock();
private static final Sleeper SLEEPER = new JavaSleeper();
private static final Locale LOCALE = Locale.getDefault();

private float timeoutScaling = 1.0f;

Expand All @@ -67,17 +71,7 @@ public void download(
Map<String, String> clientEnv,
Optional<String> type)
throws IOException, InterruptedException {
Clock clock = new JavaClock();
Sleeper sleeper = new JavaSleeper();
Locale locale = Locale.getDefault();
ProxyHelper proxyHelper = new ProxyHelper(clientEnv);
HttpConnector connector =
new HttpConnector(locale, eventHandler, proxyHelper, sleeper, timeoutScaling);
ProgressInputStream.Factory progressInputStreamFactory =
new ProgressInputStream.Factory(locale, clock, eventHandler);
HttpStream.Factory httpStreamFactory = new HttpStream.Factory(progressInputStreamFactory);
HttpConnectorMultiplexer multiplexer =
new HttpConnectorMultiplexer(eventHandler, connector, httpStreamFactory);
HttpConnectorMultiplexer multiplexer = setUpConnectorMultiplexer(eventHandler, clientEnv);

// Iterate over urls and download the file falling back to the next url if previous failed,
// while reporting progress to the CLI.
Expand All @@ -86,7 +80,7 @@ public void download(
List<IOException> ioExceptions = ImmutableList.of();

for (URL url : urls) {
semaphore.acquire();
SEMAPHORE.acquire();

try (HttpStream payload = multiplexer.connect(url, checksum, authHeaders, type);
OutputStream out = destination.getOutputStream()) {
Expand All @@ -111,7 +105,7 @@ public void download(
Event.warn("Download from " + url + " failed: " + e.getClass() + " " + e.getMessage()));
continue;
} finally {
semaphore.release();
SEMAPHORE.release();
eventHandler.post(new FetchEvent(url.toString(), success));
}
}
Expand All @@ -134,4 +128,39 @@ public void download(
throw exception;
}
}

/** Downloads the contents of one URL and reads it into a byte array. */
public byte[] downloadAndReadOneUrl(
URL url, ExtendedEventHandler eventHandler, Map<String, String> clientEnv)
throws IOException, InterruptedException {
HttpConnectorMultiplexer multiplexer = setUpConnectorMultiplexer(eventHandler, clientEnv);

ByteArrayOutputStream out = new ByteArrayOutputStream();
SEMAPHORE.acquire();
try (HttpStream payload = multiplexer.connect(url, Optional.absent())) {
ByteStreams.copy(payload, out);
} catch (SocketTimeoutException e) {
// SocketTimeoutExceptions are InterruptedIOExceptions; however they do not signify
// an external interruption, but simply a failed download due to some server timing
// out. So rethrow them as ordinary IOExceptions.
throw new IOException(e);
} catch (InterruptedIOException e) {
throw new InterruptedException(e.getMessage());
} finally {
SEMAPHORE.release();
// TODO(wyv): Do we need to report any event here?
}
return out.toByteArray();
}

private HttpConnectorMultiplexer setUpConnectorMultiplexer(
ExtendedEventHandler eventHandler, Map<String, String> clientEnv) {
ProxyHelper proxyHelper = new ProxyHelper(clientEnv);
HttpConnector connector =
new HttpConnector(LOCALE, eventHandler, proxyHelper, SLEEPER, timeoutScaling);
ProgressInputStream.Factory progressInputStreamFactory =
new ProgressInputStream.Factory(LOCALE, CLOCK, eventHandler);
HttpStream.Factory httpStreamFactory = new HttpStream.Factory(progressInputStreamFactory);
return new HttpConnectorMultiplexer(eventHandler, connector, httpStreamFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.devtools.build.lib.testutil.ManualClock;
import com.google.devtools.build.lib.testutil.ManualSleeper;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -379,7 +380,7 @@ public Object call() throws Exception {
readHttpRequest(socket.getInputStream());
sendLines(
socket,
"HTTP/1.1 404 Not Here",
"HTTP/1.1 401 Unauthorized",
"Date: Fri, 31 Dec 1999 23:59:59 GMT",
"Connection: close",
"Content-Type: text/plain",
Expand All @@ -391,7 +392,39 @@ public Object call() throws Exception {
}
});
thrown.expect(IOException.class);
thrown.expectMessage("404 Not Here");
thrown.expectMessage("401 Unauthorized");
connector.connect(
new URL(String.format("http://localhost:%d", server.getLocalPort())),
url -> ImmutableMap.<String, String>of());
}
}

@Test
public void permanentErrorNotFound_doesNotRetryAndThrowsFileNotFoundException() throws Exception {
try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName(null))) {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executor.submit(
new Callable<Object>() {
@Override
public Object call() throws Exception {
try (Socket socket = server.accept()) {
readHttpRequest(socket.getInputStream());
sendLines(
socket,
"HTTP/1.1 404 Not Found",
"Date: Fri, 31 Dec 1999 23:59:59 GMT",
"Connection: close",
"Content-Type: text/plain",
"Content-Length: 0",
"",
"");
}
return null;
}
});
thrown.expect(FileNotFoundException.class);
thrown.expectMessage("404 Not Found");
connector.connect(
new URL(String.format("http://localhost:%d", server.getLocalPort())),
url -> ImmutableMap.<String, String>of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.google.devtools.build.lib.bazel.repository.downloader.HttpParser.readHttpRequest;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

Expand All @@ -28,6 +29,7 @@
import com.google.devtools.build.lib.vfs.JavaIoFileSystem;
import com.google.devtools.build.lib.vfs.Path;
import java.io.DataInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
Expand Down Expand Up @@ -333,4 +335,69 @@ private static byte[] readFile(Path path) throws IOException {

return data;
}

@Test
public void downloadAndReadOneUrl_ok() throws IOException, InterruptedException {
try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName(null))) {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executor.submit(
() -> {
try (Socket socket = server.accept()) {
readHttpRequest(socket.getInputStream());
sendLines(
socket,
"HTTP/1.1 200 OK",
"Date: Fri, 31 Dec 1999 23:59:59 GMT",
"Connection: close",
"Content-Type: text/plain",
"Content-Length: 5",
"",
"hello");
}
return null;
});

assertThat(
new String(
httpDownloader.downloadAndReadOneUrl(
new URL(String.format("http://localhost:%d/foo", server.getLocalPort())),
eventHandler,
Collections.emptyMap()),
UTF_8))
.isEqualTo("hello");
}
}

@Test
public void downloadAndReadOneUrl_notFound() throws IOException, InterruptedException {
try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName(null))) {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executor.submit(
() -> {
try (Socket socket = server.accept()) {
readHttpRequest(socket.getInputStream());
sendLines(
socket,
"HTTP/1.1 404 Not Found",
"Date: Fri, 31 Dec 1999 23:59:59 GMT",
"Connection: close",
"Content-Type: text/plain",
"Content-Length: 5",
"",
"");
}
return null;
});

assertThrows(
FileNotFoundException.class,
() ->
httpDownloader.downloadAndReadOneUrl(
new URL(String.format("http://localhost:%d/foo", server.getLocalPort())),
eventHandler,
Collections.emptyMap()));
}
}
}

0 comments on commit d1120b6

Please sign in to comment.