[FLINK-4520][flink-siddhi] Integrate Siddhi as a light-weight Streaming CEP Library #2487

Closed
wants to merge 22 commits into from

@haoch
Member

commented Sep 10, 2016

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.

  • General
    • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
    • The pull request addresses only one issue
    • Each commit in the PR has a meaningful commit message (including the JIRA id)
  • Documentation
    • Documentation has been added for new functionality
    • Old documentation affected by the pull request has been updated
    • JavaDoc for public methods has been added
  • Tests & Build
    • Functionality added by the pull request is covered by tests
    • mvn clean verify has been executed successfully locally or a Travis build has passed

Abstract

Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event Processing Engine (CEP) released as a Java Library under Apache Software License v2.0. Siddhi CEP processes events which are generated by various event sources, analyses them and notifies appropriate complex events according to the user specified queries.

It would be very helpful for Flink users (especially streaming application developers) to have a library that runs Siddhi CEP queries directly in a Flink streaming application.

Features

  • Integrate Siddhi CEP as a stream operator (i.e. TupleStreamSiddhiOperator), supporting rich CEP features like
    • Filter
    • Join
    • Aggregation
    • Group by
    • Having
    • Window
    • Conditions and Expressions
    • Pattern processing
    • Sequence processing
    • Event Tables
      ...
  • Provide an easy-to-use Siddhi CEP API that integrates with the Flink DataStream API (See SiddhiCEP and SiddhiStream)
    • Register a Flink DataStream, associating its native type information with a Siddhi Stream Schema; POJO, Tuple, primitive types, etc. are supported
    • Connect single or multiple Flink DataStreams with a Siddhi CEP Execution Plan
    • Return the output stream as a DataStream whose type is inferred from the Siddhi Stream Schema
  • Integrate Siddhi runtime state management with Flink state (See AbstractSiddhiOperator)
  • Support Siddhi plugin management to extend CEP functions. (See SiddhiCEP#registerExtension)
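
To make the schema-mapping bullet above more concrete, here is a minimal sketch of how ordered field names and Java types could be turned into a Siddhi "define stream" statement. The class StreamDefinitionSketch and its methods are hypothetical and are not taken from this pull request; only the Siddhi attribute type names (int, long, float, double, bool, string, object) come from Siddhi itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper for illustration; not part of the flink-siddhi API. */
final class StreamDefinitionSketch {

    /** Maps a Java field type to a Siddhi attribute type name. */
    static String siddhiType(Class<?> javaType) {
        if (javaType == Integer.class || javaType == int.class) return "int";
        if (javaType == Long.class || javaType == long.class) return "long";
        if (javaType == Float.class || javaType == float.class) return "float";
        if (javaType == Double.class || javaType == double.class) return "double";
        if (javaType == Boolean.class || javaType == boolean.class) return "bool";
        if (javaType == String.class) return "string";
        return "object"; // fallback for types without a direct Siddhi counterpart
    }

    /** Builds a "define stream" statement; the map's iteration order is the attribute order. */
    static String defineStream(String streamId, Map<String, Class<?>> fields) {
        StringJoiner attributes = new StringJoiner(", ", "(", ")");
        for (Map.Entry<String, Class<?>> field : fields.entrySet()) {
            attributes.add(field.getKey() + " " + siddhiType(field.getValue()));
        }
        return "define stream " + streamId + " " + attributes + ";";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, which matters for positional types such as tuples.
        Map<String, Class<?>> fields = new LinkedHashMap<>();
        fields.put("id", Integer.class);
        fields.put("name", String.class);
        fields.put("price", Double.class);
        fields.put("timestamp", Long.class);
        System.out.println(defineStream("inputStream1", fields));
    }
}
```

The field types used in main are assumptions chosen for illustration; the pull request derives them from Flink type information instead.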

Test Cases

Example

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

 cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);

 cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp");
 cep.registerStream("inputStream2", input2, "id", "name", "price", "timestamp");

 DataStream<Tuple5<Integer,String,Integer,String,Double>> output = cep
    .from("inputStream1").union("inputStream2")
    .sql(
        "from every s1 = inputStream1[id == 2] "
         + " -> s2 = inputStream2[id == 3] "
         + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2, custom:plus(s1.price,s2.price) as price "
         + "insert into outputStream"
    )
    .returns("outputStream");

 env.execute();
public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
this.executionEnvironment = streamExecutionEnvironment;
this.dataStreams = new HashMap<>();
this.dataStreamSchemas = new HashMap<>();

@apivovarov

apivovarov Sep 10, 2016

Contributor

lines 64 and 65 can be removed.
Add = new HashMap<>(); to lines 36 and 37, similar to what was done on line 38
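
A minimal sketch of what this suggestion amounts to: the maps are initialised where they are declared, so the constructor only assigns state that depends on its arguments. RegistrySketch is a simplified, hypothetical stand-in for the reviewed class; the field names follow the diff excerpt, while the value types and the third field are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for the reviewed class; value types are assumed for illustration. */
final class RegistrySketch {
    // Initialised at the declaration, so no constructor needs to repeat these assignments.
    private final Map<String, Object> dataStreams = new HashMap<>();
    private final Map<String, String[]> dataStreamSchemas = new HashMap<>();
    private final Map<String, Class<?>> extensions = new HashMap<>();

    private final String environmentName;

    RegistrySketch(String environmentName) {
        // Only argument-dependent state is assigned here.
        this.environmentName = environmentName;
    }

    void registerStream(String streamId, Object stream, String... fieldNames) {
        dataStreams.put(streamId, stream);
        dataStreamSchemas.put(streamId, fieldNames);
    }

    int registeredStreamCount() {
        return dataStreams.size();
    }

    String environmentName() {
        return environmentName;
    }
}
```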

}

public void registerExtension(String extensionName, Class<?> extensionClass) {
if(extensions.containsKey(extensionName)){

@apivovarov

apivovarov Sep 10, 2016

Contributor

The code is not formatted: put a space after if and before {

return (DataStream<T>) this.dataStreams.get(streamId);
}else{
throw new UndefinedStreamException("Undefined stream "+streamId);
}

@apivovarov

apivovarov Sep 10, 2016

Contributor

block 108-112 is not formatted

@haoch

Member Author

commented Sep 10, 2016

@apivovarov thanks very much for the comments. I have formatted all the code as requested. Please continue reviewing.

}
}

public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {

@mushketyk

mushketyk Sep 12, 2016

Contributor

Could you put the constructor after the field definitions?

}

public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
this.executionEnvironment = streamExecutionEnvironment;

@mushketyk

mushketyk Sep 12, 2016

Contributor

I would suggest using the Preconditions class to check the input.

@mushketyk

mushketyk Sep 12, 2016

Contributor

The same comment applies to the other public methods of this class.
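
As a rough illustration of the kind of input checking suggested here, the sketch below validates arguments at the top of a public method. Within Flink the checks would typically go through org.apache.flink.util.Preconditions (for example checkNotNull); to stay runnable without Flink on the classpath, this sketch uses the JDK's Objects.requireNonNull instead. ExtensionRegistrySketch is hypothetical, and throwing on a duplicate name is an assumption about the intended behaviour.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in showing fail-fast argument checks on a public method. */
final class ExtensionRegistrySketch {
    private final Map<String, Class<?>> extensions = new HashMap<>();

    void registerExtension(String extensionName, Class<?> extensionClass) {
        // Fail fast with a clear message instead of a late NullPointerException elsewhere.
        Objects.requireNonNull(extensionName, "extensionName must not be null");
        Objects.requireNonNull(extensionClass, "extensionClass must not be null");
        if (extensions.containsKey(extensionName)) {
            // Assumed behaviour: registering the same name twice is treated as an error.
            throw new IllegalArgumentException("Extension already registered: " + extensionName);
        }
        extensions.put(extensionName, extensionClass);
    }

    boolean isRegistered(String extensionName) {
        return extensions.containsKey(extensionName);
    }
}
```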

* Siddhi CEP Execution Environment
*/
@PublicEvolving
public class SiddhiCEP {

@mushketyk

mushketyk Sep 12, 2016

Contributor

I think this class would benefit from more JavaDocs.

public abstract class SiddhiStream {
private final SiddhiCEP environment;

public SiddhiStream(SiddhiCEP environment) {

@mushketyk

mushketyk Sep 12, 2016

Contributor

Could we name this cepEnvironment?

this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
this.streamRecordSerializers = new HashMap<>();

for (String streamId : this.siddhiPlan.getInputStreams()) {

@mushketyk

mushketyk Sep 12, 2016

Contributor

This can be moved into a separate method.

}

@Override
public void processElement(StreamRecord<IN> element) throws Exception {

@mushketyk

mushketyk Sep 12, 2016

Contributor

This code seems to be similar to the code from the CEP library. Can we reuse it somehow?

/**
* SiddhiCEP Operator Execution Context
*/
public class SiddhiOperatorContext implements Serializable {

@mushketyk

mushketyk Sep 12, 2016

Contributor

I think all public methods would benefit from JavaDocs

@mushketyk

Contributor

commented Sep 12, 2016

Hi @haoch,

I think it would be beneficial if you wrote a few words describing your design or added more JavaDocs. This would make the review process more straightforward.

@haoch

Member Author

commented Sep 13, 2016

@mushketyk thanks very much for reviewing, will fix as required soon.

@StephanEwen

Contributor

commented Sep 19, 2016

Thank you for that big contribution. Siddhi looks like a cool approach to CEP.

Before digging into the details, I would like to start a discussion about whether we should have this as a part of the core Flink repository, as a subproject, or if it would be best to have it initially as an external project.

The reason is that the Flink repository is becoming a bit big right now. Build times are very long, test stability is hard to manage, and there is quite a bit of "dead" code that was contributed by someone at some point but seems rarely used and is not maintained by the contributors.

To help have a good discussion, it would be great to learn a bit more:

  • How complete is the implementation?
  • Would you be up for maintaining this code?
  • Are you building this as an experiment, or building a production use case based on Siddhi on Flink?

Thanks,
Stephan

@haoch

Member Author

commented Sep 20, 2016

@mushketyk I have added many JavaDocs as requested in the latest commit, 4699f9c. Please continue to help review.

@haoch

Member Author

commented Sep 20, 2016

@StephanEwen thanks for the comments. I think it is fine either to keep this in the core or to maintain it as a separate project, but my concern is that it may be better for community development to centralize qualified libraries together. As an alternative solution to the test-stability and dead-code problems, would it be possible to create another code repository, say "flink-library", with independent CI?

BTW: here are the answers to your questions one by one:

How complete is the implementation?

Siddhi is a feature-rich CEP engine with its own community, and it may be almost the only open source CEP solution compatible with the Apache License. The flink-siddhi library is mainly focused on bringing Siddhi's capabilities to Flink users seamlessly by:

  • Integrating the Siddhi CEP runtime with the Flink lifecycle
  • Mapping schemas and DataStream sources
  • Providing state management and fault tolerance

So I think it is extremely lightweight but useful, and the current implementation should be almost complete.
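
The state-management point can be pictured with a small sketch: an embedded engine exports its state as bytes when the host takes a checkpoint, and takes those bytes back on recovery. Everything below is hypothetical and self-contained; SnapshotableEngine, CountingEngine, and CheckpointingOperatorSketch are illustrative names, and the sketch does not show how AbstractSiddhiOperator in this pull request actually stores state in Flink.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical view of an embedded engine that can export and import its state. */
interface SnapshotableEngine {
    byte[] snapshot();

    void restore(byte[] state);
}

/** Toy engine that only counts events; it stands in for a CEP runtime. */
final class CountingEngine implements SnapshotableEngine {
    private long count;

    void send(Object event) {
        count++;
    }

    long count() {
        return count;
    }

    @Override
    public byte[] snapshot() {
        return Long.toString(count).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void restore(byte[] state) {
        count = Long.parseLong(new String(state, StandardCharsets.UTF_8));
    }
}

/** Operator-like wrapper: stores engine bytes on checkpoint and feeds them back on recovery. */
final class CheckpointingOperatorSketch {
    private final CountingEngine engine = new CountingEngine();
    private byte[] lastCheckpoint; // stands in for state handed to the host framework

    void processElement(Object element) {
        engine.send(element);
    }

    void onCheckpoint() {
        lastCheckpoint = engine.snapshot();
    }

    void onRestore() {
        if (lastCheckpoint != null) {
            engine.restore(lastCheckpoint);
        }
    }

    long count() {
        return engine.count();
    }
}
```

Events processed after the last checkpoint are lost from the engine's state on restore, which is why the host framework would replay them from the source.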

Would you be up for maintaining this code?

Sure. First of all, I am personally very willing to keep contributing to the Flink project in any way.

We have also used Siddhi with distributed streaming systems a lot in production, and we are currently considering supporting Flink as well, for its better state management and window support. So I would continue to maintain the code if it is merged; if not, I would maintain it as a separate project to make sure it stays open source and workable.

Are you building this as an experiment, or building a production use case based on Siddhi on Flink?

We use Siddhi with streaming environments in production a lot. It currently supports Storm and Spark Streaming, and we are also considering extending it to Flink.

@StephanEwen

Contributor

commented Sep 21, 2016

That all sounds very good.
I would personally like it if this contribution would enter Flink in some way.

There have been thoughts and discussions once in a while about creating a dedicated sub-project for libraries/extensions like this, or at least a dedicated repository under Flink. I think this would be a great opportunity to revive those discussions.

Let me start a thread on the mailing list.

@haoch I hope you are okay with waiting for a few days for that discussion to come to a conclusion.

@haoch

Member Author

commented Sep 21, 2016

@StephanEwen sure, that's all ok. :-)

@rmetzger

Contributor

commented Oct 24, 2016

Hi @haoch, thanks a lot for this contribution. I'm sorry for the late response.

I recently started moving some of the streaming connectors of Flink to Apache Bahir, a community for extensions to Spark, Flink (and maybe others).
You wrote in an earlier comment:

I think it is fine either to keep this in the core or to maintain it as a separate project, but my concern is that it may be better for community development to centralize qualified libraries together.

I think Bahir is addressing this issue nicely. So far we have added only streaming connectors to Bahir, but I would like to see libraries and other things built on top of Flink there as well.
I'm a committer at Bahir and can help you to get the code in there.
The Bahir repository is located here https://github.com/apache/bahir-flink

By the way, the tests you've added are failing on our CI system. Can you look into it? https://s3.amazonaws.com/archive.travis-ci.org/jobs/166483919/log.txt

@uce

Contributor

commented Feb 20, 2017

@haoch What do you think about Robert's suggestion to move this to Bahir? Seems like a reasonable first step to me.

@haoch

Member Author

commented Feb 22, 2017

@uce sure, I will fix it and resend the PR to Bahir.

@rmetzger

Contributor

commented Feb 22, 2017

Cool, thank you!

@asdf2014

Member

commented Jun 9, 2017

Hi @haoch, @rmetzger. Why can't I find flink-siddhi in bahir or bahir-flink? In addition, flink-siddhi still depends on Flink v1.1.2. May I ask how long it will be before I can use these advanced Siddhi CEP features on Flink?

@dianfu

Contributor

commented Jun 9, 2017

@asdf2014 We're currently working on integrating the Flink Table & SQL API with CEP. It will add more capabilities to the Flink CEP library and also make it easier to use. An initial design doc can be found here. The prototype and detailed design doc will come out very soon.

@asdf2014

Member

commented Jun 9, 2017

@dianfu Great! Looking forward to it.

@rmetzger

Contributor

commented Jun 14, 2017

@asdf2014 I think there was never a PR for Siddhi at the Bahir project.
But if you are interested, you could work on contributing it to Bahir.

@asdf2014

Member

commented Jun 14, 2017

@rmetzger Alright. Thank you for asking, but I think our company still plans to use Flink-CEP, and contributing a Bahir-Siddhi feature is a huge job, so... I'm sorry.

@haoch

Member Author

commented Jun 15, 2017

@asdf2014 sorry for the delayed response. I will find some time to finalize this PR and propose it to the Bahir project.

@haoch

Member Author

commented Nov 18, 2017

As mentioned above, I have moved the patch to Apache Bahir at apache/bahir-flink#22

@rmetzger could you please take a look?

@haoch

Member Author

commented Nov 22, 2017

Keep this PR open until apache/bahir-flink#22 is merged.

@haoch

Member Author

commented Nov 27, 2017

Finally, the PR has been merged via apache/bahir-flink#22. Thanks everyone for the review!
