Skip to content

Commit

Permalink
Enrich java docs for public methods
Browse files Browse the repository at this point in the history
  • Loading branch information
haoch committed Sep 20, 2016
1 parent 0364183 commit 4699f9c
Show file tree
Hide file tree
Showing 13 changed files with 326 additions and 33 deletions.
Expand Up @@ -23,12 +23,27 @@
import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;

import java.util.HashMap;
import java.util.Map;

/**
* Siddhi CEP Execution Environment
* <p>
* Siddhi CEP Environment, provides utility methods to
*
* <ul>
* <li>Initialize SiddhiCEP environment based on {@link StreamExecutionEnvironment}</li>
* <li>Register {@link SiddhiStream} with field-based StreamSchema and bind with physical source {@link DataStream}</li>
* <li>Define rich-featured Siddhi CEP execution plan with SQL-Like query for SiddhiStreamOperator</li>
* <li>Transform and connect source DataStream to SiddhiStreamOperator</li>
* <li>Register customizable siddhi plugins to extend built-in CEP functions</li>
* </ul>
* </p>
*
* @see SiddhiStream
* @see org.apache.flink.contrib.siddhi.schema.StreamSchema
* @see org.apache.flink.contrib.siddhi.operator.SiddhiStreamOperator
*/
@PublicEvolving
public class SiddhiCEP {
Expand All @@ -37,51 +52,127 @@ public class SiddhiCEP {
private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
private final Map<String, Class<?>> extensions = new HashMap<>();

/**
* @param streamExecutionEnvironment Stream Execution Environment
*/
private SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
this.executionEnvironment = streamExecutionEnvironment;
}

/**
* @see DataStream
* @return Siddhi streamId and source DataStream mapping.
*/
public Map<String, DataStream<?>> getDataStreams() {
return this.dataStreams;
}

/**
* @see SiddhiStreamSchema
* @return Siddhi streamId and stream schema mapping.
*/
public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
return this.dataStreamSchemas;
}

/**
* @param streamId Siddhi streamId to check.
* @return whether the given streamId is defined in current SiddhiCEP environment.
*/
public boolean isStreamDefined(String streamId) {
Preconditions.checkNotNull(streamId,"streamId");
return dataStreams.containsKey(streamId);
}

/**
* @return Registered siddhi extensions.
*/
public Map<String, Class<?>> getExtensions() {
return this.extensions;
}

/**
* Check whether given streamId has been defined, if not, throw {@link UndefinedStreamException}
* @param streamId Siddhi streamId to check.
* @throws UndefinedStreamException throws if given streamId is not defined
*/
public void checkStreamDefined(String streamId) throws UndefinedStreamException {
Preconditions.checkNotNull(streamId,"streamId");
if (!isStreamDefined(streamId)) {
throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
}
}

public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
this.executionEnvironment = streamExecutionEnvironment;
}

public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> inStream, String... fieldNames) {
SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
return environment.from(streamId, inStream, fieldNames);
/**
* Define siddhi stream with streamId, source <code>DataStream</code> and stream schema,
* and select as initial source stream to connect to siddhi operator.
*
* @param streamId Unique siddhi streamId
* @param dataStream DataStream to bind to the siddhi stream.
* @param fieldNames Siddhi stream schema field names
*
* @see #registerStream(String, DataStream, String...)
* @see #from(String)
*/
public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> dataStream, String... fieldNames) {
Preconditions.checkNotNull(streamId,"streamId");
Preconditions.checkNotNull(dataStream,"dataStream");
Preconditions.checkNotNull(fieldNames,"fieldNames");
SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(dataStream.getExecutionEnvironment());
return environment.from(streamId, dataStream, fieldNames);
}

public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> inStream, String... fieldNames) {
this.registerStream(streamId, inStream, fieldNames);
/**
* Register stream with unique <code>streaId</code>, source <code>dataStream</code> and schema fields,
* and select the registered stream as initial stream to connect to Siddhi Runtime.
*
* @see #registerStream(String, DataStream, String...)
* @see #from(String)
*/
public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> dataStream, String... fieldNames) {
Preconditions.checkNotNull(streamId,"streamId");
Preconditions.checkNotNull(dataStream,"dataStream");
Preconditions.checkNotNull(fieldNames,"fieldNames");
this.registerStream(streamId, dataStream, fieldNames);
return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
}

/**
* Select stream by streamId as initial stream to connect to Siddhi Runtime.
*
* @param streamId Siddhi Stream Name
* @param <T> Stream Generic Type
*/
public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId) {
Preconditions.checkNotNull(streamId,"streamId");
return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
}

