
Speed up readVcf #59

Closed
bschilder opened this issue May 4, 2022 · 16 comments

@bschilder
Contributor

bschilder commented May 4, 2022

Splitting off the Issue originally raised here about improving the efficiency of readVcf.

@lawremi suggested using chunking as is currently implemented in writeVcf here. I imagine this could be further improved through parallelization of that chunking procedure.

Implementing something analogous in readVcf should be straightforward. Following up on the discussion here, would this be something that @vjcitn, @mtmorgan, @vobencha, @lawremi, or one of the other VariantAnnotation co-authors would be able to implement?

Many thanks in advance,
Brian

@mtmorgan
Contributor

mtmorgan commented May 4, 2022

Please see the response in your previous issue, especially VcfFile(yieldSize=...) and section 7 of the vignette. How about closing this issue?
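
For context, a minimal sketch of the chunk-wise yieldSize= pattern (not copied from the vignette; the file path, genome string, and chunk size are placeholders):

vcf_file <- VariantAnnotation::VcfFile("my_sumstats.vcf.gz", yieldSize = 100000)
open(vcf_file)
repeat {
    chunk <- VariantAnnotation::readVcf(vcf_file, genome = "GRCh37")
    if (nrow(chunk) == 0L) break   # no records left
    ## ... process 'chunk' here ...
}
close(vcf_file)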

@bschilder
Contributor Author

bschilder commented May 4, 2022

Hi @mtmorgan, thank you for the response.

From my understanding yieldSize can only be used when you want to read in a limited subset of variants. In our case (munging genome-wide summary statistics), we're trying to read the entire VCF into R.

My suggestion is to speed up readVcf by having it automatically break the VCF into chunks and reading in each one (ideally across multiple threads).

As for the other suggestion of specifying specific fields (info=, geno=), this is indeed the approach I posted here. If readVcf made use of this code (e.g. via an extra argument like drop_empty_cols), queries would automatically be sped up without users having to know a priori which columns are empty.
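
For illustration, a minimal sketch of restricting the parsed fields; the INFO/FORMAT names ("AF", "ES", "SE"), path, and genome string are placeholders that must match the target VCF's header:

## Only parse the named INFO and FORMAT fields instead of everything
param <- VariantAnnotation::ScanVcfParam(info = "AF", geno = c("ES", "SE"))
vcf <- VariantAnnotation::readVcf("my_sumstats.vcf.gz",
                                  genome = "GRCh37",
                                  param = param)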

I'll check out GenomicFiles and see if that might work. Thanks for the tip!

@vjcitn
Contributor

vjcitn commented May 4, 2022

I do think that your objectives can be met without changes to VariantAnnotation by making use of available parallelism/divide-and-conquer. As I noted, you have a bottleneck with the cbind on thin data.tables, and you should try to rectify that in vcf2df. I will soon demonstrate scalability in ways that I think reflect Martin's suggestion.
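
As an aside, an illustrative (made-up) comparison of the two binding strategies, not taken from vcf2df itself:

library(data.table)
## 'dt_list' stands in for many thin one-column data.tables
dt_list <- lapply(seq_len(50), function(i) {
    dt <- data.table(rnorm(1e5))
    setnames(dt, paste0("col", i))
    dt
})
system.time(slow <- Reduce(cbind, dt_list))   # pairwise binds, repeated copies
system.time(fast <- do.call(cbind, dt_list))  # one bind over the whole list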

@mtmorgan
Contributor

mtmorgan commented May 4, 2022

arrgh, I moved my comment to the issue I intended to comment on.

@vjcitn
Contributor

vjcitn commented May 4, 2022

@bschilder I hope Martin's examples at #57 (comment) are sufficient to get you to a better place. If more details are needed about the specific use case of vcf->df please say where the gaps are.

@bschilder
Contributor Author

bschilder commented May 5, 2022

Hi @mtmorgan, thank you so much for these really helpful examples!

I've tried implementing a version of this but in my case, I actually found that multi-threading increased compute time somehow (from <2sec to >1min). Am I doing something wrong below?

Single- vs. multi-threaded

path <- "https://gwas.mrcieu.ac.uk/files/ieu-a-298/ieu-a-298.vcf.gz"

## Using an empty param here simply for demo purposes
param <- VariantAnnotation::ScanVcfParam()

#### Single-threaded ####
system.time({
    vcf <- VariantAnnotation::readVcf(file = path,
                                      param = param)
})
## Takes 1.6 seconds

#### Multi-threaded (11 cores) ####
BiocParallel::register(
    BiocParallel::SnowParam(workers = 11,
                            progressbar = TRUE)
)
vcf_file <- VariantAnnotation::VcfFile(file = path,
                                       index = paste0(path, ".tbi"))
## Tile ranges across the genome
tiles <-
    GenomicRanges::seqinfo(vcf_file) |>
    GenomeInfoDb::keepSeqlevels(as.character(1:22)) |>
    GenomicRanges::tileGenome(cut.last.tile.in.chrom = TRUE,
                              tilewidth = 1e7L)
## Create mapping function
MAP <- function(range, file) {
    param <- VariantAnnotation::ScanVcfParam(which = range)
    vcf <- VariantAnnotation::readVcf(file = file,
                                      param = param,
                                      genome = "HG19/GRCh37")
    nrow(vcf)
}
## Parallelised query
system.time({
    vcf <- GenomicFiles::reduceByRange(ranges = tiles,
                                       files = path,
                                       MAP = MAP,
                                       REDUCE = `+`)
})
## Takes 1.2 minutes

Pre-selecting fields

I agree; my strategy of sampling the top N rows to determine which columns are likely empty is not ideal. In an ideal world, VCF headers would only declare fields that are actually present (and not populated entirely with NAs). But in the course of munging many different GWAS sumstats VCFs, we've found this is rarely the case (@Al-Murphy). Almost all of the sumstats VCFs from OpenGWAS contain 2-8 columns filled entirely with NAs (genome-wide!). Thus, I think for our use case, sampling the top 1000 rows is pretty predictive of whether a column is entirely empty (though I haven't yet gathered stats on this). A rough sketch of this heuristic is shown below.
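
A minimal sketch, assuming the empty fields can be detected from the first ~1000 records read via yieldSize= (the file path and genome string are placeholders):

## Read only the head of the file and flag INFO fields that are entirely NA
vf <- VariantAnnotation::VcfFile("my_sumstats.vcf.gz", yieldSize = 1000)
open(vf)
vcf_head <- VariantAnnotation::readVcf(vf, genome = "GRCh37")
close(vf)
info_head <- as.list(VariantAnnotation::info(vcf_head))
empty_info <- names(info_head)[vapply(info_head,
                                      function(x) all(is.na(unlist(x))),
                                      logical(1))]
## 'empty_info' could then be excluded when building ScanVcfParam(info = ...)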

@mtmorgan
Contributor

mtmorgan commented May 5, 2022

There are three things going on.

  1. readVcf(file=path, param = param) is one call to the C library, and one call to the iterator through the library. For me this takes 0.32s. readVcf(file = path, param = ScanVcfParam(which = tiles)) is one call to the C library, 300 calls that construct an iterator for the range, and the code to put the ranges together; this takes 2.97s. The parallel evaluation is more like

    lapply(seq_along(tiles), function(i) {
        vcf <- VariantAnnotation::readVcf(
            vcf_file,
            param = VariantAnnotation::ScanVcfParam(which = tiles[i])
            )
        nrow(vcf)
    })
    

    which is 300 calls to the library and takes 82s! This points to the appeal of using yieldSize= in serial mode, where the number of calls to the library 'adapts' to file size, e.g., if there are 10000 variants and yieldSize = 100000 then there is only one call, whereas if there are 10e7 variants there are 1000 calls. Also, the number of tiles indirectly influences the number of records input, so one could end up requesting more memory (proportional to # processes x records per tile) than available, leading to swapping and terrible performance.

  2. A 'snow' cluster is like a separate process (starting R from the command line), so when the process starts you need to load the VariantAnnotation library.

    BiocParallel::register( 
        BiocParallel::SnowParam(workers = 8L, progressbar = TRUE)
    ) 
    system.time({
        BiocParallel::bplapply(1:8, function(i) requireNamespace("VariantAnnotation", quietly = TRUE))
    })
    

    For me this takes about 17 s (so 17s to load VariantAnnotation).

    The MulticoreParam() is a forked process and inherits packages from the master process -- VariantAnnotation is already loaded, so no extra overhead. However, as mentioned, it isn't 100% clear that a library like htslib will respond well to being forked (I think it does, actually...). And perhaps a consideration is that Windows users don't have access to forked processes. So by working with SnowParam you're reaching the broadest number of users in the safest way.

    If you were processing many VCF files, you could amortize the cost of loading VariantAnnotation by starting the workers once and calling reduceByRange() etc. several times. The first call pays the cost of loading VariantAnnotation, but subsequent calls re-use the workers, which already have VariantAnnotation loaded. The paradigm for this is something like

    f = function() {
        bpstart()            # start the cluster
        on.exit(bpstop())    # stop the cluster regardless of whether errors occur
        reduceByRange(...)   # e.g., first file
        reduceByRange(...)   # ...second file
        ...
    }
    
  3. htslib downloads the tbi file to a local cache (I think...). It's not really clear what's going to happen in the SnowParam case -- best case is that htslib handles this gracefully, next best is that htslib downloads one copy for each process, next best (worst) is that the second process thinks it has the tbi file that the first process has just started to download. Even if htslib accesses the tbi file without a local cache, there will be network traffic to read 300 copies of the tbi file, one for each tile. There are similar consequences for the vcf file itself -- there will be 300 network calls at least, for each tile; this might not actually be too different from reading the file directly, since probably htslib 'streams' through the file with a sequence of calls, and the transfer itself, rather than initiation of the call, likely dominates these times.

    For these reasons I think at a minimum you should download the tbi file, maybe to tempdir() or more permanently using BiocFileCache. The index can be provided as a separate argument in VcfFile() (a sketch follows below). For my work here I've downloaded both files to my local laptop; this will be faster, even if using samtools / htslib directly, as soon as you make more than one pass through the entire file.
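
A sketch of that caching approach, assuming BiocFileCache with its default cache location (URL taken from this thread):

## Cache both the VCF and its index locally, then pass the index explicitly
url <- "https://gwas.mrcieu.ac.uk/files/ieu-a-298/ieu-a-298.vcf.gz"
bfc <- BiocFileCache::BiocFileCache(ask = FALSE)
local_vcf <- BiocFileCache::bfcrpath(bfc, url)
local_tbi <- BiocFileCache::bfcrpath(bfc, paste0(url, ".tbi"))
vcf_file <- VariantAnnotation::VcfFile(local_vcf, index = local_tbi)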

@bschilder
Contributor Author

Thanks again for all the very detailed and helpful explanations, @mtmorgan.

I've tried a number of ways to improve readVcf's efficiency based on your suggestions but so far no luck, even after downloading the file and querying locally.

@bschilder
Contributor Author

bschilder commented May 5, 2022

It's worth noting that vcfR has an analogous function to readVcf called read.vcfR. It seems vcfR's version is much faster (by a factor of >4-5x):

URL <- "https://gwas.mrcieu.ac.uk/files/ieu-a-298/ieu-a-298.vcf.gz"
utils::download.file(URL, basename(URL))
utils::download.file(paste0(URL, ".tbi"), paste0(basename(URL), ".tbi"))
path <- basename(URL)

res <- microbenchmark::microbenchmark(
    VariantAnnotation = {
        vcf1 <- VariantAnnotation::readVcf(path)
    },
    vcfR = {
        vcf2 <- vcfR::read.vcfR(path)
    },
    times = 1
)

[Screenshot: microbenchmark results for ieu-a-298 (small VCF)]

This speed difference is even greater with larger VCFs (11 million variants). In this case, vcfR was 9x faster (only 1.5 min) compared to VariantAnnotation (13.7 min).

URL <- "https://gwas.mrcieu.ac.uk/files/ubm-a-2929/ubm-a-2929.vcf.gz"

[Screenshot: benchmark results for ubm-a-2929 (large VCF)]

Maybe the author of vcfR has some insights as to why there's such a notable performance difference. @knausb

If for whatever reason VariantAnnotation::readVcf can't be improved, we may need to remove VariantAnnotation as a dependency from our packages (including MungeSumstats) and switch to using vcfR.

@mtmorgan
Contributor

mtmorgan commented May 5, 2022

For the small file with times = 10 I had (n.b., the time unit is milliseconds and these differences are not something to get excited about)

> res
Unit: milliseconds
              expr      min       lq     mean   median       uq      max neval
 VariantAnnotation 315.0752 319.3693 328.1418 325.3153 338.2770 342.1750    10
              vcfR 178.0379 179.1408 180.1794 180.2401 180.9466 182.3503    10

For the large file with times = 1 I had

> res2
Unit: seconds
              expr      min       lq     mean   median       uq      max neval
 VariantAnnotation 181.3438 181.3438 181.3438 181.3438 181.3438 181.3438     1
              vcfR 299.0507 299.0507 299.0507 299.0507 299.0507 299.0507     1

I know that for VariantAnnotation times can be strongly influenced by R's garbage collector, especially if it runs frequently as R consumes more memory, e.g., when first allocating a large number of character vectors. As far as I can tell the files are parsed differently, e.g.,

> geno(vcf1, "ES") |> head(3)
          ubm-a-2929
rs2462492 -0.0354
rs3107975 0.1696
rs2691311 0.2794

I thought I could

> vcfR::extract.gt(vcf2, "ES")
Error in vcfR::extract.gt(vcf2, "ES") :
  ID column contains non-unique names

It looks like the file is only partially parsed, so additional time would be spent in extraction?

> head(vcf2@gt, 3)
     FORMAT           ubm-a-2929
[1,] "ES:SE:LP:AF:ID" "-0.0354:0.026:0.119186:0.399665:rs2462492"
[2,] "ES:SE:LP:AF:ID" "0.1696:0.132:0.154902:0.0089751:rs3107975"
[3,] "ES:SE:LP:AF:ID" "0.2794:0.3047:0.356547:0.0018298:rs74447903"

Some of the IDs appear to be duplicated...

> id = geno(vcf1, "ID")
> anyDuplicated(id)
[1] 1826

@vjcitn
Contributor

vjcitn commented May 5, 2022

I did a little bit of work on this today. Here is the basic code:

suppressPackageStartupMessages({
 library(GenomicFiles)
 library(GenomicRanges)
 library(VariantAnnotation)
 library(BiocParallel)
})
fl = "ubm-a-2929.vcf.gz"
tiles <-
    seqinfo(VcfFile(fl)) |>
    keepSeqlevels(as.character(1:22)) |>
    tileGenome(cut.last.tile.in.chrom = FALSE, ntile=70)

MAP = function(range, file) {
    param = VariantAnnotation::ScanVcfParam(which = range)
    vcf = VariantAnnotation::readVcf(file, "hg19", param)
    MungeSumstats:::vcf2df(vcf)
}
BiocParallel::register(MulticoreParam(70, progressbar=TRUE))
system.time(
x <- reduceByRange( tiles, fl, MAP=MAP )
)
system.time({
newdf <- do.call(rbind, unlist(x, recursive=FALSE))
})
  • As I see it, the use case is producing an 11.7M x 21 data.frame from the VCF.
  • Does the number of tiles matter? Yes. With 70 tiles on a large 80-core machine I can ingest and convert ubm-a-2929 to a list of data.frames in 75 sec. Concatenating to a single data.frame takes an additional 65 sec. I did not see how to use REDUCE to accomplish this concatenation; I am not sure whether time would be saved. With 140 or 280 tiles the jobs are smaller but you have to cycle through them, so the times go up a bit, to 83 and 111 sec respectively with multicore; the 140-tile job took 115 sec with snow.
  • I did not see any difference between the outputs of the multicore- and snow-based runs, so I assume we can just use multicore on non-Windows platforms.
  • A straight readVcf on the file took 177 seconds. I did not try to convert that large entity to a data.frame. Parallel ingestion alone, with no conversion to data.frame, was not impressively faster than doing the conversion too.

In summary, I personally have no basis for going into readVcf and trying to speed it up. It may be possible, but given that the original authors have left the project and there is a reasonable path to divide and conquer with the ingestion process, I am not inclined to do much more.

@bschilder
Contributor Author

bschilder commented May 6, 2022

@mtmorgan

I know that for VariantAnnotation times can be strongly influenced by R's garbage collector, especially if it runs frequently as R consumes more memory, e.g., when first allocating a large number of character vectors.

I've noticed this as well; R seems to quickly get clogged up after calling readVcf a number of times. How would one go about addressing this? Simply running gc(), restarting the R session, or something else? I also wonder whether vcfR is somehow less susceptible to this.

Duly noted about vcfR::read.vcfR only partially parsing the file.

@vjcitn
Thank you for testing this out. I had tried something very similar (with not much success), but it seems that matching ntile more closely to the number of available cores has quite a large impact.

As far as reducing goes, data.table::rbindlist is quite useful here, as it is much more efficient than rbind. Here's a modified version of your example:

Download VCF

URL <- "https://gwas.mrcieu.ac.uk/files/ieu-a-298/ieu-a-298.vcf.gz"
utils::download.file(URL, basename(URL))
utils::download.file(paste0(URL,".tbi"), paste0(basename(URL),".tbi"))
path <- basename(URL)

reduceByRange

vcf_file <- VariantAnnotation::VcfFile(file = path, 
                                       index = paste0(path,".tbi"))
ntile <- nThread <- 11
tiles <-
    GenomicRanges::seqinfo(vcf_file) |>
    GenomeInfoDb::keepSeqlevels(as.character(1:22)) |>
    GenomicRanges::tileGenome(cut.last.tile.in.chrom = FALSE,
                              ntile = ntile)
MAP <- function(range,
                file,
                genome="HG19/GRCh37",
                as_datatable=TRUE,
                ...) { 
    param2 <- VariantAnnotation::ScanVcfParam(which = range) 
    vcf <- VariantAnnotation::readVcf(file = vcf_file,
                                      genome = genome,
                                      param = param2)
    if(as_datatable){
        return(MungeSumstats:::vcf2df(vcf = vcf,
                      add_sample_names = FALSE,
                      verbose = FALSE))
    } else {return(vcf)}
}
REDUCE <- data.table::rbindlist
system.time({
    vcf <- GenomicFiles::reduceByRange(ranges = tiles,
                                       files = vcf_file$path,
                                       MAP = MAP,
                                       REDUCE = REDUCE,
                                       iterate = TRUE)
    ## second round of rbind
    vcf <- data.table::rbindlist(vcf)
})  
  
# user  system elapsed 
# 3.826   0.019   3.847 

reduceByYield

I also attempted to optimise a reduceByYield strategy by first estimating yieldSize based on the max size of the tiles.

vcf_file <- VariantAnnotation::VcfFile(file = path, 
                                       index = paste0(path,".tbi"))
ntile <- nThread <- 11
tiles <-
    GenomicRanges::seqinfo(vcf_file) |>
    GenomeInfoDb::keepSeqlevels(as.character(1:22)) |>
    GenomicRanges::tileGenome(cut.last.tile.in.chrom = FALSE, 
                              ntile = ntile) 
#### Construct VcfFile obj with yieldSize arg
gr <- unlist(tiles, use.names=FALSE)
GenomicRanges::mcols(gr)$width <- GenomicRanges::width(gr)
yieldSize <- max(gr$width, na.rm=TRUE)
vcf_file_y <- VariantAnnotation::VcfFile(file = vcf_file$path, 
                                         index = vcf_file$index,
                                         yieldSize = yieldSize)

system.time({
    vcf_out <- GenomicFiles::reduceByYield(X = vcf_file_y, 
                                           YIELD =  function(x,
                                                             ...){
                                               VariantAnnotation::readVcf(file = x,
                                                                          genome = "HG19/GRCh37")
                                           }, 
                                           MAP = function(value,
                                                          gr, 
                                                          as_datatable=TRUE,
                                                          ...){
                                               if(as_datatable){
                                                   return(MungeSumstats:::vcf2df(vcf = value, 
                                                                 add_sample_names = FALSE,
                                                                 verbose = FALSE))
                                               } else {return(value)}
                                           }, 
                                           REDUCE = `+`,
                                           gr = gr, 
                                           parallel = TRUE
    )
})

#    user  system elapsed 
# 14.336  15.926  30.807 

Unfortunately, this strategy (at least as I've formulated it here) seems to be slower than all the others.
When I tried to run this on the larger VCF (ubm-a-2929.vcf.gz), it didn't finish after >12 hours and froze R (had to restart my Rstudio Docker container).

readVcf (direct)

vcf_file <- VariantAnnotation::VcfFile(file = path, 
                                       index = paste0(path,".tbi"))
system.time({
    vcf <- VariantAnnotation::readVcf(vcf_file, genome = "hg19")
    df <- MungeSumstats:::vcf2df(vcf = vcf)
})

#   user  system elapsed 
#  0.690   0.007   0.698 

Summary

For small files, the overhead of the reduceByRange strategy seems to make it >5x slower than simply running readVcf directly. But perhaps reduceByRange is still useful for scaling VariantAnnotation up to large files (something that will become increasingly important for the bioinformatics community as VCF-stored datasets continue to grow). Hence my bid for making VariantAnnotation::readVcf more efficient without each user developing their own wrappers independently (most of whom likely won't have the time/expertise to come up with these strategies).

I do sympathize with the dilemma of not having staff who are paid to maintain/improve these core Bioconductor packages; from my understanding it is mainly done on a volunteer basis. It might be worth discussing in the future how this could be ameliorated by seeking dedicated funding sources. A discussion for another time!

I do greatly appreciate you both taking the time to investigate this and explain the underlying mechanisms.

@knausb

knausb commented May 6, 2022

Hi @bschilder ,

VCF data can be seen as consisting of a meta region, a fixed region, and a gt region. An example of the gt region is as follows.

     FORMAT        NA00001          NA00002          NA00003       
[1,] "GT:GQ:DP:HQ" "0|0:48:1:51,51" "1|0:48:8:51,51" "1/1:43:5:.,."
[2,] "GT:GQ:DP:HQ" "0|0:49:3:58,50" "0|1:3:5:65,3"   "0/0:41:3"    
[3,] "GT:GQ:DP:HQ" "1|2:21:6:23,27" "2|1:2:0:18,2"   "2/2:35:4"    
[4,] "GT:GQ:DP:HQ" "0|0:54:7:56,60" "0|0:48:4:51,51" "0/0:61:2"    
[5,] "GT:GQ:DP"    "0/1:35:4"       "0/2:17:2"       "1/1:40:3"  

This is a tab-delimited, variant (row) by sample (column) matrix of data. The first column specifies the format of the subsequent sample columns. The value of each cell is further delimited by colons.

In vcfR the file is read in with vcfR::read.vcfR(). This results in a vcfR object where the gt data is in the @gt slot, which retains the variant-by-sample geometry. It parses the tab-delimited data but does not parse the colon-delimited data. The data in this slot is not really accessible because it is still colon-delimited. The function vcfR::extract.gt() parses this information on an as-needed basis. Both vcfR::read.vcfR() and vcfR::extract.gt() use Rcpp and C++ to improve performance.
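
A small usage sketch of that on-demand workflow; the file path and the "DP" element are placeholders:

library(vcfR)
vcf <- read.vcfR("my_data.vcf.gz")                       # gt region kept colon-delimited
dp <- extract.gt(vcf, element = "DP", as.numeric = TRUE)  # that field parsed only here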

I am less familiar with VariantAnnotation::readVcf(). But this is the "VariantAnnotation" repo, so I'm going to assume you understand it better than I do. Last I knew, VariantAnnotation::readVcf() parses all of the information from the gt region to make it all accessible to the user. If the user desires anything less than 'everything' then they should expect a performance increase from vcfR simply because they are asking for less work to be done.

Hope that helps!
Brian

@mtmorgan
Contributor

mtmorgan commented May 6, 2022

The garbage collector problems I mentioned are actually at the other end of things, when R is growing its memory use in response to successive allocation requests; you can see the garbage collector in action by placing gcinfo(TRUE) at the top of a script. My timings for readVcf / read.vcfR, which differ so much from yours, were in a long-running R session where memory had already been allocated and could be reused. Maybe you wrote a script and ran it in a new R session, and because of the way the benchmarking works the first function gets called first -- readVcf gets charged the cost of R memory allocation, while read.vcfR gets the benefit of reusing allocated memory. You can see this effect in a new R session

> vcffile <- "ubm-a-2929.vcf.gz"
> system.time(readVcf(vcffile))
   user  system elapsed 
162.065  45.629 226.222 
> system.time(readVcf(vcffile))
   user  system elapsed 
119.184  15.053 140.779 

I also ran this again and walked away; my computer went to low-power mode and the run took 640 s, so timings, especially on personal computers, are very susceptible to other processes. R memory management is outside the scope of VariantAnnotation. For what it's worth, I ran the no-op

$ time bcftools view ubm-a-2929.vcf.gz > /dev/null
31.22s user 0.18s system 99% cpu 31.456 total

so times for the extra work done by readVcf() are not extraordinarily long.

For reduceByYield(), yieldSize= is the number of records, rather than the number of nucleotides. A typical value might be 100000. Your choice meant that R tried to read the entire VCF file, and then serialize the object in memory, and then send the object to one worker for deserialization. Probably you were running out of memory. Also as noted in the other issue, reduceByYield() would not be a good choice for this situation, where there is little 'work' being done on each chunk.

@vjcitn MAP maps across ranges; REDUCE is meant to perform a reduction across files, so it is not relevant to the discussion here. I'll look at and update the documentation.

@mtmorgan
Contributor

To summarize, some degree of performance improvement with large VCF files (e.g., millions of records) can be obtained with GenomicFiles::reduceByRange() and GenomicRanges::tileGenome(). This strategy is most effective when the number of tiles equals the number of cores available (memory permitting) and the MAP phase transforms a large VCF object into a smaller result.

There will not be effort in the near term to improve performance of VCF parsing to VCF objects.
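
For reference, a compact sketch of the summarized pattern under those assumptions (paths, genome string, and worker count are placeholders; vcf2df is the thread's example of a MAP step that shrinks each chunk):

suppressPackageStartupMessages({
    library(VariantAnnotation)
    library(GenomicFiles)
    library(BiocParallel)
})
workers <- 8L                                  # match tiles to available cores
register(MulticoreParam(workers))              # use SnowParam() on Windows
vcf_file <- VcfFile("my_sumstats.vcf.gz",
                    index = "my_sumstats.vcf.gz.tbi")
tiles <- seqinfo(vcf_file) |>
    GenomeInfoDb::keepSeqlevels(as.character(1:22)) |>
    GenomicRanges::tileGenome(ntile = workers)  # one tile per worker
MAP <- function(range, file) {
    param <- VariantAnnotation::ScanVcfParam(which = range)
    vcf <- VariantAnnotation::readVcf(file, genome = "GRCh37", param = param)
    MungeSumstats:::vcf2df(vcf)                # MAP shrinks each chunk
}
res <- reduceByRange(tiles, path(vcf_file), MAP = MAP)
df <- data.table::rbindlist(unlist(res, recursive = FALSE))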

@bschilder
Contributor Author

bschilder commented May 13, 2022

To summarize, some degree of performance improvement with large VCF files (e.g., millions of records) can be obtained with GenomicFiles::reduceByRange() and GenomicRanges::tileGenome(). This strategy is most effective when the number of tiles equals the number of cores available (memory permitting) and the MAP phase transforms a large VCF object into a smaller result.

There will not be effort in the near term to improve performance of VCF parsing to VCF objects.

I've gone ahead and implemented this functionality in MungeSumstats via the internal function MungeSumstats:::read_vcf_parallel, which is called by the exported functions MungeSumstats::read_vcf and MungeSumstats::read_sumstats.

We will be pushing these changes to Bioconductor shortly. @Al-Murphy
