Skip to content

Commit

Permalink
YARN-4210. HBase reader throws NPE if Get returns no rows (Varun Saxe…
Browse files Browse the repository at this point in the history
…na via vrushali)
  • Loading branch information
Vrushali Channapattan authored and sjlee committed Jul 10, 2016
1 parent da2b7bd commit 708fa8b
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 59 deletions.
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
Expand Down Expand Up @@ -85,24 +86,10 @@ protected Result getResult(Configuration hbaseConf, Connection conn)
}

@Override
protected Iterable<Result> getResults(Configuration hbaseConf,
protected ResultScanner getResults(Configuration hbaseConf,
Connection conn) throws IOException {
// If getEntities() is called for an application, there can be at most
// one entity. If the entity passes the filter, it is returned. Otherwise,
// an empty set is returned.
byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId,
flowRunId, appId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
Result result = table.getResult(hbaseConf, conn, get);
TimelineEntity entity = parseEntity(result);
Set<Result> set;
if (entity != null) {
set = Collections.singleton(result);
} else {
set = Collections.emptySet();
}
return set;
throw new UnsupportedOperationException(
"we don't support multiple apps query");
}

@Override
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
Expand Down Expand Up @@ -88,18 +89,22 @@ public Set<TimelineEntity> readEntities(Configuration hbaseConf,
augmentParams(hbaseConf, conn);

NavigableSet<TimelineEntity> entities = new TreeSet<>();
Iterable<Result> results = getResults(hbaseConf, conn);
for (Result result : results) {
TimelineEntity entity = parseEntity(result);
if (entity == null) {
continue;
}
entities.add(entity);
if (entities.size() == limit) {
break;
ResultScanner results = getResults(hbaseConf, conn);
try {
for (Result result : results) {
TimelineEntity entity = parseEntity(result);
if (entity == null) {
continue;
}
entities.add(entity);
if (entities.size() == limit) {
break;
}
}
return entities;
} finally {
results.close();
}
return entities;
}

@Override
Expand All @@ -123,7 +128,7 @@ protected Result getResult(Configuration hbaseConf, Connection conn)
}

@Override
protected Iterable<Result> getResults(Configuration hbaseConf,
protected ResultScanner getResults(Configuration hbaseConf,
Connection conn) throws IOException {
Scan scan = new Scan();
scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
Expand Down Expand Up @@ -96,7 +97,7 @@ protected Result getResult(Configuration hbaseConf, Connection conn)
}

@Override
protected Iterable<Result> getResults(Configuration hbaseConf,
protected ResultScanner getResults(Configuration hbaseConf,
Connection conn) throws IOException {
throw new UnsupportedOperationException(
"multiple entity query is not supported");
Expand All @@ -110,14 +111,14 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
flowRun.setRunId(flowRunId);

// read the start time
Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
Number startTime = (Number)FlowRunColumn.MIN_START_TIME.readResult(result);
if (startTime != null) {
flowRun.setStartTime(startTime);
flowRun.setStartTime(startTime.longValue());
}
// read the end time if available
Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
Number endTime = (Number)FlowRunColumn.MAX_END_TIME.readResult(result);
if (endTime != null) {
flowRun.setMaxEndTime(endTime);
flowRun.setMaxEndTime(endTime.longValue());
}

// read the flow version
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
Expand Down Expand Up @@ -176,7 +177,7 @@ protected Result getResult(Configuration hbaseConf, Connection conn)
}

@Override
protected Iterable<Result> getResults(Configuration hbaseConf,
protected ResultScanner getResults(Configuration hbaseConf,
Connection conn) throws IOException {
// Scan through part of the table to find the entities belong to one app
// and one type
Expand Down
Expand Up @@ -25,9 +25,12 @@
import java.util.Set;
import java.util.TreeSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
Expand All @@ -40,6 +43,7 @@
* entities that are being requested.
*/
abstract class TimelineEntityReader {
private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);
protected final boolean singleEntityRead;

protected String userId;
Expand Down Expand Up @@ -131,6 +135,11 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
augmentParams(hbaseConf, conn);

Result result = getResult(hbaseConf, conn);
if (result == null || result.isEmpty()) {
// Could not find a matching row.
LOG.info("Cannot find matching entity of type " + entityType);
return null;
}
return parseEntity(result);
}

Expand All @@ -145,18 +154,22 @@ public Set<TimelineEntity> readEntities(Configuration hbaseConf,
augmentParams(hbaseConf, conn);

NavigableSet<TimelineEntity> entities = new TreeSet<>();
Iterable<Result> results = getResults(hbaseConf, conn);
for (Result result : results) {
TimelineEntity entity = parseEntity(result);
if (entity == null) {
continue;
}
entities.add(entity);
if (entities.size() > limit) {
entities.pollLast();
ResultScanner results = getResults(hbaseConf, conn);
try {
for (Result result : results) {
TimelineEntity entity = parseEntity(result);
if (entity == null) {
continue;
}
entities.add(entity);
if (entities.size() > limit) {
entities.pollLast();
}
}
return entities;
} finally {
results.close();
}
return entities;
}

/**
Expand Down Expand Up @@ -184,9 +197,9 @@ protected abstract Result getResult(Configuration hbaseConf, Connection conn)
throws IOException;

/**
* Fetches an iterator for {@link Result} instances for a multi-entity read.
* Fetches a {@link ResultScanner} for a multi-entity read.
*/
protected abstract Iterable<Result> getResults(Configuration hbaseConf,
protected abstract ResultScanner getResults(Configuration hbaseConf,
Connection conn) throws IOException;

/**
Expand Down
Expand Up @@ -59,6 +59,7 @@
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
Expand Down Expand Up @@ -281,6 +282,16 @@ private static boolean verifyMetrics(
return false;
}

private static void verifyHttpResponse(Client client, URI uri,
Status status) {
ClientResponse resp =
client.resource(uri).accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertNotNull(resp);
assertTrue("Response from server should have been " + status,
resp.getClientResponseStatus().equals(status));
}

@Test
public void testGetFlowRun() throws Exception {
Client client = createClient();
Expand Down Expand Up @@ -354,6 +365,35 @@ public void testGetFlows() throws Exception {
}
}

@Test
public void testGetFlowRunNotPresent() throws Exception {
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowrun/cluster1/flow_name/1002345678929?userid=user1");
verifyHttpResponse(client, uri, Status.NOT_FOUND);
} finally {
client.destroy();
}
}

@Test
public void testGetFlowsNotPresent() throws Exception {
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/cluster2");
ClientResponse resp = getResponse(client, uri);
Set<FlowActivityEntity> entities =
resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
assertNotNull(entities);
assertEquals(0, entities.size());
} finally {
client.destroy();
}
}

@After
public void stop() throws Exception {
if (server != null) {
Expand Down
Expand Up @@ -249,11 +249,7 @@ public void testWriteApplicationToHBase() throws Exception {
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL));
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
appId, entity.getType(), null, null, null, null, null, null, null,
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertEquals(1, es1.size());

// verify attributes
assertEquals(appId, e1.getId());
Expand Down Expand Up @@ -610,18 +606,9 @@ public void testEvents() throws IOException {
TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL));
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
appName, entity.getType(), null, null, null, null, null, null, null,
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
Set<TimelineEntity> es2 = hbr.getEntities(user, cluster, null, null,
appName, entity.getType(), null, null, null, null, null, null, null,
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertNotNull(e2);
assertEquals(e1, e2);
assertEquals(1, es1.size());
assertEquals(1, es2.size());
assertEquals(es1, es2);

// check the events
NavigableSet<TimelineEvent> events = e1.getEvents();
Expand Down

0 comments on commit 708fa8b

Please sign in to comment.