[R] Can't use RecordBatchStreamWriter with Socket connection #38828

Closed
pinduzera opened this issue Nov 21, 2023 · 2 comments · Fixed by #38897

pinduzera commented Nov 21, 2023

I'm not sure whether anything can be done about this. It looks like an R limitation: socket connections do not support seek().

Let's start an R session that listens on a socket connection (this is just a simulation; the reader could be any other language reading from a socket).

library(arrow)

server <- function() {
  while (TRUE) {
    writeLines("Listening...")
    con <- socketConnection(host = "localhost", port = 6011, blocking = TRUE,
                            server = TRUE, open = "r+b")
    socketTimeout(con, 3600)
    
    data <- arrow::read_ipc_stream(con, as_data_frame = FALSE)
    print(head(as.data.frame(data)))

    # Close this connection before waiting for the next client
    close(con)
  }
}

server()

Now, in another session, let's send a stream:

library(arrow)
rb <- arrow::record_batch(iris)

socketDriver <- socketConnection(host = "localhost", 
                 port = "6011",
                 blocking = TRUE,
                 server = FALSE,
                 open = "w+b")

outputStream <- arrow:::make_output_stream(socketDriver) ## couldn't find a better alternative as well

writer <- arrow::RecordBatchStreamWriter$create(outputStream, rb$schema)

writer$write_batch(rb) # ideally would loop, but can't even write a single batch
Error in seek.connection(12L) : 'seek' not enabled for this connection

If I write everything at once it works, but that defeats the purpose. The idea is to write in batches so that a big table never has to be read all at once:

library(arrow)
socketDriver <- socketConnection(host = "localhost", 
                            port = "6011",
                            blocking = TRUE,
                            server = FALSE,
                            open = "w+b")

rawTbl <- arrow::write_to_raw(arrow::as_arrow_table(x = iris),
                    format = "stream")
writeBin(rawTbl, socketDriver)

Component(s)

R

@paleolimbot
Member

Thanks for bumping this! It has come up a few times and should be fixed. I forget exactly why this happens, but I think it's just a matter of better logic for creating an OutputStream from an R connection.

@paleolimbot
Member

I'll work on the PR that enables this usage more directly, but I'd like to point out that if you don't use dictionary encoding (i.e., no factors in your input data frame), you can serialize the schema and batches directly and send them to the connection. This might even be faster than doing it directly from Arrow (since Arrow C++ under the hood is just calling writeBin() anyway):

tmp <- tempfile()
proc <- callr::r_bg(function() {
  server <- function() {
    library(arrow)
    
    while (TRUE) {
      writeLines("Listening...")
      con <- socketConnection(host = "localhost", port = 6011, blocking = TRUE,
                              server = TRUE, open = "r+b")
      socketTimeout(con, 3600)
      
      data <- arrow::read_ipc_stream(con, as_data_frame = FALSE)
      print(head(as.data.frame(data)))
      
    }
  }
  
  server()
}, stdout = tmp)

Sys.sleep(0.5)

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.

# Have to pick something without factors for this to work currently:
# dictionary encoding doesn't quite work without the stream writer
rb <- arrow::record_batch(nycflights13::flights)

socketDriver <- socketConnection(host = "localhost", 
                                 port = "6011",
                                 blocking = TRUE,
                                 server = FALSE,
                                 open = "w+b")

reader <- as_record_batch_reader(rb)

writeBin(reader$schema$serialize(), socketDriver)
while (!is.null(batch <- reader$read_next_batch())) {
  writeBin(batch$serialize(), socketDriver)
}

# Write end-of-stream
writeBin(as.raw(c(0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00)), socketDriver)

close(socketDriver)

Sys.sleep(0.5)
cat(brio::read_file(tmp))
#> Listening...
#>   year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1 2013     1   1      517            515         2      830            819
#> 2 2013     1   1      533            529         4      850            830
#> 3 2013     1   1      542            540         2      923            850
#> 4 2013     1   1      544            545        -1     1004           1022
#> 5 2013     1   1      554            600        -6      812            837
#> 6 2013     1   1      554            558        -4      740            728
#>   arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1        11      UA   1545  N14228    EWR  IAH      227     1400    5     15
#> 2        20      UA   1714  N24211    LGA  IAH      227     1416    5     29
#> 3        33      AA   1141  N619AA    JFK  MIA      160     1089    5     40
#> 4       -18      B6    725  N804JB    JFK  BQN      183     1576    5     45
#> 5       -25      DL    461  N668DN    LGA  ATL      116      762    6      0
#> 6        12      UA   1696  N39463    EWR  ORD      150      719    5     58
#>             time_hour
#> 1 2013-01-01 05:00:00
#> 2 2013-01-01 05:00:00
#> 3 2013-01-01 05:00:00
#> 4 2013-01-01 05:00:00
#> 5 2013-01-01 06:00:00
#> 6 2013-01-01 05:00:00
#> Listening...

# Shutdown server
proc$interrupt()
#> [1] TRUE
Sys.sleep(0.5)
proc$is_alive()
#> [1] FALSE

Created on 2023-11-27 with reprex v2.0.2
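
For readers wondering about the eight bytes written as "end-of-stream" in the workaround above: in the Arrow IPC streaming format, each message starts with a 32-bit continuation marker (`0xFFFFFFFF`) followed by a 32-bit little-endian metadata length, and a length of zero signals the end of the stream. The sketch below decodes that prefix using base R only. `read_ipc_prefix()` is a hypothetical helper written for illustration, not part of the arrow package.

```r
# The 8 bytes written as end-of-stream in the workaround above
eos <- as.raw(c(0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00))

# Hypothetical helper (not part of arrow): decode the 8-byte prefix that
# starts an IPC message held in a raw vector.
read_ipc_prefix <- function(bytes) {
  stopifnot(is.raw(bytes), length(bytes) >= 8)
  # Two little-endian 32-bit integers. 0xFFFFFFFF reads back as -1
  # because R integers are signed.
  prefix <- readBin(bytes[1:8], what = "integer", n = 2, size = 4,
                    endian = "little")
  list(
    has_continuation = prefix[1] == -1L,
    metadata_length = prefix[2],
    is_end_of_stream = prefix[1] == -1L && prefix[2] == 0L
  )
}

prefix <- read_ipc_prefix(eos)
str(prefix)
```

A reader that sees a zero metadata length after the continuation marker can stop reading, which is presumably why `read_ipc_stream()` on the server side returns once these bytes arrive.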

paleolimbot added a commit that referenced this issue Apr 3, 2024
#38897)

### Rationale for this change

Currently we can't write to a socket connection from R. This is a very useful way to send Arrow data around and should work!

### What changes are included in this PR?

Implements `Tell()` for non-seekable output streams. Apparently some Arrow code calls this to figure out how many bytes have been written.
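
To illustrate the idea (this is only a sketch in base R, not the code in the PR, which is not shown in this thread): a writer can report its position on a non-seekable connection by counting the bytes it has written itself, rather than asking the connection via `seek()`. The name `counting_writer()` and its `write`/`tell` members are hypothetical and chosen for this example.

```r
# Hypothetical wrapper: tracks how many bytes have been written to `con`
# so the current position can be reported without calling seek().
counting_writer <- function(con) {
  bytes_written <- 0
  list(
    write = function(x) {
      stopifnot(is.raw(x))
      writeBin(x, con)
      # Update the running total kept in the enclosing environment
      bytes_written <<- bytes_written + length(x)
      invisible(NULL)
    },
    tell = function() bytes_written
  )
}

# A raw connection stands in for the socket here so the example runs
# without a listening server.
con <- rawConnection(raw(0), open = "wb")
writer <- counting_writer(con)
writer$write(as.raw(1:10))
writer$write(as.raw(1:5))
position <- writer$tell()
written <- rawConnectionValue(con)
close(con)

print(position)
```

The counter lives in the wrapper, so it keeps working for sockets, pipes, and other connections where `seek()` raises the error reported in this issue.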

### Are these changes tested?

I'm not quite sure how to test this: all output streams we can easily test are seekable. We could try to spin up a socket server on another thread (like the reprex below), but I'm worried that will be flaky.

### Are there any user-facing changes?

Yes (something that should have previously worked now works), although there is no place where we currently document anything about how connections can be used.

``` r
tmp <- tempfile()
proc <- callr::r_bg(function() {
  server <- function() {
    library(arrow)
    
    while (TRUE) {
      writeLines("Listening...")
      con <- socketConnection(host = "localhost", port = 6011, blocking = TRUE,
                              server = TRUE, open = "r+b")
      socketTimeout(con, 3600)
      
      data <- arrow::read_ipc_stream(con, as_data_frame = FALSE)
      print(head(as.data.frame(data)))
      
    }
  }
  
  server()
}, stdout = tmp)

Sys.sleep(0.5)

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
rb <- arrow::record_batch(iris)

socketDriver <- socketConnection(host = "localhost", 
                                 port = "6011",
                                 blocking = TRUE,
                                 server = FALSE,
                                 open = "w+b")

write_ipc_stream(rb, socketDriver)
Sys.sleep(0.5)
cat(brio::read_file(tmp))
#> Listening...
#>   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
#> 1          5.1         3.5          1.4         0.2  setosa
#> 2          4.9         3.0          1.4         0.2  setosa
#> 3          4.7         3.2          1.3         0.2  setosa
#> 4          4.6         3.1          1.5         0.2  setosa
#> 5          5.0         3.6          1.4         0.2  setosa
#> 6          5.4         3.9          1.7         0.4  setosa
#> Listening...

# Shutdown server
proc$interrupt()
#> [1] TRUE
Sys.sleep(0.5)
proc$is_alive()
#> [1] FALSE
```

<sup>Created on 2023-11-27 with [reprex v2.0.2](https://reprex.tidyverse.org)</sup>
* Closes: #38828
* GitHub Issue: #38828

Authored-by: Dewey Dunnington <dewey@voltrondata.com>
Signed-off-by: Dewey Dunnington <dewey@voltrondata.com>
@paleolimbot paleolimbot added this to the 16.0.0 milestone Apr 3, 2024
tolleybot pushed a commit to tmct/arrow that referenced this issue May 2, 2024
…ections (apache#38897)

vibhatha pushed a commit to vibhatha/arrow that referenced this issue May 25, 2024
…ections (apache#38897)
