HDDS-6572. EC: ReplicationManager - add move manager for container move #3963

Closed

JacksonYao287 wants to merge 3 commits into apache:master from JacksonYao287:HDDS-6572

Conversation

@JacksonYao287
Contributor

What changes were proposed in this pull request?

add move manager for container move

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-6572

How was this patch tested?

unit test

@JacksonYao287 JacksonYao287 force-pushed the HDDS-6572 branch 2 times, most recently from e6d15cd to a87de27 Compare November 15, 2022 15:37
@sodonnel
Contributor

One interesting observation I have with this is related to throttling.

One goal of the new RM is to hold back work in SCM and avoid overloading the datanodes. We are going to do that by tracking the number of queued commands and the number pending on the DN on each heartbeat and avoid scheduling too many commands for each datanode.

The balancer is going to schedule replicate and delete commands too, which will use up some (or all) of the replicationManagers limit. That is not a problem itself, although it could mean that running the balancer + replication manager together will tend to slow down replication.

What limits do we have on the balancer to stop too many commands getting scheduled on an under-used datanode? E.g. if there are only a few "empty" nodes in a large cluster, then what is the max number of replication commands they can receive in what time period? I'd like to make sure that there are good limits in the balancer to prevent it queuing too much work on the datanodes.

If I remember correctly, a significant complexity in the balancer is that it replicates its pending moves across the SCMs, and part of the reason for this is that a deep queue on the DN could result in stale balance commands continuing to run on the DNs, and the new leader SCM is not aware of them. If the queue on the DNs is kept under control, then perhaps that replication is not needed.

I also have a bit of a concern about the balancer needing to do some over replication checks around the container it intends to delete. I understand that is needed, but I wonder if we could re-use the replication manager over-replicated-processor to do this instead. Eg, MoveManager calls into ReplicationManager.processOverReplicated(...) and we also pass in the replica we would like removed - it will check if it is possible or not and then schedule the delete command.

I also had a quick scan of the balancer config - is it not topology aware by default?

  @Config(key = "move.networkTopology.enable", type = ConfigType.BOOLEAN,
      defaultValue = "false", tags = {ConfigTag.BALANCER},
      description = "whether to take network topology into account when " +
          "selecting a target for a source. " +
          "This configuration is false by default.")
  private boolean networkTopologyEnable = false;

Why is this? Surely we will have a lot of "failed moves" that don't pass the placement check, certainly for EC when the number of replicas is larger.

@JacksonYao287 JacksonYao287 marked this pull request as draft November 17, 2022 03:09
@JacksonYao287
Contributor Author

JacksonYao287 commented Nov 17, 2022

@sodonnel thanks for the comments.
@sodonnel thanks for the comments.
IMHO, the container balancer, as a tool for optimizing data distribution across the whole cluster, should be run when the cluster is not busy with workload. Although we can run it at any time, running the balancer on a busy cluster is not recommended. If we have to run it at a busy time, we can set some parameters (for example, size.moved.max.per.iteration) to limit the impact it will have on the busy cluster. So in the majority of cases, for a good cluster administrator, running the balancer + replication manager together will not be a big problem.

What limits do we have on the balancer to stop too many commands getting scheduled on an under-used datanode?

It seems we do not have any limit on this for now. We can add a configuration item, maybe max.replication.command.sent.to.one.target.per.iteration, to limit the count of replication commands sent to a single target datanode in one iteration.
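
Something along these lines could express that limit (a hypothetical sketch only; the ConfigType.INT constant, the field name and the default value are assumptions, modelled on the existing networkTopology config shown above):

  // Hypothetical throttle: cap the replication commands the balancer may
  // queue on any single target datanode within one iteration.
  @Config(key = "max.replication.command.sent.to.one.target.per.iteration",
      type = ConfigType.INT, defaultValue = "5",
      tags = {ConfigTag.BALANCER},
      description = "Maximum number of replication commands the balancer " +
          "may schedule for a single target datanode in one iteration.")
  private int maxReplicationCommandsPerTarget = 5;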

I understand that is needed, but I wonder if we could re-use the replication manager over-replicated-processor to do this instead.

I get your point here that it would be better to send all the commands from a single place, and this would also share the traffic control of RM.
There are two problems I can think of for now:

1. RM is a background service and runs periodically, but MoveManager will take action as soon as it is notified that some op is completed. I am not quite sure whether it is correct to call ReplicationManager.processOverReplicated(...) while RM is sleeping.

2. In the majority of cases, once a replication command is completed the container is over-replicated and MoveManager will delete the replica on the source datanode. But RM does not know whether the container is being moved; when RM finds this container is over-replicated, it may delete the replica on the target datanode, which is not what we want.

I also had a quick scan of the balancer config - is it not topology aware by default?

For now, we have two FindTargetStrategy implementations: FindTargetStrategyByUsageInfo and FindTargetStrategyByNetworkTopology. In some cases, when choosing a target, the two strategies may conflict. For example, a datanode may have lower disk usage but be farther away from a given source datanode. For now, we use FindTargetStrategyByUsageInfo by default.

@JacksonYao287 JacksonYao287 marked this pull request as ready for review November 17, 2022 04:15
@siddhantsangwan
Contributor

Why is this? Surely we will have a lot of "failed moves" that don't pass the placement check, certainly for EC when the number of replicas is larger.

@sodonnel Balancer does check if a potential move satisfies the placement policy before scheduling it, even if "move.networkTopology.enable" is disabled. Enabling that configuration means balancer will match a source with a target by sorting wrt network distance instead of sorting wrt utilisation values.
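
As a rough illustration of the difference (hypothetical types only; the real strategies in the balancer are more involved), the two modes amount to sorting candidate targets with different comparators:

  import java.util.Comparator;
  import java.util.List;

  // Hypothetical candidate target, used only for this illustration.
  class Candidate {
    final String node;
    final double utilisation;     // used space / capacity
    final int networkDistance;    // distance from the source datanode
    Candidate(String node, double utilisation, int networkDistance) {
      this.node = node;
      this.utilisation = utilisation;
      this.networkDistance = networkDistance;
    }
  }

  class TargetSorting {
    // FindTargetStrategyByUsageInfo: prefer the least utilised target.
    static final Comparator<Candidate> BY_USAGE =
        Comparator.comparingDouble(c -> c.utilisation);

    // FindTargetStrategyByNetworkTopology: prefer the closest target,
    // breaking ties by utilisation.
    static final Comparator<Candidate> BY_TOPOLOGY =
        Comparator.<Candidate>comparingInt(c -> c.networkDistance)
            .thenComparingDouble(c -> c.utilisation);

    static void sortCandidates(List<Candidate> candidates,
        boolean networkTopologyEnable) {
      candidates.sort(networkTopologyEnable ? BY_TOPOLOGY : BY_USAGE);
    }
  }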

@adoroszlai adoroszlai added the EC label Nov 18, 2022
@siddhantsangwan
Contributor

Thanks for working on this @JacksonYao287 . It would be great to have a short summary of the changes you're proposing here. That would really help me speed up review.

@JacksonYao287
Contributor Author

JacksonYao287 commented Nov 19, 2022

@siddhantsangwan we already have a design doc for move, please take a look!

The reasons for creating a new, separate MoveManager are:

1. If we want to manage move operations, we need to be aware of in-flight actions. Before ContainerReplicaPendingOps was added, the in-flight actions were managed by the legacy RM, so to be aware of them we had to create MoveScheduler inside it. That is not a good code structure. Since we are implementing the new ReplicationManager and the legacy RM will be dropped ultimately, this is a good chance to add a new, separate MoveManager.

2. RM is a background service which runs periodically, so if a replication or deletion command in a move operation is completed or expired, we cannot take action in time and have to wait until RM begins running again. This would delay the completion of the move dramatically. Now that in-flight actions are managed by ContainerReplicaPendingOps, the move manager can be notified of move-related events in time, so we can take action as soon as possible.

3. The current MoveScheduler in the legacy RM only supports moving non-EC containers; the newly added MoveManager should move containers of all types.

ReplicationManagerReport report = new ReplicationManagerReport();
ReplicationQueue newRepQueue = new ReplicationQueue();
Map<ContainerID, MoveDataNodePair> pendingMoves =
moveManaer.getPendingMove();
Contributor

Does the ReplicationManager really need to know about the pending moves?

RM is split into two parts. The first is finding containers with a problem (this bit); containers with a problem get added to a queue which is then processed later. Even if the container was identified as over-replicated here, MoveManager would have scheduled a delete of the required container before it is processed on the second queue.

Contributor

Thinking about this some more: what if the over-replication processing was "space aware"? Given two replicas, always remove the one on the host with the least free space.

The balancer will always be adding containers to hosts with more free space, and removing replicas from hosts with less free space, so it seems that the standard over-replication handling could take care of this if it considered the free space on the host when deciding which to remove.
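
A minimal sketch of that idea, assuming we simply know the free space per host (placeholder types; the real check would work on ContainerReplica and datanode usage info):

  import java.util.Map;
  import java.util.Optional;

  // "Space aware" choice sketch: given the free space reported by each host
  // holding a replica of an over-replicated container, delete the copy on the
  // host with the least free space.
  class SpaceAwareDelete {
    static Optional<String> replicaToDelete(Map<String, Long> freeSpaceByHost) {
      String chosen = null;
      long minFree = Long.MAX_VALUE;
      for (Map.Entry<String, Long> e : freeSpaceByHost.entrySet()) {
        if (e.getValue() < minFree) {
          minFree = e.getValue();
          chosen = e.getKey();
        }
      }
      return Optional.ofNullable(chosen);
    }
  }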

Contributor Author

Does the ReplicationManager really need to know about the pending moves?

In the vast majority of cases, no. But I took two corner cases into account:

1. If a Ratis container has only two replicas, r1 (dn1) and r2 (dn2), it is under-replicated. If we schedule a move for r1 and everything goes well, there will ultimately be an r3 on dn3 and this container is no longer under-replicated. If RM is not aware of this move, it will schedule a replication to a randomly selected dn4 to handle the under-replicated state. After that replication is done, the container is over-replicated (4 replicas), so another deletion will be scheduled. If RM is aware of this move, the deletion can be avoided.

2. If a container is over-replicated (r1, r2, r3, r4) and we want to move r1 to dn5, then MoveManager will send a replication command to dn5. If RM is not aware of this move and it finds this over-replicated container, it may send a deletion command for r1, which will fail the move.

what if the over-replication processing was "space aware"

I have a JIRA to talk about this.
Besides what @lokesh said, another reason is SCM leader switch. Let's say a container has r1, r2, r3, r4 on dn1, dn2, dn3, dn4. If rm1 in scm1 (the leader) finds this container is over-replicated and r1 has the least free space, it will send a deletion to dn1. Then scm1 steps down and scm2 becomes the leader. If some new data has been written to dn3 and it now has the least free space, rm2 in scm2 will send a deletion command to dn3 to delete r3. Now two datanodes receive deletion commands while only one replica needs to be deleted.

That JIRA makes sure all the SCMs will delete replicas in a certain sequence.

Contributor

A fundamental design problem with the old RM was that it placed many hours' or even days' worth of commands on the DNs without any way to see what progress they were making.

It seems the balancer has made this same mistake, in that it does not limit what is queued on a DN. Ideally, the balancer should schedule no more work than can be done in a heartbeat or two. If we have under-utilized nodes, they are going to be the target for replicate commands, so the limit for the number queued should be quite small per heartbeat. The delete commands might be more scattered, but we should limit them too, just to a higher number.

If we avoid deep queues on the DNs, then we also avoid the problem with a leader switch on SCM.

At the moment ReplicationManager does not replicate its pending ops or commands to each SCM. The goal is to make the queue on the DN small, so that even if a leader switch did happen, there is not much more work to do; otherwise RM can fall into this same problem of scheduling multiple deletes.

Better than that, we could also make the DN reject certain commands that don't have the current SCM Leader term. For example, any replicate, delete container, reconstruct container can be discarded if the SCM term has increased and the replication manager will just re-create commands on its next pass.

There isn't really any reason that the Balancer could not do the same. I.e., schedule work only on DNs with the capacity to process it, and then drop all commands on a leader switch. It would make the balancer a lot simpler overall.

Then, we could have the over-replication handling try to take space into account and remove the replicas from the most over-used nodes, making it do the correct thing more of the time. In the event of a failover, some over-replicated containers caused by the balancer might have the wrong replica removed, but if the free space is so finely balanced between the nodes that it removes the wrong one, it doesn't really matter.

Contributor

1 if a ratis container has only two replicas, r1(dn1) , r2(dn2) , now it is under-replicated. if we schedule a move for r1 and everything goes well, there will be a r3 in dn3 ultimately and this container is not under-replicated again.

RM is aware of this via the pending ops. It will see the container as under-replicated, but with pending ops it will see it as OK and will not schedule anything further.

if a container is over-replicated(r1, r2, r3, r4), and we want to move r1 to dn5, then movemanager will send a replication command to dn5. if RM is not ware of this move and it find this over-replicated container, it may send a deletion command to r1, which will fail the move.

This example is slightly strange - what you are saying could happen, but really rather than "moving" a container from the over-used node to a lesser used node, we should just delete it from the over-used node.

Also, ideally, a replicate command for a move could have any of the source replicas as the place to read from. If you only have a few over-used nodes on the cluster, then they will be a bottleneck for copying from. You don't have to read from the source you are going to delete. You can read from any source. In fact, I think the replicate command lets you pass as many sources as you want, and then it can retry if the copy fails from one source.
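
For illustration only (the type and names below are hypothetical, not the actual ReplicateContainerCommand API): the copy step of a move could carry every healthy replica as a candidate source, so the datanode can retry from another source if one fails, and the replica that will later be deleted does not have to serve the read:

  import java.util.List;

  // Hypothetical shape of a replicate command that carries several candidate sources.
  class ReplicateCmdSketch {
    final long containerId;
    final List<String> sources;  // any healthy replica can serve the copy
    final String target;

    ReplicateCmdSketch(long containerId, List<String> sources, String target) {
      this.containerId = containerId;
      this.sources = sources;
      this.target = target;
    }

    // The node being vacated (dn1) need not be the copy source: dn2 or dn3
    // can serve the read even though dn1's replica is the one to be deleted.
    static ReplicateCmdSketch moveCopyStep() {
      return new ReplicateCmdSketch(1L, List.of("dn1", "dn2", "dn3"), "dn5");
    }
  }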

return containerStateManager.getContainerIDs(state).size();
}

@Override
Contributor

There is already a getContainerReplicaCount method on ReplicationManager. I am not sure we should add this to the ContainerManager interface, as it is more "replication" related than container related.

Contributor Author

ReplicationManager#getContainerReplicaCount uses legacyReplicationManager#getContainerReplicaCount to get the ContainerReplicaCount for a Ratis container. The legacy RM will be removed ultimately, so legacyReplicationManager#getContainerReplicaCount will not exist then.

I suggest adding it to the ContainerManager interface for now, since it is a general interface through which we can get container-related info without considering the container type. After removing the legacy RM, we can refactor it if needed, and I think it will not be too much work.

.orElseThrow(() -> new ContainerNotFoundException("ID " + id));
}

@Override
Contributor

This should probably be a utility method rather than part of the interface, as it just needs a Set to iterate over.

Contributor Author

Sure, makes sense, will fix.

* @param tgt The destination datanode to replicate
* @param src List of source nodes from where we can replicate
*/
private void sendReplicateCommand(
Contributor

We should probably have a common method on ReplicationManager to send the command. At the moment there are a few places in RM that create and send commands, so it would be good to bring them together into a single place, as there are a few things that need to be taken care of together, which are hard to check when the logic is split into various places.

  1. Updating pending ops.
  2. Adding the SCM term.
  3. Ensuring we add the correct replica index.

We should refactor the current RM calls into a single place in RM in another Jira and then we can use the same interface here.
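
A rough sketch of what such a single send point might look like (all type and method names below are placeholders, not the eventual API from that Jira):

  // Placeholder interfaces standing in for the real SCM classes.
  interface DatanodeCommand {
    void setTerm(long term);
  }

  interface PendingOpsTracker {
    void recordPendingOp(long containerId, int replicaIndex, DatanodeCommand cmd);
  }

  interface CommandSender {
    void send(String datanodeUuid, DatanodeCommand cmd);
  }

  // One place that takes care of the three concerns together.
  class CommandDispatchSketch {
    private final PendingOpsTracker pendingOps;
    private final CommandSender sender;
    private final long currentScmTerm;

    CommandDispatchSketch(PendingOpsTracker pendingOps, CommandSender sender,
        long currentScmTerm) {
      this.pendingOps = pendingOps;
      this.sender = sender;
      this.currentScmTerm = currentScmTerm;
    }

    void sendCommand(String datanodeUuid, long containerId, int replicaIndex,
        DatanodeCommand cmd) {
      cmd.setTerm(currentScmTerm);                                // 2. SCM term
      pendingOps.recordPendingOp(containerId, replicaIndex, cmd); // 1. pending ops
      sender.send(datanodeUuid, cmd);                             // 3. replica index travels with the op
    }
  }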

Contributor Author

Makes sense, we can create another Jira to handle this.

Contributor

#3990 creates a common point to send commands to the DNs via the RM. Once it is committed we can use it here.

Contributor Author

Since #3990 is merged, let's use it.

found = true;
iterator.remove();
pendingOpCount.get(op.getOpType()).decrementAndGet();
moveManager.notifyContainerOpCompleted(op, containerID);
Contributor

Rather than placing a MoveManager inside ContainerReplicaPendingOps, we should probably provide a notify interface where objects can register for a callback. At the moment, MoveManager depends on this class and this class depends on MoveManager.

If we make only MoveManager have a dependency on this one, then it can register itself for a notification when a call comes in. It is also flexible, as if we want to register other objects later, we can do that too.

We could have just a single method to call opCompleted(ContainerReplicaOp op, ContainerID containerID, boolean timedOut) for both timeouts and normal completions.
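
A minimal sketch of that idea (stub types only; the real signature would use ContainerReplicaOp and ContainerID):

  import java.util.List;
  import java.util.concurrent.CopyOnWriteArrayList;

  // Stub types for illustration.
  class OpStub { }
  class ContainerIdStub { }

  // Subscribers register themselves, so ContainerReplicaPendingOps does not
  // need a direct reference to MoveManager.
  interface PendingOpSubscriber {
    void opCompleted(OpStub op, ContainerIdStub id, boolean timedOut);
  }

  class PendingOpsNotifierSketch {
    private final List<PendingOpSubscriber> subscribers =
        new CopyOnWriteArrayList<>();

    void registerSubscriber(PendingOpSubscriber subscriber) {
      subscribers.add(subscriber);
    }

    // Called from the pending-op bookkeeping on normal completion
    // (timedOut = false) and on expiry (timedOut = true).
    void fireOpCompleted(OpStub op, ContainerIdStub id, boolean timedOut) {
      for (PendingOpSubscriber s : subscribers) {
        s.opCompleted(op, id, timedOut);
      }
    }
  }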

Contributor Author

I also thought of this. Let me do it in this patch.

return;
}

final ContainerInfo cif = containerManager.getContainer(cid);
Contributor

I feel this logic needs to go via ReplicationManager.processOverReplicatedContainer so we are not duplicating the OverRep handling logic here. We can change this interface a bit to make it more suitable and allow for passing some "preferred delete" replica.

We still need to add the handlers for Ratis and make the interface work with both of them.

I am also not sure these processUnderReplicatedContainer / processOverReplicatedContainer methods need to be given a ContainerHealthResult object as their input. That is what is on the underReplicated queue, but I am not sure if we could get away with just passing the containerInfo instead.

Contributor Author

I feel this logic needs to go via ReplicationManager.processOverReplicatedContainer

Agree, but it probably brings a lot of refactoring work. We can create a separate Jira for the refactoring.

but I am not sure if we could get away with just passing the containerInfo instead.

The underReplicated queue is a little more complex than the other two queues, since it is a priority queue, which needs some additional information (WeightedRedundancy and RequeueCount) to sort the items.

Contributor

I don't think there is much refactoring to do here.

ReplicationManager.processOverReplicatedContainer processes the container for over-replication and then returns a list of commands (i.e. deleteContainer commands). The Ratis side of things is not implemented yet, but Siddhant is planning to do that starting this week. The part we would need to change is to pass a "hint" into the over-replication processing that we want to delete a certain replica if it makes sense to do so. It means all the placement checks etc. are covered in one place in that handler, and not duplicated in the move manager too.
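
Roughly the shape being suggested, as a sketch with placeholder types only (the real handler works on ContainerInfo, ContainerReplica sets and the placement policy inside ReplicationManager):

  import java.util.Collections;
  import java.util.List;
  import java.util.Optional;

  // Sketch of passing a "preferred delete" hint into over-replication handling.
  class OverReplicationHintSketch {
    List<String> processOverReplicated(List<String> replicaNodes,
        int requiredReplicas, Optional<String> preferredDelete) {
      if (replicaNodes.size() <= requiredReplicas) {
        return Collections.emptyList();     // not over-replicated, nothing to do
      }
      // Honour the hint only if that replica exists and removing it still
      // satisfies the placement policy; otherwise fall back to the normal pick.
      String victim = preferredDelete
          .filter(replicaNodes::contains)
          .filter(this::placementStillSatisfiedWithout)
          .orElseGet(() -> normalSelection(replicaNodes));
      return List.of(victim);               // i.e. a deleteContainer command for this node
    }

    private boolean placementStillSatisfiedWithout(String node) {
      return true;                          // placeholder for the placement check
    }

    private String normalSelection(List<String> replicaNodes) {
      return replicaNodes.get(replicaNodes.size() - 1);  // placeholder selection
    }
  }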

return found;
}

public void registerOpCompletionHandler(
Contributor

Could we create a new Jira for these changes to ContainerReplicaPendingOps so it is a self-contained change and not mixed into this larger one? We also need a couple of simple tests (or changes to existing tests) to ensure the handler is called in both scenarios.

It is so much easier to review a series of smaller Jiras than a large one like this.

Contributor Author

Sure, we can put all the changes to ContainerReplicaPendingOps into another separate Jira.

pendingMoveFuture.clear();

//initialize with db record
try (TableIterator<ContainerID,
Contributor

When the leader is ready, check whether the records present in the transaction buffer have been flushed on the switch to leader. Otherwise the table will not have any records.

Contributor Author

Yes, this is really a problem, thanks for pointing it out. Will fix it.

@sodonnel
Contributor

The more I think about this, I believe that the Balancer / Move Manager and the Replication Manager need to be aligned in how they work.

Replication Manager does not replicate its pendingOps across the standby SCM instances, so in the event of a failover, the "inflight" operations are not known to the new leader SCM. If RM is not replicating its pending moves, then why do we need the balancer to replicate its commands too? Failovers should be rare, and the worst case is that some balancing work does not complete as intended, but it can easily be rescheduled.

When the SCM leader switches, the leader term gets updated and the datanodes receive the new term through their heartbeat once they heartbeat to the new leader SCM. All commands scheduled on a datanode have the SCM leader term within them.

So all we need to do is tell the DN to drop any commands that are from an old SCM leader, and wait for a short period of time after switching the SCM leader before the RM or Balancer runs, perhaps a few heartbeats, or until all DNs report an empty queue (this may be more tricky). Then the processes can start up and schedule new work.
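
A minimal sketch of that term check on the datanode side (the names here are hypothetical, not the actual datanode code):

  class StaleCommandFilterSketch {
    private long currentScmTerm;

    void onHeartbeatResponse(long leaderTermFromScm) {
      // Track the newest leader term the datanode has seen.
      currentScmTerm = Math.max(currentScmTerm, leaderTermFromScm);
    }

    boolean shouldHandle(long commandTerm) {
      // Replicate / delete / reconstruct commands issued under an old term are
      // discarded; the new leader's RM or balancer re-creates work on its next pass.
      return commandTerm >= currentScmTerm;
    }
  }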

Additionally, if we changed the over-replication logic so that it prefers to delete a replica from a node with less free space than the others, then replication becomes somewhat self-balancing, and perhaps the delete part of the balancer isn't needed. As it stands we cannot make that over-replication optimization in SCM, as the delete order is deterministic.

With EC, we could also make the pipeline policy "space aware" so it prefers to allocate new containers on nodes with more free space, also making the writes self-balancing. Due to the long-lived pipelines in Ratis, this is more difficult, but if we got to a place where pipelines are destroyed after a few hours of uptime, then the Ratis pipeline policy could pick lesser-used nodes too. In an ideal world, the system would self-balance and the balancer would only be needed after adding new nodes, to quickly move data onto them.

Comment on lines +335 to +338
// we need to synchronize on ContainerInfo, since it is
// shared by ICR/FCR handler and this.processContainer
// TODO: use a Read lock after introducing a RW lock into ContainerInfo
ContainerInfo cif = containerManager.getContainer(cid);
Contributor

We have this comment but actually there's no synchronization in the code?

@umamaheswararao
Contributor

Does it make sense to split this up into a few smaller tasks here?
@JacksonYao287 @sodonnel @siddhantsangwan

@JacksonYao287
Contributor Author

@sodonnel sorry for not replying in time.

I believe that the Balancer / Move Manager and the Replication Manager need to be aligned in how they work.

Agree, we'd better have a new design doc for container move; the old design doc was written for the legacy RM.

I think SCM could send all the commands (replication, deletion, reconstruction, etc.) through a separate class (maybe commandSendingManager). This class can limit the commands sent to each datanode: it can buffer commands to be sent to a datanode until the number of commands in that datanode's command queue is small enough, and it can also be notified by ContainerReplicaPendingOps of the op/command status.
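
Something along these lines, as a rough sketch only (class and method names are made up for illustration; the limit value is an assumption):

  import java.util.Map;
  import java.util.Queue;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentLinkedQueue;

  // Buffer commands per datanode and release more only while the queue length
  // reported in the datanode's heartbeat stays below a limit.
  class CommandSendingManagerSketch {
    private static final int MAX_QUEUED_PER_DATANODE = 5;

    private final Map<String, Queue<Object>> buffered = new ConcurrentHashMap<>();

    // Called by RM / balancer / move manager instead of sending directly.
    void submit(String datanodeUuid, Object command) {
      buffered.computeIfAbsent(datanodeUuid, d -> new ConcurrentLinkedQueue<>())
          .add(command);
    }

    // Called when a heartbeat reports how many commands the datanode still has queued.
    void onHeartbeat(String datanodeUuid, int reportedQueueLength) {
      Queue<Object> q = buffered.get(datanodeUuid);
      int budget = MAX_QUEUED_PER_DATANODE - reportedQueueLength;
      while (q != null && budget > 0 && !q.isEmpty()) {
        send(datanodeUuid, q.poll());
        budget--;
      }
    }

    private void send(String datanodeUuid, Object command) {
      // Placeholder: attach the command to the heartbeat response for this datanode.
    }
  }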

If RM is not replicating its pending moves, then why do we need the balancer to replicate its commands too?

@siddhantsangwan can you explain in more detail why the balancer needs to replicate its commands? I think you are familiar with this part.

So all we need to do is tell the DN to drop any commands that are from an old SCM leader.

Now, the datanode will drop all the commands from an old SCM leader, but I found a small problem: the SCM leader term at the datanode is updated only just before a command is about to be handled. That means all the commands in the command queue from the stale SCM leader will still be handled until a command from the new SCM leader is handled.
We should update the SCM leader term at the datanode when receiving a command, so that we can make sure those stale commands will not be handled.

@siddhantsangwan
Contributor

@siddhantsangwan can you explain in more detail why the balancer needs to replicate its commands? I think you are familiar with this part.

Balancer's configurations are replicated so that they're available in the new SCM on failover. As for moves being replicated, I think the original intent was to provide consistency in scenarios such as:
The old leader scheduled a move from DN1 to DN2. The replication succeeded, and then there's a failover. How can we ensure that the new leader deletes the replica from DN1 and not from some other DN?

Back then, the Ratis approach was selected to ensure consistency. But some other approaches were also considered. From the move design doc:

Approach 1.2
Maintain a list of moves i.e. {cid, src_dn, target_dn}. Once the replica set for container cid has both src_dn and target_dn and the container is over-replicated, deletion can be scheduled for replica in src_dn. It would be better for RM to have a map called inflightMoves which tracks such move requests.
In the worst case container can become under-replicated i.e. container could end up with 2 replicas instead of 3. Let’s say container c1 is being moved from d2 to d4. RM1 sees replicas
{d1, d2, d3, d4} and schedules deletion for d2. RM2(new leader) also sees replicas {d1, d2, d3, d4} (deletion not yet completed) and schedules deletion for d1 based on a deterministic algo to handle over-replicated containers. In this case remaining replicas would be {d3, d4}.
In case of no failovers, the move should be successful. On a failover move map must be cleared.

I think @sodonnel is suggesting something similar here. We can avoid the worst-case scenario if the balancer waits (like RM) after a failover and the datanodes drop commands from the old leader.

Additionally, if we changed the over-replication logic so that it prefers to delete a replica from a node with less free space than the others, then replication becomes somewhat self-balancing, and perhaps the delete part of the balancer isn't needed. As it stands we cannot make that over-replication optimization in SCM, as the delete order is deterministic.

@sodonnel from the move doc:

There is a problem with deleting the replica from the most used datanode. Let’s consider a scenario - container c1 and c2 with replicas in {d1, d2, d3}. In terms of utilisation d1 > d2 > d3. Balancer schedules move c1 from d1 to d4 and c2 from d2 to d4 given there is a limit of 1 on maximum moves from a source datanode. In the Replication manager when copy is complete c1 and c2 replicas will be deleted from d1 only. Expected behavior is for c1 to be deleted from d1 and c2 to be deleted from d2.

@siddhantsangwan
Contributor

Now, the datanode will drop all the commands from an old SCM leader, but I found a small problem: the SCM leader term at the datanode is updated only just before a command is about to be handled. That means all the commands in the command queue from the stale SCM leader will still be handled until a command from the new SCM leader is handled.
We should update the SCM leader term at the datanode when receiving a command, so that we can make sure those stale commands will not be handled.

Yes, good observation @JacksonYao287. I think @sodonnel is already working on adding a timeout for dropping commands at the datanode, which will cover this point too. Once that and #4064 are committed, we can move things forward here too.

@DaveTeng0
Contributor

Hello @JacksonYao287, there are some merge conflicts in this PR. Please help take a look when you have time, thanks!

@JacksonYao287
Contributor Author

Hello @JacksonYao287, there are some merge conflicts in this PR. Please help take a look when you have time, thanks!

Thanks @DaveTeng0. I think we need to discuss how to implement the move manager in more detail. Since there are a lot of conflicts, I think it is better to close this patch and open a new PR to work on this based on the newest mainline.
