Speed of Reading into ADAM RDDs from S3 #2003

Closed
nick-phillips opened this Issue Jun 26, 2018 · 5 comments

nick-phillips commented Jun 26, 2018

Hi all,

We're using ADAM via the Python API, and we're running into some bottlenecks loading data from S3 using s3a. We're seeing a maximum throughput of about 100 Mbps when reading BAM and VCF files from S3 into ADAM RDDs. Loading the same files into Spark as text files gives ~1 Gbps. I realize many factors could affect this performance, but are these numbers in the ballpark of what's expected for this use of ADAM? If not, are there recommended troubleshooting steps?

Could provide more info if needed.

Thanks!
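
For anyone who wants to compare their own numbers against the figures above, here is a minimal sketch of the arithmetic, assuming "mbps" means megabits per second. The function name, file size, and timings are hypothetical and not measurements from this thread.

```python
def throughput_mbps(num_bytes: int, seconds: float) -> float:
    """Return throughput in megabits per second (1 megabit = 1,000,000 bits)."""
    if seconds <= 0:
        raise ValueError("seconds must be positive")
    return num_bytes * 8 / seconds / 1_000_000


# Hypothetical example: the same 10 GB file read in 800 s versus 80 s.
slow = throughput_mbps(10 * 10**9, 800.0)
fast = throughput_mbps(10 * 10**9, 80.0)
print(f"{slow:.0f} Mbps vs {fast:.0f} Mbps")
```

Timing both load paths on the same file and the same cluster keeps the comparison fair, since network and executor settings affect both.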

heuermh commented Jun 26, 2018

Hello @nick-phillips, thanks for the question.

Things have been strange for me and others reading BAM and VCF from S3 via s3a recently. Parquet works fine though. See #1951

Perhaps it would be useful to discuss this further on Gitter? Feel free to start a one-on-one if there is anything sensitive about your environment.

pjongeneel commented Jun 27, 2018

@heuermh - would you care to elaborate on "Things have been strange for me and others reading BAM and VCF from S3 via s3a recently."?

Also, I am attempting to load VCFs via Parquet through the Python API, as you suggested for @nick-phillips, and have saved a VCF I loaded to Parquet via

df=adamContext.loadVariants(path).toDF()
df.write.format("parquet").save("s3a://my_bucket/df.parquet").saveMetadata("s3a://my_bucket/df.parquet")

When I try to load this, however,
df2 = adamContext.loadVariants("s3a://my_bucket/df.parquet").toDF()

I get the following errors.

Py4JJavaError: An error occurred while calling o72.loadVariants.
: java.io.FileNotFoundException: Couldn't find any files matching s3a://my_bucket/df.parquet for the requested PathFilter
at org.bdgenomics.adam.rdd.ADAMContext.getFsAndFilesWithFilter(ADAMContext.scala:1427)
at org.bdgenomics.adam.rdd.ADAMContext.loadAvroSequenceDictionary(ADAMContext.scala:1225)
at org.bdgenomics.adam.rdd.ADAMContext.loadParquetVariants(ADAMContext.scala:2416)
at org.bdgenomics.adam.rdd.ADAMContext$$anonfun$loadVariants$1.apply(ADAMContext.scala:3153)
at org.bdgenomics.adam.rdd.ADAMContext$$anonfun$loadVariants$1.apply(ADAMContext.scala:3148)
at scala.Option.fold(Option.scala:158)
at org.apache.spark.rdd.Timer.time(Timer.scala:48)
at org.bdgenomics.adam.rdd.ADAMContext.loadVariants(ADAMContext.scala:3146)
at org.bdgenomics.adam.api.java.JavaADAMContext.loadVariants(JavaADAMContext.scala:387)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

My Parquet VCF exists as a directory on S3 with many partitioned files, and I verified that the SUCCESS file was there. Although the documentation says that loadVariants in the Python API supports Parquet, it can't seem to load it with the s3a protocol. Is there something I am missing here?
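
The trace above fails inside loadAvroSequenceDictionary, which suggests the loader is looking for an Avro sequence dictionary next to the Parquet part files rather than failing on the Parquet data itself. As a rough sketch, a listing of the output directory can be checked for that sidecar file. The file name `_seqdict.avro` is an assumption that this thread does not confirm, and the listing below is made up for illustration.

```python
from typing import Iterable, List, Sequence

# Assumption: name of the sequence dictionary sidecar; not confirmed in this thread.
ASSUMED_SIDECARS = ("_seqdict.avro",)


def missing_sidecars(object_names: Iterable[str],
                     required: Sequence[str] = ASSUMED_SIDECARS) -> List[str]:
    """Return the required sidecar file names that are absent from a listing."""
    # Compare on the final path component so bucket and directory prefixes do not matter.
    base_names = {name.rstrip("/").rsplit("/", 1)[-1] for name in object_names}
    return [name for name in required if name not in base_names]


# Hypothetical listing of a directory written by a plain DataFrame save.
listing = [
    "df.parquet/_SUCCESS",
    "df.parquet/part-00000.snappy.parquet",
]
print(missing_sidecars(listing))
```

An empty result would mean every assumed sidecar is present. A non-empty one is a hint that the directory was not written through ADAM's own save path.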

heuermh commented Jun 27, 2018

would you care to elaborate on "Things have been strange for me and others reading BAM and VCF from S3 via s3a recently."

@pjongeneel It's in the linked issue: there are thread leaks upstream in Hadoop libraries that cause trouble.

df=adamContext.loadVariants(path).toDF()
df.write.format("parquet")
  .save("s3a://my_bucket/df.parquet")
  .saveMetadata("s3a://my_bucket/df.parquet")

I haven't tried writing the DataFrame directly to Parquet this way before, so I'm not sure the metadata will work correctly.

You can write to Parquet via the VariantContextRDD returned by the load call, without converting to a DataFrame first:

adamContext.loadVariants(path).saveAsParquet("s3a://my_bucket/df.parquet")
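
Put together as a round trip, that could look like the sketch below. It uses only the calls already shown in this thread (loadVariants, saveAsParquet, toDF); the function name and parameters are hypothetical, and the ADAM context is passed in rather than constructed here. As a side note, in PySpark `DataFrameWriter.save` returns None, so a call chained after `.save(...)` would not run in any case.

```python
def save_and_reload_variants(adam_context, vcf_path, parquet_path):
    """Save variants as Parquet through ADAM, then load them back as a DataFrame."""
    # Load once and save through ADAM rather than through DataFrame.write.
    variants = adam_context.loadVariants(vcf_path)
    variants.saveAsParquet(parquet_path)
    # Reload from the directory that ADAM just wrote.
    return adam_context.loadVariants(parquet_path).toDF()
```

Usage would be something like `df2 = save_and_reload_variants(adamContext, path, "s3a://my_bucket/df.parquet")`.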

pjongeneel commented Jun 27, 2018

@heuermh, thanks for the info, I tried that and it worked fine!

Side note: I actually got the original code to save my DataFrame from the ADAM Scala API:

override def saveAsParquet(filePath: String,
                           blockSize: Int = 128 * 1024 * 1024,
                           pageSize: Int = 1 * 1024 * 1024,
                           compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
                           disableDictionaryEncoding: Boolean = false) {
  log.info("Saving directly as Parquet from SQL. Options other than compression codec are ignored.")
  dataset.toDF()
    .write
    .format("parquet")
    .option("spark.sql.parquet.compression.codec", compressCodec.toString.toLowerCase())
    .save(filePath)
  saveMetadata(filePath)
}

However, when I saved it manually like that, I got
df.parquet/part--174521dd-7ee0-4693-ae76-8edf140340f1-c000.snappy.parquet files,
and when I saved via the ADAM context I got
df.parquet/part-r-
.gz.parquet files.

Not sure yet if there is an easy way to save the dataframe directly as the .gz.parquet files but I have a solution that works for now, so thank you!
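
One thing that might be worth trying for the codec part is the `compression` option on Spark's DataFrameWriter, which selects the Parquet codec for a single write. This is a minimal sketch with a hypothetical function name. It only changes the codec (so the part files should end in `.gz.parquet`), and it does not write any ADAM metadata, so loading the result back through ADAM may still fail.

```python
def write_gzip_parquet(df, path):
    """Write a Spark DataFrame as gzip-compressed Parquet."""
    # "compression" is the per-write DataFrameWriter option for the Parquet codec.
    df.write.format("parquet").option("compression", "gzip").save(path)
```

The same effect can be had with `df.write.parquet(path, compression="gzip")`.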

heuermh self-assigned this Jul 3, 2018

heuermh commented Sep 21, 2018

Closing as resolved.

heuermh closed this Sep 21, 2018

heuermh added this to the 0.24.1 milestone Sep 21, 2018
