Skip to content

Commit

Permalink
HDDS-238. Add Node2Pipeline Map in SCM to track ratis/standalone pipe…
Browse files Browse the repository at this point in the history
…lines. Contributed by Mukul Kumar Singh.
  • Loading branch information
xiaoyuyao committed Jul 13, 2018
1 parent f89e265 commit 3f3f722
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 23 deletions.
Expand Up @@ -456,4 +456,15 @@ public ContainerInfo build() {
replicationFactor, replicationType); replicationFactor, replicationType);
} }
} }

/**
* Check if a container is in open state, this will check if the
* container is either open or allocated or creating. Any containers in
* these states is managed as an open container by SCM.
*/
public boolean isContainerOpen() {
return state == HddsProtos.LifeCycleState.ALLOCATED ||
state == HddsProtos.LifeCycleState.CREATING ||
state == HddsProtos.LifeCycleState.OPEN;
}
} }
Expand Up @@ -477,7 +477,7 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
List<StorageContainerDatanodeProtocolProtos.ContainerInfo> List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
containerInfos = reports.getReportsList(); containerInfos = reports.getReportsList();


for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
containerInfos) { containerInfos) {
byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID()); byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID());
lock.lock(); lock.lock();
Expand All @@ -498,7 +498,9 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
containerStore.put(dbKey, newState.toByteArray()); containerStore.put(dbKey, newState.toByteArray());


