Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion java/src/org/openqa/selenium/grid/commands/Hub.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ protected Handlers createHandlers(Config config) {
distributorOptions.shouldRejectUnsupportedCaps(),
newSessionRequestOptions.getSessionRequestRetryInterval(),
distributorOptions.getNewSessionThreadPoolSize(),
distributorOptions.getSlotMatcher());
distributorOptions.getSlotMatcher(),
distributorOptions.getPurgeNodesInterval());
handler.addHandler(distributor);

Router router = new Router(tracer, clientFactory, sessions, queue, distributor);
Expand Down
3 changes: 2 additions & 1 deletion java/src/org/openqa/selenium/grid/commands/Standalone.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ protected Handlers createHandlers(Config config) {
distributorOptions.shouldRejectUnsupportedCaps(),
newSessionRequestOptions.getSessionRequestRetryInterval(),
distributorOptions.getNewSessionThreadPoolSize(),
distributorOptions.getSlotMatcher());
distributorOptions.getSlotMatcher(),
distributorOptions.getPurgeNodesInterval());
combinedHandler.addHandler(distributor);

Router router = new Router(tracer, clientFactory, sessions, queue, distributor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.openqa.selenium.grid.distributor.config.DistributorOptions.DEFAULT_DISTRIBUTOR_IMPLEMENTATION;
import static org.openqa.selenium.grid.distributor.config.DistributorOptions.DEFAULT_HEALTHCHECK_INTERVAL;
import static org.openqa.selenium.grid.distributor.config.DistributorOptions.DEFAULT_NEWSESSION_THREADPOOL_SIZE;
import static org.openqa.selenium.grid.distributor.config.DistributorOptions.DEFAULT_PURGE_NODES_INTERVAL;
import static org.openqa.selenium.grid.distributor.config.DistributorOptions.DEFAULT_REJECT_UNSUPPORTED_CAPS;
import static org.openqa.selenium.grid.distributor.config.DistributorOptions.DEFAULT_SLOT_MATCHER;
import static org.openqa.selenium.grid.distributor.config.DistributorOptions.DEFAULT_SLOT_SELECTOR_IMPLEMENTATION;
Expand Down Expand Up @@ -115,6 +116,14 @@ public class DistributorFlags implements HasRoles {
@ConfigValue(section = DISTRIBUTOR_SECTION, name = "newsession-threadpool-size", example = "4")
public int newSessionThreadPoolSize = DEFAULT_NEWSESSION_THREADPOOL_SIZE;

@Parameter(
names = {"--purge-nodes-interval"},
description =
"How often, in seconds, will the Distributor purge Nodes that have been down for a while."
+ " This is calculated based on the heartbeat received from a particular node. ")
@ConfigValue(section = DISTRIBUTOR_SECTION, name = "purge-nodes-interval", example = "30")
public int purgeNodesInterval = DEFAULT_PURGE_NODES_INTERVAL;

@Override
public Set<Role> getRoles() {
return Collections.singleton(DISTRIBUTOR_ROLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
public class DistributorOptions {

public static final int DEFAULT_HEALTHCHECK_INTERVAL = 120;
public static final int DEFAULT_PURGE_NODES_INTERVAL = 30;
public static final String DISTRIBUTOR_SECTION = "distributor";
static final String DEFAULT_DISTRIBUTOR_IMPLEMENTATION =
"org.openqa.selenium.grid.distributor.local.LocalDistributor";
Expand Down Expand Up @@ -97,6 +98,17 @@ public Duration getHealthCheckInterval() {
return Duration.ofSeconds(seconds);
}

public Duration getPurgeNodesInterval() {
// If the user sets 0s or less, we default to 0s and disable the purge dead nodes service.
int seconds =
Math.max(
config
.getInt(DISTRIBUTOR_SECTION, "purge-nodes-interval")
.orElse(DEFAULT_PURGE_NODES_INTERVAL),
0);
return Duration.ofSeconds(seconds);
}

public Distributor getDistributor() {
return config.getClass(
DISTRIBUTOR_SECTION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public class LocalDistributor extends Distributor implements Closeable {
private final GridModel model;
private final Map<NodeId, Node> nodes;
private final SlotMatcher slotMatcher;
private final Duration purgeNodesInterval;

private final ScheduledExecutorService newSessionService =
Executors.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -188,7 +189,8 @@ public LocalDistributor(
boolean rejectUnsupportedCaps,
Duration sessionRequestRetryInterval,
int newSessionThreadPoolSize,
SlotMatcher slotMatcher) {
SlotMatcher slotMatcher,
Duration purgeNodesInterval) {
super(tracer, clientFactory, registrationSecret);
this.tracer = Require.nonNull("Tracer", tracer);
this.bus = Require.nonNull("Event bus", bus);
Expand All @@ -202,6 +204,7 @@ public LocalDistributor(
this.nodes = new ConcurrentHashMap<>();
this.rejectUnsupportedCaps = rejectUnsupportedCaps;
this.slotMatcher = slotMatcher;
this.purgeNodesInterval = purgeNodesInterval;
Require.nonNull("Session request interval", sessionRequestRetryInterval);

bus.addListener(NodeStatusEvent.listener(this::register));
Expand Down Expand Up @@ -232,8 +235,14 @@ public LocalDistributor(
NewSessionRunnable newSessionRunnable = new NewSessionRunnable();
bus.addListener(NodeDrainComplete.listener(this::remove));

purgeDeadNodesService.scheduleAtFixedRate(
GuardedRunnable.guard(model::purgeDeadNodes), 30, 30, TimeUnit.SECONDS);
// Disable purge dead nodes service if interval is set to zero
if (!this.purgeNodesInterval.isZero()) {
purgeDeadNodesService.scheduleAtFixedRate(
GuardedRunnable.guard(model::purgeDeadNodes),
this.purgeNodesInterval.getSeconds(),
this.purgeNodesInterval.getSeconds(),
TimeUnit.SECONDS);
}

nodeHealthCheckService.scheduleAtFixedRate(
runNodeHealthChecks(),
Expand Down Expand Up @@ -276,7 +285,8 @@ public static Distributor create(Config config) {
distributorOptions.shouldRejectUnsupportedCaps(),
newSessionQueueOptions.getSessionRequestRetryInterval(),
distributorOptions.getNewSessionThreadPoolSize(),
distributorOptions.getSlotMatcher());
distributorOptions.getSlotMatcher(),
distributorOptions.getPurgeNodesInterval());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ void shouldBeAbleToRegisterALocalNode() throws URISyntaxException {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));

distributor =
new RemoteDistributor(
Expand Down Expand Up @@ -183,7 +184,8 @@ void shouldBeAbleToRegisterACustomNode() throws URISyntaxException {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher())) {
new DefaultSlotMatcher(),
Duration.ofSeconds(30))) {

distributor =
new RemoteDistributor(
Expand Down Expand Up @@ -223,7 +225,8 @@ void shouldBeAbleToRegisterNodesByListeningForEvents() throws URISyntaxException
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher())) {
new DefaultSlotMatcher(),
Duration.ofSeconds(30))) {

distributor =
new RemoteDistributor(
Expand Down Expand Up @@ -272,7 +275,8 @@ void shouldKeepOnlyOneNodeWhenTwoRegistrationsHaveTheSameUriByListeningForEvents
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher())) {
new DefaultSlotMatcher(),
Duration.ofSeconds(30))) {

distributor =
new RemoteDistributor(
Expand Down Expand Up @@ -314,7 +318,8 @@ void distributorShouldUpdateStateOfExistingNodeWhenNodePublishesStateChange()
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher())) {
new DefaultSlotMatcher(),
Duration.ofSeconds(30))) {

distributor =
new RemoteDistributor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ void drainedNodeDoesNotShutDownIfNotEmpty() throws InterruptedException {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
local.add(node);
waitToHaveCapacity(local);

Expand Down Expand Up @@ -139,7 +140,8 @@ void drainedNodeShutsDownAfterSessionsFinish() throws InterruptedException {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
local.add(node);
waitToHaveCapacity(local);

Expand Down Expand Up @@ -211,7 +213,8 @@ void testDrainedNodeShutsDownOnceEmpty() throws InterruptedException {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
local.add(node);
waitToHaveCapacity(local);

Expand Down Expand Up @@ -261,7 +264,8 @@ void drainingNodeDoesNotAcceptNewSessions() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
local.add(node);
local.drain(node.getId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ void registeringTheSameNodeMultipleTimesOnlyCountsTheFirstTime() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));

local.add(node);
local.add(node);
Expand Down Expand Up @@ -122,7 +123,8 @@ void shouldBeAbleToRemoveANode() throws MalformedURLException {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
Distributor distributor =
new RemoteDistributor(
tracer,
Expand Down Expand Up @@ -178,7 +180,8 @@ void shouldIncludeHostsThatAreUpInHostList() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
handler.addHandler(local);
local.add(alwaysDown);
waitForAllNodesToMeetCondition(local, 1, DOWN);
Expand Down Expand Up @@ -258,7 +261,8 @@ void shouldNotRemoveNodeWhoseHealthCheckPassesBeforeThreshold() throws Interrupt
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
handler.addHandler(local);
local.add(node);

Expand Down Expand Up @@ -313,7 +317,8 @@ void shouldReturnNodesThatWereDownToPoolOfNodesOnceTheyMarkTheirHealthCheckPasse
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
handler.addHandler(local);
local.add(node);
waitForAllNodesToMeetCondition(local, 1, DOWN);
Expand Down Expand Up @@ -366,7 +371,8 @@ void shouldBeAbleToAddANodeAndCreateASession() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
local.add(node);
waitToHaveCapacity(local);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ void creatingANewSessionWithoutANodeEndsInFailure() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
Either<SessionNotCreatedException, CreateSessionResponse> result =
local.newSession(createRequest(caps));
assertThatEither(result).isLeft();
Expand Down Expand Up @@ -109,7 +110,8 @@ void creatingASessionAddsItToTheSessionMap() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
local.add(node);
waitToHaveCapacity(local);

Expand Down Expand Up @@ -160,7 +162,8 @@ void shouldReleaseSlotOnceSessionEnds() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
local.add(node);
waitToHaveCapacity(local);

Expand Down Expand Up @@ -223,7 +226,8 @@ void shouldNotStartASessionIfTheCapabilitiesAreNotSupported() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
handler.addHandler(distributor);

Node node = createNode(caps, 1, 0);
Expand Down Expand Up @@ -273,7 +277,8 @@ void attemptingToStartASessionWhichFailsMarksAsTheSlotAsAvailable() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
local.add(node);
waitToHaveCapacity(local);

Expand Down Expand Up @@ -320,7 +325,8 @@ void shouldFallbackToSecondAvailableCapabilitiesIfFirstNotAvailable() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));

local.add(firstNode);
local.add(secondNode);
Expand Down Expand Up @@ -368,7 +374,8 @@ void shouldFallbackToSecondAvailableCapabilitiesIfFirstThrowsOnCreation() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));
local.add(brokenNode);
local.add(node);
waitForAllNodesToHaveCapacity(local, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ void theMostLightlyLoadedNodeIsSelectedFirst() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher())
new DefaultSlotMatcher(),
Duration.ofSeconds(30))
.add(heavy)
.add(medium)
.add(lightest)
Expand Down Expand Up @@ -144,7 +145,8 @@ void shouldUseLastSessionCreatedTimeAsTieBreaker() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher())
new DefaultSlotMatcher(),
Duration.ofSeconds(30))
.add(leastRecent);
waitToHaveCapacity(local);

Expand Down Expand Up @@ -218,7 +220,8 @@ void shouldNotScheduleAJobIfAllSlotsAreBeingUsed() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));

local.add(node);
waitToHaveCapacity(local);
Expand Down Expand Up @@ -267,7 +270,8 @@ void shouldPrioritizeHostsWithTheMostSlotsAvailableForASessionType() {
false,
Duration.ofSeconds(5),
newSessionThreadPoolSize,
new DefaultSlotMatcher());
new DefaultSlotMatcher(),
Duration.ofSeconds(30));

// Create all three Capability types
Capabilities edge = new ImmutableCapabilities("browserName", "edge");
Expand Down
Loading
Loading