Skip to content

Commit

Permalink
Merge branch 'dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
SbloodyS committed May 21, 2024
2 parents f0d014f + e4df01a commit cbbe09d
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 168 deletions.
2 changes: 1 addition & 1 deletion docs/docs/zh/contribute/join/contribute.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
如果你想实现某个 Feature 或者修复某个 Bug。请参考以下内容:

* 所有的 Bug 与新 Feature 建议使用 Issues Page 进行管理。
* 如果想要开发实现某个 Feature 功能,请先回复该功能所关联的 Issue,表明你当前正在这个 Issue 上工作。 并在回复的时候为自己设置一个 **deadline**并添加的回复内容中
* 如果想要开发实现某个 Feature 功能,请先回复该功能所关联的 Issue,表明你当前正在这个 Issue 上工作。 并在回复的时候为自己设置一个 **deadline**并添加到回复内容中
* 最好在核心贡献者找到一个导师(指导者),导师会在设计与功能实现上给予即时的反馈。
* 你应该新建一个分支来开始你的工作,分支的名字参考[参与贡献 Pull Request 需知](./pull-request.md)。比如,你想完成 feature 功能并提交了 Issue 111,那么你的 branch 名字应为 feature-111。 功能名称可与导师讨论后确定。
* 完成后,发送一个 Pull Request 到 dolphinscheduler,提交过程具体请参考下面《[提交代码流程](./submit-code.md)》。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

import lombok.extern.slf4j.Slf4j;
import io.netty.bootstrap.Bootstrap;
Expand All @@ -54,7 +56,8 @@ public class NettyRemotingClient implements AutoCloseable {

private final Bootstrap bootstrap = new Bootstrap();

private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);
private final ReentrantLock channelsLock = new ReentrantLock();
private final Map<Host, Channel> channels = new ConcurrentHashMap<>();

private final AtomicBoolean isStarted = new AtomicBoolean(false);

Expand Down Expand Up @@ -104,9 +107,10 @@ public void initChannel(SocketChannel ch) {
isStarted.compareAndSet(false, true);
}

