Skip to content

Commit

Permalink
[proxima-direct-core] O2-Czech-Republic#269 add TransactionValidator
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Apr 4, 2022
1 parent d9948ba commit eb5b60b
Show file tree
Hide file tree
Showing 5 changed files with 381 additions and 11 deletions.
2 changes: 1 addition & 1 deletion core/src/test/resources/test-transactions.conf
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
attributes: [ "*" ]
storage: "inmem:///proxima_gateway"
type: primary
access: [ commit-log, random-access, cached-view, batch-snapshot, batch-updates ]
access: [ commit-log, random-access, cached-view, batch-snapshot, batch-updates, list-primary-key ]
}

user-storage-stream: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.transform.DirectElementWiseTransform;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.TransactionMode;
Expand All @@ -31,6 +32,7 @@
import cz.o2.proxima.transaction.KeyAttributes;
import cz.o2.proxima.transaction.Response;
import cz.o2.proxima.transaction.State;
import cz.o2.proxima.transform.ElementWiseTransformation;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.Optionals;
import cz.o2.proxima.util.Pair;
Expand All @@ -41,6 +43,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -79,6 +82,64 @@ public Transaction begin() {
};
}

/**
* Interface for a {@link ElementWiseTransformation} to get access to {@link Transaction}.
* Implementations should extend {@link TransactionValidator}.
*/
interface TransactionAwareTransformation extends DirectElementWiseTransform {
Transaction currentTransaction();

void setTransaction(Transaction transaction);
}

/**
* Base class for enforcing constraints on outputs of transaction (e.g. unique constraints).
* Extend this class to do any application-specific validation of to-be-committed outputs of a
* transaction.
*/
public abstract static class TransactionValidator implements TransactionAwareTransformation {

Transaction transaction;

@Override
public final Transaction currentTransaction() {
return Objects.requireNonNull(transaction);
}

@Override
public void setTransaction(Transaction transaction) {
this.transaction = transaction;
}

@Override
public final void transform(StreamElement input, CommitCallback commit)
throws TransactionRejectedRuntimeException {
try {
validate(input, currentTransaction());
} catch (TransactionRejectedException ex) {
// this needs to be delegated to caller
throw new TransactionRejectedRuntimeException(ex);
}
}

/**
* Validate the input element. Use provided {@link Transaction} to add new inputs (if any). MUST
* NOT call {@link Transaction#commitWrite}.
*
* @param element the input stream element to transform
* @return
* @throws TransactionPreconditionFailedException if any precondition for a transaction fails.
*/
public abstract void validate(StreamElement element, Transaction transaction)
throws TransactionPreconditionFailedException, TransactionRejectedException;
}

public static class TransactionPreconditionFailedException extends RuntimeException {
public TransactionPreconditionFailedException(String message) {
super(message);
}
}

public static class TransactionRejectedException extends Exception {
@Getter private final String transactionId;

Expand All @@ -88,6 +149,12 @@ private TransactionRejectedException(String transactionId) {
}
}

static class TransactionRejectedRuntimeException extends RuntimeException {
private TransactionRejectedRuntimeException(TransactionRejectedException wrap) {
super(wrap.getMessage(), wrap);
}
}

public class Transaction implements AutoCloseable {

@Getter private final String transactionId;
Expand Down Expand Up @@ -155,7 +222,12 @@ public void commitWrite(List<StreamElement> outputs, CommitCallback callback)

List<StreamElement> injected =
outputs.stream().map(this::injectSequenceIdAndStamp).collect(Collectors.toList());
Collection<StreamElement> transformed = applyTransforms(injected);
Collection<StreamElement> transformed;
try {
transformed = applyTransforms(injected);
} catch (TransactionRejectedRuntimeException ex) {
throw (TransactionRejectedException) ex.getCause();
}
StreamElement toWrite = getSingleOrCommit(transformed);
OnlineAttributeWriter writer =
transformed.size() == 1 && !isGlobalTransaction ? delegate : commitDelegate;
Expand Down Expand Up @@ -204,23 +276,55 @@ private Collection<StreamElement> applyTransforms(List<StreamElement> outputs) {
if (applicableTransforms != null) {
applicableTransforms
.stream()
.filter(t -> !(t.getTransformation() instanceof TransactionValidator))
.filter(t -> t.getFilter().apply(el))
.forEach(
td ->
td.getTransformation()
.asElementWiseTransform()
.apply(
el,
transformed ->
newElements.add(injectSequenceIdAndStamp(transformed))));
.forEach(td -> applyTransform(newElements, el, td));
}
}
}
currentElements = newElements;
} while (!currentElements.isEmpty());
applyValidations(elements);
return elements;
}