// If the container is closed, then state is already written to SCM // If the container is closed, then state is already written to SCM
Pipeline pipeline = pipelineSelector.getPipeline(newState.getPipelineName(), newState.getReplicationType()); Pipeline pipeline =
pipelineSelector.getPipeline(newState.getPipelineName(),
newState.getReplicationType());
if(pipeline == null) { if(pipeline == null) {
pipeline = pipelineSelector pipeline = pipelineSelector
.getReplicationPipeline(newState.getReplicationType(), .getReplicationPipeline(newState.getReplicationType(),
Expand Down Expand Up @@ -713,4 +715,9 @@ public void flushContainerInfo() throws IOException {
public MetadataStore getContainerStore() { public MetadataStore getContainerStore() {
return containerStore; return containerStore;
} }

@VisibleForTesting
public PipelineSelector getPipelineSelector() {
return pipelineSelector;
}
} }
Expand Up @@ -17,6 +17,7 @@


package org.apache.hadoop.hdds.scm.container; package org.apache.hadoop.hdds.scm.container;


import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -522,4 +523,9 @@ public boolean removeContainerReplica(ContainerID containerID,
DatanodeDetails dn) throws SCMException { DatanodeDetails dn) throws SCMException {
return containers.removeContainerReplica(containerID, dn); return containers.removeContainerReplica(containerID, dn);
} }

@VisibleForTesting
public ContainerStateMap getContainerStateMap() {
return containers;
}
} }
Expand Up @@ -51,7 +51,7 @@
* Container State Map acts like a unified map for various attributes that are * Container State Map acts like a unified map for various attributes that are
* used to select containers when we need allocated blocks. * used to select containers when we need allocated blocks.
* <p> * <p>
* This class provides the ability to query 4 classes of attributes. They are * This class provides the ability to query 5 classes of attributes. They are
* <p> * <p>
* 1. LifeCycleStates - LifeCycle States of container describe in which state * 1. LifeCycleStates - LifeCycle States of container describe in which state
* a container is. For example, a container needs to be in Open State for a * a container is. For example, a container needs to be in Open State for a
Expand All @@ -72,6 +72,9 @@
* Replica and THREE Replica. User can specify how many copies should be made * Replica and THREE Replica. User can specify how many copies should be made
* for a ozone key. * for a ozone key.
* <p> * <p>
* 5.Pipeline - The pipeline constitute the set of Datanodes on which the
* open container resides physically.
* <p>
* The most common access pattern of this class is to select a container based * The most common access pattern of this class is to select a container based
* on all these parameters, for example, when allocating a block we will * on all these parameters, for example, when allocating a block we will
* select a container that belongs to user1, with Ratis replication which can * select a container that belongs to user1, with Ratis replication which can
Expand All @@ -86,6 +89,14 @@ public class ContainerStateMap {
private final ContainerAttribute<String> ownerMap; private final ContainerAttribute<String> ownerMap;
private final ContainerAttribute<ReplicationFactor> factorMap; private final ContainerAttribute<ReplicationFactor> factorMap;
private final ContainerAttribute<ReplicationType> typeMap; private final ContainerAttribute<ReplicationType> typeMap;
// This map constitutes the pipeline to open container mappings.
// This map will be queried for the list of open containers on a particular
// pipeline and issue a close on corresponding containers in case of
// following events:
//1. Dead datanode.
//2. Datanode out of space.
//3. Volume loss or volume out of space.
private final ContainerAttribute<String> openPipelineMap;


private final Map<ContainerID, ContainerInfo> containerMap; private final Map<ContainerID, ContainerInfo> containerMap;
// Map to hold replicas of given container. // Map to hold replicas of given container.
Expand All @@ -106,6 +117,7 @@ public ContainerStateMap() {
ownerMap = new ContainerAttribute<>(); ownerMap = new ContainerAttribute<>();
factorMap = new ContainerAttribute<>(); factorMap = new ContainerAttribute<>();
typeMap = new ContainerAttribute<>(); typeMap = new ContainerAttribute<>();
openPipelineMap = new ContainerAttribute<>();
containerMap = new HashMap<>(); containerMap = new HashMap<>();
autoLock = new AutoCloseableLock(); autoLock = new AutoCloseableLock();
contReplicaMap = new HashMap<>(); contReplicaMap = new HashMap<>();
Expand Down Expand Up @@ -140,6 +152,9 @@ public void addContainer(ContainerInfo info)
ownerMap.insert(info.getOwner(), id); ownerMap.insert(info.getOwner(), id);
factorMap.insert(info.getReplicationFactor(), id); factorMap.insert(info.getReplicationFactor(), id);
typeMap.insert(info.getReplicationType(), id); typeMap.insert(info.getReplicationType(), id);
if (info.isContainerOpen()) {
openPipelineMap.insert(info.getPipelineName(), id);
}
LOG.trace("Created container with {} successfully.", id); LOG.trace("Created container with {} successfully.", id);
} }
} }
Expand Down Expand Up @@ -329,6 +344,11 @@ public void updateState(ContainerInfo info, LifeCycleState currentState,
throw new SCMException("Updating the container map failed.", ex, throw new SCMException("Updating the container map failed.", ex,
FAILED_TO_CHANGE_CONTAINER_STATE); FAILED_TO_CHANGE_CONTAINER_STATE);
} }
// In case the container is set to closed state, it needs to be removed from
// the pipeline Map.
if (newState == LifeCycleState.CLOSED) {
openPipelineMap.remove(info.getPipelineName(), id);
}
} }


/** /**
Expand Down Expand Up @@ -359,6 +379,20 @@ NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
} }
} }


/**
* Returns Open containers in the SCM by the Pipeline
*
* @param pipeline - Pipeline name.
* @return NavigableSet<ContainerID>
*/
public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(String pipeline) {
Preconditions.checkNotNull(pipeline);

try (AutoCloseableLock lock = autoLock.acquire()) {
return openPipelineMap.getCollection(pipeline);
}
}

