-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
ReplicationSourceManager.java
758 lines (710 loc) · 28.8 KB
/
ReplicationSourceManager.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
/**
* This class is responsible for managing all the replication
* sources. There are two classes of sources:
* <ul>
* <li> Normal sources are persistent and one per peer cluster</li>
* <li> Old sources are recovered from a failed region server and our
* only goal is to finish replicating the WAL queue it had up in ZK</li>
* </ul>
*
* When a region server dies, this class uses a watcher to get notified and it
* tries to grab a lock in order to transfer all the queues in a local
* old source.
*
* This class implements the ReplicationListener interface so that it can track changes in
* replication state.
*/
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
private static final Log LOG =
LogFactory.getLog(ReplicationSourceManager.class);
// List of all the sources that read this RS's logs (a CopyOnWriteArrayList, see constructor)
private final List<ReplicationSourceInterface> sources;
// List of all the sources we recovered from dead RSs (a CopyOnWriteArrayList, see constructor)
private final List<ReplicationSourceInterface> oldsources;
// Handle used to read and update the persisted replication queues (positions, logs, hfile refs)
private final ReplicationQueues replicationQueues;
// Tracker this manager registers itself on as a ReplicationListener
private final ReplicationTracker replicationTracker;
// Registry of replication peers; its monitor is also used in recordLog/removePeer to avoid
// adding a source for a peer that is being removed
private final ReplicationPeers replicationPeers;
// UUID for this cluster
private final UUID clusterId;
// The hosting server; consulted for isStopped() and stopped on unrecoverable queue errors
private final Server server;
// All logs we are currently tracking
// Index structure of the map is: peer_id->logPrefix/logGroup->logs
// This is a plain HashMap; this class mutates it only while holding synchronized (walsById)
private final Map<String, Map<String, SortedSet<String>>> walsById;
// Logs for recovered sources we are currently tracking, keyed by the recovered queue id
// (a ConcurrentHashMap, see constructor)
private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
private final Configuration conf;
private final FileSystem fs;
// The paths to the latest log of each wal group, for newly added peers
// (a synchronized set; iteration is done under synchronized (latestPaths))
private Set<Path> latestPaths;
// Path to the wals directories
private final Path logDir;
// Path to the wal archive
private final Path oldLogDir;
// The number of ms that we wait before moving znodes, HBASE-3596
private final long sleepBeforeFailover;
// Homemade executor service for replication; runs the NodeFailoverWorker tasks
private final ThreadPoolExecutor executor;
// Source of the random extra delay added to sleepBeforeFailover in NodeFailoverWorker
private final Random rand;
// Value of HConstants.REPLICATION_BULKLOAD_ENABLE_KEY read at construction time
private final boolean replicationForBulkLoadDataEnabled;
/**
* Creates a replication manager, registers it as a listener on the given tracker and builds the
* executor that runs the failover workers.
* NOTE(review): the original description said this "sets the watch on all the other registered
* region servers"; that presumably happens inside the tracker/peers calls below - confirm.
* @param replicationQueues the interface for manipulating replication queues
* @param replicationPeers the registry used to look up replication peers and their configuration
* @param replicationTracker the tracker on which this manager registers itself as a listener
* @param conf the configuration to use
* @param server the server for this region server
* @param fs the file system to use
* @param logDir the directory that contains all wal directories of live RSs
* @param oldLogDir the directory where old logs are archived
* @param clusterId the UUID of this cluster
*/
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
final Path oldLogDir, final UUID clusterId) {
//CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.server = server;
// Plain HashMap: callers in this class synchronize on walsById before touching it
this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
this.conf = conf;
this.fs = fs;
this.logDir = logDir;
this.oldLogDir = oldLogDir;
this.sleepBeforeFailover =
conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
this.clusterId = clusterId;
// Note: 'this' is handed to the tracker here, before executor, rand and latestPaths are set
this.replicationTracker.registerListener(this);
// NOTE(review): the return value is ignored; presumably called for a side effect - confirm
this.replicationPeers.getAllPeerIds();
// It's preferable to failover 1 RS at a time, but with good zk servers
// more could be processed at the same time.
int nbWorkers = conf.getInt("replication.executor.workers", 1);
// use a short 100ms sleep since this could be done inline with a RS startup
// even if we fail, other region servers can take care of it
this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
100, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
// Name the worker threads and make them daemons
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat("ReplicationExecutor-%d");
tfb.setDaemon(true);
this.executor.setThreadFactory(tfb.build());
this.rand = new Random();
this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
replicationForBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
}
/**
 * Records the current replication position of a WAL for a peer and, unless the caller asks to
 * keep the log, prunes the WALs that precede it from that peer's queue.
 * @param log path of the WAL currently being replicated
 * @param id id of the peer cluster
 * @param position current location in the log
 * @param queueRecovered indicates if this queue comes from another region server
 * @param holdLogInZK if true, only the position is recorded and no old logs are cleaned
 */
