Skip to content

Commit

Permalink
Merge branch 'dev' into issues/16028
Browse files Browse the repository at this point in the history
  • Loading branch information
rickchengx committed Jun 15, 2024
2 parents 155c1cf + 834c320 commit 22f8ec8
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {

/**
* select
* Selects a HostWeight from a collection of HostWeight objects.
* The selection is based on the current weight of each HostWeight.
* The HostWeight with the smallest current weight is selected.
*
* @param sources sources
* @return HostWeight
* @param sources A collection of HostWeight objects to select from.
* @return The selected HostWeight with the smallest current weight.
*/
@Override
public HostWeight doSelect(Collection<HostWeight> sources) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
*/
public class RandomSelector extends AbstractSelector<HostWorker> {

/**
* This method selects a HostWorker from a collection of HostWorker objects using a weighted random algorithm.
* The selection is based on the weight of each HostWorker.
* A random number is generated and the HostWorker whose weight spans this random number is selected.
*
* @param source A collection of HostWorker objects to select from.
* @return The selected HostWorker based on the weighted random algorithm.
*/
@Override
public HostWorker doSelect(final Collection<HostWorker> source) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ void setLastUpdate(long lastUpdate) {

}

/**
* This method selects a HostWorker from a collection of HostWorker objects using a weighted round-robin algorithm.
* The selection is based on the current weight of each HostWorker.
* The HostWorker with the highest current weight is selected.
*
* @param source A collection of HostWorker objects to select from.
* @return The selected HostWorker with the highest current weight.
*/
@Override
public HostWorker doSelect(Collection<HostWorker> source) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.time.Duration;
import java.util.stream.Stream;

import lombok.SneakyThrows;
Expand Down Expand Up @@ -52,7 +53,7 @@ public static void setUpTestingServer() {
.withDatabaseName("dolphinscheduler")
.withNetwork(Network.newNetwork())
.withExposedPorts(3306)
.waitingFor(Wait.forHealthcheck());
.waitingFor(Wait.forHealthcheck().withStartupTimeout(Duration.ofSeconds(300)));

int exposedPort = RandomUtils.nextInt(10000, 65535);
mysqlContainer.setPortBindings(Lists.newArrayList(exposedPort + ":3306"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,12 @@ private void registry() throws InterruptedException {
workerHeartBeat = workerHeartBeatTask.getHeartBeat();
Thread.sleep(SLEEP_TIME_MILLIS);
}
String workerZKPath = workerConfig.getWorkerRegistryPath();
String workerRegistryPath = workerConfig.getWorkerRegistryPath();
// remove before persist
registryClient.remove(workerZKPath);
registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
registryClient.remove(workerRegistryPath);
registryClient.persistEphemeral(workerRegistryPath, JSONUtils.toJsonString(workerHeartBeat));
log.info("Worker node: {} registry to registry center {} successfully", workerConfig.getWorkerAddress(),
workerRegistryPath);

while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), RegistryNodeType.WORKER)) {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
Expand Down

0 comments on commit 22f8ec8

Please sign in to comment.