-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
STORM-2223: PMMLBolt #1816
STORM-2223: PMMLBolt #1816
Conversation
hmcl
commented
Dec 7, 2016
- PMML Bolt supporting pluggable runners
- JPMML runner implementation
- Test Topology
hi @hmcl Maybe the licenses between Apache Storm and JPMML(licensed under AGPLv3) are incompatible.
|
pmml-evaluator is AGPLv3. http://search.maven.org/#artifactdetails%7Corg.jpmml%7Cpmml-evaluator%7C1.3.3%7Cjar pmml-model and pmml-schema are, at least pom file license description, BSD. But what's the difference between jpmml and pmml on artifact prefix, and why they share the group name? |
@vesense @HeartSaVioR this was discussed in the JIRA - please take a look at the discussion thread. I believe that the JPMML license (the legacy code) is OK as discussed there. |
@hmcl @vesense |
@hmcl @HeartSaVioR Got it. Thanks. |
@vesense there is no alternate option for PMML in java. This is atleast give us a good start . regarding the jpmml-storm, I reached out the developer in his words he said that just an experiment code and not really prod quality and they have no interest in contributing to the storm. |
@harshach Thanks for your detailed explanations. Yes, this is a good start. I'm +1 for adding this in. This might attract many people who interest NLP, ML. |
|
||
import java.util.List; | ||
|
||
public interface ModelRunner { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since bolt can be executed in distributed environment, ModelRunner
interface should extends Serializable
.
@hmcl Code looks good. Left one comment. I think it is also necessary to add a README file to let people know how to use it. |
6d6b712
to
da61a8f
Compare
@vesense I addressed your serialization comment. I had to do some refactoring of the code because it was a strong requirement to enforce @harshach @HeartSaVioR can you please take a look as well. |
Looks like a good start, but it really needs some documentation. It would also be helpful to include a sample model + CSV data, without that it's not very clear how to run the example. |
Also, what about unit tests? |
/** | ||
* Creates a new {@link PMML} object representing the PMML model defined in the XML {@link File} specified as argument | ||
*/ | ||
public static PMML newPmml(File file) throws JAXBException, SAXException, IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Consider adding support for getting newPmml from an input stream as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would also be very useful to be able to load/update models from the blob store so users don't have to redeploy a topology to update their model.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ptgoetz I created STORM-2240 to address this as follow up
@hmcl I am +1 on merging. I would like to see the InputStream option added to it. |
@hmcl you can add me as sponsor to the module. |
rawInputs.put(activeField, tuple.getValueByField(activeField.getValue())); | ||
} | ||
LOG.debug("Raw inputs = [{}]", rawInputs); | ||
return rawInputs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is the streams being used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. removed it.
} | ||
|
||
|
||
public static class ModelRunner implements ModelRunnerFactory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider renaming, Having two ModelRunner artifacts (Class here) and the interface might be confusing.
25d4464
to
0b78876
Compare
@csivaguru @ptgoetz @harshach I uploaded a reviewed version addressing your comments. Can you please take a look. thanks. |
+1 |
1 similar comment
+1 |
|
||
#Run Bundled Examples | ||
|
||
To run the examples you must copy the `storm-pmml` uber jar to `STORM-HOME/extlib` and then run the command: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't the example just be a self contained shaded jar that is ready for deployment? It seems odd to require users to modify the pom and copy jars into extlib to run the example. Most of the other examples are deployable jars.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ptgoetz do you mean that you would like to have an uber jar like storm-pmml-examples-2.0.0-SNAPSHOT.jar under external/storm-pmml such that the users could just do
storm jar storm-pmml-examples-2.0.0-SNAPSHOT.jar org.apache.storm.pmm.TopologyMain` and run it ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ptgoetz users can now just run the examples Jar
@@ -0,0 +1,259 @@ | |||
<?xml version="1.0" encoding="UTF-8"?> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we know what the license of this file is? It wasn't clear to me from the dmg.org website.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ptgoetz I found PMML Notice and License.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, it's BSD 3-clause. So there needs to be an entry in both source and binary distribution LICENSE files per this guide: http://www.apache.org/dev/licensing-howto.html#permissive-deps
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DONE
@@ -0,0 +1,51 @@ | |||
ID,Age,Employment,Education,Marital,Occupation,Income,Gender,Deductions,Hours,IGNORE_Accounts,RISK_Adjustment,TARGET_Adjusted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also interested in what the license of the data set is.
## Committer Sponsors | ||
* Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org)) | ||
|
||
This general abstraction has the purpose of supporting arbitrary implementations that compute predicted scores from raw inputs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor nit: This seems more appropriate for the introduction section.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. I forgot to clean this up. This info is already mentioned in the introduction section in slight different way, but with the same exact meaning. Will delete it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Went through at a high level and left some comments.
|
||
private File loadExample(File file, String example) { | ||
try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(example)) { | ||
file = File.createTempFile("/tmp/" + example, ".tmp"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the first argument is a prefix of the file name not a directory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
@Override | ||
public void activate() { | ||
openReader(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems every time the spout is deactivated/activated it starts reading the file from the beginning which may not be desirable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -39,7 +39,6 @@ | |||
import java.io.InputStreamReader; | |||
import java.util.HashMap; | |||
import java.util.Map; | |||
import java.util.concurrent.TimeUnit; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unrelated file KafkaSpoutTopologyMainNamedTopics.java updated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
public class PMMLPredictorBolt extends BaseRichBolt { | ||
protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class); | ||
|
||
private ModelOutputFields outFields; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
protected static final Logger LOG = LoggerFactory.getLogger(PMMLPredictorBolt.class); | ||
|
||
private ModelOutputFields outFields; | ||
private ModelRunnerFactory runnerFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
@Override | ||
public void declareOutputFields(OutputFieldsDeclarer declarer) { | ||
if (outFields != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the execute
it seems the results are always emitted, wont it fail when outFields==null
?
Also how are you ensuring that the streams declared in declareStream
here are the ones returned by runner.scoredTuplePerStream
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have documented this. The bottom line is that only the user knows which scores he wants to emit on each stream. If she/he chooses to emit on a stream that does not exist, that's a user error.
Also fixed outFields==null
/** | ||
* Represents a stream emitted by a Storm component | ||
*/ | ||
public class Stream implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems the Stream
just wraps the stream name and the direct flag. However in the PMMLPredictorBolt emitDirect
is not used at all. If all you want is to capture the stream name where the tuples have to be emitted just use String instead of wrapping it in another class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. It's a String now.
for (Map.Entry<? extends Stream, ? extends Fields> of : outFields.toMap().entrySet()) { | ||
final Stream stream = of.getKey(); | ||
final Fields fields = of.getValue(); | ||
declarer.declareStream(stream.getId(), stream.isDirect(), fields); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesnt make sense to declare stream as direct unless you intend to emit directly to specific tasks via emitDirect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
final Map<Stream, List<Object>> valuesMap = new HashMap<>(); | ||
|
||
for (Map.Entry<? extends Stream, ? extends Fields> entry : modelOutputFields.toMap().entrySet()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use keySet. It may be better if ModelOuputFields itself has a method to returns the keys, right now its more of map wrapper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done! Added a method to return all the streams, which is what we need here.
import java.util.Arrays; | ||
import java.util.List; | ||
|
||
public class JpmmlRunnerTestTopology { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe some short description of what this topology does/predicts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@ptgoetz @arunmahadevan @harshach Thanks for your reviews. I believe I addressed all the comments. There is a separate commit with the comments changes. When we all agree that the patch is ready to merge, I will squash the commits to keep the log clean. Thanks. |
+1 the changes looks good. After unzipping the distribution, I am not able to find the examples jar file under examples/storm-pmml-examples. Its under external/storm-pmml. |
2fa294e
to
39039d2
Compare
@arunmahadevan done! |
+1 |
@hmcl can you upmerge this. |
@hmcl Also can you open a PR against 1.x-branch as well. |
3388be5
to
737f314
Compare
<include>storm*jar</include> | ||
</includes> | ||
</fileSet> | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want the sample model and data to be included in the distribution?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ptgoetz done
@hmcl Do you plan to add unit tests for this? |
737f314
to
87941a9
Compare
- PMML Bolt supporting pluggable runners and configurable stream output fields - JPMML runner implementation - JPMML Prediction Test Topology - README - LICENSE information
87941a9
to
c85cc19
Compare
@ptgoetz As for unit tests, I have filed this JIRA to assert for edge cases and some common cases. |
@hmcl can you open a PR for 1.x-branch as well. I would like to merge this in today. |