public void logPositionAndCleanOldLogs(Path log, String id, long position,
    boolean queueRecovered, boolean holdLogInZK) {
  final String walName = log.getName();
  this.replicationQueues.setLogPosition(id, walName, position);
  if (!holdLogInZK) {
    cleanOldLogs(walName, id, queueRecovered);
  }
}
/**
 * Removes from the replication queue every tracked WAL of the same group that sorts before
 * {@code key}. Called when we are sure that a log file is closed and has no more entries.
 * <p>
 * Nothing is done when the queue or the WAL group is not (or no longer) tracked, when the
 * group has no tracked WALs, or when {@code key} is already the oldest tracked WAL.
 * @param key name of the WAL up to which (exclusive) older WALs are removed
 * @param id id of the peer cluster, or of the recovered queue when {@code queueRecovered}
 * @param queueRecovered Whether this is a recovered queue
 */
public void cleanOldLogs(String key, String id, boolean queueRecovered) {
  String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key);
  if (queueRecovered) {
    // The entry may already be gone if the recovered queue was closed concurrently
    Map<String, SortedSet<String>> walsByGroup = walsByIdRecoveredQueues.get(id);
    SortedSet<String> wals = walsByGroup == null ? null : walsByGroup.get(logPrefix);
    // first() throws on an empty set, so check emptiness before peeking at the oldest WAL
    if (wals != null && !wals.isEmpty() && !wals.first().equals(key)) {
      cleanOldLogs(wals, key, id);
    }
  } else {
    synchronized (this.walsById) {
      Map<String, SortedSet<String>> walsByGroup = walsById.get(id);
      SortedSet<String> wals = walsByGroup == null ? null : walsByGroup.get(logPrefix);
      // recordLog may have cleared the set, so guard against an empty group here as well
      if (wals != null && !wals.isEmpty() && !wals.first().equals(key)) {
        cleanOldLogs(wals, key, id);
      }
    }
  }
}
/**
 * Removes from the replication queue of {@code id} every WAL in {@code wals} that sorts
 * strictly before {@code key}, then drops those WALs from {@code wals} itself.
 * @param wals the tracked WALs of one group
 * @param key name of the WAL that must be kept, together with everything after it
 * @param id id of the queue the WALs are removed from
 */
private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
  // headSet is a live view: clearing it below also shrinks the backing set
  final SortedSet<String> olderWals = wals.headSet(key);
  LOG.debug("Removing " + olderWals.size() + " logs in the list: " + olderWals);
  final Iterator<String> it = olderWals.iterator();
  while (it.hasNext()) {
    this.replicationQueues.removeLog(id, it.next());
  }
  olderWals.clear();
}
/**
 * Adds a normal source for every registered peer cluster, then schedules the transfer of the
 * queues of every replicator that is not in the tracker's list of region servers.
 * @throws IOException if a source cannot be created
 * @throws ReplicationException if the replication queues cannot be read or updated
 */