/** /**
* Returns Containers by replication factor. * Returns Containers by replication factor.
* *
Expand Down
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/

package org.apache.hadoop.hdds.scm.pipelines;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;

import java.util.Set;
import java.util.UUID;
import java.util.Map;
import java.util.HashSet;
import java.util.Collections;

import java.util.concurrent.ConcurrentHashMap;

import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.DUPLICATE_DATANODE;


/**
* This data structure maintains the list of pipelines which the given datanode
* is a part of.
* This information will be added whenever a new pipeline allocation happens.
*
* TODO: this information needs to be regenerated from pipeline reports on
* SCM restart
*/
public class Node2PipelineMap {
private final Map<UUID, Set<Pipeline>> dn2PipelineMap;

/**
* Constructs a Node2PipelineMap Object.
*/
public Node2PipelineMap() {
dn2PipelineMap = new ConcurrentHashMap<>();
}

/**
* Returns true if this a datanode that is already tracked by
* Node2PipelineMap.
*
* @param datanodeID - UUID of the Datanode.
* @return True if this is tracked, false if this map does not know about it.
*/
private boolean isKnownDatanode(UUID datanodeID) {
Preconditions.checkNotNull(datanodeID);
return dn2PipelineMap.containsKey(datanodeID);
}

/**
* Insert a new datanode into Node2Pipeline Map.
*
* @param datanodeID -- Datanode UUID
* @param pipelines - set of pipelines.
*/
private void insertNewDatanode(UUID datanodeID, Set<Pipeline> pipelines)
throws SCMException {
Preconditions.checkNotNull(pipelines);
Preconditions.checkNotNull(datanodeID);
if(dn2PipelineMap.putIfAbsent(datanodeID, pipelines) != null) {
throw new SCMException("Node already exists in the map",
DUPLICATE_DATANODE);
}
}

/**
* Removes datanode Entry from the map.
* @param datanodeID - Datanode ID.
*/
public synchronized void removeDatanode(UUID datanodeID) {
Preconditions.checkNotNull(datanodeID);
dn2PipelineMap.computeIfPresent(datanodeID, (k, v) -> null);
}

/**
* Returns null if there no pipelines associated with this datanode ID.
*
* @param datanode - UUID
* @return Set of pipelines or Null.
*/
public Set<Pipeline> getPipelines(UUID datanode) {
Preconditions.checkNotNull(datanode);
return dn2PipelineMap.computeIfPresent(datanode, (k, v) ->
Collections.unmodifiableSet(v));
}

/**
* Adds a pipeline entry to a given dataNode in the map.
* @param pipeline Pipeline to be added
*/
public synchronized void addPipeline(Pipeline pipeline) throws SCMException {
for (DatanodeDetails details : pipeline.getDatanodes().values()) {
UUID dnId = details.getUuid();
dn2PipelineMap
.computeIfAbsent(dnId,k->Collections.synchronizedSet(new HashSet<>()))
.add(pipeline);
}
}

public Map<UUID, Set<Pipeline>> getDn2PipelineMap() {
return Collections.unmodifiableMap(dn2PipelineMap);
}
}
Expand Up @@ -40,11 +40,13 @@ public abstract class PipelineManager {
private final List<Pipeline> activePipelines; private final List<Pipeline> activePipelines;
private final Map<String, Pipeline> activePipelineMap; private final Map<String, Pipeline> activePipelineMap;
private final AtomicInteger pipelineIndex; private final AtomicInteger pipelineIndex;
private final Node2PipelineMap node2PipelineMap;


public PipelineManager() { public PipelineManager(Node2PipelineMap map) {
activePipelines = new LinkedList<>(); activePipelines = new LinkedList<>();
pipelineIndex = new AtomicInteger(0); pipelineIndex = new AtomicInteger(0);
activePipelineMap = new WeakHashMap<>(); activePipelineMap = new WeakHashMap<>();
node2PipelineMap = map;
} }


/** /**
Expand All @@ -66,24 +68,23 @@ public synchronized final Pipeline getPipeline(
* *
* 2. This allows all nodes to part of a pipeline quickly. * 2. This allows all nodes to part of a pipeline quickly.
* *
* 3. if there are not enough free nodes, return conduits in a * 3. if there are not enough free nodes, return pipeline in a
* round-robin fashion. * round-robin fashion.
* *
* TODO: Might have to come up with a better algorithm than this. * TODO: Might have to come up with a better algorithm than this.
* Create a new placement policy that returns conduits in round robin * Create a new placement policy that returns pipelines in round robin
* fashion. * fashion.
*/ */
Pipeline pipeline = Pipeline pipeline = allocatePipeline(replicationFactor);
allocatePipeline(replicationFactor);
if (pipeline != null) { if (pipeline != null) {
LOG.debug("created new pipeline:{} for container with " + LOG.debug("created new pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}", "replicationType:{} replicationFactor:{}",
pipeline.getPipelineName(), replicationType, replicationFactor); pipeline.getPipelineName(), replicationType, replicationFactor);
activePipelines.add(pipeline); activePipelines.add(pipeline);
activePipelineMap.put(pipeline.getPipelineName(), pipeline); activePipelineMap.put(pipeline.getPipelineName(), pipeline);
node2PipelineMap.addPipeline(pipeline);
} else { } else {
pipeline = pipeline = findOpenPipeline(replicationType, replicationFactor);
findOpenPipeline(replicationType, replicationFactor);
if (pipeline != null) { if (pipeline != null) {
LOG.debug("re-used pipeline:{} for container with " + LOG.debug("re-used pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}", "replicationType:{} replicationFactor:{}",
Expand Down Expand Up @@ -133,6 +134,11 @@ protected int getReplicationCount(ReplicationFactor factor) {
public abstract Pipeline allocatePipeline( public abstract Pipeline allocatePipeline(
ReplicationFactor replicationFactor) throws IOException; ReplicationFactor replicationFactor) throws IOException;


public void removePipeline(Pipeline pipeline) {
activePipelines.remove(pipeline);
activePipelineMap.remove(pipeline.getPipelineName());
}

/** /**
* Find a Pipeline that is operational. * Find a Pipeline that is operational.
* *
Expand All @@ -143,7 +149,7 @@ private Pipeline findOpenPipeline(
Pipeline pipeline = null; Pipeline pipeline = null;
final int sentinal = -1; final int sentinal = -1;
if (activePipelines.size() == 0) { if (activePipelines.size() == 0) {
LOG.error("No Operational conduits found. Returning null."); LOG.error("No Operational pipelines found. Returning null.");
return null; return null;
} }
int startIndex = getNextIndex(); int startIndex = getNextIndex();
Expand Down
Expand Up @@ -19,7 +19,6 @@
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy; .ContainerPlacementPolicy;
Expand All @@ -41,6 +40,8 @@
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;


/** /**
Expand All @@ -55,7 +56,7 @@ public class PipelineSelector {
private final RatisManagerImpl ratisManager; private final RatisManagerImpl ratisManager;
private final StandaloneManagerImpl standaloneManager; private final StandaloneManagerImpl standaloneManager;
private final long containerSize; private final long containerSize;

private final Node2PipelineMap node2PipelineMap;
/** /**
* Constructs a pipeline Selector. * Constructs a pipeline Selector.
* *
Expand All @@ -69,12 +70,13 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf) {
this.containerSize = OzoneConsts.GB * this.conf.getInt( this.containerSize = OzoneConsts.GB * this.conf.getInt(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT); ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
node2PipelineMap = new Node2PipelineMap();
this.standaloneManager = this.standaloneManager =
new StandaloneManagerImpl(this.nodeManager, placementPolicy, new StandaloneManagerImpl(this.nodeManager, placementPolicy,
containerSize); containerSize, node2PipelineMap);
this.ratisManager = this.ratisManager =
new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize, new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
conf); conf, node2PipelineMap);
} }


/** /**
Expand Down Expand Up @@ -243,4 +245,18 @@ public void updateDatanodes(ReplicationType replicationType, String
.collect(Collectors.joining(","))); .collect(Collectors.joining(",")));
manager.updatePipeline(pipelineID, newDatanodes); manager.updatePipeline(pipelineID, newDatanodes);
} }

public Node2PipelineMap getNode2PipelineMap() {
return node2PipelineMap;
}

public void removePipeline(UUID dnId) {
Set<Pipeline> pipelineChannelSet =
node2PipelineMap.getPipelines(dnId);
for (Pipeline pipelineChannel : pipelineChannelSet) {
getPipelineManager(pipelineChannel.getType())
.removePipeline(pipelineChannel);
}
node2PipelineMap.removeDatanode(dnId);
}
} }

0 comments on commit 3f3f722

Please sign in to comment.