Skip to content

Commit

Permalink
Merge branch 'feature/ninja-audit'
Browse files Browse the repository at this point in the history
# Conflicts:
#	tools/ninja/src/main/java/com/evolveum/midpoint/ninja/action/worker/AbstractWriterConsumerWorker.java
#	tools/ninja/src/main/java/com/evolveum/midpoint/ninja/action/worker/ProgressReporterWorker.java
  • Loading branch information
virgo47 committed Dec 4, 2021
2 parents d3453c3 + 0b9149f commit 600c156
Show file tree
Hide file tree
Showing 12 changed files with 494 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*/
package com.evolveum.midpoint.repo.sql;

import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;

import static com.evolveum.midpoint.schema.util.SystemConfigurationAuditUtil.isEscapingInvalidCharacters;

import java.sql.Types;
Expand All @@ -14,25 +16,28 @@
import java.util.Map.Entry;
import java.util.function.BiFunction;
import javax.xml.datatype.Duration;
import javax.xml.datatype.XMLGregorianCalendar;

import com.querydsl.sql.ColumnMetadata;
import com.querydsl.sql.SQLQuery;
import com.querydsl.sql.dml.DefaultMapper;
import com.querydsl.sql.dml.SQLInsertClause;
import org.apache.commons.lang3.Validate;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import com.evolveum.midpoint.audit.api.AuditEventRecord;
import com.evolveum.midpoint.audit.api.AuditReferenceValue;
import com.evolveum.midpoint.audit.api.AuditResultHandler;
import com.evolveum.midpoint.audit.api.AuditService;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.SerializationOptions;
import com.evolveum.midpoint.prism.*;
import com.evolveum.midpoint.prism.delta.ObjectDelta;
import com.evolveum.midpoint.prism.path.CanonicalItemPath;
import com.evolveum.midpoint.prism.path.ItemPath;
import com.evolveum.midpoint.prism.polystring.PolyString;
import com.evolveum.midpoint.prism.query.ObjectQuery;
import com.evolveum.midpoint.prism.query.*;
import com.evolveum.midpoint.prism.query.builder.S_ConditionEntry;
import com.evolveum.midpoint.prism.query.builder.S_MatchingRuleEntry;
import com.evolveum.midpoint.prism.util.CloneUtil;
import com.evolveum.midpoint.repo.sql.audit.AuditSqlQueryContext;
import com.evolveum.midpoint.repo.sql.audit.beans.MAuditDelta;
Expand All @@ -49,6 +54,7 @@
import com.evolveum.midpoint.schema.*;
import com.evolveum.midpoint.schema.internals.InternalsConfig;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.ObjectQueryUtil;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.util.Holder;
import com.evolveum.midpoint.util.MiscUtil;
Expand Down Expand Up @@ -737,6 +743,168 @@ public SearchResultMetadata searchObjectsIterative(
@NotNull AuditResultHandler handler,
@Nullable Collection<SelectorOptions<GetOperationOptions>> options,
@NotNull OperationResult parentResult) throws SchemaException {
throw new UnsupportedOperationException("searchObjectsIterative not supported in old repository audit");
// TODO: supported for Ninja, implementation is crude rework from new repo
// throw new UnsupportedOperationException("searchObjectsIterative not supported in old repository audit");
Validate.notNull(handler, "Result handler must not be null.");
Validate.notNull(parentResult, "Operation result must not be null.");

OperationResult operationResult = parentResult.subresult(OP_NAME_PREFIX + OP_SEARCH_OBJECTS_ITERATIVE)
.addParam("type", AuditEventRecordType.class.getName())
.addParam("query", query)
.build();

try {
if (query == null) {
return new SearchResultMetadata().approxNumberOfAllResults(0);
}

return executeSearchObjectsIterative(query, handler, options, operationResult);
} catch (RepositoryException | RuntimeException e) {
operationResult.recordFatalError(e);
throw new SystemException(e);
} catch (Throwable t) {
operationResult.recordFatalError(t);
throw t;
} finally {
operationResult.computeStatusIfUnknown();
}
}

