CreateHadoopBamSplittingIndex on cram? #4506

Closed · jjfarrell opened this issue Mar 7, 2018 · 21 comments

@jjfarrell
Should the CreateHadoopBamSplittingIndex tool also work on a cram? I am getting the error below, which suggests not. What are the benefits of a splitting index for a Spark job? On average, how long should it take a Spark job to compute the splits for a 30x bam or cram?

gatk CreateHadoopBamSplittingIndex -I adni/cram/ADNI_002_S_0413.hg38.realign.bqsr.cram
Using GATK jar /share/pkg/gatk/4.0.1.1/install/bin/gatk-package-4.0.1.1-local.jar
Running:
    java -Dsamjdk.use_async_io_read_samtools=false -Dsamjdk.use_async_io_write_samtools=true -Dsamjdk.use_async_io_write_tribble=false -Dsamjdk.compression_level=1 -jar /share/pkg/gatk/4.0.1.1/install/bin/gatk-package-4.0.1.1-local.jar CreateHadoopBamSplittingIndex -I adni/cram/ADNI_002_S_0413.hg38.realign.bqsr.cram
11:47:53.243 INFO  NativeLibraryLoader - Loading libgkl_compression.so from jar:file:/share/pkg/gatk/4.0.1.1/install/bin/gatk-package-4.0.1.1-local.jar!/com/intel/gkl/native/libgkl_compression.so
11:47:53.455 INFO  CreateHadoopBamSplittingIndex - ------------------------------------------------------------
11:47:53.455 INFO  CreateHadoopBamSplittingIndex - The Genome Analysis Toolkit (GATK) v4.0.1.1
11:47:53.455 INFO  CreateHadoopBamSplittingIndex - For support and documentation go to https://software.broadinstitute.org/gatk/
11:47:53.455 INFO  CreateHadoopBamSplittingIndex - Executing as farrell@scc-hadoop.bu.edu on Linux v2.6.32-696.10.3.el6.x86_64 amd64
11:47:53.456 INFO  CreateHadoopBamSplittingIndex - Java runtime: Java HotSpot(TM) 64-Bit Server VM v1.8.0_151-b12
11:47:53.456 INFO  CreateHadoopBamSplittingIndex - Start Date/Time: March 7, 2018 11:47:52 AM EST
11:47:53.456 INFO  CreateHadoopBamSplittingIndex - ------------------------------------------------------------
11:47:53.456 INFO  CreateHadoopBamSplittingIndex - ------------------------------------------------------------
11:47:53.457 INFO  CreateHadoopBamSplittingIndex - HTSJDK Version: 2.14.1
11:47:53.457 INFO  CreateHadoopBamSplittingIndex - Picard Version: 2.17.2
11:47:53.457 INFO  CreateHadoopBamSplittingIndex - HTSJDK Defaults.COMPRESSION_LEVEL : 1
11:47:53.457 INFO  CreateHadoopBamSplittingIndex - HTSJDK Defaults.USE_ASYNC_IO_READ_FOR_SAMTOOLS : false
11:47:53.457 INFO  CreateHadoopBamSplittingIndex - HTSJDK Defaults.USE_ASYNC_IO_WRITE_FOR_SAMTOOLS : true
11:47:53.457 INFO  CreateHadoopBamSplittingIndex - HTSJDK Defaults.USE_ASYNC_IO_WRITE_FOR_TRIBBLE : false
11:47:53.457 INFO  CreateHadoopBamSplittingIndex - Deflater: IntelDeflater
11:47:53.457 INFO  CreateHadoopBamSplittingIndex - Inflater: IntelInflater
11:47:53.457 INFO  CreateHadoopBamSplittingIndex - GCS max retries/reopens: 20
11:47:53.457 INFO  CreateHadoopBamSplittingIndex - Using google-cloud-java patch 6d11bef1c81f885c26b2b56c8616b7a705171e4f from https://github.com/droazen/google-cloud-java/tree/dr_all_nio_fixes
11:47:53.457 INFO  CreateHadoopBamSplittingIndex - Initializing engine
11:47:53.458 INFO  CreateHadoopBamSplittingIndex - Done initializing engine
11:47:53.463 INFO  CreateHadoopBamSplittingIndex - Shutting down engine
[March 7, 2018 11:47:53 AM EST] org.broadinstitute.hellbender.tools.spark.CreateHadoopBamSplittingIndex done. Elapsed time: 0.01 minutes.
Runtime.totalMemory()=1115160576
***********************************************************************

A USER ERROR has occurred: Bad input: A splitting index is only relevant for a bam file, but a file with extension cram was specified.

***********************************************************************
Set the system property GATK_STACKTRACE_ON_USER_EXCEPTION (--java-options '-DGATK_STACKTRACE_ON_USER_EXCEPTION=true') to print the stack trace.
@lbergelson (Member)

@jjfarrell You don't need a splitting index for cram. The index works around a bam-specific problem that makes it hard to find good split points in the file. Cram is designed in a way that makes split points easier to find, so the index is unnecessary.

I don't have good numbers for how long it takes to find the split points for bam. It depends on your filesystem. If you have a low-latency filesystem like a local disk or an hdfs setup, then finding split points takes very little time (~seconds), but if you have a high-latency filesystem like something backed by the Google object store, then finding split points may take a long time (on the order of minutes to tens of minutes, depending on latency and file size).
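As a concrete illustration of how the index is used, a bam workflow looks roughly like this (paths are placeholders; the commands mirror the ones quoted elsewhere in this thread, and by default the tool writes the index alongside the input bam):

gatk CreateHadoopBamSplittingIndex -I sample.bam
gatk FlagStatSpark --input sample.bam -- --spark-runner SPARK --spark-master yarn

A Spark tool reading that bam can then use the precomputed split points instead of probing the file for them at job startup.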

@jjfarrell (Author)

Thanks for the explanation!

@lbergelson (Member)

We're working on speeding up the splitting on cloud filesystems, but it's going to take a while before we have a good solution other than the splitting index.

@jjfarrell (Author)

jjfarrell commented Mar 7, 2018

On our hadoop system, there is a long delay of 30+ minutes before the tasks begin; see the gap in the job log below between 13:24 and 13:59. Once the tasks start, they finish in a few minutes. During the delay, the executors are idle, waiting for tasks to start. I'm just surprised at how long computing the splits is taking....

This is the command line...
gatk FlagStatSpark --input adni/cram/ADNI_002_S_0413.hg38.realign.bqsr.cram --reference file:///restricted/projectnb/casa/ref/GRCh38_full_analysis_set_plus_decoy_hla.fa -- --spark-runner SPARK --spark-master yarn --executor-memory 48G --driver-memory 16g --driver-cores 2 --executor-cores 8 --num-executors 8

18/03/07 13:24:26 INFO storage.BlockManagerMasterEndpoint: Registering block manager scc-q14.scc.bu.edu:42456 with 25.4 GB RAM, BlockManagerId(2, scc-q14.scc.bu.edu, 42456, None)
18/03/07 13:24:27 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 247.0 KB, free 8.4 GB)
18/03/07 13:24:28 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.5 KB, free 8.4 GB)
18/03/07 13:24:28 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.48.225.55:32895 (size: 25.5 KB, free: 8.4 GB)
18/03/07 13:24:28 INFO spark.SparkContext: Created broadcast 0 from newAPIHadoopFile at ReadsSparkSource.java:112
18/03/07 13:24:28 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 7164 for farrell on ha-hdfs:scc
18/03/07 13:24:28 INFO security.TokenCache: Got dt for hdfs://scc; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:scc, Ident: (HDFS_DELEGATION_TOKEN token 7164 for farrell)
18/03/07 13:24:28 INFO input.FileInputFormat: Total input paths to process : 1
18/03/07 13:59:26 INFO spark.SparkContext: Starting job: aggregate at FlagStatSpark.java:73
18/03/07 13:59:26 INFO scheduler.DAGScheduler: Got job 0 (aggregate at FlagStatSpark.java:73) with 252 output partitions
18/03/07 13:59:26 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (aggregate at FlagStatSpark.java:73)
18/03/07 13:59:26 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/03/07 13:59:26 INFO scheduler.DAGScheduler: Missing parents: List()
18/03/07 13:59:26 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at filter at GATKSparkTool.java:220), which has no missing parents
18/03/07 13:59:26 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 1148.4 KB, free 8.4 GB)
18/03/07 13:59:26 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 345.8 KB, free 8.4 GB)
18/03/07 13:59:26 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.48.225.55:32895 (size: 345.8 KB, free: 8.4 GB)
18/03/07 13:59:26 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
18/03/07 13:59:26 INFO scheduler.DAGScheduler: Submitting 252 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at filter at GATKSparkTool.java:220)
18/03/07 13:59:26 INFO cluster.YarnScheduler: Adding task set 0.0 with 252 tasks
18/03/07 13:59:26 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 0, scc-q05.scc.bu.edu, executor 4, partition 2, NODE_LOCAL, 6147 bytes)
18/03/07 13:59:26 INFO scheduler.TaskSetManager: Starting task 25.0 in stage 0.0 (TID 1, scc-q10.scc.bu.edu, executor 3, partition 25, NODE_LOCAL, 6147 bytes)
18/03/07 13:59:26 INFO scheduler.TaskSetManager: Starting task 15.0 in stage 0.0 (TID 2, scc-q07.scc.bu.edu, executor 1, partition 15, NODE_LOCAL, 6147 bytes)
18/03/07 13:59:26 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 0.0 (TID 3, scc-q16.scc.bu.edu, executor 8, partition 4, NODE_LOCAL, 6147 bytes)
18/03/07 13:59:26 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 0.0 (TID 4, scc-q14.scc.bu.edu, executor 2, partition 3, NODE_LOCAL, 6147 bytes)

jjfarrell reopened this Mar 7, 2018
@lbergelson (Member)

@jjfarrell Huh. I'd expect that sort of annoying delay from splitting on a cloud filesystem, but not on a hadoop one. Does running with a splitting index avoid the delay?

@jjfarrell (Author)

jjfarrell commented Mar 8, 2018

@lbergelson

I ran the FlagStatSpark tool on a bam with a splitting index on hdfs. It ran blazingly fast, with no delay at all and a total time of 1m41s.

So it looks like it must be related to computing the splits on the cram. I see a similar delay (30-40 min) when testing StructuralVariationDiscoveryPipelineSpark jobs on 50 crams.

Below are some excerpts from the log of the fast FlagStatSpark on the bam.

No delay, and the tasks start right up....

18/03/07 20:31:40 INFO cluster.YarnClientSchedulerBackend: Application application_1507317228518_0369 has started running.
18/03/07 20:31:40 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 41567.
18/03/07 20:31:40 INFO netty.NettyBlockTransferService: Server created on 10.48.225.55:41567
18/03/07 20:31:40 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/03/07 20:31:40 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.48.225.55, 41567, None)
18/03/07 20:31:40 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.48.225.55:41567 with 8.4 GB RAM, BlockManagerId(driver, 10.48.225.55, 41567, None)
18/03/07 20:31:40 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.48.225.55, 41567, None)
18/03/07 20:31:40 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.48.225.55, 41567, None)
18/03/07 20:31:40 INFO handler.ContextHandler: Started o.e.j.s.ServletContextHandler@4d9cf71d{/metrics/json,null,AVAILABLE}
18/03/07 20:31:48 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.18.186:36002) with ID 8
18/03/07 20:31:48 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.18.196:45956) with ID 2
18/03/07 20:31:48 INFO storage.BlockManagerMasterEndpoint: Registering block manager scc-q02.scc.bu.edu:37393 with 25.4 GB RAM, BlockManagerId(8, scc-q02.scc.bu.edu, 37393, None)
18/03/07 20:31:48 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.18.197:57832) with ID 3
18/03/07 20:31:48 INFO storage.BlockManagerMasterEndpoint: Registering block manager scc-q12.scc.bu.edu:36422 with 25.4 GB RAM, BlockManagerId(2, scc-q12.scc.bu.edu, 36422, None)
18/03/07 20:31:48 INFO storage.BlockManagerMasterEndpoint: Registering block manager scc-q13.scc.bu.edu:42480 with 25.4 GB RAM, BlockManagerId(3, scc-q13.scc.bu.edu, 42480, None)
18/03/07 20:31:48 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.18.190:34084) with ID 5
18/03/07 20:31:48 INFO storage.BlockManagerMasterEndpoint: Registering block manager scc-q06.scc.bu.edu:39736 with 25.4 GB RAM, BlockManagerId(5, scc-q06.scc.bu.edu, 39736, None)
18/03/07 20:31:48 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.18.193:34094) with ID 1
18/03/07 20:31:48 INFO storage.BlockManagerMasterEndpoint: Registering block manager scc-q09.scc.bu.edu:38854 with 25.4 GB RAM, BlockManagerId(1, scc-q09.scc.bu.edu, 38854, None)
18/03/07 20:31:49 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.18.187:33854) with ID 4
18/03/07 20:31:49 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.18.198:41138) with ID 7
18/03/07 20:31:49 INFO storage.BlockManagerMasterEndpoint: Registering block manager scc-q03.scc.bu.edu:35635 with 25.4 GB RAM, BlockManagerId(4, scc-q03.scc.bu.edu, 35635, None)
18/03/07 20:31:49 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
18/03/07 20:31:49 INFO storage.BlockManagerMasterEndpoint: Registering block manager scc-q14.scc.bu.edu:36726 with 25.4 GB RAM, BlockManagerId(7, scc-q14.scc.bu.edu, 36726, None)
18/03/07 20:31:49 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.18.195:47862) with ID 6
18/03/07 20:31:49 INFO storage.BlockManagerMasterEndpoint: Registering block manager scc-q11.scc.bu.edu:46002 with 25.4 GB RAM, BlockManagerId(6, scc-q11.scc.bu.edu, 46002, None)
18/03/07 20:31:49 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 246.6 KB, free 8.4 GB)
18/03/07 20:31:50 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.3 KB, free 8.4 GB)
18/03/07 20:31:50 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.48.225.55:41567 (size: 25.3 KB, free: 8.4 GB)
18/03/07 20:31:50 INFO spark.SparkContext: Created broadcast 0 from newAPIHadoopFile at ReadsSparkSource.java:112
18/03/07 20:31:50 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 7175 for farrell on ha-hdfs:scc
18/03/07 20:31:50 INFO security.TokenCache: Got dt for hdfs://scc; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:scc, Ident: (HDFS_DELEGATION_TOKEN token 7175 for farrell)
18/03/07 20:31:50 INFO input.FileInputFormat: Total input paths to process : 1
18/03/07 20:31:51 INFO spark.SparkContext: Starting job: aggregate at FlagStatSpark.java:73
18/03/07 20:31:51 INFO scheduler.DAGScheduler: Got job 0 (aggregate at FlagStatSpark.java:73) with 629 output partitions
18/03/07 20:31:51 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (aggregate at FlagStatSpark.java:73)
18/03/07 20:31:51 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/03/07 20:31:51 INFO scheduler.DAGScheduler: Missing parents: List()
18/03/07 20:31:51 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at filter at GATKSparkTool.java:220), which has no missing parents
18/03/07 20:31:51 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 53.9 KB, free 8.4 GB)
18/03/07 20:31:51 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 15.5 KB, free 8.4 GB)
18/03/07 20:31:51 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.48.225.55:41567 (size: 15.5 KB, free: 8.4 GB)
18/03/07 20:31:51 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
18/03/07 20:31:51 INFO scheduler.DAGScheduler: Submitting 629 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at filter at GATKSparkTool.java:220)
18/03/07 20:31:51 INFO cluster.YarnScheduler: Adding task set 0.0 with 629 tasks
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 0, scc-q03.scc.bu.edu, executor 4, partition 1, NODE_LOCAL, 6107 bytes)
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 6.0 in stage 0.0 (TID 1, scc-q13.scc.bu.edu, executor 3, partition 6, NODE_LOCAL, 6107 bytes)
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 0.0 (TID 2, scc-q14.scc.bu.edu, executor 7, partition 4, NODE_LOCAL, 6107 bytes)
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 0.0 (TID 3, scc-q12.scc.bu.edu, executor 2, partition 5, NODE_LOCAL, 6107 bytes)
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 4, scc-q09.scc.bu.edu, executor 1, partition 2, NODE_LOCAL, 6107 bytes)
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 17.0 in stage 0.0 (TID 5, scc-q11.scc.bu.edu, executor 6, partition 17, NODE_LOCAL, 6107 bytes)
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 6, scc-q06.scc.bu.edu, executor 5, partition 0, NODE_LOCAL, 6107 bytes)
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 0.0 (TID 7, scc-q02.scc.bu.edu, executor 8, partition 3, NODE_LOCAL, 6107 bytes)
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 13.0 in stage 0.0 (TID 8, scc-q03.scc.bu.edu, executor 4, partition 13, NODE_LOCAL, 6107 bytes)
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 7.0 in stage 0.0 (TID 9, scc-q13.scc.bu.edu, executor 3, partition 7, NODE_LOCAL, 6107 bytes)
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 8.0 in stage 0.0 (TID 10, scc-q14.scc.bu.edu, executor 7, partition 8, NODE_LOCAL, 6107 bytes)
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 14.0 in stage 0.0 (TID 11, scc-q12.scc.bu.edu, executor 2, partition 14, NODE_LOCAL, 6107 bytes)
18/03/07 20:31:51 INFO scheduler.TaskSetManager: Starting task 9.0 in stage 0.0 (TID 12, scc-q09.scc.bu.edu, executor 1, parti

Processed 1.2 billion reads in less than 2 minutes.....

18/03/07 20:32:55 INFO scheduler.DAGScheduler: Job 0 finished: aggregate at FlagStatSpark.java:73, took 64.566359 s
1205535516 in total
0 QC failure
37791118 duplicates
1157122594 mapped (95.98%)
1205535516 paired in sequencing
602767758 read1
602767758 read2
1145853318 properly paired (95.05%)
1150449216 with itself and mate mapped
6673378 singletons (0.55%)
4595898 with mate mapped to a different chr
3316623 with mate mapped to a different chr (mapQ>=5)
18/03/07 20:32:55 INFO server.ServerConnector: Stopped ServerConnector@79f5a6ed{HTTP/1.1}{0.0.0.0:4041}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@221ca495{/stages/stage/kill,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@28b458e6{/jobs/job/kill,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@3ccb12d{/api,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@1544ded3{/,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@537b3b2e{/static,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@30036a18{/executors/threadDump/json,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@2cc75b25{/executors/threadDump,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@2bcda694{/executors/json,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@37c1cfa{/executors,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@69feb4d9{/environment/json,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@23648d2d{/environment,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@d84b3a2{/storage/rdd/json,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@7cc2c551{/storage/rdd,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@10fb4575{/storage/json,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@ac4915e{/storage,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@6682e6a5{/stages/pool/json,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@19eea77c{/stages/pool,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@11e17893{/stages/stage/json,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@64a1116a{/stages/stage,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@c5e69a5{/stages/json,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@10131289{/stages,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@50f4b83d{/jobs/job/json,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@5d66ae3a{/jobs/job,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@30159886{/jobs/json,null,UNAVAILABLE}
18/03/07 20:32:55 INFO handler.ContextHandler: Stopped o.e.j.s.ServletContextHandler@33de7f3d{/jobs,null,UNAVAILABLE}
18/03/07 20:32:55 INFO ui.SparkUI: Stopped Spark web UI at http://10.48.225.55:4041
18/03/07 20:32:55 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
18/03/07 20:32:55 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
18/03/07 20:32:55 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
18/03/07 20:32:55 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
18/03/07 20:32:55 INFO cluster.YarnClientSchedulerBackend: Stopped
18/03/07 20:32:55 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/03/07 20:32:55 INFO memory.MemoryStore: MemoryStore cleared
18/03/07 20:32:55 INFO storage.BlockManager: BlockManager stopped
18/03/07 20:32:55 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
18/03/07 20:32:55 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/03/07 20:32:55 INFO spark.SparkContext: Successfully stopped SparkContext
20:32:55.769 INFO  FlagStatSpark - Shutting down engine
[March 7, 2018 8:32:55 PM EST] org.broadinstitute.hellbender.tools.spark.pipelines.FlagStatSpark done. Elapsed time: 1.60 minutes.
Runtime.totalMemory()=2091384832
18/03/07 20:32:55 INFO util.ShutdownHookManager: Shutdown hook called
18/03/07 20:32:55 INFO util.ShutdownHookManager: Deleting directory /tmp/farrell/spark-9e0f0525-00f3-4b37-b1d2-4cf55b4c8cb0

real    1m41.113s
user    0m49.698s
sys     0m4.432s

@jjfarrell (Author)

FlagStatSpark was also run on the bam file without the splitting index. There was no delay, just a slightly longer run of 2.5 min. Both runs produced the same results. The 30-40 minute delay occurs only when reading crams. @lbergelson Is there a fix for this long delay when processing a cram?

@cmnbroad (Collaborator)

@lbergelson @jjfarrell I'm not sure how much of the difference this accounts for, but the cram splitter iterates through all of the cram containers using htsjdk's CramContainerIterator, which decodes and materializes all SAMRecords in each container it sees. The bam (probabilistic) splitter only materializes a few records around each putative split boundary. And decoding cram is inherently slower than bam to start with.
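To make that scan concrete, here is a minimal sketch of collecting container offsets the way the splitter does, assuming htsjdk 2.x (2.14.1 in the logs above), where CramContainerIterator implements Iterator<Container> and Container exposes its byte offset as a public offset field - treat those details as assumptions to check against the htsjdk version in use:

import htsjdk.samtools.cram.build.CramContainerIterator;
import htsjdk.samtools.cram.structure.Container;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CramOffsetScan {
    // Walk the whole cram, recording each container's start offset. next() parses
    // the container's blocks as it goes, which is why this is slow on large crams.
    public static List<Long> containerOffsets(String cramPath) throws IOException {
        List<Long> offsets = new ArrayList<>();
        try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(cramPath))) {
            CramContainerIterator it = new CramContainerIterator(in);
            while (it.hasNext()) {
                Container container = it.next();
                offsets.add(container.offset); // public field in htsjdk 2.x (assumption)
            }
        }
        return offsets;
    }
}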

@jjfarrell (Author)

@cmnbroad @lbergelson For Spark tools, shouldn't the cram splitter be using Hadoop-BAM and not htsjdk's CramContainerIterator? That would probably explain the 30-40 minutes of extra time for cram splits versus bam splits.

@cmnbroad (Collaborator)

@jjfarrell Yes, but Hadoop-Bam in turn uses htsjdk.

@jjfarrell (Author)

jjfarrell commented Mar 14, 2018

@cmnbroad @lbergelson The cram index looks like it has all the info required to generate the splits without using CramContainerIterator to read the cram file directly.

Could using the crai index for splits be a potential solution to the glacially slow cram split generation?

CRAM index

A CRAM index is a gzipped tab delimited file containing the following columns:

  1. Sequence id
  2. Alignment start
  3. Alignment span
  4. Container start byte offset in the file
  5. Slice start byte offset in the container data (‘blocks’)
  6. Slice bytes

Each line represents a slice in the CRAM file. Please note that all slices must be listed in the index file.

In Hadoop-BAM, this code could read the crai instead of the cram to find the container boundaries (a sketch of the crai-reading side follows the snippet):

public List<InputSplit> getSplits(List<InputSplit> splits, Configuration conf)
    throws IOException {
  // update splits to align with CRAM container boundaries
  List<InputSplit> newSplits = new ArrayList<InputSplit>();
  Map<Path, List<Long>> fileToOffsets = new HashMap<Path, List<Long>>();
  for (InputSplit split : splits) {
    FileSplit fileSplit = (FileSplit) split;
    Path path = fileSplit.getPath();
    List<Long> containerOffsets = fileToOffsets.get(path);
    if (containerOffsets == null) {
      containerOffsets = getContainerOffsets(conf, path);
      fileToOffsets.put(path, containerOffsets);
    }
    long newStart = nextContainerOffset(containerOffsets, fileSplit.getStart());
    long newEnd = nextContainerOffset(containerOffsets, fileSplit.getStart() +
        fileSplit.getLength());
    long newLength = newEnd - newStart;
    if (newLength == 0) { // split is wholly within a container
      continue;
    }
    FileSplit newSplit = new FileSplit(fileSplit.getPath(), newStart, newLength,
        fileSplit.getLocations());
    newSplits.add(newSplit);
  }
  return newSplits;
}
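As a rough sketch of the crai side of this proposal, getContainerOffsets could parse the index instead of the cram. This relies only on the column layout quoted above (gzipped, tab-delimited, container start byte offset in column 4); the class name is a placeholder, not Hadoop-BAM API:

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

public class CraiOffsets {
    // Read container start offsets from a .crai (gzipped TSV, one line per slice).
    // A container with several slices yields repeated offsets, so collapse
    // consecutive duplicates to get one entry per container.
    public static List<Long> getContainerOffsets(String craiPath) throws IOException {
        List<Long> offsets = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream(craiPath))))) {
            String line;
            long previous = -1;
            while ((line = reader.readLine()) != null) {
                String[] columns = line.split("\t");
                long containerStart = Long.parseLong(columns[3]); // column 4, 0-indexed
                if (containerStart != previous) {
                    offsets.add(containerStart);
                    previous = containerStart;
                }
            }
        }
        return offsets;
    }
}

Reading a few kilobytes of index rather than scanning gigabytes of cram is what would make this attractive, especially on high-latency filesystems.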

@tomwhite (Contributor)

@jjfarrell I agree that it would make a lot more sense to use the .crai file. That's something we could certainly do in Hadoop-BAM or the new code.

@cmnbroad does CramContainerIterator materialize each record? I was under the impression that it just finds container boundaries - but the slow runtime suggests it may not.

@fnothaft (Contributor)

> @jjfarrell I agree that it would make a lot more sense to use the .crai file. That's something we could certainly do in Hadoop-BAM or the new code.

We've already got code that does this for BAM indices; even if CRAIs have a different API, the split calculation code should be reusable.

@cmnbroad (Collaborator)

@tomwhite I just took a look, and I did overstate the case when I said CramContainerIterator materializes SAMRecords. It stops short of doing that, but it does crack each container open and iterate through and decompress each data block in each slice in each container as it goes along. It's not clear to me how much this accounts for the difference in split calculation time vs. bam.

@tomwhite (Contributor)

tomwhite commented Apr 5, 2018

I wrote an alternative to CramContainerIterator called CramContainerHeaderIterator that doesn't decompress blocks:

https://github.com/tomwhite/Hadoop-BAM/blob/new-bam-other-formats/src/main/java/org/seqdoop/hadoop_bam/spark/CramContainerHeaderIterator.java

On a 6GB CRAM file, CramContainerIterator took around 10 minutes to read all the offsets, while CramContainerHeaderIterator took less than a minute. Using the .crai file also took less than a minute. This was on HDFS; I haven't tried it on cloud storage yet.

So I think we should use the .crai file if it's available, and fall back to CramContainerHeaderIterator otherwise.
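A sketch of that selection logic might look like the following; CramContainerHeaderIterator here is the class from the branch linked above, its constructor and offset field are assumptions modeled on CramContainerIterator, and getContainerOffsets is the crai-parsing sketch from earlier in the thread:

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class CramSplitPoints {
    // Prefer the .crai (index-only, cheap); fall back to scanning container
    // headers, which still reads through the file but skips block decompression.
    public static List<Long> containerOffsets(String cramPath) throws IOException {
        Path crai = Paths.get(cramPath + ".crai");
        if (Files.exists(crai)) {
            return CraiOffsets.getContainerOffsets(crai.toString()); // crai sketch above
        }
        List<Long> offsets = new ArrayList<>();
        try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(cramPath))) {
            CramContainerHeaderIterator it = new CramContainerHeaderIterator(in); // assumed API
            while (it.hasNext()) {
                offsets.add(it.next().offset); // assumed to mirror Container.offset
            }
        }
        return offsets;
    }
}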

@cmnbroad (Collaborator)

cmnbroad commented Apr 5, 2018

@tomwhite That sounds like a pretty good improvement! For reasons that are not clear to me, htsjdk doesn't generate .crai index files, only .bai, so we'd definitely want something like the CramContainerHeaderIterator approach for those.

One other thought that occurs to me: we should think about how to ensure that mates are kept together for CRAM. The spec doesn't require that mates be contained in the same slice, and since the default slices-per-container for both htslib and htsjdk is 1, they don't even have to be in the same container.

@tomwhite (Contributor)

tomwhite commented Apr 6, 2018

@cmnbroad do you know what would be needed for htsjdk to generate .crai files? There's a CRAMCRAIIndexer#writeIndex method that should do it.
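For what it's worth, a minimal sketch of driving that indexer, assuming a static writeIndex(SeekableStream, OutputStream) overload with roughly this shape (the signature should be verified against the htsjdk version in use):

import htsjdk.samtools.CRAMCRAIIndexer;
import htsjdk.samtools.seekablestream.SeekableFileStream;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class WriteCrai {
    public static void main(String[] args) throws IOException {
        File cram = new File(args[0]);
        // Write a .crai next to the cram; the output path is our choice here.
        try (SeekableFileStream in = new SeekableFileStream(cram);
             OutputStream out = new FileOutputStream(cram.getPath() + ".crai")) {
            CRAMCRAIIndexer.writeIndex(in, out);
        }
    }
}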

Keeping mates in a pair together is something we already do in GATK (https://github.com/broadinstitute/gatk/blob/master/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java#L216), but it would make sense to keep it in the new Spark code in #196.

@cmnbroad (Collaborator)

cmnbroad commented Apr 6, 2018

@tomwhite Yes, it has to do with htsjdk's crai->bai conversion. For some reason, the original CRAM implementation used the bai structure internally to satisfy CRAM queries instead of crai, probably because that was easier than writing a native crai implementation. It writes .bai, prefers .bai on read, and accepts .crai as a fallback, but internally auto-converts crai to bai. A while back I went to implement .crai write support, but in doing so discovered a bug in the crai->bai conversion that can't be fixed without a change to the crai spec. Since that bug only exists as a result of conversion (and not in .bai files written natively by htsjdk), we decided to hold off on enabling crai writing until either the spec changes or we have a query implementation that uses crai directly.

@jjfarrell (Author)

There is an initial release of the faster and more accurate replacement for Hadoop-BAM at:

https://github.com/disq-bio/disq

It would be great to see faster reading of crams in Spark GATK with this. Any plans for testing this release?

@lbergelson (Member)

@jjfarrell Yes, we're working on migrating to disq. See #5138

@jjfarrell (Author)

The 4.1.0.0 version with disq is reading crams much faster!
