Skip to content

Commit

Permalink
refactor #1165, #1386
Browse files Browse the repository at this point in the history
  • Loading branch information
vrindanayak committed Jul 25, 2018
1 parent 58b8de9 commit cf10dc3
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,19 +310,21 @@ private int rescheduleOnDistinctDevices(QueueMessage.Status status, Predicate ma
return count;
}

private int rescheduleTasks(Predicate matchQueueMessage, Predicate matchDiffTask) throws Exception {
private int rescheduleTasks(Predicate matchQueueMessage, Predicate matchDiffTask) {
BulkQueueMessageEvent queueEvent = new BulkQueueMessageEvent(request, QueueMessageOperation.RescheduleTasks);
try {
int count = 0;
try (DiffTaskQuery diffTasks = diffService.listDiffTasks(matchQueueMessage, matchDiffTask,
null, 0, 0)) {
for (DiffTask task : diffTasks) {
diffService.rescheduleDiffTask(task);
count++;
}
}
LOG.info("Successfully rescheduled {} tasks on device: {}.", count, device.getDeviceName());
queueEvent.setCount(count);
int rescheduled = 0;
int count;
int rescheduleTasksFetchSize = queueTasksFetchSize();
do {
List<String> diffTaskQueueMsgIDs = diffService.listDiffTaskQueueMsgIDs(matchQueueMessage, matchDiffTask, rescheduleTasksFetchSize);
for (String diffTaskQueueMsgID : diffTaskQueueMsgIDs)
diffService.rescheduleDiffTask(diffTaskQueueMsgID);
count = diffTaskQueueMsgIDs.size();
rescheduled += count;
} while (count >= rescheduleTasksFetchSize);
LOG.info("Successfully rescheduled {} tasks on device: {}.", rescheduled, device.getDeviceName());
queueEvent.setCount(rescheduled);
return count;
} catch (Exception e) {
queueEvent.setException(e);
Expand All @@ -348,7 +350,7 @@ public String deleteTasks() {
BulkQueueMessageEvent queueEvent = new BulkQueueMessageEvent(request, QueueMessageOperation.DeleteTasks);
int deleted = 0;
int count;
int deleteTasksFetchSize = device.getDeviceExtensionNotNull(ArchiveDeviceExtension.class).getQueueTasksFetchSize();
int deleteTasksFetchSize = queueTasksFetchSize();
do {
count = diffService.deleteTasks(
matchQueueMessage(status(), deviceName, null),
Expand Down Expand Up @@ -512,4 +514,8 @@ private Predicate matchQueueMessage(QueueMessage.Status status, String devName,
null, devName, status, batchID, null, null, updatedTime, null);
}

private int queueTasksFetchSize() {
return device.getDeviceExtensionNotNull(ArchiveDeviceExtension.class).getQueueTasksFetchSize();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ long cancelDiffTasks(Predicate matchQueueMessage, Predicate matchDiffTask, Queue

void rescheduleDiffTask(Long pk, QueueMessageEvent queueEvent);

void rescheduleDiffTask(DiffTask diffTask);
void rescheduleDiffTask(String diffTaskQueueMsgId);

String findDeviceNameByPk(Long pk);

Expand All @@ -99,4 +99,6 @@ long cancelDiffTasks(Predicate matchQueueMessage, Predicate matchDiffTask, Queue
int deleteTasks(Predicate matchQueueMessage, Predicate matchDiffTask, int deleteTasksFetchSize);

List<String> listDistinctDeviceNames(Predicate matchQueueMessage, Predicate matchDiffTask);

List<String> listDiffTaskQueueMsgIDs(Predicate matchQueueMessage, Predicate matchDiffTask, int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,19 @@ public void rescheduleDiffTask(Long pk, QueueMessageEvent queueEvent) {
if (task == null)
return;

rescheduleDiffTask(task, queueEvent);
LOG.info("Reschedule {}", task);
rescheduleDiffTask(task.getQueueMessage().getMessageID(), queueEvent);
}

public void rescheduleDiffTask(DiffTask diffTask, QueueMessageEvent queueEvent) {
LOG.info("Reschedule {}", diffTask);
queueManager.rescheduleTask(diffTask.getQueueMessage().getMessageID(), DiffService.QUEUE_NAME, queueEvent);
public void rescheduleDiffTask(String msgId, QueueMessageEvent queueEvent) {
queueManager.rescheduleTask(msgId, DiffService.QUEUE_NAME, queueEvent);
}

public List<String> listDiffTaskQueueMsgIDs(Predicate matchQueueMsg, Predicate matchDiffTask, int limit) {
return createQuery(matchQueueMsg, matchDiffTask)
.select(QQueueMessage.queueMessage.messageID)
.limit(limit)
.fetch();
}

public String findDeviceNameByPk(Long pk) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,13 @@ public void rescheduleDiffTask(Long pk, QueueMessageEvent queueEvent) {
}

@Override
public void rescheduleDiffTask(DiffTask diffTask) {
ejb.rescheduleDiffTask(diffTask, null);
public void rescheduleDiffTask(String diffTaskQueueMsgId) {
ejb.rescheduleDiffTask(diffTaskQueueMsgId, null);
}

@Override
public List<String> listDiffTaskQueueMsgIDs(Predicate matchQueueMessage, Predicate matchDiffTask, int limit) {
return ejb.listDiffTaskQueueMsgIDs(matchQueueMessage, matchDiffTask, limit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ long cancelRetrieveTasks(Predicate matchQueueMessage, Predicate matchRetrieveTas

void rescheduleRetrieveTask(Long pk, QueueMessageEvent queueEvent);

void rescheduleRetrieveTask(RetrieveTask task);
void rescheduleRetrieveTask(String retrieveTaskQueueMsgId);

int deleteTasks(Predicate matchQueueMessage, Predicate matchRetrieveTask, int deleteTasksFetchSize);

List<String> listDistinctDeviceNames(Predicate matchQueueMessage, Predicate matchRetrieveTask);

List<RetrieveBatch> listRetrieveBatches(Predicate matchQueueBatch, Predicate matchRetrieveBatch,
OrderSpecifier<Date> order, int offset, int limit);

List<String> listRetrieveTaskQueueMsgIDs(Predicate matchQueueMessage, Predicate matchRetrieveTask, int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,19 @@ public void rescheduleRetrieveTask(Long pk, QueueMessageEvent queueEvent) {
if (task == null)
return;

rescheduleRetrieveTask(task, queueEvent);
LOG.info("Reschedule {}", task);
rescheduleRetrieveTask(task.getQueueMessage().getMessageID(), queueEvent);
}

public void rescheduleRetrieveTask(RetrieveTask task, QueueMessageEvent queueEvent) {
LOG.info("Reschedule {}", task);
queueManager.rescheduleTask(task.getQueueMessage().getMessageID(), RetrieveManager.QUEUE_NAME, queueEvent);
public void rescheduleRetrieveTask(String retrieveTaskQueueMsgId, QueueMessageEvent queueEvent) {
queueManager.rescheduleTask(retrieveTaskQueueMsgId, RetrieveManager.QUEUE_NAME, queueEvent);
}

public List<String> listRetrieveTaskQueueMsgIDs(Predicate matchQueueMsg, Predicate matchRetrieveTask, int limit) {
return createQuery(matchQueueMsg, matchRetrieveTask)
.select(QQueueMessage.queueMessage.messageID)
.limit(limit)
.fetch();
}

public int deleteTasks(Predicate matchQueueMessage, Predicate matchRetrieveTask, int deleteTasksFetchSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ public void rescheduleRetrieveTask(Long pk, QueueMessageEvent queueEvent) {
}

@Override
public void rescheduleRetrieveTask(RetrieveTask task) {
ejb.rescheduleRetrieveTask(task, null);
public void rescheduleRetrieveTask(String retrieveTaskQueueMsgId) {
ejb.rescheduleRetrieveTask(retrieveTaskQueueMsgId, null);
}

@Override
Expand All @@ -207,4 +207,9 @@ public List<RetrieveBatch> listRetrieveBatches(Predicate matchQueueBatch, Predic
OrderSpecifier<Date> order, int offset, int limit) {
return ejb.listRetrieveBatches(matchQueueBatch, matchRetrieveBatch, order, offset, limit);
}

@Override
public List<String> listRetrieveTaskQueueMsgIDs(Predicate matchQueueMessage, Predicate matchRetrieveTask, int limit) {
return ejb.listRetrieveTaskQueueMsgIDs(matchQueueMessage, matchRetrieveTask, limit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,19 +278,21 @@ private int rescheduleOnDistinctDevices(QueueMessage.Status status, Predicate ma
return count;
}

private int rescheduleTasks(Predicate matchQueueMessage, Predicate matchRetrieveTask) throws Exception {
private int rescheduleTasks(Predicate matchQueueMessage, Predicate matchRetrieveTask) {
BulkQueueMessageEvent queueEvent = new BulkQueueMessageEvent(request, QueueMessageOperation.RescheduleTasks);
try {
int count = 0;
try (RetrieveTaskQuery retrieveTasks = mgr.listRetrieveTasks(matchQueueMessage, matchRetrieveTask,
null, 0, 0)) {
for (RetrieveTask task : retrieveTasks) {
mgr.rescheduleRetrieveTask(task);
count++;
}
}
queueEvent.setCount(count);
LOG.info("Successfully rescheduled {} tasks on device: {}.", count, device.getDeviceName());
int rescheduled = 0;
int count;
int rescheduleTasksFetchSize = queueTasksFetchSize();
do {
List<String> retrieveTaskQueueMsgIDs = mgr.listRetrieveTaskQueueMsgIDs(matchQueueMessage, matchRetrieveTask, rescheduleTasksFetchSize);
for (String retrieveTaskQueueMsgID : retrieveTaskQueueMsgIDs)
mgr.rescheduleRetrieveTask(retrieveTaskQueueMsgID);
count = retrieveTaskQueueMsgIDs.size();
rescheduled += count;
} while (count >= rescheduleTasksFetchSize);
queueEvent.setCount(rescheduled);
LOG.info("Successfully rescheduled {} tasks on device: {}.", rescheduled, device.getDeviceName());
return count;
} catch (Exception e) {
queueEvent.setException(e);
Expand All @@ -316,7 +318,7 @@ public String deleteTasks() {
BulkQueueMessageEvent queueEvent = new BulkQueueMessageEvent(request, QueueMessageOperation.DeleteTasks);
int deleted = 0;
int count;
int deleteTasksFetchSize = device.getDeviceExtensionNotNull(ArchiveDeviceExtension.class).getQueueTasksFetchSize();
int deleteTasksFetchSize = queueTasksFetchSize();
do {
count = mgr.deleteTasks(
matchQueueMessage(status(), deviceName, null),
Expand Down Expand Up @@ -466,4 +468,8 @@ private Response errResponseAsTextPlain(Exception e) {
String exceptionAsString = sw.toString();
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(exceptionAsString).type("text/plain").build();
}

private int queueTasksFetchSize() {
return device.getDeviceExtensionNotNull(ArchiveDeviceExtension.class).getQueueTasksFetchSize();
}
}

0 comments on commit cf10dc3

Please sign in to comment.