private SearchResultMetadata executeSearchObjectsIterative(
ObjectQuery originalQuery,
AuditResultHandler handler,
Collection<SelectorOptions<GetOperationOptions>> options,
OperationResult operationResult) throws SchemaException, RepositoryException {

ObjectPaging originalPaging = originalQuery != null ? originalQuery.getPaging() : null;
// this is total requested size of the search
Integer maxSize = originalPaging != null ? originalPaging.getMaxSize() : null;

List<? extends ObjectOrdering> providedOrdering = originalPaging != null
? originalPaging.getOrderingInstructions()
: null;
if (providedOrdering != null && providedOrdering.size() > 1) {
throw new RepositoryException("searchObjectsIterative() does not support ordering"
+ " by multiple paths (yet): " + providedOrdering);
}

ObjectQuery pagedQuery = schemaService.prismContext().queryFactory().createQuery();
ObjectPaging paging = schemaService.prismContext().queryFactory().createPaging();
if (originalPaging != null && originalPaging.getOrderingInstructions() != null) {
originalPaging.getOrderingInstructions().forEach(o ->
paging.addOrderingInstruction(o.getOrderBy(), o.getDirection()));
}
// TODO check of provided ordering
paging.addOrderingInstruction(AuditEventRecordType.F_REPO_ID, OrderDirection.ASCENDING);
pagedQuery.setPaging(paging);

int pageSize = Math.min(
sqlConfiguration().getIterativeSearchByPagingBatchSize(),
defaultIfNull(maxSize, Integer.MAX_VALUE));
pagedQuery.getPaging().setMaxSize(pageSize);

AuditEventRecordType lastProcessedObject = null;
int handledObjectsTotal = 0;

while (true) {
if (maxSize != null && maxSize - handledObjectsTotal < pageSize) {
// relevant only for the last page
pagedQuery.getPaging().setMaxSize(maxSize - handledObjectsTotal);
}

// filterAnd() is quite null safe, even for both nulls
ObjectFilter originalFilter = originalQuery != null ? originalQuery.getFilter() : null;
pagedQuery.setFilter(ObjectQueryUtil.filterAndImmutable(
originalFilter, iterativeSearchCondition(lastProcessedObject, providedOrdering)));

// we don't call public searchObject to avoid subresults and query simplification
List<AuditEventRecordType> resultPage = searchObjects(
pagedQuery, options, operationResult);

// process page results
for (AuditEventRecordType auditEvent : resultPage) {
lastProcessedObject = auditEvent;
if (!handler.handle(auditEvent, operationResult)) {
return new SearchResultMetadata()
.approxNumberOfAllResults(handledObjectsTotal + 1)
.pagingCookie(lastProcessedObject.getRepoId().toString())
.partialResults(true);
}
handledObjectsTotal += 1;

if (maxSize != null && handledObjectsTotal >= maxSize) {
return new SearchResultMetadata()
.approxNumberOfAllResults(handledObjectsTotal)
.pagingCookie(lastProcessedObject.getRepoId().toString());
}
}

if (resultPage.isEmpty() || resultPage.size() < pageSize) {
return new SearchResultMetadata()
.approxNumberOfAllResults(handledObjectsTotal)
.pagingCookie(lastProcessedObject != null
? lastProcessedObject.getRepoId().toString() : null);
}
}
}

