From fc454023385c3d9b52252b87d4f166019f1a434e Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Mon, 4 Dec 2017 16:16:26 +0100 Subject: [PATCH] [proxima-ingest] add generic entity transformations on write --- .../proxima/repository/EntityDescriptor.java | 4 +- .../repository/EntityDescriptorImpl.java | 2 +- .../cz/o2/proxima/repository/Repository.java | 109 +++++++++++++++++- .../o2/proxima/repository/Transformation.java | 55 +++++++++ .../repository/TransformationDescriptor.java | 95 +++++++++++++++ .../o2/proxima/storage/InMemBulkStorage.java | 1 + .../cz/o2/proxima/storage/InMemStorage.java | 37 ++++-- .../proxima/transform/EventDataToDummy.java | 54 +++++++++ core/src/test/resources/reference.conf | 13 ++- .../example/EventDataToUserHistory.java | 65 +++++++++++ .../model/src/main/resources/reference.conf | 9 ++ .../proxima/storage/kafka/KafkaCommitLog.java | 2 +- maven/src/main/resources/java-source.ftlh | 4 +- .../cz/o2/proxima/server/IngestServer.java | 85 +++++++++++++- .../o2/proxima/server/IngestServiceTest.java | 102 ++++++++-------- .../proxima/server/RetrieveServiceTest.java | 1 + server/src/test/resources/application.conf | 6 +- 17 files changed, 562 insertions(+), 82 deletions(-) create mode 100644 core/src/main/java/cz/o2/proxima/repository/Transformation.java create mode 100644 core/src/main/java/cz/o2/proxima/repository/TransformationDescriptor.java create mode 100644 core/src/test/java/cz/o2/proxima/transform/EventDataToDummy.java create mode 100644 example/model/src/main/java/cz/o2/proxima/example/EventDataToUserHistory.java diff --git a/core/src/main/java/cz/o2/proxima/repository/EntityDescriptor.java b/core/src/main/java/cz/o2/proxima/repository/EntityDescriptor.java index 3a9038cd3..3fbf84ef4 100644 --- a/core/src/main/java/cz/o2/proxima/repository/EntityDescriptor.java +++ b/core/src/main/java/cz/o2/proxima/repository/EntityDescriptor.java @@ -55,9 +55,9 @@ static Builder newBuilder() { /** Name of the entity. 
*/ String getName(); - + /** Find attribute based by name. */ - Optional findAttribute(String name); + Optional> findAttribute(String name); /** List all attribute descriptors of given entity. */ List getAllAttributes(); diff --git a/core/src/main/java/cz/o2/proxima/repository/EntityDescriptorImpl.java b/core/src/main/java/cz/o2/proxima/repository/EntityDescriptorImpl.java index 22660aeac..31cf5af22 100644 --- a/core/src/main/java/cz/o2/proxima/repository/EntityDescriptorImpl.java +++ b/core/src/main/java/cz/o2/proxima/repository/EntityDescriptorImpl.java @@ -65,7 +65,7 @@ public class EntityDescriptorImpl implements EntityDescriptor { /** Find attribute based by name. */ @Override - public Optional findAttribute(String name) { + public Optional> findAttribute(String name) { AttributeDescriptor byName = attributesByName.get(name); if (byName != null) { return Optional.of(byName); diff --git a/core/src/main/java/cz/o2/proxima/repository/Repository.java b/core/src/main/java/cz/o2/proxima/repository/Repository.java index d575011a5..fb9ee0504 100644 --- a/core/src/main/java/cz/o2/proxima/repository/Repository.java +++ b/core/src/main/java/cz/o2/proxima/repository/Repository.java @@ -68,7 +68,6 @@ public static Repository of(Config config) { return Repository.Builder.of(config).build(); } - /** * Builder for repository. */ @@ -123,18 +122,21 @@ public Repository build() { } } + /** * Application configuration. */ @Getter private final Config config; + /** * Classpath reflections scanner. */ @Getter private final Reflections reflections; + /** * When read-only flag is specified, some checks are not performed in construction. * This enables to use the repository inside reader applications that @@ -142,6 +144,7 @@ public Repository build() { */ private final boolean isReadonly; + /** * Flag to indicate if we should validate the scheme with serializer. * Defaults to {@code true}. 
{@code false} can be used when @@ -150,12 +153,14 @@ public Repository build() { */ private final boolean shouldValidate; + /** * Flag to indicate we should or should not load accessor to column families. * The accessor is not needed mostly in the compiler. */ private final boolean shouldLoadAccessors; + /** * Map of all storage descriptors available. * Key is acceptable scheme of the descriptor. @@ -164,6 +169,7 @@ public Repository build() { **/ private final Map schemeToStorage = new HashMap<>(); + /** * Map of all scheme serializers. * Key is acceptable scheme of the serializer. @@ -179,20 +185,30 @@ public Repository build() { * and then it is read-only. **/ private final Map entitiesByName = new HashMap<>(); + + /** * Map of entities by pattern. * This need not be synchronized because it is only written in constructor * and then it is read-only. **/ private final Map entitiesByPattern; + + /** - * Map of attribute family to list of attributes. + * Map of attribute descriptor to list of families. * This need not be synchronized because it is only written in constructor * and then it is read-only. */ private final Map, Set>> attributeToFamily; + /** + * Map of transformation name to transformation descriptor. + */ + private final Map transformations = new HashMap<>(); + + /** * Construct the repository from the config with the specified read-only and * validation flag. @@ -241,6 +257,8 @@ private Repository( if (loadFamilies) { /* Read attribute families and map them to storages by attribute. */ readAttributeFamilies(cfg); + /* Read transformations from one entity to another. 
*/ + readTransformations(cfg); } if (shouldValidate) { @@ -380,8 +398,8 @@ private void readEntityDescriptors(Config cfg) throws URISyntaxException { @SuppressWarnings("unchecked") private Map toMap(String key, Object value) { if (!(value instanceof Map)) { - throw new IllegalArgumentException("Key " + key + " must " - + "be object got " + throw new IllegalArgumentException( + "Key " + key + " must be object got " + (value != null ? value.getClass().getName() : "(null)")); @@ -560,6 +578,84 @@ private void readAttributeFamilies(Config cfg) { } } + private void readTransformations(Config cfg) { + + if (entitiesByName.isEmpty() && entitiesByPattern.isEmpty()) { + // no loaded entities, no more stuff to read + return; + } + Map transformations = Optional.ofNullable( + cfg.root().get("transformations")) + .map(v -> toMap("transformations", v.unwrapped())) + .orElse(null); + + if (transformations == null) { + LOG.info("Skipping empty transformations configuration."); + return; + } + + transformations.forEach((k, v) -> { + try { + Map transformation = toMap(k, v); + EntityDescriptor entity = findEntity(readStr("entity", transformation, k)) + .orElseThrow(() -> new IllegalArgumentException( + String.format("Entity `%s` doesn't exist", + transformation.get("entity")))); + + Class cls = Classpath.findClass( + readStr("using", transformation, k), Transformation.class); + + List> attrs = readList("attributes", transformation, k) + .stream() + .map(a -> entity.findAttribute(a).orElseThrow( + () -> new IllegalArgumentException( + String.format("Missing attribute `%s` in `%s`", + a, entity)))) + .collect(Collectors.toList()); + + TransformationDescriptor desc = TransformationDescriptor.newBuilder() + .addAttributes(attrs) + .setEntity(entity) + .setTransformationClass(cls) + .build(); + + this.transformations.put(k, desc); + + } catch (ClassNotFoundException ex) { + throw new RuntimeException(ex); + } + }); + + this.transformations.forEach((k, v) -> 
v.getTransformation().setup(this)); + + } + + private static String readStr(String key, Map map, String name) { + return Optional.ofNullable(map.get(key)) + .map(Object::toString) + .orElseThrow( + () -> new IllegalArgumentException( + String.format("Missing required field `%s` in `%s`", key, name))); + } + + @SuppressWarnings("unchecked") + private static List readList( + String key, Map map, String name) { + + return Optional.ofNullable(map.get(key)) + .map(v -> { + if (v instanceof List) return (List) v; + throw new IllegalArgumentException( + String.format("Key `%s` in `%s` must be list", key, name)); + }) + .map(l -> l.stream().map(Object::toString).collect(Collectors.toList())) + .orElseThrow(() -> new IllegalArgumentException( + String.format("Missing required field `%s` in `%s", key, name))); + } + + + + @SuppressWarnings("unchecked") private List toList(Object in) { if (in instanceof List) { @@ -624,6 +720,11 @@ public Stream getAllEntities() { entitiesByPattern.values().stream()); } + /** Retrieve all transformers. */ + public Map getTransformations() { + return Collections.unmodifiableMap(transformations); + } + public boolean isEmpty() { return this.entitiesByName.isEmpty() && entitiesByPattern.isEmpty(); } diff --git a/core/src/main/java/cz/o2/proxima/repository/Transformation.java b/core/src/main/java/cz/o2/proxima/repository/Transformation.java new file mode 100644 index 000000000..f5e1f21de --- /dev/null +++ b/core/src/main/java/cz/o2/proxima/repository/Transformation.java @@ -0,0 +1,55 @@ +/** + * Copyright 2017 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.repository; + +import cz.o2.proxima.storage.StreamElement; +import java.io.Serializable; + +/** + * A stateless element-wise transformation applied on incoming data, + * converting a single {@code StreamElement} to zero or more output elements. + */ +public interface Transformation extends Serializable { + + /** + * Collector for outputs. + */ + @FunctionalInterface + interface Collector extends Serializable { + + /** + * Collect transformed value. + */ + void collect(T value); + + } + + /** + * Read the repository and setup descriptors of target entity and attributes. + * @param repo the repository + */ + void setup(Repository repo); + + + /** + * Apply the transformation function. + * @param input the input stream element to transform + * @param collector collector that receives every transformed element + * (the method itself returns no value) + */ + void apply(StreamElement input, Collector collector); + +} diff --git a/core/src/main/java/cz/o2/proxima/repository/TransformationDescriptor.java b/core/src/main/java/cz/o2/proxima/repository/TransformationDescriptor.java new file mode 100644 index 000000000..f7b0b4c61 --- /dev/null +++ b/core/src/main/java/cz/o2/proxima/repository/TransformationDescriptor.java @@ -0,0 +1,95 @@ +/** + * Copyright 2017 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.repository; + +import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import lombok.Getter; + +/** + * Descriptor of single transformation specified in {@code transformations}. + */ +public class TransformationDescriptor implements Serializable { + + static Builder newBuilder() { + return new Builder(); + } + + static class Builder { + + EntityDescriptor entity; + final List> attrs = new ArrayList<>(); + Class transformation; + + Builder setEntity(EntityDescriptor entity) { + this.entity = entity; + return this; + } + + Builder setTransformationClass(Class transformation) { + this.transformation = transformation; + return this; + } + + Builder addAttributes(AttributeDescriptor... 
attrs) { + Arrays.stream(attrs).forEach(this.attrs::add); + return this; + } + + Builder addAttributes(Iterable> attrs) { + attrs.forEach(this.attrs::add); + return this; + } + + TransformationDescriptor build() { + + Preconditions.checkArgument( + !attrs.isEmpty(), "Please specify at least one attribute"); + Preconditions.checkArgument(transformation != null, + "Please specify transformation function"); + Preconditions.checkArgument(entity != null, + "Please specify source entity"); + + try { + return new TransformationDescriptor(entity, attrs, transformation.newInstance()); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } + + @Getter + private final EntityDescriptor entity; + @Getter + private final List> attributes; + @Getter + private final Transformation transformation; + + private TransformationDescriptor( + EntityDescriptor entity, + List> attributes, + Transformation transformation) { + + this.entity = Objects.requireNonNull(entity); + this.attributes = Collections.unmodifiableList(attributes); + this.transformation = Objects.requireNonNull(transformation); + } +} diff --git a/core/src/test/java/cz/o2/proxima/storage/InMemBulkStorage.java b/core/src/test/java/cz/o2/proxima/storage/InMemBulkStorage.java index c39182550..e545aef08 100644 --- a/core/src/test/java/cz/o2/proxima/storage/InMemBulkStorage.java +++ b/core/src/test/java/cz/o2/proxima/storage/InMemBulkStorage.java @@ -44,6 +44,7 @@ public void write(StreamElement data, CommitCallback statusCallback) { InMemBulkStorage.this.data.put( getURI().getPath() + "/" + data.getKey() + "#" + data.getAttribute(), data.getValue()); + System.err.println(" *** written " + data); if (++writtenSinceLastCommit >= 10) { statusCallback.commit(true, null); writtenSinceLastCommit = 0; diff --git a/core/src/test/java/cz/o2/proxima/storage/InMemStorage.java b/core/src/test/java/cz/o2/proxima/storage/InMemStorage.java index e5bb335c4..5cf7d8dcd 100644 --- 
a/core/src/test/java/cz/o2/proxima/storage/InMemStorage.java +++ b/core/src/test/java/cz/o2/proxima/storage/InMemStorage.java @@ -44,6 +44,7 @@ import cz.seznam.euphoria.core.client.flow.Flow; import java.io.Serializable; import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.SynchronousQueue; /** @@ -66,17 +67,26 @@ private interface InMemIngestWriter extends Serializable { void write(StreamElement data); } - public final class Writer + public static final class Writer extends AbstractOnlineAttributeWriter { - private Writer(EntityDescriptor entityDesc, URI uri) { + private final Map data; + private final Map observers; + + private Writer( + EntityDescriptor entityDesc, URI uri, + Map data, + Map observers) { + super(entityDesc, uri); + this.data = data; + this.observers = observers; } @Override public void write(StreamElement data, CommitCallback statusCallback) { - InMemStorage.this.data.put( + this.data.put( getURI().getPath() + "/" + data.getKey() + "#" + data.getAttribute(), data.getValue()); @@ -123,7 +133,7 @@ public Cancellable observe( } final int id; synchronized (observers) { - id = observers.isEmpty() ? 0 : observers.lastKey(); + id = observers.isEmpty() ? 
0 : observers.lastKey() + 1; observers.put(id, elem -> observer.onNext(elem, () -> 0, (succ, exc) -> { })); } return () -> { @@ -323,25 +333,28 @@ public Offset fetchOffset(Listing type, String key) { @Getter private final NavigableMap data; - private final NavigableMap observers; + + private final Map> observers; public InMemStorage() { super(Arrays.asList("inmem")); this.data = Collections.synchronizedNavigableMap(new TreeMap<>()); - this.observers = Collections.synchronizedNavigableMap(new TreeMap<>()); + this.observers = new ConcurrentHashMap<>(); } @Override public DataAccessor getAccessor( EntityDescriptor entityDesc, URI uri, Map cfg) { - return new DataAccessor() { - - Writer writer = new Writer(entityDesc, uri); - InMemCommitLogReader commitLogReader = new InMemCommitLogReader( - entityDesc, uri, observers); - Reader reader = new Reader(entityDesc, uri, data); + observers.computeIfAbsent(uri, k -> Collections.synchronizedNavigableMap( + new TreeMap<>())); + NavigableMap uriObservers = observers.get(uri); + Writer writer = new Writer(entityDesc, uri, data, uriObservers); + InMemCommitLogReader commitLogReader = new InMemCommitLogReader( + entityDesc, uri, uriObservers); + Reader reader = new Reader(entityDesc, uri, data); + return new DataAccessor() { @Override public Optional getWriter() { return Optional.of(writer); diff --git a/core/src/test/java/cz/o2/proxima/transform/EventDataToDummy.java b/core/src/test/java/cz/o2/proxima/transform/EventDataToDummy.java new file mode 100644 index 000000000..81c011074 --- /dev/null +++ b/core/src/test/java/cz/o2/proxima/transform/EventDataToDummy.java @@ -0,0 +1,54 @@ +/** + * Copyright 2017 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.transform; + +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.EntityDescriptor; +import cz.o2.proxima.repository.Repository; +import cz.o2.proxima.repository.Transformation; +import cz.o2.proxima.storage.StreamElement; + +/** + * Transform {@code event.data} to {@code dummy.wildcard.<stamp>}. + */ +public class EventDataToDummy implements Transformation { + + EntityDescriptor target; + AttributeDescriptor targetAttr; + String prefix; + + @Override + public void setup(Repository repo) { + target = repo.findEntity("dummy").orElseThrow( + () -> new IllegalStateException("Missing entity `dummy`")); + targetAttr = target.findAttribute("wildcard.*") + .orElseThrow(() -> new IllegalArgumentException( + "Missing attribute `wildcard.*` in `dummy`")); + prefix = targetAttr.toAttributePrefix(); + } + + + @Override + public void apply( + StreamElement input, Collector collector) { + collector.collect(StreamElement.update( + target, targetAttr, input.getUuid(), + input.getKey(), prefix + input.getStamp(), + input.getStamp(), input.getValue())); + } + + +} diff --git a/core/src/test/resources/reference.conf b/core/src/test/resources/reference.conf index 2db93c0a3..3adc8daae 100644 --- a/core/src/test/resources/reference.conf +++ b/core/src/test/resources/reference.conf @@ -23,8 +23,8 @@ # this is fake attribute that always fails validation fail: { scheme: "fail:whenever" } - # this attribute has bytes scheme, which always succeeds - bytes: { scheme: "bytes" } + + bytes: { scheme: bytes } } } @@ -75,5 
+75,14 @@ access: "commit-log, random-access" } } + + transformations { + event-data-to-dummy-wildcard { + entity: event + attributes: [ "data" ] + using: cz.o2.proxima.transform.EventDataToDummy + } + } + } diff --git a/example/model/src/main/java/cz/o2/proxima/example/EventDataToUserHistory.java b/example/model/src/main/java/cz/o2/proxima/example/EventDataToUserHistory.java new file mode 100644 index 000000000..aed0fa8b4 --- /dev/null +++ b/example/model/src/main/java/cz/o2/proxima/example/EventDataToUserHistory.java @@ -0,0 +1,65 @@ +/** + * Copyright 2017 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.example; + +import cz.o2.proxima.example.event.Event; +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.EntityDescriptor; +import cz.o2.proxima.repository.Repository; +import cz.o2.proxima.repository.Transformation; +import cz.o2.proxima.storage.StreamElement; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Transformation function from {@code event.data} to {@code user.event.<stamp>}. 
+ */ +public class EventDataToUserHistory implements Transformation { + + private static final Logger LOG = LoggerFactory.getLogger(EventDataToUserHistory.class); + + EntityDescriptor user; + AttributeDescriptor event; + String prefix; + + @SuppressWarnings("unchecked") + @Override + public void setup(Repository repo) { + user = repo.findEntity("user").orElseThrow( + () -> new IllegalArgumentException("No entity named `user` found")); + event = (AttributeDescriptor) user.findAttribute("event.*").orElseThrow( + () -> new IllegalArgumentException("No attribute `event.*` found in `user`")); + prefix = event.toAttributePrefix(); + } + + @Override + public void apply(StreamElement input, Collector collector) { + if (!input.isDelete()) { + Optional data = input.getParsed(); + if (data.isPresent()) { + collector.collect(StreamElement.update( + user, event, input.getUuid(), + data.get().getUserName(), + prefix + input.getStamp(), + input.getStamp(), input.getValue())); + } + } else { + LOG.warn("Ignored delete in transformed event {}", input); + } + } + +} diff --git a/example/model/src/main/resources/reference.conf b/example/model/src/main/resources/reference.conf index e1917ade9..d30fd4c51 100644 --- a/example/model/src/main/resources/reference.conf +++ b/example/model/src/main/resources/reference.conf @@ -148,3 +148,12 @@ attributeFamilies { } +transformations { + + event-to-user-history { + entity: event + attributes: [ "data" ] + using: cz.o2.proxima.example.EventDataToUserHistory + } + +} diff --git a/kafka/src/main/java/cz/o2/proxima/storage/kafka/KafkaCommitLog.java b/kafka/src/main/java/cz/o2/proxima/storage/kafka/KafkaCommitLog.java index e8f0dffea..6ef043293 100644 --- a/kafka/src/main/java/cz/o2/proxima/storage/kafka/KafkaCommitLog.java +++ b/kafka/src/main/java/cz/o2/proxima/storage/kafka/KafkaCommitLog.java @@ -626,7 +626,7 @@ private void processConsumerWithObserver( } else { String entityKey = key.substring(0, hashPos); String attribute = 
key.substring(hashPos + 1); - Optional attr = getEntityDescriptor().findAttribute(attribute); + Optional> attr = getEntityDescriptor().findAttribute(attribute); if (!attr.isPresent()) { LOG.error("Invalid attribute in kafka key {}", key); } else { diff --git a/maven/src/main/resources/java-source.ftlh b/maven/src/main/resources/java-source.ftlh index 2111810d7..0588db841 100644 --- a/maven/src/main/resources/java-source.ftlh +++ b/maven/src/main/resources/java-source.ftlh @@ -85,10 +85,10 @@ public class ${java_classname} { <#list entity.attributes as attribute> <#if attribute.wildcard> @SuppressWarnings("unchecked") - private final AttributeDescriptor<${attribute.type}> ${attribute.name}Descriptor = descriptor.findAttribute("${attribute.name}.*").get(); + private final AttributeDescriptor<${attribute.type}> ${attribute.name}Descriptor = (AttributeDescriptor) descriptor.findAttribute("${attribute.name}.*").get(); <#else> @SuppressWarnings("unchecked") - private final AttributeDescriptor<${attribute.type}> ${attribute.name}Descriptor = descriptor.findAttribute("${attribute.name}").get(); + private final AttributeDescriptor<${attribute.type}> ${attribute.name}Descriptor = (AttributeDescriptor) descriptor.findAttribute("${attribute.name}").get(); <#-- attribute.wildcard --> public AttributeDescriptor<${attribute.type}> get${attribute.nameCamel}Descriptor() { diff --git a/server/src/main/java/cz/o2/proxima/server/IngestServer.java b/server/src/main/java/cz/o2/proxima/server/IngestServer.java index 9b2c680e4..a3d9e04b6 100644 --- a/server/src/main/java/cz/o2/proxima/server/IngestServer.java +++ b/server/src/main/java/cz/o2/proxima/server/IngestServer.java @@ -27,6 +27,8 @@ import cz.o2.proxima.repository.AttributeFamilyDescriptor; import cz.o2.proxima.repository.EntityDescriptor; import cz.o2.proxima.repository.Repository; +import cz.o2.proxima.repository.Transformation; +import cz.o2.proxima.repository.TransformationDescriptor; import 
cz.o2.proxima.server.metrics.Metrics; import cz.o2.proxima.storage.AttributeWriterBase; import cz.o2.proxima.storage.BulkAttributeWriter; @@ -44,6 +46,7 @@ import cz.o2.proxima.storage.randomaccess.RandomAccessReader; import cz.o2.proxima.util.Pair; import cz.seznam.euphoria.shaded.guava.com.google.common.base.Strings; +import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Sets; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; @@ -90,7 +93,6 @@ public static void main(String[] args) throws Exception { server.run(); } - /** * The ingestion service. **/ @@ -495,7 +497,7 @@ private boolean writeRequest( "Entity " + request.getEntity() + " not found")); return false; } - Optional attr = entity.get().findAttribute( + Optional> attr = entity.get().findAttribute( request.getAttribute()); if (!attr.isPresent()) { consumer.accept(notFound(request.getUuid(), @@ -679,6 +681,85 @@ protected void startConsumerThreads() throws InterruptedException { } } }); + + // execute transformer threads + repo.getTransformations().forEach((k, v) -> { + runTransformer(k, v); + }); + } + + private void runTransformer(String name, TransformationDescriptor transform) { + AttributeFamilyDescriptor family = transform.getAttributes() + .stream() + .map(a -> this.repo.getFamiliesForAttribute(a) + .stream().filter(af -> af.getAccess().canReadCommitLog()) + .collect(Collectors.toSet())) + .reduce(Sets::intersection) + .filter(s -> !s.isEmpty()) + .map(s -> s.stream().filter(f -> f.getCommitLogReader().isPresent()) + .findAny().orElse(null)) + .filter(af -> af != null) + .orElseThrow(() -> new IllegalArgumentException( + "Cannot obtain attribute family for " + transform.getAttributes())); + + Transformation f = transform.getTransformation(); + final String consumer = "transformer-" + name; + CommitLogReader reader = family.getCommitLogReader().get(); + + new RetryableLogObserver(3, consumer, reader) { + + @Override 
+ protected void failure() { + LOG.error("Failed to transform using {}. Bailing out.", f); + System.exit(1); + } + + @Override + public boolean onNextInternal( + StreamElement ingest, LogObserver.ConfirmCallback confirm) { + + // add one to prevent confirmation before all elements + // are processed + AtomicInteger toConfirm = new AtomicInteger(1); + try { + Transformation.Collector collector = elem -> { + toConfirm.incrementAndGet(); + try { + LOG.info("Writing transformed element {}", elem); + ingestRequest( + elem, elem.getUuid(), rpc -> { + if (rpc.getStatus() == 200) { + if (toConfirm.decrementAndGet() == 0) { + confirm.confirm(); + } + } else { + toConfirm.set(-1); + confirm.fail(new RuntimeException( + String.format("Received invalid status %d:%s", + rpc.getStatus(), rpc.getStatusMessage()))); + } + }); + } catch (Exception ex) { + toConfirm.set(-1); + confirm.fail(ex); + } + }; + f.apply(ingest, collector); + if (toConfirm.decrementAndGet() == 0) { + confirm.confirm(); + } + } catch (Exception ex) { + toConfirm.set(-1); + confirm.fail(ex); + } + return true; + } + + + }.start(); + LOG.info( + "Started transformer {} reading from {} using {}", + consumer, reader.getURI(), f.getClass()); } /** diff --git a/server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java b/server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java index 4a80ce342..5b5a04a27 100644 --- a/server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java +++ b/server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java @@ -75,7 +75,6 @@ public void onCompleted() { latch.countDown(); } }; - } @@ -359,26 +358,7 @@ public void testIngestValid() throws InterruptedException { .setValue(ByteString.EMPTY) .build(); - ingest.ingest(request, new StreamObserver() { - - @Override - public void onNext(Rpc.Status status) { - responses.add(status); - } - - @Override - public void onError(Throwable thrwbl) { - // nop - } - - @Override - public void onCompleted() { - latch.countDown(); 
- } - - }); - - latch.await(); + flushToIngest(request); assertEquals(1, responses.size()); Rpc.Status status = responses.poll(); @@ -402,26 +382,7 @@ public void testIngestValidBulk() throws InterruptedException { .setValue(ByteString.EMPTY) .build(); - ingest.ingest(request, new StreamObserver() { - - @Override - public void onNext(Rpc.Status status) { - responses.add(status); - } - - @Override - public void onError(Throwable thrwbl) { - // nop - } - - @Override - public void onCompleted() { - latch.countDown(); - } - - }); - - latch.await(); + flushToIngest(request); assertEquals(1, responses.size()); Rpc.Status status = responses.poll(); @@ -433,7 +394,6 @@ public void onCompleted() { assertTrue(data.containsKey("/proxima_events/bulk/my-dummy-entity#data")); } - @Test(timeout = 2000) public void testIngestValidExtendedScheme() throws InterruptedException, InvalidProtocolBufferException { @@ -450,6 +410,53 @@ public void testIngestValidExtendedScheme() .setValue(payload.toByteString()) .build(); + flushToIngest(request); + + assertEquals(1, responses.size()); + Rpc.Status status = responses.poll(); + assertEquals(200, status.getStatus()); + + InMemStorage storage = (InMemStorage) server.repo.getStorageDescriptor("inmem"); + Map data = storage.getData(); + assertEquals(2, data.size()); + byte[] value = data.get("/test_inmem/my-dummy-entity#data"); + assertTrue(value != null); + assertEquals(payload, ExtendedMessage.parseFrom(value)); + value = data.get("/test_inmem/random/my-dummy-entity#data"); + assertTrue(value != null); + assertEquals(payload, ExtendedMessage.parseFrom(value)); + } + + @Test(timeout = 2000) + public void testTransform() throws InterruptedException { + // write event.data and check that we receive write to dummy.wildcard. 
+ long now = System.currentTimeMillis(); + Rpc.Ingest request = Rpc.Ingest.newBuilder() + .setEntity("event") + .setAttribute("data") + .setUuid(UUID.randomUUID().toString()) + .setKey("my-dummy-entity") + .setValue(ByteString.EMPTY) + .setStamp(now) + .build(); + + flushToIngest(request); + + assertEquals(1, responses.size()); + Rpc.Status status = responses.poll(); + assertEquals(200, status.getStatus()); + + InMemStorage storage = (InMemStorage) server.repo.getStorageDescriptor("inmem"); + Map data = storage.getData(); + assertEquals(2, data.size()); + byte[] value = data.get("/proxima_events/my-dummy-entity#data"); + assertTrue(value != null); + value = data.get("/proxima/dummy/my-dummy-entity#wildcard." + now); + assertTrue(value != null); + } + + private void flushToIngest(Rpc.Ingest request) throws InterruptedException { + ingest.ingest(request, new StreamObserver() { @Override @@ -470,17 +477,6 @@ public void onCompleted() { }); latch.await(); - - assertEquals(1, responses.size()); - Rpc.Status status = responses.poll(); - assertEquals(200, status.getStatus()); - - InMemStorage storage = (InMemStorage) server.repo.getStorageDescriptor("inmem"); - Map data = storage.getData(); - assertEquals(1, data.size()); - byte[] value = data.get("/test_inmem/my-dummy-entity#data"); - assertTrue(value != null); - assertEquals(payload, ExtendedMessage.parseFrom(value)); } } diff --git a/server/src/test/java/cz/o2/proxima/server/RetrieveServiceTest.java b/server/src/test/java/cz/o2/proxima/server/RetrieveServiceTest.java index 4da0a2ac4..d255233a4 100644 --- a/server/src/test/java/cz/o2/proxima/server/RetrieveServiceTest.java +++ b/server/src/test/java/cz/o2/proxima/server/RetrieveServiceTest.java @@ -44,6 +44,7 @@ public class RetrieveServiceTest { public void setup() throws InterruptedException { server = new IngestServer(ConfigFactory.load().resolve()); retrieve = server.new RetrieveService(); + server.startConsumerThreads(); } diff --git 
a/server/src/test/resources/application.conf b/server/src/test/resources/application.conf index 70a10b9dc..a40c30dff 100644 --- a/server/src/test/resources/application.conf +++ b/server/src/test/resources/application.conf @@ -15,16 +15,16 @@ type: primary access: commit-log } - } - attributeFamilies: { test-storage-random { entity: test attributes: [ data ] - storage: "inmem:///test_inmem" + storage: "inmem:///test_inmem/random" type: replica access: random-access } + } + }