Skip to content

Commit

Permalink
YARN-5585. [Atsv2] Reader side changes for entity prefix and support …
Browse files Browse the repository at this point in the history
…for pagination via additional filters (Rohith Sharma K S via Varun Saxena)
  • Loading branch information
varunsaxena committed Aug 30, 2017
1 parent 2556c01 commit 02a9710
Show file tree
Hide file tree
Showing 20 changed files with 512 additions and 160 deletions.
Expand Up @@ -550,20 +550,10 @@ public boolean equals(Object obj) {
public int compareTo(TimelineEntity other) {
int comparison = getType().compareTo(other.getType());
if (comparison == 0) {
if (getCreatedTime() == null) {
if (other.getCreatedTime() == null) {
return getId().compareTo(other.getId());
} else {
return 1;
}
}
if (other.getCreatedTime() == null) {
return -1;
}
if (getCreatedTime() > other.getCreatedTime()) {
// Order by created time desc
if (getIdPrefix() > other.getIdPrefix()) {
// Descending order by entity id prefix
return -1;
} else if (getCreatedTime() < other.getCreatedTime()) {
} else if (getIdPrefix() < other.getIdPrefix()) {
return 1;
} else {
return getId().compareTo(other.getId());
Expand Down
Expand Up @@ -214,7 +214,7 @@ private static void loadData() throws Exception {
entity4.addMetrics(metrics);
te4.addEntity(entity4);

TimelineEntities te5 = new TimelineEntities();
TimelineEntities userEntities = new TimelineEntities();
TimelineEntity entity5 = new TimelineEntity();
entity5.setId("entity1");
entity5.setType("type1");
Expand Down Expand Up @@ -270,7 +270,7 @@ private static void loadData() throws Exception {
relatesTo1.put("type3",
Sets.newHashSet("entity31", "entity35", "entity32", "entity33"));
entity5.addRelatesToEntities(relatesTo1);
te5.addEntity(entity5);
userEntities.addEntity(entity5);

TimelineEntity entity6 = new TimelineEntity();
entity6.setId("entity2");
Expand Down Expand Up @@ -329,7 +329,16 @@ private static void loadData() throws Exception {
relatesTo2.put("type6", Sets.newHashSet("entity61", "entity66"));
relatesTo2.put("type3", Sets.newHashSet("entity31"));
entity6.addRelatesToEntities(relatesTo2);
te5.addEntity(entity6);
userEntities.addEntity(entity6);

for (long i = 1; i <= 10; i++) {
TimelineEntity userEntity = new TimelineEntity();
userEntity.setType("entitytype");
userEntity.setId("entityid-" + i);
userEntity.setIdPrefix(11 - i);
userEntity.setCreatedTime(System.currentTimeMillis());
userEntities.addEntity(userEntity);
}

HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
Expand All @@ -342,7 +351,7 @@ private static void loadData() throws Exception {
hbi.write(cluster, user, flow2,
flowVersion2, runid2, entity3.getId(), te3);
hbi.write(cluster, user, flow, flowVersion, runid,
"application_1111111111_1111", te5);
"application_1111111111_1111", userEntities);
hbi.flush();
} finally {
if (hbi != null) {
Expand Down Expand Up @@ -806,7 +815,8 @@ public void testGetEntitiesByUID() throws Exception {
assertEquals(TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(
new TimelineReaderContext(context.getClusterId(),
context.getUserId(), context.getFlowName(),
context.getFlowRunId(), context.getAppId(), "type1",
context.getFlowRunId(), context.getAppId(), "type1",
entity.getIdPrefix(),
entity.getId())), entityUID);
}
}
Expand Down Expand Up @@ -882,8 +892,8 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
String uid =
(String) entity.getInfo().get(TimelineReaderManager.UID_KEY);
assertNotNull(uid);
assertTrue(uid.equals(appUIDWithFlowInfo + "!type1!entity1") ||
uid.equals(appUIDWithFlowInfo + "!type1!entity2"));
assertTrue(uid.equals(appUIDWithFlowInfo + "!type1!0!entity1")
|| uid.equals(appUIDWithFlowInfo + "!type1!0!entity2"));
}

String appUIDWithoutFlowInfo = "cluster1!application_1111111111_1111";
Expand All @@ -909,11 +919,11 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
String uid =
(String) entity.getInfo().get(TimelineReaderManager.UID_KEY);
assertNotNull(uid);
assertTrue(uid.equals(appUIDWithoutFlowInfo + "!type1!entity1") ||
uid.equals(appUIDWithoutFlowInfo + "!type1!entity2"));
assertTrue(uid.equals(appUIDWithoutFlowInfo + "!type1!0!entity1")
|| uid.equals(appUIDWithoutFlowInfo + "!type1!0!entity2"));
}

String entityUIDWithFlowInfo = appUIDWithFlowInfo + "!type1!entity1";
String entityUIDWithFlowInfo = appUIDWithFlowInfo + "!type1!0!entity1";
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
"entity-uid/" + entityUIDWithFlowInfo);
resp = getResponse(client, uri);
Expand All @@ -923,7 +933,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
assertEquals("entity1", singleEntity1.getId());

String entityUIDWithoutFlowInfo =
appUIDWithoutFlowInfo + "!type1!entity1";
appUIDWithoutFlowInfo + "!type1!0!entity1";
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
"entity-uid/" + entityUIDWithoutFlowInfo);
resp = getResponse(client, uri);
Expand Down Expand Up @@ -2187,4 +2197,74 @@ public void stop() throws Exception {
server = null;
}
}

@Test
public void testGenericEntitiesForPagination() throws Exception {
Client client = createClient();
try {
int limit = 10;
String queryParam = "?limit=" + limit;
String resourceUri = "http://localhost:" + serverPort + "/ws/v2/"
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/"
+ "entities/entitytype";
URI uri = URI.create(resourceUri + queryParam);

ClientResponse resp = getResponse(client, uri);
List<TimelineEntity> entities =
resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
// verify for entity-10 to entity-1 in descending order.
verifyPaginatedEntites(entities, limit, limit);

limit = 4;
queryParam = "?limit=" + limit;
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
// verify for entity-10 to entity-7 in descending order.
TimelineEntity entity = verifyPaginatedEntites(entities, limit, 10);

queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix()
+ "&&fromid=" + entity.getId();
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
// verify for entity-7 to entity-4 in descending order.
entity = verifyPaginatedEntites(entities, limit, 7);

queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix();
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
// verify for entity-4 to entity-1 in descending order.
entity = verifyPaginatedEntites(entities, limit, 4);

queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix();
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
// always entity-1 will be retrieved
entity = verifyPaginatedEntites(entities, 1, 1);
} finally {
client.destroy();
}
}

private TimelineEntity verifyPaginatedEntites(List<TimelineEntity> entities,
int limit, int startFrom) {
assertNotNull(entities);
assertEquals(limit, entities.size());
TimelineEntity entity = null;
for (TimelineEntity timelineEntity : entities) {
assertEquals("entitytype", timelineEntity.getType());
assertEquals("entityid-" + startFrom, timelineEntity.getId());
assertEquals(11 - startFrom--, timelineEntity.getIdPrefix());
entity = timelineEntity;
}
return entity;
}
}
Expand Up @@ -181,6 +181,23 @@ public static <T> FilterList createSingleColValueFiltersByRange(
return list;
}

/**
* Creates a HBase {@link SingleColumnValueFilter} with specified column.
* @param <T> Describes the type of column prefix.
* @param column Column which value to be filtered.
* @param value Value to be filtered.
* @param op Compare operator
* @return a SingleColumnValue Filter
* @throws IOException if any exception.
*/
public static <T> Filter createHBaseSingleColValueFilter(Column<T> column,
Object value, CompareOp op) throws IOException {
Filter singleColValFilter = createHBaseSingleColValueFilter(
column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
column.getValueConverter().encodeValue(value), op, true);
return singleColValFilter;
}

/**
* Creates a HBase {@link SingleColumnValueFilter}.
*
Expand Down
Expand Up @@ -33,13 +33,13 @@ public class EntityRowKey {
private final Long flowRunId;
private final String appId;
private final String entityType;
private final long entityIdPrefix;
private final Long entityIdPrefix;
private final String entityId;
private final KeyConverter<EntityRowKey> entityRowKeyConverter =
new EntityRowKeyConverter();

public EntityRowKey(String clusterId, String userId, String flowName,
Long flowRunId, String appId, String entityType, long entityIdPrefix,
Long flowRunId, String appId, String entityType, Long entityIdPrefix,
String entityId) {
this.clusterId = clusterId;
this.userId = userId;
Expand Down Expand Up @@ -79,7 +79,7 @@ public String getEntityId() {
return entityId;
}

public long getEntityIdPrefix() {
public Long getEntityIdPrefix() {
return entityIdPrefix;
}

Expand Down Expand Up @@ -180,14 +180,24 @@ public byte[] encode(EntityRowKey rowKey) {
Separator.encode(rowKey.getEntityType(), Separator.SPACE,
Separator.TAB, Separator.QUALIFIERS);

if (rowKey.getEntityIdPrefix() == null) {
return Separator.QUALIFIERS.join(first, second, third, entityType,
Separator.EMPTY_BYTES);
}

byte[] enitityIdPrefix = Bytes.toBytes(rowKey.getEntityIdPrefix());

byte[] entityId =
rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator
.encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS);
if (rowKey.getEntityId() == null) {
return Separator.QUALIFIERS.join(first, second, third, entityType,
enitityIdPrefix, Separator.EMPTY_BYTES);
}

byte[] entityId = Separator.encode(rowKey.getEntityId(), Separator.SPACE,
Separator.TAB, Separator.QUALIFIERS);

byte[] fourth =
Separator.QUALIFIERS.join(entityType, enitityIdPrefix, entityId);

return Separator.QUALIFIERS.join(first, second, third, fourth);
}

Expand Down Expand Up @@ -227,7 +237,7 @@ public EntityRowKey decode(byte[] rowKey) {
Separator.decode(Bytes.toString(rowKeyComponents[5]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);

long entityPrefixId = Bytes.toLong(rowKeyComponents[6]);
Long entityPrefixId = Bytes.toLong(rowKeyComponents[6]);

String entityId =
Separator.decode(Bytes.toString(rowKeyComponents[7]),
Expand Down
Expand Up @@ -31,18 +31,20 @@ public class EntityRowKeyPrefix extends EntityRowKey implements
* Creates a prefix which generates the following rowKeyPrefixes for the
* entity table:
* {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}.
*
* @param clusterId identifying the cluster
* @param userId identifying the user
* @param flowName identifying the flow
* @param flowRunId identifying the individual run of this flow
* @param appId identifying the application
* @param entityType which entity type
* @param entityIdPrefix for entityId
* @param entityId for an entity
*/
public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
Long flowRunId, String appId, String entityType) {
// TODO YARN-5585, change prefix id from 0L
super(clusterId, userId, flowName, flowRunId, appId, entityType, 0L, null);
Long flowRunId, String appId, String entityType, Long entityIdPrefix,
String entityId) {
super(clusterId, userId, flowName, flowRunId, appId, entityType,
entityIdPrefix, entityId);
}

/**
Expand All @@ -58,8 +60,7 @@ public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
*/
public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
Long flowRunId, String appId) {
// TODO YARN-5585, change prefix id from 0L
super(clusterId, userId, flowName, flowRunId, appId, null, 0L, null);
this(clusterId, userId, flowName, flowRunId, appId, null, null, null);
}

/*
Expand Down
Expand Up @@ -64,7 +64,7 @@ class ApplicationEntityReader extends GenericEntityReader {

public ApplicationEntityReader(TimelineReaderContext ctxt,
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
super(ctxt, entityFilters, toRetrieve, true);
super(ctxt, entityFilters, toRetrieve);
}

public ApplicationEntityReader(TimelineReaderContext ctxt,
Expand Down
Expand Up @@ -60,7 +60,7 @@ class FlowActivityEntityReader extends TimelineEntityReader {

public FlowActivityEntityReader(TimelineReaderContext ctxt,
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
super(ctxt, entityFilters, toRetrieve, true);
super(ctxt, entityFilters, toRetrieve);
}

public FlowActivityEntityReader(TimelineReaderContext ctxt,
Expand Down
Expand Up @@ -63,7 +63,7 @@ class FlowRunEntityReader extends TimelineEntityReader {

public FlowRunEntityReader(TimelineReaderContext ctxt,
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
super(ctxt, entityFilters, toRetrieve, true);
super(ctxt, entityFilters, toRetrieve);
}

public FlowRunEntityReader(TimelineReaderContext ctxt,
Expand Down

0 comments on commit 02a9710

Please sign in to comment.