-
Notifications
You must be signed in to change notification settings - Fork 364
/
JobMonitoringCoordinator.java
238 lines (223 loc) · 9.93 KB
/
JobMonitoringCoordinator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
/*
*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.genie.web.tasks.job;
import com.netflix.genie.common.dto.JobExecution;
import com.netflix.genie.core.events.JobFinishedEvent;
import com.netflix.genie.core.events.JobScheduledEvent;
import com.netflix.genie.core.events.JobStartedEvent;
import com.netflix.genie.core.jobs.JobConstants;
import com.netflix.genie.core.services.JobCountService;
import com.netflix.genie.core.services.JobSearchService;
import com.netflix.genie.web.properties.JobOutputMaxProperties;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.exec.Executor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
/**
* A Task to monitor running jobs on a Genie node.
*
* @author tgianos
* @since 3.0.0
*/
@Component
@Slf4j
public class JobMonitoringCoordinator implements JobCountService {
private final Map<String, ScheduledFuture<?>> jobMonitors = Collections.synchronizedMap(new HashMap<>());
private final Map<String, Future<?>> scheduledJobs = Collections.synchronizedMap(new HashMap<>());
private final String hostName;
private final JobSearchService jobSearchService;
private final TaskScheduler scheduler;
private final ApplicationEventPublisher publisher;
private final Executor executor;
private final Registry registry;
private final File jobsDir;
private final JobOutputMaxProperties outputMaxProperties;
private final Counter unableToCancel;
/**
* Constructor.
*
* @param hostName The name of the host this Genie process is running on
* @param jobSearchService The search service to use to find jobs
* @param publisher The event publisher to use to publish events
* @param scheduler The task scheduler to use to register scheduling of job checkers
* @param executor The executor to use to launch processes
* @param registry The metrics registry
* @param jobsDir The directory where job output is stored
* @param outputMaxProperties The properties for the maximum length of job output files
* @throws IOException on error with the filesystem
*/
@Autowired
public JobMonitoringCoordinator(
final String hostName,
final JobSearchService jobSearchService,
final ApplicationEventPublisher publisher,
final TaskScheduler scheduler,
final Executor executor,
final Registry registry,
final Resource jobsDir,
final JobOutputMaxProperties outputMaxProperties
) throws IOException {
this.hostName = hostName;
this.jobSearchService = jobSearchService;
this.publisher = publisher;
this.scheduler = scheduler;
this.executor = executor;
this.registry = registry;
this.jobsDir = jobsDir.getFile();
this.outputMaxProperties = outputMaxProperties;
// Automatically track the number of jobs running on this node
this.registry.mapSize("genie.jobs.running.gauge", this.jobMonitors);
this.registry.mapSize("genie.jobs.scheduled.gauge", this.scheduledJobs);
this.registry.methodValue("genie.jobs.active.gauge", this, "getNumJobs");
this.unableToCancel = registry.counter("genie.jobs.unableToCancel.rate");
}
/**
* When this application is fully up and running this method should be triggered by an event. It will query the
* database to find any jobs already running on this node that aren't in the map. The use case for this is if
* the Genie application crashes when it comes back up it can find the jobs again and not leave them orphaned.
*
* @param event The spring boot application ready event indicating the application is ready to start taking load
*/
@EventListener
public void attachToRunningJobs(final ApplicationReadyEvent event) {
log.info("Application is ready according to event {}. Attempting to re-attach to any running jobs", event);
final Set<JobExecution> executions = this.jobSearchService.getAllRunningJobExecutionsOnHost(this.hostName);
if (executions.isEmpty()) {
log.info("No jobs currently running on this node.");
return;
} else {
log.info("{} jobs currently running on this node at startup", executions.size());
}
for (final JobExecution execution : executions) {
if (this.jobMonitors.containsKey(execution.getId())) {
log.info("Job {} is already being tracked. Ignoring.");
} else {
this.scheduleMonitor(execution);
log.info("Re-attached a job monitor to job {}", execution.getId());
}
}
}
/**
* This event is fired when a job is scheduled to run on this Genie node. We'll track the future here in case
* it needs to be killed while still in INIT state. Once it's running the onJobStarted event will clear it out.
*
* @param event The job scheduled event with information for tracking the job through the INIT stage
*/
@EventListener
public synchronized void onJobScheduled(final JobScheduledEvent event) {
this.scheduledJobs.put(event.getId(), event.getTask());
}
/**
* This event is fired when a job is started on this Genie node. Will create a JobMonitor and schedule it
* for monitoring.
*
* @param event The event of the started job
*/
@EventListener
public synchronized void onJobStarted(final JobStartedEvent event) {
final String jobId = event.getJobExecution().getId();
this.scheduledJobs.remove(jobId);
if (!this.jobMonitors.containsKey(event.getJobExecution().getId())) {
this.scheduleMonitor(event.getJobExecution());
}
}
/**
* When a job is finished this event is fired. This method will cancel the task monitoring the job process.
*
* @param event the event of the finished job
*/
@EventListener
public synchronized void onJobFinished(final JobFinishedEvent event) {
final String jobId = event.getId();
if (this.jobMonitors.containsKey(jobId)) {
//TODO: should we add back off it is unable to cancel?
if (this.jobMonitors.get(jobId).cancel(true)) {
log.debug("Successfully cancelled task monitoring job {}", jobId);
} else {
log.error("Unable to cancel task monitoring job {}", jobId);
this.unableToCancel.increment();
}
this.jobMonitors.remove(jobId);
} else if (this.scheduledJobs.containsKey(jobId)) {
final Future<?> task = this.scheduledJobs.get(jobId);
// If this job setup isn't actually done try killing it
// TODO: If we can't kill should we have back-off?
if (!task.isDone()) {
if (task.cancel(true)) {
log.debug("Successfully cancelled job init task for job {}", jobId);
} else {
log.error("Unable to cancel job init task for job {}", jobId);
this.unableToCancel.increment();
}
}
this.scheduledJobs.remove(jobId);
}
}
/**
* Get the number of jobs currently running on this node.
*
* @return the number of jobs currently running on this node
*/
public synchronized int getNumJobs() {
return this.jobMonitors.size() + this.scheduledJobs.size();
}
private void scheduleMonitor(final JobExecution jobExecution) {
final File stdOut = new File(this.jobsDir, jobExecution.getId() + "/" + JobConstants.STDOUT_LOG_FILE_NAME);
final File stdErr = new File(this.jobsDir, jobExecution.getId() + "/" + JobConstants.STDERR_LOG_FILE_NAME);
final JobMonitor monitor = new JobMonitor(
jobExecution,
stdOut,
stdErr,
this.executor,
this.publisher,
this.registry,
this.outputMaxProperties
);
final ScheduledFuture<?> future;
switch (monitor.getScheduleType()) {
case TRIGGER:
future = this.scheduler.schedule(monitor, monitor.getTrigger());
break;
case FIXED_DELAY:
future = this.scheduler.scheduleWithFixedDelay(monitor, monitor.getFixedDelay());
break;
case FIXED_RATE:
future = this.scheduler.scheduleAtFixedRate(monitor, monitor.getFixedRate());
break;
default:
throw new UnsupportedOperationException("Unknown schedule type: " + monitor.getScheduleType());
}
this.jobMonitors.put(jobExecution.getId(), future);
log.info("Scheduled job monitoring for Job {}", jobExecution.getId());
}
}