
Add arrow as a backend for speedup of around 10-20x for PLP and other Hades packages #35

Open
egillax opened this issue Apr 19, 2022 · 117 comments

@egillax
Contributor

egillax commented Apr 19, 2022

Hello everyone,

Recently I've been experimenting with using Apache Arrow for out-of-memory computation instead of Andromeda with RSQLite. The speedups I'm getting are quite significant. Arrow supports dplyr verbs, so the code changes required are minimal. I've already implemented it for a regular PLP workflow and tested it using simulated data (using simulatePlpData) for 10e3, 100e3, and 1e6 cohort sizes (my fork of PLP is here and of Cyclops here). There's an example script in extras/testArrow.R.

The following numbers are with all default settings and Lasso; I used the same simulated dataset for each comparison and the same splitSeed.

10e3 - Normal PLP: 6.7 minutes, Arrow PLP: 21.5 seconds - a speedup of 19x

100e3 - Normal PLP: 58.5 minutes, Arrow PLP: 5.2 minutes - a speedup of 11x

1e6 - Normal PLP: 18.9 hours, Arrow PLP: 2.5 hours - a speedup of 7.6x

My suggestion would be to add arrow as a backend. It should be quite simple. The function I used to create the arrow dataset is defined here. I also have a function to convert andromeda tables to arrow here.
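
To make that concrete, here is a minimal sketch of what such a conversion helper could look like (my own illustration, assuming the current RSQLite-backed Andromeda; the function name, batch size, and file layout are not taken from the linked code):

andromedaTableToArrow <- function(andromedaTbl, path, batchSize = 1e5) {
  # write the Andromeda table out in batches as feather files, then open the
  # resulting directory as an arrow dataset
  dir.create(path, recursive = TRUE, showWarnings = FALSE)
  part <- 0L
  Andromeda::batchApply(andromedaTbl, function(batch) {
    part <<- part + 1L
    arrow::write_feather(batch, file.path(path, sprintf("part-%d.feather", part)))
  }, batchSize = batchSize)
  arrow::open_dataset(path, format = "feather")
}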

I also saw that there has been some duckdb experimentation/discussion here. By adding arrow you'd get duckdb for free, since there is tight integration and near copy-free conversion between the two formats. So, for example, you can do something like:

arrowDataset %>% dplyr::stuff() %>% arrow::to_duckdb() %>% duckdb::stuff() %>% arrow::to_arrow()

I've not tested this, though, since everything I had to do was already available in arrow.
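
As an illustration only (my own toy example, not something used in PLP; it requires the arrow, duckdb, and dbplyr packages), the round trip could look roughly like this, with the window function handled by duckdb because arrow alone doesn't support it:

library(dplyr)

tbl <- arrow::arrow_table(mtcars)

tbl %>%
  filter(cyl > 4) %>%                             # evaluated by arrow
  arrow::to_duckdb() %>%                          # near copy-free handoff to duckdb
  group_by(cyl) %>%
  mutate(mean_mpg = mean(mpg, na.rm = TRUE)) %>%  # window function via duckdb
  arrow::to_arrow() %>%                           # hand the result back to arrow
  collect()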

Tagging various people for visibility and people I have discussed this with at some point.

@ablack3 @schuemie @jreps @rekkasa @tomseinen @lhjohn

Regards
Egill Fridgeirsson

@lhjohn

lhjohn commented Apr 19, 2022

Arrow sounds great! I have been struggling with the speed of our current implementation at times, especially with massive data sets.

@jreps

jreps commented Apr 19, 2022

+1 from me - sqlite is very slow, it would be great to have data manipulation done ~10x-20x faster!

@ablack3
Collaborator

ablack3 commented Apr 19, 2022

This sounds great. I'll work on it for the next release.

@msuchard
Member

@egillax -- whether or not we change the backend of Andromeda, pretty please make a pull request to handle the Arrow tables in Cyclops. It's a primary aim of that package to provide the highest-performance standards.

@schuemie
Member

Arrow was considered when we were looking to replace ff, but at the time I found it to be unstable. It also appears to keep all the data in memory, which is the problem we're trying to avoid with Andromeda.

@egillax
Contributor Author

egillax commented Apr 20, 2022

Hi @schuemie,

It possibly was unstable at that time; version 1.0 didn't come out until July 2020. I've not found it unstable, and it seems to have been developing fast since then. It's never crashed on me like it seems to have done for you in 2020.

Just to clarify, all the above timings use arrow datasets, which are for working with file-based data without loading it into memory. This also appears to be something that wasn't around in 2020. It's all done using dplyr verbs, support for which has been added rapidly over the last year. Aggregations were only added recently.

@egillax
Contributor Author

egillax commented Apr 20, 2022

A little more data. I ran three versions of tidyCovariateData from FeatureExtraction: the arrow implementation from my PLP fork, an in-memory version I made, and the original version, which uses Andromeda with RSQLite. I benchmarked them using bench, which measures memory allocation as well. I used simulated data with 10,000 subjects.

benchmark <- bench::mark(tidyCovariateDataArrow(plpData$covariateData),
                         tidyCovariateDataMemory(memoryData),
                         FeatureExtraction::tidyCovariateData(plpAndromeda$covariateData), 
                         check = FALSE, filter_gc = FALSE)

# A tibble: 3 × 13
  expression                                                            min   median `itr/sec` mem_alloc `gc/sec` n_itr  n_gc total_time result memory                  time       gc      
  <bch:expr>                                                       <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl> <int> <dbl>   <bch:tm> <list> <list>                  <list>     <list>  
1 tidyCovariateDataArrow(plpData$covariateData)                       985ms    985ms    1.02     27.88MB   1.02       1     1      985ms <NULL> <Rprofmem [4,547 × 3]>  <bench_tm> <tibble>
2 tidyCovariateDataMemory(memoryData)                                  4.5s     4.5s    0.222     2.47GB   2.89       1    13       4.5s <NULL> <Rprofmem>              <bench_tm> <tibble>
3 FeatureExtraction::tidyCovariateData(plpAndromeda$covariateData)    31.2s    31.2s    0.0321   505.3MB   0.0642     1     2      31.2s <NULL> <Rprofmem [12,739 × 3]> <bench_tm> <tibble>

As can be seen, the arrow implementation is faster than the in-memory version and uses the least memory of all the approaches. Another interesting thing I hadn't mentioned before is that the file size of the arrow dataset is smaller, in this case 253 MB vs 577 MB for RSQLite.

@schuemie
Member

Cool! That does look promising.

@ablack3
Collaborator

ablack3 commented Apr 27, 2022

Here are some more experiments with the arrow functionality. I think this could work. I guess the directory structure would be something like

AndromedaTempFolder/AndromedaObjectFolder/Dataset/file.feather
So we would create one folder for each table in the Andromeda object which could contain partitions or a single file.

We'd be trading SQL for the dplyr interface implemented in arrow, which would likely be more limited. What would be helpful for me is to better understand the operations/functions Andromeda needs to support (e.g. datediff?). Is there anything we need from SQL that is not available in arrow?

library(arrow)
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(nycflights13)

write_dataset(airlines, "airlines", format = "feather")
write_dataset(flights, "flights", format = "feather")

andr <- list()
andr$airlines <- open_dataset("airlines", format = "feather")
andr$flights <- open_dataset("flights", format = "feather")

# count rows
andr$flights %>% 
  tally() %>% 
  collect()
#> # A tibble: 1 × 1
#>        n
#>    <int>
#> 1 336776

# head
andr$flights %>% 
  head(10) %>% 
  collect()
#> # A tibble: 10 × 19
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      517            515         2      830            819
#>  2  2013     1     1      533            529         4      850            830
#>  3  2013     1     1      542            540         2      923            850
#>  4  2013     1     1      544            545        -1     1004           1022
#>  5  2013     1     1      554            600        -6      812            837
#>  6  2013     1     1      554            558        -4      740            728
#>  7  2013     1     1      555            600        -5      913            854
#>  8  2013     1     1      557            600        -3      709            723
#>  9  2013     1     1      557            600        -3      838            846
#> 10  2013     1     1      558            600        -2      753            745
#> # … with 11 more variables: arr_delay <dbl>, carrier <chr>, flight <int>,
#> #   tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>,
#> #   hour <dbl>, minute <dbl>, time_hour <dttm>

# joins
andr$flights %>% 
  inner_join(andr$airlines, by = "carrier") %>% 
  collect()
#> # A tibble: 336,776 × 20
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      517            515         2      830            819
#>  2  2013     1     1      533            529         4      850            830
#>  3  2013     1     1      542            540         2      923            850
#>  4  2013     1     1      544            545        -1     1004           1022
#>  5  2013     1     1      554            600        -6      812            837
#>  6  2013     1     1      554            558        -4      740            728
#>  7  2013     1     1      555            600        -5      913            854
#>  8  2013     1     1      557            600        -3      709            723
#>  9  2013     1     1      557            600        -3      838            846
#> 10  2013     1     1      558            600        -2      753            745
#> # … with 336,766 more rows, and 12 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>,
#> #   name <chr>

# joins with dataframe in R
andr$flights %>% 
  inner_join(airlines, by = "carrier") %>% 
  collect()
#> # A tibble: 336,776 × 20
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      517            515         2      830            819
#>  2  2013     1     1      533            529         4      850            830
#>  3  2013     1     1      542            540         2      923            850
#>  4  2013     1     1      544            545        -1     1004           1022
#>  5  2013     1     1      554            600        -6      812            837
#>  6  2013     1     1      554            558        -4      740            728
#>  7  2013     1     1      555            600        -5      913            854
#>  8  2013     1     1      557            600        -3      709            723
#>  9  2013     1     1      557            600        -3      838            846
#> 10  2013     1     1      558            600        -2      753            745
#> # … with 336,766 more rows, and 12 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>,
#> #   name <chr>

# I'm kind of surprised this works

# filtering joins with data in R
andr$flights %>% 
  semi_join(head(airlines, 1), by = "carrier") %>% 
  collect()
#> # A tibble: 18,460 × 19
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      810            810         0     1048           1037
#>  2  2013     1     1     1451           1500        -9     1634           1636
#>  3  2013     1     1     1452           1455        -3     1637           1639
#>  4  2013     1     1     1454           1500        -6     1635           1636
#>  5  2013     1     1     1507           1515        -8     1651           1656
#>  6  2013     1     1     1530           1530         0     1650           1655
#>  7  2013     1     1     1546           1540         6     1753           1748
#>  8  2013     1     1     1550           1550         0     1844           1831
#>  9  2013     1     1     1552           1600        -8     1749           1757
#> 10  2013     1     1     1554           1600        -6     1701           1734
#> # … with 18,450 more rows, and 11 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>

# filtering join with data in arrow
andr$flights %>% 
  semi_join(head(andr$airlines, 1), by = "carrier") %>% 
  collect()
#> # A tibble: 18,460 × 19
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      810            810         0     1048           1037
#>  2  2013     1     1     1451           1500        -9     1634           1636
#>  3  2013     1     1     1452           1455        -3     1637           1639
#>  4  2013     1     1     1454           1500        -6     1635           1636
#>  5  2013     1     1     1507           1515        -8     1651           1656
#>  6  2013     1     1     1530           1530         0     1650           1655
#>  7  2013     1     1     1546           1540         6     1753           1748
#>  8  2013     1     1     1550           1550         0     1844           1831
#>  9  2013     1     1     1552           1600        -8     1749           1757
#> 10  2013     1     1     1554           1600        -6     1701           1734
#> # … with 18,450 more rows, and 11 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>

# filter using data in arrow (not supported)
andr$flights %>% 
  filter(carrier %in% andr$airlines$carrier[1:3]) %>% 
  collect()
#> Error: Filter expression not supported for Arrow Datasets: carrier %in% andr$airlines$carrier[1:3]
#> Call collect() first to pull data into R.

# filter using values in R
andr$flights %>% 
  filter(carrier %in% airlines$carrier[1:3]) %>% 
  collect()
#> # A tibble: 51,903 × 19
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      542            540         2      923            850
#>  2  2013     1     1      558            600        -2      753            745
#>  3  2013     1     1      559            600        -1      941            910
#>  4  2013     1     1      606            610        -4      858            910
#>  5  2013     1     1      623            610        13      920            915
#>  6  2013     1     1      628            630        -2     1137           1140
#>  7  2013     1     1      629            630        -1      824            810
#>  8  2013     1     1      635            635         0     1028            940
#>  9  2013     1     1      656            700        -4      854            850
#> 10  2013     1     1      656            659        -3      949            959
#> # … with 51,893 more rows, and 11 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>

# sum
andr$flights %>% 
  count(carrier) %>% 
  inner_join(andr$airlines, by = "carrier") %>% 
  mutate(number_of_flights = n) %>% 
  select(name, number_of_flights) %>% 
  collect()
#> # A tibble: 16 × 2
#>    name                        number_of_flights
#>    <chr>                                   <int>
#>  1 United Air Lines Inc.                   58665
#>  2 American Airlines Inc.                  32729
#>  3 JetBlue Airways                         54635
#>  4 Delta Air Lines Inc.                    48110
#>  5 ExpressJet Airlines Inc.                54173
#>  6 Envoy Air                               26397
#>  7 US Airways Inc.                         20536
#>  8 Southwest Airlines Co.                  12275
#>  9 Virgin America                           5162
#> 10 AirTran Airways Corporation              3260
#> 11 Alaska Airlines Inc.                      714
#> 12 Endeavor Air Inc.                       18460
#> 13 Frontier Airlines Inc.                    685
#> 14 Hawaiian Airlines Inc.                    342
#> 15 Mesa Airlines Inc.                        601
#> 16 SkyWest Airlines Inc.                      32

# distinct
andr$flights %>% 
  distinct(carrier) %>% 
  left_join(andr$airlines, by = "carrier") %>% 
  collect()
#> # A tibble: 16 × 2
#>    carrier name                       
#>    <chr>   <chr>                      
#>  1 UA      United Air Lines Inc.      
#>  2 AA      American Airlines Inc.     
#>  3 B6      JetBlue Airways            
#>  4 DL      Delta Air Lines Inc.       
#>  5 EV      ExpressJet Airlines Inc.   
#>  6 MQ      Envoy Air                  
#>  7 US      US Airways Inc.            
#>  8 WN      Southwest Airlines Co.     
#>  9 VX      Virgin America             
#> 10 FL      AirTran Airways Corporation
#> 11 AS      Alaska Airlines Inc.       
#> 12 9E      Endeavor Air Inc.          
#> 13 F9      Frontier Airlines Inc.     
#> 14 HA      Hawaiian Airlines Inc.     
#> 15 YV      Mesa Airlines Inc.         
#> 16 OO      SkyWest Airlines Inc.

# mean, min, max
andr$flights %>% 
  group_by(carrier) %>% 
  summarise(mean_delay = mean(dep_delay, na.rm = TRUE),
            min_delay = min(dep_delay, na.rm = TRUE),
            max_delay = max(dep_delay, na.rm = TRUE)) %>% 
  left_join(andr$airlines, by = "carrier") %>% 
  select(name, mean_delay, min_delay, max_delay) %>% 
  arrange(desc(mean_delay)) %>% 
  collect()
#> # A tibble: 16 × 4
#>    name                        mean_delay min_delay max_delay
#>    <chr>                            <dbl>     <dbl>     <dbl>
#>  1 Frontier Airlines Inc.           20.2        -27       853
#>  2 ExpressJet Airlines Inc.         20.0        -32       548
#>  3 Mesa Airlines Inc.               19.0        -16       387
#>  4 AirTran Airways Corporation      18.7        -22       602
#>  5 Southwest Airlines Co.           17.7        -13       471
#>  6 Endeavor Air Inc.                16.7        -24       747
#>  7 JetBlue Airways                  13.0        -43       502
#>  8 Virgin America                   12.9        -20       653
#>  9 SkyWest Airlines Inc.            12.6        -14       154
#> 10 United Air Lines Inc.            12.1        -20       483
#> 11 Envoy Air                        10.6        -26      1137
#> 12 Delta Air Lines Inc.              9.26       -33       960
#> 13 American Airlines Inc.            8.59       -24      1014
#> 14 Alaska Airlines Inc.              5.80       -21       225
#> 15 Hawaiian Airlines Inc.            4.90       -16      1301
#> 16 US Airways Inc.                   3.78       -19       500

# date functions
library(wakefield)
#> 
#> Attaching package: 'wakefield'
#> The following object is masked from 'package:dplyr':
#> 
#>     id
#> The following object is masked from 'package:arrow':
#> 
#>     string
date_df <- tibble::tibble(date1 = date_stamp(1e5, T), date2 = date_stamp(1e5, T))
write_dataset(date_df, "date_df", format = "feather")
andr$date_df <- open_dataset("date_df", format = "feather")

# date types are preserved
andr$date_df %>% 
  collect()
#> # A tibble: 100,000 × 2
#>    date1      date2     
#>  * <date>     <date>    
#>  1 2022-01-27 2021-08-27
#>  2 2021-05-27 2021-07-27
#>  3 2021-05-27 2021-11-27
#>  4 2021-10-27 2022-04-27
#>  5 2021-05-27 2021-07-27
#>  6 2021-10-27 2021-07-27
#>  7 2022-02-27 2021-06-27
#>  8 2021-11-27 2021-10-27
#>  9 2022-01-27 2022-01-27
#> 10 2022-03-27 2022-01-27
#> # … with 99,990 more rows

# date difference does not work
andr$date_df %>% 
  mutate(date_diff = date2 - date1) %>% 
  collect()
#> Error in `handle_csv_read_error()`:
#> ! NotImplemented: Function 'subtract_checked' has no kernel matching input types (array[date32[day]], array[date32[day]])

# probably not much we can do with dates without first pulling the data into R
andr$date_df %>% 
  mutate(year = lubridate::year(date1)) %>% 
  collect()
#> Error: Expression lubridate::year(date1) not supported in Arrow
#> Call collect() first to pull data into R.

andr$date_df %>% 
  mutate(min_date = min(date1)) %>% 
  collect()
#> Error: window functions not currently supported in Arrow
#> Call collect() first to pull data into R.

# interestingly this works
andr$date_df %>% 
  mutate(date_number = as.integer(date1)) %>% 
  collect()
#> # A tibble: 100,000 × 3
#>    date1      date2      date_number
#>  * <date>     <date>           <int>
#>  1 2022-01-27 2021-08-27       19019
#>  2 2021-05-27 2021-07-27       18774
#>  3 2021-05-27 2021-11-27       18774
#>  4 2021-10-27 2022-04-27       18927
#>  5 2021-05-27 2021-07-27       18774
#>  6 2021-10-27 2021-07-27       18927
#>  7 2022-02-27 2021-06-27       19050
#>  8 2021-11-27 2021-10-27       18958
#>  9 2022-01-27 2022-01-27       19019
#> 10 2022-03-27 2022-01-27       19078
#> # … with 99,990 more rows

andr$date_df %>% 
  summarise(min_date_number = min(as.integer(date1))) %>% 
  collect()
#> # A tibble: 1 × 1
#>   min_date_number
#>             <int>
#> 1           18774


# window functions are not supported
andr$flights %>% 
  group_by(carrier) %>% 
  mutate(mean_delay = mean(dep_delay))
#> Error: window functions not currently supported in Arrow
#> Call collect() first to pull data into R.

# can we create a modified copy of an arrow table without pulling the data into R?
andr$flights %>% 
  semi_join(andr$airlines[1:4,], by = "carrier") %>% 
  write_dataset("flights2", format = "feather")

andr$flights2 <- open_dataset("flights2", format = "feather")

andr$flights2 %>% 
  collect()
#> # A tibble: 106,538 × 19
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      542            540         2      923            850
#>  2  2013     1     1      544            545        -1     1004           1022
#>  3  2013     1     1      555            600        -5      913            854
#>  4  2013     1     1      557            600        -3      838            846
#>  5  2013     1     1      558            600        -2      753            745
#>  6  2013     1     1      558            600        -2      849            851
#>  7  2013     1     1      558            600        -2      853            856
#>  8  2013     1     1      559            600        -1      941            910
#>  9  2013     1     1      559            559         0      702            706
#> 10  2013     1     1      600            600         0      851            858
#> # … with 106,528 more rows, and 11 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>



# check memory usage
bench::mark(andr$flights %>% write_dataset("flights2", format = "feather"),
            andr$flights %>% collect() %>% write_dataset("flights2", format = "feather"),
            check = FALSE, filter_gc = FALSE)
#> # A tibble: 2 × 6
#>   expression                                                                  
#>   <bch:expr>                                                                  
#> 1 andr$flights %>% write_dataset("flights2", format = "feather")              
#> 2 andr$flights %>% collect() %>% write_dataset("flights2", format = "feather")
#> # … with 5 more variables: min <bch:tm>, median <bch:tm>, `itr/sec` <dbl>,
#> #   mem_alloc <bch:byt>, `gc/sec` <dbl>


# seems like joins are happening outside of R
bench::mark(andr$flights %>% inner_join(andr$airlines, by = "carrier") %>% collect(),
            flights %>% inner_join(airlines, by = "carrier") %>% collect(),
            check = FALSE, filter_gc = FALSE)
#> # A tibble: 2 × 6
#>   expression                                                              
#>   <bch:expr>                                                              
#> 1 andr$flights %>% inner_join(andr$airlines, by = "carrier") %>% collect()
#> 2 flights %>% inner_join(airlines, by = "carrier") %>% collect()          
#> # … with 5 more variables: min <bch:tm>, median <bch:tm>, `itr/sec` <dbl>,
#> #   mem_alloc <bch:byt>, `gc/sec` <dbl>

Created on 2022-04-27 by the reprex package (v2.0.1)

@leeevans

leeevans commented Apr 28, 2022

@ablack3

I haven't tried it but DuckDB can be used for SQL access to Apache Arrow tables:
https://duckdb.org/2021/12/03/duck-arrow.html

@schuemie
Member

I don't think we need the SQL at all. Andromeda was set up to not rely on SQL, just on dplyr syntax (which happened to be supported through SQL when using RSQLite as engine).

The most complicated uses of Andromeda I'm aware of are this code in FeatureExtraction and this code in CohortMethod. I'm not aware of any datediffs needed. @jreps : what are your requirements for Andromeda operations?

@ablack3
Collaborator

ablack3 commented May 2, 2022

I started working on an Andromeda implementation that uses arrow (no SQL or duckdb). I have not checked in my code yet. I'm currently stuck on this function. I switched to using S3. The current implementation has S4 methods but creates the object by assigning the class attribute only and doesn't call new(). It was extending the SQLite connection class, which is S4. Anyway, I'll keep at it.

#' @param x    An [`Andromeda`] object.
#' @param i    The name of a table in the [`Andromeda`] object.
#' @param value A data frame, [`Andromeda`] table, or other 'DBI' table.
#' @export
#' @rdname Andromeda-class
"[[<-.Andromeda" <- function(x, i, value) { 
  # checkIfValid(x)
  if (!is.null(value) && !inherits(value, "data.frame") && !inherits(value, "arrow_dplyr_query")) {
    abort("value must be null, a data frame, or a dplyr query using an existing andromeda table")
  }
  
  if (is.null(value)) {
    if (i %in% names(x)) {
      r <- unlink(file.path(attr(x, "path"), i), recursive = TRUE)
      if (r == 1) abort(paste("Removal of Andromeda dataset", i, "failed."))
    }
  } else {
    # .checkAvailableSpace(x)
    arrow::write_dataset(value, file.path(attr(x, "path"), i), format = "feather")
    `[[<-.list`(x, i, arrow::open_dataset(file.path(attr(x, "path"), i), format = "feather")) # This line won't work.
     x[[i]] <- arrow::open_dataset(file.path(attr(x, "path"), i), format = "feather") # This doesn't work either
     NextMethod(generic = "[[<-", object = x, i = i, value = arrow::open_dataset(file.path(attr(x, "path"), i), format = "feather")) # does not work
  } 
  x
}

Seems like I need NextMethod().

@ablack3
Collaborator

ablack3 commented May 2, 2022

I think I have the basic idea working for assignment.

"[[<-.Andromeda" <- function(x, i, value) {
  print("using Andromeda method")
  arrow::write_dataset(value, file.path("temp", i), format = "feather")
  value <- arrow::open_dataset(file.path("temp", i), format = "feather")
  NextMethod()
}

a <- list()
class(a) <- "Andromeda"

a[["cars"]] <- cars
#> [1] "using Andromeda method"

class(a$cars)
#> [1] "FileSystemDataset" "Dataset"           "ArrowObject"      
#> [4] "R6"

Created on 2022-05-02 by the reprex package (v2.0.1)

@ablack3
Collaborator

ablack3 commented May 3, 2022

There is a problem with loading and saving zero-row data frames: arrow::write_dataset() does nothing with a zero-row data frame.

df <- cars[cars$speed > 1e6,]
print(df)
#> [1] speed dist 
#> <0 rows> (or 0-length row.names)

arrow::write_dataset(df, here::here("df"), format = "parquet")

d <- arrow::open_dataset(here::here("df"), format = "parquet")
#> Error: IOError: Cannot list directory '.../RtmpvMgShC/reprex-850311752e99-bonny-fox/df'. Detail: [errno 2] No such file or directory

Created on 2022-05-03 by the reprex package (v2.0.1)

@ablack3
Collaborator

ablack3 commented May 3, 2022

I pushed my initial implementation to the arrow branch in case anyone wants to give it a try. Arrow greatly simplifies Andromeda because we only have to deal with a set of files rather than a database (e.g. indexes are not needed anymore).

I could use help thinking through how the functions in the Operations.R file should be adapted to take advantage of arrow. I'm not sure the current implementations make as much sense when you have partitioned feather files. Perhaps the batches should be the partitions. Maybe we can take advantage of arrow::map_batches().
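
Just as a sketch of one possible shape (my own assumption, not a worked-out design), batchApply() could iterate over a dataset's record batches via the scanner/reader API; map_batches() would be another option:

batchApplyArrow <- function(dataset, fun, ...) {
  # stream the dataset batch by batch and hand each chunk to fun as a data frame
  reader <- arrow::Scanner$create(dataset)$ToRecordBatchReader()
  while (!is.null(batch <- reader$read_next_batch())) {
    fun(as.data.frame(batch), ...)
  }
  invisible(NULL)
}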

@schuemie
Member

Great work @ablack3 !

I tried playing with the new branch. One behavior that is different (arguably better) with the arrow backend is that copies are independent:

a <- Andromeda::andromeda()

b <- a

b$cars <- cars
a
# # Andromeda object
# # Physical location:  C:\Users\mschuemi.EU\AppData\Local\Temp\Rtmp8IUZjQ\file2c287dc86e5c
# 
# Tables:

b
# # Andromeda object
# # Physical location:  C:\Users\mschuemi.EU\AppData\Local\Temp\Rtmp8IUZjQ\file2c287dc86e5c
# 
# Tables:
# $cars (speed, dist)

However, I can't seem to actually access the tables?

a <- Andromeda::andromeda(cars = cars)
colnames(a$cars)
# NULL

@ablack3
Collaborator

ablack3 commented May 10, 2022

thanks for giving it a try! I think you found a bug. Copies are not actually independent. a and b are two different Andromeda objects that point to the same files. The 'path' attribute points to the folder where the feather files are stored.

library(Andromeda)
a <- andromeda(cars = cars)

# names is implemented by the arrow package
names(a$cars)
#> [1] "speed" "dist"

# colnames is not implemented by arrow but could be added in Andromeda
colnames(a$cars)
#> NULL

# colnames works if you pull the data into R first 
colnames(collect(a$cars))
#> [1] "speed" "dist"



b <- a
attr(a, "path") == attr(b, "path")
#> [1] TRUE

list.dirs(attr(a, "path"), full.names = F)
#> [1] ""     "cars"
list.dirs(attr(a, "path"), full.names = F)
#> [1] ""     "cars"

b$cars <- NULL

list.dirs(attr(a, "path"), full.names = F)
#> [1] ""
list.dirs(attr(a, "path"), full.names = F)
#> [1] ""

a
#> # Andromeda object
#> # Physical location:  /var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T//RtmpmPXb7A/file1274499a75dd
#> 
#> Tables:
#> $cars (speed, dist)


a$cars %>% collect()
#> Error in `handle_csv_read_error()`:
#> ! IOError: Failed to open local file '/private/var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T/RtmpmPXb7A/file1274499a75dd/cars/part-0.feather'. Detail: [errno 2] No such file or directory

Created on 2022-05-10 by the reprex package (v2.0.1)

@ablack3
Collaborator

ablack3 commented May 11, 2022

This is tricky. I fixed 'names.Andromeda', but if someone copies an Andromeda object I can't really update all the other Andromeda objects that refer to the same file location.

So perhaps it would be better to create a copy every time an Andromeda object is modified? However, this seems like potentially a lot of unnecessary copying of large datasets. Maybe another option is to use reference classes.

library(Andromeda)
a <- andromeda(cars = cars)
b <- a
names(a)
#> [1] "cars"

# remove the cars file
b$cars <- NULL

# the new state is reflected in both a and b
a
#> # Andromeda object
#> # Physical location:  /var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T//RtmpwFFo6h/file1182f64ac99ff
#> 
#> Tables:
b
#> # Andromeda object
#> # Physical location:  /var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T//RtmpwFFo6h/file1182f64ac99ff
#> 
#> Tables:
names(a)
#> character(0)

# however a still contains a reference to the file that no longer exists
length(a)
#> [1] 1
length(b)
#> [1] 0

a$cars
#> FileSystemDataset with 1 Feather file
#> speed: double
#> dist: double
#> 
#> See $metadata for additional Schema metadata

a$cars %>% collect()
#> Error in `handle_csv_read_error()`:
#> ! IOError: Failed to open local file '/private/var/folders/xx/01v98b6546ldnm1rg1_bvk000000gn/T/RtmpwFFo6h/file1182f64ac99ff/cars/part-0.feather'. Detail: [errno 2] No such file or directory

Created on 2022-05-11 by the reprex package (v2.0.1)

@ablack3
Collaborator

ablack3 commented May 13, 2022

UPDATE: I think I have a solution for copies of the andromeda object by syncing the dataset reference with the existing files every time any method is called.

The arrow branch can now handle zero-row data frames, but if a dplyr query produces a result with zero rows that is then written to Andromeda, the assignment fails (NULL is assigned), so that is still an issue. I created an issue about it on the arrow Jira: https://issues.apache.org/jira/browse/ARROW-16575.

@ablack3
Collaborator

ablack3 commented May 17, 2022

I think the arrow branch is ready for some alpha testing and some early user feedback.
devtools::install_github("OHDSI/Andromeda", ref = "arrow")
What works: object creation, dplyr syntax for data manipulation, saving and loading (tests are passing). (Andromeda objects are now cleaned up only when the R session ends and not before.)
What does not work: batchApply, groupApply, appendToTable, batchTest (These implementations should be reconsidered to take advantage of the feather file format)
What has been removed: everything related to indexes and the ability to set column names; user-defined attributes are now saved as json instead of rds.

If this seems like a significant improvement and the way to go I can work on implementing the apply functions. I should also create some meaningful performance tests.

@ablack3
Collaborator

ablack3 commented May 18, 2022

I think this could work for appending. Feather files don't seem to support appending, but they can be read and written without copying (not sure how this works, but it is pretty cool).

library(dplyr)
library(arrow)
library(wakefield)

# create a dataframe with 10 million rows
df <- r_data_theme(n = 1e7, "survey")
df
#> # A tibble: 10,000,000 × 11
#>    ID    Item_1 Item_2 Item_3 Item_4 Item_5 Item_6 Item_7 Item_8 Item_9 Item_10
#>    <chr>  <int>  <int>  <int>  <int>  <int>  <int>  <int>  <int>  <int>   <int>
#>  1 00001      1      4      2      4      5      2      3      3      2       5
#>  2 00002      5      2      4      3      5      1      5      4      1       5
#>  3 00003      3      3      2      4      2      4      1      5      2       3
#>  4 00004      3      4      4      4      1      5      2      1      2       4
#>  5 00005      4      4      2      1      5      5      3      3      4       4
#>  6 00006      2      4      5      4      2      4      1      3      5       3
#>  7 00007      3      5      5      3      2      4      4      2      5       3
#>  8 00008      1      1      5      3      3      2      3      3      4       4
#>  9 00009      4      4      2      2      2      3      3      2      4       3
#> 10 00010      4      3      5      1      2      5      3      4      2       3
#> # … with 9,999,990 more rows

# save the data in arrow format and open a link to it in R
system.time({
  write_dataset(df, "df", "arrow")
  d <- open_dataset("df", format = "arrow")
})
#>    user  system elapsed 
#>   0.594   0.155   0.792

# append to an existing file system dataset
system.time({
  s <- Scanner$create(d)
  write_dataset(concat_tables(s$ToTable(), arrow_table(df)), "df", format = "arrow")
  d <- open_dataset("df", format = "arrow")
})
#>    user  system elapsed 
#>   0.575   0.377   0.940

nrow(d)
#> [1] 20000000

# remove the file
unlink("df", recursive = T)

Created on 2022-05-18 by the reprex package (v2.0.1)

Kind of crazy how fast arrow read/writes are.

@egillax
Contributor Author

egillax commented May 24, 2022

Hi @ablack3,

I did some testing. Looks very good. I could run the tidyCovariateData function on the new Andromeda object by changing one word (one count() to n()) and by disabling the checks at the beginning of that function. And I get a similar speedup to what I have seen before, so significantly faster than the SQLite version.

However, the reason the checks are failing is that FeatureExtraction has its own S4 class, CovariateData, which inherits from the Andromeda class. Now that you changed Andromeda to S3, maybe this inheritance is broken? At least, running the following used to give TRUE but now gives FALSE:

> covData <- FeatureExtraction:::createEmptyCovariateData(cohortId=1, aggregated=F, temporal=F)
> inherits(covData, 'Andromeda')
[1] FALSE

Also, another of the checks, FeatureExtraction::isAggregatedCovariateData(), depends on the object having colnames() defined, which used to be the case but doesn't work anymore, so that maybe needs to be defined.

@ablack3
Collaborator

ablack3 commented Jun 18, 2022

So I think there are a couple of next steps for this issue (which I think will solve all of the currently open issues on Andromeda).

  1. Adjustments need to be made in the packages that extend Andromeda (FeatureExtraction, possibly CohortMethod, ...)
  2. Assess whether we actually need the Andromeda methods that have not been implemented yet and come up with implementations that take advantage of arrow (e.g. batchApply). I'm currently not sure how to do this since arrow is quite different from SQLite.

@schuemie
Member

Could you explain a bit more about what you mean by (2) ('Assess if actually...')?

The batchApply() function is used for example in Cyclops. In general, it seems essential for a package like Andromeda to have some way to process the large data (that may not fit in memory) in smaller chunks.

@egillax
Contributor Author

egillax commented Jun 20, 2022

With regards to batchApply(), I think it could just be a wrapper around arrow's map_batches. That's what I did when trying it out earlier with Cyclops, and it seemed to work.

Also, @ablack3, I see a possible issue with your appendToTable implementation. By converting to an arrow table, I believe you are loading everything into memory. Did you try this with data bigger than memory?

@ablack3
Collaborator

ablack3 commented Jun 21, 2022

Could you explain a bit more about what you mean by (2) ('Assess if actually...')?

I want to check whether all of the remaining functions are necessary and need to be implemented on arrow, or if any of them can be deprecated. The remaining functions are:
batchApply
groupApply
batchTest

batchApply and possibly groupApply functionality could be covered by map_batches. One problem with map_batches is that "This is experimental and not recommended for production use."

Should we use map_batches? Is map_batches a drop-in replacement for batchApply?
If so, can we simply export map_batches instead of wrapping it and renaming it batchApply?

The batchApply() function is used for example in Cyclops. In general, it seems essential for a package like Andromeda to have some way to process the large data (that may not fit in memory) in smaller chunks.

But batchApply does read all the data into R, just not all at once. Possibly a dumb question, but how does this save space? If I have 10 GB of data and read it in 1 GB at a time, don't I still have to allocate 10 GB of RAM? Is the limitation the size of a single object (e.g. I can create 10 objects of 1 GB each but not a single object of 10 GB)? Or perhaps garbage collection is running in between reads?

Also @ablack3 I see a possible issue with your appendToTable implementation. By converting to an arrow table I believe you are loading everything in memory. Did you try this with data bigger than memory?

I think I'm lacking a good mental model for how arrow works. Where, when, and how is the data processing actually done? I understand that arrow can read in only the data that is necessary, but aggregation is actually performed in R when collect() is called, right?

There is an arrow course at the upcoming useR conference but unfortunately it is full. :/

So one thing I did learn is that arrow supports "zero copy reads", so apparently, even though I convert the "Dataset" into an arrow table, it is somehow read into memory without being read into memory. Black magic, I guess?

library(pryr)
library(arrow)

# datasource: https://www.kaggle.com/datasets/raddar/amex-data-integer-dtypes-parquet-format?resource=download
file.size("~/Downloads/test.parquet")
#> [1] 3301659934

system.time({
  d <- open_dataset("~/Downloads/test.parquet", format = "parquet")
})
#> user  system elapsed 
#> 0.017   0.001   0.018 

object_size(d)
# 261,088 B

# mimic the appendToTable step: convert the dataset to an in-memory arrow Table
system.time({
  dataScanner <- arrow::Scanner$create(d)
  dataArrowTable <- dataScanner$ToTable()
})
#> user  system elapsed
#> 8.225   5.432   3.820 

class(dataArrowTable)
#> [1] "Table"        "ArrowTabular" "ArrowObject"  "R6"  

object_size(dataScanner)
#> 172,136 B

object_size(dataArrowTable)
#> 284,632 B

So it looks like reading in an Arrow table does not require much memory. But I did notice that RStudio reported the "memory used by session" increased to about 2 GB.

[screenshot: RStudio "memory used by session" indicator]

Aside: The torch package is on CRAN and the tarball is 42 megabytes. I thought CRAN had a size limitation of 5MB.

Sorry for all the questions and thanks for the input!

@schuemie
Member

but how does this save space?

The idea of batchApply() is that you only have small parts of the entire data in memory at one time, do some operation on it, and then no longer keep it in memory. The Cyclops example is a bit awkward in that the operation actually implies storing it in memory, just in Cyclops' internal hyper-efficient sparse-matrix memory storage.
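
A toy illustration of that idea with the current API (my own example, not from Cyclops): each chunk is reduced to a running total before the next one is read, so only a small piece of the data is in memory at any time.

library(Andromeda)
andr <- andromeda(cars = cars)
sumOfSpeeds <- 0
batchApply(andr$cars, function(batch) {
  sumOfSpeeds <<- sumOfSpeeds + sum(batch$speed)
}, batchSize = 10)
sumOfSpeeds  # equals sum(cars$speed), computed 10 rows at a time
close(andr)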

can we simply export map_batches instead of wrapping it and renaming it batchApply?

I prefer we wrap map_batches. That makes for a nice separation between Andromeda's interface and its implementation. Similarly, groupApply() and batchTest() should continue to be supported, and if possible, colnames() and count() as well.

@egillax
Contributor Author

egillax commented Jun 21, 2022

+1 for wrapping map_batches into batchApply.

@ablack3 I believe the arrow table is an in-memory object. I think what you are measuring with object_size() on the table is the C++ pointer, and what you see in RStudio under memory usage by session is the actual memory usage. An arrow table actually has an nbytes() method, which I believe gives the memory usage of the underlying C++ buffers.
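
A quick way to see the difference (a toy example; the numbers will vary):

tbl <- arrow::arrow_table(data.frame(x = rnorm(1e6)))
pryr::object_size(tbl)  # tiny: just the R6 wrapper around a C++ pointer
tbl$nbytes()            # roughly 8 MB: the size of the underlying C++ buffers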

I did some experiments with the nyc-taxi dataset referenced on the arrow website. After some initial troubles, where I had to exclude some columns that were giving me issues, I managed to crash my R session by calling collect() on it. The dataset is 37 GiB and my RAM is 32 GiB, so no surprises there. However, I also crash my session when emulating the appendToTable process of creating a scanner and converting it to a table.

I did come up with a Linux-specific way of measuring the peak memory usage of my R session process. It's option number 3 from here.

Then I can run the emulated appendToTable process on the nyc-taxi dataset:

library(dplyr)
library(arrow)

# some of the columns in the dataset were giving me errors, something about
# strings and null, so I made a new dataset without string or null columns
# ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
# 
# write_dataset(ds %>% select(-c(vendor_id, store_and_fwd_flag,
#                                payment_type, rate_code_id)), 
#               path,
#               partitioning = c('year', "month"))
path <- './new_taxi/'
new_ds <- open_dataset(path)

subset <- new_ds %>% filter(year==2018) %>% filter(month==1) %>% compute()

scanner <- Scanner$create(new_ds %>% filter(year==2017))
table <- scanner$ToTable()

arrow::concat_tables(subset, table) 

This results in about 9 GiB of peak memory use.

On the other hand, I made an alternative implementation, which dumps the data frame to be appended to a file and then creates a new dataset from the list of files of the current dataset plus the new file.

library(dplyr)
library(arrow)

path <- './new_taxi/'
new_ds <- open_dataset(path)

subset <- new_ds %>% filter(year==2018) %>% filter(month==1) %>% compute()

tempFile <- tempfile()

write_parquet(subset, tempFile)

appended_ds <- open_dataset(c(new_ds$files, tempFile))

This gives me about 1 GiB peak memory usage for the process. subset$nbytes() tells me the arrow table should contain about 600 MiB, which seems to match up approximately with what RStudio reports under "used by session".

@ablack3
Collaborator

ablack3 commented Jun 21, 2022

Thanks for the experiments. I'll implement batchApply.

I think my implementation was inspired by Wes McKinney's comment here:
[screenshot of the referenced comment]

@egillax
Contributor Author

egillax commented Jun 22, 2022

I did try this today as well with feather files instead of parquet with the same results.

I don't think zero-copy means the data is still on disk. I think it means that the data is moved to the destination without any unnecessary copies. See for example this.

@solis9753

solis9753 commented Nov 21, 2022

Hi @ablack3, I'm looking forward to this and I think this is a great job.
I just noticed something, and I hope this is a good place to mention it.

The new arrow implementation creates a json file to store attributes instead of the previous rds format. I believe that with a small patch to identify the format and read it within loadAndromeda(), we could work with objects saved in either file format. This would save time and probably make it backwards compatible in cases/studies where data does not need to be extracted from the database again (i.e. data saved using the previous implementation). I realize I'm not being very specific.
What do people think?

@ablack3
Collaborator

ablack3 commented Nov 21, 2022

Good point @solis9753. Thinking through it a bit more, both the attribute format and the data table format have changed, so I think you're suggesting that if loadAndromeda were used on an old-format object, it would read the SQLite database and rds object and convert them to arrow feather files (the new format). Is that right?
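
Just to sketch the detection step (the file names inside the saved zip are my assumptions, not the actual Andromeda layout):

detectAndromedaFormat <- function(fileName) {
  # peek inside the saved zip and decide which backend wrote it
  contents <- utils::unzip(fileName, list = TRUE)$Name
  if (any(grepl("\\.sqlite$", contents))) "sqlite" else "arrow"
}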

@solis9753

solis9753 commented Nov 21, 2022

That's right. And yes, on second look, the data table format has to be read correctly as well.

@ablack3
Collaborator

ablack3 commented Nov 21, 2022

OK, yeah, that seems like it might be a good idea. Can I ask a question: how long do you store saved Andromeda objects? Are you using them for permanent storage of results?

@solis9753

solis9753 commented Nov 22, 2022

I'm not so experienced, but I definitely keep them until publication. However, I am no longer sure it is a good idea.
Such a fix is simpler than having to maintain another package in the dependencies.

@schuemie
Member

To allow for a smooth transition, would it be possible to first release an Andromeda version with the old backend, but already having the isAndromedaTable() function? That way, the HADES packages can support both backends. Currently, if we modify HADES packages for the new backend (using isAndromedaTable()), they throw an error on the currently released version of Andromeda.

@schuemie
Member

On Windows, close() is throwing the following warning:

a <- Andromeda::andromeda(cars = cars)
x <- collect(a$cars)
close(a)
# Attempt to remove andromeda file unsuccessful.

And indeed, the Andromeda temp folder shows the file still exists.

@schuemie
Member

Just documenting some weird behavior here when renaming columns on a zero-row table:

# When there's data, it's fine:
a <- Andromeda::andromeda(cars = filter(cars, speed > 1))
a$cars <- dplyr::rename_with(a$cars, toupper)

# When there's no data, we get an error message:
a <- Andromeda::andromeda(cars = filter(cars, speed > 1000))
a$cars <- dplyr::rename_with(a$cars, toupper)
# Error: IOError: Cannot list directory 'D:/temp/Rtemp/RtmpM3Ubhh/temp3078133b153c'. Detail: [Windows error 3] The system cannot find the path specified.

This seems like an arrow issue.

@schuemie
Member

Or maybe not. This works fine too:

a <- Andromeda::andromeda(cars = filter(cars, speed > 1000))
a$cars2 <- dplyr::rename_with(a$cars, toupper)

So maybe it's the temp copy we make when replacing a table slot in an Andromeda object?

@egillax
Contributor Author

egillax commented Dec 5, 2022

It seems Andromeda is still sometimes not able to remove files on Windows. This happens sporadically, and I haven't been able to reproduce it without using PLP (the arrow_S4 branch of PLP, FeatureExtraction, and Andromeda is needed).

I've tried putting a browser() in removeTableIfExists inside the if condition (if (r == 1)), and indeed I cannot remove the file even when calling gc().

Note the reprex below fails in different places, sometimes inside simulatePlpData and sometimes in the next line when I try to remove covariateRef. And sometimes it doesn't fail; that's why I put a for loop around it. It always happens when removing tables from an existing Andromeda object.

Reprex:

library(arrow)
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
library(PatientLevelPrediction) # branch arrow_S4 of plp, featureExtraction and Andromeda

data("plpDataSimulationProfile")

for (i in 1:100) {
  print(i)
  # simulate some data
  plpData <- PatientLevelPrediction::simulatePlpData(plpDataSimulationProfile,
                                                     n=100)
  plpData$covariateData$covariateRef <- NULL
}
#> [1] 1
#> Generating covariates
#> Generating cohorts
#> Generating outcomes
#> Error in `removeTableIfExists()` at Andromeda/R/Object.R:255:4:
#> ! Removal of Andromeda dataset coefficients failed.

#> Backtrace:
#>     ▆
#>  1. └─PatientLevelPrediction::simulatePlpData(...)
#>  2.   └─PatientLevelPrediction:::predictCyclopsType(...) at PatientLevelPrediction/R/Simulation.R:138:4
#>  3.     ├─methods (local) `$<-`(`*tmp*`, coefficients, value = `<NULL>`) at PatientLevelPrediction/R/Simulation.R:138:4
#>  4.     └─Andromeda (local) `$<-`(`*tmp*`, coefficients, value = `<NULL>`)
#>  5.       ├─methods (local) `[[<-`(`*tmp*`, name, value = `<NULL>`) at Andromeda/R/Object.R:225:2
#>  6.       └─Andromeda (local) `[[<-`(`*tmp*`, name, value = `<NULL>`)
#>  7.         └─Andromeda (local) .local(x, i, ..., value)
#>  8.           └─Andromeda (local) removeTableIfExists(x, i) at Andromeda/R/Object.R:255:4
#>  9.             └─rlang::abort(paste("Removal of Andromeda dataset", i, "failed.")) at Andromeda/R/Object.R:247:8

Created on 2022-12-05 with reprex v2.0.2

@ablack3
Collaborator

ablack3 commented Dec 5, 2022

To allow for a smooth transition, would it be possible to first release an Andromeda version with the old backend, but already having the isAndromedaTable() function? That way, the HADES packages can support both backends. Currently, if we modify HADES package for the new backend (using isAndromedaTable()) they throw an error on the currently release version of Andromeda.

Yes of course.

I'm stumped on the file removal issue. Someone suggested "And at least it should be easy for a R developer to dump logs on C++ file opens and closes, and see what happens exactly here." https://issues.apache.org/jira/browse/ARROW-16421. I'm not sure how to do that.

@schuemie
Member

schuemie commented Dec 6, 2022

I'm not sure how to do that.

I too have no clue.

I'm not even able to compile the arrow package on Windows. When I hit Install I get

Warning message:
In unzip("lib.zip", exdir = "windows") :
  error 1 in extracting from zip file

which likely comes from here, and causes all sorts of downstream compile issues. Have you been able to compile it?

@schuemie
Member

schuemie commented Dec 6, 2022

(Figured it out: you need to check out a release tag (e.g. git checkout apache-arrow-10.0.0) to have a consistent codebase and be able to compile)

@marianschmidt

Just as a comment: I have read this thread because I was also interested in handling large out-of-memory health data. In my tests of some usually quite demanding group_by, join, and summarize operations, duckdb is currently a factor of 4-10x faster than RSQLite and gets the same results, while arrow alone cannot correctly handle group_by in combination with window functions and does not consistently throw errors. So I decided to go for a local duckdb in combination with arkdb to get data in and out.

@marianschmidt


I have to add a couple of limitations to my previous comment. My previous speed comparisons were based on artificial data and simple join and sorting operations. Now I have tested a few more real-life examples with actual data and frequent operations (checking for unique rows, joining, group_by, and counts).
Apparently, duckdb is still not very stable and quite inefficient with regard to the memory allocation of its R integration. It creates multiple internal copies before creating the output, which can result in memory allocation errors (at least for my use case of 1E9 rows). See https://github.com/duckdb/duckdb/issues?q=is%3Aissue+is%3Aopen+memory
Also, I noticed that for join and aggregate operations, duckdb loses the speed comparison to SQLite for objects bigger than 1E8 rows; then SQLite is a factor of 3-4x faster than duckdb.

@egillax
Contributor Author

egillax commented Jan 19, 2023

A small update regarding the file lock issue on Windows. The upcoming arrow version (11) adds .unsafe_delete() methods to a number of objects related to arrow datasets. I briefly tested them last weekend, and now it's easier to remove the files without calling gc(). There were two issues I encountered with file locks:

  1. A warning was issued when removal of a file associated with an Andromeda object failed; this seemed to happen most often after running Andromeda::batchApply. batchApply uses an arrow scanner and a reader. By unsafe-deleting both of those objects when finished with them in batchApply, I've not encountered this warning anymore.
  2. When overwriting an Andromeda table with some modified data, an error was thrown if the file associated with the original table couldn't be removed; the error was thrown only after trying gc() and still failing to remove the file. The new methods do help in removing this file, but I found that sometimes you need to call them and unlink multiple times before the file is removed.

So, in summary, I think these file lock issues can be solved by using the new methods, with no gc() calls needed. I think the new arrow version is just around the corner (1-2 weeks away). A rough sketch of the pattern is below.
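
Roughly the pattern described above, as I understand it (the object and table names are placeholders, and arrow >= 11 is assumed for .unsafe_delete()):

scanner <- arrow::Scanner$create(andr$covariates)
reader <- scanner$ToRecordBatchReader()
# ... iterate over the batches as batchApply() does ...
reader$.unsafe_delete()   # release the C++ objects holding the file handles
scanner$.unsafe_delete()
unlink(file.path(attr(andr, "path"), "covariates"), recursive = TRUE)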

@ablack3
Collaborator

ablack3 commented Jan 19, 2023

Thanks for the information @egillax! I guess I'll try calling unlink and if that fails use the unsafe_delete method.

@ablack3
Collaborator

ablack3 commented Jan 20, 2023

Both of these code snippets are crashing R.

library(Andromeda)
a <- andromeda(cars = cars)

a$cars %>% 
  tally()

library(dplyr)
arrow::write_dataset(cars, here::here("cars.feather"), format = "feather")
a <- arrow::open_dataset(here::here("cars.feather"), format = "feather")
a %>% tally()

I'm using arrow 10.0.1 on R 4.2.
I created a bug report here: apache/arrow#33807

@schuemie
Member

Would it be possible to add support for colnames() when using arrow? Should just be a matter of adding the generic method I think? Many HADES packages rely on it to work on Andromeda.

Unrelated, when I use pull() I get:

x$analysisRef %>% count() %>% pull()
[1] 2
Warning message:
Default behavior of `pull()` on Arrow data is changing. Current behavior of returning an R vector is deprecated, and in a future release, it will return an Arrow `ChunkedArray`. To control this:
i Specify `as_vector = TRUE` (the current default) or `FALSE` (what it will change to) in `pull()`
i Or, set `options(arrow.pull_as_vector)` globally
This warning is displayed once every 8 hours. 

Apart from the warning being annoying, the message means we'll need to add as_vector = TRUE to all our pull() calls, which will throw an error with RSQLite.
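
One possible workaround, which the warning itself suggests, is to set the option globally (e.g. in a package .onLoad()), so existing pull() calls keep returning R vectors without adding as_vector = TRUE everywhere:

options(arrow.pull_as_vector = TRUE)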

@schuemie
Member

colnames() is not a generic, but it calls dimnames, which is a generic. Adding this code to Andromeda will add support for colnames():

dimnames.ArrowObject <- function(x, do.NULL = TRUE, prefix = "col") {
  return(list(NULL, names(x)))
}

@schuemie
Member

I'm still seeing Andromeda temp objects not being cleaned up on the disk. Did you test that these get cleaned up eventually?

@schuemie
Member

That isAndromedaTable() function would have been really helpful...

@ablack3
Collaborator

ablack3 commented Mar 20, 2023

isAndromedaTable has been added. It's just this:

isAndromedaTable <- function(tbl) {
  inherits(tbl, "FileSystemDataset") ||
    inherits(tbl, "tbl_SQLiteConnection")
}

I can't easily reproduce the file cleanup problem or write a test for it to make sure my code fixes it. A test like that would be really helpful.

I made an attempt to fix it on the develop branch.

@schuemie
Member

Arrow tables can also inherit from arrow_dplyr_query (which does not inherit from FileSystemDataset). Could you add that? (And create a new release?) (And push the release to CRAN?)

@ablack3
Collaborator

ablack3 commented Mar 21, 2023

Yes I can by end of the day Friday if not before.

@ablack3
Collaborator

ablack3 commented Mar 26, 2023

Well, that deadline came and went. Working on it today.

@ablack3
Collaborator

ablack3 commented Mar 27, 2023

Arrow tables can also inherit from arrow_dplyr_query (which does not inherit from FileSystemDataset). Could you add that? (And create a new release?) (And push the release to CRAN?)

I think this is complete. Andromeda 0.6.3 is on CRAN https://cran.rstudio.com/web/packages/Andromeda/index.html

@schuemie
Member

Great! Could you also merge 0.6.3 into main, just to avoid confusion?

@schuemie
Member

While testing FeatureExtraction with the develop branch of Andromeda, I get these warnings at random points (not reproducible, the same command will not throw the same warning twice). Would you happen to have an idea where this is coming from?

Warning message:
Not a validObject(): no slot of name "ptr" for this object of class "CovariateData" 

@schuemie
Member

Never mind, I (again) forgot how S4 inheritance works. After installing the new version of Andromeda, I forgot to remove and reinstall FeatureExtraction, so CovariateData was still inheriting from the old version.

Works well now.
