YARN-5070. upgrade HBase version for first merge (Vrushali C via sjlee)
sjlee committed Jul 10, 2016
1 parent 1ff6833 commit 1a22774
Showing 5 changed files with 392 additions and 88 deletions.
hadoop-project/pom.xml (4 changes: 2 additions & 2 deletions)
@@ -49,8 +49,8 @@
<xerces.jdiff.version>2.11.0</xerces.jdiff.version>

<kafka.version>0.8.2.1</kafka.version>
-<hbase.version>1.0.1</hbase.version>
-<phoenix.version>4.5.0-SNAPSHOT</phoenix.version>
+<hbase.version>1.1.3</hbase.version>
+<phoenix.version>4.7.0-HBase-1.1</phoenix.version>
<hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>

<hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
@@ -41,7 +41,7 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
@@ -107,8 +107,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
// check in flow run table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
-List<HRegion> regions = server.getOnlineRegions(table);
-for (HRegion region : regions) {
+List<Region> regions = server.getOnlineRegions(table);
+for (Region region : regions) {
assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
@@ -122,8 +122,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
// check in flow activity table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
-List<HRegion> regions = server.getOnlineRegions(table);
-for (HRegion region : regions) {
+List<Region> regions = server.getOnlineRegions(table);
+for (Region region : regions) {
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
@@ -137,8 +137,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
// check in entity run table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
-List<HRegion> regions = server.getOnlineRegions(table);
-for (HRegion region : regions) {
+List<Region> regions = server.getOnlineRegions(table);
+for (Region region : regions) {
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
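
For reference, this is the HBase API change behind the edits above: as of HBase 1.1, HRegionServer#getOnlineRegions(TableName) returns List<Region> (the Region interface) rather than List<HRegion>. A minimal sketch of the updated pattern follows; the table name is a placeholder, and a live HRegionServer (e.g. from HBaseTestingUtility#getRSForFirstRegionInTable, as the test uses) is assumed.

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.regionserver.HRegionServer;
    import org.apache.hadoop.hbase.regionserver.Region;

    // Sketch only: "some_table" is a placeholder, not a table from this commit.
    public class OnlineRegionsSketch {
      static void printOnlineRegions(HRegionServer server) throws IOException {
        List<Region> regions =
            server.getOnlineRegions(TableName.valueOf("some_table"));
        for (Region region : regions) {
          // Region still exposes getRegionInfo(), which the assertions above use
          System.out.println(region.getRegionInfo().getRegionNameAsString());
        }
      }
    }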
@@ -311,6 +311,9 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception {
// check flow run
checkFlowRunTable(cluster, user, flow, runid, c1);

+// check various batch limits in scanning the table for this flow
+checkFlowRunTableBatchLimit(cluster, user, flow, runid, c1);
+
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
@@ -350,6 +353,157 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception {
}
}

/*
* checks the batch limits on a scan
*/
void checkFlowRunTableBatchLimit(String cluster, String user,
String flow, long runid, Configuration c1) throws IOException {

Scan s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
s.setStartRow(startRow);
// set a batch limit
int batchLimit = 2;
s.setBatch(batchLimit);
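// note: with setBatch(2), any row holding more than 2 cells in this family is
// split across multiple Results, each carrying at most 2 cells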
String clusterStop = cluster + "1";
byte[] stopRow = new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn
.getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);

int loopCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
assertTrue(result.rawCells().length <= batchLimit);
Map<byte[], byte[]> values = result
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
assertNotNull(values);
assertTrue(values.size() <= batchLimit);
loopCount++;
}
assertTrue(loopCount > 0);

// test with a diff batch limit
s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
s.setStartRow(startRow);
// set a batch limit
batchLimit = 1;
s.setBatch(batchLimit);
s.setMaxResultsPerColumnFamily(2);
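// note: setMaxResultsPerColumnFamily caps the number of cells returned per row
// per column family, on top of the per-Result cap set by setBatch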
s.setStopRow(stopRow);
scanner = table1.getScanner(s);

loopCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
assertEquals(batchLimit, result.rawCells().length);
Map<byte[], byte[]> values = result
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
assertNotNull(values);
assertEquals(batchLimit, values.size());
loopCount++;
}
assertTrue(loopCount > 0);

// test with a diff batch limit
// set it high enough
// we expect back 3 since there are
// column = m!HDFS_BYTES_READ value=57
// column = m!MAP_SLOT_MILLIS value=141
// column min_start_time value=1425016501000
s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
s.setStartRow(startRow);
// set a batch limit
batchLimit = 100;
s.setBatch(batchLimit);
s.setStopRow(stopRow);
scanner = table1.getScanner(s);

loopCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
assertTrue(result.rawCells().length <= batchLimit);
Map<byte[], byte[]> values = result
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
assertNotNull(values);
// assert that with every next invocation
// we get back <= batchLimit values
assertTrue(values.size() <= batchLimit);
assertTrue(values.size() == 3); // see comment above
loopCount++;
}
// should loop through only once
assertTrue(loopCount == 1);

// set it to a negative number
// we expect all 3 back since there are
// column = m!HDFS_BYTES_READ value=57
// column = m!MAP_SLOT_MILLIS value=141
// column min_start_time value=1425016501000
s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
s.setStartRow(startRow);
// set a batch limit
batchLimit = -671;
s.setBatch(batchLimit);
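// note: a batch value <= 0 leaves the per-Result cell count unlimited, so the
// whole row (all 3 cells) is expected back in a single Result, as asserted below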
s.setStopRow(stopRow);
scanner = table1.getScanner(s);

loopCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
assertEquals(3, result.rawCells().length);
Map<byte[], byte[]> values = result
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
assertNotNull(values);
// assert that with every next invocation
// we get back <= batchLimit values
assertEquals(3, values.size());
loopCount++;
}
// should loop through only once
assertEquals(1, loopCount);

// set it to 0
// we expect all 3 back since there are
// column = m!HDFS_BYTES_READ value=57
// column = m!MAP_SLOT_MILLIS value=141
// column min_start_time value=1425016501000
s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
s.setStartRow(startRow);
// set a batch limit
batchLimit = 0;
s.setBatch(batchLimit);
s.setStopRow(stopRow);
scanner = table1.getScanner(s);

loopCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
assertEquals(3, result.rawCells().length);
Map<byte[], byte[]> values = result
.getFamilyMap(FlowRunColumnFamily.INFO.getBytes());
assertNotNull(values);
// assert that with every next invocation
// we get back <= batchLimit values
assertEquals(3, values.size());
loopCount++;
}
// should loop through only once
assertEquals(1, loopCount);
}

private void checkFlowRunTable(String cluster, String user, String flow,
long runid, Configuration c1) throws IOException {
Scan s = new Scan();
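
As a rough standalone illustration of the Scan batching behavior the new checkFlowRunTableBatchLimit test exercises: a positive batch caps the number of cells per Result, so a wide row is spread over several Results, while a batch of zero or a negative value leaves the row intact. The table and column family names below are hypothetical, not from this commit.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    // Sketch only: "demo_table" and family "f" are placeholders.
    public class ScanBatchSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("demo_table"))) {
          Scan scan = new Scan();
          scan.addFamily(Bytes.toBytes("f"));
          // at most 2 cells per Result; a 5-cell row would arrive as 3 Results
          scan.setBatch(2);
          try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
              System.out.println(result.rawCells().length + " cells in this Result");
            }
          }
        }
      }
    }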