/**
* Select one stream and union other streams by streamId to connect to Siddhi Stream Operator.
*
* @param firstStreamId First siddhi streamId, which should be predefined in SiddhiCEP context.
* @param unionStreamIds Other siddhi streamIds to union, which should be predefined in SiddhiCEP context.
*
* @return The UnionSiddhiStream Builder
*/
public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId, String... unionStreamIds) {
Preconditions.checkNotNull(firstStreamId,"firstStreamId");
Preconditions.checkNotNull(unionStreamIds,"unionStreamIds");
return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId, this).union(unionStreamIds);
}

/**
* Define siddhi stream with streamId, source <code>DataStream</code> and stream schema.
*
* @param streamId Unique siddhi streamId
* @param dataStream DataStream to bind to the siddhi stream.
* @param fieldNames Siddhi stream schema field names
*/
public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
Preconditions.checkNotNull(streamId,"streamId");
Preconditions.checkNotNull(dataStream,"dataStream");
Preconditions.checkNotNull(fieldNames,"fieldNames");
if (isStreamDefined(streamId)) {
throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
}
Expand All @@ -91,17 +182,33 @@ public <T> void registerStream(final String streamId, DataStream<T> dataStream,
dataStreamSchemas.put(streamId, schema);
}

/**
* @return Current StreamExecutionEnvironment.
*/
public StreamExecutionEnvironment getExecutionEnvironment() {
return executionEnvironment;
}

/**
* Register Siddhi CEP Extensions
*
* @see <a href="https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi">https://docs.wso2.com/display/CEP310/Writing+Extensions+to+Siddhi</a>
* @param extensionName Unique siddhi extension name
* @param extensionClass Siddhi Extension class
*/
public void registerExtension(String extensionName, Class<?> extensionClass) {
if (extensions.containsKey(extensionName)) {
throw new IllegalArgumentException("Extension named " + extensionName + " already registered");
}
extensions.put(extensionName, extensionClass);
}

/**
* Get registered source DataStream with Siddhi streamId.
*
* @param streamId Siddhi streamId
* @return The source DataStream registered with Siddhi streamId
*/
public <T> DataStream<T> getDataStream(String streamId) {
if (this.dataStreams.containsKey(streamId)) {
return (DataStream<T>) this.dataStreams.get(streamId);
Expand All @@ -110,6 +217,12 @@ public <T> DataStream<T> getDataStream(String streamId) {
}
}

/**
* Create new SiddhiCEP instance.
*
* @param streamExecutionEnvironment StreamExecutionEnvironment
* @return New SiddhiCEP instance.
*/
public static SiddhiCEP getSiddhiEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) {
return new SiddhiCEP(streamExecutionEnvironment);
}
Expand Down
Expand Up @@ -37,20 +37,30 @@
import java.util.Map;