/**
* See {@code SqaleRepositoryService.iterativeSearchCondition()} for more info.
* This is unsupported version only for Ninja usage.
*/
@Nullable
private ObjectFilter iterativeSearchCondition(
@Nullable AuditEventRecordType lastProcessedObject,
List<? extends ObjectOrdering> providedOrdering) {
if (lastProcessedObject == null) {
return null;
}

Long lastProcessedId = lastProcessedObject.getRepoId();
XMLGregorianCalendar lastProcessedTimestamp = lastProcessedObject.getTimestamp();
if (providedOrdering == null || providedOrdering.isEmpty()) {
return schemaService.prismContext()
.queryFor(AuditEventRecordType.class)
.item(AuditEventRecordType.F_REPO_ID).gt(lastProcessedId)
.and()
// timestamp of the next entry can be the same, we need greater-or-equal here
.item(AuditEventRecordType.F_TIMESTAMP).ge(lastProcessedTimestamp)
.buildFilter();
}

if (providedOrdering.size() == 1) {
ObjectOrdering objectOrdering = providedOrdering.get(0);
ItemPath orderByPath = objectOrdering.getOrderBy();
boolean asc = objectOrdering.getDirection() == OrderDirection.ASCENDING;
S_ConditionEntry filter = schemaService.prismContext()
.queryFor(AuditEventRecordType.class)
.item(orderByPath);
@SuppressWarnings("unchecked")
Item<PrismValue, ItemDefinition<?>> item =
lastProcessedObject.asPrismContainerValue().findItem(orderByPath);
if (item.size() > 1) {
throw new IllegalArgumentException(
"Multi-value property for ordering is forbidden - item: " + item);
} else if (item.isEmpty()) {
// TODO what if it's nullable? is it null-first or last?
// See: https://www.postgresql.org/docs/13/queries-order.html
// "By default, null values sort as if larger than any non-null value; that is,
// NULLS FIRST is the default for DESC order, and NULLS LAST otherwise."
} else {
S_MatchingRuleEntry matchingRuleEntry =
asc ? filter.gt(item.getRealValue()) : filter.lt(item.getRealValue());
filter = matchingRuleEntry.or()
.block()
.item(orderByPath).eq(item.getRealValue())
.and()
.item(AuditEventRecordType.F_REPO_ID);
return (asc ? filter.gt(lastProcessedId) : filter.lt(lastProcessedId))
.endBlock()
.buildFilter();
}
}

throw new IllegalArgumentException(
"Shouldn't get here with check in executeSearchObjectsIterative()");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
/**
* Abstract action for all search-based operations, such as export and verify.
*
* @author Viliam Repan (lazyman)
* @param <O> options class
*/
public abstract class AbstractRepositorySearchAction<OP extends ExportOptions> extends RepositoryAction<OP> {
public abstract class AbstractRepositorySearchAction<O extends ExportOptions> extends RepositoryAction<O> {

private static final String DOT_CLASS = AbstractRepositorySearchAction.class.getName() + ".";

Expand Down Expand Up @@ -91,7 +91,10 @@ public void execute() throws Exception {
}

executor.shutdown();
executor.awaitTermination(NinjaUtils.WAIT_FOR_EXECUTOR_FINISH, TimeUnit.DAYS);
boolean awaitResult = executor.awaitTermination(NinjaUtils.WAIT_FOR_EXECUTOR_FINISH, TimeUnit.DAYS);
if (!awaitResult) {
log.error("Executor did not finish before timeout");
}

handleResultOnFinish(operation, "Finished " + getOperationShortName());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
/*
* Copyright (c) 2010-2018 Evolveum and contributors
* Copyright (C) 2010-2021 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/
package com.evolveum.midpoint.ninja.action;

import java.util.Objects;

import com.evolveum.midpoint.ninja.impl.LogTarget;
import com.evolveum.midpoint.ninja.impl.NinjaContext;
import com.evolveum.midpoint.ninja.opts.ConnectionOptions;
Expand All @@ -15,17 +17,19 @@
import com.evolveum.midpoint.schema.result.OperationResult;

/**
* Created by Viliam Repan (lazyman).
* Base implementation class for action, that is Ninja command.
*
* @param <O> options class
*/
public abstract class Action<T> {
public abstract class Action<O> {

protected Log log;

protected NinjaContext context;

protected T options;
protected O options;

public void init(NinjaContext context, T options) {
public void init(NinjaContext context, O options) {
this.context = context;
this.options = options;

Expand All @@ -34,7 +38,8 @@ public void init(NinjaContext context, T options) {

this.context.setLog(log);

ConnectionOptions connection = NinjaUtils.getOptions(this.context.getJc(), ConnectionOptions.class);
ConnectionOptions connection = Objects.requireNonNull(
NinjaUtils.getOptions(this.context.getJc(), ConnectionOptions.class));
this.context.init(connection);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
/*
* Copyright (c) 2010-2018 Evolveum and contributors
* Copyright (C) 2010-2021 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/
package com.evolveum.midpoint.ninja.action;

/**
* Created by Viliam Repan (lazyman).
* Base implementation class for action (Ninja command) running against the repository.
*
* @param <O> options class
*/
public abstract class RepositoryAction<T> extends Action<T> {
public abstract class RepositoryAction<O> extends Action<O> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (C) 2010-2021 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/
package com.evolveum.midpoint.ninja.action.audit;

import java.util.List;
import java.util.concurrent.BlockingQueue;

import com.evolveum.midpoint.audit.api.AuditResultHandler;
import com.evolveum.midpoint.audit.api.AuditService;
import com.evolveum.midpoint.ninja.action.worker.BaseWorker;
import com.evolveum.midpoint.ninja.impl.NinjaContext;
import com.evolveum.midpoint.ninja.impl.NinjaException;
import com.evolveum.midpoint.ninja.opts.ExportOptions;
import com.evolveum.midpoint.ninja.util.Log;
import com.evolveum.midpoint.ninja.util.NinjaUtils;
import com.evolveum.midpoint.ninja.util.OperationStatus;
import com.evolveum.midpoint.prism.query.ObjectQuery;
import com.evolveum.midpoint.schema.GetOperationOptionsBuilder;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.xml.ns._public.common.audit_3.AuditEventRecordType;

/**
* Producer worker for audit export operation.
*/
public class AuditExportProducerWorker extends BaseWorker<ExportOptions, AuditEventRecordType> {

private final ObjectQuery query;

public AuditExportProducerWorker(
NinjaContext context, ExportOptions options, BlockingQueue<AuditEventRecordType> queue,
OperationStatus operation, List<AuditExportProducerWorker> producers, ObjectQuery query) {
super(context, options, queue, operation, producers);

this.query = query;
}

@Override
public void run() {
Log log = context.getLog();

try {
GetOperationOptionsBuilder optionsBuilder = context.getSchemaService().getOperationOptionsBuilder();
if (options.isRaw()) {
optionsBuilder = optionsBuilder.raw();
}

optionsBuilder = NinjaUtils.addIncludeOptionsForExport(optionsBuilder, AuditEventRecordType.class);

AuditResultHandler handler = (object, parentResult) -> {
try {
//noinspection unchecked
queue.put(object); // TODO no better way of conversion?
} catch (InterruptedException ex) {
log.error("Couldn't queue object {}, reason: {}", ex, object, ex.getMessage());
}
return true;
};

AuditService auditService = context.getAuditService();
auditService.searchObjectsIterative(query, handler, optionsBuilder.build(), operation.getResult());
} catch (SchemaException ex) {
log.error("Unexpected exception, reason: {}", ex, ex.getMessage());
} catch (NinjaException ex) {
log.error(ex.getMessage(), ex);
} finally {
markDone();

if (isWorkersDone()) {
if (!operation.isFinished()) {
operation.producerFinish();
}
}
}
}
}

0 comments on commit 600c156

Please sign in to comment.