-
Notifications
You must be signed in to change notification settings - Fork 28k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DO NOT MERGE #22404
DO NOT MERGE #22404
Conversation
This eliminates some duplication in the code to connect to a server on localhost to talk directly to the jvm. Also it gives consistent ipv6 and error handling. Two other incidental changes, that shouldn't matter: 1) python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator) 2) for `rdd._load_from_socket`, the timeout is only increased after authentication. Closes apache#22247 from squito/py_connection_refactor. Authored-by: Imran Rashid <irashid@cloudera.com> Signed-off-by: hyukjinkwon <gurwls223@apache.org> (cherry picked from commit 38391c9)
1b4e8eb
to
2b0d10d
Compare
Test build #96000 has finished for PR 22404 at commit
|
Test build #4338 has finished for PR 22404 at commit
|
Test build #96001 has finished for PR 22404 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just some nits -- none of these comments should hold up progress
|
||
/** | ||
* The path of the file, intended only for debug purposes. | ||
* @return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are just nits, but you can put the text on the line above in the tag here
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.spark.network.shuffle; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super nit, I think this can be preceded by a blank line
} | ||
|
||
private class SimpleDownloadWritableChannel implements DownloadFileWritableChannel { | ||
private final WritableByteChannel channel; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More nits, would put blank lines around this
import org.apache.spark.network.buffer.ManagedBuffer; | ||
import org.apache.spark.network.util.TransportConf; | ||
|
||
import java.io.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Usually we unroll star imports, but not a big deal
try { | ||
return new SimpleDownloadWritableChannel(); | ||
} catch (FileNotFoundException e) { | ||
throw new RuntimeException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it bad to just let the method declare that it throws IOException for FileNotFoundException? I know they're checked, but it's more precise
// if you just cast a byte to an int, then anything > 127 is negative, which is interpreted | ||
// as an EOF | ||
val b = into(0) | ||
if (b < 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pardon is this just trying to treat it as an unsigned byte? then just b & 0xFF
?
// if there is encryption, we setup a server which reads the encrypted files, and sends | ||
// the decrypted data to python | ||
val idsAndFiles = broadcastVars.flatMap { broadcast => | ||
if (!oldBids.contains(broadcast.id)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: flip the if condition for clarity?
python/pyspark/context.py
Outdated
# this call will block until the server has read all the data and processed it (or | ||
# throws an exception) | ||
r = server.getResult() | ||
return r |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, do you want to just return server.getResult()
in cases like this?
thanks Sean, all good points, just updated. |
Test build #96002 has finished for PR 22404 at commit
|
Tested with updates to RowQueueSuite
Covered by tests in DistributedSuite
4f3e29e
to
38dd9d8
Compare
Test build #96041 has finished for PR 22404 at commit
|
just for testing