protected void init() throws IOException, ReplicationException {
  for (String peerId : this.replicationPeers.getPeerIds()) {
    addSource(peerId);
    if (replicationForBulkLoadDataEnabled) {
      // The peer may have been added before replication of bulk loaded data was enabled,
      // in which case it is still missing from the hfile-refs queue.
      this.replicationQueues.addPeerToHFileRefs(peerId);
    }
  }
  final List<String> replicators = this.replicationQueues.getListOfReplicators();
  if (replicators == null || replicators.isEmpty()) {
    return;
  }
  final List<String> liveRegionServers = replicationTracker.getListOfRegionServers();
  LOG.info("Current list of replicators: " + replicators + " other RSs: "
      + liveRegionServers);
  // After a restart there may be queues left behind by servers that are gone
  for (String replicator : replicators) {
    if (liveRegionServers.contains(replicator)) {
      continue;
    }
    transferQueues(replicator);
  }
}
/**
 * Creates, registers and starts the source for the given peer cluster. A newly added peer only
 * needs the latest log of each wal group, so those are queued and enqueued on the source.
 * If a log cannot be added to the replication queue the server is stopped and the exception
 * is rethrown.
 * @param id the id of the peer cluster
 * @return the source that was created
 * @throws IOException if the source cannot be created
 * @throws ReplicationException if the peer config cannot be read or a log cannot be queued
 */
protected ReplicationSourceInterface addSource(String id) throws IOException,
    ReplicationException {
  final ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
  final ReplicationPeer peer = replicationPeers.getPeer(id);
  final ReplicationSourceInterface src =
      getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
        this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
  synchronized (this.walsById) {
    this.sources.add(src);
    final Map<String, SortedSet<String>> walsByGroup =
        new HashMap<String, SortedSet<String>>();
    this.walsById.put(id, walsByGroup);
    // Seed the new source with the latest wal of every group
    synchronized (latestPaths) {
      for (Path latestWal : latestPaths) {
        final String walName = latestWal.getName();
        final String walGroup = AbstractFSWALProvider.getWALPrefixFromWALName(walName);
        final SortedSet<String> groupWals = new TreeSet<String>();
        groupWals.add(walName);
        walsByGroup.put(walGroup, groupWals);
        try {
          this.replicationQueues.addLog(id, walName);
        } catch (ReplicationException e) {
          server.stop("Cannot add log to queue when creating a new source, queueId=" + id
              + ", filename=" + walName);
          throw e;
        }
        src.enqueueLog(latestWal);
      }
    }
  }
  src.startup();
  return src;
}
/**
 * Deletes the complete queue of wals associated with a peer cluster.
 * @param peerId id of the queue of wals to delete
 * @param closeConnection if true, the peer registry is also told that the peer was removed
 */
public void deleteSource(String peerId, boolean closeConnection) {
  this.replicationQueues.removeQueue(peerId);
  if (!closeConnection) {
    return;
  }
  this.replicationPeers.peerRemoved(peerId);
}
/**
 * Terminates the replication on this region server: shuts the failover executor down and
 * terminates every normal source.
 */
public void join() {
  this.executor.shutdown();
  final Iterator<ReplicationSourceInterface> it = this.sources.iterator();
  while (it.hasNext()) {
    it.next().terminate("Region server is closing");
  }
}
/**
 * Gets a read-only view of the wals tracked for the normal sources of this rs.
 * The returned map wraps the live map, it is not a copy.
 * @return an unmodifiable view of peer id -> wal group -> sorted wal names
 */
protected Map<String, Map<String, SortedSet<String>>> getWALs() {
  final Map<String, Map<String, SortedSet<String>>> readOnlyView =
      Collections.unmodifiableMap(walsById);
  return readOnlyView;
}
/**
 * Gets a read-only view of the wals tracked for the recovered sources of this rs.
 * The returned map wraps the live map, it is not a copy.
 * @return an unmodifiable view of recovered queue id -> wal group -> sorted wal names
 */
protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
  final Map<String, Map<String, SortedSet<String>>> readOnlyView =
      Collections.unmodifiableMap(walsByIdRecoveredQueues);
  return readOnlyView;
}
/**
 * Gets the list of all the normal sources of this rs.
 * The live internal list is returned, not a copy.
 * @return list of all normal sources
 */
public List<ReplicationSourceInterface> getSources() {
  return sources;
}
/**
 * Gets the list of all the old (recovered) sources of this rs.
 * The live internal list is returned, not a copy.
 * @return list of all old sources
 */
