Skip to content

Commit

Permalink
YARN-4074. [timeline reader] implement support for querying for flows…
Browse files Browse the repository at this point in the history
… and flow runs (sjlee via vrushali)
  • Loading branch information
Vrushali authored and sjlee committed Jul 10, 2016
1 parent a68e383 commit 10fa6da
Show file tree
Hide file tree
Showing 24 changed files with 1,931 additions and 593 deletions.
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.timelineservice;

import java.util.Collection;
import java.util.Date;
import java.util.NavigableSet;
import java.util.TreeSet;

import javax.xml.bind.annotation.XmlElement;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;

/**
* Entity that represents a record for flow activity. It's essentially a
* container entity for flow runs with limited information.
*/
@Public
@Unstable
public class FlowActivityEntity extends TimelineEntity {
public static final String CLUSTER_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "CLUSTER";
public static final String DATE_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "DATE";
public static final String USER_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
public static final String FLOW_NAME_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";

private final NavigableSet<FlowRunEntity> flowRuns = new TreeSet<>();

public FlowActivityEntity() {
super(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
// set config to null
setConfigs(null);
}

public FlowActivityEntity(String cluster, long time, String user,
String flowName) {
this();
setCluster(cluster);
setDate(time);
setUser(user);
setFlowName(flowName);
}

public FlowActivityEntity(TimelineEntity entity) {
super(entity);
if (!TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entity.getType())) {
throw new IllegalArgumentException("Incompatible entity type: " +
getId());
}
// set config to null
setConfigs(null);
}

@XmlElement(name = "id")
@Override
public String getId() {
// flow activity: cluster/day/user@flow_name
String id = super.getId();
if (id == null) {
StringBuilder sb = new StringBuilder();
sb.append(getCluster());
sb.append('/');
sb.append(getDate().getTime());
sb.append('/');
sb.append(getUser());
sb.append('@');
sb.append(getFlowName());
id = sb.toString();
setId(id);
}
return id;
}

@Override
public int compareTo(TimelineEntity entity) {
int comparison = getType().compareTo(entity.getType());
if (comparison == 0) {
// order by cluster, date (descending), user, and flow name
FlowActivityEntity other = (FlowActivityEntity)entity;
int clusterComparison = getCluster().compareTo(other.getCluster());
if (clusterComparison != 0) {
return clusterComparison;
}
int dateComparisonDescending =
(int)(other.getDate().getTime() - getDate().getTime()); // descending
if (dateComparisonDescending != 0) {
return dateComparisonDescending; // descending
}
int userComparison = getUser().compareTo(other.getUser());
if (userComparison != 0) {
return userComparison;
}
return getFlowName().compareTo(other.getFlowName());
} else {
return comparison;
}
}

/**
* Reuse the base class equals method.
*/
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}

/**
* Reuse the base class hashCode method.
*/
@Override
public int hashCode() {
return super.hashCode();
}

public String getCluster() {
return (String)getInfo().get(CLUSTER_INFO_KEY);
}

public void setCluster(String cluster) {
addInfo(CLUSTER_INFO_KEY, cluster);
}

public Date getDate() {
return (Date)getInfo().get(DATE_INFO_KEY);
}

public void setDate(long time) {
Date date = new Date(time);
addInfo(DATE_INFO_KEY, date);
}

public String getUser() {
return (String)getInfo().get(USER_INFO_KEY);
}

public void setUser(String user) {
addInfo(USER_INFO_KEY, user);
}

public String getFlowName() {
return (String)getInfo().get(FLOW_NAME_INFO_KEY);
}

public void setFlowName(String flowName) {
addInfo(FLOW_NAME_INFO_KEY, flowName);
}

public void addFlowRun(FlowRunEntity run) {
flowRuns.add(run);
}

public void addFlowRuns(Collection<FlowRunEntity> runs) {
flowRuns.addAll(runs);
}

@XmlElement(name = "flowruns")
public NavigableSet<FlowRunEntity> getFlowRuns() {
return flowRuns;
}

public int getNumberOfRuns() {
return flowRuns.size();
}
}
Expand Up @@ -17,14 +17,14 @@
*/
package org.apache.hadoop.yarn.api.records.timelineservice;

import javax.xml.bind.annotation.XmlElement;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import javax.xml.bind.annotation.XmlElement;

