Skip to content

Commit

Permalink
Implemented JPAPersistenceManager for jBatch
Browse files Browse the repository at this point in the history
Signed-off-by: coduz <alberto.codutti@eurotech.com>
  • Loading branch information
Coduz committed Mar 21, 2020
1 parent 9c228de commit d202d34
Show file tree
Hide file tree
Showing 21 changed files with 1,975 additions and 59 deletions.
Expand Up @@ -266,7 +266,7 @@ public void resumeJobExecution(KapuaId scopeId, KapuaId jobId, KapuaId jobExecut
}

//
// Resume the JobExecution
try {
JbatchDriver.resumeJob(scopeId, jobId, jobExecutionId);
} catch (Exception e) {
Expand Down
Expand Up @@ -325,73 +325,46 @@ public static void cleanJobData(@NotNull KapuaId scopeId, @NotNull KapuaId jobId
// Private methods
//
/**
 * Returns the {@link JobExecution}s of the given job that are currently considered running.
 * <p>
 * An execution is kept when its batch status is contained in
 * {@link JbatchJobRunningStatuses#getStatuses()}.
 *
 * @param scopeId The scope {@link KapuaId} of the job.
 * @param jobId   The {@link KapuaId} of the job.
 * @return The {@link List} of matching {@link JobExecution}s; empty if none matches.
 */
private static List<JobExecution> getRunningJobExecutions(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) {
    return getJobExecutions(scopeId, jobId)
            .stream()
            .filter(je -> JbatchJobRunningStatuses.getStatuses().contains(je.getBatchStatus()))
            .collect(Collectors.toList());
}

private static List<JobExecution> getJobExecutions(@NotNull KapuaId scopeId, @NotNull KapuaId jobId) {
String jobName = getJbatchJobName(scopeId, jobId);

int attempt = 0;
int maxAttempt = 3;
do {
// Get all JobInstances with this name
List<JobInstance> jobInstances;
try {
jobInstances = JOB_OPERATOR.getJobInstances(jobName, 0, Integer.MAX_VALUE);
} catch (NoSuchJobException nsje) {
LOG.warn("Error while getting JobInstance by name: {}. Exception: {}: {}", jobName, nsje.getClass().getSimpleName(), nsje.getMessage());
return Collections.emptyList();
} catch (NullPointerException npe) {
LOG.error("Unexpected NPE!", npe);
return Collections.emptyList();
}

// For each JobInstance get its JobExecutions
List<JobExecution> jobExecutions = new ArrayList<>();
for (JobInstance ji : jobInstances) {
try {
int jobInstanceCount = JOB_OPERATOR.getJobInstanceCount(jobName);
List<JobInstance> jobInstances = JOB_OPERATOR.getJobInstances(jobName, 0, jobInstanceCount);

List<JobExecution> jobExecutions = new ArrayList<>();
jobInstances.forEach(ji -> jobExecutions.addAll(getJbatchJobExecutions(ji)));

return jobExecutions;
} catch (NoSuchJobException e) {
LOG.debug("Error while getting Job: " + jobName, e);
// This exception is thrown when there is no job, this means that the job never run before
// So we can ignore it and return `null`
} catch (NoSuchJobExecutionException e) {
LOG.debug("Error while getting execution status for Job: " + jobName, e);
// This exception is thrown when there is no execution is running.
// So we can ignore it and return `null`
} catch (Exception e) {
if (attempt++ < maxAttempt) {
LOG.error("Error while getting executions... ({}/{}). Error: {}", attempt, maxAttempt, e.getMessage());
try {
Thread.sleep(200);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
} else {
LOG.error("Error while getting executions... ({}/{}). Throwing error!", attempt, maxAttempt, e);
throw e;
}
jobExecutions.addAll(getJbatchJobExecutions(ji));
} catch (NoSuchJobInstanceException nsjie) {
LOG.warn("Error while getting JobExecutions by JobInstance: {}. Exception: {}: {}. Continuing with other JobInstances", ji.getInstanceId(), nsjie.getClass().getSimpleName(), nsjie.getMessage());
} catch (NullPointerException npe) {
LOG.error("Unexpected NPE!", npe);
}
} while (true);
}

return jobExecutions;
}

private static List<JobExecution> getJbatchJobExecutions(@NotNull JobInstance jobInstance) {
try {
return JOB_OPERATOR.getJobExecutions(jobInstance);
} catch (NoSuchJobInstanceException e) {
LOG.debug("Error while getting Job Instance: " + jobInstance.getInstanceId(), e);
} catch (NoSuchJobInstanceException nsjie) {
LOG.warn("Error while getting JobExecutions by JobInstance: {}. Exception {}: {}. Ignoring exception...", jobInstance.getInstanceId(), nsjie.getClass().getSimpleName(), nsjie.getMessage());
// This exception is thrown when there is no job instance, which means that the job has never run before.
// So we can ignore it and return an empty `List<>`
}

return Collections.emptyList();
Expand Down
Expand Up @@ -71,11 +71,12 @@ public void run() {

QueuedJobExecutionListResult queuedJobExecutions = KapuaSecurityUtils.doPrivileged(() -> QUEUED_JOB_EXECUTION_SERVICE.query(query));

int i = 1;
int i = 0;
int failedToResumeExecution = 0;
for (QueuedJobExecution qje : queuedJobExecutions.getItems()) {
Thread.sleep(JOB_ENGINE_SETTING.getInt(JobEngineSettingKeys.JOB_ENGINE_QUEUE_PROCESSING_RUN_DELAY));

LOG.info("Resuming Job Execution ({}/{}): {}...", i, queuedJobExecutions.getSize(), qje.getJobExecutionId());
LOG.info("Resuming Job Execution ({}/{}): {}...", ++i, queuedJobExecutions.getSize(), qje.getJobExecutionId());

try {
KapuaSecurityUtils.doPrivileged(() -> JOB_ENGINE_SERVICE.resumeJobExecution(qje.getScopeId(), qje.getJobId(), qje.getJobExecutionId()));
Expand All @@ -84,14 +85,16 @@ public void run() {
KapuaSecurityUtils.doPrivileged(() -> QUEUED_JOB_EXECUTION_SERVICE.update(qje));
} catch (Exception e) {
LOG.error("Resuming Job Execution ({}/{}): {}... ERROR!", i, queuedJobExecutions.getSize(), qje.getJobExecutionId(), e);
failedToResumeExecution++;
continue;
}

LOG.info("Resuming Job Execution ({}/{}): {}... DONE!", i++, queuedJobExecutions.getSize(), qje.getJobExecutionId());
LOG.info("Resuming Job Execution ({}/{}): {}... DONE!", i, queuedJobExecutions.getSize(), qje.getJobExecutionId());
}

LOG.info("Checking Job Execution queue for: {}... DONE! Queued job failed to resume: {}.", jobExecutionId, failedToResumeExecution);
} catch (Exception e) {
LOG.error("Checking Job Execution queue for: {}... ERROR!", jobExecutionId, e);
}
LOG.info("Checking Job Execution queue for: {}... DONE!", jobExecutionId);
}
}

0 comments on commit d202d34

Please sign in to comment.