Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#30 add generic entity transformation on write #41

Merged
merged 3 commits into from
Dec 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
# The Proxima platform
[![Build Status](https://travis-ci.org/O2-Czech-Republic/proxima-platform.svg?branch=master)](https://travis-ci.org/O2-Czech-Republic/proxima-platform)


The platform is a generic data ingestion, manipulation and retrieval framework.
High level can be described by following scheme:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ static Builder newBuilder() {

/** Name of the entity. */
String getName();

/** Find attribute based by name. */
Optional<AttributeDescriptor> findAttribute(String name);
Optional<AttributeDescriptor<?>> findAttribute(String name);

/** List all attribute descriptors of given entity. */
List<AttributeDescriptor> getAllAttributes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class EntityDescriptorImpl implements EntityDescriptor {

/** Find attribute based by name. */
@Override
public Optional<AttributeDescriptor> findAttribute(String name) {
public Optional<AttributeDescriptor<?>> findAttribute(String name) {
AttributeDescriptor byName = attributesByName.get(name);
if (byName != null) {
return Optional.of(byName);
Expand Down
109 changes: 105 additions & 4 deletions core/src/main/java/cz/o2/proxima/repository/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public static Repository of(Config config) {
return Repository.Builder.of(config).build();
}


/**
* Builder for repository.
*/
Expand Down Expand Up @@ -123,25 +122,29 @@ public Repository build() {
}
}


/**
* Application configuration.
*/
@Getter
private final Config config;


/**
* Classpath reflections scanner.
*/
@Getter
private final Reflections reflections;


/**
* When read-only flag is specified, some checks are not performed in construction.
* This enables to use the repository inside reader applications that
* don't have to have all the server jars on classpath.
*/
private final boolean isReadonly;


/**
* Flag to indicate if we should validate the scheme with serializer.
* Defaults to {@code true}. {@code false} can be used when
Expand All @@ -150,12 +153,14 @@ public Repository build() {
*/
private final boolean shouldValidate;


/**
* Flag to indicate we should or should not load accessor to column families.
* The accessor is not needed mostly in the compiler.
*/
private final boolean shouldLoadAccessors;


/**
* Map of all storage descriptors available.
* Key is acceptable scheme of the descriptor.
Expand All @@ -164,6 +169,7 @@ public Repository build() {
**/
private final Map<String, StorageDescriptor> schemeToStorage = new HashMap<>();


/**
* Map of all scheme serializers.
* Key is acceptable scheme of the serializer.
Expand All @@ -179,20 +185,30 @@ public Repository build() {
* and then it is read-only.
**/
private final Map<String, EntityDescriptor> entitiesByName = new HashMap<>();


/**
* Map of entities by pattern.
* This need not be synchronized because it is only written in constructor
* and then it is read-only.
**/
private final Map<NamePattern, EntityDescriptor> entitiesByPattern;


/**
* Map of attribute family to list of attributes.
* Map of attribute descriptor to list of families.
* This need not be synchronized because it is only written in constructor
* and then it is read-only.
*/
private final Map<AttributeDescriptorImpl<?>, Set<AttributeFamilyDescriptor<?>>> attributeToFamily;


/**
* Map of transformation name to transformation descriptor.
*/
private final Map<String, TransformationDescriptor> transformations = new HashMap<>();


/**
* Construct the repository from the config with the specified read-only and
* validation flag.
Expand Down Expand Up @@ -241,6 +257,8 @@ private Repository(
if (loadFamilies) {
/* Read attribute families and map them to storages by attribute. */
readAttributeFamilies(cfg);
/* Read transformations from one entity to another. */
readTransformations(cfg);
}

if (shouldValidate) {
Expand Down Expand Up @@ -380,8 +398,8 @@ private void readEntityDescriptors(Config cfg) throws URISyntaxException {
@SuppressWarnings("unchecked")
private Map<String, Object> toMap(String key, Object value) {
if (!(value instanceof Map)) {
throw new IllegalArgumentException("Key " + key + " must "
+ "be object got "
throw new IllegalArgumentException(
"Key " + key + " must be object got "
+ (value != null
? value.getClass().getName()
: "(null)"));
Expand Down Expand Up @@ -560,6 +578,84 @@ private void readAttributeFamilies(Config cfg) {
}
}

private void readTransformations(Config cfg) {

if (entitiesByName.isEmpty() && entitiesByPattern.isEmpty()) {
// no loaded entities, no more stuff to read
return;
}
Map<String, Object> transformations = Optional.ofNullable(
cfg.root().get("transformations"))
.map(v -> toMap("transformations", v.unwrapped()))
.orElse(null);

if (transformations == null) {
LOG.info("Skipping empty transformations configuration.");
return;
}

transformations.forEach((k, v) -> {
try {
Map<String, Object> transformation = toMap(k, v);
EntityDescriptor entity = findEntity(readStr("entity", transformation, k))
.orElseThrow(() -> new IllegalArgumentException(
String.format("Entity `%s` doesn't exist",
transformation.get("entity"))));

Class<? extends Transformation> cls = Classpath.findClass(
readStr("using", transformation, k), Transformation.class);

List<AttributeDescriptor<?>> attrs = readList("attributes", transformation, k)
.stream()
.map(a -> entity.findAttribute(a).orElseThrow(
() -> new IllegalArgumentException(
String.format("Missing attribute `%s` in `%s`",
a, entity))))
.collect(Collectors.toList());

TransformationDescriptor desc = TransformationDescriptor.newBuilder()
.addAttributes(attrs)
.setEntity(entity)
.setTransformationClass(cls)
.build();

this.transformations.put(k, desc);

} catch (ClassNotFoundException ex) {
throw new RuntimeException(ex);
}
});

this.transformations.forEach((k, v) -> v.getTransformation().setup(this));

}

private static String readStr(String key, Map<String, Object> map, String name) {
return Optional.ofNullable(map.get(key))
.map(Object::toString)
.orElseThrow(
() -> new IllegalArgumentException(
String.format("Missing required field `%s` in `%s`", key, name)));
}

@SuppressWarnings("unchecked")
private static List<String> readList(
String key, Map<String, Object> map, String name) {

return Optional.ofNullable(map.get(key))
.map(v -> {
if (v instanceof List) return (List<Object>) v;
throw new IllegalArgumentException(
String.format("Key `%s` in `%s` must be list", key, name));
})
.map(l -> l.stream().map(Object::toString).collect(Collectors.toList()))
.orElseThrow(() -> new IllegalArgumentException(
String.format("Missing required field `%s` in `%s", key, name)));
}




@SuppressWarnings("unchecked")
private List<String> toList(Object in) {
if (in instanceof List) {
Expand Down Expand Up @@ -624,6 +720,11 @@ public Stream<EntityDescriptor> getAllEntities() {
entitiesByPattern.values().stream());
}

/** Retrieve all transformers. */
public Map<String, TransformationDescriptor> getTransformations() {
return Collections.unmodifiableMap(transformations);
}

public boolean isEmpty() {
return this.entitiesByName.isEmpty() && entitiesByPattern.isEmpty();
}
Expand Down
55 changes: 55 additions & 0 deletions core/src/main/java/cz/o2/proxima/repository/Transformation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Copyright 2017 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.repository;

import cz.o2.proxima.storage.StreamElement;
import java.io.Serializable;

/**
* A stateless element-wise transformation applied on incoming data
* converting single {@code StreamElement} to another {@code StreamElement}.
*/
public interface Transformation extends Serializable {

/**
* Collector for outputs.
*/
@FunctionalInterface
interface Collector<T> extends Serializable {

/**
* Collect transformed value.
*/
void collect(T value);

}

/**
* Read the repository and setup descriptors of target entity and attributes.
* @param repo the repository
*/
void setup(Repository repo);


/**
* Apply the transformation function.
* @param input the input stream element to transform
* @param collector collector for outputs
* @return transformed stream element
*/
void apply(StreamElement input, Collector<StreamElement> collector);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Copyright 2017 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.repository;

import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import lombok.Getter;

/**
* Descriptor of single transformation specified in {@code transformations}.
*/
public class TransformationDescriptor implements Serializable {

static Builder newBuilder() {
return new Builder();
}

static class Builder {

EntityDescriptor entity;
final List<AttributeDescriptor<?>> attrs = new ArrayList<>();
Class<? extends Transformation> transformation;

Builder setEntity(EntityDescriptor entity) {
this.entity = entity;
return this;
}

Builder setTransformationClass(Class<? extends Transformation> transformation) {
this.transformation = transformation;
return this;
}

Builder addAttributes(AttributeDescriptor<?>... attrs) {
Arrays.stream(attrs).forEach(this.attrs::add);
return this;
}

Builder addAttributes(Iterable<AttributeDescriptor<?>> attrs) {
attrs.forEach(this.attrs::add);
return this;
}

TransformationDescriptor build() {

Preconditions.checkArgument(
!attrs.isEmpty(), "Please specify at least one attribute");
Preconditions.checkArgument(transformation != null,
"Please specify transformation function");
Preconditions.checkArgument(entity != null,
"Please specify source entity");

try {
return new TransformationDescriptor(entity, attrs, transformation.newInstance());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}

@Getter
private final EntityDescriptor entity;
@Getter
private final List<AttributeDescriptor<?>> attributes;
@Getter
private final Transformation transformation;

private TransformationDescriptor(
EntityDescriptor entity,
List<AttributeDescriptor<?>> attributes,
Transformation transformation) {

this.entity = Objects.requireNonNull(entity);
this.attributes = Collections.unmodifiableList(attributes);
this.transformation = Objects.requireNonNull(transformation);
}
}
Loading