diff --git a/.gitignore b/.gitignore index a6dc1dcb6..f22e23a58 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ pom.xml.versionsBackup /messaging/server/nbproject/ /smartbox/performance-test/target/ *.iml +*.log .idea server/log **/*_pb2.py diff --git a/cassandra/src/test/java/cz/o2/proxima/storage/cassandra/CassandraDBAccessorTest.java b/cassandra/src/test/java/cz/o2/proxima/storage/cassandra/CassandraDBAccessorTest.java index 7170c19a5..d407fb62e 100644 --- a/cassandra/src/test/java/cz/o2/proxima/storage/cassandra/CassandraDBAccessorTest.java +++ b/cassandra/src/test/java/cz/o2/proxima/storage/cassandra/CassandraDBAccessorTest.java @@ -24,6 +24,7 @@ import com.datastax.driver.core.Statement; import com.typesafe.config.ConfigFactory; import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.AttributeDescriptorBase; import cz.o2.proxima.repository.EntityDescriptor; import cz.o2.proxima.repository.Repository; import cz.o2.proxima.storage.Partition; @@ -201,8 +202,8 @@ public BoundStatement scanPartition( Repository repo = Repository.Builder.ofTest(ConfigFactory.defaultApplication()).build(); - AttributeDescriptor attr; - AttributeDescriptor attrWildcard; + AttributeDescriptorBase attr; + AttributeDescriptorBase attrWildcard; EntityDescriptor entity; public CassandraDBAccessorTest() throws URISyntaxException { diff --git a/cassandra/src/test/java/cz/o2/proxima/storage/cassandra/DefaultCQLFactoryTest.java b/cassandra/src/test/java/cz/o2/proxima/storage/cassandra/DefaultCQLFactoryTest.java index 1eafe8767..a3d3df8df 100644 --- a/cassandra/src/test/java/cz/o2/proxima/storage/cassandra/DefaultCQLFactoryTest.java +++ b/cassandra/src/test/java/cz/o2/proxima/storage/cassandra/DefaultCQLFactoryTest.java @@ -22,6 +22,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.AttributeDescriptorBase; import cz.o2.proxima.repository.EntityDescriptor; import cz.o2.proxima.repository.Repository; import cz.o2.proxima.storage.StreamElement; @@ -47,8 +48,8 @@ public class DefaultCQLFactoryTest { final Config cfg = ConfigFactory.defaultApplication(); final Repository repo = Repository.Builder.ofTest(cfg).build(); - final AttributeDescriptor attr; - final AttributeDescriptor attrWildcard; + final AttributeDescriptorBase attr; + final AttributeDescriptorBase attrWildcard; final EntityDescriptor entity; final PreparedStatement statement = mock(PreparedStatement.class); final Session session = mock(Session.class); diff --git a/cassandra/src/test/java/cz/o2/proxima/storage/cassandra/TransformingCQLFactoryTest.java b/cassandra/src/test/java/cz/o2/proxima/storage/cassandra/TransformingCQLFactoryTest.java index f83bc74e1..ae2937916 100644 --- a/cassandra/src/test/java/cz/o2/proxima/storage/cassandra/TransformingCQLFactoryTest.java +++ b/cassandra/src/test/java/cz/o2/proxima/storage/cassandra/TransformingCQLFactoryTest.java @@ -21,6 +21,7 @@ import com.datastax.driver.core.Session; import com.typesafe.config.ConfigFactory; import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.AttributeDescriptorBase; import cz.o2.proxima.repository.EntityDescriptor; import cz.o2.proxima.repository.Repository; import cz.o2.proxima.storage.StreamElement; @@ -44,7 +45,7 @@ public class TransformingCQLFactoryTest { Repository repo = Repository.Builder.ofTest(ConfigFactory.defaultApplication()).build(); - AttributeDescriptor attr; + AttributeDescriptorBase attr; 
EntityDescriptor entity; final List statements = new ArrayList<>(); diff --git a/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptor.java b/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptor.java index e5692fe65..088dd0ac0 100644 --- a/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptor.java +++ b/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptor.java @@ -30,7 +30,6 @@ */ public interface AttributeDescriptor extends Serializable { - class Builder { private final Repository repo; @@ -52,7 +51,7 @@ private Builder(Repository repo) { private URI schemeURI; @SuppressWarnings("unchecked") - public AttributeDescriptor build() { + public AttributeDescriptorImpl build() { Objects.requireNonNull(name, "Please specify name"); Objects.requireNonNull(entity, "Please specify entity"); Objects.requireNonNull(schemeURI, "Please specify scheme URI"); @@ -66,6 +65,14 @@ static Builder newBuilder(Repository repo) { return new Builder(repo); } + static AttributeDescriptorBase newProxy( + String name, + AttributeDescriptorBase target, + ProxyTransform transform) { + + return new AttributeProxyDescriptorImpl<>(name, target, transform); + } + /** Retrieve name of the attribute. */ String getName(); @@ -101,4 +108,10 @@ default String toAttributePrefix() { */ ValueSerializer getValueSerializer(); + /** + * Marker if this is a public attribute. + * @return {@code true} it this is public attribute + */ + boolean isPublic(); + } diff --git a/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptorBase.java b/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptorBase.java new file mode 100644 index 000000000..d3a9cf736 --- /dev/null +++ b/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptorBase.java @@ -0,0 +1,116 @@ +/** + * Copyright 2017 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.repository; + +import cz.o2.proxima.scheme.ValueSerializer; +import cz.o2.proxima.storage.OnlineAttributeWriter; +import java.net.URI; +import java.util.Objects; +import lombok.Getter; +import lombok.Setter; + +/** + * Base class for {@link AttributeDescriptorImpl} and {@link AttributeProxyDescriptorImpl}. 
+ */ +public abstract class AttributeDescriptorBase implements AttributeDescriptor { + + @Getter + protected final String entity; + + @Getter + protected final String name; + + @Getter + protected final URI schemeURI; + + @Getter + protected final boolean proxy; + + @Getter + protected final boolean wildcard; + + @Getter + protected final ValueSerializer valueSerializer; + + @Getter + @Setter + protected OnlineAttributeWriter writer = null; + + public AttributeDescriptorBase( + String name, String entity, URI schemeURI, + ValueSerializer valueSerializer) { + + this.name = Objects.requireNonNull(name); + this.entity = Objects.requireNonNull(entity); + this.schemeURI = Objects.requireNonNull(schemeURI); + this.wildcard = this.name.endsWith(".*"); + this.proxy = false; + this.valueSerializer = Objects.requireNonNull(valueSerializer); + if (this.wildcard) { + if (name.length() < 3 + || name.substring(0, name.length() - 1).contains("*") + || name.charAt(name.length() - 2) != '.') { + + throw new IllegalArgumentException( + "Please specify wildcard attributes only in the format `.*; for now. " + + "That is - wildcard attributes can contain only single asterisk " + + "right after a dot at the end of the attribute name. " + + "This is implementation constraint for now."); + } + } + } + + public AttributeDescriptorBase(String name, AttributeDescriptorBase target) { + this.name = Objects.requireNonNull(name); + this.entity = target.getEntity(); + this.schemeURI = target.getSchemeURI(); + this.proxy = true; + this.wildcard = target.isWildcard(); + this.valueSerializer = target.getValueSerializer(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof AttributeDescriptor) { + AttributeDescriptor other = (AttributeDescriptor) obj; + return Objects.equals(other.getEntity(), entity) && Objects.equals(other.getName(), name); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(entity, name); + } + + /** + * Retrieve name of the attribute if not wildcard, otherwise + * retrieve the prefix without the last asterisk. + */ + @Override + public String toAttributePrefix(boolean includeLastDot) { + if (isWildcard()) { + return name.substring(0, name.length() - (includeLastDot ? 1 : 2)); + } + return name; + } + + @Override + public boolean isPublic() { + return !name.startsWith("_"); + } + +} diff --git a/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptorImpl.java b/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptorImpl.java index 9e85878f4..059a2ffde 100644 --- a/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptorImpl.java +++ b/core/src/main/java/cz/o2/proxima/repository/AttributeDescriptorImpl.java @@ -17,91 +17,26 @@ package cz.o2.proxima.repository; import java.net.URI; -import java.util.Objects; -import lombok.Getter; -import cz.o2.proxima.storage.OnlineAttributeWriter; import cz.o2.proxima.scheme.ValueSerializer; /** * Descriptor of attribute of entity. 
*/ -public class AttributeDescriptorImpl implements AttributeDescriptor { - - @Getter - private final String name; - - @Getter - private final boolean isWildcard; - - @Getter - private final URI schemeURI; - - @Getter - private final ValueSerializer valueSerializer; - - @Getter - private final String entity; - - @Getter - private OnlineAttributeWriter writer; +public class AttributeDescriptorImpl + extends AttributeDescriptorBase { AttributeDescriptorImpl( String name, String entity, - URI schemeURI, ValueSerializer parser) { - - this.name = Objects.requireNonNull(name); - this.schemeURI = Objects.requireNonNull(schemeURI); - this.valueSerializer = Objects.requireNonNull(parser); - this.entity = Objects.requireNonNull(entity); - this.isWildcard = this.name.contains("*"); - if (this.isWildcard) { - if (name.length() < 3 - || name.substring(0, name.length() - 1).contains("*") - || name.charAt(name.length() - 2) != '.') { - - throw new IllegalArgumentException( - "Please specify wildcard attributes only in the format `.*; for now. " - + "That is - wildcard attributes can contain only single asterisk " - + "right after a dot at the end of the attribute name. " - + "This is implementation constraint for now."); - } - } - } - - public void setWriter(OnlineAttributeWriter writer) { - this.writer = writer; - } + URI schemeURI, ValueSerializer serializer) { - @Override - public boolean equals(Object obj) { - if (obj instanceof AttributeDescriptor) { - AttributeDescriptor other = (AttributeDescriptor) obj; - return Objects.equals(other.getEntity(), entity) - && Objects.equals(other.getName(), name); - } - return false; + super(name, entity, schemeURI, serializer); } - @Override - public int hashCode() { - return Objects.hash(entity, name); - } @Override public String toString() { return "AttributeDescriptor(entity=" + entity + ", name=" + name + ")"; } - /** - * Retrieve name of the attribute if not wildcard, otherwise - * retrieve the prefix without the last asterisk. - */ - @Override - public String toAttributePrefix(boolean includeLastDot) { - if (isWildcard()) { - return name.substring(0, name.length() - (includeLastDot ? 1 : 2)); - } - return name; - } } diff --git a/core/src/main/java/cz/o2/proxima/repository/AttributeFamilyDescriptor.java b/core/src/main/java/cz/o2/proxima/repository/AttributeFamilyDescriptor.java index f18dd6f38..09eb6f9c7 100644 --- a/core/src/main/java/cz/o2/proxima/repository/AttributeFamilyDescriptor.java +++ b/core/src/main/java/cz/o2/proxima/repository/AttributeFamilyDescriptor.java @@ -38,7 +38,7 @@ /** * A family of attributes with the same storage. */ -public class AttributeFamilyDescriptor { +public class AttributeFamilyDescriptor { public static final class Builder { @@ -82,13 +82,13 @@ public static final class Builder { private Builder() { } - public Builder addAttribute(AttributeDescriptor desc) { + public Builder addAttribute(AttributeDescriptor desc) { attributes.add(desc); return this; } - public AttributeFamilyDescriptor build() { - return new AttributeFamilyDescriptor<>( + public AttributeFamilyDescriptor build() { + return new AttributeFamilyDescriptor( name, type, attributes, writer, commitLog, batchObservable, randomAccess, partitionedView, access, filter); } @@ -119,7 +119,7 @@ public static Builder newBuilder() { * Writer associated with this attribute family. 
*/ @Nullable - private final W writer; + private final AttributeWriterBase writer; @Nullable private final CommitLogReader commitLogReader; @@ -133,10 +133,10 @@ public static Builder newBuilder() { @Nullable private final PartitionedView partitionedView; - private AttributeFamilyDescriptor(String name, + AttributeFamilyDescriptor(String name, StorageType type, List> attributes, - @Nullable W writer, + @Nullable AttributeWriterBase writer, @Nullable CommitLogReader commitLogReader, @Nullable BatchLogObservable batchObservable, @Nullable RandomAccessReader randomAccess, @@ -183,9 +183,10 @@ public int hashCode() { * Retrieve writer for this family. * Empty if this family is not writable */ - public Optional getWriter() { + public Optional getWriter() { if (!access.isReadonly()) { - return Optional.of(Objects.requireNonNull(writer)); + return Optional.of(Objects.requireNonNull( + writer, "Family " + name + " is not readonly, but has no writer")); } return Optional.empty(); } @@ -196,7 +197,8 @@ public Optional getWriter() { */ public Optional getCommitLogReader() { if (access.canReadCommitLog()) { - return Optional.of(Objects.requireNonNull(commitLogReader)); + return Optional.of(Objects.requireNonNull( + commitLogReader, "Family " + name + " doesn't have commit-log reader")); } return Optional.empty(); } @@ -206,7 +208,8 @@ public Optional getCommitLogReader() { */ public Optional getBatchObservable() { if (access.canReadBatchSnapshot() || access.canReadBatchUpdates()) { - return Optional.of(Objects.requireNonNull(batchObservable)); + return Optional.of(Objects.requireNonNull( + batchObservable, "Family " + name + " doesn't have batch observable")); } return Optional.empty(); } @@ -218,7 +221,8 @@ public Optional getBatchObservable() { */ public Optional getRandomAccessReader() { if (access.canRandomRead()) { - return Optional.of(Objects.requireNonNull(randomAccess)); + return Optional.of(Objects.requireNonNull( + randomAccess, "Family " + name + " doesn't have random access reader")); } return Optional.empty(); } @@ -229,7 +233,8 @@ public Optional getRandomAccessReader() { */ public Optional getPartitionedView() { if (access.canCreatePartitionedView()) { - return Optional.of(Objects.requireNonNull(partitionedView)); + return Optional.of(Objects.requireNonNull( + partitionedView, "Family " + name + " doesn't have partitioned view")); } return Optional.empty(); } diff --git a/core/src/main/java/cz/o2/proxima/repository/AttributeFamilyProxyDescriptor.java b/core/src/main/java/cz/o2/proxima/repository/AttributeFamilyProxyDescriptor.java new file mode 100644 index 000000000..fe1d1d81e --- /dev/null +++ b/core/src/main/java/cz/o2/proxima/repository/AttributeFamilyProxyDescriptor.java @@ -0,0 +1,482 @@ +/** + * Copyright 2017 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package cz.o2.proxima.repository; + +import cz.o2.proxima.storage.AccessType; +import cz.o2.proxima.storage.AttributeWriterBase; +import cz.o2.proxima.storage.CommitCallback; +import cz.o2.proxima.storage.OnlineAttributeWriter; +import cz.o2.proxima.storage.Partition; +import cz.o2.proxima.storage.StorageType; +import cz.o2.proxima.storage.StreamElement; +import cz.o2.proxima.storage.batch.BatchLogObservable; +import cz.o2.proxima.storage.batch.BatchLogObserver; +import cz.o2.proxima.storage.commitlog.BulkLogObserver; +import cz.o2.proxima.storage.commitlog.Cancellable; +import cz.o2.proxima.storage.commitlog.CommitLogReader; +import cz.o2.proxima.storage.commitlog.LogObserver; +import cz.o2.proxima.storage.randomaccess.KeyValue; +import cz.o2.proxima.storage.randomaccess.RandomAccessReader; +import cz.o2.proxima.util.Pair; +import cz.o2.proxima.view.PartitionedLogObserver; +import cz.o2.proxima.view.PartitionedView; +import cz.seznam.euphoria.core.client.dataset.Dataset; +import cz.seznam.euphoria.core.client.flow.Flow; +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Proxy attribute family applying transformations of attributes + * to and from private space to public space. + */ +class AttributeFamilyProxyDescriptor extends AttributeFamilyDescriptor { + + AttributeFamilyProxyDescriptor( + AttributeProxyDescriptorImpl targetAttribute, + AttributeFamilyDescriptor targetFamily) { + super( + "proxy::" + targetAttribute.getName() + "::" + targetFamily.getName(), + targetFamily.getType(), + Arrays.asList(targetAttribute), getWriter(targetAttribute, targetFamily), + getCommitLogReader(targetAttribute, targetFamily), + getBatchObservable(targetAttribute, targetFamily), + getRandomAccess(targetAttribute, targetFamily), + getPartitionedView(targetAttribute, targetFamily), + targetFamily.getType() == StorageType.PRIMARY + ? 
targetFamily.getAccess() + : AccessType.or(targetFamily.getAccess(), AccessType.from("read-only")), + targetFamily.getFilter()); + } + + private static OnlineAttributeWriter getWriter( + AttributeProxyDescriptorImpl targetAttribute, + AttributeFamilyDescriptor targetFamily) { + + Optional w = targetFamily.getWriter(); + if (!w.isPresent() || !(w.get() instanceof OnlineAttributeWriter)) { + return null; + } + OnlineAttributeWriter writer = w.get().online(); + return new OnlineAttributeWriter() { + + @Override + public void rollback() { + writer.rollback(); + } + + @Override + public void write(StreamElement data, CommitCallback statusCallback) { + writer.write( + transformToRaw(data, targetAttribute), + statusCallback); + } + + @Override + public URI getURI() { + return writer.getURI(); + } + + }; + } + + private static CommitLogReader getCommitLogReader( + AttributeProxyDescriptorImpl targetAttribute, + AttributeFamilyDescriptor targetFamily) { + + Optional target = targetFamily.getCommitLogReader(); + if (!target.isPresent()) { + return null; + } + CommitLogReader reader = target.get(); + return new CommitLogReader() { + + @Override + public URI getURI() { + return reader.getURI(); + } + + @Override + public List getPartitions() { + return reader.getPartitions(); + } + + @Override + public Cancellable observe( + String name, + CommitLogReader.Position position, LogObserver observer) { + + return reader.observe( + name, position, wrapTransformed(targetAttribute, observer)); + } + + @Override + public Cancellable observePartitions( + Collection partitions, CommitLogReader.Position position, + boolean stopAtCurrent, LogObserver observer) { + + return reader.observePartitions( + partitions, position, stopAtCurrent, + wrapTransformed(targetAttribute, observer)); + } + + @Override + public Cancellable observeBulk( + String name, CommitLogReader.Position position, + BulkLogObserver observer) { + + return reader.observeBulk(name, position, wrapTransformed(observer)); + } + + @Override + public void close() throws IOException { + reader.close(); + } + + }; + } + + + private static BatchLogObservable getBatchObservable( + AttributeProxyDescriptorImpl targetAttribute, + AttributeFamilyDescriptor targetFamily) { + + Optional target = targetFamily.getBatchObservable(); + if (!target.isPresent()) { + return null; + } + BatchLogObservable reader = target.get(); + return new BatchLogObservable() { + + @Override + public List getPartitions(long startStamp, long endStamp) { + return reader.getPartitions(startStamp, endStamp); + } + + @Override + public void observe( + List partitions, + List> attributes, + BatchLogObserver observer) { + + reader.observe(partitions, attributes, wrapTransformed(observer)); + } + + }; + } + + private static RandomAccessReader getRandomAccess( + AttributeProxyDescriptorImpl targetAttribute, + AttributeFamilyDescriptor targetFamily) { + + Optional target = targetFamily.getRandomAccessReader(); + if (!target.isPresent()) { + return null; + } + RandomAccessReader reader = target.get(); + return new RandomAccessReader() { + + @Override + public RandomAccessReader.Offset fetchOffset( + RandomAccessReader.Listing type, String key) { + + if (type == Listing.ATTRIBUTE) { + return reader.fetchOffset( + type, targetAttribute.getTransform().fromProxy(key)); + } + return reader.fetchOffset(type, key); + } + + @Override + public Optional> get( + String key, String attribute, AttributeDescriptor desc) { + + ProxyTransform transform = targetAttribute.getTransform(); + return reader.get(key, 
transform.fromProxy(attribute), desc) + .map(kv -> transformToProxy(kv, targetAttribute)); + } + + @Override + public void scanWildcard( + String key, AttributeDescriptor wildcard, + RandomAccessReader.Offset offset, int limit, Consumer> consumer) { + + if (!targetAttribute.isWildcard()) { + throw new IllegalArgumentException( + "Proxy target is not wildcard attribute!"); + } + reader.scanWildcard(key, targetAttribute.getTarget(), offset, limit, kv -> { + consumer.accept(transformToProxy(kv, targetAttribute)); + }); + } + + @Override + public void listEntities( + RandomAccessReader.Offset offset, int limit, + Consumer> consumer) { + + } + + @Override + public void close() throws IOException { + } + + }; + } + + private static PartitionedView getPartitionedView( + AttributeProxyDescriptorImpl targetAttribute, + AttributeFamilyDescriptor targetFamily) { + + Optional target = targetFamily.getPartitionedView(); + if (!target.isPresent()) { + return null; + } + PartitionedView view = target.get(); + return new PartitionedView() { + + @Override + public List getPartitions() { + return view.getPartitions(); + } + + @Override + public Dataset observePartitions( + Flow flow, + Collection partitions, + PartitionedLogObserver observer) { + + return view.observePartitions(flow, partitions, wrapTransformed( + targetAttribute, observer)); + } + + @Override + public Dataset observe( + Flow flow, String name, PartitionedLogObserver observer) { + + return view.observe(flow, name, wrapTransformed( + targetAttribute, observer)); + } + + }; + } + + private static LogObserver wrapTransformed( + AttributeProxyDescriptorImpl proxy, + LogObserver observer) { + + return new LogObserver() { + + @Override + public boolean onNext(StreamElement ingest, LogObserver.ConfirmCallback confirm) { + return observer.onNext( + transformToProxy(ingest, proxy), confirm); + } + + @Override + public boolean onNext( + StreamElement ingest, + Partition partition, + LogObserver.ConfirmCallback confirm) { + + return observer.onNext( + transformToProxy(ingest, proxy), + partition, confirm); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onCancelled() { + observer.onCancelled(); + } + + @Override + public void onError(Throwable error) { + observer.onError(error); + } + + @Override + public void close() throws Exception { + observer.close(); + } + + }; + } + + static BulkLogObserver wrapTransformed(BulkLogObserver observer) { + return new BulkLogObserver() { + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable error) { + observer.onError(error); + } + + @Override + public boolean onNext( + StreamElement ingest, + Partition partition, + BulkLogObserver.BulkCommitter confirm) { + + return observer.onNext(ingest, partition, confirm); + } + + @Override + public void onRestart() { + observer.onRestart(); + } + + @Override + public void onCancelled() { + observer.onCancelled(); + } + + @Override + public void close() throws Exception { + observer.close(); + } + + }; + } + + + static BatchLogObserver wrapTransformed(BatchLogObserver observer) { + return new BatchLogObserver() { + + @Override + public boolean onNext( + StreamElement ingest, + Partition partition) { + + return observer.onNext(ingest, partition); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable error) { + observer.onError(error); + } + + }; + } + + + private static 
PartitionedLogObserver wrapTransformed( + AttributeProxyDescriptorImpl target, PartitionedLogObserver observer) { + + return new PartitionedLogObserver() { + + @Override + public void onRepartition(Collection assigned) { + observer.onRepartition(assigned); + } + + @Override + public boolean onNext( + StreamElement ingest, + PartitionedLogObserver.ConfirmCallback confirm, + Partition partition, + PartitionedLogObserver.Consumer collector) { + + return observer.onNext(transformToProxy(ingest, target), + confirm, partition, collector); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable error) { + observer.onError(error); + } + + }; + } + + private static StreamElement transformToRaw( + StreamElement data, + AttributeProxyDescriptorImpl targetDesc) { + + return transform(data, + targetDesc.getTarget(), + targetDesc.getTransform()::fromProxy); + } + + private static StreamElement transformToProxy( + StreamElement data, + AttributeProxyDescriptorImpl targetDesc) { + + return transform(data, targetDesc, targetDesc.getTransform()::toProxy); + } + + private static KeyValue transformToProxy( + KeyValue kv, + AttributeProxyDescriptorImpl targetDesc) { + + return KeyValue.of( + kv.getEntityDescriptor(), + targetDesc, kv.getKey(), + targetDesc.getTransform().toProxy(kv.getAttribute()), + kv.getOffset(), kv.getValueBytes()); + } + + private static StreamElement transform( + StreamElement data, + AttributeDescriptor target, + Function transform) { + + if (data.isDelete()) { + if (data.isDeleteWildcard()) { + return StreamElement.deleteWildcard( + data.getEntityDescriptor(), + target, data.getUuid(), data.getKey(), data.getStamp()); + } else { + return StreamElement.delete( + data.getEntityDescriptor(), target, + data.getUuid(), data.getKey(), + transform.apply(data.getAttribute()), + data.getStamp()); + } + } + return StreamElement.update(data.getEntityDescriptor(), + target, data.getUuid(), data.getKey(), + transform.apply(data.getAttribute()), + data.getStamp(), data.getValue()); + } + + + + +} diff --git a/core/src/main/java/cz/o2/proxima/repository/AttributeProxyDescriptorImpl.java b/core/src/main/java/cz/o2/proxima/repository/AttributeProxyDescriptorImpl.java new file mode 100644 index 000000000..115114106 --- /dev/null +++ b/core/src/main/java/cz/o2/proxima/repository/AttributeProxyDescriptorImpl.java @@ -0,0 +1,50 @@ +/** + * Copyright 2017 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.repository; + +import lombok.Getter; + +/** + * Proxy to another attribute. 
+ */ +class AttributeProxyDescriptorImpl + extends AttributeDescriptorBase { + + @Getter + private final AttributeDescriptorBase target; + + @Getter + private final ProxyTransform transform; + + AttributeProxyDescriptorImpl( + String name, + AttributeDescriptorBase target, + ProxyTransform transform) { + + super(name, target); + this.target = target; + this.transform = transform; + } + + @Override + public String toString() { + return "AttributeProxyDescriptorImpl(" + + "target=" + target + + ", name=" + name + + ")"; + } + +} diff --git a/core/src/main/java/cz/o2/proxima/repository/EntityDescriptor.java b/core/src/main/java/cz/o2/proxima/repository/EntityDescriptor.java index 3fbf84ef4..b51918265 100644 --- a/core/src/main/java/cz/o2/proxima/repository/EntityDescriptor.java +++ b/core/src/main/java/cz/o2/proxima/repository/EntityDescriptor.java @@ -36,9 +36,9 @@ class Builder { @Accessors(chain = true) private String name; - List attributes = new ArrayList<>(); + private final List> attributes = new ArrayList<>(); - public Builder addAttribute(AttributeDescriptor attr) { + public Builder addAttribute(AttributeDescriptorBase attr) { attributes.add(attr); return this; } @@ -46,6 +46,14 @@ public Builder addAttribute(AttributeDescriptor attr) { public EntityDescriptor build() { return new EntityDescriptorImpl(name, Collections.unmodifiableList(attributes)); } + + AttributeDescriptorBase findAttribute(String attr) { + return attributes.stream().filter(a -> a.getName() + .equals(attr)).findAny() + .orElseThrow(() -> new IllegalArgumentException( + "Cannot find attribute " + attr + " of entity " + this.name)); + } + } static Builder newBuilder() { @@ -56,10 +64,37 @@ static Builder newBuilder() { /** Name of the entity. */ String getName(); - /** Find attribute based by name. */ - Optional> findAttribute(String name); + /** + * Find attribute by name. + * @param name name of the attribute to search for + * @param includeProtected {@code true} to allow search for protected fields (prefixed by _). + * @return optional found attribute descriptor + */ + Optional> findAttribute(String name, boolean includeProtected); - /** List all attribute descriptors of given entity. */ - List getAllAttributes(); + /** + * Find attribute by name. + * Do not search protected fields (prefixed by _). + * @param name name of the attribute to search for + * @return optional found attribute descriptor + */ + default Optional> findAttribute(String name) { + return findAttribute(name, false); + } + + /** + * Find all attributes of this entity. + * @param includeProtected when {@code true} then protected attributes are + * also included (prefixed by _). + * @return + */ + List> getAllAttributes(boolean includeProtected); + + /** + * List all attribute descriptors of given entity. + */ + default List> getAllAttributes() { + return getAllAttributes(false); + } } diff --git a/core/src/main/java/cz/o2/proxima/repository/EntityDescriptorImpl.java b/core/src/main/java/cz/o2/proxima/repository/EntityDescriptorImpl.java index 31cf5af22..1ced96679 100644 --- a/core/src/main/java/cz/o2/proxima/repository/EntityDescriptorImpl.java +++ b/core/src/main/java/cz/o2/proxima/repository/EntityDescriptorImpl.java @@ -37,19 +37,19 @@ public class EntityDescriptorImpl implements EntityDescriptor { private final String name; /** List of all attribute descriptors. */ - private final List attributes; + private final List> attributes; /** Map of attributes by name. 
*/ - private final Map attributesByName; + private final Map> attributesByName; /** Map of attributes by pattern. */ - private final Map attributesByPattern; + private final Map> attributesByPattern; - EntityDescriptorImpl(String name, List attrs) { + EntityDescriptorImpl(String name, List> attrs) { this.name = Objects.requireNonNull(name); this.attributes = Collections.unmodifiableList(Objects.requireNonNull(attrs)); - List fullyQualified = attrs.stream() + List> fullyQualified = attrs.stream() .filter(a -> !a.isWildcard()) .collect(Collectors.toList()); @@ -65,24 +65,34 @@ public class EntityDescriptorImpl implements EntityDescriptor { /** Find attribute based by name. */ @Override - public Optional> findAttribute(String name) { - AttributeDescriptor byName = attributesByName.get(name); - if (byName != null) { - return Optional.of(byName); - } - for (Map.Entry e : attributesByPattern.entrySet()) { - if (e.getKey().matches(name)) { - return Optional.of(e.getValue()); + public Optional> findAttribute( + String name, boolean includeProtected) { + + AttributeDescriptor found = attributesByName.get(name); + if (found == null) { + for (Map.Entry> e : attributesByPattern.entrySet()) { + if (e.getKey().matches(name)) { + found = e.getValue(); + break; + } } } + if (found != null && (includeProtected || found.isPublic())) { + return Optional.of(found); + } return Optional.empty(); } /** List all attribute descriptors of given entity. */ @Override - public List getAllAttributes() { - return attributes; + public List> getAllAttributes(boolean includeProtected) { + if (includeProtected) { + return Collections.unmodifiableList(attributes); + } + return attributes.stream() + .filter(a -> a.isPublic()) + .collect(Collectors.toList()); } @Override diff --git a/core/src/main/java/cz/o2/proxima/repository/ProxyTransform.java b/core/src/main/java/cz/o2/proxima/repository/ProxyTransform.java new file mode 100644 index 000000000..256d533a7 --- /dev/null +++ b/core/src/main/java/cz/o2/proxima/repository/ProxyTransform.java @@ -0,0 +1,39 @@ +/** + * Copyright 2017 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.repository; + +import java.io.Serializable; + +/** + * A transformation of attribute name applied both on reading and writing attribute. + */ +public interface ProxyTransform extends Serializable { + + /** + * Apply transformation to attribute name from proxy naming. + * @param proxy name of the attribute in proxy namespace + * @return the raw attribute + */ + String fromProxy(String proxy); + + /** + * Apply transformation to attribute name to proxy naming. 
+ * @param raw the raw attribute name + * @return the proxy attribute name + */ + String toProxy(String raw); + +} diff --git a/core/src/main/java/cz/o2/proxima/repository/Repository.java b/core/src/main/java/cz/o2/proxima/repository/Repository.java index fb9ee0504..af311a916 100644 --- a/core/src/main/java/cz/o2/proxima/repository/Repository.java +++ b/core/src/main/java/cz/o2/proxima/repository/Repository.java @@ -200,7 +200,7 @@ public Repository build() { * This need not be synchronized because it is only written in constructor * and then it is read-only. */ - private final Map, Set>> attributeToFamily; + private final Map, Set> attributeToFamily; /** @@ -255,10 +255,20 @@ private Repository( readEntityDescriptors(cfg); if (loadFamilies) { + /* Read attribute families and map them to storages by attribute. */ readAttributeFamilies(cfg); - /* Read transformations from one entity to another. */ - readTransformations(cfg); + + if (shouldLoadAccessors) { + /* Link attribute families for proxied attribute. */ + loadProxiedFamilies(cfg); + + /* Read transformations from one entity to another. */ + readTransformations(cfg); + + linkAttributesToWriters(); + } + } if (shouldValidate) { @@ -266,7 +276,7 @@ private Repository( validate(); } - } catch (URISyntaxException ex) { + } catch (Exception ex) { throw new IllegalArgumentException("Cannot read config settings", ex); } @@ -336,7 +346,7 @@ private void readSchemeSerializers( } /** Read descriptors of entites from config */ - private void readEntityDescriptors(Config cfg) throws URISyntaxException { + private void readEntityDescriptors(Config cfg) throws Exception { ConfigValue entities = cfg.root().get("entities"); if (entities == null) { LOG.warn("Empty configuration of entities, skipping initialization"); @@ -350,40 +360,25 @@ private void readEntityDescriptors(Config cfg) throws URISyntaxException { toMap("entities." + entityName, e.getValue()).get("attributes")); EntityDescriptor.Builder entity = EntityDescriptor.newBuilder() .setName(entityName); - for (Map.Entry attr : entityAttrs.entrySet()) { + // first regular attributes + entityAttrs.forEach((key, value) -> { Map settings = toMap( - "entities." + entityName + ".attributes." + attr.getKey(), attr .getValue()); - - Object scheme = Objects.requireNonNull( - settings.get("scheme"), - "Missing key entities." + entityName + ".attributes." - + attr.getKey() + ".scheme"); - - String schemeStr = scheme.toString(); - if (schemeStr.indexOf(':') == -1) { - // if the scheme does not contain `:' the java.net.URI cannot parse it - // we will fix this by adding `:///' - schemeStr += ":///"; + "entities." + entityName + ".attributes." + key, value); + if (settings.get("proxy") == null) { + loadRegular(entityName, key, settings, entity); } - URI schemeURI = new URI(schemeStr); - // validate that the scheme serializer doesn't throw exceptions - // ignore the return value - try { - if (shouldValidate) { - getValueSerializerFactory(schemeURI.getScheme()) - .getValueSerializer(schemeURI) - .isValid(new byte[] { }); - } - } catch (Exception ex) { - throw new IllegalStateException("Cannot use serializer for URI " + schemeURI, ex); + }); + + // next proxies + entityAttrs.forEach((key, value) -> { + Map settings = toMap( + "entities." + entityName + ".attributes." 
+ key, value); + if (settings.get("proxy") != null) { + loadProxy(key, settings, entity); } - entity.addAttribute(AttributeDescriptor.newBuilder(this) - .setEntity(entityName) - .setName(attr.getKey()) - .setSchemeURI(schemeURI) - .build()); - } + }); + if (!entityName.contains("*")) { LOG.info("Adding entity by fully qualified name {}", entityName); entitiesByName.put(entityName, entity.build()); @@ -395,6 +390,81 @@ private void readEntityDescriptors(Config cfg) throws URISyntaxException { } } + @SuppressWarnings("unchecked") + private void loadProxy( + String attrName, + Map settings, + EntityDescriptor.Builder entityBuilder) { + + AttributeDescriptorBase target = Optional.ofNullable(settings.get("proxy")) + .map(Object::toString) + .map(proxy -> entityBuilder.findAttribute(proxy)) + .orElseThrow(() -> new IllegalStateException( + "Invalid state: `proxy` should not be null")); + + final ProxyTransform transform; + if (this.shouldLoadAccessors) { + transform = Optional.ofNullable(settings.get("apply")) + .map(Object::toString) + .map(s -> { + try { + Class c = Classpath.findClass(s, ProxyTransform.class); + return c.newInstance(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }) + .orElseThrow(() -> new IllegalArgumentException("Missing required field `apply'")); + } else { + transform = null; + } + + entityBuilder.addAttribute( + AttributeDescriptor.newProxy(attrName, target, transform)); + } + + + private void loadRegular( + String entityName, + String attrName, + Map settings, + EntityDescriptor.Builder entityBuilder) { + + try { + + Object scheme = Objects.requireNonNull( + settings.get("scheme"), + "Missing key entities." + entityName + ".attributes." + + attrName + ".scheme"); + + String schemeStr = scheme.toString(); + if (schemeStr.indexOf(':') == -1) { + // if the scheme does not contain `:' the java.net.URI cannot parse it + // we will fix this by adding `:///' + schemeStr += ":///"; + } + URI schemeURI = new URI(schemeStr); + // validate that the scheme serializer doesn't throw exceptions + // ignore the return value + try { + if (shouldValidate) { + getValueSerializerFactory(schemeURI.getScheme()) + .getValueSerializer(schemeURI) + .isValid(new byte[] { }); + } + } catch (Exception ex) { + throw new IllegalStateException("Cannot use serializer for URI " + schemeURI, ex); + } + entityBuilder.addAttribute(AttributeDescriptor.newBuilder(this) + .setEntity(entityName) + .setName(attrName) + .setSchemeURI(schemeURI) + .build()); + } catch (URISyntaxException ex) { + throw new RuntimeException(ex); + } + } + @SuppressWarnings("unchecked") private Map toMap(String key, Object value) { if (!(value instanceof Map)) { @@ -518,17 +588,18 @@ private void readAttributeFamilies(Config cfg) { family.setFilter(newInstance(filter, StorageFilter.class)); } - Collection> allAttributes = new HashSet<>(); + Collection> allAttributes = new HashSet<>(); for (String attr : attributes) { // attribute descriptors affected by this settings - final List> attrDescs; + final List> attrDescs; if (attr.equals("*")) { // this means all attributes of entity - attrDescs = (List) entDesc.getAllAttributes(); + attrDescs = (List) entDesc.getAllAttributes(true); } else { - attrDescs = (List) Arrays.asList(entDesc.findAttribute(attr).orElseThrow( - () -> new IllegalArgumentException("Cannot find attribute " + attr))); + attrDescs = (List) Arrays.asList(entDesc.findAttribute(attr, true) + .orElseThrow( + () -> new IllegalArgumentException("Cannot find attribute " + attr))); } 
allAttributes.addAll(attrDescs); } @@ -536,7 +607,7 @@ private void readAttributeFamilies(Config cfg) { allAttributes.forEach(family::addAttribute); final AttributeFamilyDescriptor familyBuilt = family.build(); allAttributes.forEach(a -> { - Set> families = attributeToFamily + Set families = attributeToFamily .computeIfAbsent(a, k -> new HashSet<>()); if (!families.add(familyBuilt)) { throw new IllegalArgumentException( @@ -553,29 +624,44 @@ private void readAttributeFamilies(Config cfg) { } - if (shouldLoadAccessors) { - // iterate over all attribute families and setup appropriate (commit) writers - // for all attributes - attributeToFamily.forEach((key, value) -> { - Optional writer = value - .stream() - .filter(af -> af.getType() == StorageType.PRIMARY) - .filter(af -> !af.getAccess().isReadonly()) - .filter(af -> af.getWriter().isPresent()) - .findAny() - .map(af -> af.getWriter() - .orElseThrow(() -> new NoSuchElementException("Writer can not be empty"))); - - if (writer.isPresent()) { - key.setWriter(writer.get().online()); - } else { - LOG.info( - "No writer found for attribute {}, continuing, but assuming " - + "the attribute is read-only. Any attempt to write it will fail.", - key); - } - }); - } + } + + private void linkAttributesToWriters() { + // iterate over all attribute families and setup appropriate (commit) writers + // for all attributes + attributeToFamily.forEach((key, value) -> { + Optional writer = value + .stream() + .filter(af -> af.getType() == StorageType.PRIMARY) + .filter(af -> !af.getAccess().isReadonly()) + .filter(af -> af.getWriter().isPresent()) + .findAny() + .map(af -> af.getWriter() + .orElseThrow(() -> new NoSuchElementException("Writer can not be empty"))); + + if (writer.isPresent()) { + ((AttributeDescriptorBase) key).setWriter(writer.get().online()); + } else { + LOG.info( + "No writer found for attribute {}, continuing, but assuming " + + "the attribute is read-only. Any attempt to write it will fail.", + key); + } + }); + } + + private void loadProxiedFamilies(Config cfg) { + getAllEntities() + .flatMap(e -> e.getAllAttributes(true).stream()) + .filter(a -> ((AttributeDescriptorBase) a).isProxy()) + .forEach(a -> { + AttributeProxyDescriptorImpl p = (AttributeProxyDescriptorImpl) a; + AttributeDescriptorBase target = p.getTarget(); + attributeToFamily.put(p, getFamiliesForAttribute(target) + .stream() + .map(af -> new AttributeFamilyProxyDescriptor(p, af)) + .collect(Collectors.toSet())); + }); } private void readTransformations(Config cfg) { @@ -607,7 +693,7 @@ private void readTransformations(Config cfg) { List> attrs = readList("attributes", transformation, k) .stream() - .map(a -> entity.findAttribute(a).orElseThrow( + .map(a -> entity.findAttribute(a, true).orElseThrow( () -> new IllegalArgumentException( String.format("Missing attribute `%s` in `%s`", a, entity)))) @@ -676,9 +762,10 @@ private void validate() { Stream.concat( entitiesByName.values().stream(), entitiesByPattern.values().stream()) - .flatMap(d -> d.getAllAttributes().stream()) + .flatMap(d -> d.getAllAttributes(true).stream()) + .filter(a -> !((AttributeDescriptorBase) a).isProxy()) .filter(a -> { - Set> families = attributeToFamily.get(a); + Set families = attributeToFamily.get(a); return families == null || families.isEmpty(); }) .findAny() @@ -700,16 +787,17 @@ public StorageDescriptor getStorageDescriptor(String scheme) { /** List all unique atttribute families. 
*/ - public Stream> getAllFamilies() { + public Stream getAllFamilies() { return attributeToFamily.values().stream() .flatMap(Collection::stream).distinct(); } /** Retrieve list of attribute families for attribute. */ - public Set> getFamiliesForAttribute( + public Set getFamiliesForAttribute( AttributeDescriptor attr) { - return Objects.requireNonNull(attributeToFamily.get(attr), + return Objects.requireNonNull( + attributeToFamily.get(attr), "Cannot find any family for attribute " + attr); } diff --git a/core/src/main/java/cz/o2/proxima/storage/AccessType.java b/core/src/main/java/cz/o2/proxima/storage/AccessType.java index ab1aaf3f1..6a0372121 100644 --- a/core/src/main/java/cz/o2/proxima/storage/AccessType.java +++ b/core/src/main/java/cz/o2/proxima/storage/AccessType.java @@ -115,6 +115,56 @@ public String toString() { } + static AccessType or(AccessType left, AccessType right) { + return new AccessType() { + @Override + public boolean canReadBatchUpdates() { + return left.canReadBatchUpdates() || right.canReadBatchUpdates(); + } + + @Override + public boolean canReadBatchSnapshot() { + return left.canReadBatchSnapshot() || right.canReadBatchSnapshot(); + } + + @Override + public boolean canRandomRead() { + return left.canRandomRead() || right.canRandomRead(); + } + + @Override + public boolean canReadCommitLog() { + return left.canReadCommitLog() || right.canReadCommitLog(); + } + + @Override + public boolean isStateCommitLog() { + return left.isStateCommitLog() || right.isStateCommitLog(); + } + + @Override + public boolean isReadonly() { + return left.isReadonly() || right.isReadonly(); + } + + @Override + public boolean isListPrimaryKey() { + return left.isListPrimaryKey() || right.isListPrimaryKey(); + } + + @Override + public boolean isWriteOnly() { + return left.isWriteOnly() || right.isWriteOnly(); + } + + @Override + public boolean canCreatePartitionedView() { + return left.canCreatePartitionedView() || right.canCreatePartitionedView(); + } + + }; + } + /** * @return {@code true} if this family can be used to access data by batch * observing of updates. 
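
Note (not part of the patch): the class named by the `apply` key must implement the new ProxyTransform interface and be instantiable through a no-argument constructor, since loadProxy resolves it via Classpath.findClass and newInstance. A minimal sketch of such a transform follows; the attribute names "metric.*" / "_m.*" and the class/package names are purely illustrative assumptions — the real example shipped with the patch is cz.o2.proxima.transform.EventTransform further down.

package cz.o2.proxima.example;

import cz.o2.proxima.repository.ProxyTransform;

/**
 * Illustrative transform for a hypothetical configuration such as
 *
 *   "_m.*":     { scheme: bytes }
 *   "metric.*": { proxy: "_m.*", apply: cz.o2.proxima.example.MetricTransform }
 *
 * fromProxy and toProxy are expected to be mutually inverse, so that elements
 * written through the public proxy attribute read back under their public name.
 */
public class MetricTransform implements ProxyTransform {

  @Override
  public String fromProxy(String proxy) {
    // public "metric.cpu" -> raw "_m.cpu"
    return "_m." + proxy.substring(proxy.indexOf('.') + 1);
  }

  @Override
  public String toProxy(String raw) {
    // raw "_m.cpu" -> public "metric.cpu"
    return "metric." + raw.substring(raw.indexOf('.') + 1);
  }

}
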
diff --git a/core/src/test/java/cz/o2/proxima/repository/PartitionedViewTest.java b/core/src/test/java/cz/o2/proxima/repository/PartitionedViewTest.java index e3278b001..6a8cc0924 100644 --- a/core/src/test/java/cz/o2/proxima/repository/PartitionedViewTest.java +++ b/core/src/test/java/cz/o2/proxima/repository/PartitionedViewTest.java @@ -52,7 +52,7 @@ public class PartitionedViewTest implements Serializable { @Before public void setUp() { executor = new InMemExecutor(); - AttributeFamilyDescriptor family = repo.getAllFamilies() + AttributeFamilyDescriptor family = repo.getAllFamilies() .filter(af -> af.getName().equals("event-storage-stream")) .findAny() .get(); diff --git a/core/src/test/java/cz/o2/proxima/repository/RepositoryTest.java b/core/src/test/java/cz/o2/proxima/repository/RepositoryTest.java index 5ee2afb5d..c743b8d22 100644 --- a/core/src/test/java/cz/o2/proxima/repository/RepositoryTest.java +++ b/core/src/test/java/cz/o2/proxima/repository/RepositoryTest.java @@ -17,9 +17,17 @@ package cz.o2.proxima.repository; import com.typesafe.config.ConfigFactory; +import cz.o2.proxima.storage.StreamElement; +import cz.o2.proxima.storage.commitlog.LogObserver; +import cz.o2.proxima.storage.randomaccess.KeyValue; import java.io.IOException; -import org.junit.After; -import org.junit.Before; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; import org.junit.Test; import static org.junit.Assert.*; @@ -28,15 +36,6 @@ */ public class RepositoryTest { - - @Before - public void setup() { - } - - @After - public void teardown() { - } - @Test public void testConfigParsing() throws IOException { Repository repo = Repository.Builder.of(ConfigFactory.load().resolve()).build(); @@ -61,4 +60,145 @@ public void testConfigParsing() throws IOException { gateway.findAttribute("bytes").get().getSchemeURI().toString()); } + @Test(timeout = 2000) + public void testProxyWrite() throws UnsupportedEncodingException, InterruptedException { + Repository repo = Repository.Builder.of(ConfigFactory.load().resolve()).build(); + EntityDescriptor proxied = repo.findEntity("proxied").get(); + AttributeDescriptor target = proxied.findAttribute("_e.*", true).get(); + AttributeDescriptor source = proxied.findAttribute("event.*").get(); + Set families = repo + .getFamiliesForAttribute(target); + Set proxiedFamilies = repo + .getFamiliesForAttribute(source); + assertEquals( + families.stream() + .map(a -> "proxy::event.*::" + a.getName()) + .collect(Collectors.toList()), + proxiedFamilies.stream() + .map(a -> a.getName()) + .collect(Collectors.toList())); + + // verify that writing to attribute event.abc ends up as _e.abc + CountDownLatch latch = new CountDownLatch(2); + proxiedFamilies.iterator().next().getCommitLogReader().get().observe("dummy", new LogObserver() { + + @Override + public boolean onNext(StreamElement ingest, LogObserver.ConfirmCallback confirm) { + assertEquals("test", new String(ingest.getValue())); + assertEquals("event.abc", ingest.getAttribute()); + assertEquals(source, ingest.getAttributeDescriptor()); + latch.countDown(); + return false; + } + + @Override + public void onError(Throwable error) { + throw new RuntimeException(error); + } + + @Override + public void close() throws Exception { + // nop + } + + }); + + source.getWriter().write(StreamElement.update( + proxied, + source, UUID.randomUUID().toString(), + "key", 
"event.abc", System.currentTimeMillis(), "test".getBytes("UTF-8")), + (s, exc) -> { + latch.countDown(); + }); + + latch.await(); + + KeyValue kv = families.iterator().next() + .getRandomAccessReader().get().get("key", "_e.abc", target) + .orElseGet(() -> { + fail("Missing _e.abc stored"); + return null; + }); + + assertEquals("test", new String((byte[]) kv.getValue())); + + } + + @Test + public void testProxyRandomGet() throws UnsupportedEncodingException, InterruptedException { + Repository repo = Repository.Builder.of(ConfigFactory.load().resolve()).build(); + EntityDescriptor proxied = repo.findEntity("proxied").get(); + AttributeDescriptor target = proxied.findAttribute("_e.*", true).get(); + AttributeDescriptor source = proxied.findAttribute("event.*").get(); + Set proxiedFamilies = repo + .getFamiliesForAttribute(source); + + // verify that writing to attribute event.abc ends up as _e.abc + source.getWriter().write(StreamElement.update( + proxied, + source, UUID.randomUUID().toString(), + "key", "event.abc", System.currentTimeMillis(), "test".getBytes("UTF-8")), + (s, exc) -> { + assertTrue(s); + }); + + KeyValue kv = proxiedFamilies.iterator().next() + .getRandomAccessReader().get().get("key", "event.abc", target) + .orElseGet(() -> { + fail("Missing event.abc stored"); + return null; + }); + + assertEquals("test", new String((byte[]) kv.getValue())); + assertEquals(source, kv.getAttrDescriptor()); + assertEquals("event.abc", kv.getAttribute()); + assertEquals("key", kv.getKey()); + + } + + @Test + public void testProxyScan() throws UnsupportedEncodingException, InterruptedException { + Repository repo = Repository.Builder.of(ConfigFactory.load().resolve()).build(); + EntityDescriptor proxied = repo.findEntity("proxied").get(); + AttributeDescriptor target = proxied.findAttribute("_e.*", true).get(); + AttributeDescriptor source = proxied.findAttribute("event.*").get(); + Set proxiedFamilies = repo + .getFamiliesForAttribute(source); + + // verify that writing to attribute event.abc ends up as _e.abc + source.getWriter().write(StreamElement.update( + proxied, + source, UUID.randomUUID().toString(), + "key", "event.abc", System.currentTimeMillis(), "test".getBytes("UTF-8")), + (s, exc) -> { + assertTrue(s); + }); + + source.getWriter().write(StreamElement.update( + proxied, + source, UUID.randomUUID().toString(), + "key", "event.def", System.currentTimeMillis(), "test2".getBytes("UTF-8")), + (s, exc) -> { + assertTrue(s); + }); + + + List> kvs = new ArrayList<>(); + proxiedFamilies.iterator().next() + .getRandomAccessReader().get().scanWildcard("key", source, kvs::add); + + assertEquals("test", new String((byte[]) kvs.get(0).getValue())); + assertEquals(source, kvs.get(0).getAttrDescriptor()); + assertEquals("event.abc", kvs.get(0).getAttribute()); + assertEquals("key", kvs.get(0).getKey()); + + assertEquals("test2", new String((byte[]) kvs.get(1).getValue())); + assertEquals(source, kvs.get(1).getAttrDescriptor()); + assertEquals("event.def", kvs.get(1).getAttribute()); + assertEquals("key", kvs.get(1).getKey()); + + } + + + } diff --git a/core/src/test/java/cz/o2/proxima/transform/EventTransform.java b/core/src/test/java/cz/o2/proxima/transform/EventTransform.java new file mode 100644 index 000000000..71243880e --- /dev/null +++ b/core/src/test/java/cz/o2/proxima/transform/EventTransform.java @@ -0,0 +1,37 @@ +/** + * Copyright 2017 O2 Czech Republic, a.s. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.transform; + +import cz.o2.proxima.repository.ProxyTransform; + +/** + * Transformation from proxy space (event.*) to raw space (_e.*). + */ +public class EventTransform implements ProxyTransform { + + @Override + public String fromProxy(String proxy) { + int pos = proxy.indexOf('.'); + return "_e." + proxy.substring(pos + 1); + } + + @Override + public String toProxy(String raw) { + int pos = raw.indexOf('.'); + return "event." + raw.substring(pos + 1); + } + +} diff --git a/core/src/test/resources/reference.conf b/core/src/test/resources/reference.conf index 3adc8daae..236e5259b 100644 --- a/core/src/test/resources/reference.conf +++ b/core/src/test/resources/reference.conf @@ -10,16 +10,16 @@ # this entity represents state of the gateway gateway: { attributes: { - armed: { scheme: "bytes" } - users: { scheme: "bytes" } - status: { scheme: "bytes" } + armed: { scheme: bytes } + users: { scheme: bytes } + status: { scheme: bytes } # the following defines a pattern for attributes # each attribute that matches the pattern is treated the same - "device.*": { scheme: "bytes" } + "device.*": { scheme: bytes } # settings for specific rule - "rule.*": { scheme: "bytes" } + "rule.*": { scheme: bytes } # this is fake attribute that always fails validation fail: { scheme: "fail:whenever" } @@ -31,10 +31,25 @@ dummy: { attributes: { data: { scheme: bytes } - "wildcard.*": { scheme: "bytes" } + "wildcard.*": { scheme: "bytes" } } } + proxied { + + attributes { + # this is "protected" field and should not be accessed directly + "_e.*": { scheme: bytes } + + # this is proxy public attribute performing transformation + "event.*": { + proxy: "_e.*" + apply: cz.o2.proxima.transform.EventTransform + } + } + + } + } attributeFamilies: { @@ -74,6 +89,14 @@ type: primary access: "commit-log, random-access" } + + proxy-primary { + entity: proxied + attributes: [ "_e.*" ] + storage: "inmem:///proxima/proxy" + type: primary + access: "commit-log, random-access" + } } transformations { diff --git a/kafka/src/test/java/cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest.java b/kafka/src/test/java/cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest.java index 436eb1b15..3b43690f5 100644 --- a/kafka/src/test/java/cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest.java +++ b/kafka/src/test/java/cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest.java @@ -18,6 +18,7 @@ import com.typesafe.config.ConfigFactory; import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.AttributeDescriptorBase; import cz.o2.proxima.repository.EntityDescriptor; import cz.o2.proxima.repository.Repository; import cz.o2.proxima.storage.Partition; @@ -45,7 +46,7 @@ public class LocalKafkaCommitLogDescriptorTest { final Repository repo = Repository.Builder.ofTest(ConfigFactory.empty()).build(); - final AttributeDescriptor attr; + final AttributeDescriptorBase attr; final EntityDescriptor 
diff --git a/core/src/test/resources/reference.conf b/core/src/test/resources/reference.conf
index 3adc8daae..236e5259b 100644
--- a/core/src/test/resources/reference.conf
+++ b/core/src/test/resources/reference.conf
@@ -10,16 +10,16 @@
   # this entity represents state of the gateway
   gateway: {
     attributes: {
-      armed: { scheme: "bytes" }
-      users: { scheme: "bytes" }
-      status: { scheme: "bytes" }
+      armed: { scheme: bytes }
+      users: { scheme: bytes }
+      status: { scheme: bytes }

       # the following defines a pattern for attributes
       # each attribute that matches the pattern is treated the same
-      "device.*": { scheme: "bytes" }
+      "device.*": { scheme: bytes }

       # settings for specific rule
-      "rule.*": { scheme: "bytes" }
+      "rule.*": { scheme: bytes }

       # this is fake attribute that always fails validation
       fail: { scheme: "fail:whenever" }
@@ -31,10 +31,25 @@
   dummy: {
     attributes: {
       data: { scheme: bytes }
-      "wildcard.*": { scheme: "bytes" }
+      "wildcard.*": { scheme: "bytes" }
     }
   }

+  proxied {
+
+    attributes {
+      # this is "protected" field and should not be accessed directly
+      "_e.*": { scheme: bytes }
+
+      # this is proxy public attribute performing transformation
+      "event.*": {
+        proxy: "_e.*"
+        apply: cz.o2.proxima.transform.EventTransform
+      }
+    }
+
+  }
+
 }

 attributeFamilies: {
@@ -74,6 +89,14 @@
     type: primary
     access: "commit-log, random-access"
   }
+
+  proxy-primary {
+    entity: proxied
+    attributes: [ "_e.*" ]
+    storage: "inmem:///proxima/proxy"
+    type: primary
+    access: "commit-log, random-access"
+  }
 }

 transformations {
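A short sketch of how the proxied entity configured above is expected to be resolved at runtime, mirroring the lookups used in the new tests; the entity name, attribute names, and API calls are taken from this diff, the wrapper class is illustrative, and the boolean argument to findAttribute is assumed to grant access to the otherwise hidden _e.* attribute:

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;

public class ProxiedConfigSketch {

  public static void main(String[] args) {
    // reference.conf above is picked up from the test classpath
    Repository repo = Repository.Builder.of(ConfigFactory.load().resolve()).build();
    EntityDescriptor proxied = repo.findEntity("proxied").get();

    // raw attribute, stored by the proxy-primary family
    AttributeDescriptor raw = proxied.findAttribute("_e.*", true).get();

    // public attribute, translated to _e.* by EventTransform on access
    AttributeDescriptor proxy = proxied.findAttribute("event.*").get();

    System.out.println("resolved: " + raw + " and " + proxy);
  }
}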
diff --git a/kafka/src/test/java/cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest.java b/kafka/src/test/java/cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest.java
index 436eb1b15..3b43690f5 100644
--- a/kafka/src/test/java/cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest.java
+++ b/kafka/src/test/java/cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest.java
@@ -18,6 +18,7 @@

 import com.typesafe.config.ConfigFactory;
 import cz.o2.proxima.repository.AttributeDescriptor;
+import cz.o2.proxima.repository.AttributeDescriptorBase;
 import cz.o2.proxima.repository.EntityDescriptor;
 import cz.o2.proxima.repository.Repository;
 import cz.o2.proxima.storage.Partition;
@@ -45,7 +46,7 @@ public class LocalKafkaCommitLogDescriptorTest {

   final Repository repo = Repository.Builder.ofTest(ConfigFactory.empty()).build();

-  final AttributeDescriptor attr;
+  final AttributeDescriptorBase attr;
   final EntityDescriptor entity;
   final URI storageURI;

diff --git a/maven/src/main/resources/java-source.ftlh b/maven/src/main/resources/java-source.ftlh
index 0588db841..d5d371814 100644
--- a/maven/src/main/resources/java-source.ftlh
+++ b/maven/src/main/resources/java-source.ftlh
@@ -97,9 +97,9 @@ public class ${java_classname} {
 <#-- entity.attributes as attribute -->

   public Optional getCommitLog(AttributeDescriptor... attributes) {
-    Set> descriptors = new HashSet<>();
+    Set descriptors = new HashSet<>();
     for (AttributeDescriptor attr : attributes) {
-      Set> commitLogs = repo.getFamiliesForAttribute(attr)
+      Set commitLogs = repo.getFamiliesForAttribute(attr)
           .stream()
           .filter(af -> af.getCommitLogReader().isPresent())
           .collect(Collectors.toSet());
@@ -118,9 +118,9 @@ public class ${java_classname} {

   public Optional getPartitionedView(AttributeDescriptor... attributes) {
-    Set> descriptors = new HashSet<>();
+    Set descriptors = new HashSet<>();
     for (AttributeDescriptor attr : attributes) {
-      Set> views = repo.getFamiliesForAttribute(attr)
+      Set views = repo.getFamiliesForAttribute(attr)
           .stream()
           .filter(af -> af.getAccess().canCreatePartitionedView())
           .collect(Collectors.toSet());

diff --git a/server/src/main/java/cz/o2/proxima/server/IngestServer.java b/server/src/main/java/cz/o2/proxima/server/IngestServer.java
index a3d9e04b6..67a3494ed 100644
--- a/server/src/main/java/cz/o2/proxima/server/IngestServer.java
+++ b/server/src/main/java/cz/o2/proxima/server/IngestServer.java
@@ -651,14 +651,13 @@ public void run() {
   protected void startConsumerThreads() throws InterruptedException {

     // index the repository
-    Map, Set>> familyToCommitLog;
+    Map> familyToCommitLog;
     familyToCommitLog = indexFamilyToCommitLogs();
     LOG.info("Starting consumer threads for familyToCommitLog {}", familyToCommitLog);

     // execute threads to consume the commit log
-    familyToCommitLog.entrySet().stream().forEach(entry -> {
-      AttributeFamilyDescriptor family = entry.getKey();
-      for (AttributeFamilyDescriptor commitLogFamily : entry.getValue()) {
+    familyToCommitLog.forEach((family, logs) -> {
+      for (AttributeFamilyDescriptor commitLogFamily : logs) {
         CommitLogReader commitLog = commitLogFamily.getCommitLogReader()
             .orElseThrow(() -> new IllegalStateException(
                 "Failed validation on consistency of attribute families. Fix code!"));
@@ -689,7 +688,7 @@ protected void startConsumerThreads() throws InterruptedException {
   }

   private void runTransformer(String name, TransformationDescriptor transform) {
-    AttributeFamilyDescriptor family = transform.getAttributes()
+    AttributeFamilyDescriptor family = transform.getAttributes()
         .stream()
         .map(a -> this.repo.getFamiliesForAttribute(a)
             .stream().filter(af -> af.getAccess().canReadCommitLog())
@@ -768,7 +767,7 @@ public boolean onNextInternal(
    * themselves.
    */
   @SuppressWarnings("unchecked")
-  private Map, Set>>
+  private Map>
       indexFamilyToCommitLogs() {

     // each attribute and its associated primary family
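The startConsumerThreads() hunk above swaps streaming over entrySet() for Map.forEach, which hands the key and value directly to a lambda. A self-contained sketch of the two equivalent styles on plain JDK types; the map contents here are made up for illustration:

import java.util.HashMap;
import java.util.Map;

public class MapForEachSketch {

  public static void main(String[] args) {
    Map<String, Integer> commitLogsPerFamily = new HashMap<>();
    commitLogsPerFamily.put("gateway-primary", 1);
    commitLogsPerFamily.put("proxy-primary", 1);

    // old style: stream the entry set and unpack each entry by hand
    commitLogsPerFamily.entrySet().stream().forEach(entry -> {
      String family = entry.getKey();
      Integer logs = entry.getValue();
      System.out.println(family + " -> " + logs);
    });

    // new style: Map.forEach passes key and value straight to the BiConsumer
    commitLogsPerFamily.forEach((family, logs) ->
        System.out.println(family + " -> " + logs));
  }
}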
diff --git a/server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java b/server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java
index 5b5a04a27..a3211c838 100644
--- a/server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java
+++ b/server/src/test/java/cz/o2/proxima/server/IngestServiceTest.java
@@ -20,7 +20,6 @@
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.typesafe.config.ConfigFactory;
 import cz.o2.proxima.proto.service.Rpc;
-import cz.o2.proxima.server.IngestServer;
 import cz.o2.proxima.server.test.Test.ExtendedMessage;
 import cz.o2.proxima.storage.InMemBulkStorage;
 import cz.o2.proxima.storage.InMemStorage;

diff --git a/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java b/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java
index c1878eb19..ed2b99754 100644
--- a/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java
+++ b/tools/src/main/java/cz/o2/proxima/tools/groovy/Console.java
@@ -29,7 +29,6 @@
 import cz.o2.proxima.repository.AttributeFamilyDescriptor;
 import cz.o2.proxima.repository.EntityDescriptor;
 import cz.o2.proxima.repository.Repository;
-import cz.o2.proxima.storage.AttributeWriterBase;
 import cz.o2.proxima.storage.OnlineAttributeWriter;
 import cz.o2.proxima.storage.StorageType;
 import cz.o2.proxima.storage.StreamElement;
@@ -129,11 +128,6 @@ public GroovyObject getEnv() throws Exception {
         repo);
   }

-  private static void usage() {
-    System.err.println(String.format("Usage %s []", Console.class.getName()));
-    System.exit(1);
-  }
-
   private Repository getRepo(String[] paths) {
     Config config;
     if (paths.length > 0) {
@@ -234,7 +228,7 @@ public WindowedStream> getBatchSnapshot(

     DatasetBuilder> builder = () -> {
       final Dataset> input;
-      AttributeFamilyDescriptor family = repo.getFamiliesForAttribute(attrDesc)
+      AttributeFamilyDescriptor family = repo.getFamiliesForAttribute(attrDesc)
          .stream()
          .filter(af -> af.getAccess().canReadBatchSnapshot())
          .filter(af -> af.getBatchObservable().isPresent())
@@ -315,7 +309,7 @@ public WindowedStream> getBatchUpdates(
       long startStamp,
       long endStamp) {

-    AttributeFamilyDescriptor family = repo.getFamiliesForAttribute(attrDesc)
+    AttributeFamilyDescriptor family = repo.getFamiliesForAttribute(attrDesc)
         .stream()
         .filter(af -> af.getAccess().canReadBatchUpdates())
         .filter(af -> af.getBatchObservable().isPresent())
@@ -382,7 +376,7 @@ public void put(
       TextFormat.merge(textFormat, builder);
       payload = builder.build().toByteArray();
     }
-    Set> families = repo.getFamiliesForAttribute(attrDesc);
+    Set families = repo.getFamiliesForAttribute(attrDesc);
     OnlineAttributeWriter writer = families.stream()
         .filter(af -> af.getType() == StorageType.PRIMARY)
         .findAny()
@@ -416,7 +410,7 @@ public void delete(
       EntityDescriptor entityDesc, AttributeDescriptor attrDesc,
       String key, String attribute) throws InterruptedException {

-    Set> families = repo.getFamiliesForAttribute(attrDesc);
+    Set families = repo.getFamiliesForAttribute(attrDesc);
     OnlineAttributeWriter writer = families.stream()
         .filter(af -> af.getType() == StorageType.PRIMARY)
         .findAny()

diff --git a/tools/src/main/java/cz/o2/proxima/tools/io/BatchSource.java b/tools/src/main/java/cz/o2/proxima/tools/io/BatchSource.java
index 04627a934..ff886960c 100644
--- a/tools/src/main/java/cz/o2/proxima/tools/io/BatchSource.java
+++ b/tools/src/main/java/cz/o2/proxima/tools/io/BatchSource.java
@@ -46,7 +46,7 @@ public class BatchSource implements DataSource> {

   public static BatchSource of(
       BatchLogObservable observable,
-      AttributeFamilyDescriptor family,
+      AttributeFamilyDescriptor family,
       long startStamp,
       long endStamp) {

diff --git a/tools/src/main/java/cz/o2/proxima/tools/io/ConsoleRandomReader.java b/tools/src/main/java/cz/o2/proxima/tools/io/ConsoleRandomReader.java
index e7d8dcd27..533eba18f 100644
--- a/tools/src/main/java/cz/o2/proxima/tools/io/ConsoleRandomReader.java
+++ b/tools/src/main/java/cz/o2/proxima/tools/io/ConsoleRandomReader.java
@@ -53,11 +53,10 @@ public ConsoleRandomReader(EntityDescriptor desc, Repository repo) {
     this.listEntityOffsets = new HashMap<>();
     desc.getAllAttributes().forEach(f -> {
-      Optional> randomFamily;
+      Optional randomFamily;
       randomFamily = repo.getFamiliesForAttribute(f)
           .stream()
-          .filter(af -> af.getAccess().isListPrimaryKey()
-              && af.getAccess().canRandomRead())
+          .filter(af -> af.getAccess().canRandomRead())
           .findAny();
       if (randomFamily.isPresent()) {
         attrToReader.put(f, randomFamily.get().getRandomAccessReader().get());