This repository has been archived by the owner on Aug 16, 2019. It is now read-only.

Feature Request for 'Streaming Requests' #13

Closed
prairie-guy opened this issue Jan 4, 2015 · 21 comments · Fixed by #14

Comments

@prairie-guy

'Streaming Requests' is a feature that is implemented by the Python package 'Requests': http://docs.python-requests.org/en/latest/user/advanced/#streaming-requests

It would be great to have the same feature in HTTPClient.jl.

A sample use case would be iterating over streaming APIs, for example a GET request that returns a continuous stream of JSON objects.

My specific need is to monitor a remote sensor that returns continuous voltage readings (in JSON format) in response to a GET call.

Thanks,

Bryan Daniels

@prairie-guy changed the title from "Streams as returned values" to "Feature Request for 'Streaming Requests'" on Jan 4, 2015
@amitmurthy
Contributor

The following might work, assuming that get writes to STDOUT:

rd, wr = redirect_stdout()
for ln in eachline(rd)
  println(STDERR, ln)  # process each line here (not via STDOUT, which is redirected)
end

Note that this does not provide a means of closing the client connection after a specified number of JSON responses have been processed. It also assumes that the JSON objects are newline-separated.

@prairie-guy
Author

Amit,

Thanks. Unfortunately, it doesn't seem to work. This evening I will see whether some combination of options works. I had previously tried redirect_stdout() with various option settings. The only thing I could get to work was to set ostream="some_file", set total_time="low_time_out", catch the time_out_error, and then read and parse "some_file". Not very elegant...

Thanks,

Bryan

(Replying by email to Amit's comment of Jan 5, 2015, 4:33 AM.)

@prairie-guy
Author

I believe the server is using Server Sent Events. Here is a link: https://segment.io/blog/2014-04-03-server-sent-events-the-simplest-realtime-browser-spec/
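If the server does speak Server-Sent Events, a reader mainly has to collect data: lines and treat a blank line as the end of one event. The following is a simplified sketch in Julia 1.x syntax, not part of HTTPClient.jl: the function name foreach_sse is hypothetical, and it handles only data: fields (event:, id:, retry: and ":" comment lines are skipped). Because it reads from any IO, it could in principle be pointed at whatever stream the HTTP client writes into.

```julia
# Simplified Server-Sent Events reader (sketch): calls f(data) once per event.
# Only "data:" fields are handled; "event:", "id:", "retry:" and ":" comment
# lines are skipped. foreach_sse is a hypothetical name.
function foreach_sse(f, io::IO)
    data = String[]                      # data lines of the event being built
    for line in eachline(io)             # eachline strips "\n" and "\r\n"
        if isempty(line)                 # blank line: the event is complete
            isempty(data) || f(join(data, "\n"))
            empty!(data)
        elseif startswith(line, "data:")
            value = line[6:end]          # text after the "data:" prefix
            if startswith(value, " ")    # one leading space is dropped
                value = value[2:end]
            end
            push!(data, value)
        end
    end
    # An event that is not followed by a blank line is discarded.
    return nothing
end

# Usage on a small in-memory sample shaped like the sensor output:
foreach_sse(println, IOBuffer("data:{\"absolute\":3}\n\ndata:{\"absolute\":4}\n\n"))
```

Decoding each JSON payload would still need a separate package such as JSON.jl.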

@amitmurthy
Contributor

As part of JuliaLang/julia#9434, an in-memory buffer, BufferStream, which behaves like an AsyncStream, will be introduced. That should let you wait for newline-separated responses and process them as they arrive.

@prairie-guy
Author

Thanks for the addition and the great responsiveness! I appreciate it.

@amitmurthy
Contributor

It has been merged into 0.4.

If you are using 0.3, you could try copying the definition of BufferStream from https://github.com/JuliaLang/julia/blob/a2edd642979ce44932b1dadcd7575a5e3bd3f0f0/base/stream.jl into your own code and using it.

I have reproduced it below. Do try it out and let me know if it works.

# BufferStreams are non-OS streams, backed by a regular IOBuffer
type BufferStream <: AsyncStream
    buffer::IOBuffer
    r_c::Condition
    close_c::Condition
    is_open::Bool

    BufferStream() = new(PipeBuffer(), Condition(), Condition(), true)
end

isopen(s::BufferStream) = s.is_open
close(s::BufferStream) = (s.is_open = false; notify(s.r_c; all=true); notify(s.close_c; all=true); nothing)

function wait_readnb(s::BufferStream, nb::Int)
    while isopen(s) && nb_available(s.buffer) < nb
        wait(s.r_c)
    end

    (nb_available(s.buffer) < nb) && error("closed BufferStream")
end

function eof(s::BufferStream)
    wait_readnb(s,1)
    !isopen(s) && nb_available(s.buffer)<=0
end

show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",nb_available(s.buffer),", isopen:", s.is_open)

nb_available(s::BufferStream) = nb_available(s.buffer)

function wait_readbyte(s::BufferStream, c::UInt8)
    while isopen(s) && search(s.buffer,c) <= 0
        wait(s.r_c)
    end
end

wait_close(s::BufferStream) = if isopen(s) wait(s.close_c); end
start_reading(s::BufferStream) = nothing

write(s::BufferStream, b::UInt8) = (rv=write(s.buffer, b); notify(s.r_c; all=true);rv)
write{T}(s::BufferStream, a::Array{T}) = (rv=write(s.buffer, a); notify(s.r_c; all=true);rv)
write(s::BufferStream, p::Ptr, nb::Integer) = (rv=write(s.buffer, p, nb); notify(s.r_c; all=true);rv)

eachline on a BufferStream should work as expected.
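On current Julia (1.x) the type is available as Base.BufferStream. A minimal sketch of the expected behaviour, assuming Julia 1.x: one task writes lines into the stream while the main task reads them with eachline, which waits until a full line arrives and stops once the stream is closed and drained. The "reading $i" lines are made-up sample data, and @async is the 1.x spelling of @schedule.

```julia
# Julia 1.x sketch: a writer task feeds a Base.BufferStream while the main
# task consumes complete lines with eachline.
bs = Base.BufferStream()

writer = @async begin               # @async is the 1.x spelling of @schedule
    for i in 1:3
        write(bs, "reading $i\n")   # sample data, made up for illustration
        sleep(0.05)                 # simulate readings arriving over time
    end
    close(bs)                       # lets eachline finish once drained
end

lines = String[]
for ln in eachline(bs)              # waits for each full line
    push!(lines, ln)
end
wait(writer)
println(lines)
```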

@amitmurthy
Contributor

Sorry, the close was accidental.

You may have to replace AsyncStream with Base.AsyncStream in the code above (if you are trying it with Julia 0.3).

@prairie-guy
Author

I will upgrade to HEAD and test out this evening. Thanks!

@prairie-guy
Author

Clearly, I'm missing something:

  • This works, returning "test1\n" as expected:
bs = BufferStream()
write(bs,"test1\n")
write(bs,"test2\n")
readline(bs)
  • This continues to block after HTTPC.get(...):
function input(lb::LittleBit)
    url = "$(lb.url)/input"
    bs = BufferStream()
    options = HTTPClient.HTTPC.RequestOptions(headers=lb.headers,content_type="application/json",ostream=bs)
    r=HTTPClient.HTTPC.get(url,options)
    print(r)  # <== BLOCKING. NOT GETTING HERE
    for ln in eachline(bs)
        print(ln)
    end
end

@amitmurthy
Contributor

Try the option blocking=false; by default it is true.

@amitmurthy
Contributor

With blocking=false, the response is a RemoteRef, which is irrelevant in your case since the request never actually "finishes". You should still be able to do a readline on bs, since incoming data will be written to it asynchronously anyway.

@prairie-guy
Copy link
Author

Previously I tried blocking=false and also using readline instead of eachline. To be sure, I tried again and got the RemoteRef, but it still blocks. Moreover, setting ostream=STDOUT returns a continuous stream of JSON elements, which shows that the server is 'live'.

# This continues to block after HTTPC.get(...)
function input(lb::LittleBit)
    url = "$(lb.url)/input"
    bs = BufferStream()
    #options = HTTPClient.HTTPC.RequestOptions(headers=lb.headers,content_type="application/json",ostream=STDOUT)
    options = HTTPClient.HTTPC.RequestOptions(headers=lb.headers,content_type="application/json",ostream=bs,blocking=false)
    r=HTTPClient.HTTPC.get(url,options)
    print(r)  # <== BLOCKING. NOT GETTING HERE
    readline(bs)
    ## for ln in eachline(bs)
    ##     print(ln)
    ## end
end


@amitmurthy
Contributor

Is it blocking on print(r) or readline(bs) ? With blocking=false, print(r) should just print a RemoteRef.

Also, instead of the readline(bs), can you do a print(readavailable(bs)) and see if any data is available at all?

@amitmurthy
Contributor

Also, can you paste some raw server output? I would like to see whether the JSON objects are separated by CRs only, by CR-LF, or by LFs only. I can see a problem if they are separated by carriage returns only.
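The separator matters because Julia's readline and eachline only end a line at LF. A small check, assuming Julia 1.x (where a CR directly before the LF is stripped as well): LF and CR-LF input split into separate lines, while CR-only input comes back as a single line. split_records is a hypothetical helper that accepts all three terminators, for data that has already been read into a string.

```julia
# Julia 1.x: readline/eachline end a line at "\n" and also strip a "\r"
# directly before it; a lone "\r" is not treated as a line break.
lf   = collect(eachline(IOBuffer("a\nb\n")))      # LF only
crlf = collect(eachline(IOBuffer("a\r\nb\r\n")))  # CR-LF
cr   = collect(eachline(IOBuffer("a\rb\r")))      # CR only

println(lf, " ", crlf, " ", cr)

# Hypothetical helper for text already in memory: accept any of the three
# terminators ("\r\n" is listed first so it is matched as one break).
split_records(s::AbstractString) = filter(!isempty, split(s, r"\r\n|\r|\n"))

println(split_records("a\rb\r"))
```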

@amitmurthy
Contributor

I simulated the same with a dummy streaming server and I can see the issue. Will debug it in a while.

@prairie-guy
Author

No luck. With blocking=false, print(r) just prints a RemoteRef. Neither print(readavailable(bs)) nor plain readavailable(bs) returns. I am running Julia in Emacs, so I also tried it in a raw terminal, with the same result.

Here is a redacted raw output when using ostream=STDOUT:

data:{"type":"input","timestamp":1421997189623,"from":{"user":{"id":XXXXXXXXXXXXXXXXX},"device":{"id":"XXXXXXXXXXXXXXXXX","device":"littlebits-module-cloud","setup_version":"1.0.0","protocol_version":"1.1.0","firmware_version":"1.0.140611a","mac":"00e04c02df1a","hash":"XXXXXXXXXXXXXXXXX","ap":{"ssid":"XXXXXXXXXXXXXXXXX","mac":"XXXXXXXXXXXXXXXXX","strength":85}},"server":{"id":"XXXXXXXXXXXXXXXXX"}},"percent":0,"absolute":3,"name":"amplitude","payload":{"percent":0,"absolute":3}}

data:{"type":"input","timestamp":1421997190383,"from":{"user":{"id":XXXXXXXXXXXXXXXXX},"device":{"id":"XXXXXXXXXXXXXXXXX","device":"littlebits-module-cloud","setup_version":"1.0.0","protocol_version":"1.1.0","firmware_version":"1.0.140611a","mac":"00e04c02df1a","hash":"XXXXXXXXXXXXXXXXX","ap":{"ssid":"XXXXXXXXXXXXXXXXX","mac":"XXXXXXXXXXXXXXXXX","strength":85}},"server":{"id":"XXXXXXXXXXXXXXXXX"}},"percent":0,"absolute":4,"name":"amplitude","payload":{"percent":0,"absolute":4}}

data:{"type":"input","timestamp":1421997191163,"from":{"user":{"id":XXXXXXXXXXXXXXXXX},"device":{"id":"XXXXXXXXXXXXXXXXX","device":"littlebits-module-cloud","setup_version":"1.0.0","protocol_version":"1.1.0","firmware_version":"1.0.140611a","mac":"00e04c02df1a","hash":"XXXXXXXXXXXXXXXXX","ap":{"ssid":"XXXXXXXXXXXXXXXXX","mac":"XXXXXXXXXXXXXXXXX","strength":85}},"server":{"id":"XXXXXXXXXXXXXXXXX"}},"percent":0,"absolute":3,"name":"amplitude","payload":{"percent":0,"absolute":3}}

@prairie-guy
Author

Amit, thanks for working on this. I am way out of my league...

@amitmurthy
Contributor

I found the issue. While I put out a patch, you can try with the following workaround:

function input(lb::LittleBit)
    url = "$(lb.url)/input"
    bs = BufferStream()
    options = HTTPClient.HTTPC.RequestOptions(headers=lb.headers,content_type="application/json",ostream=bs)
    @schedule HTTPClient.HTTPC.get(url,options)
    for ln in eachline(bs)
        print(ln)
    end
end

Note the @schedule in front of the get call, and that the blocking option is not set, i.e., the request runs with blocking=true (the default) in a separate task.
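The same pattern (a blocking request running in its own task while the caller consumes the stream) can be sketched in Julia 1.x, where @schedule has been replaced by @async. Here fake_streaming_get is a hypothetical stand-in for HTTPC.get(url, options) with ostream=bs and blocking=true; it is not HTTPClient.jl code. The sketch also stops after n lines, which touches on the earlier concern about ending the stream after a given number of responses.

```julia
# Julia 1.x sketch of the workaround: run the blocking request in its own
# task and consume the stream from the caller. fake_streaming_get is a
# hypothetical stand-in for HTTPC.get(url, options) with ostream=bs.
function fake_streaming_get(bs::Base.BufferStream)
    i = 0
    # With @async both tasks share one thread and nothing yields between
    # this isopen check and the write, so no write happens after close.
    while isopen(bs)
        i += 1
        write(bs, "data:{\"absolute\":$i}\n")
        sleep(0.01)                  # yields to the reader task
    end
end

# Read the first n lines, then close the stream so the producer loop ends.
function take_readings(n::Int)
    bs = Base.BufferStream()
    producer = @async fake_streaming_get(bs)   # @async replaces @schedule
    readings = String[]
    for ln in eachline(bs)
        push!(readings, ln)
        length(readings) == n && break
    end
    close(bs)
    wait(producer)
    return readings
end

println(take_readings(3))
```

In this sketch closing bs only ends the stand-in loop; whether a real client also drops the underlying connection depends on that client.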

@prairie-guy
Author

Success! Adding @schedule and removing blocking=false fixed it. I will look for the patch. What code will you be patching?

@amitmurthy
Contributor

Patched. The problem was in HTTPC.jl, line 243, where a deepcopy of RequestOptions created a new BufferStream, so the response was written to that copy rather than to the caller's bs.
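The effect of that deepcopy can be reproduced in miniature. In this Julia 1.x sketch, Sink, Options and deliver! are hypothetical stand-ins for a BufferStream, HTTPC.RequestOptions and the client's write path; it does not reproduce the actual HTTPC.jl code or the patch. Writing through a deep copy of the options leaves the caller's stream empty, while rebuilding the options around the same stream object keeps them connected.

```julia
# Miniature of the deepcopy problem. Sink, Options and deliver! are
# hypothetical stand-ins (for a BufferStream, HTTPC.RequestOptions and the
# client's write path); this is not the HTTPC.jl code.
mutable struct Sink
    lines::Vector{String}
end

struct Options
    ostream::Sink
end

# The "client" writes the response into whatever stream the options hold.
deliver!(opts::Options, line::String) = push!(opts.ostream.lines, line)

user_stream = Sink(String[])
opts = Options(user_stream)

# Buggy pattern: the client works on a deep copy of the options, so the
# Sink is copied too and the caller's stream never receives the data.
deliver!(deepcopy(opts), "reading 1")
println(length(user_stream.lines))   # 0

# Rebuilding the options around the same stream object keeps them shared.
deliver!(Options(opts.ostream), "reading 2")
println(user_stream.lines)
```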

@prairie-guy
Author

Patch checks out for me. Thanks!


2 participants