Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,14 @@ public interface NodeRegistry extends HasReadyState, Closeable {
DistributorStatus getStatus();

/**
* Gets all available nodes that are not DOWN or DRAINING.
* Get all nodes that are UP.
*
* @return Set of UP node statuses.
*/
Set<NodeStatus> getUpNodes();

/**
* Gets all available nodes that are not DOWN or DRAINING and has free slots.
*
* @return Set of available node statuses.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,8 @@ private SlotId reserveSlot(RequestId requestId, Capabilities caps) {
}

private boolean isNotSupported(Capabilities caps) {
return getAvailableNodes().stream().noneMatch(node -> node.hasCapability(caps, slotMatcher));
return nodeRegistry.getUpNodes().stream()
.noneMatch(node -> node.hasCapability(caps, slotMatcher));
}

private boolean reserve(SlotId id) {
Expand Down Expand Up @@ -560,21 +561,28 @@ public void run() {
}

private void checkMatchingSlot(List<SessionRequestCapability> sessionRequests) {
for (SessionRequestCapability request : sessionRequests) {
long unmatchableCount =
request.getDesiredCapabilities().stream()
.filter(LocalDistributor.this::isNotSupported)
.count();

if (unmatchableCount == request.getDesiredCapabilities().size()) {
LOG.info(
"No nodes support the capabilities in the request: "
+ request.getDesiredCapabilities());
SessionNotCreatedException exception =
new SessionNotCreatedException("No nodes support the capabilities in the request");
sessionQueue.complete(request.getRequestId(), Either.left(exception));
}
}
// Get UP nodes once to avoid lock & loop over multiple requests
Set<NodeStatus> upNodes = nodeRegistry.getUpNodes();
// Filter and reject only requests where NO capabilities are supported by ANY UP node
// This prevents rejecting requests when nodes support capabilities but are just busy
sessionRequests.stream()
.filter(
request ->
request.getDesiredCapabilities().stream()
.noneMatch(
caps ->
upNodes.stream()
.anyMatch(node -> node.hasCapability(caps, slotMatcher))))
.forEach(
request -> {
LOG.info(
"No nodes support the capabilities in the request: "
+ request.getDesiredCapabilities());
SessionNotCreatedException exception =
new SessionNotCreatedException(
"No nodes support the capabilities in the request");
sessionQueue.complete(request.getRequestId(), Either.left(exception));
});
}

private void handleNewSessionRequest(SessionRequest sessionRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,19 @@ public DistributorStatus getStatus() {

@Override
public Set<NodeStatus> getAvailableNodes() {
// Filter nodes are UP and have capacity (available slots)
return getUpNodes().stream()
.filter(NodeStatus::hasCapacity)
.collect(ImmutableSet.toImmutableSet());
}

@Override
public Set<NodeStatus> getUpNodes() {
Lock readLock = this.lock.readLock();
readLock.lock();
try {
return model.getSnapshot().stream()
// Filter nodes are UP and have capacity (available slots)
.filter(node -> UP.equals(node.getAvailability()) && node.hasCapacity())
.filter(node -> UP.equals(node.getAvailability()))
.collect(ImmutableSet.toImmutableSet());
} finally {
readLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,140 @@ void shouldReduceRedundantSlotChecks() throws URISyntaxException {
}
}

@Test
void shouldNotRejectRequestsWhenNodesHaveCapabilityButNoFreeSlots() throws URISyntaxException {
// Create a distributor with rejectUnsupportedCaps enabled
NewSessionQueue queue =
new LocalNewSessionQueue(
tracer,
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
LocalDistributor distributor =
new LocalDistributor(
tracer,
bus,
new PassthroughHttpClient.Factory(localNode),
new LocalSessionMap(tracer, bus),
queue,
new DefaultSlotSelector(),
registrationSecret,
Duration.ofMinutes(5),
true, // Enable rejectUnsupportedCaps
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher(),
Duration.ofSeconds(30));

// Create a node that supports Chrome with single slot
URI nodeUri = new URI("http://example:1234");
Node node =
LocalNode.builder(tracer, bus, nodeUri, nodeUri, registrationSecret)
.add(
new ImmutableCapabilities("browserName", "chrome"),
new TestSessionFactory(
(id, c) ->
new Session(id, nodeUri, new ImmutableCapabilities(), c, Instant.now())))
.maximumConcurrentSessions(1)
.build();
distributor.add(node);

// Occupy the node's only slot
SessionRequest sessionRequest =
new SessionRequest(
new RequestId(UUID.randomUUID()),
Instant.now(),
Set.of(W3C),
Set.of(new ImmutableCapabilities("browserName", "chrome")),
Map.of(),
Map.of());
Either<SessionNotCreatedException, CreateSessionResponse> result =
distributor.newSession(sessionRequest);
assertThat(result.isRight()).isTrue(); // Session created successfully

// Verify node has no available capacity but still supports Chrome
assertThat(distributor.getAvailableNodes()).isEmpty(); // No available nodes

// Test that the distributor status shows the node is still UP and supports Chrome
// even though it has no available capacity
DistributorStatus status = distributor.getStatus();
Set<NodeStatus> allNodes = status.getNodes();
assertThat(allNodes).hasSize(1);

NodeStatus nodeStatus = allNodes.iterator().next();
assertThat(nodeStatus.getAvailability()).isEqualTo(UP);

// Verify the node still supports Chrome capability even with no free slots
boolean supportsChrome =
nodeStatus.hasCapability(
new ImmutableCapabilities("browserName", "chrome"), new DefaultSlotMatcher());
assertThat(supportsChrome).isTrue();

// Verify the node has no capacity (all slots occupied)
assertThat(nodeStatus.hasCapacity()).isFalse();
}

@Test
void shouldRejectRequestsWhenNoNodesHaveCapability() throws URISyntaxException {
// Create a distributor with rejectUnsupportedCaps enabled
NewSessionQueue queue =
new LocalNewSessionQueue(
tracer,
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
LocalDistributor distributor =
new LocalDistributor(
tracer,
bus,
new PassthroughHttpClient.Factory(localNode),
new LocalSessionMap(tracer, bus),
queue,
new DefaultSlotSelector(),
registrationSecret,
Duration.ofMinutes(5),
true, // Enable rejectUnsupportedCaps
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher(),
Duration.ofSeconds(30));

// Create a node that only supports Chrome
URI nodeUri = new URI("http://example:1234");
Node node =
LocalNode.builder(tracer, bus, nodeUri, nodeUri, registrationSecret)
.add(
new ImmutableCapabilities("browserName", "chrome"),
new TestSessionFactory(
(id, c) ->
new Session(id, nodeUri, new ImmutableCapabilities(), c, Instant.now())))
.build();
distributor.add(node);

// Add a Firefox request to the queue (unsupported capability)
SessionRequest unsupportedRequest =
new SessionRequest(
new RequestId(UUID.randomUUID()),
Instant.now(),
Set.of(W3C),
Set.of(new ImmutableCapabilities("browserName", "firefox")),
Map.of(),
Map.of());
queue.addToQueue(unsupportedRequest);

// Wait for checkMatchingSlot to run and reject the request
wait.until(obj -> queue.getQueueContents().isEmpty());

// The request should be rejected and removed from queue
assertThat(queue.getQueueContents()).isEmpty();
}

@Test
void shouldHandleAllNodesFullyOccupied() throws URISyntaxException {
// Create a distributor
Expand Down
Loading