private void applyValidations(Set<StreamElement> elements) {
for (StreamElement el : elements) {
List<TransformationDescriptor> applicableTransforms =
attributeTransforms.get(el.getAttributeDescriptor());
if (applicableTransforms != null) {
applicableTransforms
.stream()
.filter(t -> t.getTransformation() instanceof TransactionValidator)
.filter(t -> t.getFilter().apply(el))
.forEach(td -> applyTransform(Collections.emptyList(), el, td));
}
}
}

private void applyTransform(
List<StreamElement> newElements, StreamElement el, TransformationDescriptor td) {

if (td.getTransformation() instanceof TransactionValidator) {
TransactionValidator transform = (TransactionValidator) td.getTransformation();
transform.setTransaction(this);
transform.transform(el, CommitCallback.noop());
} else {
ElementWiseTransformation transform = td.getTransformation().asElementWiseTransform();
if (transform instanceof TransactionAwareTransformation) {
((TransactionAwareTransformation) transform).setTransaction(this);
}
int currentSize = newElements.size();
int add =
transform.apply(
el, transformed -> newElements.add(injectSequenceIdAndStamp(transformed)));
Preconditions.checkState(
newElements.size() == currentSize + add,
"Transformation %s is asynchronous which not currently supported in transaction mode.",
transform.getClass());
}
}

private StreamElement getSingleOrCommit(Collection<StreamElement> outputs) {
if (outputs.size() == 1 && !isGlobalTransaction) {
return Iterables.getOnlyElement(outputs);
Expand Down Expand Up @@ -353,7 +457,7 @@ public synchronized void write(StreamElement data, CommitCallback statusCallback
suffix);
t.update(Collections.singletonList(outputKeyAttribute));
t.commitWrite(Collections.singletonList(data), statusCallback);
} catch (TransactionRejectedException e) {
} catch (Throwable e) {
statusCallback.commit(false, e);
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2017-2022 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.direct.transaction;

import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.randomaccess.KeyValue;
import cz.o2.proxima.direct.randomaccess.RandomAccessReader;
import cz.o2.proxima.direct.transaction.TransactionalOnlineAttributeWriter.Transaction;
import cz.o2.proxima.direct.transaction.TransactionalOnlineAttributeWriter.TransactionPreconditionFailedException;
import cz.o2.proxima.direct.transaction.TransactionalOnlineAttributeWriter.TransactionRejectedException;
import cz.o2.proxima.direct.transaction.TransactionalOnlineAttributeWriter.TransactionValidator;
import cz.o2.proxima.repository.EntityAwareAttributeDescriptor.Regular;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.transaction.KeyAttributes;
import cz.o2.proxima.util.Optionals;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class TransactionUniqueConstraintValidator extends TransactionValidator {

private DirectDataOperator op;
private RandomAccessReader reader;
private EntityDescriptor gateway;
private Regular<Integer> intField;

@Override
public void setup(
Repository repo, DirectDataOperator directDataOperator, Map<String, Object> cfg) {
op = directDataOperator;
gateway = repo.getEntity("gateway");
intField = Regular.of(gateway, gateway.getAttribute("intField"));
}

@Override
public void validate(StreamElement element, Transaction transaction)
throws TransactionPreconditionFailedException, TransactionRejectedException {

if (element.getAttributeDescriptor().equals(intField)) {
Optional<Integer> intValue = intField.valueOf(element);
if (intValue.isPresent()) {
List<String> keys = new ArrayList<>();
reader().listEntities(p -> keys.add(p.getSecond()));
for (String k : keys) {
Optional<KeyValue<Integer>> value = reader.get(k, intField);
if (value.isPresent()) {
transaction.update(
Collections.singletonList(KeyAttributes.ofStreamElement(value.get())));
if (value.get().getParsedRequired().equals(intValue.get())) {
throw new TransactionPreconditionFailedException(
String.format(
"Duplicate value %d, first key: %s, duplicate: %s",
value.get().getParsedRequired(), k, element.getKey()));
}
} else {
transaction.update(
Collections.singletonList(KeyAttributes.ofMissingAttribute(gateway, k, intField)));
}
}
}
}
}

@Override
public void close() throws Exception {}

private RandomAccessReader reader() {
if (reader == null) {
reader = Optionals.get(op.getRandomAccess(intField));
}
return reader;
}
}
Loading

0 comments on commit eb5b60b

Please sign in to comment.