Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -753,7 +754,7 @@ public void run() {
// starting the session, we just put the request back on the queue.
// This does mean, however, that under high contention, we might end
// up starving a session request.
Set<Capabilities> stereotypes =
Map<Capabilities, Long> stereotypes =
getAvailableNodes()
.stream()
.filter(NodeStatus::hasCapacity)
Expand All @@ -763,15 +764,15 @@ public void run() {
.getSlots()
.stream()
.map(Slot::getStereotype)
.collect(Collectors.toSet()))
.collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toSet());
.collect(Collectors.groupingBy(ImmutableCapabilities::new, Collectors.counting()));

if (!stereotypes.isEmpty()) {
Optional<SessionRequest> maybeRequest = sessionQueue.getNextAvailable(stereotypes);
maybeRequest.ifPresent(
List<SessionRequest> matchingRequests = sessionQueue.getNextAvailable(stereotypes);
matchingRequests.forEach(
req -> sessionCreatorExecutor.execute(() -> handleNewSessionRequest(req)));
loop = maybeRequest.isPresent();
loop = !matchingRequests.isEmpty();
} else {
loop = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.openqa.selenium.Capabilities;
import org.openqa.selenium.grid.data.SessionRequest;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.json.Json;
import org.openqa.selenium.json.TypeToken;
import org.openqa.selenium.remote.http.Contents;
import org.openqa.selenium.remote.http.HttpHandler;
Expand All @@ -30,16 +31,18 @@

import java.io.UncheckedIOException;
import java.lang.reflect.Type;
import java.util.Optional;
import java.util.Set;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.singletonMap;
import static org.openqa.selenium.remote.tracing.HttpTracing.newSpanAsChildOf;
import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST;
import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE;

class GetNextMatchingRequest implements HttpHandler {
private static final Type SET_OF_CAPABILITIES = new TypeToken<Set<Capabilities>>() {}.getType();
private static final Type MAP_OF_CAPABILITIES = new TypeToken<Map<String, Long>>() {}.getType();
private static final Json JSON = new Json();

private final Tracer tracer;
private final NewSessionQueue queue;
Expand All @@ -53,11 +56,18 @@ public GetNextMatchingRequest(Tracer tracer, NewSessionQueue queue) {
public HttpResponse execute(HttpRequest req) throws UncheckedIOException {
try (Span span = newSpanAsChildOf(tracer, req, "sessionqueue.getrequest")) {
HTTP_REQUEST.accept(span, req);
Set<Capabilities> stereotypes = Contents.fromJson(req, SET_OF_CAPABILITIES);
Map<String, Long> stereotypesJson = Contents.fromJson(req, MAP_OF_CAPABILITIES);

Optional<SessionRequest> maybeRequest = queue.getNextAvailable(stereotypes);
Map<Capabilities, Long> stereotypes = new HashMap<>();

HttpResponse response = new HttpResponse().setContent(Contents.asJson(singletonMap("value", maybeRequest.orElse(null))));
stereotypesJson.forEach((k,v) -> {
Capabilities caps = JSON.toType(k, Capabilities.class);
stereotypes.put(caps, v);
});

List<SessionRequest> sessionRequestList = queue.getNextAvailable(stereotypes);

HttpResponse response = new HttpResponse().setContent(Contents.asJson(singletonMap("value", sessionRequestList)));

HTTP_RESPONSE.accept(span, response);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private RequestId requestIdFrom(Map<String, String> params) {

public abstract Optional<SessionRequest> remove(RequestId reqId);

public abstract Optional<SessionRequest> getNextAvailable(Set<Capabilities> stereotypes);
public abstract List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes);

public abstract void complete(RequestId reqId, Either<SessionNotCreatedException, CreateSessionResponse> result);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableSet;

import org.openqa.selenium.Capabilities;
import org.openqa.selenium.ImmutableCapabilities;
import org.openqa.selenium.SessionNotCreatedException;
import org.openqa.selenium.concurrent.GuardedRunnable;
import org.openqa.selenium.grid.config.Config;
Expand Down Expand Up @@ -295,23 +296,33 @@ public Optional<SessionRequest> remove(RequestId reqId) {
}

@Override
public Optional<SessionRequest> getNextAvailable(Set<Capabilities> stereotypes) {
public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes) {
Require.nonNull("Stereotypes", stereotypes);

Predicate<Capabilities> matchesStereotype =
caps -> stereotypes.stream().anyMatch(stereotype -> slotMatcher.matches(stereotype, caps));
caps -> stereotypes.entrySet()
.stream()
.filter(entry -> entry.getValue() > 0)
.anyMatch(entry -> {
boolean matches = slotMatcher.matches(entry.getKey(), caps);
if (matches) {
Long value = entry.getValue();
entry.setValue(value - 1);
}
return matches;
});

Lock writeLock = lock.writeLock();
writeLock.lock();
try {
Optional<SessionRequest> maybeRequest =
queue.stream()
.filter(req -> req.getDesiredCapabilities().stream().anyMatch(matchesStereotype))
.findFirst();
List<SessionRequest> availableRequests = queue.stream()
.filter(req -> req.getDesiredCapabilities().stream().anyMatch(matchesStereotype))
.limit(10) // TODO: Batch size should be configurable via a flag
.collect(Collectors.toList());

maybeRequest.ifPresent(req -> this.remove(req.getRequestId()));
availableRequests.forEach(req -> this.remove(req.getRequestId()));

return maybeRequest;
return availableRequests;
} finally {
writeLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.openqa.selenium.remote.http.HttpMethod.DELETE;
import static org.openqa.selenium.remote.http.HttpMethod.GET;
Expand All @@ -59,6 +60,7 @@
public class RemoteNewSessionQueue extends NewSessionQueue {

private static final Type QUEUE_CONTENTS_TYPE = new TypeToken<List<SessionRequestCapability>>() {}.getType();
private static final Type SESSION_REQUEST_TYPE = new TypeToken<List<SessionRequest>>() {}.getType();
private static final Json JSON = new Json();
private final HttpClient client;
private final Filter addSecret;
Expand Down Expand Up @@ -128,17 +130,19 @@ public Optional<SessionRequest> remove(RequestId reqId) {
}

@Override
public Optional<SessionRequest> getNextAvailable(Set<Capabilities> stereotypes) {
public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes) {
Require.nonNull("Stereotypes", stereotypes);

Map<String, Long> stereotypeJson = new HashMap<>();
stereotypes.forEach((k,v) -> stereotypeJson.put(JSON.toJson(k), v));

HttpRequest upstream = new HttpRequest(POST, "/se/grid/newsessionqueue/session/next")
.setContent(Contents.asJson(stereotypes));
.setContent(Contents.asJson(stereotypeJson));

HttpTracing.inject(tracer, tracer.getCurrentContext(), upstream);
HttpResponse response = client.with(addSecret).execute(upstream);

SessionRequest value = Values.get(response, SessionRequest.class);

return Optional.ofNullable(value);
return Values.get(response, SESSION_REQUEST_TYPE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -520,10 +521,57 @@ void shouldBeAbleToReturnTheNextAvailableEntryThatMatchesAStereotype(Supplier<Te
Map.of(),
Map.of()));

Optional<SessionRequest> returned = queue.getNextAvailable(
Set.of(new ImmutableCapabilities("browserName", "cheese")));
Map<Capabilities, Long> stereotypes = new HashMap<>();
stereotypes.put(new ImmutableCapabilities("browserName", "cheese"), 1L);

assertThat(returned).isEqualTo(Optional.of(expected));
List<SessionRequest> returned = queue.getNextAvailable(stereotypes);

assertThat(returned.get(0)).isEqualTo(expected);
}

@ParameterizedTest
@MethodSource("data")
void shouldBeAbleToReturnTheNextAvailableBatchThatMatchesStereotypes(Supplier<TestData> supplier) {
setup(supplier);

SessionRequest firstSessionRequest = new SessionRequest(
new RequestId(UUID.randomUUID()),
Instant.now(),
Set.of(W3C),
Set.of(new ImmutableCapabilities("browserName", "cheese", "se:kind", "smoked")),
Map.of(),
Map.of());

SessionRequest secondSessionRequest = new SessionRequest(
new RequestId(UUID.randomUUID()),
Instant.now(),
Set.of(W3C),
Set.of(new ImmutableCapabilities("browserName", "peas", "se:kind", "smoked")),
Map.of(),
Map.of());

SessionRequest thirdSessionRequest = new SessionRequest(
new RequestId(UUID.randomUUID()),
Instant.now(),
Set.of(W3C),
Set.of(new ImmutableCapabilities("browserName", "peas", "se:kind", "smoked")),
Map.of(),
Map.of());

localQueue.injectIntoQueue(firstSessionRequest);
localQueue.injectIntoQueue(secondSessionRequest);
localQueue.injectIntoQueue(thirdSessionRequest);

Map<Capabilities, Long> stereotypes = new HashMap<>();
stereotypes.put(new ImmutableCapabilities("browserName", "cheese"), 2L);
stereotypes.put(new ImmutableCapabilities("browserName", "peas"), 2L);

List<SessionRequest> returned = queue.getNextAvailable(stereotypes);

assertThat(returned.size()).isEqualTo(3);
assertTrue(returned.contains(firstSessionRequest));
assertTrue(returned.contains(secondSessionRequest));
assertTrue(returned.contains(thirdSessionRequest));
}

@ParameterizedTest
Expand Down Expand Up @@ -551,10 +599,12 @@ void shouldNotReturnANextAvailableEntryThatDoesNotMatchTheStereotypes(Supplier<T
Map.of());
localQueue.injectIntoQueue(expected);

Optional<SessionRequest> returned = queue.getNextAvailable(
Set.of(new ImmutableCapabilities("browserName", "cheese")));
Map<Capabilities, Long> stereotypes = new HashMap<>();
stereotypes.put(new ImmutableCapabilities("browserName", "cheese"), 1L);

List<SessionRequest> returned = queue.getNextAvailable(stereotypes);

assertThat(returned).isEqualTo(Optional.of(expected));
assertThat(returned.get(0)).isEqualTo(expected);
}

static class TestData {
Expand Down