Skip to content

Commit

Permalink
ninja: queue type parameter generalization, cleanup
Browse files Browse the repository at this point in the history
(cherry picked from commit d3453c3)
  • Loading branch information
virgo47 committed Dec 4, 2021
1 parent 2f1154c commit 9c43752
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 131 deletions.
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
/*
* Copyright (c) 2010-2018 Evolveum and contributors
* Copyright (C) 2010-2021 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/
package com.evolveum.midpoint.ninja.action;

import com.evolveum.midpoint.ninja.action.worker.SearchProducerWorker;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;

import com.evolveum.midpoint.ninja.action.worker.ProgressReporterWorker;
import com.evolveum.midpoint.ninja.action.worker.SearchProducerWorker;
import com.evolveum.midpoint.ninja.impl.LogTarget;
import com.evolveum.midpoint.ninja.impl.NinjaContext;
import com.evolveum.midpoint.ninja.opts.ExportOptions;
import com.evolveum.midpoint.ninja.util.NinjaUtils;
import com.evolveum.midpoint.ninja.util.OperationStatus;
import com.evolveum.midpoint.prism.*;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.PrismReferenceDefinition;
import com.evolveum.midpoint.prism.PrismReferenceValue;
import com.evolveum.midpoint.prism.query.*;
import com.evolveum.midpoint.prism.schema.SchemaRegistry;
import com.evolveum.midpoint.repo.api.RepositoryService;
Expand All @@ -26,10 +36,6 @@
import com.evolveum.midpoint.xml.ns._public.common.common_3.ResourceType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ShadowType;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;

/**
* Abstract action for all search-based operations, such as export and verify.
*
Expand All @@ -46,7 +52,7 @@ public abstract class AbstractRepositorySearchAction<OP extends ExportOptions> e

protected abstract String getOperationShortName();

protected abstract Runnable createConsumer(BlockingQueue<PrismObject> queue, OperationStatus operation);
protected abstract Runnable createConsumer(BlockingQueue<PrismObject<?>> queue, OperationStatus operation);

protected String getOperationName() {
return this.getClass().getName() + "." + getOperationShortName();
Expand All @@ -60,7 +66,7 @@ public void execute() throws Exception {
// "+ 2" will be used for consumer and progress reporter
ExecutorService executor = Executors.newFixedThreadPool(options.getMultiThread() + 2);

BlockingQueue<PrismObject> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY_PER_THREAD * options.getMultiThread());
BlockingQueue<PrismObject<?>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY_PER_THREAD * options.getMultiThread());

List<SearchProducerWorker> producers = createProducers(queue, operation);

Expand Down Expand Up @@ -99,7 +105,7 @@ public LogTarget getInfoLogTarget() {
return LogTarget.SYSTEM_ERR;
}

private List<SearchProducerWorker> createProducers(BlockingQueue<PrismObject> queue, OperationStatus operation)
private List<SearchProducerWorker> createProducers(BlockingQueue<PrismObject<?>> queue, OperationStatus operation)
throws SchemaException, IOException {

QueryFactory queryFactory = context.getPrismContext().queryFactory();
Expand All @@ -122,7 +128,8 @@ private List<SearchProducerWorker> createProducers(BlockingQueue<PrismObject> qu
ObjectFilter filter = NinjaUtils.createObjectFilter(options.getFilter(), context, type.getClassDefinition());
ObjectQuery query = queryFactory.createQuery(filter);
if (ObjectTypes.SHADOW.equals(type)) {
List<SearchProducerWorker> shadowProducers = createProducersForShadows(context, queue, operation, producers, filter);
List<SearchProducerWorker> shadowProducers =
createProducersForShadows(context, queue, operation, producers, filter);
producers.addAll(shadowProducers);
continue;
}
Expand All @@ -138,8 +145,9 @@ private List<SearchProducerWorker> createProducers(BlockingQueue<PrismObject> qu
* run in more threads. No extra special processing is done for shadows. Just to split them to workers for
* performance reasons.
*/
private List<SearchProducerWorker> createProducersForShadows(NinjaContext context,
BlockingQueue<PrismObject> queue, OperationStatus operation, List<SearchProducerWorker> producers, ObjectFilter filter) {
private List<SearchProducerWorker> createProducersForShadows(
NinjaContext context, BlockingQueue<PrismObject<?>> queue,
OperationStatus operation, List<SearchProducerWorker> producers, ObjectFilter filter) {

QueryFactory queryFactory = context.getPrismContext().queryFactory();
List<SearchProducerWorker> shadowProducers = new ArrayList<>();
Expand Down Expand Up @@ -210,8 +218,8 @@ private RefFilter createResourceRefFilter(String oid) throws SchemaException {
return prismContext.queryFactory().createReferenceEqual(ShadowType.F_RESOURCE_REF, def, values);
}

private SearchProducerWorker createProducer(BlockingQueue<PrismObject> queue, OperationStatus operation,
List<SearchProducerWorker> producers, ObjectTypes type, ObjectFilter filter) {
private SearchProducerWorker createProducer(BlockingQueue<PrismObject<?>> queue, OperationStatus operation,
List<SearchProducerWorker> producers, ObjectTypes type, ObjectFilter filter) {
ObjectQuery query = context.getPrismContext().queryFactory().createQuery(filter);
return new SearchProducerWorker(context, options, queue, operation, producers, type, query);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ protected String getOperationShortName() {
}

@Override
protected Runnable createConsumer(BlockingQueue<PrismObject> queue, OperationStatus operation) {
protected Runnable createConsumer(BlockingQueue<PrismObject<?>> queue, OperationStatus operation) {
return new ExportConsumerWorker(context, options, queue, operation);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
/*
* Copyright (c) 2010-2019 Evolveum and contributors
* Copyright (C) 2010-2021 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/
package com.evolveum.midpoint.ninja.action;

import com.evolveum.midpoint.ninja.action.worker.ImportConsumerWorker;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

import com.evolveum.midpoint.ninja.action.worker.ImportProducerWorker;
import com.evolveum.midpoint.ninja.action.worker.ImportRepositoryConsumerWorker;
import com.evolveum.midpoint.ninja.action.worker.ProgressReporterWorker;
import com.evolveum.midpoint.ninja.impl.LogTarget;
import com.evolveum.midpoint.ninja.opts.ImportOptions;
Expand All @@ -19,10 +23,6 @@
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ObjectType;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
* Created by Viliam Repan (lazyman).
*/
Expand All @@ -40,7 +40,7 @@ public void execute() throws Exception {
OperationResult result = new OperationResult(OPERATION_IMPORT);
OperationStatus progress = new OperationStatus(context, result);

BlockingQueue<PrismObject> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY_PER_THREAD * options.getMultiThread());
BlockingQueue<PrismObject<?>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY_PER_THREAD * options.getMultiThread());

// "+ 2" will be used for producer and progress reporter
ExecutorService executor = Executors.newFixedThreadPool(options.getMultiThread() + 2);
Expand All @@ -60,11 +60,14 @@ public void execute() throws Exception {

executor.execute(new ProgressReporterWorker(context, options, queue, progress));

List<ImportConsumerWorker> consumers = createConsumers(queue, progress);
consumers.stream().forEach(c -> executor.execute(c));
List<ImportRepositoryConsumerWorker> consumers = createConsumers(queue, progress);
consumers.forEach(c -> executor.execute(c));

executor.shutdown();
executor.awaitTermination(NinjaUtils.WAIT_FOR_EXECUTOR_FINISH, TimeUnit.DAYS);
boolean awaitResult = executor.awaitTermination(NinjaUtils.WAIT_FOR_EXECUTOR_FINISH, TimeUnit.DAYS);
if (!awaitResult) {
log.error("Executor did not finish before timeout");
}

handleResultOnFinish(progress, "Import finished");
}
Expand All @@ -79,15 +82,15 @@ public LogTarget getInfoLogTarget() {
}

private ImportProducerWorker importByFilter(ObjectFilter filter, boolean stopAfterFound,
BlockingQueue<PrismObject> queue, OperationStatus status) {
BlockingQueue<PrismObject<?>> queue, OperationStatus status) {
return new ImportProducerWorker(context, options, queue, status, filter, stopAfterFound);
}

private List<ImportConsumerWorker> createConsumers(BlockingQueue<PrismObject> queue, OperationStatus operation) {
List<ImportConsumerWorker> consumers = new ArrayList<>();
private List<ImportRepositoryConsumerWorker> createConsumers(BlockingQueue<PrismObject<?>> queue, OperationStatus operation) {
List<ImportRepositoryConsumerWorker> consumers = new ArrayList<>();

for (int i = 0; i < options.getMultiThread(); i++) {
consumers.add(new ImportConsumerWorker(context, options, queue, operation, consumers));
consumers.add(new ImportRepositoryConsumerWorker(context, options, queue, operation, consumers));
}

return consumers;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2018 Evolveum and contributors
* Copyright (C) 2010-2021 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
Expand All @@ -24,8 +24,7 @@ protected String getOperationShortName() {
}

@Override
protected Runnable createConsumer(BlockingQueue<PrismObject> queue, OperationStatus operation) {
protected Runnable createConsumer(BlockingQueue<PrismObject<?>> queue, OperationStatus operation) {
return new VerifyConsumerWorker(context, options, queue, operation);
}

}
Original file line number Diff line number Diff line change
@@ -1,36 +1,33 @@
/*
* Copyright (c) 2010-2018 Evolveum and contributors
* Copyright (C) 2010-2021 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/

package com.evolveum.midpoint.ninja.action.worker;

import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import com.evolveum.midpoint.ninja.impl.NinjaContext;
import com.evolveum.midpoint.ninja.impl.NinjaException;
import com.evolveum.midpoint.ninja.opts.ExportOptions;
import com.evolveum.midpoint.ninja.util.Log;
import com.evolveum.midpoint.ninja.util.NinjaUtils;
import com.evolveum.midpoint.ninja.util.OperationStatus;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.PrismSerializer;
import com.evolveum.midpoint.prism.SerializationOptions;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ObjectType;

import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* Created by Viliam Repan (lazyman).
*/
public abstract class AbstractWriterConsumerWorker<OP extends ExportOptions> extends BaseWorker<OP, PrismObject> {
public abstract class AbstractWriterConsumerWorker<OP extends ExportOptions, T>
extends BaseWorker<OP, T> {

public AbstractWriterConsumerWorker(NinjaContext context, OP options, BlockingQueue<PrismObject> queue,
OperationStatus operation) {
public AbstractWriterConsumerWorker(NinjaContext context,
OP options, BlockingQueue<T> queue, OperationStatus operation) {
super(context, options, queue, operation);
}

Expand All @@ -44,7 +41,7 @@ public void run() {

try (Writer writer = createWriter()) {
while (!shouldConsumerStop()) {
PrismObject object = null;
T object = null;
try {
object = queue.poll(CONSUMER_POLL_TIMEOUT, TimeUnit.SECONDS);
if (object == null) {
Expand Down Expand Up @@ -79,7 +76,7 @@ public void run() {

protected abstract String getProlog();

protected abstract <O extends ObjectType> void write(Writer writer, PrismObject<O> object) throws SchemaException, IOException;
protected abstract void write(Writer writer, T object) throws SchemaException, IOException;

protected abstract String getEpilog();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
/*
* Copyright (c) 2010-2018 Evolveum and contributors
* Copyright (C) 2010-2021 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/

package com.evolveum.midpoint.ninja.action.worker;

import com.evolveum.midpoint.ninja.impl.NinjaContext;
import com.evolveum.midpoint.ninja.util.OperationStatus;

import java.util.List;
import java.util.concurrent.BlockingQueue;

import com.evolveum.midpoint.ninja.impl.NinjaContext;
import com.evolveum.midpoint.ninja.util.OperationStatus;

/**
* Created by Viliam Repan (lazyman).
*/
public abstract class BaseWorker<O extends Object, T extends Object> implements Runnable {
public abstract class BaseWorker<O, T> implements Runnable {

public static final int CONSUMER_POLL_TIMEOUT = 2;

private List<? extends BaseWorker> workers;
private final List<? extends BaseWorker> workers;

protected BlockingQueue<T> queue;
protected NinjaContext context;
Expand All @@ -35,7 +34,7 @@ public BaseWorker(NinjaContext context, O options, BlockingQueue<T> queue, Opera
}

public BaseWorker(NinjaContext context, O options, BlockingQueue<T> queue, OperationStatus operation,
List<? extends BaseWorker> workers) {
List<? extends BaseWorker> workers) {
this.queue = queue;
this.context = context;
this.options = options;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
/*
* Copyright (c) 2010-2018 Evolveum and contributors
* Copyright (C) 2010-2021 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/

package com.evolveum.midpoint.ninja.action.worker;

import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;

import com.evolveum.midpoint.ninja.impl.NinjaContext;
import com.evolveum.midpoint.ninja.opts.ExportOptions;
import com.evolveum.midpoint.ninja.util.NinjaUtils;
Expand All @@ -15,29 +18,24 @@
import com.evolveum.midpoint.prism.PrismSerializer;
import com.evolveum.midpoint.prism.SerializationOptions;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ObjectType;

import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;

/**
* Created by Viliam Repan (lazyman).
*/
public class ExportConsumerWorker extends AbstractWriterConsumerWorker<ExportOptions> {
public class ExportConsumerWorker extends AbstractWriterConsumerWorker<ExportOptions, PrismObject<?>> {

private PrismSerializer<String> serializer;

public ExportConsumerWorker(NinjaContext context, ExportOptions options, BlockingQueue<PrismObject> queue,
OperationStatus operation) {
public ExportConsumerWorker(NinjaContext context,
ExportOptions options, BlockingQueue<PrismObject<?>> queue, OperationStatus operation) {
super(context, options, queue, operation);
}

@Override
protected void init() {
serializer = context.getPrismContext()
.xmlSerializer()
.options(SerializationOptions.createSerializeForExport().skipContainerIds(options.isSkipContainerIds()));
.xmlSerializer()
.options(SerializationOptions.createSerializeForExport().skipContainerIds(options.isSkipContainerIds()));
}

@Override
Expand All @@ -46,7 +44,7 @@ protected String getProlog() {
}

@Override
protected <O extends ObjectType> void write(Writer writer, PrismObject<O> object) throws SchemaException, IOException {
protected void write(Writer writer, PrismObject<?> object) throws SchemaException, IOException {
String xml = serializer.serialize(object);
writer.write(xml);
}
Expand Down

0 comments on commit 9c43752

Please sign in to comment.