public List<ReplicationSourceInterface> getOldSources() {
  return oldsources;
}
/**
 * Records the new log in the replication queues and in the tracked wals, then makes it the
 * latest path of its wal group.
 * <p>
 * The previous latest path of the group is found by comparing wal prefixes for equality. A
 * substring match on the file name is not enough: a group whose prefix is a substring of
 * another group's file name could otherwise evict that other group's latest path.
 * @param newLog path of the log that is about to be rolled in
 * @throws IOException if the log cannot be added to a replication queue
 */
void preLogRoll(Path newLog) throws IOException {
  recordLog(newLog);
  String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(newLog.getName());
  synchronized (latestPaths) {
    Iterator<Path> iterator = latestPaths.iterator();
    while (iterator.hasNext()) {
      String trackedPrefix =
          AbstractFSWALProvider.getWALPrefixFromWALName(iterator.next().getName());
      if (logPrefix.equals(trackedPrefix)) {
        // At most one latest path is kept per group, so stop at the first hit
        iterator.remove();
        break;
      }
    }
    this.latestPaths.add(newLog);
  }
}
/**
 * Adds the given log to the replication queue of every peer and to the tracked wals of every
 * peer in {@code walsById}. A wal group that is not tracked yet for a peer is created.
 * @param logPath the log path to check and enqueue
 * @throws IOException if the log cannot be added to a replication queue
 */
private void recordLog(Path logPath) throws IOException {
  final String logName = logPath.getName();
  final String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
  // First persist the log in the replication queues. Holding the replicationPeers monitor
  // avoids adding a source for a peer that is being removed.
  synchronized (replicationPeers) {
    for (String queueId : replicationPeers.getPeerIds()) {
      try {
        this.replicationQueues.addLog(queueId, logName);
      } catch (ReplicationException e) {
        throw new IOException("Cannot add log to replication queue"
            + " when creating a new source, queueId=" + queueId + ", filename=" + logName, e);
      }
    }
  }
  // Then mirror the change in the in-memory walsById map
  synchronized (walsById) {
    for (Map.Entry<String, Map<String, SortedSet<String>>> peerEntry
        : this.walsById.entrySet()) {
      final Map<String, SortedSet<String>> groups = peerEntry.getValue();
      boolean groupKnown = false;
      for (Map.Entry<String, SortedSet<String>> groupEntry : groups.entrySet()) {
        final SortedSet<String> groupWals = groupEntry.getValue();
        if (this.sources.isEmpty()) {
          // Without any slave the old wals are useless: only the latest one is
          // considered when a new slave comes in
          groupWals.clear();
        }
        if (logPrefix.equals(groupEntry.getKey())) {
          groupWals.add(logName);
          groupKnown = true;
        }
      }
      if (groupKnown) {
        continue;
      }
      // First log of a new group for this peer
      LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer "
          + peerEntry.getKey());
      final SortedSet<String> newGroupWals = new TreeSet<String>();
      newGroupWals.add(logName);
      groups.put(logPrefix, newGroupWals);
    }
  }
}
/**
 * Enqueues the new log on every normal source. Recovered sources are not touched.
 * @param newLog path of the log that was rolled in
 * @throws IOException declared for callers; nothing in this method's own code throws it
 */
void postLogRoll(Path newLog) throws IOException {
  final Iterator<ReplicationSourceInterface> ownSources = this.sources.iterator();
  while (ownSources.hasNext()) {
    ownSources.next().enqueueLog(newLog);
  }
}
/**
 * Factory method that creates and initializes a replication source together with its
 * replication endpoint.
 * <p>
 * The source class is read from {@code replication.replicationsource.implementation}; if it
 * cannot be instantiated a plain {@link ReplicationSource} is used instead. The endpoint
 * class comes from the peer config and defaults to
 * {@link HBaseInterClusterReplicationEndpoint}; a failure to create it is fatal.
 * @param conf the configuration to use
 * @param fs the file system to use
 * @param manager the manager to use
 * @param replicationQueues the replication queues handed to the source
 * @param replicationPeers the replication peers handed to the source
 * @param server the server object for this region server
 * @param peerId the id of the peer cluster
 * @param clusterId the UUID of this cluster
 * @param peerConfig the configuration of the peer, source of the endpoint implementation
 * @param replicationPeer the peer the endpoint is initialized with
 * @return the created source
 * @throws IOException if the replication endpoint cannot be created
 */
protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
    final FileSystem fs, final ReplicationSourceManager manager,
    final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
    final Server server, final String peerId, final UUID clusterId,
    final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
    throws IOException {
  // The coprocessor host and table descriptors only exist on a real region server
  RegionServerCoprocessorHost coprocessorHost = null;
  TableDescriptors descriptors = null;
  if (server instanceof HRegionServer) {
    final HRegionServer regionServer = (HRegionServer) server;
    coprocessorHost = regionServer.getRegionServerCoprocessorHost();
    descriptors = regionServer.getTableDescriptors();
  }

  ReplicationSourceInterface source;
  try {
    final String sourceClassName = conf.get("replication.replicationsource.implementation",
        ReplicationSource.class.getCanonicalName());
    final Class<?> sourceClass = Class.forName(sourceClassName);
    source = (ReplicationSourceInterface) sourceClass.newInstance();
  } catch (Exception e) {
    LOG.warn("Passed replication source implementation throws errors, " +
        "defaulting to ReplicationSource", e);
    source = new ReplicationSource();
  }

  ReplicationEndpoint endpoint = null;
  try {
    String endpointClassName = peerConfig.getReplicationEndpointImpl();
    if (endpointClassName == null) {
      // Default to HBase inter-cluster replication endpoint
      endpointClassName = HBaseInterClusterReplicationEndpoint.class.getName();
    }
    final Class<?> endpointClass = Class.forName(endpointClassName);
    endpoint = (ReplicationEndpoint) endpointClass.newInstance();
    if (coprocessorHost != null) {
      final ReplicationEndpoint hooked = coprocessorHost.postCreateReplicationEndPoint(endpoint);
      if (hooked != null) {
        // The coprocessor hook supplied a replacement endpoint, use it
        endpoint = hooked;
      }
    }
  } catch (Exception e) {
    LOG.warn("Passed replication endpoint implementation throws errors"
        + " while initializing ReplicationSource for peer: " + peerId, e);
    throw new IOException(e);
  }

  final MetricsSource metrics = new MetricsSource(peerId);
  // init replication source
  source.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
    clusterId, endpoint, metrics);
  // init replication endpoint
  final ReplicationEndpoint.Context endpointContext =
      new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), fs, peerId,
        clusterId, replicationPeer, metrics, descriptors);
  endpoint.init(endpointContext);
  return source;
}
/**
 * Schedules a {@link NodeFailoverWorker} on the executor to take over all the queues of the
 * given region server. A rejected submission is only logged.
 * @param rsZnode the region server whose queues should be transferred
 */
private void transferQueues(String rsZnode) {
  final NodeFailoverWorker failoverWorker = new NodeFailoverWorker(rsZnode,
      this.replicationQueues, this.replicationPeers, this.clusterId);
  try {
    this.executor.execute(failoverWorker);
  } catch (RejectedExecutionException rejected) {
    LOG.info("Cancelling the transfer of " + rsZnode + " because of "
        + rejected.getMessage());
  }
}
/**
 * Clears the references to the specified old source: drops it from the old sources, deletes
 * its queue and stops tracking its wals.
 * @param src source to clear
 */
public void closeRecoveredQueue(ReplicationSourceInterface src) {
  final String queueZnode = src.getPeerClusterZnode();
  LOG.info("Done with the recovered queue " + queueZnode);
  this.oldsources.remove(src);
  deleteSource(queueZnode, false);
  this.walsByIdRecoveredQueues.remove(queueZnode);
}
/**
 * Terminates and closes all the recovered sources of the specified peer, then terminates its
 * normal sources and deletes the peer's queue. If the peer has no normal source an error is
 * logged and the queue is left untouched.
 * @param id The id of the peer cluster
 */
