From 19029582be0725319597aa6f680a8a61ab7b015e Mon Sep 17 00:00:00 2001 From: "Pradeep A. Dalvi" Date: Wed, 16 Mar 2016 17:39:17 +0530 Subject: [PATCH] APEXMALHAR-2015: Projection Operator & its unit tests - Projection Operator - Unit tests for select/drop fields & projection operator --- .../lib/projection/ProjectionOperator.java | 310 ++++++++++++++++++ .../lib/projection/ActivateTest.java | 160 +++++++++ .../lib/projection/ProjectionTest.java | 276 ++++++++++++++++ 3 files changed, 746 insertions(+) create mode 100644 library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java create mode 100644 library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java create mode 100644 library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java diff --git a/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java new file mode 100644 index 0000000000..8598de9c00 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.datatorrent.lib.projection; + +import java.lang.reflect.Field; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.google.common.annotations.VisibleForTesting; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; + +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; + +import com.datatorrent.common.util.BaseOperator; + +import com.datatorrent.lib.util.PojoUtils; + +/** + * ProjectionOperator + * Projection Operator projects defined set of fields from given selectFields/dropFields + * + * Parameters + * - selectFields: comma separated list of fields to be selected from input tuples + * - dropFields: comma separated list of fields to be dropped from input tuples + * selectFields and dropFields are optional and either of them shall be specified + * When both are not specified, all fields shall be projected to downstream operator + * When both are specified, selectFields shall be given the preference + * + * Input Port takes POJOs as an input + * + * Output Ports + * - projected port emits POJOs with projected fields from input POJOs + * - remainder port, if connected, emits POJOs with remainder fields from input POJOs + * - error port emits input POJOs as is upon error situations + * + * Examples + * For {a, b, c} type of input tuples + * - when selectFields = "" and dropFields = "", projected port shall emit {a, b, c} + * - when selectFields = "a" and dropFields = "b", projected port shall emit {a}, remainder {b, c} + * - when selectFields = "b", projected port shall emit {b} and remainder port shall emit {a, c} + * - when dropFields = "b", projected port shall emit {a, c} and remainder port shall emit {b} + * + */ +public class ProjectionOperator extends BaseOperator implements Operator.ActivationListener +{ + protected String selectFields; + protected String dropFields; + protected String condition; + + static class TypeInfo + { + String name; + Class type; + PojoUtils.Setter setter; + PojoUtils.Getter getter; + + public TypeInfo(String name, Class type) + { + this.name = name; + this.type = type; + } + + public String toString() + { + String s = new String("'name': " + name + " 'type': " + type); + return s; + } + } + + private transient List projectedFields = new ArrayList<>(); + private transient List remainderFields = new ArrayList<>(); + + @VisibleForTesting + List getProjectedFields() + { + return projectedFields; + } + + @VisibleForTesting + List getRemainderFields() + { + return remainderFields; + } + + @AutoMetric + protected long projectedTuples; + + @AutoMetric + protected long remainderTuples; + + @AutoMetric + protected long errorTuples; + + protected Class inClazz = null; + protected Class projectedClazz = null; + protected Class remainderClazz = null; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input = new DefaultInputPort() + { + public void setup(PortContext context) + { + inClazz = context.getValue(Context.PortContext.TUPLE_CLASS); + } + + @Override + public void process(Object t) + { + handleProjection(t); + } + }; + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort projected = new DefaultOutputPort() + { + public void setup(PortContext context) + { + projectedClazz = context.getValue(Context.PortContext.TUPLE_CLASS); + } + }; + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort remainder = new DefaultOutputPort() + { + public void setup(PortContext context) + { + remainderClazz = context.getValue(Context.PortContext.TUPLE_CLASS); + } + }; + + + public final transient DefaultOutputPort error = new DefaultOutputPort(); + + /** + * addProjectedField: Add field details (name, type, getter and setter) for field with given name + * in projectedFields list + */ + protected void addProjectedField(String s) + { + try { + Field f = inClazz.getDeclaredField(s); + TypeInfo t = new TypeInfo(f.getName(), ClassUtils.primitiveToWrapper(f.getType())); + t.getter = PojoUtils.createGetter(inClazz, t.name, t.type); + t.setter = PojoUtils.createSetter(projectedClazz, t.name, t.type); + projectedFields.add(t); + } catch (NoSuchFieldException e) { + throw new RuntimeException("Field " + s + " not found in class " + inClazz, e); + } + } + + /** + * addRemainderField: Add field details (name, type, getter and setter) for field with given name + * in remainderFields list + */ + protected void addRemainderField(String s) + { + try { + Field f = inClazz.getDeclaredField(s); + TypeInfo t = new TypeInfo(f.getName(), ClassUtils.primitiveToWrapper(f.getType())); + t.getter = PojoUtils.createGetter(inClazz, t.name, t.type); + t.setter = PojoUtils.createSetter(remainderClazz, t.name, t.type); + remainderFields.add(t); + } catch (NoSuchFieldException e) { + throw new RuntimeException("Field " + s + " not found in class " + inClazz, e); + } + } + + @Override + public void activate(Context context) + { + final Field[] allFields = inClazz.getDeclaredFields(); + + if (selectFields != null && !selectFields.isEmpty()) { + List sFields = Arrays.asList(selectFields.split(",")); + for (String s : sFields) { + addProjectedField(s); + } + + if (remainderClazz != null) { + for (Field f : allFields) { + if (!sFields.contains(f.getName())) { + addRemainderField(f.getName()); + } + } + } else { + logger.info("Remainder Port does not have Schema class defined"); + } + } else { + List dFields = new ArrayList<>(); + if (dropFields != null && !dropFields.isEmpty()) { + dFields = Arrays.asList(dropFields.split(",")); + if (remainderClazz != null) { + for (String s : dFields) { + addRemainderField(s); + } + } else { + logger.info("Remainder Port does not have Schema class defined"); + } + } + + for (Field f : allFields) { + if (!dFields.contains(f.getName())) { + addProjectedField(f.getName()); + } + } + } + + logger.debug("projected fields: {}", projectedFields); + logger.debug("remainder fields: {}", remainderFields); + } + + @Override + public void deactivate() + { + projectedFields.clear(); + remainderFields.clear(); + } + + @Override + public void beginWindow(long windowId) + { + errorTuples = projectedTuples = remainderTuples = 0; + } + + protected Object getProjectedObject(Object t) throws IllegalAccessException + { + try { + Object p = projectedClazz.newInstance(); + for (TypeInfo ti: projectedFields) { + ti.setter.set(p, ti.getter.get(t)); + } + return p; + } catch (InstantiationException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw e; + } + } + + protected Object getRemainderObject(Object t) throws IllegalAccessException + { + try { + Object r = remainderClazz.newInstance(); + for (TypeInfo ti: remainderFields) { + ti.setter.set(r, ti.getter.get(t)); + } + return r; + } catch (InstantiationException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw e; + } + } + + /** + * handleProjection: emit projected object on projected port + * and remainder object on remainder port if that is connected. + */ + private void handleProjection(Object t) + { + try { + Object p = getProjectedObject(t); + + if (remainder.isConnected()) { + Object r = getRemainderObject(t); + remainder.emit(r); + remainderTuples++; + } + + projected.emit(p); + projectedTuples++; + } catch (IllegalAccessException e) { + error.emit(t); + errorTuples++; + } + } + + private static final Logger logger = LoggerFactory.getLogger(ProjectionOperator.class); +} diff --git a/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java b/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java new file mode 100644 index 0000000000..f0b684f842 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.datatorrent.lib.projection; + +import java.util.Date; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for Projection related Activate method + */ +public class ActivateTest +{ + private static final Logger logger = LoggerFactory.getLogger(ActivateTest.class); + + public static class DummyPOJO + { + private long l; + private String str; + private Date date; + + public long getL() + { + return l; + } + + public void setL(long l) + { + this.l = l; + } + + public String getStr() + { + return str; + } + + public void setStr(String str) + { + this.str = str; + } + + public Date getDate() + { + return date; + } + + public void setDate(Date date) + { + this.date = date; + } + } + + private static ProjectionOperator projection; + private static DummyPOJO data; + + @Test + public void testSelectDropFieldsNull() + { + logger.debug("start round 0"); + projection.selectFields = null; + projection.dropFields = null; + projection.activate(null); + Assert.assertEquals("projected fields", 3, projection.getProjectedFields().size()); + Assert.assertEquals("remainder fields", 0, projection.getRemainderFields().size()); + projection.deactivate(); + logger.debug("start round 0"); + } + + @Test + public void testSelectDropFieldsEmpty() + { + logger.debug("start round 0"); + projection.selectFields = ""; + projection.dropFields = ""; + projection.activate(null); + Assert.assertEquals("projected fields", 3, projection.getProjectedFields().size()); + Assert.assertEquals("remainder fields", 0, projection.getRemainderFields().size()); + projection.deactivate(); + logger.debug("start round 0"); + } + + @Test + public void testSelectFields() + { + logger.debug("start round 0"); + projection.selectFields = "l,str"; + projection.dropFields = ""; + projection.activate(null); + Assert.assertEquals("projected fields", 2, projection.getProjectedFields().size()); + Assert.assertEquals("remainder fields", 1, projection.getRemainderFields().size()); + projection.deactivate(); + logger.debug("start round 0"); + } + + @Test + public void testDropFields() + { + logger.debug("start round 0"); + projection.selectFields = ""; + projection.dropFields = "str,date"; + projection.activate(null); + Assert.assertEquals("projected fields", 1, projection.getProjectedFields().size()); + Assert.assertEquals("remainder fields", 2, projection.getRemainderFields().size()); + projection.deactivate(); + logger.debug("start round 0"); + } + + @Test + public void testBothFieldsSpecified() + { + logger.debug("start round 0"); + projection.selectFields = ""; + projection.selectFields = "l,str"; + projection.dropFields = "str,date"; + projection.activate(null); + Assert.assertEquals("projected fields", 2, projection.getProjectedFields().size()); + Assert.assertEquals("remainder fields", 1, projection.getRemainderFields().size()); + projection.deactivate(); + logger.debug("start round 0"); + } + + @BeforeClass + public static void setup() + { + data = new DummyPOJO(); + projection = new ProjectionOperator(); + projection.inClazz = DummyPOJO.class; + projection.projectedClazz = DummyPOJO.class; + projection.remainderClazz = DummyPOJO.class; + } + + @AfterClass + public static void teardown() + { + projection.teardown(); + } +} diff --git a/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java b/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java new file mode 100644 index 0000000000..a47b1673c8 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.datatorrent.lib.projection; + +import java.lang.reflect.Field; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * Tests for ProjectionOperator + */ +public class ProjectionTest +{ + private static final Logger logger = LoggerFactory.getLogger(ProjectionTest.class); + + public static class DummyPOJO + { + private long projected; + private long remainder; + + public long getProjected() + { + return projected; + } + + public void setProjected(long projected) + { + this.projected = projected; + } + + public long getRemainder() + { + return remainder; + } + + public void setRemainder(long remainder) + { + this.remainder = remainder; + } + } + + public static class ProjectedPOJO + { + private long projected; + + public long getProjected() + { + return projected; + } + + public void setProjected(long projected) + { + this.projected = projected; + } + } + + public static class RemainderPOJO + { + private long remainder; + + public long getRemainder() + { + return remainder; + } + + public void setRemainder(long remainder) + { + this.remainder = remainder; + } + } + + private static ProjectionOperator projection; + private static DummyPOJO data; + + public Long getFieldValue(Object p, String field) + { + Long value = new Long(0); + + for (Field f: p.getClass().getDeclaredFields()) { + f.setAccessible(true); + try { + logger.debug("{} field: {} type: {} val: {}", field, f.getName(), f.getType(), f.get(p)); + } catch (IllegalAccessException e) { + logger.info("could not access value of field: {} type: {}", f.getName(), f.getType()); + } + } + + try { + value = (Long)p.getClass().getDeclaredField(field).get(p); + } catch (NoSuchFieldException e) { + Assert.assertTrue(e instanceof NoSuchFieldException); + } catch (IllegalAccessException e) { + Assert.assertTrue(e instanceof IllegalAccessException); + } + + return value; + } + + public void checkProjected(Object p, Integer val) + { + Long value = ((ProjectedPOJO)p).getProjected(); + + Assert.assertEquals("projected field value", new Long(val), value); + } + + public void checkRemainder(Object r, Integer val) + { + Long value = ((RemainderPOJO)r).getRemainder(); + + Assert.assertEquals("remainder field value", new Long(val), value); + } + + @Test + public void testProjectionRemainder() + { + logger.debug("start round 0"); + projection.beginWindow(0); + + data.setProjected(1234); + data.setRemainder(6789); + + Object p = null; + try { + p = projection.getProjectedObject(data); + } catch (IllegalAccessException e) { + Assert.assertTrue(e instanceof IllegalAccessException); + } + logger.debug("projected class {}", p.getClass()); + + Object r = null; + try { + r = projection.getRemainderObject(data); + } catch (IllegalAccessException e) { + Assert.assertTrue(e instanceof IllegalAccessException); + } + logger.debug("remainder class {}", r.getClass()); + + checkProjected(p, 1234); + checkRemainder(r, 6789); + + projection.endWindow(); + logger.debug("end round 0"); + } + + @Test + public void testProjected() + { + logger.debug("start round 0"); + projection.beginWindow(0); + + data.setProjected(2345); + data.setRemainder(5678); + + Object p = null; + try { + p = projection.getProjectedObject(data); + } catch (IllegalAccessException e) { + Assert.assertTrue(e instanceof IllegalAccessException); + } + logger.debug("projected class {}", p.getClass()); + + checkProjected(p, 2345); + + projection.endWindow(); + logger.debug("end round 0"); + } + + @Test + public void testRemainder() + { + logger.debug("start round 0"); + projection.beginWindow(0); + + data.setProjected(9876); + data.setRemainder(4321); + + Object r = null; + try { + r = projection.getRemainderObject(data); + } catch (IllegalAccessException e) { + Assert.assertTrue(e instanceof IllegalAccessException); + } + logger.debug("remainder class {}", r.getClass()); + + checkRemainder(r, 4321); + + projection.endWindow(); + logger.debug("end round 0"); + } + + @Test + public void testProjection() + { + logger.debug("start round 0"); + projection.beginWindow(0); + + projection.input.process(data); + Assert.assertEquals("projected tuples", 1, projection.projectedTuples); + Assert.assertEquals("remainder tuples", 0, projection.remainderTuples); + + projection.endWindow(); + logger.debug("end round 0"); + + CollectorTestSink projectedSink = new CollectorTestSink(); + CollectorTestSink remainderSink = new CollectorTestSink(); + + projection.projected.setSink(projectedSink); + projection.remainder.setSink(remainderSink); + + /* Collector Sink Test when remainder port is connected */ + logger.debug("start round 1"); + projection.beginWindow(1); + + data.setProjected(4321); + data.setRemainder(9876); + + projection.input.process(data); + Assert.assertEquals("projected tuples", 1, projection.projectedTuples); + Assert.assertEquals("remainder tuples", 1, projection.remainderTuples); + + Object p = projectedSink.collectedTuples.get(0); + Object r = remainderSink.collectedTuples.get(0); + + checkProjected(p, 4321); + checkRemainder(r, 9876); + + projection.endWindow(); + logger.debug("end round 1"); + } + + @BeforeClass + public static void setup() + { + data = new DummyPOJO(); + projection = new ProjectionOperator(); + projection.inClazz = DummyPOJO.class; + projection.projectedClazz = ProjectedPOJO.class; + projection.remainderClazz = RemainderPOJO.class; + + + projection.selectFields = "projected"; + projection.activate(null); + } + + @AfterClass + public static void teardown() + { + projection.deactivate(); + projection.teardown(); + } +}