public IRpcResponse sendSync(final Host host, final Transporter transporter,
public IRpcResponse sendSync(final Host host,
final Transporter transporter,
final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(host);
final Channel channel = getOrCreateChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
Expand Down Expand Up @@ -137,36 +141,43 @@ public IRpcResponse sendSync(final Host host, final Transporter transporter,
return iRpcResponse;
}

private Channel getChannel(Host host) {
private Channel getOrCreateChannel(Host host) {
Channel channel = channels.get(host);
if (channel != null && channel.isActive()) {
return channel;
}
return createChannel(host, true);
try {
channelsLock.lock();
channel = channels.get(host);
if (channel != null && channel.isActive()) {
return channel;
}
channel = createChannel(host);
channels.put(host, channel);
} finally {
channelsLock.unlock();
}
return channel;
}

/**
* create channel
*
* @param host host
* @param isSync sync flag
* @param host host
* @return channel
*/
private Channel createChannel(Host host, boolean isSync) {
private Channel createChannel(Host host) {
try {
ChannelFuture future;
synchronized (bootstrap) {
future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
}
if (isSync) {
future.sync();
}
future.await(clientConfig.getConnectTimeoutMillis());
if (future.isSuccess()) {
Channel channel = future.channel();
channels.put(host, channel);
return channel;
return future.channel();
} else {
throw new IllegalArgumentException("connect to host: " + host + " failed", future.cause());
}
throw new IllegalArgumentException("connect to host: " + host + " failed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Connect to host: " + host + " failed", e);
Expand All @@ -189,16 +200,23 @@ public void close() {
}

private void closeChannels() {
for (Channel channel : this.channels.values()) {
channel.close();
try {
channelsLock.lock();
channels.values().forEach(Channel::close);
} finally {
channelsLock.unlock();
}
this.channels.clear();
}

public void closeChannel(Host host) {
Channel channel = this.channels.remove(host);
if (channel != null) {
channel.close();
try {
channelsLock.lock();
Channel channel = this.channels.remove(host);
if (channel != null) {
channel.close();
}
} finally {
channelsLock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.base.utils;

import org.junit.jupiter.api.Test;

import com.google.common.truth.Truth;

class HostTest {

@Test
void testEquals() {
Truth.assertThat(Host.of("localhost:8080")).isEqualTo(Host.of("localhost:8080"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;

import org.apache.commons.lang3.time.DurationUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
Expand Down Expand Up @@ -76,8 +77,8 @@ final class ZookeeperRegistry implements Registry {
.connectString(properties.getConnectString())
.retryPolicy(retryPolicy)
.namespace(properties.getNamespace())
.sessionTimeoutMs((int) properties.getSessionTimeout().toMillis())
.connectionTimeoutMs((int) properties.getConnectionTimeout().toMillis());
.sessionTimeoutMs(DurationUtils.toMillisInt(properties.getSessionTimeout()))
.connectionTimeoutMs(DurationUtils.toMillisInt(properties.getConnectionTimeout()));

final String digest = properties.getDigest();
if (!Strings.isNullOrEmpty(digest)) {
Expand All @@ -102,9 +103,10 @@ public List<ACL> getAclForPath(final String path) {
public void start() {
client.start();
try {
if (!client.blockUntilConnected((int) properties.getBlockUntilConnected().toMillis(), MILLISECONDS)) {
if (!client.blockUntilConnected(DurationUtils.toMillisInt(properties.getBlockUntilConnected()),
MILLISECONDS)) {
client.close();
throw new RegistryException("zookeeper connect timeout: " + properties.getConnectString());
throw new RegistryException("zookeeper connect failed in : " + properties.getConnectString() + "ms");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -120,7 +122,7 @@ public void addConnectionStateListener(ConnectionListener listener) {
@Override
public void connectUntilTimeout(@NonNull Duration timeout) throws RegistryException {
try {
if (!client.blockUntilConnected((int) timeout.toMillis(), MILLISECONDS)) {
if (!client.blockUntilConnected(DurationUtils.toMillisInt(timeout), MILLISECONDS)) {
throw new RegistryException(
String.format("Cannot connect to registry in %s s", timeout.getSeconds()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ registry:
max-retries: 5
session-timeout: 30s
connection-timeout: 9s
block-until-connected: 600ms
block-until-connected: 3s
digest: ~
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public static void clear() {
workerTaskExecutorMap.clear();
}

public static int size() {
return workerTaskExecutorMap.size();
}

public static Collection<WorkerTaskExecutor> getAllTaskExecutor() {
return workerTaskExecutorMap.values();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class WorkerTaskExecutorThreadPool {
public WorkerTaskExecutorThreadPool(WorkerConfig workerConfig) {
this.threadPoolExecutor =
ThreadUtils.newDaemonFixedThreadExecutor("WorkerTaskExecutorThreadPool", workerConfig.getExecThreads());
threadPoolExecutor.prestartAllCoreThreads();
this.workerConfig = workerConfig;

WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(this::getWaitingTaskExecutorSize);
Expand All @@ -64,15 +65,19 @@ public boolean submitWorkerTaskExecutor(WorkerTaskExecutor workerTaskExecutor) {
}

public boolean isOverload() {
return threadPoolExecutor.getQueue().size() > 0;
return WorkerTaskExecutorHolder.size() >= workerConfig.getExecThreads();
}

public int getWaitingTaskExecutorSize() {
return threadPoolExecutor.getQueue().size();
if (WorkerTaskExecutorHolder.size() <= workerConfig.getExecThreads()) {
return 0;
} else {
return WorkerTaskExecutorHolder.size() - workerConfig.getExecThreads();
}
}

public int getRunningTaskExecutorSize() {
return threadPoolExecutor.getActiveCount();
return Math.min(WorkerTaskExecutorHolder.size(), workerConfig.getExecThreads());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public TaskInstanceKillResponse operate(TaskInstanceKillRequest taskInstanceKill
taskExecutionContext
.setCurrentExecutionStatus(result ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);

WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
WorkerTaskExecutorHolder.remove(taskInstanceId);
messageRetryRunner.removeRetryMessages(taskInstanceId);
return TaskInstanceKillResponse.success(taskExecutionContext);
} finally {
LogUtils.removeTaskInstanceIdMDC();
Expand Down
Loading

0 comments on commit cbbe09d

Please sign in to comment.