public void removePeer(String id) {
  LOG.info("Closing the following queue " + id + ", currently have "
      + sources.size() + " and another "
      + oldsources.size() + " that were recovered");
  final String terminateMessage = "Replication stream was removed by a user";

  // Recovered sources first: collect, then terminate and close each of them
  final List<ReplicationSourceInterface> recoveredForPeer =
      new ArrayList<ReplicationSourceInterface>();
  for (ReplicationSourceInterface recovered : oldsources) {
    if (id.equals(recovered.getPeerClusterId())) {
      recoveredForPeer.add(recovered);
    }
  }
  for (ReplicationSourceInterface recovered : recoveredForPeer) {
    recovered.terminate(terminateMessage);
    closeRecoveredQueue(recovered);
  }
  LOG.info("Number of deleted recovered sources for " + id + ": "
      + recoveredForPeer.size());

  // Then the normal sources. Holding the replicationPeers monitor avoids adding a source
  // for the peer while it is being removed.
  synchronized (this.replicationPeers) {
    final List<ReplicationSourceInterface> normalForPeer =
        new ArrayList<ReplicationSourceInterface>();
    for (ReplicationSourceInterface normal : this.sources) {
      if (id.equals(normal.getPeerClusterId())) {
        normalForPeer.add(normal);
      }
    }
    if (normalForPeer.isEmpty()) {
      LOG.error("The queue we wanted to close is missing " + id);
      return;
    }
    for (ReplicationSourceInterface normal : normalForPeer) {
      normal.terminate(terminateMessage);
      this.sources.remove(normal);
    }
    deleteSource(id, true);
  }
}
/**
 * Listener callback for a removed region server: schedules the transfer of its queues.
 * @param regionserver the region server that was removed
 */
@Override
public void regionServerRemoved(String regionserver) {
  this.transferQueues(regionserver);
}
/**
 * Listener callback for a removed peer: removes its sources and queue, then removes the peer
 * from the hfile-refs queue.
 * @param peerId id of the peer that was removed
 */
@Override
public void peerRemoved(String peerId) {
  this.removePeer(peerId);
  replicationQueues.removePeerFromHFileRefs(peerId);
}
/**
 * Listener callback for a change in the list of peers: every id that the peer registry
 * reports as newly added gets a source (and an hfile-refs entry when replication of bulk
 * loaded data is enabled). A failure for one peer is logged and does not stop the others.
 * @param peerIds the current list of peer ids
 */
@Override
public void peerListChanged(List<String> peerIds) {
  for (String peerId : peerIds) {
    try {
      if (!this.replicationPeers.peerAdded(peerId)) {
        continue;
      }
      addSource(peerId);
      if (replicationForBulkLoadDataEnabled) {
        this.replicationQueues.addPeerToHFileRefs(peerId);
      }
    } catch (Exception e) {
      LOG.error("Error while adding a new peer", e);
    }
  }
}
/**
 * Class responsible to setup new ReplicationSources to take care of the
 * queues from dead region servers.
 */
class NodeFailoverWorker extends Thread {
  private final String rsZnode;
  private final ReplicationQueues rq;
  private final ReplicationPeers rp;
  private final UUID clusterId;

