[SPARK-28399][ML][PYTHON] implement RobustScaler #25160
Conversation
Test build #107682 has finished for PR 25160 at commit
/**
 * Scale features using statistics that are robust to outliers.
 * This Scaler removes the median and scales the data according to the quantile range
Nit: you probably want to clarify what 'scales' means here. You divide through by the IQR?
Also the IQR isn't necessarily 25%-75% because it's configurable.
Yes, after optionally removing the median, the features are divided by the quantile range.
The IQR is a special case of the quantile range, from 25% to 75%. But if the lower/upper bounds are set to other values, then the range is no longer the IQR.
OK can you say it's IQR by default but can be configured, something like that?
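To illustrate the behavior being discussed, here is a rough plain-Python sketch (not Spark's implementation; the function name and defaults are illustrative): the median is optionally removed, then each value is divided by the quantile range, which is the IQR (25%–75%) by default but configurable.

```python
from statistics import median, quantiles

def robust_scale(xs, lower=0.25, upper=0.75,
                 with_centering=True, with_scaling=True):
    """Sketch of RobustScaler's per-feature transform (illustrative only).

    Optionally removes the median, then divides by the (lower, upper)
    quantile range -- the IQR when lower=0.25 and upper=0.75.
    """
    m = median(xs) if with_centering else 0.0
    if with_scaling:
        # quantiles(..., n=100) returns the 1st..99th percentiles
        qs = quantiles(xs, n=100)
        rng = qs[int(upper * 100) - 1] - qs[int(lower * 100) - 1]
        # mirror the PR's handling of a zero range: scale becomes 0
        scale = 1.0 / rng if rng != 0 else 0.0
    else:
        scale = 1.0
    return [(x - m) * scale for x in xs]
```

With the defaults this centers the data at zero and makes the IQR of the output equal to 1, which is what makes the transform robust to outliers compared to mean/stddev scaling.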
}
}
if (localAgg != null) {
This might be clearer as:
if (localAgg == null) {
  Iterator.empty
} else {
  ...
}
private[spark] def transformDenseWithScale(scale: Array[Double],
    values: Array[Double]): Array[Double] = {
  var i = 0
  while(i < values.length) {
Nit: space after while
}
def assertResult: Row => Unit = {
private?
Test build #107770 has finished for PR 25160 at commit
Test build #107811 has finished for PR 25160 at commit
Looking OK, but how about adding to pyspark?
@srowen I am adding it to the pyspark side in this PR.
Test build #107893 has finished for PR 25160 at commit
Test build #107894 has finished for PR 25160 at commit
Test build #107895 has finished for PR 25160 at commit
agg = Array.fill(vec.size)(
  new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, 0.001))
}
require(vec.size == agg.length)
can we add a meaningful error message here?
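For example, a meaningful message would name both dimensions. A hypothetical Python stand-in for the suggested `require(vec.size == agg.length, <message>)` (the helper name is illustrative, not Spark's API):

```python
def check_feature_dims(vec_size, agg_len):
    # Illustrative only: mirrors require(vec.size == agg.length, <message>)
    # with an error message that reports both the expected and actual sizes.
    if vec_size != agg_len:
        raise ValueError(
            f"Number of features must be {agg_len} but got {vec_size}")
```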
if (agg == null) {
  Iterator.empty
} else {
  var i = 0
agg.map(_.compress())
?
I tend to keep the current impl, since it avoids creating a temporary array and should be a little faster.
I think the perf gain is negligible compared to the operations and the objects created in compress...
require(agg1.length == agg2.length)
var i = 0
while (i < agg1.length) {
  agg1(i) = agg1(i).merge(agg2(i)).compress()
why are you adding compress here? AFAIK it is not needed here
Yes, I added this to ensure compression; I will remove it.
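For context, the merge step above pairs up per-feature summaries from two partitions. A toy Python stand-in (each "summary" here is just a sorted list of samples rather than Spark's bounded, approximate QuantileSummaries, so compress has no analogue):

```python
def merge_aggs(agg1, agg2):
    # Toy model: each per-feature "summary" is a sorted list of samples.
    # In Spark, QuantileSummaries.merge already produces a valid summary,
    # which is why an extra compress() after merge was flagged as redundant.
    assert len(agg1) == len(agg2), "both sides must cover the same features"
    return [sorted(a + b) for a, b in zip(agg1, agg2)]
```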
range.toArray.map { v => if (v == 0) 0.0 else 1.0 / v }
} else Array.emptyDoubleArray

val func = StandardScalerModel.getTransformFunc(shift, scale,
nit:
val func = StandardScalerModel.getTransformFunc(
shift, scale, $(withCentering), $(withScaling))
I am not sure, but is it a convention? It is easy to find a similar indent/style in other places.
I have always seen the above-mentioned style. Maybe the SQL part is stricter about this.
I am neutral on this, and will follow your advice.
case d: DenseVector => d.values.clone()
case v: Vector => v.toArray
}
val newValues = NewStandardScalerModel
nit: this can go on one line
values
}

private[spark] def transformWithShift(shift: Array[Double],
ditto
values
}

private[spark] def transformDenseWithScale(scale: Array[Double],
ditto
values
}

private[spark] def transformSparseWithScale(scale: Array[Double],
ditto
values
}

private[ml] def getTransformFunc(shift: Array[Double],
ditto
Test build #108007 has finished for PR 25160 at commit
WDYT @mgaido91 ?
LGTM, thanks.
One last thing: can we add this to ml-features.md? It should be documented.
Ideally it would involve copy-pasting another scaler's examples, to match what we have for the other implementations.
@srowen OK, I am going to add the documents.
Test build #108293 has finished for PR 25160 at commit
Test build #108294 has finished for PR 25160 at commit
Merged to master
What changes were proposed in this pull request?
Implement RobustScaler.
Since the transformation is quite similar to StandardScaler, I refactored the transform function so that it can be reused in both scalers.
How was this patch tested?
Existing and added tests.
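The refactor described above factors the per-row transform into a single function selected from the centering/scaling flags. A rough Python sketch of that dispatch idea (illustrative; the name and signature are not Spark's API):

```python
def get_transform_func(shift, scale, with_centering, with_scaling):
    """Return a per-vector transform, reusable by both StandardScaler-style
    and RobustScaler-style models (illustrative sketch)."""
    if with_centering and with_scaling:
        return lambda v: [(x - m) * s for x, m, s in zip(v, shift, scale)]
    if with_centering:
        return lambda v: [x - m for x, m in zip(v, shift)]
    if with_scaling:
        return lambda v: [x * s for x, s in zip(v, scale)]
    return lambda v: list(v)
```

The two scalers then differ only in how shift and scale are computed (mean/stddev versus median/quantile range), while sharing one transform path.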