New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-24555][ML] logNumExamples in KMeans/BiKM/GMM/AFT/NB #21561
Conversation
Test build #91823 has finished for PR 21561 at commit
|
Test build #93864 has finished for PR 21561 at commit
|
96e8425
to
2e48282
Compare
Test build #93865 has finished for PR 21561 at commit
|
Test build #93866 has finished for PR 21561 at commit
|
// Aggregates term frequencies per label. | ||
// TODO: Calling aggregateByKey and collect creates two stages, we can implement something | ||
// TODO: similar to reduceByKeyLocally to save one stage. | ||
val aggregated = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd | ||
.map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2))) | ||
.map { row => | ||
countAccum.add(1L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this guaranteed to work correctly, given that this is in a map operation? wondering if this introduces a correctness issue or whether this number is available elsewhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should work correctly, however, to guarantee the correctness, I update the pr to compute the number without Accumulator
def run(input: RDD[Vector]): BisectingKMeansModel = { | ||
|
||
private[spark] def run(input: RDD[Vector], | ||
instr: Option[Instrumentation]): BisectingKMeansModel = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Elsewhere I see the instrumentation made available with "insrumented" -- is this different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instrumented
will create a new Instrumentation
, and instrumented
is only used in ml
When mllib's impls is called, the Instrumentation
will be passed as a parameters, like what KMeans does (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala#L362).
Test build #94669 has finished for PR 21561 at commit
|
def run(input: RDD[Vector]): BisectingKMeansModel = { | ||
|
||
private[spark] def run(input: RDD[Vector], | ||
instr: Option[Instrumentation]): BisectingKMeansModel = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indentation
fb3ff2b
to
5f403fa
Compare
Test build #94718 has finished for PR 21561 at commit
|
@@ -299,7 +299,7 @@ class KMeans private ( | |||
val bcCenters = sc.broadcast(centers) | |||
|
|||
// Find the new centers | |||
val newCenters = data.mapPartitions { points => | |||
val collected = data.mapPartitions { points => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we find a better name than collected
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am neutral on this.
}.collectAsMap() | ||
|
||
if (iteration == 0) { | ||
val numSamples = collected.values.map(_._2).sum |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about moving this in the foreach
, so it is computed only id needed?
@@ -171,6 +169,8 @@ class BisectingKMeans private ( | |||
val vectors = input.zip(norms).map { case (x, norm) => new VectorWithNorm(x, norm) } | |||
var assignments = vectors.map(v => (ROOT_INDEX, v)) | |||
var activeClusters = summarize(d, assignments, dMeasure) | |||
val numSamples = activeClusters.values.map(_.size).sum |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
5f403fa
to
ecab85c
Compare
Test build #94781 has finished for PR 21561 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM apart from the two comments still to address (naming and extra newline)
*/ | ||
@Since("1.6.0") | ||
def run(input: RDD[Vector]): BisectingKMeansModel = { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: extra newline
* @param input RDD of vectors | ||
* @return model for the bisecting kmeans | ||
*/ | ||
@Since("1.6.0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: this should be since 2.4?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this api was already existing since 1.6.0, so we should keep the since annotation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You couldn't call BisectingKMeans.run(...)
before this, right? it wasn't in a superclass or anything. In that sense I think this method needs to be marked as new as of 2.4.0, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def run(input: RDD[Vector]): BisectingKMeansModel
is a public api since 1.6, and users can call it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh right I get it now, this isn't a new method, it's 'replacing' the definition above. 👍
Merged to master |
What changes were proposed in this pull request?
logNumExamples in KMeans/BiKM/GMM/AFT/NB
How was this patch tested?
existing tests