This repository has been archived by the owner on May 12, 2021. It is now read-only.

APEXMALHAR-2085: Operator supporting the Beam concepts of windowing, watermarks, triggering and accumulation #319

Merged
merged 1 commit into from
Jul 12, 2016
@@ -67,6 +67,14 @@ public void waitForResultCount(int count, long timeoutMillis) throws Interrupted
   @Override
   public int getCount(boolean reset)
   {
-    throw new UnsupportedOperationException("Not supported yet.");
+    synchronized (collectedTuples) {
+      try {
+        return collectedTuples.size();
+      } finally {
+        if (reset) {
+          collectedTuples.clear();
+        }
+      }
+    }
   }
 }
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.window;

import org.apache.hadoop.classification.InterfaceStability;

/**
* This interface is for the processing part of the WindowedOperator.
* We can assume that all stateful processing of the WindowedOperator is a form of accumulation.
*
* In most cases, AccumT is the same as OutputT, but the accumulated type and the output type may differ. For example,
* to compute the AVERAGE of doubles, InputT is double and AccumT is a pair of double and long, where the double is the
* SUM of the inputs and the long is the COUNT of the inputs. OutputT is double, because it represents the average of
* the inputs.
*/
@InterfaceStability.Evolving
public interface Accumulation<InputT, AccumT, OutputT>
{
/**
* Returns the default accumulated value when nothing has been accumulated
*
* @return
*/
AccumT defaultAccumulatedValue();

/**
* Accumulates the input to the accumulated value
*
* @param accumulatedValue
* @param input
* @return
*/
AccumT accumulate(AccumT accumulatedValue, InputT input);
Member

Why is the interface stateless? Should it be stateful instead? My concern is that the interface promotes a boxing/unboxing pattern, which leads to excessive GC.

Contributor Author

@vrozov Good point. I'm assuming you're asking why not:

  void accumulate(AccumT accumulatedValue, InputT input);

with accumulatedValue updated in place. But doing so would make it a lot less flexible, because the underlying storage might not support this kind of operation. For example, if the storage supports get(key) and put(key, value), and get(key) returns a copy rather than a reference to the actual object (possibly as a result of deserialization), then updating in place would not work.
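To make the storage concern concrete, here is a minimal sketch. `CopyingStorage` and `CopyingStorageDemo` are hypothetical names invented for illustration and are not part of this PR. The storage clones on read to mimic a store that deserializes values, so an in-place update that is never written back is lost, while the returning style lets the caller `put` the result.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical storage whose get() hands back a copy, mimicking deserialization on read.
class CopyingStorage
{
  private final Map<String, long[]> data = new HashMap<>();

  long[] get(String key)
  {
    long[] stored = data.get(key);
    return stored == null ? null : stored.clone();
  }

  void put(String key, long[] value)
  {
    data.put(key, value.clone());
  }
}

class CopyingStorageDemo
{
  // In-place style: mutates whatever object it was given and returns nothing.
  static void accumulateInPlace(long[] accumulatedValue, long input)
  {
    accumulatedValue[0] += input;
  }

  // Returning style: the caller receives the new value and can write it back.
  static long[] accumulate(long[] accumulatedValue, long input)
  {
    return new long[] {accumulatedValue[0] + input};
  }

  public static void main(String[] args)
  {
    CopyingStorage storage = new CopyingStorage();
    storage.put("k", new long[] {0});

    // The copy is mutated, the stored value is not.
    accumulateInPlace(storage.get("k"), 5);
    System.out.println(storage.get("k")[0]); // prints 0

    // The returned value is written back explicitly.
    storage.put("k", accumulate(storage.get("k"), 5));
    System.out.println(storage.get("k")[0]); // prints 5
  }
}
```

An in-place method followed by an explicit `put` would also work against such a store; the sketch only shows that the void signature alone does not guarantee the update is persisted.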

Member

My suggestion is to delegate the handling of accumulation to a class that implements the Accumulation interface, and to change the interface to

void accumulate(InputT input);
void merge(Accumulation accumulatedValue);
OutputT getOutput();

The implementation class will need to define how it handles accumulation and how AccumT is defined. The implementation may use a Collection for AccumT, or may use primitive types such as int, long or double to accumulate values.
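For comparison, a stateful variant along these lines might look like the sketch below. It is not code from this PR: `StatefulAccumulation`, its `SelfT` type parameter (added so that `merge` can take the concrete type) and `DoubleAverage` are all hypothetical, and returning 0.0 for an empty average is an assumption.

```java
// Hypothetical stateful variant, shaped after the three methods proposed in this thread.
// SelfT is an assumption added so that merge() can accept the concrete implementation type.
interface StatefulAccumulation<InputT, OutputT, SelfT extends StatefulAccumulation<InputT, OutputT, SelfT>>
{
  void accumulate(InputT input);

  void merge(SelfT other);

  OutputT getOutput();
}

// Hypothetical implementation that keeps its state in primitive fields.
class DoubleAverage implements StatefulAccumulation<Double, Double, DoubleAverage>
{
  private double sum;
  private long count;

  // Primitive overload: callers that already hold a double avoid boxing.
  public void accumulate(double input)
  {
    sum += input;
    count++;
  }

  @Override
  public void accumulate(Double input)
  {
    accumulate(input.doubleValue());
  }

  @Override
  public void merge(DoubleAverage other)
  {
    sum += other.sum;
    count += other.count;
  }

  @Override
  public Double getOutput()
  {
    // Assumption: an empty accumulation reports 0.0.
    return count == 0 ? 0.0 : sum / count;
  }
}
```

The state lives in the object itself, so whether this works with a given storage depends on the storage handing back the same instance or the caller writing the instance back after each update.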


/**
* Merges two accumulated values into one
*
* @param accumulatedValue1
* @param accumulatedValue2
* @return
*/
AccumT merge(AccumT accumulatedValue1, AccumT accumulatedValue2);

/**
* Gets the output of the accumulated value. This is used for generating the data for triggers
*
* @param accumulatedValue
* @return
*/
OutputT getOutput(AccumT accumulatedValue);

/**
* Gets the retraction of the value. This is used for retracting previous panes in
* ACCUMULATING_AND_RETRACTING accumulation mode
*
* @param value
* @return
*/
OutputT getRetraction(OutputT value);
}
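
The AVERAGE case described in the Javadoc above could be implemented against this interface roughly as follows. This is a sketch only: `SumCount` and `Average` are hypothetical names, the interface is repeated without its annotation so the snippet compiles on its own, and both the 0.0 result for an empty accumulation and the negation used for retraction are assumptions, not behavior defined by this PR.

```java
// Copy of the interface from this diff, minus the annotation, so the sketch is self-contained.
interface Accumulation<InputT, AccumT, OutputT>
{
  AccumT defaultAccumulatedValue();

  AccumT accumulate(AccumT accumulatedValue, InputT input);

  AccumT merge(AccumT accumulatedValue1, AccumT accumulatedValue2);

  OutputT getOutput(AccumT accumulatedValue);

  OutputT getRetraction(OutputT value);
}

// Hypothetical accumulated type: the running sum and the number of inputs.
final class SumCount
{
  final double sum;
  final long count;

  SumCount(double sum, long count)
  {
    this.sum = sum;
    this.count = count;
  }
}

// Hypothetical AVERAGE accumulation: InputT = Double, AccumT = SumCount, OutputT = Double.
class Average implements Accumulation<Double, SumCount, Double>
{
  @Override
  public SumCount defaultAccumulatedValue()
  {
    return new SumCount(0.0, 0L);
  }

  @Override
  public SumCount accumulate(SumCount accumulatedValue, Double input)
  {
    return new SumCount(accumulatedValue.sum + input, accumulatedValue.count + 1);
  }

  @Override
  public SumCount merge(SumCount accumulatedValue1, SumCount accumulatedValue2)
  {
    return new SumCount(accumulatedValue1.sum + accumulatedValue2.sum,
        accumulatedValue1.count + accumulatedValue2.count);
  }

  @Override
  public Double getOutput(SumCount accumulatedValue)
  {
    // Assumption: an empty accumulation reports 0.0.
    return accumulatedValue.count == 0 ? 0.0 : accumulatedValue.sum / accumulatedValue.count;
  }

  @Override
  public Double getRetraction(Double value)
  {
    // Assumption: a retraction is represented by the negated value.
    return -value;
  }
}
```

Each call to `accumulate` allocates a new `SumCount` and boxes the input, which is the GC cost raised in the review thread on this file.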
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.window;

import org.apache.hadoop.classification.InterfaceStability;

/**
* Control tuple interface.
* TODO: This should be removed or moved to Apex Core when Apex Core has native support for custom control tuples.
*/
@InterfaceStability.Evolving
public interface ControlTuple
{
/**
* Watermark control tuple
*/
interface Watermark extends ControlTuple
{
/**
* Gets the timestamp associated with this watermark
*
* @return
*/
long getTimestamp();
}
}
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.window;

import org.apache.hadoop.classification.InterfaceStability;

/**
* This is the interface for accumulation when joining multiple streams.
*/
@InterfaceStability.Evolving
public interface JoinAccumulation<InputT1, InputT2, InputT3, InputT4, InputT5, AccumT, OutputT> extends Accumulation<InputT1, AccumT, OutputT>
{
/**
* Accumulate the second input type to the accumulated value
*
* @param accumulatedValue
* @param input
* @return
*/
AccumT accumulate2(AccumT accumulatedValue, InputT2 input);

/**
* Accumulate the third input type to the accumulated value
*
* @param accumulatedValue
* @param input
* @return
*/
AccumT accumulate3(AccumT accumulatedValue, InputT3 input);

/**
* Accumulate the fourth input type to the accumulated value
*
* @param accumulatedValue
* @param input
* @return
*/
AccumT accumulate4(AccumT accumulatedValue, InputT4 input);

/**
* Accumulate the fifth input type to the accumulated value
*
* @param accumulatedValue
* @param input
* @return
*/
AccumT accumulate5(AccumT accumulatedValue, InputT5 input);

}
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.window;

import java.util.Collection;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceStability;

/**
* This interface is for storing data for session windowed streams.
*
* @param <K> The key type
* @param <V> The value type
*/
@InterfaceStability.Evolving
public interface SessionWindowedStorage<K, V> extends WindowedKeyedStorage<K, V>
{
/**
* Given the key, the timestamp and the gap, gets the entries whose session windows fall into timestamp +/- gap.
* This is used for finding the entry that the data with the given timestamp belongs to, and for determining whether
* to merge session windows.
* This should return at most two entries if sessions have been merged appropriately.
*
* @param key the key
* @param timestamp the timestamp
* @param gap the session gap
* @return the entries whose session windows fall into timestamp +/- gap
*/
Collection<Map.Entry<Window.SessionWindow<K>, V>> getSessionEntries(K key, long timestamp, long gap);
}
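
As an illustration of the lookup that `getSessionEntries` describes, the sketch below scans an in-memory list of sessions for one key. It does not use the types in this PR: `Session` and `SessionLookup` are hypothetical stand-ins, and treating both the session bounds and the timestamp +/- gap range as inclusive is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a session window covering [begin, end], both inclusive.
final class Session
{
  final long begin;
  final long end;

  Session(long begin, long end)
  {
    this.begin = begin;
    this.end = end;
  }
}

class SessionLookup
{
  // Returns the sessions that intersect [timestamp - gap, timestamp + gap].
  static List<Session> getSessionEntries(List<Session> sessions, long timestamp, long gap)
  {
    List<Session> result = new ArrayList<>();
    for (Session s : sessions) {
      if (s.begin <= timestamp + gap && s.end >= timestamp - gap) {
        result.add(s);
      }
    }
    return result;
  }
}
```

With sessions kept at least one gap apart, a timestamp can sit within reach of at most one session on each side, which matches the "at most two entries" note in the Javadoc. Zero results would mean a new session, one result would mean extending that session, and two results would mean the new data bridges both sessions.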