Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
APEXMALHAR-2085 windowed operator interfaces and implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
David Yan committed Jun 23, 2016
1 parent 45d56b4 commit 20fc1d3
Show file tree
Hide file tree
Showing 22 changed files with 18,902 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.window;

import org.apache.hadoop.classification.InterfaceStability;

/**
 * Defines the processing part of the WindowedOperator, under the assumption that every stateful
 * computation the WindowedOperator performs can be expressed as a form of accumulation.
 *
 * AccumT and OutputT are usually the same type, but they do not have to be. Take the AVERAGE of doubles as an
 * example: InputT is double, and both the SUM and the COUNT must be kept in AccumT, so AccumT is a pair of
 * double (the sum of the inputs) and long (the number of inputs). OutputT is double, since it represents the
 * average of the inputs.
 *
 * @param <InputT> The type of the inputs being accumulated
 * @param <AccumT> The type of the accumulated value
 * @param <OutputT> The type of the output derived from the accumulated value
 */
@InterfaceStability.Evolving
public interface Accumulation<InputT, AccumT, OutputT>
{
  /**
   * Supplies the accumulated value to use while nothing has been accumulated yet.
   *
   * @return the default accumulated value
   */
  AccumT defaultAccumulatedValue();

  /**
   * Folds one input into an accumulated value.
   *
   * @param accumulatedValue the accumulated value so far
   * @param input the input to accumulate
   * @return the accumulated value after the input has been accumulated
   */
  AccumT accumulate(AccumT accumulatedValue, InputT input);

  /**
   * Combines two accumulated values into a single one.
   *
   * @param accumulatedValue1 the first accumulated value
   * @param accumulatedValue2 the second accumulated value
   * @return the merged accumulated value
   */
  AccumT merge(AccumT accumulatedValue1, AccumT accumulatedValue2);

  /**
   * Derives the output from an accumulated value. The result is the data that gets generated for triggers.
   *
   * @param accumulatedValue the accumulated value
   * @return the output for the accumulated value
   */
  OutputT getOutput(AccumT accumulatedValue);

  /**
   * Derives the retraction from an accumulated value. The result is used to retract previous panes when the
   * accumulation mode is ACCUMULATING_AND_RETRACTING.
   *
   * @param accumulatedValue the accumulated value
   * @return the retraction for the accumulated value
   */
  OutputT getRetraction(AccumT accumulatedValue);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.apex.malhar.lib.window;

/**
* Control tuple interface.
* TODO: This should be removed or moved to Apex Core when Apex Core has native support for custom control tuples.
*/
public interface ControlTuple
{
/**
* Watermark control tuple
*/
interface Watermark extends ControlTuple
{
/**
* Gets the timestamp associated with this watermark
*
* @return
*/
long getTimestamp();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.window;

/**
 * The accumulation interface to use when multiple streams are joined.
 *
 * Inputs of type InputT1 go through the {@code accumulate} method inherited from {@link Accumulation};
 * the methods declared here accumulate the second through the fifth input types.
 *
 * @param <InputT1> The first input type
 * @param <InputT2> The second input type
 * @param <InputT3> The third input type
 * @param <InputT4> The fourth input type
 * @param <InputT5> The fifth input type
 * @param <AccumT> The type of the accumulated value
 * @param <OutputT> The type of the output
 */
public interface JoinAccumulation<InputT1, InputT2, InputT3, InputT4, InputT5, AccumT, OutputT>
    extends Accumulation<InputT1, AccumT, OutputT>
{
  /**
   * Folds an input of the second input type into the accumulated value.
   *
   * @param accumulatedValue the accumulated value so far
   * @param input the input of the second type to accumulate
   * @return the accumulated value after the input has been accumulated
   */
  AccumT accumulate2(AccumT accumulatedValue, InputT2 input);

  /**
   * Folds an input of the third input type into the accumulated value.
   *
   * @param accumulatedValue the accumulated value so far
   * @param input the input of the third type to accumulate
   * @return the accumulated value after the input has been accumulated
   */
  AccumT accumulate3(AccumT accumulatedValue, InputT3 input);

  /**
   * Folds an input of the fourth input type into the accumulated value.
   *
   * @param accumulatedValue the accumulated value so far
   * @param input the input of the fourth type to accumulate
   * @return the accumulated value after the input has been accumulated
   */
  AccumT accumulate4(AccumT accumulatedValue, InputT4 input);

  /**
   * Folds an input of the fifth input type into the accumulated value.
   *
   * @param accumulatedValue the accumulated value so far
   * @param input the input of the fifth type to accumulate
   * @return the accumulated value after the input has been accumulated
   */
  AccumT accumulate5(AccumT accumulatedValue, InputT5 input);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.window;

import java.util.Collection;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceStability;

/**
 * Storage interface for the data of session windowed streams.
 *
 * @param <K> The key type
 * @param <V> The value type
 */
@InterfaceStability.Evolving
public interface SessionWindowedStorage<K, V> extends WindowedKeyedStorage<K, V>
{
  /**
   * Looks up, for the given key, the data that falls into timestamp +/- gap.
   * The result serves two purposes: finding the entry that the data with the given timestamp belongs to, and
   * deciding whether session windows need to be merged.
   * Provided that sessions have been merged appropriately, at most two entries should be returned.
   *
   * @param key the key
   * @param timestamp the timestamp
   * @param gap the gap applied on both sides of the timestamp
   * @return the session window entries for the key that fall into timestamp +/- gap
   */
  Collection<Map.Entry<Window.SessionWindow, V>> getSessionEntries(K key, long timestamp, long gap);
}

0 comments on commit 20fc1d3

Please sign in to comment.