Skip to content

Commit

Permalink
[Feature-2815][server] One worker can belong to different workergroups
Browse files Browse the repository at this point in the history
  • Loading branch information
yangyichao-mango committed Jun 16, 2020
1 parent b21259d commit 476f787
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 89 deletions.
Expand Up @@ -16,29 +16,34 @@
*/
package org.apache.dolphinscheduler.api.service;

import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerServerModel;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.dolphinscheduler.common.utils.Preconditions.*;
import com.google.common.collect.Sets;

/**
* monitor service
*/
@Service
public class MonitorService extends BaseService{
public class MonitorService extends BaseService {

@Autowired
private ZookeeperMonitor zookeeperMonitor;
Expand Down Expand Up @@ -108,15 +113,41 @@ public Map<String,Object> queryZookeeperState(User loginUser) {
public Map<String,Object> queryWorker(User loginUser) {

Map<String, Object> result = new HashMap<>(5);
List<Server> masterServers = getServerListFromZK(false);

result.put(Constants.DATA_LIST, masterServers);
List<WorkerServerModel> workerServers = getServerListFromZK(false)
.stream()
.map((Server server) -> {
WorkerServerModel model = new WorkerServerModel();
model.setId(server.getId());
model.setHost(server.getHost());
model.setPort(server.getPort());
model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
model.setResInfo(server.getResInfo());
model.setCreateTime(server.getCreateTime());
model.setLastHeartbeatTime(server.getLastHeartbeatTime());
return model;
})
.collect(Collectors.toList());

Map<String, WorkerServerModel> workerHostPortServerMapping = workerServers
.stream()
.collect(Collectors.toMap(
(WorkerServerModel worker) -> {
String[] s = worker.getZkDirectories().iterator().next().split("/");
return s[s.length - 1];
}
, Function.identity()
, (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
return oldOne;
}));

result.put(Constants.DATA_LIST, workerHostPortServerMapping.values());
putMsg(result,Status.SUCCESS);

return result;
}

public List<Server> getServerListFromZK(boolean isMaster){
public List<Server> getServerListFromZK(boolean isMaster) {

checkNotNull(zookeeperMonitor);
ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER;
Expand Down
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.model;


import java.util.Date;
import java.util.Set;

/**
* server
*/
public class WorkerServerModel {

/**
* id
*/
private int id;

/**
* host
*/
private String host;

/**
* port
*/
private int port;

/**
* worker directory in zookeeper
*/
private Set<String> zkDirectories;

/**
* resource info: CPU and memory
*/
private String resInfo;

/**
* create time
*/
private Date createTime;

/**
* laster heart beat time
*/
private Date lastHeartbeatTime;

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}

public Date getCreateTime() {
return createTime;
}

public void setCreateTime(Date createTime) {
this.createTime = createTime;
}

public Set<String> getZkDirectories() {
return zkDirectories;
}

public void setZkDirectories(Set<String> zkDirectories) {
this.zkDirectories = zkDirectories;
}

public Date getLastHeartbeatTime() {
return lastHeartbeatTime;
}

public void setLastHeartbeatTime(Date lastHeartbeatTime) {
this.lastHeartbeatTime = lastHeartbeatTime;
}

public String getResInfo() {
return resInfo;
}

public void setResInfo(String resInfo) {
this.resInfo = resInfo;
}


}
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.dolphinscheduler.server.worker.config;

import java.util.Set;

import org.apache.dolphinscheduler.common.Constants;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
Expand All @@ -41,8 +43,8 @@ public class WorkerConfig {
@Value("${worker.reserved.memory:0.5}")
private double workerReservedMemory;

@Value("${worker.group: default}")
private String workerGroup;
@Value("#{'${worker.groups: default}'.split(',')}")
private Set<String> workerGroups;

@Value("${worker.listen.port: 1234}")
private int listenPort;
Expand All @@ -55,12 +57,12 @@ public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}

public String getWorkerGroup() {
return workerGroup;
public Set<String> getWorkerGroups() {
return workerGroups;
}

public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
public void setWorkerGroups(Set<String> workerGroups) {
this.workerGroups = workerGroups;
}

public int getWorkerExecThreads() {
Expand Down

0 comments on commit 476f787

Please sign in to comment.