HBASE-15441 Fix WAL splitting when region has moved multiple times
Summary:
Currently, WAL splitting is broken when a region has been opened multiple times in recent minutes.

Region open and region close write event markers to the WAL. These markers should carry the region's sequence id, but they are currently written with sequence id 1. That means that if a region has moved multiple times in the last few minutes, multiple split-log workers will try to create the recovered-edits file for sequence id 1. One of the workers will fail and, on failing, will delete the recovered edits, causing all WAL-split attempts to fail.
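To make the collision concrete, here is a minimal, self-contained sketch; the path format and encoded region name are illustrative stand-ins, not HBase's actual implementation:

import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only: two region-event markers that both carry
// sequence id 1 map to the same recovered-edits file name.
public class RecoveredEditsCollision {
  // Hypothetical stand-in for the real path logic: the file is named after
  // the zero-padded sequence id of the first edit written into it.
  static String recoveredEditsFile(String encodedRegion, long seqId) {
    return encodedRegion + "/recovered.edits/" + String.format("%019d", seqId);
  }

  public static void main(String[] args) {
    Set<String> created = new HashSet<>();
    // Two split-log workers each see an open/close marker stamped seqId=1.
    for (long markerSeqId : new long[] {1L, 1L}) {
      String path = recoveredEditsFile("abc123", markerSeqId);
      if (!created.add(path)) {
        // The second worker finds the file already taken, fails, and on
        // failing deletes the recovered edits -- so the whole split fails.
        System.out.println("second worker collides on " + path);
      }
    }
  }
}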

It appears that the close event with a sequence id of 1 comes from region warmup.

This patch fixes that by making sure the close on warmup doesn't happen. Splitting also now ignores any such events that are already in the logs.

Test Plan: Unit tests pass

Differential Revision: https://reviews.facebook.net/D55557
elliottneilclark committed Mar 16, 2016
1 parent 3adcc75 commit ecec35a
Showing 6 changed files with 129 additions and 19 deletions.
ProtobufUtil.java
@@ -2629,15 +2629,36 @@ public static FlushDescriptor toFlushDescriptor(FlushAction action, HRegionInfo
   public static RegionEventDescriptor toRegionEventDescriptor(
       EventType eventType, HRegionInfo hri, long seqId, ServerName server,
       Map<byte[], List<Path>> storeFiles) {
+    final byte[] tableNameAsBytes = hri.getTable().getName();
+    final byte[] encodedNameAsBytes = hri.getEncodedNameAsBytes();
+    final byte[] regionNameAsBytes = hri.getRegionName();
+    return toRegionEventDescriptor(eventType,
+        tableNameAsBytes,
+        encodedNameAsBytes,
+        regionNameAsBytes,
+        seqId,
+
+        server,
+        storeFiles);
+  }
+
+  public static RegionEventDescriptor toRegionEventDescriptor(EventType eventType,
+      byte[] tableNameAsBytes,
+      byte[] encodedNameAsBytes,
+      byte[] regionNameAsBytes,
+      long seqId,
+
+      ServerName server,
+      Map<byte[], List<Path>> storeFiles) {
     RegionEventDescriptor.Builder desc = RegionEventDescriptor.newBuilder()
         .setEventType(eventType)
-        .setTableName(ByteStringer.wrap(hri.getTable().getName()))
-        .setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
-        .setRegionName(ByteStringer.wrap(hri.getRegionName()))
+        .setTableName(ByteStringer.wrap(tableNameAsBytes))
+        .setEncodedRegionName(ByteStringer.wrap(encodedNameAsBytes))
+        .setRegionName(ByteStringer.wrap(regionNameAsBytes))
         .setLogSequenceNumber(seqId)
         .setServer(toServerName(server));

-    for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
+    for (Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
       StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
           .setFamilyName(ByteStringer.wrap(entry.getKey()))
           .setStoreHomeDir(Bytes.toString(entry.getKey()));
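For callers that don't have an HRegionInfo handy (the new test below is one), the added overload can be used as in this sketch. The table, region names, server, and sequence id are illustrative placeholders, and the usual imports from the test file (ProtobufUtil, WALProtos, Bytes, ServerName, ImmutableMap, java.util.List, org.apache.hadoop.fs.Path) are assumed:

// Usage sketch (not from the commit): builds a REGION_OPEN descriptor
// without an HRegionInfo. All values are placeholders.
WALProtos.RegionEventDescriptor openEvent = ProtobufUtil.toRegionEventDescriptor(
    WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
    Bytes.toBytes("myTable"),                  // table name as bytes
    Bytes.toBytes("abc123def456"),             // encoded region name
    Bytes.toBytes("myTable,,1.abc123def456."), // full region name
    42L,                                       // the marker's sequence id
    ServerName.valueOf("host1,16020,1457000000000"),
    ImmutableMap.<byte[], List<Path>>of());    // no store files in this sketch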
HRegion.java
@@ -966,10 +966,13 @@ public HStore call() throws IOException {
   private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);

     // Initialize all the HStores
     status.setStatus("Warming up all the Stores");
-    initializeStores(reporter, status);
+    try {
+      initializeStores(reporter, status);
+    } finally {
+      status.markComplete("Done warming up.");
+    }
   }

   /**
@@ -6427,9 +6430,8 @@ public static void warmupHRegion(final HRegionInfo info,
       fs = FileSystem.get(conf);
     }

-    HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
+    HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, null);
     r.initializeWarmup(reporter);
-    r.close();
   }

WALEdit.java
@@ -89,10 +89,14 @@ public class WALEdit implements Writable, HeapSize {
   // TODO: Get rid of this; see HBASE-8457
   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
-  static final byte [] METAROW = Bytes.toBytes("METAROW");
-  static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
-  static final byte [] FLUSH = Bytes.toBytes("HBASE::FLUSH");
-  static final byte [] REGION_EVENT = Bytes.toBytes("HBASE::REGION_EVENT");
+  @VisibleForTesting
+  public static final byte [] METAROW = Bytes.toBytes("METAROW");
+  @VisibleForTesting
+  public static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
+  @VisibleForTesting
+  public static final byte [] FLUSH = Bytes.toBytes("HBASE::FLUSH");
+  @VisibleForTesting
+  public static final byte [] REGION_EVENT = Bytes.toBytes("HBASE::REGION_EVENT");
   @VisibleForTesting
   public static final byte [] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD");

@@ -343,7 +347,7 @@ public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDe
     return new WALEdit().add(kv); //replication scope null so that this won't be replicated
   }

-  private static byte[] getRowForRegion(HRegionInfo hri) {
+  public static byte[] getRowForRegion(HRegionInfo hri) {
     byte[] startKey = hri.getStartKey();
     if (startKey.length == 0) {
       // empty row key is not allowed in mutations because it is both the start key and the end key
RegionReplicaReplicationEndpoint.java
@@ -366,6 +366,11 @@ public boolean flush() throws IOException {
       return super.flush();
     }

+    @Override
+    public boolean keepRegionEvents() {
+      return true;
+    }
+
     @Override
     public List<Path> finishWritingAndClose() throws IOException {
       finishWriting(true);
WALSplitter.java
@@ -359,6 +359,11 @@ boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws
           editsSkipped++;
           continue;
         }
+        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
+        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvents()) {
+          editsSkipped++;
+          continue;
+        }
         entryBuffers.appendEntry(entry);
         editsCount++;
         int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
@@ -1266,6 +1271,15 @@ protected boolean finishWriting(boolean interrupt) throws IOException {
     public boolean flush() throws IOException {
       return false;
     }
+
+    /**
+     * Some WALEdits contain only KVs that record what happened to a region.
+     * Not all sinks will want those edits.
+     *
+     * @return true if this sink wants to get all WALEdits, regardless of whether they are
+     *     region events.
+     */
+    public abstract boolean keepRegionEvents();
   }

   /**
@@ -1609,6 +1623,11 @@ public void append(RegionEntryBuffer buffer) throws IOException {
       }
     }

+    @Override
+    public boolean keepRegionEvents() {
+      return false;
+    }
+
     /**
      * @return a map from encoded region ID to the number of edits written out for that region.
      */
@@ -2060,6 +2079,11 @@ public boolean flush() throws IOException {
       return false;
     }

+    @Override
+    public boolean keepRegionEvents() {
+      return true;
+    }
+
     void addWriterError(Throwable t) {
       thrown.add(t);
     }
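The new keepRegionEvents() hook is the heart of the split-side fix: the recovered-edits sink drops region-event meta edits (replaying open/close markers into a region adds nothing), while the replica-bound sink keeps them. A simplified sketch of that contract follows; the classes here are hedged stand-ins, not the real WALSplitter sinks:

// Simplified stand-ins for WALSplitter's OutputSink subclasses; only the
// new keepRegionEvents() contract is shown.
abstract class SketchSink {
  abstract boolean keepRegionEvents();
}

class RecoveredEditsSketchSink extends SketchSink {
  @Override
  boolean keepRegionEvents() {
    return false; // replaying open/close/compaction markers is useless here
  }
}

class ReplicationSketchSink extends SketchSink {
  @Override
  boolean keepRegionEvents() {
    return true; // replica regions rely on these markers
  }
}

// The split loop then drops meta edits only for sinks that opt out:
//   if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvents()) {
//     editsSkipped++;
//     continue;
//   }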
TestWALSplit.java
@@ -39,6 +39,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;

+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -55,18 +56,24 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.FaultySequenceFileLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -422,7 +429,7 @@ public void testSplitPreservesEdits() throws IOException{
     REGIONS.clear();
     REGIONS.add(REGION);

-    generateWALs(1, 10, -1);
+    generateWALs(1, 10, -1, 0);
     useDifferentDFSClient();
     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
@@ -432,6 +439,22 @@ public void testSplitPreservesEdits() throws IOException{
     assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
   }

+  @Test (timeout=300000)
+  public void testSplitRemovesRegionEventsEdits() throws IOException{
+    final String REGION = "region__1";
+    REGIONS.clear();
+    REGIONS.add(REGION);
+
+    generateWALs(1, 10, -1, 100);
+    useDifferentDFSClient();
+    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    assertEquals(1, splitLog.length);
+
+    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
+  }
+
   /**
    * @param expectedEntries -1 to not assert
    * @return the count across all regions
@@ -610,7 +633,7 @@ private void ignoreCorruption(final Corruptions corruption, final int entryCount
     REGIONS.add(REGION);

     Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
-    generateWALs(1, entryCount, -1);
+    generateWALs(1, entryCount, -1, 0);
     corruptWAL(c1, corruption, true);

     useDifferentDFSClient();
@@ -1120,7 +1143,11 @@ protected Writer createWriter(Path logfile)
   }

   private Writer generateWALs(int leaveOpen) throws IOException {
-    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen);
+    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
+  }
+
+  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
+    return generateWALs(writers, entries, leaveOpen, 7);
   }

   private void makeRegionDirs(List<String> regions) throws IOException {
@@ -1134,11 +1161,12 @@ private void makeRegionDirs(List<String> regions) throws IOException {
    * @param leaveOpen index to leave un-closed. -1 to close all.
    * @return the writer that's still open, or null if all were closed.
    */
-  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
+  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents) throws IOException {
     makeRegionDirs(REGIONS);
     fs.mkdirs(WALDIR);
     Writer [] ws = new Writer[writers];
     int seq = 0;
+    int numRegionEventsAdded = 0;
     for (int i = 0; i < writers; i++) {
       ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
       for (int j = 0; j < entries; j++) {
@@ -1147,6 +1175,11 @@ private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOEx
           String row_key = region + prefix++ + i + j;
           appendEntry(ws[i], TABLE_NAME, region.getBytes(), row_key.getBytes(), FAMILY, QUALIFIER,
               VALUE, seq++);
+
+          if (numRegionEventsAdded < regionEvents) {
+            numRegionEventsAdded ++;
+            appendRegionEvent(ws[i], region);
+          }
         }
       }
       if (i != leaveOpen) {
@@ -1160,6 +1193,8 @@ private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOEx
     return ws[leaveOpen];
   }

+
+
   private Path[] getLogForRegion(Path rootdir, TableName table, String region)
       throws IOException {
     Path tdir = FSUtils.getTableDir(rootdir, table);
@@ -1270,6 +1305,23 @@ private int countWAL(Path log) throws IOException {
     return count;
   }

+  private static void appendRegionEvent(Writer w, String region) throws IOException {
+    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
+        WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
+        TABLE_NAME.toBytes(),
+        region.getBytes(),
+        String.valueOf(region.hashCode()).getBytes(),
+        1,
+        ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
+    final long time = EnvironmentEdgeManager.currentTime();
+    KeyValue kv = new KeyValue(region.getBytes(), WALEdit.METAFAMILY, WALEdit.REGION_EVENT,
+        time, regionOpenDesc.toByteArray());
+    final WALKey walKey = new WALKey(region.getBytes(), TABLE_NAME, 1, time,
+        HConstants.DEFAULT_CLUSTER_ID);
+    w.append(
+        new Entry(walKey, new WALEdit().add(kv)));
+  }
+
   public static long appendEntry(Writer writer, TableName table, byte[] region,
       byte[] row, byte[] family, byte[] qualifier,
       byte[] value, long seq)
@@ -1286,9 +1338,11 @@ private static Entry createTestEntry(
       byte[] row, byte[] family, byte[] qualifier,
       byte[] value, long seq) {
     long time = System.nanoTime();
-    WALEdit edit = new WALEdit();
     seq++;
-    edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
+    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
+    WALEdit edit = new WALEdit();
+    edit.add(cell);
     return new Entry(new WALKey(region, table, seq, time,
         HConstants.DEFAULT_CLUSTER_ID), edit);
   }
