Skip to content

Commit

Permalink
ninja operation progress handling improved
Browse files Browse the repository at this point in the history
  • Loading branch information
1azyman committed Mar 14, 2018
1 parent 95f0112 commit 9f0e5e5
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 196 deletions.
Expand Up @@ -3,9 +3,9 @@
import com.evolveum.midpoint.ninja.impl.LogTarget;
import com.evolveum.midpoint.ninja.impl.NinjaContext;
import com.evolveum.midpoint.ninja.opts.ConnectionOptions;
import com.evolveum.midpoint.ninja.util.CountStatus;
import com.evolveum.midpoint.ninja.util.Log;
import com.evolveum.midpoint.ninja.util.NinjaUtils;
import com.evolveum.midpoint.ninja.util.OperationStatus;
import com.evolveum.midpoint.prism.Objectable;
import com.evolveum.midpoint.prism.PrismConstants;
import com.evolveum.midpoint.prism.query.ObjectPaging;
Expand Down Expand Up @@ -50,45 +50,23 @@ protected LogTarget getInfoLogTarget() {
return LogTarget.SYSTEM_OUT;
}

protected void handleResultOnFinish(OperationResult result, CountStatus status, String finishMessage) {
result.recomputeStatus();
protected void handleResultOnFinish(OperationStatus operation, String finishMessage) {
operation.finish();

double totalTime = ((double) (System.currentTimeMillis() - status.getStart())) / 1000;
OperationResult result = operation.getResult();
result.recomputeStatus();

if (result.isAcceptable()) {
if (status == null) {
log.info("{}", finishMessage);
} else {
log.info("{}. Processed: {} objects, time {}s, avg. {}ms",
finishMessage, status.getCount(), NinjaUtils.DECIMAL_FORMAT.format(totalTime),
NinjaUtils.DECIMAL_FORMAT.format(status.getAvg()));
}
log.info("{}. {}", finishMessage, operation.print());
} else {
if (status == null) {
log.error("{}", finishMessage);
} else {
log.error("{} with some problems, reason: {}. Processed: {}, time{}s, avg. {}ms", finishMessage,
result.getMessage(), status.getCount(), NinjaUtils.DECIMAL_FORMAT.format(totalTime),
NinjaUtils.DECIMAL_FORMAT.format(status.getAvg()));
}
log.error("{} with some problems, reason: {}. {}", finishMessage, result.getMessage(), operation.print());

if (context.isVerbose()) {
log.error("Full result\n{}", result.debugDumpLazily());
}
}
}

protected void logCountProgress(CountStatus status) {
if (status.getLastPrintout() + NinjaUtils.COUNT_STATUS_LOG_INTERVAL > System.currentTimeMillis()) {
return;
}

log.info("Processed: {}, skipped: {}, avg: {}ms",
status.getCount(), status.getSkipped(), NinjaUtils.DECIMAL_FORMAT.format(status.getAvg()));

status.lastPrintoutNow();
}

public abstract void execute() throws Exception;

// deduplicate with WebModelServiceUtils.addIncludeOptionsForExportOrView
Expand Down
@@ -1,9 +1,8 @@
package com.evolveum.midpoint.ninja.action;

import com.evolveum.midpoint.ninja.impl.NinjaException;
import com.evolveum.midpoint.ninja.opts.DeleteOptions;
import com.evolveum.midpoint.ninja.util.CountStatus;
import com.evolveum.midpoint.ninja.util.NinjaUtils;
import com.evolveum.midpoint.ninja.util.OperationStatus;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.query.InOidFilter;
import com.evolveum.midpoint.prism.query.ObjectQuery;
Expand Down Expand Up @@ -61,28 +60,28 @@ private void deleteByOid() throws SchemaException, IOException {
private void deleteByFilter(ObjectQuery query) throws SchemaException, IOException {
OperationResult result = new OperationResult(OPERATION_DELETE);

CountStatus status = new CountStatus();
status.start();
OperationStatus operation = new OperationStatus(result);
operation.start();

log.info("Starting delete");

ObjectTypes type = options.getType();
if (type != null) {
deleteByFilter(type, query, status, result);
deleteByFilter(type, query, operation, result);
} else {
for (ObjectTypes t : ObjectTypes.values()) {
if (Modifier.isAbstract(t.getClassDefinition().getModifiers())) {
continue;
}

deleteByFilter(t, query, status, result);
deleteByFilter(t, query, operation, result);
}
}

handleResultOnFinish(result, status, "Delete finished");
handleResultOnFinish(operation, "Delete finished");
}

private void deleteByFilter(ObjectTypes type, ObjectQuery query, CountStatus status, OperationResult result)
private void deleteByFilter(ObjectTypes type, ObjectQuery query, OperationStatus operation, OperationResult result)
throws SchemaException, IOException {

ResultHandler handler = (prismObject, operationResult) -> {
Expand All @@ -92,7 +91,7 @@ private void deleteByFilter(ObjectTypes type, ObjectQuery query, CountStatus sta

switch (state) {
case SKIP:
status.incrementSkipped();
operation.incrementSkipped();
return true;
case STOP:
return false;
Expand All @@ -103,11 +102,12 @@ private void deleteByFilter(ObjectTypes type, ObjectQuery query, CountStatus sta
RepositoryService repository = context.getRepository();
repository.deleteObject(prismObject.getCompileTimeClass(), prismObject.getOid(), operationResult);

status.incrementCount();
operation.incrementTotal();
} catch (ObjectNotFoundException ex) {
// object was already gone
} catch (IOException ex) {
throw new NinjaException("Couldn't delete object '" + prismObject.toDebugName() + "'", ex);
context.getLog().error("Couldn't delete object {}, reason: {}", ex, prismObject, ex.getMessage());
operation.incrementError();
}

return true;
Expand Down
Expand Up @@ -3,8 +3,8 @@
import com.evolveum.midpoint.ninja.impl.LogTarget;
import com.evolveum.midpoint.ninja.impl.NinjaException;
import com.evolveum.midpoint.ninja.opts.ExportOptions;
import com.evolveum.midpoint.ninja.util.CountStatus;
import com.evolveum.midpoint.ninja.util.NinjaUtils;
import com.evolveum.midpoint.ninja.util.OperationStatus;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.PrismSerializer;
Expand Down Expand Up @@ -79,6 +79,8 @@ private void exportByOid(Writer writer) throws SchemaException, ObjectNotFoundEx

OperationResult result = new OperationResult(OPERATION_EXPORT);

OperationStatus operation = new OperationStatus(result);
operation.start();
log.info("Starting export");

PrismObject object = repository.getObject(type.getClassDefinition(), options.getOid(), opts, result);
Expand All @@ -87,34 +89,34 @@ private void exportByOid(Writer writer) throws SchemaException, ObjectNotFoundEx
String xml = serializer.serialize(object);
writer.write(xml);

handleResultOnFinish(result, null, "Export finished");
handleResultOnFinish(operation, "Export finished");
}

private void exportByFilter(final Writer writer) throws SchemaException, IOException {
OperationResult result = new OperationResult(OPERATION_EXPORT);

CountStatus status = new CountStatus();
status.start();
OperationStatus operation = new OperationStatus(result);
operation.start();

log.info("Starting export");

ObjectTypes type = options.getType();
if (type != null) {
exportByType(type, writer, status, result);
exportByType(type, writer, operation, result);
} else {
for (ObjectTypes t : ObjectTypes.values()) {
if (Modifier.isAbstract(t.getClassDefinition().getModifiers())) {
continue;
}

exportByType(t, writer, status, result);
exportByType(t, writer, operation, result);
}
}

handleResultOnFinish(result, status, "Export finished");
handleResultOnFinish(operation, "Export finished");
}

private void exportByType(ObjectTypes type, Writer writer, CountStatus status, OperationResult result)
private void exportByType(ObjectTypes type, Writer writer, OperationStatus operation, OperationResult result)
throws SchemaException, IOException {

RepositoryService repository = context.getRepository();
Expand All @@ -137,9 +139,10 @@ private void exportByType(ObjectTypes type, Writer writer, CountStatus status, O
String xml = serializer.serialize(object);
writer.write(xml);

status.incrementCount();
operation.incrementTotal();

logCountProgress(status);
// todo log count progress
// logCountProgress(status);
} catch (Exception ex) {
return false;
}
Expand Down
Expand Up @@ -19,6 +19,10 @@
*/
public class ImportRepositoryAction extends RepositoryAction<ImportOptions> {

private static final String DOT_CLASS = ImportProducerWorker.class.getName() + ".";

private static final String OPERATION_IMPORT = DOT_CLASS + "import";

private static final int QUEUE_CAPACITY_PER_THREAD = 100;
private static final long CONSUMERS_WAIT_FOR_START = 2000L;

Expand All @@ -28,7 +32,8 @@ public class ImportRepositoryAction extends RepositoryAction<ImportOptions> {

@Override
public void execute() throws Exception {
OperationStatus progress = new OperationStatus();
OperationResult result = new OperationResult(OPERATION_IMPORT);
OperationStatus progress = new OperationStatus(result);

queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY_PER_THREAD * options.getMultiThread());

Expand All @@ -48,7 +53,7 @@ public void execute() throws Exception {

Thread.sleep(CONSUMERS_WAIT_FOR_START);

executor.execute(new ProgressReporterWorker(queue, progress, log));
executor.execute(new ProgressReporterWorker(context, options, queue, progress));

for (int i = 0; i < options.getMultiThread(); i++) {
ImportConsumerWorker worker = new ImportConsumerWorker(context, options, queue, progress);
Expand All @@ -58,9 +63,7 @@ public void execute() throws Exception {
executor.shutdown();
executor.awaitTermination(365, TimeUnit.DAYS);

OperationResult result = producer.getResult();

handleResultOnFinish(result, progress, "Import finished");
handleResultOnFinish(progress, "Import finished");
}

@Override
Expand All @@ -73,9 +76,6 @@ protected LogTarget getInfoLogTarget() {
}

private ImportProducerWorker importByFilter(ObjectFilter filter, boolean stopAfterFound, OperationStatus status) {
ImportProducerWorker producer = new ImportProducerWorker(context, options, queue, filter, stopAfterFound);
producer.setOperation(status);

return producer;
return new ImportProducerWorker(context, options, queue, status, filter, stopAfterFound);
}
}
Expand Up @@ -17,6 +17,7 @@
package com.evolveum.midpoint.ninja.action.worker;

import com.evolveum.midpoint.ninja.impl.NinjaContext;
import com.evolveum.midpoint.ninja.util.OperationStatus;

import java.util.concurrent.BlockingQueue;

Expand All @@ -29,9 +30,12 @@ public abstract class BaseWorker<O extends Object, T extends Object> implements
protected NinjaContext context;
protected O options;

public BaseWorker(NinjaContext context, O options, BlockingQueue<T> queue) {
protected OperationStatus operation;

public BaseWorker(NinjaContext context, O options, BlockingQueue<T> queue, OperationStatus operation) {
this.queue = queue;
this.context = context;
this.options = options;
this.operation = operation;
}
}
Expand Up @@ -32,13 +32,9 @@
*/
public class ImportConsumerWorker extends BaseWorker<ImportOptions, PrismObject> {

private OperationStatus operation;

public ImportConsumerWorker(NinjaContext context, ImportOptions options, BlockingQueue<PrismObject> queue,
OperationStatus operation) {
super(context, options, queue);

this.operation = operation;
super(context, options, queue, operation);
}

@Override
Expand All @@ -56,9 +52,10 @@ public void run() {

repository.addObject(object, opts, new OperationResult("Import object"));

operation.incrementCount();
operation.incrementTotal();
} catch (Exception ex) {
context.getLog().error("Couldn't add object {}, reason: {}", ex, object.toString(), ex.getMessage());
context.getLog().error("Couldn't add object {}, reason: {}", ex, object, ex.getMessage());
operation.incrementError();
}
}
}
Expand Down
Expand Up @@ -49,33 +49,17 @@
*/
public class ImportProducerWorker extends BaseWorker<ImportOptions, PrismObject> {

private static final String DOT_CLASS = ImportProducerWorker.class.getName() + ".";

private static final String OPERATION_IMPORT = DOT_CLASS + "import";

private ObjectFilter filter;
private boolean stopAfterFound;

private OperationStatus operation;

private OperationResult result;

public ImportProducerWorker(NinjaContext context, ImportOptions options, BlockingQueue queue,
public ImportProducerWorker(NinjaContext context, ImportOptions options, BlockingQueue queue, OperationStatus operation,
ObjectFilter filter, boolean stopAfterFound) {
super(context, options, queue);
super(context, options, queue, operation);

this.filter = filter;
this.stopAfterFound = stopAfterFound;
}

public OperationResult getResult() {
return result;
}

public void setOperation(OperationStatus operation) {
this.operation = operation;
}

@Override
public void run() {
Log log = context.getLog();
Expand Down Expand Up @@ -131,8 +115,6 @@ private void processStream(InputStream input) throws IOException {
PrismContext prismContext = appContext.getBean(PrismContext.class);
MatchingRuleRegistry matchingRuleRegistry = appContext.getBean(MatchingRuleRegistry.class);

result = new OperationResult(OPERATION_IMPORT);

EventHandler handler = new EventHandler() {

@Override
Expand Down Expand Up @@ -181,7 +163,8 @@ public void handleGlobalError(OperationResult currentResult) {

Charset charset = context.getCharset();
try (Reader reader = new InputStreamReader(input, charset)) {
validator.validate(new ReaderInputStream(reader, context.getCharset()), result, OPERATION_IMPORT);
OperationResult result = operation.getResult();
validator.validate(new ReaderInputStream(reader, context.getCharset()), result, result.getOperation());
}
}
}

0 comments on commit 9f0e5e5

Please sign in to comment.