  /**
   * @param rsZnode the dead region server whose queues should be claimed
   * @param replicationQueues the queues used to claim the dead server's queues
   * @param replicationPeers the peers used to validate the recovered queues
   * @param clusterId the UUID of this cluster, handed to the recovered sources
   */
  public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final UUID clusterId) {
    super("Failover-for-"+rsZnode);
    this.rsZnode = rsZnode;
    this.rq = replicationQueues;
    this.rp = replicationPeers;
    this.clusterId = clusterId;
  }

  /**
   * Waits a randomized delay, claims the queues of the dead region server and starts one
   * recovered source per claimed queue. A queue that cannot be turned into a source is
   * skipped; it never prevents the remaining claimed queues from being processed.
   */
  @Override
  public void run() {
    if (this.rq.isThisOurRegionServer(rsZnode)) {
      return;
    }
    // Wait a bit before transferring the queues, we may be shutting down.
    // This sleep may not be enough in some cases.
    try {
      Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting before transferring a queue.");
      Thread.currentThread().interrupt();
    }
    // We try to lock that rs' queue directory
    if (server.isStopped()) {
      LOG.info("Not transferring queue since we are shutting down");
      return;
    }
    Map<String, Set<String>> newQueues = this.rq.claimQueues(rsZnode);
    // Copying over the failed queue is completed.
    if (newQueues.isEmpty()) {
      // We either didn't get the lock or the failed region server didn't have any outstanding
      // WALs to replicate, so we are done.
      return;
    }
    for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
      String peerId = entry.getKey();
      Set<String> walsSet = entry.getValue();
      try {
        // there is not an actual peer defined corresponding to peerId for the failover.
        ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
        String actualPeerId = replicationQueueInfo.getPeerId();
        ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
        ReplicationPeerConfig peerConfig = null;
        try {
          peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
        } catch (ReplicationException ex) {
          // Pass the exception as the throwable so that its stack trace is logged
          LOG.warn("Received exception while getting replication peer config, skipping replay",
            ex);
        }
        if (peer == null || peerConfig == null) {
          LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
          continue;
        }
        // track sources in walsByIdRecoveredQueues
        Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
        walsByIdRecoveredQueues.put(peerId, walsByGroup);
        for (String wal : walsSet) {
          String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
          SortedSet<String> wals = walsByGroup.get(walPrefix);
          if (wals == null) {
            wals = new TreeSet<String>();
            walsByGroup.put(walPrefix, wals);
          }
          wals.add(wal);
        }
        // enqueue sources
        ReplicationSourceInterface src =
            getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
              server, peerId, this.clusterId, peerConfig, peer);
        if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
          // The peer disappeared while the queue was being set up. Close this recovered
          // queue so that its tracking entry is not leaked, and move on to the next
          // claimed queue instead of abandoning all the remaining ones.
          src.terminate("Recovered queue doesn't belong to any current peer");
          closeRecoveredQueue(src);
          continue;
        }
        oldsources.add(src);
        for (String wal : walsSet) {
          src.enqueueLog(new Path(oldLogDir, wal));
        }
        src.startup();
      } catch (IOException e) {
        // TODO manage it
        LOG.error("Failed creating a source", e);
      }
    }
  }
}
/**
 * Gets the wal archive directory this manager was constructed with.
 * @return the directory where wals are archived
 */
public Path getOldLogDir() {
  return oldLogDir;
}
/**
 * Gets the wal directory this manager was constructed with.
 * @return the directory where wals are stored by their RSs
 */
public Path getLogDir() {
  return logDir;
}
/**
 * Gets the file system this manager was constructed with.
 * @return the file system handle used by this manager
 */
public FileSystem getFs() {
  return fs;
}
/**
 * Builds a string representation of the metrics of all the sources: one line per normal
 * source followed by one line per recovered source.
 * @return the concatenated stats of every source, each terminated by a newline
 */
public String getStats() {
  final StringBuilder report = new StringBuilder();
  for (ReplicationSourceInterface normal : sources) {
    report.append("Normal source for cluster ")
        .append(normal.getPeerClusterId())
        .append(": ");
    report.append(normal.getStats()).append("\n");
  }
  for (ReplicationSourceInterface recovered : oldsources) {
    report.append("Recovered source for cluster/machine(s) ")
        .append(recovered.getPeerClusterId())
        .append(": ");
    report.append(recovered.getStats()).append("\n");
  }
  return report.toString();
}
/**
 * Forwards the given hfile references to every normal source.
 * @param tableName the table the files belong to
 * @param family the column family the files belong to
 * @param files the hfile references to add
 * @throws ReplicationException if a source fails to add the references
 */
public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
    throws ReplicationException {
  final Iterator<ReplicationSourceInterface> ownSources = this.sources.iterator();
  while (ownSources.hasNext()) {
    ownSources.next().addHFileRefs(tableName, family, files);
  }
}
/**
 * Removes the given hfile references from the hfile-refs queue of a peer.
 * @param peerId id of the peer cluster
 * @param files the hfile references to remove
 */
public void cleanUpHFileRefs(String peerId, List<String> files) {
  replicationQueues.removeHFileRefs(peerId, files);
}
}