/**
* Siddhi CEP API Interface
* Siddhi CEP Stream API
*/
@PublicEvolving
public abstract class SiddhiStream {
private final SiddhiCEP environment;
private final SiddhiCEP cepEnvironment;

public SiddhiStream(SiddhiCEP environment) {
this.environment = environment;
/**
* @param cepEnvironment SiddhiCEP cepEnvironment.
*/
public SiddhiStream(SiddhiCEP cepEnvironment) {
Preconditions.checkNotNull(cepEnvironment,"SiddhiCEP cepEnvironment is null");
this.cepEnvironment = cepEnvironment;
}

protected SiddhiCEP getEnvironment() {
return this.environment;
/**
* @return current SiddhiCEP cepEnvironment.
*/
protected SiddhiCEP getCepEnvironment() {
return this.cepEnvironment;
}

/**
* @return Transform SiddhiStream to physical DataStream
*/
protected abstract DataStream<Tuple2<String, Object>> toDataStream();

/**
Expand Down Expand Up @@ -79,16 +89,27 @@ public Object getKey(Tuple2<String, Object> value) throws Exception {
}
}

/**
* ExecutableStream context to define execution logic, i.e. SiddhiCEP execution plan.
*/
public static abstract class ExecutableStream extends SiddhiStream {
public ExecutableStream(SiddhiCEP environment) {
super(environment);
}

/**
* @param executionPlan Siddhi SQL-Like execution plan query
* @return ExecutionSiddhiStream context
*/
public ExecutionSiddhiStream sql(String executionPlan) {
return new ExecutionSiddhiStream(this.toDataStream(), executionPlan, getEnvironment());
Preconditions.checkNotNull(executionPlan,"executionPlan");
return new ExecutionSiddhiStream(this.toDataStream(), executionPlan, getCepEnvironment());
}
}

/**
* Initial Single Siddhi Stream Context
*/
public static class SingleSiddhiStream<T> extends ExecutableStream {
private final String streamId;

Expand All @@ -98,19 +119,33 @@ public SingleSiddhiStream(String streamId, SiddhiCEP environment) {
this.streamId = streamId;
}


/**
* Define siddhi stream with streamId, source <code>DataStream</code> and stream schema and as the first stream of {@link UnionSiddhiStream}
*
* @param streamId Unique siddhi streamId
* @param dataStream DataStream to bind to the siddhi stream.
* @param fieldNames Siddhi stream schema field names
*
* @return {@link UnionSiddhiStream} context
*/
public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) {
getEnvironment().registerStream(streamId, dataStream, fieldNames);
getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
return union(streamId);
}

/**
* @param streamIds Defined siddhi streamIds to union
* @return {@link UnionSiddhiStream} context
*/
public UnionSiddhiStream<T> union(String... streamIds) {
Preconditions.checkNotNull(streamIds);
return new UnionSiddhiStream<T>(this.streamId, Arrays.asList(streamIds), this.getEnvironment());
Preconditions.checkNotNull(streamIds,"streamIds");
return new UnionSiddhiStream<T>(this.streamId, Arrays.asList(streamIds), this.getCepEnvironment());
}

@Override
protected DataStream<Tuple2<String, Object>> toDataStream() {
return convertDataStream(getEnvironment().getDataStream(this.streamId), this.streamId);
return convertDataStream(getCepEnvironment().getDataStream(this.streamId), this.streamId);
}
}

Expand All @@ -120,6 +155,8 @@ public static class UnionSiddhiStream<T> extends ExecutableStream {

public UnionSiddhiStream(String firstStreamId, List<String> unionStreamIds, SiddhiCEP environment) {
super(environment);
Preconditions.checkNotNull(firstStreamId,"firstStreamId");
Preconditions.checkNotNull(unionStreamIds,"unionStreamIds");
environment.checkStreamDefined(firstStreamId);
for (String unionStreamId : unionStreamIds) {
environment.checkStreamDefined(unionStreamId);
Expand All @@ -128,25 +165,41 @@ public UnionSiddhiStream(String firstStreamId, List<String> unionStreamIds, Sidd
this.unionStreamIds = unionStreamIds;
}

/**
* Define siddhi stream with streamId, source <code>DataStream</code> and stream schema and continue to union it with current stream.
*
* @param streamId Unique siddhi streamId
* @param dataStream DataStream to bind to the siddhi stream.
* @param fieldNames Siddhi stream schema field names
*
* @return {@link UnionSiddhiStream} context
*/
public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String... fieldNames) {
getEnvironment().registerStream(streamId, dataStream, fieldNames);
Preconditions.checkNotNull(streamId,"streamId");
Preconditions.checkNotNull(dataStream,"dataStream");
Preconditions.checkNotNull(fieldNames,"fieldNames");
getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
return union(streamId);
}

/**
* @param streamId another defined streamId to union with.
* @return {@link UnionSiddhiStream} context
*/
public UnionSiddhiStream<T> union(String... streamId) {
List<String> newUnionStreamIds = new LinkedList<>();
newUnionStreamIds.addAll(unionStreamIds);
newUnionStreamIds.addAll(Arrays.asList(streamId));
return new UnionSiddhiStream<T>(this.firstStreamId, newUnionStreamIds, this.getEnvironment());
return new UnionSiddhiStream<T>(this.firstStreamId, newUnionStreamIds, this.getCepEnvironment());
}

@Override
protected DataStream<Tuple2<String, Object>> toDataStream() {
final String localFirstStreamId = firstStreamId;
final List<String> localUnionStreamIds = this.unionStreamIds;
DataStream<Tuple2<String, Object>> dataStream = convertDataStream(getEnvironment().<T>getDataStream(localFirstStreamId), this.firstStreamId);
DataStream<Tuple2<String, Object>> dataStream = convertDataStream(getCepEnvironment().<T>getDataStream(localFirstStreamId), this.firstStreamId);
for (String unionStreamId : localUnionStreamIds) {
dataStream = dataStream.union(convertDataStream(getEnvironment().<T>getDataStream(unionStreamId), unionStreamId));
dataStream = dataStream.union(convertDataStream(getCepEnvironment().<T>getDataStream(unionStreamId), unionStreamId));
}
return dataStream;
}
Expand Down

0 comments on commit 4699f9c

Please sign in to comment.