Skip to content

Commit

Permalink
[proxima-core] #30 add filters to transforms
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Dec 13, 2017
1 parent 660d3f0 commit 916c14a
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public CassandraDBAccessor(
try {
cqlFactory = Classpath.findClass(cqlFactoryName, CQLFactory.class).newInstance();
cqlFactory.setup(uri, converter);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException ex) {
} catch (InstantiationException | IllegalAccessException ex) {
throw new IllegalArgumentException("Cannot instantiate class " + cqlFactoryName,
ex);
}
Expand Down
60 changes: 27 additions & 33 deletions core/src/main/java/cz/o2/proxima/repository/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private <T> T newInstance(String name, Class<T> cls) {
try {
Class<T> forName = Classpath.findClass(name, cls);
return forName.newInstance();
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException ex) {
} catch (InstantiationException | IllegalAccessException ex) {
throw new IllegalArgumentException("Cannot instantiate class " + name, ex);
}
}
Expand Down Expand Up @@ -406,14 +406,7 @@ private void loadProxy(
if (this.shouldLoadAccessors) {
transform = Optional.ofNullable(settings.get("apply"))
.map(Object::toString)
.map(s -> {
try {
Class<ProxyTransform> c = Classpath.findClass(s, ProxyTransform.class);
return c.newInstance();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
})
.map(s -> newInstance(s, ProxyTransform.class))
.orElseThrow(() -> new IllegalArgumentException("Missing required field `apply'"));
} else {
transform = null;
Expand Down Expand Up @@ -681,35 +674,36 @@ private void readTransformations(Config cfg) {
}

transformations.forEach((k, v) -> {
try {
Map<String, Object> transformation = toMap(k, v);
EntityDescriptor entity = findEntity(readStr("entity", transformation, k))
.orElseThrow(() -> new IllegalArgumentException(
String.format("Entity `%s` doesn't exist",
transformation.get("entity"))));
Map<String, Object> transformation = toMap(k, v);
EntityDescriptor entity = findEntity(readStr("entity", transformation, k))
.orElseThrow(() -> new IllegalArgumentException(
String.format("Entity `%s` doesn't exist",
transformation.get("entity"))));

Class<? extends Transformation> cls = Classpath.findClass(
readStr("using", transformation, k), Transformation.class);
Transformation t = newInstance(
readStr("using", transformation, k), Transformation.class);

List<AttributeDescriptor<?>> attrs = readList("attributes", transformation, k)
.stream()
.map(a -> entity.findAttribute(a, true).orElseThrow(
() -> new IllegalArgumentException(
String.format("Missing attribute `%s` in `%s`",
a, entity))))
.collect(Collectors.toList());
List<AttributeDescriptor<?>> attrs = readList("attributes", transformation, k)
.stream()
.map(a -> entity.findAttribute(a, true).orElseThrow(
() -> new IllegalArgumentException(
String.format("Missing attribute `%s` in `%s`",
a, entity))))
.collect(Collectors.toList());

TransformationDescriptor.Builder desc = TransformationDescriptor.newBuilder()
.addAttributes(attrs)
.setEntity(entity)
.setTransformation(t);

TransformationDescriptor desc = TransformationDescriptor.newBuilder()
.addAttributes(attrs)
.setEntity(entity)
.setTransformationClass(cls)
.build();
Optional
.ofNullable(transformation.get("filter"))
.map(Object::toString)
.map(s -> newInstance(s, StorageFilter.class))
.ifPresent(desc::setFilter);

this.transformations.put(k, desc);
this.transformations.put(k, desc.build());

} catch (ClassNotFoundException ex) {
throw new RuntimeException(ex);
}
});

this.transformations.forEach((k, v) -> v.getTransformation().setup(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
*/
package cz.o2.proxima.repository;

import cz.o2.proxima.storage.PassthroughFilter;
import cz.o2.proxima.storage.StorageFilter;
import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.Getter;

/**
Expand All @@ -37,18 +40,24 @@ static class Builder {

EntityDescriptor entity;
final List<AttributeDescriptor<?>> attrs = new ArrayList<>();
Class<? extends Transformation> transformation;
Transformation transformation;
StorageFilter filter;

Builder setEntity(EntityDescriptor entity) {
this.entity = entity;
return this;
}

Builder setTransformationClass(Class<? extends Transformation> transformation) {
Builder setTransformation(Transformation transformation) {
this.transformation = transformation;
return this;
}

Builder setFilter(StorageFilter filter) {
this.filter = filter;
return this;
}

Builder addAttributes(AttributeDescriptor<?>... attrs) {
Arrays.stream(attrs).forEach(this.attrs::add);
return this;
Expand All @@ -68,11 +77,7 @@ TransformationDescriptor build() {
Preconditions.checkArgument(entity != null,
"Please specify source entity");

try {
return new TransformationDescriptor(entity, attrs, transformation.newInstance());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
return new TransformationDescriptor(entity, attrs, transformation, filter);
}
}

Expand All @@ -82,14 +87,18 @@ TransformationDescriptor build() {
private final List<AttributeDescriptor<?>> attributes;
@Getter
private final Transformation transformation;
@Getter
private final StorageFilter filter;

private TransformationDescriptor(
EntityDescriptor entity,
List<AttributeDescriptor<?>> attributes,
Transformation transformation) {
Transformation transformation,
@Nullable StorageFilter filter) {

this.entity = Objects.requireNonNull(entity);
this.attributes = Collections.unmodifiableList(attributes);
this.transformation = Objects.requireNonNull(transformation);
this.filter = filter == null ? new PassthroughFilter() : filter;
}
}
13 changes: 6 additions & 7 deletions core/src/main/java/cz/o2/proxima/util/Classpath.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@
* Classpath related utilities.
*/
public class Classpath {

private static final Logger LOG = LoggerFactory.getLogger(Classpath.class);

/** Find given class.
* Try hard to find it replacing `.' by `$' if
* appropriate.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Class<T> findClass(String name, Class<T> superClass)
throws ClassNotFoundException {

public static <T> Class<T> findClass(String name, Class<T> superClass) {

Class clz;
if ((clz = instantiateClass(name)) != null) {
return clz;
Expand All @@ -53,9 +52,9 @@ public static <T> Class<T> findClass(String name, Class<T> superClass)
return clz;
}
}
throw new ClassNotFoundException("Cannot find class " + name);
throw new RuntimeException("Cannot find class " + name);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private static Class instantiateClass(String name) {
try {
Expand Down
12 changes: 12 additions & 0 deletions core/src/test/java/cz/o2/proxima/repository/RepositoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
package cz.o2.proxima.repository;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.storage.PassthroughFilter;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.LogObserver;
import cz.o2.proxima.storage.randomaccess.KeyValue;
import cz.o2.proxima.transform.EventDataToDummy;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -58,6 +61,15 @@ public void testConfigParsing() throws IOException {
gateway.findAttribute("fail").get().getSchemeURI().toString());
assertEquals("bytes:///",
gateway.findAttribute("bytes").get().getSchemeURI().toString());

assertEquals(1, repo.getTransformations().size());
TransformationDescriptor transform = repo.getTransformations().values().iterator().next();
assertEquals(PassthroughFilter.class, transform.getFilter().getClass());
assertEquals(event, transform.getEntity());
assertEquals(
Arrays.asList(event.findAttribute("data").get()),
transform.getAttributes());
assertEquals(EventDataToDummy.class, transform.getTransformation().getClass());
}

@Test(timeout = 2000)
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@
entity: event
attributes: [ "data" ]
using: cz.o2.proxima.transform.EventDataToDummy
# this is default, but add it to test parsing
filter: cz.o2.proxima.storage.PassthroughFilter
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,7 @@ private void configure(Map<String, Object> cfg) {
.orElse(consumerPollInterval);

this.partitioner = Optional.ofNullable((String) cfg.get(PARTITIONER_CLASS))
.map(cls -> {
try {
return Classpath.findClass(cls, Partitioner.class);
} catch (ClassNotFoundException ex) {
throw new RuntimeException(ex);
}
})
.map(cls -> Classpath.findClass(cls, Partitioner.class))
.map(cls -> {
try {
return cls.newInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private Parser<?> getParserForClass(String protoClassName) {
Class<?> protoClass = Classpath.findClass(protoClassName, GeneratedMessage.class);
Method parser = protoClass.getMethod("parser");
return (Parser) parser.invoke(null);
} catch (ClassNotFoundException | IllegalAccessException | IllegalArgumentException
} catch (IllegalAccessException | IllegalArgumentException
| NoSuchMethodException | SecurityException | InvocationTargetException ex) {

throw new IllegalArgumentException(
Expand All @@ -106,8 +106,7 @@ private M getDefaultInstance(String protoClass) {
protoClass, GeneratedMessage.class);
Method method = cls.getMethod("getDefaultInstance");
return (M) method.invoke(null);
} catch (ClassNotFoundException | NoSuchMethodException | SecurityException
| IllegalAccessException | InvocationTargetException ex) {
} catch (Exception ex) {
throw new IllegalArgumentException(
"Cannot retrieve default instance for type "
+ protoClass);
Expand Down
13 changes: 9 additions & 4 deletions server/src/main/java/cz/o2/proxima/server/IngestServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -701,15 +701,16 @@ private void runTransformer(String name, TransformationDescriptor transform) {
.orElseThrow(() -> new IllegalArgumentException(
"Cannot obtain attribute family for " + transform.getAttributes()));

Transformation f = transform.getTransformation();
Transformation t = transform.getTransformation();
StorageFilter f = transform.getFilter();
final String consumer = "transformer-" + name;
CommitLogReader reader = family.getCommitLogReader().get();

new RetryableLogObserver(3, consumer, reader) {

@Override
protected void failure() {
LOG.error("Failed to transform using {}. Bailing out.", f);
LOG.error("Failed to transform using {}. Bailing out.", t);
System.exit(1);
}

Expand All @@ -719,6 +720,10 @@ public boolean onNextInternal(

// add one to prevent confirmation before all elements
// are processed
if (!f.apply(ingest)) {
LOG.debug("Skipping transformation of {} by filter", ingest);
return true;
}
AtomicInteger toConfirm = new AtomicInteger(1);
try {
Transformation.Collector<StreamElement> collector = elem -> {
Expand All @@ -743,7 +748,7 @@ public boolean onNextInternal(
confirm.fail(ex);
}
};
f.apply(ingest, collector);
t.apply(ingest, collector);
if (toConfirm.decrementAndGet() == 0) {
confirm.confirm();
}
Expand All @@ -758,7 +763,7 @@ public boolean onNextInternal(
}.start();
LOG.info(
"Started transformer {} reading from {} using {}",
consumer, reader.getURI(), f.getClass());
consumer, reader.getURI(), t.getClass());
}

/**
Expand Down

0 comments on commit 916c14a

Please sign in to comment.