From f52ea6d0283461c7e53c8f1fd9006fbe0f46b58d Mon Sep 17 00:00:00 2001 From: chenhao16 Date: Fri, 25 Aug 2017 17:38:28 +0800 Subject: [PATCH 1/8] cluster can acquire more backends in same host --- .../palo/alter/DecommissionBackendJob.java | 11 +- fe/src/com/baidu/palo/catalog/Catalog.java | 2 +- fe/src/com/baidu/palo/clone/CloneChecker.java | 234 ++++++++---- .../baidu/palo/system/SystemInfoService.java | 344 ++++++++++++------ 4 files changed, 410 insertions(+), 181 deletions(-) diff --git a/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java b/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java index 139ab7ce5d5bbd..923720dd865235 100644 --- a/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java +++ b/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java @@ -263,6 +263,15 @@ public synchronized boolean sendTasks() { continue; } + + // exclude backend in same hosts with the other replica + Set hosts = Sets.newHashSet(); + for (Replica replica : tablet.getReplicas()) { + if (replica.getBackendId() != backendId) { + hosts.add(clusterInfo.getBackend(replica.getBackendId()).getHost()); + } + } + // choose dest backend long destBackendId = -1L; int num = 0; @@ -273,7 +282,7 @@ public synchronized boolean sendTasks() { return true; } - if (tablet.getReplicaByBackendId(destBackendIds.get(0)) != null) { + if (hosts.contains(clusterInfo.getBackend(destBackendIds.get(0)).getHost())) { // replica can not in same backend continue; } diff --git a/fe/src/com/baidu/palo/catalog/Catalog.java b/fe/src/com/baidu/palo/catalog/Catalog.java index eb468fb83435ea..39633f12018762 100644 --- a/fe/src/com/baidu/palo/catalog/Catalog.java +++ b/fe/src/com/baidu/palo/catalog/Catalog.java @@ -3406,7 +3406,7 @@ private void createTablets(String clusterName, MaterializedIndex index, ReplicaS List chosenBackendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIds(replicationNum, true, true, clusterName); if (chosenBackendIds == null) { - throw new DdlException("Failed to find 
enough alive backends. need: " + replicationNum); + throw new DdlException("Failed to find enough host in all backends. need: " + replicationNum); } Preconditions.checkState(chosenBackendIds.size() == replicationNum); for (long backendId : chosenBackendIds) { diff --git a/fe/src/com/baidu/palo/clone/CloneChecker.java b/fe/src/com/baidu/palo/clone/CloneChecker.java index 716539925b09cc..debf51c961ee78 100644 --- a/fe/src/com/baidu/palo/clone/CloneChecker.java +++ b/fe/src/com/baidu/palo/clone/CloneChecker.java @@ -63,7 +63,9 @@ import com.google.common.base.Strings; import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; /** * CloneChecker for replica supplement, migration and deletion @@ -115,7 +117,7 @@ public boolean checkTabletForSupplement(long dbId, long tableId, long partitionI LOG.warn("init backend infos error"); return false; } - Map> capacityLevelToBackendIds = initBackendCapacityInfos(backendInfos); + Map>> capacityLevelToBackendIds = initBackendCapacityInfos(backendInfos); if (capacityLevelToBackendIds == null || capacityLevelToBackendIds.isEmpty()) { LOG.warn("init backend capacity infos error"); return false; @@ -188,7 +190,7 @@ public boolean checkTabletForSupplement(long dbId, long tableId, long partitionI } finally { db.readUnlock(); } - Map> distributionLevelToBackendIds = initBackendDistributionInfos(backendInfos); + Map>> distributionLevelToBackendIds = initBackendDistributionInfos(backendInfos); if (distributionLevelToBackendIds == null || distributionLevelToBackendIds.isEmpty()) { LOG.warn("init backend distribution infos error"); return false; @@ -257,9 +259,10 @@ private void checkTablets() { LOG.warn("init backend infos error"); continue; } - final Map> clusterCapacityLevelToBackendIds = initBackendCapacityInfos( - clusterBackendInfos); - if (clusterCapacityLevelToBackendIds == null 
|| clusterCapacityLevelToBackendIds.isEmpty()) { + + final Map>> cluserCapacityLevelToBackendIds + = initBackendCapacityInfos(clusterBackendInfos); + if (cluserCapacityLevelToBackendIds == null || cluserCapacityLevelToBackendIds.isEmpty()) { LOG.warn("init backend capacity infos error"); continue; } @@ -391,22 +394,22 @@ private void checkTablets() { } // tablet distribution level - final Map> clusterDistributionLevelToBackendIds = initBackendDistributionInfos( - clusterBackendInfos); + final Map>> clusterDistributionLevelToBackendIds + = initBackendDistributionInfos(clusterBackendInfos); if (clusterDistributionLevelToBackendIds != null && !clusterDistributionLevelToBackendIds.isEmpty()) { // supplement checkSupplement(cloneTabletMap, clusterDistributionLevelToBackendIds, - clusterCapacityLevelToBackendIds, clusterBackendInfos); + cluserCapacityLevelToBackendIds, clusterBackendInfos); // migration checkMigration(backendToTablets, clusterDistributionLevelToBackendIds, - clusterCapacityLevelToBackendIds, clusterBackendInfos); + cluserCapacityLevelToBackendIds, clusterBackendInfos); } else { LOG.warn("init backend distribution infos error"); } // tablet distribution level - final Map> distributionLevelToBackendIds = initBackendDistributionInfos( + final Map>> distributionLevelToBackendIds = initBackendDistributionInfos( clusterBackendInfos); if (distributionLevelToBackendIds != null && !distributionLevelToBackendIds.isEmpty()) { // delete redundant replicas @@ -447,16 +450,40 @@ private Map initBackendInfos(String cluster) { continue; } backendInfos.put(backendId, - new BackendInfo(backendId, backend.getTotalCapacityB(), backend.getAvailableCapacityB())); + new BackendInfo(backend.getHost(), backendId, backend.getTotalCapacityB(), + backend.getAvailableCapacityB())); } return backendInfos; } - private Map> initBackendCapacityInfos(Map backendInfos) { + /** + * maybe more than one backend in same host, so capacity of backend are saved into one + * list with same host. 
it need to randomly select one backend in migration and supplement, + * and select all in delete. + * + * @param backendInfos + * @return + */ + private Map>> initBackendCapacityInfos(Map backendInfos) { Preconditions.checkNotNull(backendInfos); if (backendInfos.size() == 0) { return null; } + + Map> backendInfosMap = Maps.newHashMap(); + for (BackendInfo info : backendInfos.values()) { + if (backendInfosMap.containsKey(info.getHost())) { + backendInfosMap.get(info.getHost()).add(info); + } else { + List list = Lists.newArrayList(); + list.add(info); + backendInfosMap.put(info.getHost(), list); + } + } + + if (backendInfosMap.size() <= 1) { + return null; + } // calculate avg used ratio long totalCapacityB = 0; @@ -475,16 +502,21 @@ private Map> initBackendCapacityInfos(Map ba avgUsedRatio, lowRatioThreshold, highRatioThreshold); // init capacityLevelToBackendIds - Map> capacityLevelToBackendIds = new HashMap>(); + Map>> capacityLevelToBackendIds = new HashMap>>(); for (Level level : Level.values()) { - capacityLevelToBackendIds.put(level, new HashSet()); + capacityLevelToBackendIds.put(level, new HashSet()); } - for (BackendInfo backendInfo : backendInfos.values()) { - long backendId = backendInfo.getBackendId(); - long backendTotalCapacityB = backendInfo.getTotalCapacityB(); - long backendAvailableCapacityB = backendInfo.getAvailableCapacityB(); + for (List list : backendInfosMap.values()) { + long backendTotalCapacityB = 0; + long backendAvailableCapacityB = 0; + List ids = Lists.newArrayList(); + for (BackendInfo backendInfo: list) { + backendTotalCapacityB += backendInfo.getTotalCapacityB(); + backendAvailableCapacityB += backendInfo.getAvailableCapacityB(); + ids.add(backendInfo.getBackendId()); + } double usedRatio = (double) (backendTotalCapacityB - backendAvailableCapacityB) / backendTotalCapacityB; - Set backendIds = null; + Set> backendIds = null; if (usedRatio < lowRatioThreshold) { backendIds = capacityLevelToBackendIds.get(Level.LOW); } else if 
(usedRatio < highRatioThreshold) { @@ -492,28 +524,54 @@ private Map> initBackendCapacityInfos(Map ba } else { backendIds = capacityLevelToBackendIds.get(Level.HIGH); } - backendIds.add(backendId); + + backendIds.add(ids); - // set clone capacity - // totalCapacity * avgUsedRatio * (1 + threshold) - usedCapacity double backendCloneCapacityB = backendTotalCapacityB * highRatioThreshold - (backendTotalCapacityB - backendAvailableCapacityB); - backendInfo.setCloneCapacityB((long) backendCloneCapacityB); + // set clone capacity + // totalCapacity * avgUsedRatio * (1 + threshold) - usedCapacity + for (BackendInfo backendInfo: list) { + backendInfo.setCloneCapacityB((long) backendCloneCapacityB); + } } LOG.debug("backend capacity infos. level map: {}", capacityLevelToBackendIds); return capacityLevelToBackendIds; } - private Map> initBackendDistributionInfos(Map backendInfos) { + /** + * maybe more than one backend in same host, so distribution of tablet are saved into one + * list with same host. it need to randomly select one backend in migration and supplement, + * and select all in delete. 
+ * + * @param backendInfos + * @return + */ + private Map>> initBackendDistributionInfos(Map backendInfos) { Preconditions.checkNotNull(backendInfos); if (backendInfos.size() == 0) { return null; } + Map> backendInfosMap = Maps.newHashMap(); + for (BackendInfo info : backendInfos.values()) { + if (backendInfosMap.containsKey(info.getHost())) { + backendInfosMap.get(info.getHost()).add(info); + } else { + List list = Lists.newArrayList(); + list.add(info); + backendInfosMap.put(info.getHost(), list); + } + } + + if (backendInfosMap.size() <= 1) { + return null; + } + // init distributionLevelToBackendIds - Map> distributionLevelToBackendIds = new HashMap>(); + Map>> distributionLevelToBackendIds = new HashMap>>(); for (Level level : Level.values()) { - distributionLevelToBackendIds.put(level, new HashSet()); + distributionLevelToBackendIds.put(level, new HashSet()); } int totalReplicaNum = 0; @@ -524,10 +582,14 @@ private Map> initBackendDistributionInfos(Map backendIds = null; + for (List list : backendInfosMap.values()) { + int backendReplicaNum = 0; + List ids = Lists.newArrayList(); + for (BackendInfo backendInfo: list) { + backendReplicaNum += backendInfo.getTableReplicaNum(); + ids.add(backendInfo.getBackendId()); + } + Set> backendIds = null; if (backendReplicaNum < lowReplicaNumThreshold) { backendIds = distributionLevelToBackendIds.get(Level.LOW); } else if (backendReplicaNum < highReplicaNumThreshold) { @@ -535,11 +597,14 @@ private Map> initBackendDistributionInfos(Map backendInfos) { } } - private long selectRandomBackendId(List candidateBackendIds, Set excludeBackendIds) { + private long selectRandomBackendId(List candidateBackendIds, Set excludeBackendHosts, + Map backendInfos) { Collections.shuffle(candidateBackendIds); for (long backendId : candidateBackendIds) { - if (!excludeBackendIds.contains(backendId)) { + if (!excludeBackendHosts.contains(backendInfos.get(backendId).getHost())) { return backendId; } } @@ -567,8 +633,9 @@ private long 
selectRandomBackendId(List candidateBackendIds, Set exc * 2. if supplement, select from 2.1 low distribution and capacity 2.2 low * distribution 2.3 all order by distribution */ - private long selectCloneReplicaBackendId(Map> distributionLevelToBackendIds, - Map> capacityLevelToBackendIds, Map backendInfos, TabletInfo tabletInfo, + private long selectCloneReplicaBackendId(Map>> distributionLevelToBackendIds, + Map>> capacityLevelToBackendIds, + Map backendInfos, TabletInfo tabletInfo, JobType jobType, JobPriority priority) { Set existBackendIds = tabletInfo.getBackendIds(); long tabletSizeB = tabletInfo.getTabletSizeB(); @@ -577,32 +644,47 @@ private long selectCloneReplicaBackendId(Map> distributionLevel // candidate backend from which step for debug String step = "-1"; + final SystemInfoService infoService = Catalog.getCurrentSystemInfo(); + Set existBackendHosts = Sets.newHashSet(); + for (Long id : existBackendIds) { + existBackendHosts.add(infoService.getBackend(id).getHost()); + } + if (priority == JobPriority.HIGH || priority == JobPriority.NORMAL) { // 1. 
HIGH priority List allBackendIds = Lists.newArrayList(); - for (Set backendIds : distributionLevelToBackendIds.values()) { - allBackendIds.addAll(backendIds); + for (Set> backendIds : distributionLevelToBackendIds.values()) { + for (List list : backendIds) { + // select one backend in same host + Collections.shuffle(list); + allBackendIds.add(list.get(0)); + } } Collections.shuffle(allBackendIds); - candidateBackendId = selectRandomBackendId(allBackendIds, existBackendIds); + candidateBackendId = selectRandomBackendId(allBackendIds, existBackendHosts, backendInfos); step = "0"; } else { // candidate backendIds: // low distribution and low capacity backends - Set candidateBackendIdsByDistribution = distributionLevelToBackendIds.get(Level.LOW); + Set> candidateBackendIdsByDistribution = distributionLevelToBackendIds.get(Level.LOW); LOG.debug("candidate backends by distribution: {}", candidateBackendIdsByDistribution); - Set candidateBackendIdsByCapacity = capacityLevelToBackendIds.get(Level.LOW); + Set> candidateBackendIdsByCapacity = capacityLevelToBackendIds.get(Level.LOW); LOG.debug("candidate backends by capacity: {}", candidateBackendIdsByCapacity); // select dest backendId from candidates // 2. 
check canCloneByCapacity && canCloneByDistribution from // candidateBackendIds - List candidateBackendIds = Lists.newArrayList(candidateBackendIdsByDistribution); + List candidateBackendIds = Lists.newArrayList(); + for (List list : candidateBackendIdsByDistribution) { + / select one backend in same host + Collections.shuffle(list); + candidateBackendIds.add(list.get(0)); + } candidateBackendIds.retainAll(candidateBackendIdsByCapacity); Collections.shuffle(candidateBackendIds); for (long backendId : candidateBackendIds) { - if (existBackendIds.contains(backendId)) { + if (existBackendHosts.contains(backendInfos.get(backendId).getHost())) { continue; } candidateBackendInfo = backendInfos.get(backendId); @@ -618,14 +700,18 @@ private long selectCloneReplicaBackendId(Map> distributionLevel if (jobType == JobType.SUPPLEMENT) { // 3.1 random from candidateBackendIds if (candidateBackendId == -1 && !candidateBackendIds.isEmpty()) { - candidateBackendId = selectRandomBackendId(candidateBackendIds, existBackendIds); + candidateBackendId = selectRandomBackendId(candidateBackendIds, existBackendHosts, backendInfos); step = "2.1"; } // 3.2 random from candidateBackendIdsByDistribution if (candidateBackendId == -1 && !candidateBackendIdsByDistribution.isEmpty()) { - candidateBackendIds = Lists.newArrayList(candidateBackendIdsByDistribution); - candidateBackendId = selectRandomBackendId(candidateBackendIds, existBackendIds); + for (List list : candidateBackendIdsByDistribution) { + // select one backend in same host + Collections.shuffle(list); + candidateBackendIds.add(list.get(0)); + } + candidateBackendId = selectRandomBackendId(candidateBackendIds, existBackendHosts, backendInfos); step = "2.2"; } @@ -637,8 +723,13 @@ private long selectCloneReplicaBackendId(Map> distributionLevel continue; } - candidateBackendIds = Lists.newArrayList(candidateBackendIdsByDistribution); - candidateBackendId = selectRandomBackendId(candidateBackendIds, existBackendIds); + for (List list : 
candidateBackendIdsByDistribution) { + // select one backend in same host + Collections.shuffle(list); + candidateBackendIds.add(list.get(0)); + } + candidateBackendId = selectRandomBackendId(candidateBackendIds, + existBackendHosts, backendInfos); if (candidateBackendId != -1) { step = "2.3"; break; @@ -665,7 +756,7 @@ private long selectCloneReplicaBackendId(Map> distributionLevel * offline > clone > low version > high distribution in cluster */ private void deleteRedundantReplicas(Database db, TabletInfo tabletInfo, - Map> distributionLevelToBackendIds) { + Map>> distributionLevelToBackendIds) { long tableId = tabletInfo.getTableId(); long partitionId = tabletInfo.getPartitionId(); long indexId = tabletInfo.getIndexId(); @@ -813,11 +904,16 @@ public int compare(Replica arg0, Replica arg1) { // delete where for (Level level : levels) { - List levelBackendIds = Lists.newArrayList(distributionLevelToBackendIds.get(level)); + List levelBackendIds = Lists.newArrayList(); + for (Set> sets : distributionLevelToBackendIds.values()) { + for (List list : sets) { + Collections.shuffle(list); + levelBackendIds.add(list.get(0)); + } + } Collections.shuffle(levelBackendIds); backendIds.addAll(levelBackendIds); } - for (long backendId : backendIds) { Replica replica = tablet.getReplicaByBackendId(backendId); if (tablet.deleteReplica(replica)) { @@ -843,7 +939,8 @@ public int compare(Replica arg0, Replica arg1) { } private void checkSupplement(Map cloneTabletMap, - Map> distributionLevelToBackendIds, Map> capacityLevelToBackendIds, + Map>> distributionLevelToBackendIds, + Map>> capacityLevelToBackendIds, Map backendInfos) { for (TabletInfo tabletInfo : cloneTabletMap.values()) { addCloneJob(tabletInfo, distributionLevelToBackendIds, capacityLevelToBackendIds, backendInfos, @@ -852,10 +949,11 @@ private void checkSupplement(Map cloneTabletMap, } private void checkMigration(Map> backendToTablets, - Map> distributionLevelToBackendIds, Map> capacityLevelToBackendIds, + Map>> 
distributionLevelToBackendIds, Map>> capacityLevelToBackendIds, Map backendInfos) { // select src tablet from high distribution or high capacity backends - Set highBackendIds = new HashSet(); + Set> highBackendIds = new HashSet(); highBackendIds.addAll(distributionLevelToBackendIds.get(Level.HIGH)); highBackendIds.addAll(capacityLevelToBackendIds.get(Level.HIGH)); if (highBackendIds.isEmpty()) { @@ -864,9 +962,11 @@ private void checkMigration(Map> backendToTablets, } Set candidateMigrationTablets = new HashSet(); - for (long backendId : highBackendIds) { - if (backendToTablets.containsKey(backendId)) { - candidateMigrationTablets.addAll(backendToTablets.get(backendId)); + for (List backendIds : highBackendIds) { + // select one backend in same host + Collections.shuffle(backendIds); + if (backendToTablets.containsKey(backendIds.get(0))) { + candidateMigrationTablets.addAll(backendToTablets.get(backendIds.get(0))); } } if (candidateMigrationTablets.isEmpty()) { @@ -892,8 +992,8 @@ private void checkMigration(Map> backendToTablets, } } - private void addCloneJob(TabletInfo tabletInfo, Map> distributionLevelToBackendIds, - Map> capacityLevelToBackendIds, Map backendInfos, JobType jobType) { + private void addCloneJob(TabletInfo tabletInfo, Map>> distributionLevelToBackendIds, + Map>> capacityLevelToBackendIds, Map backendInfos, JobType jobType) { // priority short onlineReplicaNum = tabletInfo.getOnlineReplicaNum(); short replicationNum = tabletInfo.getReplicationNum(); @@ -1181,7 +1281,8 @@ public void setDbState(DbState dbState) { private class BackendInfo { private long backendId; - + private String host; + private long totalCapacityB; private long availableCapacityB; // capacity for clone @@ -1191,15 +1292,24 @@ private class BackendInfo { // replica num for clone private int cloneReplicaNum; - public BackendInfo(long backendId, long totalCapacityB, long availableCapacityB) { + public BackendInfo(String host, long backendId, long totalCapacityB, long 
availableCapacityB) { this.backendId = backendId; this.totalCapacityB = totalCapacityB; this.availableCapacityB = availableCapacityB; + this.host = host; this.cloneCapacityB = 0L; this.tableReplicaNum = 0; this.cloneReplicaNum = 0; } + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + public long getBackendId() { return backendId; } diff --git a/fe/src/com/baidu/palo/system/SystemInfoService.java b/fe/src/com/baidu/palo/system/SystemInfoService.java index e92fe9627193ea..0ccaa2f5a39d3e 100644 --- a/fe/src/com/baidu/palo/system/SystemInfoService.java +++ b/fe/src/com/baidu/palo/system/SystemInfoService.java @@ -14,16 +14,20 @@ // under the License. package com.baidu.palo.system; - + +import java.util.Collections; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -57,6 +61,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.eventbus.EventBus; public class SystemInfoService extends Daemon { @@ -85,7 +90,22 @@ public class SystemInfoService extends Daemon { private long lastBackendIdForCreation = -1; private long lastBackendIdForOther = -1; - + + // used as a sort of backends in host + private static final Comparator> hostListComparator = new Comparator> (){ + @Override + public int compare(List o1, List o2) { + List list1 = (List)o1; + List list2 = (List)o2; + if (list1.size() > list2.size()) { + return -1; + } else { + return 1; + } + } + + }; + 
public SystemInfoService() { super("cluster info service", FeConstants.heartbeat_interval_second * 1000); idToBackendRef = new AtomicReference>(ImmutableMap. of()); @@ -291,7 +311,7 @@ public List getBackendIds(boolean needAlive) { } /** - * chose be to create cluster + * choose be to create cluster * * @param clusterName * @param num @@ -301,31 +321,48 @@ public List createCluster(String clusterName, int num) { final List ret = Lists.newArrayList(); final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); final Map> classMap = getHostBackendsMap(true, true, false); - - if (num > classMap.size()) { - return null; - } - // to select host where has more free be - int count = num; - while (count-- > 0) { - List tmp = null; - for (List backendList : classMap.values()) { - if (tmp == null) { - tmp = backendList; - } else { - if (tmp.size() < backendList.size()) { - tmp = backendList; - } - } - } - // random select a backend - if (tmp != null && tmp.size() > 0) { - ret.add(tmp.get(0).getId()); - classMap.remove(tmp.get(0).getHost()); - } - } - - if (ret.size() != num) { + + LOG.info("calculate create cluster backend. 
cluster:" + name + " num:" + num); + int allBackendCount = 0; + List> hostList = Lists.newArrayList(); + for (List list : classMap.values()) { + allBackendCount += list.size(); + hostList.add(list); + } + + if (num > allBackendCount) { + LOG.warn("an excessive number of backends, require :" + num + " all:" + allBackendCount); + return null; + } + + // sort by number of backend in host + Collections.sort(hostList, hostListComparator); + + // hostIsEmpty is userd to mark if host is empty, so avoid + // iterating hostIsEmpty with numOfHost in every circle + boolean[] hostIsEmpty = new boolean[hostList.size()]; + for (int i = 0; i < hostList.size(); i++) { + hostIsEmpty[i] = false; + } + // to select backend in circle + int numOfHost = hostList.size(); + for (int i = 0; ; i = ++i % hostList.size()) { + if (hostList.get(i).size() > 0) { + ret.add(hostList.get(i).remove(0).getId()); + } else { + // avoid counting repeatedly + if (hostIsEmpty[i] == false) { + hostIsEmpty[i] = true; + numOfHost--; + } + } + if (ret.size() == num || numOfHost == 0) { + break; + } + } + + if (ret.size() != num) { + LOG.warn("an excessive number of backends, require :" + num + " get:" + ret.size()); return null; } @@ -390,54 +427,52 @@ public void releaseBackends(String clusterName, boolean log) { public List calculateDecommissionBackends(String name, int num) { final List ret = new ArrayList(); final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); - final Map> classMap = getHostBackendsMap(false, false, true); final List backendList = getClusterBackendIds(name); - final Map> clusterClassMap = Maps.newHashMap(); - if (backendList.size() < num) { + final Map> clusterMap = Maps.newHashMap(); + + LOG.info("calculate decommission backend cluster:" + name + " decommission num:" + num); + if (backendList.size() <= num) { + LOG.warn("an excessive number of backends, require :" + num + " all:" + backendList.size()); return null; - } - - Iterator iterator = backendList.iterator(); - while 
(iterator.hasNext()) { - final Long id = iterator.next(); - final Backend backend = copiedBackends.get(id); - clusterClassMap.put(backend.getHost(), classMap.get(backend.getHost())); - } - int count = num; - while (count-- > 0) { - List tmp = null; - String host = null; - final Iterator>> iter = clusterClassMap.entrySet().iterator(); - // to select host where has least free be - while (iter.hasNext()) { - final Map.Entry> entry = (Map.Entry>) iter.next(); - String key = (String) entry.getKey(); - List value = (List) entry.getValue(); - - if (tmp == null) { - tmp = value; - host = key; - } else { - if (tmp.size() > value.size()) { - tmp = value; - host = key; - } - } - } - if (tmp != null) { - iterator = backendList.iterator(); - while (iterator.hasNext()) { - final Long id = iterator.next(); - final Backend backend = copiedBackends.get(id); - if (backend.getHost().equals(host)) { - ret.add(id); - } - } - clusterClassMap.remove(host); - } - } - - if (ret.size() < num) { + } + + // put backend in same host in list + for (Long id : backendList) { + final Backend backend = copiedBackends.get(id); + if (clusterMap.containsKey(backend.getHost())) { + clusterMap.get(backend.getHost()).add(backend); + } else { + List list = Lists.newArrayList(); + list.add(backend); + clusterMap.put(backend.getHost(), list); + } + } + + List> hostList = Lists.newArrayList(); + for (List list : clusterMap.values()) { + hostList.add(list); + } + + // sort by number of backend in host + Collections.sort(hostList, hostListComparator); + + // acquire a backend from host with the most backend in every circle + // break if all host are empty or get enough backends + while (true) { + if (hostList.get(0).size() > 0) { + ret.add(hostList.get(0).remove(0).getId()); + if (ret.size() == num) { + //enough + break; + } + Collections.sort(hostList, hostListComparator); + } else { + // all hosts empty + break; + } + } + + if (ret.size() != num) { return null; } @@ -445,7 +480,10 @@ public List 
calculateDecommissionBackends(String name, int num) { } /** - * get expansion's backend id list + * to increase number of backend in cluster, firstly acquire backend from host not in cluster + * if not enough, secondly acquire backend from host in cluster, returns a list sorted by the + * descending order of the number of backend in the first two ways, and get backend from the + * list in cycle. * * @param name * @param num @@ -454,48 +492,101 @@ public List calculateDecommissionBackends(String name, int num) { public List calculateExpansionBackends(String name, int num) { final List ret = new ArrayList(); final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); + // free backend in all host final Map> classMap = getHostBackendsMap(true, true, false); final List clusterBackends = getClusterBackendIds(name); - - Iterator iterator = clusterBackends.iterator(); - while (iterator.hasNext()) { - final Long id = iterator.next(); - final Backend backend = copiedBackends.get(id); - if (classMap.containsKey(backend.getHost())) { - classMap.remove(backend.getHost()); - } - } - - if (num > classMap.size()) { - return null; - } - - int count = num; - while (count-- > 0) { - List tmp = null; - for (List list : classMap.values()) { - if (tmp == null) { - tmp = list; - } else { - if (tmp.size() < list.size()) { - tmp = list; - } - } - } - - // random select a backend - if (tmp != null && tmp.size() > 0) { - ret.add(tmp.get(0).getId()); - classMap.remove(tmp.get(0).getHost()); - } - } - + + LOG.info("calculate expansion backend cluster:" + name + " expansion num:" + num); + // host not in cluster + List> otherHostList = Lists.newArrayList(); + // host in cluster + List> hostList = Lists.newArrayList(); + int allBackendCount = 0; + + Set hostsSet = Sets.newHashSet(); + for (Long id : clusterBackends) { + hostsSet.add(getBackend(id).getHost()); + } + + // distinguish backend in or out of cluster + for (List list : classMap.values()) { + allBackendCount += list.size(); + if 
(hostsSet.contains(list.get(0).getHost())) { + hostList.add(list); + } else { + otherHostList.add(list); + } + } + + if (num > allBackendCount) { + LOG.warn("an excessive number of backends, require :" + num + " all:" + allBackendCount); + return null; + } + + // sort by number of backend in other host not in cluster + Collections.sort(otherHostList, hostListComparator); + + // sort by number of backend in cluster's host + Collections.sort(hostList, hostListComparator); + + // first select backend in other host + if (otherHostList.size() > 0) { + // hostIsEmpty is userd to mark if host is empty, so + // avoid iterating hostIsEmpty with numOfHost in every circle + boolean[] hostIsEmpty = new boolean[otherHostList.size()]; + for (int i = 0; i < otherHostList.size(); i++) { + hostIsEmpty[i] = false; + } + int numOfHost = otherHostList.size(); + for (int i = 0;; i = ++i % otherHostList.size()) { + if (otherHostList.get(i).size() > 0) { + ret.add(otherHostList.get(i).remove(0).getId()); + } else { + // avoid counting repeatedly + if (hostIsEmpty[i] == false) { + hostIsEmpty[i] = true; + numOfHost--; + } + } + if (ret.size() == num || numOfHost == 0) { + break; + } + } + } + + // secondly select backend in cluster' host + if (hostList.size() > 0 && ret.size() != num) { + boolean[] hostIsEmpty = new boolean[hostList.size()]; + for (int i = 0; i < hostList.size(); i++) { + hostIsEmpty[i] = false; + } + int numOfHost = hostList.size(); + for (int i = 0;; i = ++i % hostList.size()) { + if (hostList.get(i).size() > 0) { + ret.add(hostList.get(i).remove(0).getId()); + } else { + if (hostIsEmpty[i] == false) { + hostIsEmpty[i] = true; + numOfHost--; + } + } + if (ret.size() == num || numOfHost == 0) { + break; + } + } + + if (ret.size() != num) { + LOG.warn("an excessive number of backends, require :" + num + " get:" + ret.size()); + return null; + } + } + if (ret.size() != num) { return null; } // set be state and owner/ - iterator = ret.iterator(); + Iterator iterator = 
ret.iterator(); while (iterator.hasNext()) { final Long id = iterator.next(); final Backend backend = copiedBackends.get(id); @@ -676,10 +767,31 @@ public synchronized List seqChooseBackendIds(int backendNum, boolean needA lastBackendIdForOtherMap.put(clusterName, lastBackendId); } } - } - + } + + // put backend with same host in same list + final List srcBackends = getClusterBackends(clusterName); + Map> backendMaps = Maps.newHashMap(); + for (Backend backend : srcBackends) { + if (backendMaps.containsKey(backend.getHost())){ + backendMaps.get(backend.getHost()).add(backend); + } else { + List list = Lists.newArrayList(); + list.add(backend); + backendMaps.put(backend.getHost(), list); + } + } + + // if more than one backend exists in same host, select a backend at random + List backends = Lists.newArrayList(); + for (List list : backendMaps.values()) { + Collections.shuffle(list); + backends.add(list.get(0)); + } + + Collections.shuffle(backends); + List backendIds = Lists.newArrayList(); - final List backends = getClusterBackends(clusterName); // get last backend index int lastBackendIndex = -1; int index = -1; @@ -689,8 +801,7 @@ public synchronized List seqChooseBackendIds(int backendNum, boolean needA lastBackendIndex = index; break; } - } - + } Iterator iterator = Iterators.cycle(backends); index = -1; boolean failed = false; @@ -723,7 +834,7 @@ public synchronized List seqChooseBackendIds(int backendNum, boolean needA break; } } - + if (clusterName.equals(DEFAULT_CLUSTER)) { if (isCreate) { lastBackendIdForCreation = lastBackendId; @@ -738,7 +849,6 @@ public synchronized List seqChooseBackendIds(int backendNum, boolean needA lastBackendIdForOtherMap.put(clusterName, lastBackendId); } } - if (backendIds.size() != backendNum) { failed = true; } From 843855e7ab5fd3d3ff21aa088a1d71d740dc6534 Mon Sep 17 00:00:00 2001 From: chenhao16 Date: Fri, 25 Aug 2017 17:42:56 +0800 Subject: [PATCH 2/8] compile failed --- fe/src/com/baidu/palo/clone/CloneChecker.java | 2 +- 
fe/src/com/baidu/palo/system/SystemInfoService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/src/com/baidu/palo/clone/CloneChecker.java b/fe/src/com/baidu/palo/clone/CloneChecker.java index debf51c961ee78..88e7dc95c54471 100644 --- a/fe/src/com/baidu/palo/clone/CloneChecker.java +++ b/fe/src/com/baidu/palo/clone/CloneChecker.java @@ -677,7 +677,7 @@ private long selectCloneReplicaBackendId(Map>> distributio // candidateBackendIds List candidateBackendIds = Lists.newArrayList(); for (List list : candidateBackendIdsByDistribution) { - / select one backend in same host + // select one backend in same host Collections.shuffle(list); candidateBackendIds.add(list.get(0)); } diff --git a/fe/src/com/baidu/palo/system/SystemInfoService.java b/fe/src/com/baidu/palo/system/SystemInfoService.java index 0ccaa2f5a39d3e..a19849fb503f7d 100644 --- a/fe/src/com/baidu/palo/system/SystemInfoService.java +++ b/fe/src/com/baidu/palo/system/SystemInfoService.java @@ -322,7 +322,7 @@ public List createCluster(String clusterName, int num) { final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); final Map> classMap = getHostBackendsMap(true, true, false); - LOG.info("calculate create cluster backend. cluster:" + name + " num:" + num); + LOG.info("calculate create cluster backend. 
cluster:" + clusterName + " num:" + num); int allBackendCount = 0; List> hostList = Lists.newArrayList(); for (List list : classMap.values()) { From 99714177b8462fb5d68ccc8ced2529024946cf54 Mon Sep 17 00:00:00 2001 From: chenhao16 Date: Fri, 25 Aug 2017 17:45:07 +0800 Subject: [PATCH 3/8] count* 's subquery with orderby exec error --- fe/src/com/baidu/palo/planner/SingleNodePlanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/com/baidu/palo/planner/SingleNodePlanner.java b/fe/src/com/baidu/palo/planner/SingleNodePlanner.java index a3ac1e914f865c..9595d46467b637 100644 --- a/fe/src/com/baidu/palo/planner/SingleNodePlanner.java +++ b/fe/src/com/baidu/palo/planner/SingleNodePlanner.java @@ -654,7 +654,7 @@ private PlanNode createSelectPlan(SelectStmt selectStmt, Analyzer analyzer, long // for case: select count(*) from (select col from table) t // for simple, we just materialize sub tree if has count star - if (aggInfo != null) { + if (aggInfo != null && !(root instanceof SortNode)) { for (FunctionCallExpr aggExpr : aggInfo.getAggregateExprs()) { if (aggExpr.isCountStar()) { analyzer.markRefdSlots(analyzer, root, null, null); From c632137435d1515cb1615a08b3d725081388dbf7 Mon Sep 17 00:00:00 2001 From: chenhao16 Date: Fri, 25 Aug 2017 17:49:37 +0800 Subject: [PATCH 4/8] modify show processlist result and check name format error --- fe/src/com/baidu/palo/analysis/AlterClusterStmt.java | 2 +- fe/src/com/baidu/palo/analysis/ShowProcesslistStmt.java | 1 + fe/src/com/baidu/palo/cluster/ClusterNamespace.java | 8 ++++---- fe/src/com/baidu/palo/common/ErrorCode.java | 2 +- fe/src/com/baidu/palo/qe/ConnectContext.java | 5 +++-- 5 files changed, 10 insertions(+), 8 deletions(-) diff --git a/fe/src/com/baidu/palo/analysis/AlterClusterStmt.java b/fe/src/com/baidu/palo/analysis/AlterClusterStmt.java index 3822c7e965dc30..3bb55a5da0d6ab 100644 --- a/fe/src/com/baidu/palo/analysis/AlterClusterStmt.java +++ 
b/fe/src/com/baidu/palo/analysis/AlterClusterStmt.java @@ -56,7 +56,7 @@ public void analyze(Analyzer analyzer) throws AnalysisException, InternalExcepti ErrorReport.reportAnalysisException(ErrorCode.ERR_CLUSTER_NO_PARAMETER); } - if (instanceNum < 0) { + if (instanceNum <= 0) { ErrorReport.reportAnalysisException(ErrorCode.ERR_CLUSTER_CREATE_ISTANCE_NUM_ERROR); } } diff --git a/fe/src/com/baidu/palo/analysis/ShowProcesslistStmt.java b/fe/src/com/baidu/palo/analysis/ShowProcesslistStmt.java index 446f47a99fe5af..9d26a0b1dc917d 100644 --- a/fe/src/com/baidu/palo/analysis/ShowProcesslistStmt.java +++ b/fe/src/com/baidu/palo/analysis/ShowProcesslistStmt.java @@ -28,6 +28,7 @@ public class ShowProcesslistStmt extends ShowStmt { .addColumn(new Column("Id", ColumnType.createType(PrimitiveType.BIGINT))) .addColumn(new Column("User", ColumnType.createVarchar(16))) .addColumn(new Column("Host", ColumnType.createVarchar(16))) + .addColumn(new Column("Cluster", ColumnType.createVarchar(16))) .addColumn(new Column("Db", ColumnType.createVarchar(16))) .addColumn(new Column("Command", ColumnType.createVarchar(16))) .addColumn(new Column("Time", ColumnType.createType(PrimitiveType.INT))) diff --git a/fe/src/com/baidu/palo/cluster/ClusterNamespace.java b/fe/src/com/baidu/palo/cluster/ClusterNamespace.java index a0d50bd0b0f7ea..15569ed6ebecf1 100644 --- a/fe/src/com/baidu/palo/cluster/ClusterNamespace.java +++ b/fe/src/com/baidu/palo/cluster/ClusterNamespace.java @@ -50,7 +50,7 @@ private static boolean checkName(String str) { return false; } final String[] ele = str.split(CLUSTER_DELIMITER); - return (ele.length > 1) ? false : true; + return (ele.length > 1) ? 
true : false; } private static String linkString(String str1, String str2) { @@ -72,7 +72,7 @@ private static String linkString(String str1, String str2) { * @return */ public static String getDbNameFromFullName(String db) { - if (checkName(db)) { + if (!checkName(db)) { return null; } return extract(db, 1); @@ -85,7 +85,7 @@ public static String getDbNameFromFullName(String db) { * @return */ public static String getUsrNameFromFullName(String usr) { - if (checkName(usr)) { + if (!checkName(usr)) { return null; } return extract(usr, 1); @@ -98,7 +98,7 @@ public static String getUsrNameFromFullName(String usr) { * @return */ public static String getClusterNameFromFullName(String str) { - if (checkName(str)) { + if (!checkName(str)) { return null; } return extract(str, 0); diff --git a/fe/src/com/baidu/palo/common/ErrorCode.java b/fe/src/com/baidu/palo/common/ErrorCode.java index 1cd1dce7087d6e..8485f17e1e712f 100644 --- a/fe/src/com/baidu/palo/common/ErrorCode.java +++ b/fe/src/com/baidu/palo/common/ErrorCode.java @@ -155,7 +155,7 @@ public enum ErrorCode { ERR_CLUSTER_NAME_NULL(5041, new byte[] {'4', '2', '0', '0', '0'}, "No cluster name"), ERR_CLUSTER_NO_PERMISSIONS(5042, new byte[] {'4', '2', '0', '0', '0'}, "No permissions"), ERR_CLUSTER_CREATE_ISTANCE_NUM_ERROR(5043, new byte[] {'4', '2', '0', '0', '0'}, - "Instance num can't be less than 0"), + "Instance num can't be less than or equal 0"), ERR_CLUSTER_LINK_NO_DES(5044, new byte[] {'4', '2', '0', '0', '0'}, "No dest"), ERR_CLUSTER_LINK_NO_SRC(5045, new byte[] {'4', '2', '0', '0', '0'}, "No src"), ERR_CLUSTER_SRC_CLUSTER_NO_EXIT(5046, new byte[] {'4', '2', '0', '0', '0'}, "Src cluster '%s' no exist"), diff --git a/fe/src/com/baidu/palo/qe/ConnectContext.java b/fe/src/com/baidu/palo/qe/ConnectContext.java index 548369b9e42630..5cbad4c90ee93c 100644 --- a/fe/src/com/baidu/palo/qe/ConnectContext.java +++ b/fe/src/com/baidu/palo/qe/ConnectContext.java @@ -19,6 +19,7 @@ import java.util.List; import 
com.baidu.palo.catalog.Catalog; +import com.baidu.palo.cluster.ClusterNamespace; import com.baidu.palo.mysql.MysqlCapability; import com.baidu.palo.mysql.MysqlChannel; import com.baidu.palo.mysql.MysqlCommand; @@ -313,10 +314,10 @@ public class ThreadInfo { public List toRow(long nowMs) { List row = Lists.newArrayList(); row.add("" + connectionId); - row.add(user); + row.add(ClusterNamespace.getDbNameFromFullName(user)); row.add(mysqlChannel.getRemote()); row.add(clusterName); - row.add(currentDb); + row.add(ClusterNamespace.getDbNameFromFullName(currentDb)); row.add(command.toString()); row.add("" + (nowMs - startTime) / 1000); row.add(""); From e5d4105dc43449ac49c411e2147140ba753e9e6b Mon Sep 17 00:00:00 2001 From: chenhao16 Date: Fri, 25 Aug 2017 18:04:57 +0800 Subject: [PATCH 5/8] merge log and add cancel cluster for root, show backends for users in cluster --- .../palo/alter/DecommissionBackendJob.java | 22 +- .../baidu/palo/analysis/AlterSystemStmt.java | 2 +- .../palo/analysis/CancelAlterSystemStmt.java | 4 +- .../baidu/palo/analysis/ShowBackendsStmt.java | 45 ++++ fe/src/com/baidu/palo/catalog/Catalog.java | 40 +++- fe/src/com/baidu/palo/cluster/Cluster.java | 11 +- .../palo/common/proc/BackendsProcDir.java | 194 +++++++++++------- .../com/baidu/palo/journal/JournalEntity.java | 5 + fe/src/com/baidu/palo/persist/EditLog.java | 10 + .../com/baidu/palo/persist/OperationType.java | 2 + .../persist/UpdateClusterAndBackends.java | 61 ++++++ fe/src/com/baidu/palo/qe/ShowExecutor.java | 11 + fe/src/com/baidu/palo/system/Backend.java | 48 +++-- .../baidu/palo/system/SystemInfoService.java | 28 ++- gensrc/parser/sql_parser.y | 12 +- 15 files changed, 347 insertions(+), 148 deletions(-) create mode 100644 fe/src/com/baidu/palo/analysis/ShowBackendsStmt.java create mode 100644 fe/src/com/baidu/palo/persist/UpdateClusterAndBackends.java diff --git a/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java b/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java index 
923720dd865235..f7cf2e0af753ee 100644 --- a/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java +++ b/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java @@ -43,12 +43,12 @@ import com.baidu.palo.clone.Clone; import com.baidu.palo.clone.CloneJob.JobPriority; import com.baidu.palo.cluster.Cluster; +import com.baidu.palo.persist.UpdateClusterAndBackends; import com.baidu.palo.common.Config; import com.baidu.palo.common.DdlException; import com.baidu.palo.common.FeMetaVersion; import com.baidu.palo.common.MetaNotFoundException; import com.baidu.palo.common.io.Text; -import com.baidu.palo.persist.ClusterInfo; import com.baidu.palo.system.Backend; import com.baidu.palo.system.Backend.BackendState; import com.baidu.palo.system.SystemInfoService; @@ -478,23 +478,19 @@ public synchronized int tryFinishJob() { // Shrinking capacity in cluser if (decomissionType == DecomissionType.ClusterDecomission) { for (String clusterName : clusterBackendsMap.keySet()) { - final Map map = clusterBackendsMap.get(clusterName); + final Map idToBackend = clusterBackendsMap.get(clusterName); final Cluster cluster = Catalog.getInstance().getCluster(clusterName); - final List removeIds = Lists.newArrayList(); - for (long id : map.keySet()) { - final Backend backend = map.get(id); - backend.setOwnerClusterName(""); + List backendList = Lists.newArrayList(); + for (long id : idToBackend.keySet()) { + final Backend backend = idToBackend.get(id); + backend.clearClusterName(); backend.setBackendState(BackendState.free); backend.setDecommissioned(false); + backendList.add(id); cluster.removeBackend(id); - Catalog.getInstance().getEditLog().logBackendStateChange(backend); - removeIds.add(id); } - cluster.removeBackends(removeIds); - ClusterInfo info = new ClusterInfo(); - info.setClusterName(cluster.getName()); - info.setBackendIdList(cluster.getBackendIdList()); - Catalog.getInstance().getEditLog().logUpdateCluster(info); + UpdateClusterAndBackends updateInfo = new 
UpdateClusterAndBackends(backendList); + Catalog.getInstance().getEditLog().logUpdateClusterAndBackendState(updateInfo); } } } diff --git a/fe/src/com/baidu/palo/analysis/AlterSystemStmt.java b/fe/src/com/baidu/palo/analysis/AlterSystemStmt.java index 5695c1e671b906..c09e21a9995080 100644 --- a/fe/src/com/baidu/palo/analysis/AlterSystemStmt.java +++ b/fe/src/com/baidu/palo/analysis/AlterSystemStmt.java @@ -56,7 +56,7 @@ public void analyze(Analyzer analyzer) throws AnalysisException, InternalExcepti @Override public String toSql() { StringBuilder sb = new StringBuilder(); - sb.append("ALTER CLUSTER ").append(alterClause.toSql()); + sb.append("ALTER SYSTEM ").append(alterClause.toSql()); return sb.toString(); } diff --git a/fe/src/com/baidu/palo/analysis/CancelAlterSystemStmt.java b/fe/src/com/baidu/palo/analysis/CancelAlterSystemStmt.java index ed08378c214150..9026044ac1b6ce 100644 --- a/fe/src/com/baidu/palo/analysis/CancelAlterSystemStmt.java +++ b/fe/src/com/baidu/palo/analysis/CancelAlterSystemStmt.java @@ -25,8 +25,8 @@ public class CancelAlterSystemStmt extends CancelStmt { - private List hostPorts; - protected List> hostPortPairs; + protected List hostPorts; + private List> hostPortPairs; public CancelAlterSystemStmt(List hostPorts) { this.hostPorts = hostPorts; diff --git a/fe/src/com/baidu/palo/analysis/ShowBackendsStmt.java b/fe/src/com/baidu/palo/analysis/ShowBackendsStmt.java new file mode 100644 index 00000000000000..fec1578511c160 --- /dev/null +++ b/fe/src/com/baidu/palo/analysis/ShowBackendsStmt.java @@ -0,0 +1,45 @@ +// Copyright 2017 The Apache Software Foundation +// Modifications copyright (C) 2017, Baidu.com, Inc. + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.baidu.palo.analysis; + +import com.baidu.palo.catalog.Column; +import com.baidu.palo.catalog.ColumnType; +import com.baidu.palo.common.proc.BackendsProcDir; +import com.baidu.palo.qe.ShowResultSetMetaData; + +public class ShowBackendsStmt extends ShowStmt { + + public ShowBackendsStmt() { + } + + @Override + public ShowResultSetMetaData getMetaData() { + ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); + for (String title : BackendsProcDir.TITLE_NAMES) { + if (title.equals("HostName") || title.equals("HeartbeatPort") + || title.equals("BePort") || title.equals("HttpPort")) { + continue; + } + builder.addColumn(new Column(title, ColumnType.createVarchar(30))); + } + return builder.build(); + } +} diff --git a/fe/src/com/baidu/palo/catalog/Catalog.java b/fe/src/com/baidu/palo/catalog/Catalog.java index 39633f12018762..66a10c98c27d2f 100644 --- a/fe/src/com/baidu/palo/catalog/Catalog.java +++ b/fe/src/com/baidu/palo/catalog/Catalog.java @@ -130,6 +130,7 @@ import com.baidu.palo.persist.Storage; import com.baidu.palo.persist.StorageInfo; import com.baidu.palo.persist.TableInfo; +import com.baidu.palo.persist.UpdateClusterAndBackends; import com.baidu.palo.qe.ConnectContext; import com.baidu.palo.qe.JournalObservable; import com.baidu.palo.qe.SessionVariable; @@ -138,6 +139,7 @@ import com.baidu.palo.system.Backend; import 
com.baidu.palo.system.Frontend; import com.baidu.palo.system.SystemInfoService; +import com.baidu.palo.system.Backend.BackendState; import com.baidu.palo.task.AgentBatchTask; import com.baidu.palo.task.AgentTask; import com.baidu.palo.task.AgentTaskExecutor; @@ -180,6 +182,7 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.net.InetAddress; +import java.util.Iterator; import java.net.URL; import java.net.UnknownHostException; import java.util.ArrayList; @@ -4389,8 +4392,8 @@ public List> showWhiteList(String user) { */ public void createCluster(CreateClusterStmt stmt) throws DdlException { final String clusterName = stmt.getClusterName(); + writeLock(); try { - writeLock(); if (nameToCluster.containsKey(clusterName)) { ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_HAS_EXIST, clusterName); } else { @@ -4428,7 +4431,13 @@ private void unprotectCreateCluster(Cluster cluster) { return; } } - + final Iterator iterator = cluster.getBackendIdList().iterator(); + while (iterator.hasNext()) { + final Long id = iterator.next(); + final Backend backend = systemInfo.getBackend(id); + backend.setOwnerClusterName(cluster.getName()); + backend.setBackendState(BackendState.using); + } idToCluster.put(cluster.getId(), cluster); nameToCluster.put(cluster.getName(), cluster); final InfoSchemaDb db = new InfoSchemaDb(cluster.getName()); @@ -4473,8 +4482,8 @@ public void replayCreateCluster(Cluster cluster) { * @throws DdlException */ public void dropCluster(DropClusterStmt stmt) throws DdlException { + writeLock(); try { - writeLock(); final Cluster cluster = nameToCluster.get(stmt.getClusterName()); final String clusterName = stmt.getClusterName(); if (cluster == null) { @@ -4537,9 +4546,8 @@ public void replayUpdateCluster(ClusterInfo info) { public void processModityCluster(AlterClusterStmt stmt) throws DdlException { final String clusterName = stmt.getAlterClusterName(); final int newInstanceNum = stmt.getInstanceNum(); - + writeLock(); try { - 
writeLock(); if (!nameToCluster.containsKey(clusterName)) { ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, clusterName); } @@ -4682,8 +4690,8 @@ public void migrateDb(MigrateDbStmt stmt) throws DdlException { final String srcDbName = stmt.getSrcDb(); final String desDbName = stmt.getDesDb(); + writeLock(); try { - writeLock(); if (!nameToCluster.containsKey(srcClusterName)) { ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_CLUSTER_NO_EXIT, srcClusterName); } @@ -4826,9 +4834,8 @@ public void linkDb(LinkDbStmt stmt) throws DdlException { final String desClusterName = stmt.getDesCluster(); final String srcDbName = stmt.getSrcDb(); final String desDbName = stmt.getDesDb(); - + writeLock(); try { - writeLock(); if (!nameToCluster.containsKey(srcClusterName)) { ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_SRC_CLUSTER_NO_EXIT, srcClusterName); } @@ -5072,4 +5079,21 @@ public long loadBrokers(DataInputStream dis, long checksum) throws IOException, } return checksum; } + + public void replayUpdateClusterAndBackends(UpdateClusterAndBackends info) { + for (long id : info.getBackendList()) { + final Backend backend = systemInfo.getBackend(id); + writeLock(); + try { + final Cluster cluster = nameToCluster.get(backend.getOwnerClusterName()); + cluster.removeBackend(id); + } finally { + writeUnlock(); + } + backend.setDecommissioned(false); + backend.clearClusterName(); + backend.setBackendState(BackendState.free); + } + } + } diff --git a/fe/src/com/baidu/palo/cluster/Cluster.java b/fe/src/com/baidu/palo/cluster/Cluster.java index e7e30b2efb457b..b577e60524a76f 100644 --- a/fe/src/com/baidu/palo/cluster/Cluster.java +++ b/fe/src/com/baidu/palo/cluster/Cluster.java @@ -359,21 +359,22 @@ public void readFields(DataInput in) throws IOException { } } - public void removeBackend(long id) { + public void removeBackend(long removedBackendId) { writeLock(); try { - backendIdList.remove(id); + backendIdList.remove((Long)removedBackendId); } finally { 
writeUnlock(); } } - public void removeBackends(List list) { + public void removeBackends(List removedBackendIds) { writeLock(); try { - backendIdList.remove(list); + backendIdList.remove(removedBackendIds); } finally { writeUnlock(); } } -} \ No newline at end of file + +} diff --git a/fe/src/com/baidu/palo/common/proc/BackendsProcDir.java b/fe/src/com/baidu/palo/common/proc/BackendsProcDir.java index d026a5564b7e74..965cc6c9a19844 100644 --- a/fe/src/com/baidu/palo/common/proc/BackendsProcDir.java +++ b/fe/src/com/baidu/palo/common/proc/BackendsProcDir.java @@ -22,6 +22,7 @@ import com.baidu.palo.alter.DecommissionBackendJob.DecomissionType; import com.baidu.palo.catalog.Catalog; +import com.baidu.palo.cluster.Cluster; import com.baidu.palo.common.AnalysisException; import com.baidu.palo.common.util.ListComparator; import com.baidu.palo.common.util.TimeUtils; @@ -38,13 +39,12 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; - -public class BackendsProcDir implements ProcDirInterface { - public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add("Cluster") - .add("BackendId").add("IP").add("HostName").add("HeartbeatPort").add("BePort").add("HttpPort") - .add("LastStartTime").add("LastHeartbeat").add("Alive").add("SystemDecommissioned") + +public class BackendsProcDir implements ProcDirInterface { + public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add("Cluster") + .add("BackendId").add("IP").add("HostName").add("HeartbeatPort").add("BePort").add("HttpPort") + .add("LastStartTime").add("LastHeartbeat").add("Alive").add("SystemDecommissioned") .add("ClusterDecommissioned").add("TabletNum").build(); - public static final int IP_INDEX = 1; public static final int HOSTNAME_INDEX = 2; @@ -53,79 +53,115 @@ public class BackendsProcDir implements ProcDirInterface { public BackendsProcDir(SystemInfoService clusterInfoService) { this.clusterInfoService = clusterInfoService; } - - @Override - 
public ProcResult fetchResult() throws AnalysisException { - Preconditions.checkNotNull(clusterInfoService); - - BaseProcResult result = new BaseProcResult(); - - result.setNames(TITLE_NAMES); - List backendIds = clusterInfoService.getBackendIds(false); - if (backendIds == null) { - // empty - return result; - } - - List> backendInfos = new LinkedList>(); - for (long backendId : backendIds) { - Backend backend = clusterInfoService.getBackend(backendId); - if (backend == null) { - continue; - } - - String ip = "N/A"; - String hostName = "N/A"; - try { - InetAddress address = InetAddress.getByName(backend.getHost()); - ip = address.getHostAddress(); - hostName = address.getHostName(); - } catch (UnknownHostException e) { - continue; - } - - Integer tabletNum = Catalog.getCurrentInvertedIndex().getTabletNumByBackendId(backendId); - List backendInfo = Lists.newArrayList(); - backendInfo.add(backend.getOwnerClusterName()); - backendInfo.add(String.valueOf(backendId)); - backendInfo.add(ip); - backendInfo.add(hostName); - backendInfo.add(String.valueOf(backend.getHeartbeatPort())); - backendInfo.add(String.valueOf(backend.getBePort())); - backendInfo.add(String.valueOf(backend.getHttpPort())); - backendInfo.add(TimeUtils.longToTimeString(backend.getLastStartTime())); - backendInfo.add(TimeUtils.longToTimeString(backend.getLastUpdateMs())); - backendInfo.add(String.valueOf(backend.isAlive())); - if (backend.isDecommissioned() && backend.getDecommissionType() == DecomissionType.ClusterDecomission) { - backendInfo.add(String.valueOf("false")); - backendInfo.add(String.valueOf("true")); - } else if (backend.isDecommissioned() - && backend.getDecommissionType() == DecomissionType.SystemDecomission) { - backendInfo.add(String.valueOf("true")); - backendInfo.add(String.valueOf("false")); - } else { - backendInfo.add(String.valueOf("false")); - backendInfo.add(String.valueOf("false")); - } - backendInfo.add(tabletNum.toString()); - - backendInfos.add(backendInfo); - } - - // 
sort by id, ip hostName - ListComparator> comparator = new ListComparator>(0, 1, 2); - Collections.sort(backendInfos, comparator); - - for (List backendInfo : backendInfos) { - List oneInfo = new ArrayList(backendInfo.size()); - for (Comparable element : backendInfo) { - oneInfo.add(element.toString()); - } - result.addRow(oneInfo); - } - return result; - } - + + @Override + public ProcResult fetchResult() throws AnalysisException { + Preconditions.checkNotNull(clusterInfoService); + + BaseProcResult result = new BaseProcResult(); + result.setNames(TITLE_NAMES); + + final List> backendInfos = getBackendInfos(); + for (List backendInfo : backendInfos) { + List oneInfo = new ArrayList(backendInfo.size()); + for (String info : backendInfo) { + oneInfo.add(info); + } + result.addRow(oneInfo); + } + return result; + } + + /** + * get all backends of system + * @return + */ + public static List> getBackendInfos() { + return getClusterBackendInfos(null); + } + + /** + * get backends of cluster + * @param clusterName + * @return + */ + public static List> getClusterBackendInfos(String clusterName) { + final SystemInfoService clusterInfoService = Catalog.getCurrentSystemInfo(); + List> backendInfos = new LinkedList>(); + List backendIds = null; + if (!Strings.isNullOrEmpty(clusterName)) { + final Cluster cluster = Catalog.getInstance().getCluster(clusterName); + // root not in any cluster + if (null == cluster) { + return backendInfos; + } + backendIds = cluster.getBackendIdList(); + } else { + backendIds = clusterInfoService.getBackendIds(false); + if (backendIds == null) { + return backendInfos; + } + } + List> comparableBackendInfos = new LinkedList>(); + for (long backendId : backendIds) { + Backend backend = clusterInfoService.getBackend(backendId); + if (backend == null) { + continue; + } + + String ip = "N/A"; + String hostName = "N/A"; + try { + InetAddress address = InetAddress.getByName(backend.getHost()); + hostName = address.getHostName(); + } catch 
(UnknownHostException e) { + continue; + } + + Integer tabletNum = Catalog.getCurrentInvertedIndex().getTabletNumByBackendId(backendId); + List backendInfo = Lists.newArrayList(); + backendInfo.add(backend.getOwnerClusterName()); + backendInfo.add(String.valueOf(backendId)); + backendInfo.add(backend.getHost()); + if (Strings.isNullOrEmpty(clusterName)) { + backendInfo.add(hostName); + backendInfo.add(String.valueOf(backend.getHeartbeatPort())); + backendInfo.add(String.valueOf(backend.getBePort())); + backendInfo.add(String.valueOf(backend.getHttpPort())); + } + backendInfo.add(TimeUtils.longToTimeString(backend.getLastStartTime())); + backendInfo.add(TimeUtils.longToTimeString(backend.getLastUpdateMs())); + backendInfo.add(String.valueOf(backend.isAlive())); + if (backend.isDecommissioned() && backend.getDecommissionType() == DecomissionType.ClusterDecomission) { + backendInfo.add(String.valueOf("false")); + backendInfo.add(String.valueOf("true")); + } else if (backend.isDecommissioned() + && backend.getDecommissionType() == DecomissionType.SystemDecomission) { + backendInfo.add(String.valueOf("true")); + backendInfo.add(String.valueOf("false")); + } else { + backendInfo.add(String.valueOf("false")); + backendInfo.add(String.valueOf("false")); + } + backendInfo.add(tabletNum.toString()); + comparableBackendInfos.add(backendInfo); + } + + // sort by id, ip hostName + ListComparator> comparator = new ListComparator>(0, 1, 2); + Collections.sort(comparableBackendInfos, comparator); + + for (List backendInfo : comparableBackendInfos) { + List oneInfo = new ArrayList(backendInfo.size()); + for (Comparable element : backendInfo) { + oneInfo.add(element.toString()); + } + backendInfos.add(oneInfo); + } + + return backendInfos; + } + @Override public boolean register(String name, ProcNodeInterface node) { return false; diff --git a/fe/src/com/baidu/palo/journal/JournalEntity.java b/fe/src/com/baidu/palo/journal/JournalEntity.java index 4ab475f99a8131..b93579f4c66c7a 
100644 --- a/fe/src/com/baidu/palo/journal/JournalEntity.java +++ b/fe/src/com/baidu/palo/journal/JournalEntity.java @@ -48,6 +48,7 @@ import com.baidu.palo.persist.RecoverInfo; import com.baidu.palo.persist.ReplicaPersistInfo; import com.baidu.palo.persist.TableInfo; +import com.baidu.palo.persist.UpdateClusterAndBackends; import com.baidu.palo.qe.SessionVariable; import com.baidu.palo.system.Backend; import com.baidu.palo.system.Frontend; @@ -302,6 +303,10 @@ public void readFields(DataInput in) throws IOException { data = new Text(); break; } + case OperationType.OP_UPDATE_CLUSTER_AND_BACKENDS: { + data = new UpdateClusterAndBackends(); + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/src/com/baidu/palo/persist/EditLog.java b/fe/src/com/baidu/palo/persist/EditLog.java index 5e1bc7bc9c0809..3944211b28e33c 100644 --- a/fe/src/com/baidu/palo/persist/EditLog.java +++ b/fe/src/com/baidu/palo/persist/EditLog.java @@ -516,6 +516,11 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { catalog.getLoadInstance().setLoadErrorHubInfo(param); break; } + case OperationType.OP_UPDATE_CLUSTER_AND_BACKENDS: { + final UpdateClusterAndBackends info = (UpdateClusterAndBackends) journal.getData(); + catalog.replayUpdateClusterAndBackends(info); + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); @@ -888,4 +893,9 @@ public void logExportUpdateState(long jobId, ExportJob.JobState newState) { ExportJob.StateTransfer transfer = new ExportJob.StateTransfer(jobId, newState); logEdit(OperationType.OP_EXPORT_UPDATE_STATE, transfer); } + + public void logUpdateClusterAndBackendState(UpdateClusterAndBackends info) { + logEdit(OperationType.OP_UPDATE_CLUSTER_AND_BACKENDS, info); + } + } diff --git a/fe/src/com/baidu/palo/persist/OperationType.java b/fe/src/com/baidu/palo/persist/OperationType.java index d87b323ec533b3..5bbb7e0f32f60c 
100644 --- a/fe/src/com/baidu/palo/persist/OperationType.java +++ b/fe/src/com/baidu/palo/persist/OperationType.java @@ -109,4 +109,6 @@ public class OperationType { public static final short OP_ADD_BROKER = 85; public static final short OP_DROP_BROKER = 86; public static final short OP_DROP_ALL_BROKER = 87; + + public static final short OP_UPDATE_CLUSTER_AND_BACKENDS = 88; } diff --git a/fe/src/com/baidu/palo/persist/UpdateClusterAndBackends.java b/fe/src/com/baidu/palo/persist/UpdateClusterAndBackends.java new file mode 100644 index 00000000000000..127e53ba72382a --- /dev/null +++ b/fe/src/com/baidu/palo/persist/UpdateClusterAndBackends.java @@ -0,0 +1,61 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package com.baidu.palo.persist; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +import com.baidu.palo.common.io.Writable; +import com.google.common.collect.Lists; + +public class UpdateClusterAndBackends implements Writable { + private List backendIds; + + public UpdateClusterAndBackends() { + this.backendIds = Lists.newArrayList(); + } + + public UpdateClusterAndBackends(List backends) { + this.backendIds = backends; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(backendIds.size()); + for (Long id : backendIds) { + out.writeLong(id); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + int backendCount = in.readInt(); + while (backendCount-- > 0) { + backendIds.add(in.readLong()); + } + } + + public List getBackendList() { + return backendIds; + } + + public void setBackendList(List backendList) { + this.backendIds = backendList; + } + +} diff --git a/fe/src/com/baidu/palo/qe/ShowExecutor.java b/fe/src/com/baidu/palo/qe/ShowExecutor.java index c5bdf9cb5ef92b..7b822456861d0f 100644 --- a/fe/src/com/baidu/palo/qe/ShowExecutor.java +++ b/fe/src/com/baidu/palo/qe/ShowExecutor.java @@ -19,6 +19,7 @@ import com.baidu.palo.analysis.HelpStmt; import com.baidu.palo.analysis.ShowAlterStmt; import com.baidu.palo.analysis.ShowAuthorStmt; +import com.baidu.palo.analysis.ShowBackendsStmt; import com.baidu.palo.analysis.ShowBackupStmt; import com.baidu.palo.analysis.ShowBrokerStmt; import com.baidu.palo.analysis.ShowClusterStmt; @@ -86,6 +87,7 @@ import java.util.ArrayList; import java.util.Iterator; + import java.util.List; import java.util.Map; import java.util.Set; @@ -166,6 +168,8 @@ public ShowResultSet execute() throws AnalysisException { handleShowBroker(); } else if (stmt instanceof ShowExportStmt) { handleShowExport(); + } else if (stmt instanceof ShowBackendsStmt) { + handleShowBackends(); } else { handleEmtpy(); } @@ -893,4 
+897,11 @@ private void handleShowExport() throws AnalysisException { resultSet = new ShowResultSet(showExportStmt.getMetaData(), rows); } + + private void handleShowBackends() { + final ShowBackendsStmt showStmt = (ShowBackendsStmt) stmt; + final List> backendInfos = BackendsProcDir.getClusterBackendInfos(showStmt.getClusterName()); + resultSet = new ShowResultSet(showStmt.getMetaData(), backendInfos); + } + } diff --git a/fe/src/com/baidu/palo/system/Backend.java b/fe/src/com/baidu/palo/system/Backend.java index f2b06def2c4aae..65b1d1134b0ed4 100644 --- a/fe/src/com/baidu/palo/system/Backend.java +++ b/fe/src/com/baidu/palo/system/Backend.java @@ -68,7 +68,7 @@ public enum BackendState { private AtomicBoolean isDecommissioned; private AtomicInteger decommissionType; - private String ownerClusterName; + private AtomicReference ownerClusterName; // to index the state in some cluster private AtomicInteger backendState; // private BackendState backendState; @@ -88,7 +88,7 @@ public Backend() { this.beRpcPort = new AtomicInteger(); this.disksRef = new AtomicReference>(ImmutableMap. 
of()); - this.ownerClusterName = ""; + this.ownerClusterName = new AtomicReference(""); this.backendState = new AtomicInteger(BackendState.free.ordinal()); this.decommissionType = new AtomicInteger(DecomissionType.SystemDecomission.ordinal()); @@ -108,7 +108,7 @@ public Backend(long id, String host, int heartbeatPort) { this.isAlive = new AtomicBoolean(false); this.isDecommissioned = new AtomicBoolean(false); - this.ownerClusterName = ""; + this.ownerClusterName = new AtomicReference(""); this.backendState = new AtomicInteger(BackendState.free.ordinal()); this.decommissionType = new AtomicInteger(DecomissionType.SystemDecomission.ordinal()); } @@ -372,7 +372,7 @@ public void write(DataOutput out) throws IOException { entry.getValue().write(out); } - Text.writeString(out, ownerClusterName); + Text.writeString(out, ownerClusterName.get()); out.writeInt(backendState.get()); out.writeInt(decommissionType.get()); @@ -409,16 +409,15 @@ public void readFields(DataInput in) throws IOException { disksRef.set(ImmutableMap.copyOf(disks)); } - - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_30) { - ownerClusterName = Text.readString(in); - backendState.set(in.readInt()); - decommissionType.set(in.readInt()); - } else { - ownerClusterName = SystemInfoService.DEFAULT_CLUSTER; - backendState.set(BackendState.using.ordinal()); - decommissionType.set(DecomissionType.SystemDecomission.ordinal()); - } + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_30) { + ownerClusterName.set(Text.readString(in)); + backendState.set(in.readInt()); + decommissionType.set(in.readInt()); + } else { + ownerClusterName.set(SystemInfoService.DEFAULT_CLUSTER); + backendState.set(BackendState.using.ordinal()); + decommissionType.set(DecomissionType.SystemDecomission.ordinal()); + } } @Override @@ -442,14 +441,18 @@ public String toString() { + "]"; } - public String getOwnerClusterName() { - return ownerClusterName; - } - - public void 
setOwnerClusterName(String ownerClusterName) { - this.ownerClusterName = ownerClusterName; - } - + public String getOwnerClusterName() { + return ownerClusterName.get(); + } + + public void setOwnerClusterName(String name) { + ownerClusterName.set(name); + } + + public void clearClusterName() { + ownerClusterName.set(""); + } + public BackendState getBackendState() { switch (backendState.get()) { case 0: @@ -473,4 +476,5 @@ public DecomissionType getDecommissionType() { } return DecomissionType.SystemDecomission; } + } diff --git a/fe/src/com/baidu/palo/system/SystemInfoService.java b/fe/src/com/baidu/palo/system/SystemInfoService.java index a19849fb503f7d..583dfc0345c52c 100644 --- a/fe/src/com/baidu/palo/system/SystemInfoService.java +++ b/fe/src/com/baidu/palo/system/SystemInfoService.java @@ -41,6 +41,7 @@ import com.baidu.palo.catalog.Catalog; import com.baidu.palo.catalog.Database; +import com.baidu.palo.cluster.Cluster; import com.baidu.palo.common.AnalysisException; import com.baidu.palo.common.ClientPool; import com.baidu.palo.common.DdlException; @@ -239,7 +240,12 @@ private void dropBackend(String host, int heartbeatPort) throws DdlException { copiedHeartbeatHandlersMap.remove(droppedBackend.getId()); ImmutableMap newIdToHeartbeatHandler = ImmutableMap.copyOf(copiedHeartbeatHandlersMap); idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler); - + + // update cluster + final Cluster cluster = Catalog.getInstance().getCluster(droppedBackend.getOwnerClusterName()); + if (null != cluster) { + cluster.removeBackend(droppedBackend.getId()); + } // log Catalog.getInstance().getEditLog().logDropBackend(droppedBackend); LOG.info("drop {}", droppedBackend); @@ -365,19 +371,6 @@ public List createCluster(String clusterName, int num) { LOG.warn("an excessive number of backends, require :" + num + " get:" + ret.size()); return null; } - - // set be state and owner - final Iterator iterator = ret.iterator(); - while (iterator.hasNext()) { - final Long id = 
iterator.next(); - final Backend backend = copiedBackends.get(id); - backend.setOwnerClusterName(clusterName); - backend.setBackendState(BackendState.using); - copiedBackends.put(backend.getId(), backend); - Catalog.getInstance().getEditLog().logBackendStateChange(backend); - } - ImmutableMap newIdToBackend = ImmutableMap.copyOf(copiedBackends); - idToBackendRef.set(newIdToBackend); lastBackendIdForCreationMap.put(clusterName, (long) -1); lastBackendIdForOtherMap.put(clusterName, (long) -1); @@ -403,7 +396,7 @@ public void releaseBackends(String clusterName, boolean log) { } else { final Backend backend = copiedBackends.get(id); backend.setBackendState(BackendState.free); - backend.setOwnerClusterName(""); + backend.clearClusterName(); if (log) { Catalog.getInstance().getEditLog().logBackendStateChange(backend); } @@ -1022,6 +1015,11 @@ public void replayDropBackend(Backend backend) { copiedHeartbeatHandlersMap.remove(backend.getId()); ImmutableMap newIdToHeartbeatHandler = ImmutableMap.copyOf(copiedHeartbeatHandlersMap); idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler); + // update cluster + final Cluster cluster = Catalog.getInstance().getCluster(backend.getOwnerClusterName()); + if (null != cluster) { + cluster.removeBackend(backend.getId()); + } } public void updateBackendState(Backend be) { diff --git a/gensrc/parser/sql_parser.y b/gensrc/parser/sql_parser.y index c0812ac0350d07..ab022423404ad2 100644 --- a/gensrc/parser/sql_parser.y +++ b/gensrc/parser/sql_parser.y @@ -1,5 +1,5 @@ -// Modifications copyright (C) 2017, Baidu.com, Inc. // Copyright 2017 The Apache Software Foundation +// Modifications copyright (C) 2017, Baidu.com, Inc. // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. 
See the NOTICE file @@ -194,7 +194,7 @@ parser code {: // Total keywords of palo terminal String KW_ADD, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_AS, KW_ASC, KW_AUTHORS, - KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BOOLEAN, KW_BOTH, KW_BROKER, KW_BY, + KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BOOLEAN, KW_BOTH, KW_BROKER, KW_BACKENDS, KW_BY, KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_SYSTEM, KW_CLUSTER, KW_CLUSTERS, KW_LINK, KW_MIGRATE, KW_MIGRATIONS, KW_ENTER, KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_COUNT, KW_CREATE, KW_CROSS, KW_CURRENT, KW_CURRENT_USER, @@ -1655,6 +1655,10 @@ show_param ::= {: RESULT = new ShowBrokerStmt(); :} + | KW_BACKENDS + {: + RESULT = new ShowBackendsStmt(); + :} ; keys_or_index ::= @@ -1827,7 +1831,7 @@ cancel_param ::= {: RESULT = new CancelAlterTableStmt(type, table); :} - | KW_ALTER KW_SYSTEM KW_DECOMMISSION KW_BACKEND string_list:hostPorts + | KW_DECOMMISSION KW_BACKEND string_list:hostPorts {: RESULT = new CancelAlterSystemStmt(hostPorts); :} @@ -3410,6 +3414,8 @@ keyword ::= {: RESULT = id; :} | KW_BROKER:id {: RESULT = id; :} + | KW_BACKENDS:id + {: RESULT = id; :} | KW_CHAIN:id {: RESULT = id; :} | KW_CHARSET:id From 9ded63a484128619cdf606fbc21128b1de4eb9fc Mon Sep 17 00:00:00 2001 From: chenhao16 Date: Sat, 26 Aug 2017 08:51:40 +0800 Subject: [PATCH 6/8] returns null when the param does not match --- fe/src/com/baidu/palo/cluster/ClusterNamespace.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/src/com/baidu/palo/cluster/ClusterNamespace.java b/fe/src/com/baidu/palo/cluster/ClusterNamespace.java index 15569ed6ebecf1..2f87e77a9cf5ba 100644 --- a/fe/src/com/baidu/palo/cluster/ClusterNamespace.java +++ b/fe/src/com/baidu/palo/cluster/ClusterNamespace.java @@ -73,7 +73,7 @@ private static String linkString(String str1, String str2) { */ 
public static String getDbNameFromFullName(String db) { if (!checkName(db)) { - return null; + return db; } return extract(db, 1); } @@ -86,7 +86,7 @@ public static String getDbNameFromFullName(String db) { */ public static String getUsrNameFromFullName(String usr) { if (!checkName(usr)) { - return null; + return usr; } return extract(usr, 1); } From cfdd8f88f17a9f04d12dd5901ee18b45eef5559a Mon Sep 17 00:00:00 2001 From: chenhao16 Date: Sat, 26 Aug 2017 20:00:36 +0800 Subject: [PATCH 7/8] update UpdateClusterAndBackends to BackendIdsUpdateInfo --- fe/src/com/baidu/palo/alter/DecommissionBackendJob.java | 4 ++-- fe/src/com/baidu/palo/catalog/Catalog.java | 4 ++-- fe/src/com/baidu/palo/journal/JournalEntity.java | 4 ++-- ...ateClusterAndBackends.java => BackendIdsUpdateInfo.java} | 6 +++--- fe/src/com/baidu/palo/persist/EditLog.java | 4 ++-- 5 files changed, 11 insertions(+), 11 deletions(-) rename fe/src/com/baidu/palo/persist/{UpdateClusterAndBackends.java => BackendIdsUpdateInfo.java} (90%) diff --git a/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java b/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java index f7cf2e0af753ee..e2609acf5ebc61 100644 --- a/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java +++ b/fe/src/com/baidu/palo/alter/DecommissionBackendJob.java @@ -43,7 +43,7 @@ import com.baidu.palo.clone.Clone; import com.baidu.palo.clone.CloneJob.JobPriority; import com.baidu.palo.cluster.Cluster; -import com.baidu.palo.persist.UpdateClusterAndBackends; +import com.baidu.palo.persist.BackendIdsUpdateInfo; import com.baidu.palo.common.Config; import com.baidu.palo.common.DdlException; import com.baidu.palo.common.FeMetaVersion; @@ -489,7 +489,7 @@ public synchronized int tryFinishJob() { backendList.add(id); cluster.removeBackend(id); } - UpdateClusterAndBackends updateInfo = new UpdateClusterAndBackends(backendList); + BackendIdsUpdateInfo updateInfo = new BackendIdsUpdateInfo(backendList); 
Catalog.getInstance().getEditLog().logUpdateClusterAndBackendState(updateInfo); } } diff --git a/fe/src/com/baidu/palo/catalog/Catalog.java b/fe/src/com/baidu/palo/catalog/Catalog.java index 66a10c98c27d2f..ec82e0e0a2311b 100644 --- a/fe/src/com/baidu/palo/catalog/Catalog.java +++ b/fe/src/com/baidu/palo/catalog/Catalog.java @@ -130,7 +130,7 @@ import com.baidu.palo.persist.Storage; import com.baidu.palo.persist.StorageInfo; import com.baidu.palo.persist.TableInfo; -import com.baidu.palo.persist.UpdateClusterAndBackends; +import com.baidu.palo.persist.BackendIdsUpdateInfo; import com.baidu.palo.qe.ConnectContext; import com.baidu.palo.qe.JournalObservable; import com.baidu.palo.qe.SessionVariable; @@ -5080,7 +5080,7 @@ public long loadBrokers(DataInputStream dis, long checksum) throws IOException, return checksum; } - public void replayUpdateClusterAndBackends(UpdateClusterAndBackends info) { + public void replayUpdateClusterAndBackends(BackendIdsUpdateInfo info) { for (long id : info.getBackendList()) { final Backend backend = systemInfo.getBackend(id); writeLock(); diff --git a/fe/src/com/baidu/palo/journal/JournalEntity.java b/fe/src/com/baidu/palo/journal/JournalEntity.java index b93579f4c66c7a..fa0d01bbbe3820 100644 --- a/fe/src/com/baidu/palo/journal/JournalEntity.java +++ b/fe/src/com/baidu/palo/journal/JournalEntity.java @@ -48,7 +48,7 @@ import com.baidu.palo.persist.RecoverInfo; import com.baidu.palo.persist.ReplicaPersistInfo; import com.baidu.palo.persist.TableInfo; -import com.baidu.palo.persist.UpdateClusterAndBackends; +import com.baidu.palo.persist.BackendIdsUpdateInfo; import com.baidu.palo.qe.SessionVariable; import com.baidu.palo.system.Backend; import com.baidu.palo.system.Frontend; @@ -304,7 +304,7 @@ public void readFields(DataInput in) throws IOException { break; } case OperationType.OP_UPDATE_CLUSTER_AND_BACKENDS: { - data = new UpdateClusterAndBackends(); + data = new BackendIdsUpdateInfo(); break; } default: { diff --git 
a/fe/src/com/baidu/palo/persist/UpdateClusterAndBackends.java b/fe/src/com/baidu/palo/persist/BackendIdsUpdateInfo.java similarity index 90% rename from fe/src/com/baidu/palo/persist/UpdateClusterAndBackends.java rename to fe/src/com/baidu/palo/persist/BackendIdsUpdateInfo.java index 127e53ba72382a..f835be7a1d06bc 100644 --- a/fe/src/com/baidu/palo/persist/UpdateClusterAndBackends.java +++ b/fe/src/com/baidu/palo/persist/BackendIdsUpdateInfo.java @@ -23,14 +23,14 @@ import com.baidu.palo.common.io.Writable; import com.google.common.collect.Lists; -public class UpdateClusterAndBackends implements Writable { +public class BackendIdsUpdateInfo implements Writable { private List backendIds; - public UpdateClusterAndBackends() { + public BackendIdsUpdateInfo() { this.backendIds = Lists.newArrayList(); } - public UpdateClusterAndBackends(List backends) { + public BackendIdsUpdateInfo(List backends) { this.backendIds = backends; } diff --git a/fe/src/com/baidu/palo/persist/EditLog.java b/fe/src/com/baidu/palo/persist/EditLog.java index 3944211b28e33c..dd0b332fff5251 100644 --- a/fe/src/com/baidu/palo/persist/EditLog.java +++ b/fe/src/com/baidu/palo/persist/EditLog.java @@ -517,7 +517,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { break; } case OperationType.OP_UPDATE_CLUSTER_AND_BACKENDS: { - final UpdateClusterAndBackends info = (UpdateClusterAndBackends) journal.getData(); + final BackendIdsUpdateInfo info = (BackendIdsUpdateInfo) journal.getData(); catalog.replayUpdateClusterAndBackends(info); break; } @@ -894,7 +894,7 @@ public void logExportUpdateState(long jobId, ExportJob.JobState newState) { logEdit(OperationType.OP_EXPORT_UPDATE_STATE, transfer); } - public void logUpdateClusterAndBackendState(UpdateClusterAndBackends info) { + public void logUpdateClusterAndBackendState(BackendIdsUpdateInfo info) { logEdit(OperationType.OP_UPDATE_CLUSTER_AND_BACKENDS, info); } From 69aec04546a80386a970351c49bc1b1ac75952dc Mon Sep 17 00:00:00 2001
From: chenhao16 Date: Sat, 26 Aug 2017 21:23:06 +0800 Subject: [PATCH 8/8] correct Licenses --- .../com/baidu/palo/analysis/ShowBackendsStmt.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/fe/src/com/baidu/palo/analysis/ShowBackendsStmt.java b/fe/src/com/baidu/palo/analysis/ShowBackendsStmt.java index fec1578511c160..e2eb9559d48f0e 100644 --- a/fe/src/com/baidu/palo/analysis/ShowBackendsStmt.java +++ b/fe/src/com/baidu/palo/analysis/ShowBackendsStmt.java @@ -1,13 +1,8 @@ -// Copyright 2017 The Apache Software Foundation -// Modifications copyright (C) 2017, Baidu.com, Inc. +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 //