Skip to content

Commit

Permalink
Update ReceiverTracker.scala
Browse files Browse the repository at this point in the history
  • Loading branch information
nishkamravi2 committed Jun 5, 2015
1 parent 6e3515c commit 02dbdb8
Showing 1 changed file with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,30 +291,39 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
// so as to distribute them evenly over executors in a round-robin fashion
// If num_executors > num_receivers, distribute executors among receivers
val locations = new Array[ArrayBuffer[String]](receivers.length)
var roundRobin = false
if (!hasLocationPreferences && !ssc.sparkContext.isLocal) {
val executors = getExecutors(ssc)
var i = 0
for (i <- 0 to (receivers.length - 1)) {
locations(i) = new ArrayBuffer[String]()
}
if (receivers.length >= executors.length) {
if (!executors.isEmpty) {
for (i <- 0 to (receivers.length - 1)) {
locations(i) += executors(i % executors.length)
locations(i) = new ArrayBuffer[String]()
}
} else {
for (i <- 0 to (executors.length - 1)) {
locations(i % receivers.length) += executors(i)
if (receivers.length >= executors.length) {
for (i <- 0 to (receivers.length - 1)) {
locations(i) += executors(i % executors.length)
System.out.println("RECEIVERS > executors " + executors(i % executors.length))
}
} else {
for (i <- 0 to (executors.length - 1)) {
locations(i % receivers.length) += executors(i)
System.out.println("EXECUTORS > receivers " + executors(i))
}
}
roundRobin = true
}
}

val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
} else if (roundRobin) {
// ssc.sc.makeRDD(receivers, receivers.size)
val roundRobinReceivers = (0 to (receivers.length - 1)).map(i => (receivers(i), locations(i)))
ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}
tempRDD
}
Expand Down

0 comments on commit 02dbdb8

Please sign in to comment.