From dec29c0d1e220d1c78e57992c14dfa3b0ab5ef2d Mon Sep 17 00:00:00 2001 From: Pallavi Rao Date: Tue, 1 Mar 2016 10:16:36 +0530 Subject: [PATCH 1/2] FALCON-1826 Execution order not honoured when instances are KILLED --- .../service/impl/SchedulerService.java | 33 +++++++------------ .../service/SchedulerServiceTest.java | 11 ++++--- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java index f5a7c86eb..c5c3a6bb7 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -17,7 +17,6 @@ */ package org.apache.falcon.notification.service.impl; -import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -86,7 +85,6 @@ public class SchedulerService implements FalconNotificationService, Notification private static final StateStore STATE_STORE = AbstractStateStore.get(); - private Cache instancesToIgnore; // TODO : limit the no. of awaiting instances per entity private LoadingCache> executorAwaitedInstances; @@ -96,22 +94,26 @@ public void register(NotificationRequest notifRequest) throws NotificationServic if (request.getInstance() == null) { throw new NotificationServiceException("Request must contain an instance."); } - // When the instance is getting rescheduled for run. As in the case of suspend and resume. - Object obj = instancesToIgnore.getIfPresent(request.getInstance().getId()); - if (obj != null) { - instancesToIgnore.invalidate(request.getInstance().getId()); - } LOG.debug("Received request to schedule instance {} with sequence {}.", request.getInstance().getId(), request.getInstance().getInstanceSequence()); runQueue.execute(new InstanceRunner(request)); } @Override - public void unregister(NotificationHandler handler, ID listenerID) { + public void unregister(NotificationHandler handler, ID listenerID) throws NotificationServiceException { // If ID is that of an entity, do nothing if (listenerID instanceof InstanceID) { - // Not efficient to iterate over elements to remove this. Add to ignore list. - instancesToIgnore.put((InstanceID) listenerID, new Object()); + try { + InstanceID instanceID = (InstanceID) listenerID; + SortedMap instances = executorAwaitedInstances.get(instanceID + .getEntityClusterID()); + synchronized (instances) { + instances.remove(STATE_STORE.getExecutionInstance(instanceID) + .getInstance().getInstanceSequence()); + } + } catch (Exception e) { + throw new NotificationServiceException(e); + } } } @@ -155,10 +157,6 @@ public SortedMap load(EntityClusterID id) throws Exc .removalListener(this) .build(instanceCacheLoader); - instancesToIgnore = CacheBuilder.newBuilder() - .expireAfterWrite(1, TimeUnit.HOURS) - .concurrencyLevel(1) - .build(); // Interested in all job completion events. JobCompletionNotificationRequest completionRequest = (JobCompletionNotificationRequest) NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION) @@ -243,7 +241,6 @@ public PRIORITY getPriority() { @Override public void destroy() throws FalconException { runQueue.shutdownNow(); - instancesToIgnore.invalidateAll(); } private void notifyFailureEvent(JobScheduleNotificationRequest request) throws FalconException { @@ -290,12 +287,6 @@ public ExecutionInstance getInstance() { @Override public void run() { try { - // If de-registered - if (instancesToIgnore.getIfPresent(instance.getId()) != null) { - LOG.debug("Instance {} has been deregistered. Ignoring.", instance.getId()); - instancesToIgnore.invalidate(instance.getId()); - return; - } LOG.debug("Received request to run instance {}", instance.getId()); if (checkConditions()) { String externalId = instance.getExternalID(); diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java index a7ce7485f..a4427381c 100644 --- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java @@ -242,7 +242,7 @@ public void testSchedulingWithPriorities() throws Exception { public void testDeRegistration() throws Exception { storeEntity(EntityType.PROCESS, "summarize4"); Process mockProcess = getStore().get(EntityType.PROCESS, "summarize4"); - mockProcess.setParallel(3); + mockProcess.setParallel(2); Date startTime = EntityUtil.getStartTime(mockProcess, cluster); ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster); // Schedule 3 instances. @@ -263,14 +263,15 @@ public void testDeRegistration() throws Exception { request3.setInstance(instance3); scheduler.register(request3.build()); - // Abort second instance - scheduler.unregister(handler, instance2.getId()); + // Abort third instance + stateStore.putExecutionInstance(new InstanceState(instance3)); + scheduler.unregister(handler, instance3.getId()); Thread.sleep(100); Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1)); - Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), new Integer(1)); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1)); // Second instance should not run. - Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), null); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), null); } @Test From 9ace53d2d10c909a58a2ae993478a0d3a9d59d8b Mon Sep 17 00:00:00 2001 From: Pallavi Rao Date: Thu, 3 Mar 2016 09:47:57 +0530 Subject: [PATCH 2/2] FALCON-1826 Addressed review comments --- .../notification/service/impl/SchedulerService.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java index c5c3a6bb7..635fec4dc 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -107,9 +107,11 @@ public void unregister(NotificationHandler handler, ID listenerID) throws Notifi InstanceID instanceID = (InstanceID) listenerID; SortedMap instances = executorAwaitedInstances.get(instanceID .getEntityClusterID()); - synchronized (instances) { - instances.remove(STATE_STORE.getExecutionInstance(instanceID) - .getInstance().getInstanceSequence()); + if (instances != null && !instances.isEmpty()) { + synchronized (instances) { + instances.remove(STATE_STORE.getExecutionInstance(instanceID) + .getInstance().getInstanceSequence()); + } } } catch (Exception e) { throw new NotificationServiceException(e);