
Default compression method for flat files #36

Closed
DyfanJones opened this issue Dec 3, 2019 · 25 comments
Labels: enhancement (New feature or request), question (Further information is requested)

Comments

@DyfanJones
Owner

DyfanJones commented Dec 3, 2019

Currently RAthena and noctua support gzip compression when uploading data to S3 and Athena. Is there a better compression algorithm for flat files? From AWS's Top 10 Performance Tuning Tips for Amazon Athena:

| Algorithm | Splittable? | Compression ratio | Compress + decompress speed |
| --- | --- | --- | --- |
| Gzip (DEFLATE) | No | High | Medium |
| bzip2 | Yes | Very high | Slow |
| LZO | No | Low | Fast |
| Snappy | No | Low | Very fast |

> For Athena, we recommend using either Apache Parquet or Apache ORC, which compress data by default and are splittable. When they are not an option, then try BZip2 or Gzip with an optimal file size.

From this it looks like bzip2/gzip are currently the recommended options for flat files. Might need to benchmark the read speed of bzip2 and gzip files when querying from Athena.
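A minimal way to benchmark that from R would be to time the same query against tables stored in each format (a sketch only; test_gzip and test_bzip2 are placeholder table names that would need to be created first):

library(DBI)

con <- dbConnect(RAthena::athena())

# Time a full-table aggregate over the same data stored with each compression
# format; test_gzip and test_bzip2 are hypothetical table names.
system.time(dbGetQuery(con, "SELECT COUNT(*) FROM test_gzip"))
system.time(dbGetQuery(con, "SELECT COUNT(*) FROM test_bzip2"))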

@DyfanJones DyfanJones changed the title Default compression method for flat fllies Default compression method for flat files Dec 3, 2019
@DyfanJones DyfanJones added the question Further information is requested label Dec 3, 2019
@OssiLehtinen

This reply is related to this question and also to the other issue about Glue and Parquet conversion.

The two main concerns for me have been performance and compatibility.

As for CSV compression, I've been testing gzip and bzip2, but ended up giving up on both and just writing uncompressed CSVs.

The issue with gzip is that it is not 'splittable', meaning a single worker node in Athena/Presto needs to take care of decompressing it, which occasionally leads to performance bottlenecks, especially with large files (and for small files compression is unnecessary). Of course, one could split the data prior to compression and upload multiple gzipped CSVs.

The issue with bzip2 is that compression is much slower, which again was a showstopper for me.

My solution:

  • If the content is not huge, an uncompressed CSV is good enough
  • If the data is large and compression is necessary, go for Parquet or ORC (there I've been using the default compression used by Athena)

I'll continue on options for creating Parquet and ORC files in the other issue (#37).

@DyfanJones
Owner Author

Thanks @OssiLehtinen for the extra insight. In addition to gzip compression, RAthena and noctua currently support:

  • partitioning of files when uploading to Athena and S3
  • Parquet format through the arrow package

If a user is concerned about performance then Parquet is the way forward. Agreed, Parquet and ORC formats can be discussed in issue #37.

For the time being I believe the option to create compressed flat files should be available for users; however, I might include Top 10 Performance Tuning Tips for Amazon Athena in the readme documentation to help users set up S3 to get the most out of AWS Athena when using RAthena and noctua.
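For reference, a minimal sketch of combining those two features in a single dbWriteTable call; the file.type, partition and s3.location argument names are assumptions based on the package documentation, so check ?RAthena::dbWriteTable in the installed version:

library(DBI)

con <- dbConnect(RAthena::athena())

# Upload mtcars as Parquet (via the arrow package) under a partition;
# the argument names below are assumptions, not a confirmed API.
dbWriteTable(con, "mtcars_parquet", mtcars,
             file.type   = "parquet",
             partition   = c("upload_date" = format(Sys.Date(), "%Y%m%d")),
             s3.location = "s3://path/to/bucket/")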

@DyfanJones
Owner Author

Correct me if I am wrong, but in their current state RAthena and noctua do not need extra work when uploading flat files into AWS Athena and S3.

@OssiLehtinen

I don't think any changes are required.

@OssiLehtinen

Perhaps I'm missing existing functionality, but one way to address the splittability issue with gzipped files would be to split the data into pieces before writing and uploading to S3?

Or, I think, this can be achieved with the current setup by splitting the data into partitions beforehand and using copy_to on each, but one could do this for the user automatically (if requested).

Something along the lines of having an optional parameter chunk_size in copy_to: if this is set to a positive integer, split the data into pieces of that size and recursively run copy_to on each piece with the same destination but a different file name (e.g., an incrementing suffix). You would end up with a pile of gzipped CSVs containing all the data, and the reading work could be split accordingly. Partitioning would be optional in this case, as Athena is fine with having just a pile of files in the same S3 path.
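To illustrate just the splitting logic (independent of the copy_to API), a rough sketch of chopping a data.frame into fixed-size chunks and writing each chunk to its own gzipped CSV with an incrementing suffix; write_chunked_gzip is a hypothetical helper:

library(data.table)

# Hypothetical helper: split `df` into chunks of `chunk_size` rows and write
# each chunk to <prefix>_<i>.csv.gz (fwrite's gzip support needs a recent
# data.table), ready to be uploaded to the same S3 path.
write_chunked_gzip <- function(df, prefix, chunk_size = 1e5) {
  n_chunks <- ceiling(nrow(df) / chunk_size)
  for (i in seq_len(n_chunks)) {
    rows <- ((i - 1) * chunk_size + 1):min(i * chunk_size, nrow(df))
    fwrite(df[rows, ], sprintf("%s_%d.csv.gz", prefix, i), compress = "gzip")
  }
  invisible(n_chunks)
}

write_chunked_gzip(iris, "iris_part", chunk_size = 50)  # writes 3 files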

@DyfanJones
Owner Author

DyfanJones commented Dec 4, 2019

chunk_size could be built, but the only problem is defining the partition. However, something like this could be doable:

s3://path/to/bucket/chunk=1/data.csv.gzip
s3://path/to/bucket/chunk=2/data.csv.gzip
s3://path/to/bucket/chunk=3/data.csv.gzip

My only concern would be users setting the chunks too small and losing the benefits of partitioning the data.

@OssiLehtinen

One thing to keep in mind is that it is not necessary to define partitions when splitting the data. One can just have something like

s3://path/to/bucket/data_1.csv.gzip
s3://path/to/bucket/data_2.csv.gzip
s3://path/to/bucket/data_3.csv.gzip

and it will work just fine.

Having partitions can be useful on top of that. One idea would be to just give the partition column and have R automatically split the data by unique values in said column and do the partitioning.
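As a rough sketch of that idea (a hypothetical helper, not part of either package), splitting a data.frame by the unique values of a partition column so each piece can be written under its own <column>=<value>/ prefix:

# Hypothetical illustration: split by the unique values of a partition column,
# yielding one data.frame per partition plus the S3 key it would be written to.
split_by_partition <- function(df, partition_col, s3_prefix) {
  pieces <- split(df, df[[partition_col]])
  lapply(names(pieces), function(v) {
    list(s3_path = sprintf("%s/%s=%s/data.csv.gz", s3_prefix, partition_col, v),
         data    = pieces[[v]])
  })
}

parts <- split_by_partition(iris, "Species", "s3://path/to/bucket")
parts[[1]]$s3_path  # "s3://path/to/bucket/Species=setosa/data.csv.gz"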

@OssiLehtinen

One more note on partitioning: this can also be a limitation.

Let's say one wishes to append data daily to a table. If one uploads the data in a partitioned fashion, things will break down after 100 days, as only 100 partitions are allowed in Athena.

Dumping the data to the same path without partitioning will not have such a problem, however. There will probably be performance issues eventually, once the number of files grows exceedingly large. Some tables I've been using have some 500 (smallish) files in them and they work OK.

@DyfanJones
Owner Author

Correct me if I am wrong, but it looks like the limit is:

Number of partitions per table: 10,000,000

https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html#limits_glue

@DyfanJones
Owner Author

DyfanJones commented Dec 4, 2019

> One thing to keep in mind is that it is not necessary to define partitions when splitting the data. One can just have something like
>
> s3://path/to/bucket/data_1.csv.gzip
> s3://path/to/bucket/data_2.csv.gzip
> s3://path/to/bucket/data_3.csv.gzip
>
> and it will work just fine.
>
> Having partitions can be useful on top of that. One idea would be to just give the partition column and have R automatically split the data by unique values in said column and do the partitioning.

This method looks promising; I will investigate a possible implementation and see whether it actually increases speed.

@OssiLehtinen

> Correct me if I am wrong, but it looks like the limit is:
>
> Number of partitions per table: 10,000,000
> https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html#limits_glue

Hmm, I think I mixed things up. Apparently the 100-partition limit applies specifically to CTAS queries. Sorry about the confusion!

https://docs.aws.amazon.com/athena/latest/ug/bucketing-vs-partitioning.html

@DyfanJones
Owner Author

Not to worry, I got a lot of information out of this. Going to test the possible solution of splitting gzip files into chunks.

@DyfanJones
Owner Author

DyfanJones commented Dec 5, 2019

Created branch https://github.com/DyfanJones/RAthena/tree/chunksize to investigate the max.batch suggestion for gzip compression, using the S3 file layout:

s3://path/to/bucket/data_1.csv.gzip
s3://path/to/bucket/data_2.csv.gzip
s3://path/to/bucket/data_3.csv.gzip

Initial findings, using the following data:

library(DBI)
library(data.table)

X <- 1e6

df <- data.table(x = 1:X,
                 y = sample(letters, X, replace = T), 
                 z = sample(c(TRUE, FALSE), X, replace = T))

con <- dbConnect(RAthena::athena())

dbWriteTable(con, "test_split1", df, compress = T)

dbWriteTable(con, "test_split2", df, compress = T, max.batch = 10000)

dbWriteTable(con, "test_split3", df, compress = T, max.batch = 100000)

AWS Athena performance results from AWS console (query executed: select count(*) from ....):

  • test_split1: (Run time: 1.73 seconds, Data scanned: 3.2 MB)
  • test_split2: (Run time: 0.63 seconds, Data scanned: 3.23 MB)
  • test_split3: (Run time: 0.39 seconds, Data scanned: 3.2 MB)

It appears there is a significant performance enhancement from splitting gzip-compressed files into batches. However, splitting into batches that are too small, i.e. 10,000 rows, still improved performance but added overhead in comparison to a max.batch of 100,000.

@DyfanJones
Owner Author

This feels like a really good method for users who prefer csv.gzip files over Parquet. Given the significant increase in performance, should RAthena and noctua have a default batch size for gzip file compression, e.g. 100,000?

@DyfanJones DyfanJones added the enhancement New feature or request label Dec 5, 2019
@DyfanJones
Owner Author

Will do further testing on a larger data.frame.

@OssiLehtinen

Well, the performance tuning guideline from AWS says individual files shouldn't be much smaller than 128 MB, but I'm not sure what that would mean in terms of line counts. Obviously it also depends linearly on the width of the table.

Maybe a default of 100k would be ok. There's some overhead in splitting the data before writing, but maybe not that much?
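As a back-of-the-envelope way to turn the 128 MB guidance into a line count for a given table (a sketch only; it measures uncompressed CSV bytes per row from a sample, so a compressed target would allow correspondingly more rows):

# Estimate how many rows of `df` correspond to roughly `target_bytes` of
# uncompressed CSV, by writing a sample to a temp file and measuring it.
rows_for_target <- function(df, sample_rows = 1e4, target_bytes = 128 * 1024^2) {
  sample_rows <- min(sample_rows, nrow(df))
  tmp <- tempfile(fileext = ".csv")
  write.csv(df[seq_len(sample_rows), , drop = FALSE], tmp, row.names = FALSE)
  bytes_per_row <- file.size(tmp) / sample_rows
  unlink(tmp)
  ceiling(target_bytes / bytes_per_row)
}

rows_for_target(iris)  # rough rows per ~128 MB uncompressed file at this width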

@OssiLehtinen

Still not sure if the 128 MB limit applies to compressed or uncompressed files...

@DyfanJones
Owner Author

DyfanJones commented Dec 5, 2019

Ran another test to see the possible benefits of this method:

library(DBI)
library(data.table)

X <- 1e8

df <- data.table(x = 1:X,
                 y = sample(letters, X, replace = T), 
                 z = sample(c(TRUE, FALSE), X, replace = T))

con <- dbConnect(RAthena::athena())

dbWriteTable(con, "test_split1", df, compress = T, overwrite = T)

dbWriteTable(con, "test_split2", df, compress = T, max.batch = 1000000, overwrite = T)

dbWriteTable(con, "test_split3", df, compress = T, max.batch = 0.05 * nrow(df), overwrite = T)

AWS Athena performance results from AWS console (query executed: select count(*) from ....):

  • test_split1: (Run time: 17.4 seconds, Data scanned: 325.56 MB)
  • test_split2: (Run time: 3.52 seconds, Data scanned: 325.49 MB)
  • test_split3: (Run time: 2.44 seconds, Data scanned: 325.54 MB)

I believe there is a clear benefit to this method with gzip files, but I don't think a fixed max.batch is good enough. Possibly a minimum batch size can be set and then a dynamic one used on top of it.

@OssiLehtinen

Interesting!

Could there be some optimal(ish) total number of pieces? Probably the number of workers a query gets varies, but these could be related.

@DyfanJones
Owner Author

I am not too sure how to get that information, plus I am not sure if it is obtainable through the SDKs. If it is, we could look at the object size and then determine the batch size. However, I believe this feature should be implemented in its initial state first, and future development can happen after that. There are clear benefits to this, and a semi-smart solution is better than no solution :)

@OssiLehtinen

Yeah, I was thinking more in the direction of making a rough guess at what a typical number of workers could be (let's say ten) and splitting into that many pieces.

@DyfanJones
Owner Author

That is fairly easy to implement. Plus, I think a minimum batch size can be set to help prevent files that are too small from being created and causing a large overhead.
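A minimal sketch of that heuristic (hypothetical helper; the ten-piece target and the 100k minimum are just the numbers from this discussion, not package defaults):

# Aim for roughly `n_pieces` batches, but never let a batch drop below
# `min_batch` rows, so tiny files don't create more overhead than they save.
dynamic_batch_size <- function(n_rows, n_pieces = 10, min_batch = 1e5) {
  max(min_batch, ceiling(n_rows / n_pieces))
}

dynamic_batch_size(1e8)  # 1e+07 rows per batch -> 10 files
dynamic_batch_size(5e5)  # capped at 1e+05 rows -> 5 files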

@DyfanJones
Owner Author

Last speed test:

library(DBI)
library(data.table)

X <- 1e8

df <- data.table(w = runif(X),
                 x = 1:X,
                 y = sample(letters, X, replace = T), 
                 z = sample(c(TRUE, FALSE), X, replace = T))

con <- dbConnect(RAthena::athena())

dbWriteTable(con, "test_split1", df, compress = T, max.batch = nrow(df), overwrite = T)

dbWriteTable(con, "test_split2", df, compress = T, max.batch = 0.05 * nrow(df), overwrite = T)

dbWriteTable(con, "test_split3", df, compress = T, max.batch = 0.1 * nrow(df), overwrite = T)

AWS Athena performance results from AWS console (query executed: select count(*) from ....):

  • test_split1: (Run time: 38.4 seconds, Data scanned: 1.16 GB)
  • test_split2: (Run time: 3.73 seconds, Data scanned: 1.16 GB)
  • test_split3: (Run time: 5.47 seconds, Data scanned: 1.16 GB)

From these findings I will use a 20-way split as the default for compressed flat files: #39

If there are any objections to the default split, I am happy to change it.

Overall this increase in performance is very promising and should make the user experience a lot smoother when working with AWS Athena.

@DyfanJones
Owner Author

@OssiLehtinen noticed that the copy_to method didn't have support for compress. copy_to now supports compress and the new max.batch functionality from dbWriteTable:

library(DBI)
library(dplyr)
library(data.table)

con <- dbConnect(RAthena::athena())

X <- 1e8

example_df <- data.table(w = runif(X),
                         x = 1:X,
                         y = sample(letters, X, replace = T),
                         z = sample(c(TRUE, FALSE), X, replace = T))

copy_to(con, example_df, compress = T)

This will split the example into 20 gzipped CSV files to help with AWS Athena performance. After the updates have been added to noctua, I will push these changes to CRAN. Thanks for all your help with this.

@OssiLehtinen

Perfect! And I'm learning here too, so it's a win-win :)
