Skip to content

ARROW-822: [Python] StreamWriter Wrapper for Socket and File-like Objects without tell()#569

Closed
BryanCutler wants to merge 2 commits intoapache:masterfrom
BryanCutler:pyarrow-stream-writer-socket-ARROW-822
Closed

ARROW-822: [Python] StreamWriter Wrapper for Socket and File-like Objects without tell()#569
BryanCutler wants to merge 2 commits intoapache:masterfrom
BryanCutler:pyarrow-stream-writer-socket-ARROW-822

Conversation

@BryanCutler
Copy link
Copy Markdown
Member

Added a wrapper for StreamWriter to implement the required tell() method so that python sockets and file-like objects can be used as sinks. The tell() method will report the position by starting at 0 when the StreamWriter is created and incrementing by number of bytes after each write.

Added unittests that use local socket as the source/sink for streaming.

…d unittest for StreamWriter and StreamReader over local socket
@BryanCutler
Copy link
Copy Markdown
Member Author

@wesm, I just added a simple wrapper to implement tell() which works fine for the case of using a python socket or file-like object from socket.makefile(). I still get the below error when trying to use the socket file, for example sink = os.fdopen(sock.fileno(), "wb", 65536)

terminate called after throwing an instance of 'std::logic_error'
  what():  basic_string::_M_construct null not valid

This is the way Spark sets up sockets, so it would be great to get that fixed here. I'm not sure where exactly this is happening, just somewhere in the StreamWriter constructor, but I'll keep trying to debug it.


def stopClientServer(self):
import struct
self.sink.write(struct.pack('i', 0))
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wesm, would it be possible to include the writing of a 0 when the StreamWriter is closed to signal the EOS? I believe the Java implementation does this here https://github.com/apache/arrow/blob/master/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java#L54

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I am not sure if it will be more complicated than writing a 4-byte 0 in https://github.com/apache/arrow/blob/master/cpp/src/arrow/ipc/writer.cc#L623. If you'd rather do that in a separate patch I can take a look after this goes in

if isinstance(sink, socket.socket):
# Use a buffer of 0 to flush after each write, otherwise it is
# possible to close socket without flushing
sink_file = sink.makefile("wb", 0)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was kind of on the fence about automatically wrapping a socket object like this, let me know if you prefer to take this out and maybe just require the application to call makefile instead

if not hasattr(sink, "tell"):
return _StreamSinkWrapper(sink)
else:
return sink
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using this wrapper class, can you instead add int64_t position_ member to PyOutputStream here https://github.com/apache/arrow/blob/master/cpp/src/arrow/python/io.cc#L192, initalized to 0, and return that position_ from Tell instead of calling the file's tell method? Then in PyOutputStream::Write, you would increment the position.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, so PyOutputStream would always use this position_ member for Tell or is it still possible to check if the python handle object already definestell() and then make it conditional there? I'm just wondering if there is a sink that implements a valid tell() that we wouldn't want to override?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know of one off hand; I will probably take the YAGNI approach on this one and keep things simple (no need to invoke tell() on the Python object)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, np. That will probably fix the other issue with using os.fdopen(sock.fileno(),... too

# NOTE: must start and stop server in test
pass

def startClientServer(self, do_read_all):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use start_client_server here and same naming convention below?


def stopClientServer(self):
import struct
self.sink.write(struct.pack('i', 0))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I am not sure if it will be more complicated than writing a 4-byte 0 in https://github.com/apache/arrow/blob/master/cpp/src/arrow/ipc/writer.cc#L623. If you'd rather do that in a separate patch I can take a look after this goes in

@wesm
Copy link
Copy Markdown
Member

wesm commented Apr 19, 2017

This is the way Spark sets up sockets

Can you paste a standalone repro here (or with this PR) and I can help debug?

@BryanCutler
Copy link
Copy Markdown
Member Author

BryanCutler commented Apr 19, 2017

Ok, that strange error is coming from CheckPyError() because the bytes obj in stringified is NULL, then tries to construct the std::string

49	    std::string message(stringified.bytes);
(gdb) print stringified
$14 = {
  tmp_obj = {
    obj_ = 0x0
  }, 
  bytes = 0x0, 
  size = 2
}

Looks like the real error is

(gdb) print PyErr_Print()
TypeError: expected string or Unicode object, tuple found

after calling
PyObject* result = cpp_PyObject_CallMethod(file_, "tell", "()");

Is it taking the empty parenthesis to be a tuple argument?

repro is here
just uncomment out the line to use
sink = os.fdopen(sock.fileno(), "wb", 65536)

@BryanCutler
Copy link
Copy Markdown
Member Author

@wesm I removed the wrapper and made changes to PyOutputStream instead. This fixes both ways of using StreamWriter with file-like objects from a socket - of course you can't use the socket object directly but I think that's fine. I can look into the issue with CheckPyError a bit later.

Copy link
Copy Markdown
Member

@wesm wesm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. This looks good, thanks @BryanCutler! I will take a look at the other error also

@asfgit asfgit closed this in 6c352e2 Apr 21, 2017
@wesm
Copy link
Copy Markdown
Member

wesm commented Apr 21, 2017

@BryanCutler I can't get the code from ARROW-822 to work either with Python 2.7 or 3.5. Can you paste a reproduction?

In [3]: paste
import pyarrow as pa
import socket, os, io

port = 8080

for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
    af, socktype, proto, canonname, sa = res
    sock = socket.socket(af, socktype, proto)
    try:
        sock.settimeout(3)
        sock.connect(sa)
    except socket.error:
        sock.close()
        sock = None
        continue
        raise
    break

if not sock:
    raise Exception("could not open socket")
## -- End pasted text --
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-3-010c27b951bd> in <module>()
     18 
     19 if not sock:
---> 20     raise Exception("could not open socket")

Exception: could not open socket

@BryanCutler
Copy link
Copy Markdown
Member Author

BryanCutler commented Apr 21, 2017 via email

jeffknupp pushed a commit to jeffknupp/arrow that referenced this pull request Jun 3, 2017
…ects without tell()

Added a wrapper for StreamWriter to implement the required tell() method so that python sockets and file-like objects can be used as sinks.  The tell() method will report the position by starting at 0 when the StreamWriter is created and incrementing by number of bytes after each write.

Added unittests that use local socket as the source/sink for streaming.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes apache#569 from BryanCutler/pyarrow-stream-writer-socket-ARROW-822 and squashes the following commits:

6cdec4f [Bryan Cutler] Removed StreamWriter wrapper and put position handling in PyStreamWriter instead
2bd669f [Bryan Cutler] Added StreamSinkWrapper to ensure stream sink has tell() method, added unittest for StreamWriter and StreamReader over local socket
@BryanCutler BryanCutler deleted the pyarrow-stream-writer-socket-ARROW-822 branch November 7, 2017 23:49
pribor pushed a commit to GlobalWebIndex/arrow that referenced this pull request Oct 24, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants