Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

FALCON-2229 BacklogEmitterMetricService fix for deleting entities' instances in case of removal of SLA or deletion of entity #335

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9e68a57
FALCON-298. Feed update with replication delay creates holes
sandeepSamudrala Jul 25, 2016
a94d4fe
rebasing from master
sandeepSamudrala Aug 5, 2016
271318b
FALCON-2097. Adding UT to the new method for getting next instance ti…
sandeepSamudrala Aug 5, 2016
1a4dcd2
rebased and resolved the conflicts from master
sandeepSamudrala Aug 5, 2016
c065566
reverting last line changes made
sandeepSamudrala Aug 5, 2016
1bb8d3c
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Aug 9, 2016
d6dc8bf
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Aug 14, 2016
a178805
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Sep 27, 2016
d0393e9
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Oct 19, 2016
250cc46
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Nov 14, 2016
48f6afa
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Nov 18, 2016
bbca081
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Nov 21, 2016
9cf36e9
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Nov 25, 2016
f96a084
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Dec 1, 2016
e0ad358
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Dec 7, 2016
194f36a
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Dec 15, 2016
0a433fb
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Dec 15, 2016
b1546ed
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Dec 22, 2016
4a2e23e
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Dec 26, 2016
0cf9af6
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Dec 26, 2016
456d4ee
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Dec 27, 2016
089b10d
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Dec 29, 2016
cc28658
Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala Dec 30, 2016
3558af3
Merge branch 'master' of https://github.com/apache/falcon into FALCON…
sandeepSamudrala Jan 2, 2017
5cb25e8
FALCON-2229 BacklogEmitterMetricService fix for deleting entities' in…
sandeepSamudrala Jan 3, 2017
22a80b6
FALCON-2229 Incorporated review comments. Renamed the method
sandeepSamudrala Jan 4, 2017
280a079
FALCON-2229 Removed cluster from the named query that deletes all the…
sandeepSamudrala Jan 4, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
@Entity
@NamedQueries({
@NamedQuery(name = PersistenceConstants.GET_ALL_BACKLOG_INSTANCES, query = "select OBJECT(a) from BacklogMetricBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
@NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.entityType = :entityType")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.Map;

/**
* Backlog Metric Store for entitties.
* Backlog Metric Store for entities' backlog instances.
*/
public class BacklogMetricStore {

Expand Down Expand Up @@ -70,18 +70,19 @@ public synchronized void deleteMetricInstance(String entityName, String cluster,
q.setParameter("clusterName", cluster);
q.setParameter("nominalTime", nominalTime);
q.setParameter("entityType", entityType.name());
try{
try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
}
}

public void deleteEntityInstance(String entityName){
public void deleteEntityBackLogInstances(String entityName, String entityType) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES);
q.setParameter("entityName", entityName);
q.setParameter("entityType", entityType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to add clusterName also here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no. In a colo, on removal of an entity, it's removed from all the clusters, which means deletion of the instances across the clusters per colo.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but the query for the PersistenceConstants DELETE_ALL_BACKLOG_ENTITY_INSTANCES expects three variables:
query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"
This means you need to remove clusterName from the query there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching it. I had missed it. I will remove cluster from the named query.

try {
q.executeUpdate();
} finally {
Expand Down Expand Up @@ -110,7 +111,7 @@ public Map<Entity, List<MetricInfo>> getAllInstances() throws FalconException {
if (CollectionUtils.isEmpty(result)) {
return null;
}
} finally{
} finally {
entityManager.close();
}

Expand All @@ -121,7 +122,7 @@ public Map<Entity, List<MetricInfo>> getAllInstances() throws FalconException {
if (!backlogMetrics.containsKey(entity)) {
backlogMetrics.put(entity, new ArrayList<MetricInfo>());
}
List<MetricInfo> metrics = backlogMetrics.get(entity);
List<MetricInfo> metrics = backlogMetrics.get(entity);
MetricInfo metricInfo = new MetricInfo(BacklogMetricEmitterService.DATE_FORMAT.get()
.format(backlogMetricBean.getNominalTime()),
backlogMetricBean.getClusterName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -82,7 +81,7 @@ public final class BacklogMetricEmitterService implements FalconService,
Services.get().getService(MetricNotificationService.SERVICE_NAME);

private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
Collections.singletonList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));

public static BacklogMetricEmitterService get() {
return SERVICE;
Expand All @@ -107,18 +106,18 @@ protected SimpleDateFormat initialValue() {
private static ConcurrentHashMap<Entity, List<MetricInfo>> entityBacklogs = new ConcurrentHashMap<>();

@Override
public void onAdd(Entity entity) throws FalconException{
public void onAdd(Entity entity) throws FalconException {
addToBacklog(entity);
}

@Override
public void onRemove(Entity entity) throws FalconException{
if (entity.getEntityType() != EntityType.PROCESS){
public void onRemove(Entity entity) throws FalconException {
if (entity.getEntityType() != EntityType.PROCESS) {
return;
}
Process process = (Process) entity;
if (process.getSla() != null) {
backlogMetricStore.deleteEntityInstance(entity.getName());
backlogMetricStore.deleteEntityBackLogInstances(entity.getName(), entity.getEntityType().name());
entityBacklogs.remove(entity);
process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());
for (Cluster cluster : process.getClusters().getClusters()) {
Expand All @@ -127,7 +126,7 @@ public void onRemove(Entity entity) throws FalconException{
}
}

public void dropMetric(String clusterName, Process process){
private void dropMetric(String clusterName, Process process) {
String pipelinesStr = process.getPipelines();
String metricName;

Expand All @@ -144,15 +143,15 @@ public void dropMetric(String clusterName, Process process){
}

@Override
public void onChange(Entity oldEntity, Entity newEntity) throws FalconException{
if (oldEntity.getEntityType() != EntityType.PROCESS){
public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
if (oldEntity.getEntityType() != EntityType.PROCESS) {
return;
}
Process newProcess = (Process) newEntity;
Process oldProcess = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName());
if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null){
if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null) {
if (oldProcess.getSla() != null) {
backlogMetricStore.deleteEntityInstance(newProcess.getName());
backlogMetricStore.deleteEntityBackLogInstances(newProcess.getName(), newEntity.getEntityType().name());
entityBacklogs.remove(newProcess);
for (Cluster cluster : oldProcess.getClusters().getClusters()) {
dropMetric(cluster.getName(), oldProcess);
Expand All @@ -164,16 +163,16 @@ public void onChange(Entity oldEntity, Entity newEntity) throws FalconException{
}

@Override
public void onReload(Entity entity) throws FalconException{
public void onReload(Entity entity) throws FalconException {
addToBacklog(entity);
}

public void addToBacklog(Entity entity) {
private void addToBacklog(Entity entity) {
if (entity.getEntityType() != EntityType.PROCESS) {
return;
}
Process process = (Process) entity;
if (process.getSla() == null){
if (process.getSla() == null) {
return;
}
entityBacklogs.putIfAbsent(entity, Collections.synchronizedList(new ArrayList<MetricInfo>()));
Expand Down Expand Up @@ -277,9 +276,9 @@ public void onWait(WorkflowExecutionContext context) throws FalconException {
}

/**
* Service which executes backlog evaluation and publishing metrics to Graphite parallel for entities.
* Service that executes backlog evaluation and publishes metrics to Graphite for entities in parallel.
*/
public static class BacklogMetricEmitter implements Runnable {
private static class BacklogMetricEmitter implements Runnable {
private ThreadPoolExecutor executor;

@Override
Expand Down Expand Up @@ -311,9 +310,9 @@ private void waitForFuturesToComplete(List<Future> futures) {
}

/**
* Service which calculates backlog for given entity and publish to graphite.
* Service that calculates backlog for given entity and publishes them to graphite.
*/
public static class BacklogCalcService implements Runnable {
private static class BacklogCalcService implements Runnable {

private Entity entityObj;
private List<MetricInfo> metrics;
Expand All @@ -329,18 +328,17 @@ public void run() {
MetricInfo metricInfo = null;
HashMap<String, Long> backLogsCluster = new HashMap<>();
synchronized (metrics) {
if (metrics.isEmpty()){
Process process = (Process)entityObj;
if (metrics.isEmpty()) {
Process process = (Process) entityObj;
Clusters clusters = process.getClusters();
for (Cluster cluster : clusters.getClusters()){
for (Cluster cluster : clusters.getClusters()) {
publishBacklog(process, cluster.getName(), 0L);
}
}else{
} else {
long currentTime = System.currentTimeMillis();
Iterator iter = metrics.iterator();
while (iter.hasNext()) {
for (MetricInfo metric : metrics) {
try {
metricInfo = (MetricInfo) iter.next();
metricInfo = metric;
long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime();
long backlog = backLogsCluster.containsKey(metricInfo.getCluster())
? backLogsCluster.get(metricInfo.getCluster()) : 0;
Expand All @@ -366,7 +364,7 @@ public void run() {
}


public static void publishBacklog(Process process, String clusterName, Long backlog){
private static void publishBacklog(Process process, String clusterName, Long backlog) {
String pipelinesStr = process.getPipelines();
String metricName;

Expand All @@ -382,19 +380,17 @@ public static void publishBacklog(Process process, String clusterName, Long back
}
}

public static String getMetricName(String clusterName, String processName, String pipeline){
String metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
private static String getMetricName(String clusterName, String processName, String pipeline) {
return METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
+ pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
+ METRIC_SEPARATOR + processName + METRIC_SEPARATOR
+ "backlogInMins";
return metricName;
}

/**
* Service runs periodically and removes succeeded instances from backlog list.
*/
public static class BacklogCheckService implements Runnable {

private static class BacklogCheckService implements Runnable {
@Override
public void run() {
LOG.trace("BacklogCheckService running for entities");
Expand All @@ -414,7 +410,7 @@ public void run() {
authenticateUser(entity);
if (wfEngine.isMissing(entity)) {
LOG.info("Entity of name {} was deleted so removing instance of "
+ "nominaltime {} ", entity.getName(), nominalTimeStr);
+ "nominal time {} ", entity.getName(), nominalTimeStr);
backlogMetricStore.deleteMetricInstance(entity.getName(),
metricInfo.getCluster(), nominalTime, entity.getEntityType());
iterator.remove();
Expand Down Expand Up @@ -444,7 +440,7 @@ public void run() {
}
}

private static void authenticateUser(Entity entity){
private static void authenticateUser(Entity entity) {
if (!CurrentUser.isAuthenticated()) {
if (StringUtils.isNotBlank(entity.getACL().getOwner())) {
CurrentUser.authenticate(entity.getACL().getOwner());
Expand All @@ -453,5 +449,4 @@ private static void authenticateUser(Entity entity){
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -80,12 +80,12 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList

private static final EntitySLAMonitoringService SERVICE = new EntitySLAMonitoringService();

public static final String TAG_CRITICAL = "Missed-SLA-High";
public static final String TAG_WARN = "Missed-SLA-Low";
static final String TAG_CRITICAL = "Missed-SLA-High";
static final String TAG_WARN = "Missed-SLA-Low";
private static final long MINUTE_DELAY = 60000L;

private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
Collections.singletonList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));

private EntitySLAMonitoringService() {

Expand Down Expand Up @@ -176,7 +176,7 @@ private void startEntityMonitoring(Entity entity, boolean isEntityUpdated) throw
}
}

public Boolean checkFeedClusterSLA(Feed feed){
private Boolean checkFeedClusterSLA(Feed feed){
for(Cluster cluster : feed.getClusters().getClusters()){
Sla sla = FeedHelper.getSLA(cluster, feed);
if (sla != null){
Expand All @@ -187,7 +187,7 @@ public Boolean checkFeedClusterSLA(Feed feed){
}


public Boolean checkProcessClusterSLA(Process process){
private Boolean checkProcessClusterSLA(Process process){
Clusters clusters = process.getClusters();
for(org.apache.falcon.entity.v0.process.Cluster cluster : clusters.getClusters()){
org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(cluster, process);
Expand Down Expand Up @@ -292,7 +292,7 @@ public void onChange(Entity oldEntity, Entity newEntity) throws FalconException
}
}

void updatePendingInstances(String entityName, List<String> slaRemovedClusters , String entityType){
private void updatePendingInstances(String entityName, List<String> slaRemovedClusters, String entityType){
for(String clusterName :slaRemovedClusters){
MONITORING_JDBC_STATE_STORE.deletePendingInstances(entityName, clusterName,
entityType);
Expand Down Expand Up @@ -384,7 +384,7 @@ private void addPendingInstances(String entityType, Entity entity,
}
}

void addPendingEntityInstances(Date checkPointTime) throws FalconException {
private void addPendingEntityInstances(Date checkPointTime) throws FalconException {
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredEntities();
for(MonitoredEntityBean monitoredEntityBean : entityBeanList) {
Expand Down Expand Up @@ -611,8 +611,8 @@ Set<Pair<Date, String>> getFeedSLAStatus(Sla sla, Date start, Date end, List<Dat
return result;
}

Set<Pair<Date, String>> getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start,
Date end, List<Date> missingInstances) throws FalconException {
private Set<Pair<Date, String>> getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start,
Date end, List<Date> missingInstances) throws FalconException {
Date now = new Date();
Frequency slaHigh = sla.getShouldEndIn();
Set<Pair<Date, String>> result = new HashSet<>();
Expand Down
Loading