Skip to content

Commit

Permalink
multithreading for import implemented, not finished yet
Browse files Browse the repository at this point in the history
  • Loading branch information
1azyman committed Mar 14, 2018
1 parent 98f104c commit 46a1bbe
Show file tree
Hide file tree
Showing 9 changed files with 474 additions and 119 deletions.
Expand Up @@ -53,19 +53,23 @@ protected LogTarget getInfoLogTarget() {
protected void handleResultOnFinish(OperationResult result, CountStatus status, String finishMessage) {
result.recomputeStatus();

double totalTime = ((double) (System.currentTimeMillis() - status.getStart())) / 1000;

if (result.isAcceptable()) {
if (status == null) {
log.info("{}", finishMessage);
} else {
log.info("{}. Processed: {} objects, avg. {}ms",
finishMessage, status.getCount(), NinjaUtils.DECIMAL_FORMAT.format(status.getAvg()));
log.info("{}. Processed: {} objects, time {}s, avg. {}ms",
finishMessage, status.getCount(), NinjaUtils.DECIMAL_FORMAT.format(totalTime),
NinjaUtils.DECIMAL_FORMAT.format(status.getAvg()));
}
} else {
if (status == null) {
log.error("{}", finishMessage);
} else {
log.error("{} with some problems, reason: {}. Processed: {}, avg. {}ms", finishMessage,
result.getMessage(), status.getCount(), NinjaUtils.DECIMAL_FORMAT.format(status.getAvg()));
log.error("{} with some problems, reason: {}. Processed: {}, time{}s, avg. {}ms", finishMessage,
result.getMessage(), status.getCount(), NinjaUtils.DECIMAL_FORMAT.format(totalTime),
NinjaUtils.DECIMAL_FORMAT.format(status.getAvg()));
}

if (context.isVerbose()) {
Expand All @@ -89,7 +93,7 @@ protected void logCountProgress(CountStatus status) {

// deduplicate with WebModelServiceUtils.addIncludeOptionsForExportOrView
protected static void addIncludeOptionsForExport(Collection<SelectorOptions<GetOperationOptions>> options,
Class<? extends ObjectType> type) {
Class<? extends ObjectType> type) {
// todo fix this brutal hack (related to checking whether to include particular options)
boolean all = type == null
|| Objectable.class.equals(type)
Expand Down
@@ -1,56 +1,75 @@
package com.evolveum.midpoint.ninja.action;

import com.evolveum.midpoint.common.validator.EventHandler;
import com.evolveum.midpoint.common.validator.EventResult;
import com.evolveum.midpoint.common.validator.Validator;
import com.evolveum.midpoint.ninja.action.worker.ImportConsumerWorker;
import com.evolveum.midpoint.ninja.action.worker.ImportProducerWorker;
import com.evolveum.midpoint.ninja.action.worker.ProgressReporterWorker;
import com.evolveum.midpoint.ninja.impl.LogTarget;
import com.evolveum.midpoint.ninja.impl.NinjaException;
import com.evolveum.midpoint.ninja.opts.ImportOptions;
import com.evolveum.midpoint.ninja.util.CountStatus;
import com.evolveum.midpoint.ninja.util.NinjaUtils;
import com.evolveum.midpoint.prism.Objectable;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.ninja.util.OperationStatus;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.match.MatchingRuleRegistry;
import com.evolveum.midpoint.prism.query.InOidFilter;
import com.evolveum.midpoint.prism.query.ObjectFilter;
import com.evolveum.midpoint.prism.query.ObjectQuery;
import com.evolveum.midpoint.repo.api.RepoAddOptions;
import com.evolveum.midpoint.repo.api.RepositoryService;
import com.evolveum.midpoint.schema.constants.ObjectTypes;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
import org.apache.commons.io.input.ReaderInputStream;
import org.springframework.context.ApplicationContext;
import org.w3c.dom.Element;
import org.w3c.dom.Node;

import java.io.*;
import java.nio.charset.Charset;
import java.util.concurrent.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/**
* Created by Viliam Repan (lazyman).
*/
public class ImportRepositoryAction extends RepositoryAction<ImportOptions> {

private static final String DOT_CLASS = ImportRepositoryAction.class.getName() + ".";
private static final int QUEUE_CAPACITY_PER_THREAD = 100;
private static final long CONSUMERS_WAIT_FOR_START = 2000L;

private static final String OPERATION_IMPORT = DOT_CLASS + "import";
private BlockingQueue<PrismObject> queue;

private ExecutorService executor;

@Override
public void execute() throws Exception {
try (Reader reader = createReader()) {

OperationStatus progress = new OperationStatus();

queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY_PER_THREAD * options.getMultiThread());

// + 2 means producer and progress reporter
executor = Executors.newFixedThreadPool(options.getMultiThread() + 2);

String oid = options.getOid();

ImportProducerWorker producer;
if (oid != null) {
importByOid(reader);
producer = importByOid(reader);
} else {
importByFilter(reader);
producer = importByFilter(reader);
}
producer.setOperation(progress);

executor.execute(producer);

Thread.sleep(CONSUMERS_WAIT_FOR_START);

executor.execute(new ProgressReporterWorker(queue, progress, log));

for (int i = 0; i < options.getMultiThread(); i++) {
ImportConsumerWorker worker = new ImportConsumerWorker(context, options, queue, progress);
executor.execute(worker);
}

executor.shutdown();
executor.awaitTermination(365, TimeUnit.DAYS);

OperationResult result = producer.getResult();

handleResultOnFinish(result, progress, "Import finished");
}
}

Expand Down Expand Up @@ -80,103 +99,35 @@ private Reader createReader() throws IOException {
}

if (options.isZip()) {
is = new ZipInputStream(is);
ZipInputStream zis = new ZipInputStream(is);

ZipEntry entry = zis.getNextEntry();
while (entry != null) {
if (entry.isDirectory()) {
continue;
}

break;
}
is = zis;
}

return new InputStreamReader(is, charset);
}

private void importByOid(Reader reader) throws SchemaException, ObjectNotFoundException, IOException {
private ImportProducerWorker importByOid(Reader reader) {
InOidFilter filter = InOidFilter.createInOid(options.getOid());

importByFilter(filter, true, reader);
return importByFilter(filter, true, reader);
}

private void importByFilter(Reader reader) throws SchemaException, IOException {
private ImportProducerWorker importByFilter(Reader reader) throws SchemaException, IOException {
ObjectFilter filter = NinjaUtils.createObjectFilter(options.getFilter(), context);

importByFilter(filter, false, reader);
return importByFilter(filter, false, reader);
}

private void importByFilter(ObjectFilter filter, boolean stopAfterFound, Reader reader) {
ApplicationContext appContext = context.getApplicationContext();
PrismContext prismContext = appContext.getBean(PrismContext.class);
MatchingRuleRegistry matchingRuleRegistry = appContext.getBean(MatchingRuleRegistry.class);

CountStatus status = new CountStatus();
status.start();

OperationResult result = new OperationResult(OPERATION_IMPORT);

EventHandler handler = new EventHandler() {

@Override
public EventResult preMarshall(Element objectElement, Node postValidationTree,
OperationResult objectResult) {
return EventResult.cont();
}

@Override
public <T extends Objectable> EventResult postMarshall(PrismObject<T> object, Element objectElement,
OperationResult objectResult) {

try {
if (filter != null) {
boolean match = ObjectQuery.match(object, filter, matchingRuleRegistry);

if (!match) {
status.incrementSkipped();

return EventResult.skipObject("Object doesn't match filter");
}
}

ObjectTypes type = options.getType();
if (type != null && !type.getClassDefinition().equals(object.getCompileTimeClass())) {
status.incrementSkipped();

return EventResult.skipObject("Type doesn't match");
}

importObject(object, objectResult);

status.incrementCount();

logCountProgress(status);
} catch (Exception ex) {
throw new NinjaException("Couldn't import object, reason: " + ex.getMessage(), ex);
}

return stopAfterFound ? EventResult.stop() : EventResult.cont();
}

@Override
public void handleGlobalError(OperationResult currentResult) {
}
};

log.info("Starting import");

Validator validator = new Validator(prismContext, handler);
validator.validate(new ReaderInputStream(reader, context.getCharset()), result, OPERATION_IMPORT);

handleResultOnFinish(result, status, "Import finished");
}

private String importObject(PrismObject object, OperationResult result)
throws ObjectAlreadyExistsException, SchemaException {

RepositoryService repository = context.getRepository();
RepoAddOptions opts = createRepoAddOptions();

return repository.addObject(object, opts, result);
}

private RepoAddOptions createRepoAddOptions() {
RepoAddOptions opts = new RepoAddOptions();
opts.setOverwrite(options.isOverwrite());
opts.setAllowUnencryptedValues(options.isAllowUnencryptedValues());

return opts;
private ImportProducerWorker importByFilter(ObjectFilter filter, boolean stopAfterFound, Reader reader) {
return new ImportProducerWorker(context, options, queue, filter, stopAfterFound, reader);
}
}
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2010-2018 Evolveum
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.evolveum.midpoint.ninja.action.worker;

import com.evolveum.midpoint.ninja.impl.NinjaContext;
import com.evolveum.midpoint.ninja.opts.ImportOptions;
import com.evolveum.midpoint.ninja.util.OperationStatus;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.repo.api.RepoAddOptions;
import com.evolveum.midpoint.repo.api.RepositoryService;
import com.evolveum.midpoint.schema.result.OperationResult;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* Created by Viliam Repan (lazyman).
*/
public class ImportConsumerWorker implements Runnable {

private BlockingQueue<PrismObject> queue;
private NinjaContext context;
private ImportOptions options;

private OperationStatus operation;

public ImportConsumerWorker(NinjaContext context, ImportOptions options, BlockingQueue<PrismObject> queue, OperationStatus operation) {
this.context = context;
this.options = options;

this.queue = queue;
this.operation = operation;
}

@Override
public void run() {
while (!stop()) {
try {
PrismObject object = queue.poll(2, TimeUnit.SECONDS);
if (object == null) {
continue;
}

RepositoryService repository = context.getRepository();
RepoAddOptions opts = createRepoAddOptions(options);

repository.addObject(object, opts, new OperationResult("Import object"));

operation.incrementCount();
} catch (Exception ex) {
// todo handle error
ex.printStackTrace();
}
}
}

private boolean stop() {
if (operation.isFinished()) {
return true;
}

if (operation.isStarted()) {
return false;
}

if (operation.isProducerFinished() && !queue.isEmpty()) {
return false;
}

return true;
}

private RepoAddOptions createRepoAddOptions(ImportOptions options) {
RepoAddOptions opts = new RepoAddOptions();
opts.setOverwrite(options.isOverwrite());
opts.setAllowUnencryptedValues(options.isAllowUnencryptedValues());

return opts;
}
}

0 comments on commit 46a1bbe

Please sign in to comment.