Permalink
Browse files

PIG-2651: Provide a much easier to use accumulator interface

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1348264 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent 715d22d commit 9be56bcdc6f1b4fd9399cc4bafe65f4df5a5ad82 Jianyong Dai committed Jun 8, 2012
View
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2651: Provide a much easier to use accumulator interface (jcoveney via daijy)
+
PIG-2658: Add pig.script.submitted.timestamp and pig.job.submitted.timestamp in generated Map-Reduce job conf (billgraham)
PIG-2735: Add a pig.version.suffix property in build.xml to easily override with a build number (julien)
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * This class provides a much more intuitive way to write Accumulator UDFs.<br>
+ * For example, you could express IsEmpty as follows:
+ * <pre><code>public class IsEmpty extends IteratingAccumulatorEvalFunc<Boolean> {
+ * public Boolean exec(Iterator<Tuple> iter) throws IOException {
+ * return !iter.hasNext();
+ * }
+ * }</code></pre>
+ * Count could be implemented as follows:
+ * <pre><code>public class Count extends IteratingAccumulatorEvalFunc<Long> {
+ * public Long exec(Iterator<Tuple> iter) throws IOException {
+ * long ct = 0;
+ * for (; iter.hasNext(); iter.next()) {
+ * ct++;
+ * }
+ * return ct;
+ * }
+ * }</code></pre>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class IteratingAccumulatorEvalFunc<T> extends AccumulatorEvalFunc<T> implements TerminatingAccumulator<T> {
+ private boolean isInitialized = false;
+ private DelayedQueueIterator dqi;
+ private BlockingQueue<Tuple> queue;
+ private volatile boolean isFinished = false;
+ private volatile boolean noMoreValues = false;
+ private volatile boolean exceptionThrown = false;
+ private T returnValue;
+ private Thread executionThread;
+ private Exception executionThreadException;
+
+ private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+ private static final long WAIT_TO_OFFER = 500L;
+ private static final long WAIT_TO_POLL = 500L;
+ private static final long WAIT_TO_JOIN = 500L;
+
+ private void initialize() {
+ dqi = new DelayedQueueIterator();
+ queue = new LinkedBlockingQueue<Tuple>(10000);
+
+ executionThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ returnValue = exec(dqi);
+ isFinished = true;
+ } catch (Exception e) {
+ executionThreadException = e;
+ exceptionThrown = true;
+ }
+ }
+ });
+ executionThread.start();
+
+ isInitialized = true;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return isFinished;
+ }
+
+ @Override
+ public void accumulate(Tuple input) throws IOException {
+ if (!isInitialized)
+ initialize();
+
+ for (Tuple t : (DataBag)input.get(0)) {
+ if (isFinished)
+ return;
+
+ boolean added = false;
+ while (!isFinished && !added && !exceptionThrown)
+ try {
+ added = queue.offer(t, WAIT_TO_OFFER, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ } //TODO handle the exception?
+
+ if (exceptionThrown)
+ throw new RuntimeException("Exception thrown in thread: ", executionThreadException);
+ }
+ }
+
+ @Override
+ public T getValue() {
+ noMoreValues = true;
+
+ do {
+ if (exceptionThrown)
+ throw new RuntimeException("Exception thrown in thread: ", executionThreadException);
+
+ try {
+ executionThread.join(WAIT_TO_JOIN);
+ } catch (InterruptedException e) {
+ } //TODO handle the exception?
+ } while (executionThread.isAlive());
+
+ return returnValue;
+ }
+
+ @Override
+ public void cleanup() {
+ returnValue = null;
+ dqi = null;
+ queue = null;
+ isFinished = false;
+ noMoreValues = false;
+ executionThread = null;
+ isInitialized = false;
+ }
+
+ public abstract T exec(Iterator<Tuple> iter) throws IOException;
+
+ private class DelayedQueueIterator implements Iterator<Tuple> {
+ private Tuple next;
+
+ @Override
+ public boolean hasNext() {
+ if (next != null)
+ return true;
+
+ while (!noMoreValues) {
+ try {
+ next = queue.poll(WAIT_TO_POLL, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ if (next != null)
+ return true;
+ }
+
+ next = queue.poll();
+
+ if (next != null)
+ return true;
+
+ return false;
+ }
+
+ @Override
+ public Tuple next() {
+ Tuple t = next;
+
+ if (t == null)
+ throw new RuntimeException("Entered inconsistent state!");
+
+ next = null;
+
+ return t;
+ }
+
+ @Override
+ public void remove() {}
+ }
+}
@@ -398,10 +398,11 @@ protected Result processPlan() throws ExecException{
// if accumulating, we haven't got data yet for some fields, just return
if (isAccumulative() && isAccumStarted()) {
- if (earlyTermination.cardinality() < noItems)
+ if (earlyTermination.cardinality() < noItems) {
res.returnStatus = POStatus.STATUS_BATCH_OK;
- else
+ } else {
res.returnStatus = POStatus.STATUS_EARLY_TERMINATION;
+ }
return res;
}
@@ -3202,7 +3202,37 @@ register :FUNCPATH:/testudf.jar;
b = foreach (group a all) generate org.apache.pig.test.udf.evalfunc.NonAlgNonAccCount(a) as ct;
c = foreach b generate ct, 1, 1;
store c into ':OUTPATH:';\,
- }
+ },
+ {
+ 'num' => 7,
+ 'java_params' => ['-Dpig.exec.nocombiner=true', '-Dpig.accumulative.batchsize=5'],
+ 'pig' => q\a = load ':INPATH:/singlefile/votertab10k' using PigStorage() as (name, age:long, registration, contributions);
+register :FUNCPATH:/testudf.jar;
+b = foreach (group a all) generate COUNT(a),
+ org.apache.pig.test.udf.evalfunc.IteratingAccumulatorCount(a),
+ org.apache.pig.test.udf.evalfunc.IteratingAccumulatorSum(a.age),
+ org.apache.pig.test.udf.evalfunc.IteratingAccumulatorIsEmpty(a);
+store b into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/votertab10k' using PigStorage() as (name, age:long, registration, contributions);
+b = foreach (group a all) generate COUNT(a), SUM(a.age), IsEmpty(a);
+c = foreach b generate $0, *;
+store c into ':OUTPATH:';\,
+ },
+ {
+ 'num' => 8,
+ 'java_params' => ['-Dpig.exec.nocombiner=true', '-Dpig.accumulative.batchsize=5'],
+ 'pig' => q\a = load ':INPATH:/singlefile/votertab10k' using PigStorage() as (name, age:long, registration, contributions);
+register :FUNCPATH:/testudf.jar;
+b = foreach (group a all) generate org.apache.pig.test.udf.evalfunc.NonAlgNonAccCount(a),
+ org.apache.pig.test.udf.evalfunc.IteratingAccumulatorCount(a),
+ org.apache.pig.test.udf.evalfunc.IteratingAccumulatorSum(a.age),
+ org.apache.pig.test.udf.evalfunc.IteratingAccumulatorIsEmpty(a);
+store b into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/votertab10k' using PigStorage() as (name, age:long, registration, contributions);
+register :FUNCPATH:/testudf.jar;
+b = foreach (group a all) generate org.apache.pig.test.udf.evalfunc.NonAlgNonAccCount(a), COUNT(a), SUM(a.age), IsEmpty(a);
+store b into ':OUTPATH:';\,
+ },
]
},
{
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.udf.evalfunc;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.IteratingAccumulatorEvalFunc;
+import org.apache.pig.data.Tuple;
+
+public class IteratingAccumulatorCount extends IteratingAccumulatorEvalFunc<Long> {
+    /**
+     * Counts the tuples supplied by the iterator.
+     *
+     * @param iter iterator over the accumulated tuples
+     * @return the number of tuples the iterator produced
+     */
+    @Override
+    public Long exec(Iterator<Tuple> iter) throws IOException {
+        long total = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            total++;
+        }
+        return total;
+    }
+}
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.udf.evalfunc;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.IteratingAccumulatorEvalFunc;
+import org.apache.pig.data.Tuple;
+
+public class IteratingAccumulatorIsEmpty extends IteratingAccumulatorEvalFunc<Boolean> {
+ public Boolean exec(Iterator<Tuple> iter) throws IOException {
+ return !iter.hasNext();
+ }
+}
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.udf.evalfunc;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.IteratingAccumulatorEvalFunc;
+import org.apache.pig.data.Tuple;
+
+public class IteratingAccumulatorSum extends IteratingAccumulatorEvalFunc<Long> {
+ public Long exec(Iterator<Tuple> iter) throws IOException {
+ long sum = 0;
+ for (; iter.hasNext(); sum += (Long)iter.next().get(0)) {
+ continue;
+ }
+ return sum;
+ }
+}

0 comments on commit 9be56bc

Please sign in to comment.