-
Notifications
You must be signed in to change notification settings - Fork 87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-26205] Support Online Model Save in FlinkML #60
Conversation
a9d1831
to
fe7d882
Compare
Thanks for the PR! I think it's a crucial feature for Flink ML. I suppose this PR also wants to solve the problem raised in this email. In this email an exception was thrown when an unbounded stream was fed to an Estimator, but the test cases introduced by this PR have only covered the bounded situations. It might be better if we could add test cases that corresponds to the conditions described in the email. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Left some comments below.
flink-ml-core/src/main/java/org/apache/flink/ml/util/ReadWriteUtils.java
Outdated
Show resolved
Hide resolved
public static class ModelVersionAssigner<T> extends BasePathBucketAssigner<T> { | ||
@Override | ||
public String getBucketId(T element, Context context) { | ||
return String.valueOf(System.nanoTime()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make modelVersionAssiginer
independent of execution time?
If model data contains multiple streams and we use the current version assigner(with timestamp as the version), we may not be able to associate the model data from different streams.
modelPath = new org.apache.flink.core.fs.Path(path + "/" + fileName); | ||
} | ||
} | ||
Source<T, ?, ?> source = FileSource.forRecordStreamFormat(modelDecoder, modelPath).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we still use ".../data/" as the default model data path?
If there is a directory that is not ".../data/", can the test case still work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I will refine it.
} | ||
|
||
/** | ||
* Loads the model data from the given path which has more than one model. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you update the java doc and explain why do we need this function here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I will update the doc. This function can get a model data with special model version from the path which has more than one model version.
|
||
@Test | ||
public void saveAndLoadOnlineModel() throws Exception { | ||
Configuration config = new Configuration(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to extract the common logic here in Before
function, same as we did in other test cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
|
||
/* Loads every LogisticRegression model in model path and validates it. */ | ||
String modelVersion; | ||
while ((modelVersion = bufferedReader.readLine()) != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also hava a test case that loads all of the model data in a single data stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
tmpPath, | ||
new LogisticRegressionModelData.ModelDataDecoder(), | ||
modelVersion)) | ||
.as("label, vec"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you convert model data as label, vec
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The validated data has feature(vec) and label. I will check the prediction result agree with given label.
String modelVersion; | ||
while ((modelVersion = bufferedReader.readLine()) != null) { | ||
if (!"metadata".equals(modelVersion)) { | ||
LogisticRegressionModel lrModel = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nits: LogisticRegressionModel lrModel =..
could be outside of the loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
4bfa4d3
to
ca209d3
Compare
293dce8
to
1b8a183
Compare
Closing for now. We can open it later if needed. |
What is the purpose of the change
Support Online Model Save in FlinkML.
Brief change log
Add code of online model save.
Does this pull request potentially affect one of the following parts:
Dependencies (does it add or upgrade a dependency): (no)
The public API, i.e., is any changed class annotated with @public(Evolving): (no)
Does this pull request introduce a new feature? (no)
If yes, how is the feature documented? (Java doc)