diff --git a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/api/strategy/JobShardingUnit.java b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/api/strategy/JobShardingUnit.java index fd4e4092fd..73f6a29ab5 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/api/strategy/JobShardingUnit.java +++ b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/api/strategy/JobShardingUnit.java @@ -32,12 +32,16 @@ public final class JobShardingUnit { /** - * 作业服务器IP地址. + * 作业实例主键. */ - private final String serverIp; + private final String jobInstanceId; /** - * 作业实例主键. + * 获取作业服务器IP地址. + * + * @return 作业服务器IP地址 */ - private final String jobInstanceId; + public String getIp() { + return jobInstanceId.substring(0, jobInstanceId.indexOf("@-@")); + } } diff --git a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/election/ElectionListenerManager.java b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/election/ElectionListenerManager.java index c8837023bc..cb6c9866c2 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/election/ElectionListenerManager.java +++ b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/election/ElectionListenerManager.java @@ -19,8 +19,8 @@ import com.dangdang.ddframe.job.lite.internal.listener.AbstractJobListener; import com.dangdang.ddframe.job.lite.internal.listener.AbstractListenerManager; +import com.dangdang.ddframe.job.lite.internal.server.InstanceNode; import com.dangdang.ddframe.job.lite.internal.server.ServerNode; -import com.dangdang.ddframe.job.lite.internal.server.ServerOperationNode; import com.dangdang.ddframe.job.lite.internal.server.ServerService; import com.dangdang.ddframe.job.lite.internal.server.ServerStatus; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; @@ -44,17 +44,17 @@ public class ElectionListenerManager extends AbstractListenerManager { private final ElectionNode electionNode; - private final ServerNode serverNode; + private final InstanceNode instanceNode; - private final ServerOperationNode serverOperationNode; + private final ServerNode serverNode; public ElectionListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName) { super(regCenter, jobName); leaderElectionService = new LeaderElectionService(regCenter, jobName); serverService = new ServerService(regCenter, jobName); electionNode = new ElectionNode(jobName); + instanceNode = new InstanceNode(jobName); serverNode = new ServerNode(jobName); - serverOperationNode = new ServerOperationNode(jobName); } @Override @@ -94,7 +94,7 @@ private boolean isLeaderCrashed() { } private boolean isServerEnabled() { - return serverOperationNode.isLocalServerPath(path) && !ServerStatus.DISABLED.name().equals(new String(event.getData().getData())); + return serverNode.isLocalServerPath(path) && !ServerStatus.DISABLED.name().equals(new String(event.getData().getData())); } boolean isServerOff() { @@ -102,11 +102,11 @@ boolean isServerOff() { } private boolean isServerDisabled() { - return serverOperationNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(new String(event.getData().getData())); + return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(new String(event.getData().getData())); } private boolean isServerShutdown() { - return serverNode.isLocalInstancePath(path) && Type.NODE_REMOVED == event.getType(); + return instanceNode.isLocalInstancePath(path) && Type.NODE_REMOVED == event.getType(); } } } diff --git a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/execution/ExecutionNode.java b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/execution/ExecutionNode.java index cf7d5b2f0c..8845909a20 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/execution/ExecutionNode.java +++ b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/execution/ExecutionNode.java @@ -34,7 +34,9 @@ public final class ExecutionNode { public static final String IP = ROOT + "/%s/ip"; - public static final String INSTANCE = ROOT + "/%s/instance"; + static final String INSTANCE_APPENDIX = "instance"; + + public static final String INSTANCE = ROOT + "/%s/" + INSTANCE_APPENDIX; static final String RUNNING_APPENDIX = "running"; @@ -64,6 +66,10 @@ public static String getInstanceNode(final int item) { return String.format(INSTANCE, item); } + public boolean isInstanceNode(final String path) { + return path.startsWith(jobNodePath.getFullPath(ROOT)) && path.endsWith(INSTANCE_APPENDIX); + } + /** * 获取作业运行状态节点路径. * diff --git a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/InstanceNode.java b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/InstanceNode.java new file mode 100644 index 0000000000..a7a7f07747 --- /dev/null +++ b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/InstanceNode.java @@ -0,0 +1,78 @@ +/* + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.lite.internal.server; + +import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry; +import com.dangdang.ddframe.job.lite.internal.storage.JobNodePath; + +/** + * Elastic Job运行实例节点名称的常量类. + * + * @author zhangliang + */ +public class InstanceNode { + + /** + * 运行实例信息根节点. + */ + public static final String ROOT = "instances"; + + private static final String INSTANCES = ROOT + "/%s"; + + private final String jobName; + + private final JobNodePath jobNodePath; + + public InstanceNode(final String jobName) { + this.jobName = jobName; + jobNodePath = new JobNodePath(jobName); + } + + /** + * 获取本地作业运行实例路径. + * + * @return 本地作业运行实例路径 + */ + public String getLocalInstanceNode() { + return String.format(INSTANCES, JobRegistry.getInstance().getJobInstanceId(jobName)); + } + + static String getInstanceNode(final String jobInstanceId) { + return String.format(INSTANCES, jobInstanceId); + } + + /** + * 判断给定路径是否为本地作业运行实例路径. + * + * @param path 待判断的路径 + * @return 是否为本地作业运行实例路径 + */ + public boolean isLocalInstancePath(final String path) { + return path.equals(jobNodePath.getFullPath(String.format(INSTANCES, JobRegistry.getInstance().getJobInstanceId(jobName)))); + } + + /** + * 判断给定路径是否为作业运行实例路径. + * + * @param path 待判断的路径 + * @return 是否为作业运行实例路径 + */ + public boolean isInstancePath(final String path) { + return path.startsWith(jobNodePath.getFullPath(InstanceNode.ROOT)); + } +} diff --git a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/JobOperationListenerManager.java b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/JobOperationListenerManager.java index 5ed11d1619..d52e3a2f04 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/JobOperationListenerManager.java +++ b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/JobOperationListenerManager.java @@ -24,6 +24,7 @@ import com.dangdang.ddframe.job.lite.internal.schedule.JobScheduleController; import com.dangdang.ddframe.job.lite.internal.sharding.ShardingService; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; +import com.dangdang.ddframe.job.util.env.LocalHostService; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.state.ConnectionState; @@ -38,7 +39,7 @@ public class JobOperationListenerManager extends AbstractListenerManager { private final String jobName; - private final ServerNode serverNode; + private final InstanceNode instanceNode; private final ServerService serverService; @@ -46,13 +47,16 @@ public class JobOperationListenerManager extends AbstractListenerManager { private final ExecutionService executionService; + private final LocalHostService localHostService; + public JobOperationListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName) { super(regCenter, jobName); this.jobName = jobName; - serverNode = new ServerNode(jobName); + instanceNode = new InstanceNode(jobName); serverService = new ServerService(regCenter, jobName); shardingService = new ShardingService(regCenter, jobName); executionService = new ExecutionService(regCenter, jobName); + localHostService = new LocalHostService(); } @Override @@ -70,7 +74,7 @@ public void stateChanged(final CuratorFramework client, final ConnectionState ne if (ConnectionState.LOST == newState) { jobScheduleController.pauseJob(); } else if (ConnectionState.RECONNECTED == newState) { - serverService.persistServerOnline(serverService.isLocalhostServerEnabled()); + serverService.persistServerOnline(serverService.isServerEnabled(localHostService.getIp())); executionService.clearRunningInfo(shardingService.getLocalHostShardingItems()); jobScheduleController.resumeJob(); } @@ -100,7 +104,7 @@ class JobShutdownStatusJobListener extends AbstractJobListener { @Override protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { - if (serverNode.isLocalInstancePath(path) && TreeCacheEvent.Type.NODE_REMOVED == event.getType() && null != JobRegistry.getInstance().getJobScheduleController(jobName)) { + if (instanceNode.isLocalInstancePath(path) && TreeCacheEvent.Type.NODE_REMOVED == event.getType() && null != JobRegistry.getInstance().getJobScheduleController(jobName)) { JobRegistry.getInstance().getJobScheduleController(jobName).shutdown(); serverService.removeInstanceStatus(); } diff --git a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/ServerNode.java b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/ServerNode.java index 3a7d7bb1b4..dfb9636828 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/ServerNode.java +++ b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/ServerNode.java @@ -17,71 +17,59 @@ package com.dangdang.ddframe.job.lite.internal.server; -import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry; import com.dangdang.ddframe.job.lite.internal.storage.JobNodePath; import com.dangdang.ddframe.job.util.env.LocalHostService; +import java.util.regex.Pattern; + /** - * Elastic Job服务器节点名称的常量类. + * Elastic Job服务器节点常量类. * * @author zhangliang */ public class ServerNode { /** - * 作业服务器信息根节点. + * 服务器信息根节点. */ public static final String ROOT = "servers"; - /** - * 作业实例信息根节点. - */ - public static final String INSTANCES_ROOT = "instances"; - - static final String INSTANCES = ROOT + "/%s/" + INSTANCES_ROOT + "/%s"; - - private final String jobName; + private static final String SERVERS = ROOT + "/%s"; private final String ip; private final JobNodePath jobNodePath; public ServerNode(final String jobName) { - this.jobName = jobName; ip = new LocalHostService().getIp(); jobNodePath = new JobNodePath(jobName); } - /** - * 获取本地作业运行实例路径. - * - * @return 本地作业运行实例路径 - */ - public String getLocalInstanceNode() { - return String.format(INSTANCES, ip, JobRegistry.getInstance().getJobInstanceId(jobName)); + String getServerNode() { + return getServerNode(ip); } - static String getInstanceNode(final String ip, final String jobInstanceId) { - return String.format(INSTANCES, ip, jobInstanceId); + String getServerNode(final String ip) { + return String.format(SERVERS, ip); } /** - * 判断给定路径是否为本地作业运行实例路径. + * 判断给定路径是否为作业服务器路径. * * @param path 待判断的路径 - * @return 是否为本地作业运行实例路径 + * @return 是否为作业服务器路径 */ - public boolean isLocalInstancePath(final String path) { - return path.equals(jobNodePath.getFullPath(String.format(INSTANCES, ip, JobRegistry.getInstance().getJobInstanceId(jobName)))); + public boolean isLocalServerPath(final String path) { + return path.equals(jobNodePath.getFullPath(String.format(SERVERS, ip))); } /** - * 判断给定路径是否为作业运行实例路径. + * 判断给定路径是否为作业服务器路径. * * @param path 待判断的路径 - * @return 是否为作业运行实例路径 + * @return 是否为作业服务器路径 */ - public boolean isInstancePath(final String path) { - return path.startsWith(jobNodePath.getFullPath(ServerNode.ROOT)) && path.contains(INSTANCES_ROOT); + public boolean isServerPath(final String path) { + return Pattern.compile(jobNodePath.getFullPath(ServerNode.ROOT) + "/" + LocalHostService.IP_REGEX).matcher(path).matches(); } } diff --git a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/ServerOperationNode.java b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/ServerOperationNode.java deleted file mode 100644 index 1aacaddd15..0000000000 --- a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/ServerOperationNode.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 1999-2015 dangdang.com. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *

- */ - -package com.dangdang.ddframe.job.lite.internal.server; - -import com.dangdang.ddframe.job.lite.internal.storage.JobNodePath; -import com.dangdang.ddframe.job.util.env.LocalHostService; - -import java.util.regex.Pattern; - -/** - * Elastic Job服务器节点操作常量类. - * - * @author zhangliang - */ -public class ServerOperationNode { - - private final String ip; - - private final JobNodePath jobNodePath; - - public ServerOperationNode(final String jobName) { - ip = new LocalHostService().getIp(); - jobNodePath = new JobNodePath(jobName); - } - - String getServerNode() { - return getServerNode(ip); - } - - String getServerNode(final String ip) { - return String.format(ServerNode.ROOT + "/%s", ip); - } - - /** - * 判断给定路径是否为作业服务器路径. - * - * @param path 待判断的路径 - * @return 是否为作业服务器路径 - */ - public boolean isLocalServerPath(final String path) { - return path.equals(jobNodePath.getFullPath(String.format(ServerNode.ROOT + "/%s", ip))); - } - - /** - * 判断给定路径是否为作业服务器路径. - * - * @param path 待判断的路径 - * @return 是否为作业服务器路径 - */ - public boolean isServerPath(final String path) { - return Pattern.compile(jobNodePath.getFullPath(ServerNode.ROOT) + "/" + LocalHostService.IP_REGEX).matcher(path).matches(); - } -} diff --git a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/ServerService.java b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/ServerService.java index 71d6f0700a..875764abcd 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/ServerService.java +++ b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/server/ServerService.java @@ -37,16 +37,16 @@ public class ServerService { private final JobNodeStorage jobNodeStorage; - private final ServerNode serverNode; + private final InstanceNode instanceNode; - private final ServerOperationNode serverOperationNode; + private final ServerNode serverNode; private final LocalHostService localHostService = new LocalHostService(); public ServerService(final CoordinatorRegistryCenter regCenter, final String jobName) { jobNodeStorage = new JobNodeStorage(regCenter, jobName); + instanceNode = new InstanceNode(jobName); serverNode = new ServerNode(jobName); - serverOperationNode = new ServerOperationNode(jobName); } /** @@ -55,8 +55,8 @@ public ServerService(final CoordinatorRegistryCenter regCenter, final String job * @param enabled 作业是否启用 */ public void persistServerOnline(final boolean enabled) { - jobNodeStorage.fillJobNode(serverOperationNode.getServerNode(), enabled ? "" : ServerStatus.DISABLED.name()); - jobNodeStorage.fillEphemeralJobNode(serverNode.getLocalInstanceNode(), InstanceStatus.READY.name()); + jobNodeStorage.fillJobNode(serverNode.getServerNode(), enabled ? "" : ServerStatus.DISABLED.name()); + jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), InstanceStatus.READY.name()); } /** @@ -65,31 +65,14 @@ public void persistServerOnline(final boolean enabled) { * @param status 服务器状态 */ public void updateInstanceStatus(final InstanceStatus status) { - jobNodeStorage.updateJobNode(serverNode.getLocalInstanceNode(), status.name()); + jobNodeStorage.updateJobNode(instanceNode.getLocalInstanceNode(), status.name()); } /** * 删除运行实例状态. */ public void removeInstanceStatus() { - jobNodeStorage.removeJobNodeIfExisted(serverNode.getLocalInstanceNode()); - } - - /** - * 获取所有分片单元列表. - * - * @return 所有分片单元列表 - */ - public List getAllShardingUnits() { - List servers = getAllServers(); - List result = new LinkedList<>(); - for (String each : servers) { - List jobInstances = jobNodeStorage.getJobNodeChildrenKeys(ServerNode.ROOT + "/" + each + "/" + ServerNode.INSTANCES_ROOT); - for (String jobInstanceId : jobInstances) { - result.add(new JobShardingUnit(each, jobInstanceId)); - } - } - return result; + jobNodeStorage.removeJobNodeIfExisted(instanceNode.getLocalInstanceNode()); } /** @@ -98,26 +81,11 @@ public List getAllShardingUnits() { * @return 可分片的单元列表 */ public List getAvailableShardingUnits() { - List servers = getAllServers(); List result = new LinkedList<>(); - for (String each : servers) { - List jobInstances = getAvailableInstances(each); - for (String jobInstanceId : jobInstances) { - result.add(new JobShardingUnit(each, jobInstanceId)); - } - } - return result; - } - - private List getAvailableInstances(final String ip) { - List result = new LinkedList<>(); - if (ServerStatus.DISABLED.name().equals(jobNodeStorage.getJobNodeData(serverOperationNode.getServerNode(ip)))) { - return result; - } - List jobInstances = jobNodeStorage.getJobNodeChildrenKeys(ServerNode.ROOT + "/" + ip + "/" + ServerNode.INSTANCES_ROOT); - for (String each : jobInstances) { - if (jobNodeStorage.isJobNodeExisted(ServerNode.getInstanceNode(ip, each))) { - result.add(each); + for (String each : jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) { + JobShardingUnit shardingUnit = new JobShardingUnit(each); + if (isServerEnabled(shardingUnit.getIp())) { + result.add(new JobShardingUnit(each)); } } return result; @@ -148,41 +116,39 @@ private List getAllServers() { /** * 判断作业服务器是否可用. * - * @param ip 作业服务器IP地址. + * @param ip 作业服务器IP地址 * @return 作业服务器是否可用 */ public boolean isAvailableServer(final String ip) { - return !ServerStatus.DISABLED.name().equals(jobNodeStorage.getJobNodeData(serverOperationNode.getServerNode(ip))) - && !jobNodeStorage.getJobNodeChildrenKeys(ServerNode.ROOT + "/" + ip + "/" + ServerNode.INSTANCES_ROOT).isEmpty(); + return isServerEnabled(ip) && hasOnlineInstances(ip); } /** - * 判断当前服务器是否是等待执行的状态. - * - * @return 当前服务器是否是等待执行的状态 + * 判断服务器是否启用. + * + * @param ip 作业服务器IP地址 + * @return 服务器是否启用 */ - public boolean isLocalhostServerReady() { - return isAvailableServer(localHostService.getIp()) && jobNodeStorage.isJobNodeExisted(serverNode.getLocalInstanceNode()) - && InstanceStatus.READY.name().equals(jobNodeStorage.getJobNodeDataDirectly(serverNode.getLocalInstanceNode())); + public boolean isServerEnabled(final String ip) { + return !ServerStatus.DISABLED.name().equals(jobNodeStorage.getJobNodeData(serverNode.getServerNode(ip))); } - /** - * 判断当前服务器是否是启用状态. - * - * @return 当前服务器是否是启用状态 - */ - public boolean isLocalhostServerEnabled() { - return !ServerStatus.DISABLED.name().equals(jobNodeStorage.getJobNodeData(serverOperationNode.getServerNode())); + private boolean hasOnlineInstances(final String ip) { + for (String each : jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) { + if (each.startsWith(ip)) { + return true; + } + } + return false; } /** - * 判断作业节点是否离线. + * 判断当前服务器是否是等待执行的状态. * - * @param ip 作业服务器IP - * @param jobInstanceId 作业实例主键 - * @return 作业节点是否离线 + * @return 当前服务器是否是等待执行的状态 */ - public boolean isOffline(final String ip, final String jobInstanceId) { - return !jobNodeStorage.isJobNodeExisted(ServerNode.getInstanceNode(ip, jobInstanceId)); + public boolean isLocalhostServerReady() { + return isAvailableServer(localHostService.getIp()) && jobNodeStorage.isJobNodeExisted(instanceNode.getLocalInstanceNode()) + && InstanceStatus.READY.name().equals(jobNodeStorage.getJobNodeDataDirectly(instanceNode.getLocalInstanceNode())); } } diff --git a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingListenerManager.java b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingListenerManager.java index c80e5b9a14..a541e8d054 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingListenerManager.java +++ b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingListenerManager.java @@ -19,11 +19,12 @@ import com.dangdang.ddframe.job.lite.internal.config.ConfigurationNode; import com.dangdang.ddframe.job.lite.internal.config.LiteJobConfigurationGsonFactory; +import com.dangdang.ddframe.job.lite.internal.execution.ExecutionNode; import com.dangdang.ddframe.job.lite.internal.execution.ExecutionService; import com.dangdang.ddframe.job.lite.internal.listener.AbstractJobListener; import com.dangdang.ddframe.job.lite.internal.listener.AbstractListenerManager; +import com.dangdang.ddframe.job.lite.internal.server.InstanceNode; import com.dangdang.ddframe.job.lite.internal.server.ServerNode; -import com.dangdang.ddframe.job.lite.internal.server.ServerOperationNode; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import lombok.Setter; import org.apache.curator.framework.CuratorFramework; @@ -42,9 +43,11 @@ public class ShardingListenerManager extends AbstractListenerManager { private final ConfigurationNode configNode; + private final InstanceNode instanceNode; + private final ServerNode serverNode; - private final ServerOperationNode serverOperationNode; + private final ExecutionNode executionNode; @Setter private int currentShardingTotalCount; @@ -54,8 +57,9 @@ public ShardingListenerManager(final CoordinatorRegistryCenter regCenter, final shardingService = new ShardingService(regCenter, jobName); executionService = new ExecutionService(regCenter, jobName); configNode = new ConfigurationNode(jobName); + instanceNode = new InstanceNode(jobName); serverNode = new ServerNode(jobName); - serverOperationNode = new ServerOperationNode(jobName); + executionNode = new ExecutionNode(jobName); } @Override @@ -83,9 +87,21 @@ class ListenServersChangedJobListener extends AbstractJobListener { @Override protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { - if (serverNode.isInstancePath(path) && TreeCacheEvent.Type.NODE_UPDATED != event.getType() || serverOperationNode.isServerPath(path)) { + if (isInstanceChange(event, path) || isServerChange(path) || isShardingChange(event, path)) { shardingService.setReshardingFlag(); } } + + private boolean isInstanceChange(final TreeCacheEvent event, final String path) { + return instanceNode.isInstancePath(path) && TreeCacheEvent.Type.NODE_UPDATED != event.getType(); + } + + private boolean isServerChange(final String path) { + return serverNode.isServerPath(path); + } + + private boolean isShardingChange(final TreeCacheEvent event, final String path) { + return executionNode.isInstanceNode(path) && TreeCacheEvent.Type.NODE_REMOVED == event.getType(); + } } } diff --git a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingService.java b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingService.java index 872eaaf753..5605d1c1de 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingService.java +++ b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingService.java @@ -180,12 +180,8 @@ public List getLocalHostShardingItems() { * * @return 是包含有分片节点的不在线服务器 */ + // TODO 删除 public boolean hasShardingInfoInOfflineServers() { - for (int i = 0; i < configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount(); i++) { - if (serverService.isOffline(jobNodeStorage.getJobNodeDataDirectly(ExecutionNode.getIpNode(i)), jobNodeStorage.getJobNodeDataDirectly(ExecutionNode.getInstanceNode(i)))) { - return true; - } - } return false; } @@ -199,7 +195,7 @@ public void execute(final CuratorTransactionFinal curatorTransactionFinal) throw for (JobShardingResult each : shardingResults) { for (int shardingItem : each.getShardingItems()) { curatorTransactionFinal.create().withMode(CreateMode.EPHEMERAL) - .forPath(jobNodePath.getFullPath(ExecutionNode.getIpNode(shardingItem)), each.getJobShardingUnit().getServerIp().getBytes()).and(); + .forPath(jobNodePath.getFullPath(ExecutionNode.getIpNode(shardingItem)), each.getJobShardingUnit().getIp().getBytes()).and(); curatorTransactionFinal.create().withMode(CreateMode.EPHEMERAL) .forPath(jobNodePath.getFullPath(ExecutionNode.getInstanceNode(shardingItem)), each.getJobShardingUnit().getJobInstanceId().getBytes()).and(); } diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/api/strategy/impl/AverageAllocationJobShardingStrategyTest.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/api/strategy/impl/AverageAllocationJobShardingStrategyTest.java index 0161bde3d3..363024709b 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/api/strategy/impl/AverageAllocationJobShardingStrategyTest.java +++ b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/api/strategy/impl/AverageAllocationJobShardingStrategyTest.java @@ -42,51 +42,51 @@ public void shardingForZeroServer() { @Test public void shardingForOneServer() { - Collection expected = Collections.singletonList(new JobShardingResult(new JobShardingUnit("host0", "127.0.0.1@-@0"), Arrays.asList(0, 1, 2))); - assertThat(jobShardingStrategy.sharding(Collections.singletonList(new JobShardingUnit("host0", "127.0.0.1@-@0")), getJobShardingMetadata(3)), is(expected)); + Collection expected = Collections.singletonList(new JobShardingResult(new JobShardingUnit("host0@-@0"), Arrays.asList(0, 1, 2))); + assertThat(jobShardingStrategy.sharding(Collections.singletonList(new JobShardingUnit("host0@-@0")), getJobShardingMetadata(3)), is(expected)); } @Test public void shardingForServersMoreThanShardingCount() { Collection expected = Arrays.asList( - new JobShardingResult(new JobShardingUnit("host0", "127.0.0.1@-@0"), Collections.singletonList(0)), - new JobShardingResult(new JobShardingUnit("host1", "127.0.0.1@-@0"), Collections.singletonList(1)), - new JobShardingResult(new JobShardingUnit("host2", "127.0.0.1@-@0"), Collections.emptyList())); + new JobShardingResult(new JobShardingUnit("host0@-@0"), Collections.singletonList(0)), + new JobShardingResult(new JobShardingUnit("host1@-@0"), Collections.singletonList(1)), + new JobShardingResult(new JobShardingUnit("host2@-@0"), Collections.emptyList())); assertThat(jobShardingStrategy.sharding(Arrays.asList( - new JobShardingUnit("host0", "127.0.0.1@-@0"), new JobShardingUnit("host1", "127.0.0.1@-@0"), new JobShardingUnit("host2", "127.0.0.1@-@0")), + new JobShardingUnit("host0@-@0"), new JobShardingUnit("host1@-@0"), new JobShardingUnit("host2@-@0")), getJobShardingMetadata(2)), is(expected)); } @Test public void shardingForServersLessThanShardingCountAliquot() { Collection expected = Arrays.asList( - new JobShardingResult(new JobShardingUnit("host0", "127.0.0.1@-@0"), Arrays.asList(0, 1, 2)), - new JobShardingResult(new JobShardingUnit("host1", "127.0.0.1@-@0"), Arrays.asList(3, 4, 5)), - new JobShardingResult(new JobShardingUnit("host2", "127.0.0.1@-@0"), Arrays.asList(6, 7, 8))); + new JobShardingResult(new JobShardingUnit("host0@-@0"), Arrays.asList(0, 1, 2)), + new JobShardingResult(new JobShardingUnit("host1@-@0"), Arrays.asList(3, 4, 5)), + new JobShardingResult(new JobShardingUnit("host2@-@0"), Arrays.asList(6, 7, 8))); assertThat(jobShardingStrategy.sharding(Arrays.asList( - new JobShardingUnit("host0", "127.0.0.1@-@0"), new JobShardingUnit("host1", "127.0.0.1@-@0"), new JobShardingUnit("host2", "127.0.0.1@-@0")), + new JobShardingUnit("host0@-@0"), new JobShardingUnit("host1@-@0"), new JobShardingUnit("host2@-@0")), getJobShardingMetadata(9)), is(expected)); } @Test public void shardingForServersLessThanShardingCountAliquantFor8ShardingCountAnd3Servers() { Collection expected = Arrays.asList( - new JobShardingResult(new JobShardingUnit("host0", "127.0.0.1@-@0"), Arrays.asList(0, 1, 6)), - new JobShardingResult(new JobShardingUnit("host1", "127.0.0.1@-@0"), Arrays.asList(2, 3, 7)), - new JobShardingResult(new JobShardingUnit("host2", "127.0.0.1@-@0"), Arrays.asList(4, 5))); + new JobShardingResult(new JobShardingUnit("host0@-@0"), Arrays.asList(0, 1, 6)), + new JobShardingResult(new JobShardingUnit("host1@-@0"), Arrays.asList(2, 3, 7)), + new JobShardingResult(new JobShardingUnit("host2@-@0"), Arrays.asList(4, 5))); assertThat(jobShardingStrategy.sharding(Arrays.asList( - new JobShardingUnit("host0", "127.0.0.1@-@0"), new JobShardingUnit("host1", "127.0.0.1@-@0"), new JobShardingUnit("host2", "127.0.0.1@-@0")), + new JobShardingUnit("host0@-@0"), new JobShardingUnit("host1@-@0"), new JobShardingUnit("host2@-@0")), getJobShardingMetadata(8)), is(expected)); } @Test public void shardingForServersLessThanShardingCountAliquantFor10ShardingCountAnd3Servers() { Collection expected = Arrays.asList( - new JobShardingResult(new JobShardingUnit("host0", "127.0.0.1@-@0"), Arrays.asList(0, 1, 2, 9)), - new JobShardingResult(new JobShardingUnit("host1", "127.0.0.1@-@0"), Arrays.asList(3, 4, 5)), - new JobShardingResult(new JobShardingUnit("host2", "127.0.0.1@-@0"), Arrays.asList(6, 7, 8))); + new JobShardingResult(new JobShardingUnit("host0@-@0"), Arrays.asList(0, 1, 2, 9)), + new JobShardingResult(new JobShardingUnit("host1@-@0"), Arrays.asList(3, 4, 5)), + new JobShardingResult(new JobShardingUnit("host2@-@0"), Arrays.asList(6, 7, 8))); assertThat(jobShardingStrategy.sharding(Arrays.asList( - new JobShardingUnit("host0", "127.0.0.1@-@0"), new JobShardingUnit("host1", "127.0.0.1@-@0"), new JobShardingUnit("host2", "127.0.0.1@-@0")), + new JobShardingUnit("host0@-@0"), new JobShardingUnit("host1@-@0"), new JobShardingUnit("host2@-@0")), getJobShardingMetadata(10)), is(expected)); } diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/api/strategy/impl/OdevitySortByNameJobShardingStrategyTest.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/api/strategy/impl/OdevitySortByNameJobShardingStrategyTest.java index 7a30bc543d..32cac5bfbb 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/api/strategy/impl/OdevitySortByNameJobShardingStrategyTest.java +++ b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/api/strategy/impl/OdevitySortByNameJobShardingStrategyTest.java @@ -37,22 +37,22 @@ public final class OdevitySortByNameJobShardingStrategyTest { @Test public void assertShardingByAsc() { Collection expected = new LinkedList<>(); - expected.add(new JobShardingResult(new JobShardingUnit("host0", "127.0.0.1@-@0"), Collections.singletonList(0))); - expected.add(new JobShardingResult(new JobShardingUnit("host1", "127.0.0.1@-@0"), Collections.singletonList(1))); - expected.add(new JobShardingResult(new JobShardingUnit("host2", "127.0.0.1@-@0"), Collections.emptyList())); + expected.add(new JobShardingResult(new JobShardingUnit("host0@-@0"), Collections.singletonList(0))); + expected.add(new JobShardingResult(new JobShardingUnit("host1@-@0"), Collections.singletonList(1))); + expected.add(new JobShardingResult(new JobShardingUnit("host2@-@0"), Collections.emptyList())); assertThat(odevitySortByNameJobShardingStrategy.sharding(Arrays.asList( - new JobShardingUnit("host0", "127.0.0.1@-@0"), new JobShardingUnit("host1", "127.0.0.1@-@0"), new JobShardingUnit("host2", "127.0.0.1@-@0")), + new JobShardingUnit("host0@-@0"), new JobShardingUnit("host1@-@0"), new JobShardingUnit("host2@-@0")), new JobShardingMetadata("1", 2)), is(expected)); } @Test public void assertShardingByDesc() { Collection expected = new LinkedList<>(); - expected.add(new JobShardingResult(new JobShardingUnit("host2", "127.0.0.1@-@0"), Collections.singletonList(0))); - expected.add(new JobShardingResult(new JobShardingUnit("host1", "127.0.0.1@-@0"), Collections.singletonList(1))); - expected.add(new JobShardingResult(new JobShardingUnit("host0", "127.0.0.1@-@0"), Collections.emptyList())); + expected.add(new JobShardingResult(new JobShardingUnit("host2@-@0"), Collections.singletonList(0))); + expected.add(new JobShardingResult(new JobShardingUnit("host1@-@0"), Collections.singletonList(1))); + expected.add(new JobShardingResult(new JobShardingUnit("host0@-@0"), Collections.emptyList())); assertThat(odevitySortByNameJobShardingStrategy.sharding( - Arrays.asList(new JobShardingUnit("host0", "127.0.0.1@-@0"), new JobShardingUnit("host1", "127.0.0.1@-@0"), new JobShardingUnit("host2", "127.0.0.1@-@0")), + Arrays.asList(new JobShardingUnit("host0@-@0"), new JobShardingUnit("host1@-@0"), new JobShardingUnit("host2@-@0")), new JobShardingMetadata("0", 2)), is(expected)); } } diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/api/strategy/impl/RotateServerByNameJobShardingStrategyTest.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/api/strategy/impl/RotateServerByNameJobShardingStrategyTest.java index bb8b0f18aa..a3f5281b0a 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/api/strategy/impl/RotateServerByNameJobShardingStrategyTest.java +++ b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/api/strategy/impl/RotateServerByNameJobShardingStrategyTest.java @@ -36,33 +36,33 @@ public final class RotateServerByNameJobShardingStrategyTest { @Test public void assertSharding1() { Collection expected = Arrays.asList( - new JobShardingResult(new JobShardingUnit("host1", "127.0.0.1@-@0"), Collections.singletonList(0)), - new JobShardingResult(new JobShardingUnit("host2", "127.0.0.1@-@0"), Collections.singletonList(1)), - new JobShardingResult(new JobShardingUnit("host0", "127.0.0.1@-@0"), Collections.emptyList())); + new JobShardingResult(new JobShardingUnit("host1@-@0"), Collections.singletonList(0)), + new JobShardingResult(new JobShardingUnit("host2@-@0"), Collections.singletonList(1)), + new JobShardingResult(new JobShardingUnit("host0@-@0"), Collections.emptyList())); assertThat(rotateServerByNameJobShardingStrategy.sharding(Arrays.asList( - new JobShardingUnit("host0", "127.0.0.1@-@0"), new JobShardingUnit("host1", "127.0.0.1@-@0"), new JobShardingUnit("host2", "127.0.0.1@-@0")), + new JobShardingUnit("host0@-@0"), new JobShardingUnit("host1@-@0"), new JobShardingUnit("host2@-@0")), new JobShardingMetadata("1", 2)), is(expected)); } @Test public void assertSharding2() { Collection expected = Arrays.asList( - new JobShardingResult(new JobShardingUnit("host2", "127.0.0.1@-@0"), Collections.singletonList(0)), - new JobShardingResult(new JobShardingUnit("host0", "127.0.0.1@-@0"), Collections.singletonList(1)), - new JobShardingResult(new JobShardingUnit("host1", "127.0.0.1@-@0"), Collections.emptyList())); + new JobShardingResult(new JobShardingUnit("host2@-@0"), Collections.singletonList(0)), + new JobShardingResult(new JobShardingUnit("host0@-@0"), Collections.singletonList(1)), + new JobShardingResult(new JobShardingUnit("host1@-@0"), Collections.emptyList())); assertThat(rotateServerByNameJobShardingStrategy.sharding(Arrays.asList( - new JobShardingUnit("host0", "127.0.0.1@-@0"), new JobShardingUnit("host1", "127.0.0.1@-@0"), new JobShardingUnit("host2", "127.0.0.1@-@0")), + new JobShardingUnit("host0@-@0"), new JobShardingUnit("host1@-@0"), new JobShardingUnit("host2@-@0")), new JobShardingMetadata("2", 2)), is(expected)); } @Test public void assertSharding3() { Collection expected = Arrays.asList( - new JobShardingResult(new JobShardingUnit("host0", "127.0.0.1@-@0"), Collections.singletonList(0)), - new JobShardingResult(new JobShardingUnit("host1", "127.0.0.1@-@0"), Collections.singletonList(1)), - new JobShardingResult(new JobShardingUnit("host2", "127.0.0.1@-@0"), Collections.emptyList())); + new JobShardingResult(new JobShardingUnit("host0@-@0"), Collections.singletonList(0)), + new JobShardingResult(new JobShardingUnit("host1@-@0"), Collections.singletonList(1)), + new JobShardingResult(new JobShardingUnit("host2@-@0"), Collections.emptyList())); assertThat(rotateServerByNameJobShardingStrategy.sharding(Arrays.asList( - new JobShardingUnit("host0", "127.0.0.1@-@0"), new JobShardingUnit("host1", "127.0.0.1@-@0"), new JobShardingUnit("host2", "127.0.0.1@-@0")), + new JobShardingUnit("host0@-@0"), new JobShardingUnit("host1@-@0"), new JobShardingUnit("host2@-@0")), new JobShardingMetadata("3", 2)), is(expected)); } } diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/integrate/AbstractBaseStdJobTest.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/integrate/AbstractBaseStdJobTest.java index 9c0b11270e..0ae0a0aa69 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/integrate/AbstractBaseStdJobTest.java +++ b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/integrate/AbstractBaseStdJobTest.java @@ -53,7 +53,6 @@ import org.unitils.util.ReflectionUtils; import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -166,7 +165,6 @@ void assertRegCenterCommonInfoWithEnabled() { protected void assertRegCenterCommonInfoWithDisabled() { assertRegCenterCommonInfo(); - assertFalse(leaderElectionService.isLeader()); } private void assertRegCenterCommonInfo() { @@ -179,12 +177,12 @@ private void assertRegCenterCommonInfo() { while (null != regCenter.get("/" + jobName + "/leader/election/host_instance")) { BlockUtils.waitingShortTime(); } + regCenter.persist("/" + jobName + "/servers/" + localHostService.getIp(), ""); } else { assertThat(regCenter.get("/" + jobName + "/servers/" + localHostService.getIp()), is("")); assertThat(regCenter.get("/" + jobName + "/leader/election/host_instance"), is(localHostService.getIp() + "_" + JobRegistry.getInstance().getJobInstanceId(jobName))); } - assertThat(regCenter.get("/" + jobName + "/servers/" + localHostService.getIp() + "/instances/" + JobRegistry.getInstance().getJobInstanceId(jobName)), - CoreMatchers.is(InstanceStatus.READY.name())); + assertThat(regCenter.get("/" + jobName + "/instances/" + JobRegistry.getInstance().getJobInstanceId(jobName)), CoreMatchers.is(InstanceStatus.READY.name())); regCenter.remove("/" + jobName + "/leader/election"); } diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/AllInternalTests.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/AllInternalTests.java index 8be6bf1965..dd3e5b8d53 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/AllInternalTests.java +++ b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/AllInternalTests.java @@ -45,8 +45,8 @@ import com.dangdang.ddframe.job.lite.internal.schedule.LiteJobFacadeTest; import com.dangdang.ddframe.job.lite.internal.schedule.SchedulerFacadeTest; import com.dangdang.ddframe.job.lite.internal.server.JobOperationListenerManagerTest; +import com.dangdang.ddframe.job.lite.internal.server.InstanceNodeTest; import com.dangdang.ddframe.job.lite.internal.server.ServerNodeTest; -import com.dangdang.ddframe.job.lite.internal.server.ServerOperationNodeTest; import com.dangdang.ddframe.job.lite.internal.server.ServerServiceTest; import com.dangdang.ddframe.job.lite.internal.sharding.ShardingListenerManagerTest; import com.dangdang.ddframe.job.lite.internal.sharding.ShardingServiceTest; @@ -70,10 +70,10 @@ ElectionNodeTest.class, ElectionListenerManagerTest.class, ServerServiceTest.class, - ServerNodeTest.class, + InstanceNodeTest.class, JobOperationListenerManagerTest.class, ShardingServiceTest.class, - ServerOperationNodeTest.class, + ServerNodeTest.class, ShardingListenerManagerTest.class, ExecutionContextServiceTest.class, ExecutionServiceTest.class, diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/election/ElectionListenerManagerTest.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/election/ElectionListenerManagerTest.java index f3984faace..bee93ff7ce 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/election/ElectionListenerManagerTest.java +++ b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/election/ElectionListenerManagerTest.java @@ -19,8 +19,8 @@ import com.dangdang.ddframe.job.lite.internal.election.ElectionListenerManager.LeaderElectionJobListener; import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry; +import com.dangdang.ddframe.job.lite.internal.server.InstanceNode; import com.dangdang.ddframe.job.lite.internal.server.ServerNode; -import com.dangdang.ddframe.job.lite.internal.server.ServerOperationNode; import com.dangdang.ddframe.job.lite.internal.server.ServerService; import com.dangdang.ddframe.job.lite.internal.storage.JobNodeStorage; import org.apache.curator.framework.recipes.cache.ChildData; @@ -44,10 +44,10 @@ public final class ElectionListenerManagerTest { private JobNodeStorage jobNodeStorage; @Mock - private ServerNode serverNode; + private InstanceNode instanceNode; @Mock - private ServerOperationNode serverOperationNode; + private ServerNode serverNode; @Mock private LeaderElectionService leaderElectionService; @@ -62,8 +62,8 @@ public void setUp() throws NoSuchFieldException { MockitoAnnotations.initMocks(this); JobRegistry.getInstance().addJobInstanceId("test_job", "127.0.0.1@-@0"); ReflectionUtils.setFieldValue(electionListenerManager, electionListenerManager.getClass().getSuperclass().getDeclaredField("jobNodeStorage"), jobNodeStorage); + ReflectionUtils.setFieldValue(electionListenerManager, "instanceNode", instanceNode); ReflectionUtils.setFieldValue(electionListenerManager, "serverNode", serverNode); - ReflectionUtils.setFieldValue(electionListenerManager, "serverOperationNode", serverOperationNode); ReflectionUtils.setFieldValue(electionListenerManager, "leaderElectionService", leaderElectionService); ReflectionUtils.setFieldValue(electionListenerManager, "serverService", serverService); } @@ -130,7 +130,7 @@ electionListenerManager.new LeaderElectionJobListener().dataChanged(null, new Tr @Test public void assertLeaderElectionJobListenerWhenJobShutdownAndIsLeader() { when(leaderElectionService.isLeader()).thenReturn(true); - when(serverNode.isLocalInstancePath("/test_job/server/mockedIP/instances/127.0.0.1@-@0")).thenReturn(true); + when(instanceNode.isLocalInstancePath("/test_job/server/mockedIP/instances/127.0.0.1@-@0")).thenReturn(true); electionListenerManager.new LeaderElectionJobListener().dataChanged(null, new TreeCacheEvent( TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/test_job/server/mockedIP/instances/127.0.0.1@-@0", null, "READY".getBytes())), "/test_job/server/mockedIP/instances/127.0.0.1@-@0"); diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/InstanceNodeTest.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/InstanceNodeTest.java new file mode 100644 index 0000000000..2ec2c39c50 --- /dev/null +++ b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/InstanceNodeTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 1999-2015 dangdang.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package com.dangdang.ddframe.job.lite.internal.server; + +import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public final class InstanceNodeTest { + + private static InstanceNode instanceNode; + + @BeforeClass + public static void setUp() throws NoSuchFieldException { + JobRegistry.getInstance().addJobInstanceId("test_job", "127.0.0.1@-@0"); + instanceNode = new InstanceNode("test_job"); + } + + @Test + public void assertGetLocalInstanceNode() { + assertThat(instanceNode.getLocalInstanceNode(), is("instances/127.0.0.1@-@0")); + } + + @Test + public void assertGetInstanceNode() { + assertThat(InstanceNode.getInstanceNode("host0@-@0"), is("instances/host0@-@0")); + } + + @Test + public void assertIsLocalInstancePath() { + assertTrue(instanceNode.isLocalInstancePath("/test_job/instances/127.0.0.1@-@0")); + } + + @Test + public void assertIsNotLocalInstancePath() { + assertFalse(instanceNode.isLocalInstancePath("/test_job/instances/other_job_instance_id")); + } + + @Test + public void assertIsInstancePath() { + assertTrue(instanceNode.isInstancePath("/test_job/instances/127.0.0.1@-@0")); + } + + @Test + public void assertIsNotInstancePath() { + assertFalse(instanceNode.isInstancePath("/test_job/other/127.0.0.1@-@0")); + } +} diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/JobOperationListenerManagerTest.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/JobOperationListenerManagerTest.java index 1f35e6b0e9..21387c85f3 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/JobOperationListenerManagerTest.java +++ b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/JobOperationListenerManagerTest.java @@ -91,9 +91,8 @@ public void assertConnectionLostListenerWhenConnectionStateIsLost() { public void assertConnectionLostListenerWhenConnectionStateIsReconnectedAndIsNotPausedManually() { JobRegistry.getInstance().addJobScheduleController("test_job", jobScheduleController); when(shardingService.getLocalHostShardingItems()).thenReturn(Arrays.asList(0, 1)); - when(serverService.isLocalhostServerEnabled()).thenReturn(true); + when(serverService.isServerEnabled(new LocalHostService().getIp())).thenReturn(true); jobOperationListenerManager.new ConnectionLostListener().stateChanged(null, ConnectionState.RECONNECTED); - verify(serverService).isLocalhostServerEnabled(); verify(serverService).persistServerOnline(true); verify(executionService).clearRunningInfo(Arrays.asList(0, 1)); verify(jobScheduleController).resumeJob(); @@ -176,9 +175,8 @@ TreeCacheEvent.Type.NODE_ADDED, new ChildData("/test_job/servers/" + ip + "/inst @Test public void assertJobShutdownStatusJobListenerWhenIsJobShutdownPathAndAdd() { JobRegistry.getInstance().addJobScheduleController("test_job", jobScheduleController); - jobOperationListenerManager.new JobShutdownStatusJobListener().dataChanged(null, new TreeCacheEvent( - TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/test_job/servers/" + ip + "/instances/127.0.0.1@-@0", null, "".getBytes())), - "/test_job/servers/" + ip + "/instances/127.0.0.1@-@0"); + jobOperationListenerManager.new JobShutdownStatusJobListener().dataChanged( + null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/test_job/instances/127.0.0.1@-@0", null, "".getBytes())), "/test_job/instances/127.0.0.1@-@0"); verify(jobScheduleController).shutdown(); verify(serverService).removeInstanceStatus(); } diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/ServerNodeTest.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/ServerNodeTest.java index b933644afd..fa24370b3d 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/ServerNodeTest.java +++ b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/ServerNodeTest.java @@ -17,55 +17,26 @@ package com.dangdang.ddframe.job.lite.internal.server; -import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry; -import org.junit.BeforeClass; +import com.dangdang.ddframe.job.util.env.LocalHostService; import org.junit.Test; -import org.unitils.util.ReflectionUtils; import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; public final class ServerNodeTest { - private static ServerNode serverNode; + private LocalHostService localHostService = new LocalHostService(); - @BeforeClass - public static void setUp() throws NoSuchFieldException { - JobRegistry.getInstance().addJobInstanceId("test_job", "127.0.0.1@-@0"); - serverNode = new ServerNode("test_job"); - ReflectionUtils.setFieldValue(serverNode, "ip", "host0"); - } - - @Test - public void assertGetLocalInstanceNode() { - assertThat(serverNode.getLocalInstanceNode(), is("servers/host0/instances/127.0.0.1@-@0")); - } - - @Test - public void assertGetInstanceNode() { - assertThat(ServerNode.getInstanceNode("host0", "127.0.0.1@-@0"), is("servers/host0/instances/127.0.0.1@-@0")); - } - - @Test - public void assertIsLocalInstancePath() { - assertTrue(serverNode.isLocalInstancePath("/test_job/servers/host0/instances/127.0.0.1@-@0")); - } - - @Test - public void assertIsNotLocalInstancePath() { - assertFalse(serverNode.isLocalInstancePath("/test_job/servers/host1/instances/other_job_instance_id")); - } + private ServerNode serverNode = new ServerNode("test_job"); @Test - public void assertIsInstancePath() { - assertTrue(serverNode.isInstancePath("/test_job/servers/host0/instances/127.0.0.1@-@0")); + public void assertGetServerNode() { + assertThat(serverNode.getServerNode("host0"), is("servers/host0")); } @Test - public void assertIsNotInstancePath() { - assertFalse(serverNode.isInstancePath("/test_job/other/host0/instances/127.0.0.1@-@0")); - assertFalse(serverNode.isInstancePath("/test_job/servers/host0/other/127.0.0.1@-@0")); + public void assertIsLocalServerPath() { + assertTrue(serverNode.isLocalServerPath("/test_job/servers/" + localHostService.getIp())); } } diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/ServerOperationNodeTest.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/ServerOperationNodeTest.java deleted file mode 100644 index 9fc5bc7528..0000000000 --- a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/ServerOperationNodeTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 1999-2015 dangdang.com. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *

- */ - -package com.dangdang.ddframe.job.lite.internal.server; - -import com.dangdang.ddframe.job.util.env.LocalHostService; -import org.junit.Test; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -public final class ServerOperationNodeTest { - - private LocalHostService localHostService = new LocalHostService(); - - private ServerOperationNode serverOperationNode = new ServerOperationNode("test_job"); - - @Test - public void assertGetServerNode() { - assertThat(serverOperationNode.getServerNode("host0"), is("servers/host0")); - } - - @Test - public void assertIsLocalServerPath() { - assertTrue(serverOperationNode.isLocalServerPath("/test_job/servers/" + localHostService.getIp())); - } -} diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/ServerServiceTest.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/ServerServiceTest.java index 8a97720488..9af3b5bc7e 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/ServerServiceTest.java +++ b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/server/ServerServiceTest.java @@ -50,16 +50,14 @@ public final class ServerServiceTest { @Before public void setUp() throws NoSuchFieldException { MockitoAnnotations.initMocks(this); - JobRegistry.getInstance().addJobInstanceId("test_job", "127.0.0.1@-@0"); + JobRegistry.getInstance().addJobInstanceId("test_job", "mockedIP@-@0"); + InstanceNode instanceNode = new InstanceNode("test_job"); ServerNode serverNode = new ServerNode("test_job"); - ServerOperationNode serverOperationNode = new ServerOperationNode("test_job"); ReflectionUtils.setFieldValue(serverNode, "ip", "mockedIP"); - ReflectionUtils.setFieldValue(serverOperationNode, "ip", "mockedIP"); + ReflectionUtils.setFieldValue(serverService, "instanceNode", instanceNode); ReflectionUtils.setFieldValue(serverService, "serverNode", serverNode); - ReflectionUtils.setFieldValue(serverService, "serverOperationNode", serverOperationNode); ReflectionUtils.setFieldValue(serverService, "jobNodeStorage", jobNodeStorage); ReflectionUtils.setFieldValue(serverService, "localHostService", localHostService); - ReflectionUtils.setFieldValue(serverService, "serverOperationNode", serverOperationNode); when(localHostService.getIp()).thenReturn("mockedIP"); } @@ -67,149 +65,96 @@ public void setUp() throws NoSuchFieldException { public void assertPersistServerOnlineForDisabledServerWithLeaderElecting() { serverService.persistServerOnline(false); verify(jobNodeStorage).fillJobNode("servers/mockedIP", ServerStatus.DISABLED.name()); - verify(jobNodeStorage).fillEphemeralJobNode("servers/mockedIP/instances/127.0.0.1@-@0", InstanceStatus.READY.name()); + verify(jobNodeStorage).fillEphemeralJobNode("instances/mockedIP@-@0", InstanceStatus.READY.name()); } @Test public void assertPersistServerOnlineForEnabledServer() { serverService.persistServerOnline(true); verify(jobNodeStorage).fillJobNode("servers/mockedIP", ""); - verify(jobNodeStorage).fillEphemeralJobNode("servers/mockedIP/instances/127.0.0.1@-@0", InstanceStatus.READY.name()); + verify(jobNodeStorage).fillEphemeralJobNode("instances/mockedIP@-@0", InstanceStatus.READY.name()); } @Test public void assertProcessServerShutdown() { serverService.removeInstanceStatus(); - verify(jobNodeStorage).removeJobNodeIfExisted("servers/mockedIP/instances/127.0.0.1@-@0"); + verify(jobNodeStorage).removeJobNodeIfExisted("instances/mockedIP@-@0"); } @Test public void assertUpdateServerStatus() { serverService.updateInstanceStatus(InstanceStatus.RUNNING); - verify(jobNodeStorage).updateJobNode("servers/mockedIP/instances/127.0.0.1@-@0", InstanceStatus.RUNNING.name()); + verify(jobNodeStorage).updateJobNode("instances/mockedIP@-@0", InstanceStatus.RUNNING.name()); } @Test public void assertRemoveServerStatus() { serverService.removeInstanceStatus(); - verify(jobNodeStorage).removeJobNodeIfExisted("servers/mockedIP/instances/127.0.0.1@-@0"); - } - - @Test - public void assertGetAllShardingUnits() { - when(jobNodeStorage.getJobNodeChildrenKeys("servers")).thenReturn(Arrays.asList("host0", "host2", "host1", "host3")); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host0/instances")).thenReturn(Arrays.asList("127.0.0.1@-@1", "127.0.0.1@-@2")); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host1/instances")).thenReturn(Arrays.asList("127.0.0.1@-@1", "127.0.0.1@-@2")); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host2/instances")).thenReturn(Arrays.asList("127.0.0.1@-@1", "127.0.0.1@-@2")); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host3/instances")).thenReturn(Arrays.asList("127.0.0.1@-@1", "127.0.0.1@-@2")); - assertThat(serverService.getAllShardingUnits(), is(Arrays.asList( - new JobShardingUnit("host0", "127.0.0.1@-@1"), new JobShardingUnit("host0", "127.0.0.1@-@2"), - new JobShardingUnit("host1", "127.0.0.1@-@1"), new JobShardingUnit("host1", "127.0.0.1@-@2"), - new JobShardingUnit("host2", "127.0.0.1@-@1"), new JobShardingUnit("host2", "127.0.0.1@-@2"), - new JobShardingUnit("host3", "127.0.0.1@-@1"), new JobShardingUnit("host3", "127.0.0.1@-@2")))); - verify(jobNodeStorage).getJobNodeChildrenKeys("servers"); - verify(jobNodeStorage).getJobNodeChildrenKeys("servers/host0/instances"); - verify(jobNodeStorage).getJobNodeChildrenKeys("servers/host1/instances"); - verify(jobNodeStorage).getJobNodeChildrenKeys("servers/host2/instances"); - verify(jobNodeStorage).getJobNodeChildrenKeys("servers/host3/instances"); + verify(jobNodeStorage).removeJobNodeIfExisted("instances/mockedIP@-@0"); } @Test public void assertGetAvailableShardingServers() { when(jobNodeStorage.getJobNodeChildrenKeys("servers")).thenReturn(Arrays.asList("host0", "host2", "host1", "host3", "host4")); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host0/instances")).thenReturn(Collections.singletonList("127.0.0.1@-@0")); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host1/instances")).thenReturn(Collections.singletonList("127.0.0.1@-@0")); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host2/instances")).thenReturn(Collections.singletonList("127.0.0.1@-@0")); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host3/instances")).thenReturn(Collections.singletonList("127.0.0.1@-@0")); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host4/instances")).thenReturn(Collections.singletonList("127.0.0.1@-@0")); - when(jobNodeStorage.isJobNodeExisted("servers/host0/instances/127.0.0.1@-@0")).thenReturn(true); - when(jobNodeStorage.getJobNodeDataDirectly("servers/host0/instances/127.0.0.1@-@0")).thenReturn(InstanceStatus.RUNNING.name()); - when(jobNodeStorage.isJobNodeExisted("servers/host0/operation/disabled")).thenReturn(false); - when(jobNodeStorage.isJobNodeExisted("servers/host1/instances/127.0.0.1@-@0")).thenReturn(true); - when(jobNodeStorage.getJobNodeDataDirectly("servers/host1/instances/127.0.0.1@-@0")).thenReturn(InstanceStatus.RUNNING.name()); - when(jobNodeStorage.getJobNodeData("servers/host1")).thenReturn(ServerStatus.DISABLED.name()); - when(jobNodeStorage.isJobNodeExisted("servers/host2/instances/127.0.0.1@-@0")).thenReturn(false); - when(jobNodeStorage.isJobNodeExisted("servers/host3/instances/127.0.0.1@-@0")).thenReturn(true); - when(jobNodeStorage.getJobNodeDataDirectly("servers/host3/instances/127.0.0.1@-@0")).thenReturn(InstanceStatus.RUNNING.name()); - when(jobNodeStorage.isJobNodeExisted("servers/host3/operation/disabled")).thenReturn(false); - when(jobNodeStorage.isJobNodeExisted("servers/host4/instances/127.0.0.1@-@0")).thenReturn(true); - when(jobNodeStorage.getJobNodeDataDirectly("servers/host4/instances/127.0.0.1@-@0")).thenReturn(InstanceStatus.RUNNING.name()); - assertThat(serverService.getAvailableShardingUnits(), is(Arrays.asList( - new JobShardingUnit("host0", "127.0.0.1@-@0"), new JobShardingUnit("host3", "127.0.0.1@-@0"), new JobShardingUnit("host4", "127.0.0.1@-@0")))); + when(jobNodeStorage.getJobNodeChildrenKeys("instances")).thenReturn(Arrays.asList("host0@-@0", "host2@-@0", "host3@-@0", "host4@-@0")); + when(jobNodeStorage.getJobNodeData("servers/host2")).thenReturn(ServerStatus.DISABLED.name()); + assertThat(serverService.getAvailableShardingUnits(), is(Arrays.asList(new JobShardingUnit("host0@-@0"), new JobShardingUnit("host3@-@0"), new JobShardingUnit("host4@-@0")))); } @Test public void assertGetAvailableServers() { when(jobNodeStorage.getJobNodeChildrenKeys("servers")).thenReturn(Arrays.asList("host0", "host2", "host1", "host3")); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host0/instances")).thenReturn(Collections.singletonList("127.0.0.1@-@0")); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host1/instances")).thenReturn(Collections.singletonList("127.0.0.1@-@0")); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host2/instances")).thenReturn(Collections.emptyList()); - when(jobNodeStorage.getJobNodeChildrenKeys("servers/host3/instances")).thenReturn(Collections.singletonList("127.0.0.1@-@0")); - when(jobNodeStorage.isJobNodeExisted("servers/host0/instances/127.0.0.1@-@0")).thenReturn(true); - when(jobNodeStorage.getJobNodeDataDirectly("servers/host0/instances/127.0.0.1@-@0")).thenReturn(InstanceStatus.RUNNING.name()); - when(jobNodeStorage.isJobNodeExisted("servers/host0/operation/disabled")).thenReturn(false); - when(jobNodeStorage.isJobNodeExisted("servers/host1/instances/127.0.0.1@-@0")).thenReturn(false); - when(jobNodeStorage.getJobNodeData("servers/host1")).thenReturn(ServerStatus.DISABLED.name()); - when(jobNodeStorage.isJobNodeExisted("servers/host2/instances/127.0.0.1@-@0")).thenReturn(false); - when(jobNodeStorage.isJobNodeExisted("servers/host3/instances/127.0.0.1@-@0")).thenReturn(true); - when(jobNodeStorage.getJobNodeDataDirectly("servers/host3/instances/127.0.0.1@-@0")).thenReturn(InstanceStatus.RUNNING.name()); - when(jobNodeStorage.isJobNodeExisted("servers/host3/operation/disabled")).thenReturn(false); + when(jobNodeStorage.getJobNodeChildrenKeys("instances")).thenReturn(Arrays.asList("host0@-@0", "host2@-@0", "host3@-@0")); + when(jobNodeStorage.getJobNodeData("servers/host2")).thenReturn(ServerStatus.DISABLED.name()); assertThat(serverService.getAvailableServers(), is(Arrays.asList("host0", "host3"))); } @Test public void assertIsLocalhostServerReadyWhenServerCrashed() { when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/operation/disabled")).thenReturn(false); - when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/instances/127.0.0.1@-@0")).thenReturn(false); + when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/instances/mockedIP@-@0")).thenReturn(false); assertFalse(serverService.isLocalhostServerReady()); } @Test public void assertIsLocalhostServerReadyWhenServerDisabled() { - when(jobNodeStorage.getJobNodeChildrenKeys("servers/mockedIP/instances")).thenReturn(Collections.singletonList("127.0.0.1@-@0")); - when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/instances/127.0.0.1@-@0")).thenReturn(true); + when(jobNodeStorage.getJobNodeChildrenKeys("servers/mockedIP/instances")).thenReturn(Collections.singletonList("mockedIP@-@0")); + when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/instances/mockedIP@-@0")).thenReturn(true); when(jobNodeStorage.getJobNodeData("servers/mockedIP")).thenReturn(ServerStatus.DISABLED.name()); assertFalse(serverService.isLocalhostServerReady()); } @Test public void assertIsLocalhostServerReadyWhenServerShutdown() { - when(jobNodeStorage.getJobNodeChildrenKeys("servers/mockedIP/instances")).thenReturn(Collections.singletonList("127.0.0.1@-@0")); - when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/instances/127.0.0.1@-@0")).thenReturn(true); - when(jobNodeStorage.getJobNodeDataDirectly("servers/mockedIP/instances/127.0.0.1@-@0")).thenReturn(InstanceStatus.RUNNING.name()); + when(jobNodeStorage.getJobNodeChildrenKeys("servers/mockedIP/instances")).thenReturn(Collections.singletonList("mockedIP@-@0")); + when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/instances/mockedIP@-@0")).thenReturn(true); + when(jobNodeStorage.getJobNodeDataDirectly("servers/mockedIP/instances/mockedIP@-@0")).thenReturn(InstanceStatus.RUNNING.name()); when(jobNodeStorage.getJobNodeData("servers/mockedIP")).thenReturn(""); assertFalse(serverService.isLocalhostServerReady()); } @Test public void assertIsLocalhostServerReadyWhenServerRunning() { - when(jobNodeStorage.getJobNodeChildrenKeys("servers/mockedIP/instances")).thenReturn(Collections.singletonList("127.0.0.1@-@0")); - when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/instances/127.0.0.1@-@0")).thenReturn(true); - when(jobNodeStorage.getJobNodeDataDirectly("servers/mockedIP/instances/127.0.0.1@-@0")).thenReturn(InstanceStatus.RUNNING.name()); + when(jobNodeStorage.getJobNodeChildrenKeys("servers/mockedIP/instances")).thenReturn(Collections.singletonList("mockedIP@-@0")); + when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/instances/mockedIP@-@0")).thenReturn(true); + when(jobNodeStorage.getJobNodeDataDirectly("servers/mockedIP/instances/mockedIP@-@0")).thenReturn(InstanceStatus.RUNNING.name()); when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/operation/disabled")).thenReturn(false); - when(jobNodeStorage.getJobNodeData("servers/mockedIP/127.0.0.1@-@0/status")).thenReturn(InstanceStatus.RUNNING.name()); + when(jobNodeStorage.getJobNodeData("servers/mockedIP/mockedIP@-@0/status")).thenReturn(InstanceStatus.RUNNING.name()); assertFalse(serverService.isLocalhostServerReady()); } @Test public void assertIsLocalhostServerReadyWhenServerReady() { - when(jobNodeStorage.getJobNodeChildrenKeys("servers/mockedIP/instances")).thenReturn(Collections.singletonList("127.0.0.1@-@0")); - when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/instances/127.0.0.1@-@0")).thenReturn(true); - when(jobNodeStorage.getJobNodeDataDirectly("servers/mockedIP/instances/127.0.0.1@-@0")).thenReturn(InstanceStatus.READY.name()); - when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/operation/disabled")).thenReturn(false); + when(jobNodeStorage.getJobNodeChildrenKeys("instances")).thenReturn(Collections.singletonList("mockedIP@-@0")); + when(jobNodeStorage.isJobNodeExisted("instances/mockedIP@-@0")).thenReturn(true); + when(jobNodeStorage.getJobNodeDataDirectly("instances/mockedIP@-@0")).thenReturn(InstanceStatus.READY.name()); + when(jobNodeStorage.getJobNodeData("servers/mockedIP")).thenReturn(""); assertTrue(serverService.isLocalhostServerReady()); } @Test - public void assertIsLocalhostServerEnabled() { + public void assertIsServerEnabled() { when(jobNodeStorage.getJobNodeData("servers/mockedIP")).thenReturn(""); - assertTrue(serverService.isLocalhostServerEnabled()); - } - - @Test - public void assertIsOffline() { - when(jobNodeStorage.isJobNodeExisted(ServerNode.getInstanceNode("ip1", "127.0.0.1@-@0"))).thenReturn(true); - assertFalse(serverService.isOffline("ip1", "127.0.0.1@-@0")); - verify(jobNodeStorage).isJobNodeExisted(ServerNode.getInstanceNode("ip1", "127.0.0.1@-@0")); + assertTrue(serverService.isServerEnabled("mockedIP")); } } diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingListenerManagerTest.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingListenerManagerTest.java index e17dd4fe2f..5f1d7762fd 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingListenerManagerTest.java +++ b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingListenerManagerTest.java @@ -114,16 +114,23 @@ shardingListenerManager.new ListenServersChangedJobListener().dataChanged(null, } @Test - public void assertListenServersChangedJobListenerWhenIsServerStatusPathAndAdd() { + public void assertListenServersChangedJobListenerWhenIsInstanceChange() { shardingListenerManager.new ListenServersChangedJobListener().dataChanged(null, new TreeCacheEvent( - TreeCacheEvent.Type.NODE_ADDED, new ChildData("/test_job/servers/" + ip + "/instances", null, "".getBytes())), "/test_job/servers/" + ip + "/instances"); + TreeCacheEvent.Type.NODE_ADDED, new ChildData("/test_job/instances/xxx", null, "".getBytes())), "/test_job/instances/xxx"); verify(shardingService).setReshardingFlag(); } @Test - public void assertListenServersChangedJobListenerWhenIsServerStatusPathButUpdateAndIsServerDisabledPath() { + public void assertListenServersChangedJobListenerWhenIsServerChange() { shardingListenerManager.new ListenServersChangedJobListener().dataChanged(null, new TreeCacheEvent( TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/test_job/servers/" + ip, null, "".getBytes())), "/test_job/servers/" + ip); verify(shardingService).setReshardingFlag(); } + + @Test + public void assertListenServersChangedJobListenerWhenIsShardingChange() { + shardingListenerManager.new ListenServersChangedJobListener().dataChanged(null, new TreeCacheEvent( + TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/test_job/execution/0/instance", null, "".getBytes())), "/test_job/execution/0/instance"); + verify(shardingService).setReshardingFlag(); + } } diff --git a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingServiceTest.java b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingServiceTest.java index ac6522e107..596651e219 100644 --- a/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingServiceTest.java +++ b/elastic-job-lite/elastic-job-lite-core/src/test/java/com/dangdang/ddframe/job/lite/internal/sharding/ShardingServiceTest.java @@ -26,7 +26,6 @@ import com.dangdang.ddframe.job.lite.fixture.TestSimpleJob; import com.dangdang.ddframe.job.lite.internal.config.ConfigurationService; import com.dangdang.ddframe.job.lite.internal.election.LeaderElectionService; -import com.dangdang.ddframe.job.lite.internal.execution.ExecutionNode; import com.dangdang.ddframe.job.lite.internal.execution.ExecutionService; import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry; import com.dangdang.ddframe.job.lite.internal.server.ServerService; @@ -50,7 +49,6 @@ import java.util.LinkedList; import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.any; @@ -112,7 +110,7 @@ public void assertIsNeedSharding() { public void assertShardingWhenUnnecessary() { when(configService.load(false)).thenReturn(LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder("test_job", "0/1 * * * * ?", 3).build(), TestSimpleJob.class.getCanonicalName())).monitorExecution(true).jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName()).build()); - when(serverService.getAvailableShardingUnits()).thenReturn(Collections.singletonList(new JobShardingUnit("mockedIP", "127.0.0.1@-@0"))); + when(serverService.getAvailableShardingUnits()).thenReturn(Collections.singletonList(new JobShardingUnit("mockedIP@-@0"))); when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(false); shardingService.shardingIfNecessary(); verify(serverService).getAvailableShardingUnits(); @@ -123,7 +121,6 @@ public void assertShardingWhenUnnecessary() { public void assertShardingWithoutAvailableServers() { when(configService.load(false)).thenReturn(LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder("test_job", "0/1 * * * * ?", 3).build(), TestSimpleJob.class.getCanonicalName())).monitorExecution(true).jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName()).build()); - when(serverService.getAllShardingUnits()).thenReturn(Arrays.asList(new JobShardingUnit("ip1", "127.0.0.1@-@0"), new JobShardingUnit("ip2", "127.0.0.1@-@0"))); when(serverService.getAvailableShardingUnits()).thenReturn(Collections.emptyList()); shardingService.shardingIfNecessary(); verify(serverService).getAvailableShardingUnits(); @@ -140,7 +137,7 @@ public void assertShardingWithoutAvailableServers() { public void assertShardingWhenIsNotLeaderAndIsShardingProcessing() { when(configService.load(false)).thenReturn(LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder("test_job", "0/1 * * * * ?", 3).build(), TestSimpleJob.class.getCanonicalName())).monitorExecution(true).jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName()).build()); - when(serverService.getAvailableShardingUnits()).thenReturn(Collections.singletonList(new JobShardingUnit("mockedIP", "127.0.0.1@-@0"))); + when(serverService.getAvailableShardingUnits()).thenReturn(Collections.singletonList(new JobShardingUnit("mockedIP@-@0"))); when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(true, true, false, false); when(leaderElectionService.isLeader()).thenReturn(false); when(jobNodeStorage.isJobNodeExisted("leader/sharding/processing")).thenReturn(true, false); @@ -152,8 +149,7 @@ public void assertShardingWhenIsNotLeaderAndIsShardingProcessing() { @Test public void assertShardingNecessaryWhenMonitorExecutionEnabled() { - when(serverService.getAvailableShardingUnits()).thenReturn(Collections.singletonList(new JobShardingUnit("ip1", "127.0.0.1@-@0"))); - when(serverService.getAllShardingUnits()).thenReturn(Arrays.asList(new JobShardingUnit("ip1", "127.0.0.1@-@0"), new JobShardingUnit("ip2", "127.0.0.1@-@0"))); + when(serverService.getAvailableShardingUnits()).thenReturn(Collections.singletonList(new JobShardingUnit("ip1@-@0"))); when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(true); when(leaderElectionService.isLeader()).thenReturn(true); when(configService.load(false)).thenReturn(LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder("test_job", "0/1 * * * * ?", 3).build(), @@ -177,8 +173,7 @@ public void assertShardingNecessaryWhenMonitorExecutionEnabled() { @Test public void assertShardingNecessaryWhenMonitorExecutionDisabled() throws Exception { - when(serverService.getAvailableShardingUnits()).thenReturn(Collections.singletonList(new JobShardingUnit("ip1", "127.0.0.1@-@0"))); - when(serverService.getAllShardingUnits()).thenReturn(Arrays.asList(new JobShardingUnit("ip1", "127.0.0.1@-@0"), new JobShardingUnit("ip2", "127.0.0.1@-@0"))); + when(serverService.getAvailableShardingUnits()).thenReturn(Collections.singletonList(new JobShardingUnit("ip1@-@0"))); when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(true); when(leaderElectionService.isLeader()).thenReturn(true); when(configService.load(false)).thenReturn(LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder("test_job", "0/1 * * * * ?", 3).build(), @@ -236,11 +231,11 @@ public void assertPersistShardingInfoTransactionExecutionCallback() throws Excep when(configService.load(true)).thenReturn(LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder("test_job", "0/1 * * * * ?", 3).build(), TestSimpleJob.class.getCanonicalName())).monitorExecution(true).jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName()).build()); when(transactionCreateBuilder.forPath("/test_job/execution/0/ip", "host0".getBytes())).thenReturn(curatorTransactionBridge); - when(transactionCreateBuilder.forPath("/test_job/execution/0/instance", "127.0.0.1@-@0".getBytes())).thenReturn(curatorTransactionBridge); + when(transactionCreateBuilder.forPath("/test_job/execution/0/instance", "host0@-@0".getBytes())).thenReturn(curatorTransactionBridge); when(transactionCreateBuilder.forPath("/test_job/execution/1/ip", "host0".getBytes())).thenReturn(curatorTransactionBridge); - when(transactionCreateBuilder.forPath("/test_job/execution/1/instance", "127.0.0.1@-@0".getBytes())).thenReturn(curatorTransactionBridge); + when(transactionCreateBuilder.forPath("/test_job/execution/1/instance", "host0@-@0".getBytes())).thenReturn(curatorTransactionBridge); when(transactionCreateBuilder.forPath("/test_job/execution/2/ip", "host0".getBytes())).thenReturn(curatorTransactionBridge); - when(transactionCreateBuilder.forPath("/test_job/execution/2/instance", "127.0.0.1@-@0".getBytes())).thenReturn(curatorTransactionBridge); + when(transactionCreateBuilder.forPath("/test_job/execution/2/instance", "host0@-@0".getBytes())).thenReturn(curatorTransactionBridge); when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal); when(curatorTransactionFinal.delete()).thenReturn(transactionDeleteBuilder); when(transactionDeleteBuilder.forPath("/test_job/leader/sharding/necessary")).thenReturn(curatorTransactionBridge); @@ -249,7 +244,7 @@ public void assertPersistShardingInfoTransactionExecutionCallback() throws Excep when(transactionDeleteBuilder.forPath("/test_job/leader/sharding/processing")).thenReturn(curatorTransactionBridge); when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal); Collection shardingItems = new LinkedList<>(); - shardingItems.add(new JobShardingResult(new JobShardingUnit("host0", "127.0.0.1@-@0"), Arrays.asList(0, 1, 2))); + shardingItems.add(new JobShardingResult(new JobShardingUnit("host0@-@0"), Arrays.asList(0, 1, 2))); ShardingService.PersistShardingInfoTransactionExecutionCallback actual = shardingService.new PersistShardingInfoTransactionExecutionCallback(shardingItems); actual.execute(curatorTransactionFinal); verify(curatorTransactionFinal, times(6)).create(); @@ -258,27 +253,4 @@ public void assertPersistShardingInfoTransactionExecutionCallback() throws Excep verify(transactionDeleteBuilder).forPath("/test_job/leader/sharding/processing"); verify(curatorTransactionBridge, times(8)).and(); } - - @Test - public void assertHasShardingInfoInOfflineServers() throws NoSuchFieldException { - when(configService.load(true)).thenReturn(LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder("test_job", "0/1 * * * * ?", 3).build(), - TestSimpleJob.class.getCanonicalName())).monitorExecution(true).jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName()).build()); - when(serverService.getAllShardingUnits()).thenReturn(Arrays.asList(new JobShardingUnit("host0", "127.0.0.1@-@0"), new JobShardingUnit("host1", "127.0.0.1@-@0"))); - when(jobNodeStorage.getJobNodeDataDirectly(ExecutionNode.getIpNode(0))).thenReturn("host0"); - when(jobNodeStorage.getJobNodeDataDirectly(ExecutionNode.getInstanceNode(0))).thenReturn("127.0.0.1@-@0"); - when(jobNodeStorage.getJobNodeDataDirectly(ExecutionNode.getIpNode(1))).thenReturn("host1"); - when(jobNodeStorage.getJobNodeDataDirectly(ExecutionNode.getInstanceNode(1))).thenReturn("127.0.0.1@-@0"); - when(serverService.isOffline("host0", "127.0.0.1@-@0")).thenReturn(false); - when(serverService.isOffline("host1", "127.0.0.1@-@0")).thenReturn(true); - assertTrue(shardingService.hasShardingInfoInOfflineServers()); - } - - @Test - public void assertHasNotShardingInfoInOfflineServers() throws NoSuchFieldException { - when(configService.load(true)).thenReturn(LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder("test_job", "0/1 * * * * ?", 3).build(), - TestSimpleJob.class.getCanonicalName())).monitorExecution(true).jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName()).build()); - when(serverService.getAllShardingUnits()).thenReturn(Arrays.asList(new JobShardingUnit("host0", "127.0.0.1@-@0"), new JobShardingUnit("host1", "127.0.0.1@-@0"))); - when(serverService.isOffline("host0", "127.0.0.1@-@0")).thenReturn(false); - assertFalse(shardingService.hasShardingInfoInOfflineServers()); - } } diff --git a/elastic-job-lite/elastic-job-lite-lifecycle/src/main/java/com/dangdang/ddframe/job/lite/lifecycle/internal/operate/JobOperateTemplate.java b/elastic-job-lite/elastic-job-lite-lifecycle/src/main/java/com/dangdang/ddframe/job/lite/lifecycle/internal/operate/JobOperateTemplate.java index 5b55635c9a..23072c17f5 100644 --- a/elastic-job-lite/elastic-job-lite-lifecycle/src/main/java/com/dangdang/ddframe/job/lite/lifecycle/internal/operate/JobOperateTemplate.java +++ b/elastic-job-lite/elastic-job-lite-lifecycle/src/main/java/com/dangdang/ddframe/job/lite/lifecycle/internal/operate/JobOperateTemplate.java @@ -17,7 +17,7 @@ package com.dangdang.ddframe.job.lite.lifecycle.internal.operate; -import com.dangdang.ddframe.job.lite.internal.server.ServerNode; +import com.dangdang.ddframe.job.lite.internal.server.InstanceNode; import com.dangdang.ddframe.job.lite.internal.storage.JobNodePath; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.google.common.base.Optional; @@ -62,7 +62,7 @@ public Collection operate(final Optional jobName, final Optional List serverIpList = regCenter.getChildrenKeys(jobNodePath.getServerNodePath()); result = new ArrayList<>(serverIpList.size()); for (String ip : serverIpList) { - for (String each : regCenter.getChildrenKeys(jobNodePath.getServerNodePath(ip) + "/" + ServerNode.INSTANCES_ROOT)) { + for (String each : regCenter.getChildrenKeys(jobNodePath.getServerNodePath(ip) + "/" + InstanceNode.ROOT)) { boolean isSuccess = callback.doOperate(jobName.get(), ip, each); if (!isSuccess) { result.add(ip + "-" + each); diff --git a/elastic-job-lite/elastic-job-lite-lifecycle/src/main/java/com/dangdang/ddframe/job/lite/lifecycle/internal/statistics/JobStatisticsAPIImpl.java b/elastic-job-lite/elastic-job-lite-lifecycle/src/main/java/com/dangdang/ddframe/job/lite/lifecycle/internal/statistics/JobStatisticsAPIImpl.java index 4e78e492a3..a9f8704c67 100644 --- a/elastic-job-lite/elastic-job-lite-lifecycle/src/main/java/com/dangdang/ddframe/job/lite/lifecycle/internal/statistics/JobStatisticsAPIImpl.java +++ b/elastic-job-lite/elastic-job-lite-lifecycle/src/main/java/com/dangdang/ddframe/job/lite/lifecycle/internal/statistics/JobStatisticsAPIImpl.java @@ -19,7 +19,7 @@ import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.lite.internal.config.LiteJobConfigurationGsonFactory; -import com.dangdang.ddframe.job.lite.internal.server.ServerNode; +import com.dangdang.ddframe.job.lite.internal.server.InstanceNode; import com.dangdang.ddframe.job.lite.internal.storage.JobNodePath; import com.dangdang.ddframe.job.lite.lifecycle.api.JobStatisticsAPI; import com.dangdang.ddframe.job.lite.lifecycle.domain.ExecutionInfo; @@ -82,7 +82,7 @@ private JobBriefInfo.JobStatus getJobStatus(final String jobName) { int crashedCount = 0; int disabledCount = 0; for (String serverIp : serverIps) { - List jobInstances = regCenter.getChildrenKeys(jobNodePath.getServerNodePath(serverIp) + "/" +ServerNode.INSTANCES_ROOT); + List jobInstances = regCenter.getChildrenKeys(jobNodePath.getServerNodePath(serverIp) + "/" + InstanceNode.ROOT); for (String each : jobInstances) { serverInstanceSize++; switch (getServerStatus(jobName, serverIp, each)) { @@ -111,7 +111,7 @@ public Collection getServers(final String jobName) { List serverIps = regCenter.getChildrenKeys(jobNodePath.getServerNodePath()); Collection result = new ArrayList<>(serverIps.size()); for (String serverIp : serverIps) { - List jobInstances = regCenter.getChildrenKeys(jobNodePath.getServerNodePath(serverIp) + "/" + ServerNode.INSTANCES_ROOT); + List jobInstances = regCenter.getChildrenKeys(jobNodePath.getServerNodePath(serverIp) + "/" + InstanceNode.ROOT); for (String each : jobInstances) { result.add(getJobServer(jobName, serverIp, each)); }