@InterfaceAudience.Public
@InterfaceStability.Unstable
public class FlowEntity extends HierarchicalTimelineEntity {
public class FlowRunEntity extends HierarchicalTimelineEntity {
public static final String USER_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
public static final String FLOW_NAME_INFO_KEY =
Expand All @@ -33,31 +33,35 @@ public class FlowEntity extends HierarchicalTimelineEntity {
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_VERSION";
public static final String FLOW_RUN_ID_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_ID";
public static final String FLOW_RUN_END_TIME =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_END_TIME";

public FlowEntity() {
super(TimelineEntityType.YARN_FLOW.toString());
public FlowRunEntity() {
super(TimelineEntityType.YARN_FLOW_RUN.toString());
// set config to null
setConfigs(null);
}

public FlowEntity(TimelineEntity entity) {
public FlowRunEntity(TimelineEntity entity) {
super(entity);
if (!entity.getType().equals(TimelineEntityType.YARN_FLOW.toString())) {
if (!entity.getType().equals(TimelineEntityType.YARN_FLOW_RUN.toString())) {
throw new IllegalArgumentException("Incompatible entity type: " + getId());
}
// set config to null
setConfigs(null);
}

@XmlElement(name = "id")
@Override
public String getId() {
//Flow id schema: user@flow_name(or id)/version/run_id
//Flow id schema: user@flow_name(or id)/run_id
String id = super.getId();
if (id == null) {
StringBuilder sb = new StringBuilder();
sb.append(getInfo().get(USER_INFO_KEY).toString());
sb.append('@');
sb.append(getInfo().get(FLOW_NAME_INFO_KEY).toString());
sb.append('/');
sb.append(getInfo().get(FLOW_VERSION_INFO_KEY).toString());
sb.append('/');
sb.append(getInfo().get(FLOW_RUN_ID_INFO_KEY).toString());
id = sb.toString();
setId(id);
Expand All @@ -66,26 +70,23 @@ public String getId() {
}

public String getUser() {
Object user = getInfo().get(USER_INFO_KEY);
return user == null ? null : user.toString();
return (String)getInfo().get(USER_INFO_KEY);
}

public void setUser(String user) {
addInfo(USER_INFO_KEY, user);
}

public String getName() {
Object name = getInfo().get(FLOW_NAME_INFO_KEY);
return name == null ? null : name.toString();
return (String)getInfo().get(FLOW_NAME_INFO_KEY);
}

public void setName(String name) {
addInfo(FLOW_NAME_INFO_KEY, name);
}

public String getVersion() {
Object version = getInfo().get(FLOW_VERSION_INFO_KEY);
return version == null ? null : version.toString();
return (String)getInfo().get(FLOW_VERSION_INFO_KEY);
}

public void setVersion(String version) {
Expand All @@ -100,4 +101,21 @@ public long getRunId() {
public void setRunId(long runId) {
addInfo(FLOW_RUN_ID_INFO_KEY, runId);
}

public long getStartTime() {
return getCreatedTime();
}

public void setStartTime(long startTime) {
setCreatedTime(startTime);
}

public long getMaxEndTime() {
Object time = getInfo().get(FLOW_RUN_END_TIME);
return time == null ? 0L : (Long)time;
}

public void setMaxEndTime(long endTime) {
addInfo(FLOW_RUN_END_TIME, endTime);
}
}
Expand Up @@ -24,21 +24,25 @@
@InterfaceStability.Unstable
public enum TimelineEntityType {
YARN_CLUSTER,
YARN_FLOW,
YARN_FLOW_RUN,
YARN_APPLICATION,
YARN_APPLICATION_ATTEMPT,
YARN_CONTAINER,
YARN_USER,
YARN_QUEUE;
YARN_QUEUE,
YARN_FLOW_ACTIVITY;

/**
* Whether the input type can be a parent of this entity.
*/
public boolean isParent(TimelineEntityType type) {
switch (this) {
case YARN_CLUSTER:
return false;
case YARN_FLOW:
return YARN_FLOW == type || YARN_CLUSTER == type;
case YARN_FLOW_RUN:
return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
case YARN_APPLICATION:
return YARN_FLOW == type || YARN_CLUSTER == type;
return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
case YARN_APPLICATION_ATTEMPT:
return YARN_APPLICATION == type;
case YARN_CONTAINER:
Expand All @@ -50,12 +54,15 @@ public boolean isParent(TimelineEntityType type) {
}
}

/**
* Whether the input type can be a child of this entity.
*/
public boolean isChild(TimelineEntityType type) {
switch (this) {
case YARN_CLUSTER:
return YARN_FLOW == type || YARN_APPLICATION == type;
case YARN_FLOW:
return YARN_FLOW == type || YARN_APPLICATION == type;
return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
case YARN_FLOW_RUN:
return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
case YARN_APPLICATION:
return YARN_APPLICATION_ATTEMPT == type;
case YARN_APPLICATION_ATTEMPT:
Expand All @@ -68,4 +75,12 @@ public boolean isChild(TimelineEntityType type) {
return false;
}
}

/**
* Whether the type of this entity matches the type indicated by the input
* argument.
*/
public boolean matches(String typeString) {
return toString().equals(typeString);
}
}

0 comments on commit 10fa6da

Please sign in to comment.