
[R] errors when downloading parquet files from s3. #11934

Closed
JasperSch opened this issue Dec 13, 2021 · 11 comments

JasperSch commented Dec 13, 2021

When writing a dataset to S3 as parquet files using write_dataset, I get errors when downloading the files afterwards:
Error: 'PAR1���2�2L���' does not exist in current working directory ('/tmp/Rtmpk1pQuU').
Despite the errors, the files do still get downloaded.
The errors do not seem to occur when I use write_dataset locally and upload the files to s3 manually using aws.s3::put_object.
They also stop occurring if I re-upload the downloaded files.

System info:

R version 3.6.3
arrow 6.0.1
aws.s3 0.3.21

MWE:

# You need an s3 backend to run this.
bucket <- 'xxx'
prefix <- 'yyy'

data <- data.frame(x = letters[1:5])

arrow::write_dataset(
    dataset = data,
    path = file.path(
        "s3:/",
        bucket,
        prefix,
        "test_parquet"))

ref <- paste0(
    "s3://", bucket, "/",
    gsub("//", "/", paste(collapse = "/", c(prefix, "test_parquet/part-0.parquet")))
)

aws.s3::save_object(
    object = ref,
    file = "test"
)

# Here an error is thrown, although the file is still downloaded without problems 
# Error: 'PAR1���2�2L���' does not exist in current working directory ('/tmp/Rtmpk1pQuU'). 
    
retrievedData <- dplyr::collect(arrow::open_dataset('test'))
print(retrievedData)

@thisisnic (Member)

Thanks for the report @JasperSch. Just to confirm: do you get any problems printing the retrieved data in the last step, or is it just at the point where you're running aws.s3::save_object()?

@paleolimbot (Member)

I couldn't reproduce this using minio locally... is there anything I'm not understanding about your setup? If you can modify this example to reproduce your error, we will be better able to help fix it!

library(arrow, warn.conflicts = FALSE)

dir <- tempfile()
dir.create(dir)
subdir <- file.path(dir, "some_subdir")
dir.create(subdir)
list.files(dir)
#> [1] "some_subdir"

minio_server <- processx::process$new("minio", args = c("server", dir), supervise = TRUE)
Sys.sleep(1)
stopifnot(minio_server$is_alive())
#> Error: minio_server$is_alive() is not TRUE

# make sure we can connect
s3_uri <- "s3://minioadmin:minioadmin@?scheme=http&endpoint_override=localhost%3A9000"
bucket <- s3_bucket(s3_uri)
bucket$ls("some_subdir")
#> [1] "some_subdir/test"

# write a dataset to minio
data <- data.frame(x = letters[1:5])

write_dataset(
  dataset = data,
  path = bucket$path("some_subdir/test")
)

bucket$ls("some_subdir/test")
#> [1] "some_subdir/test/part-0.parquet"

dplyr::collect(arrow::open_dataset(bucket$path("some_subdir/test")))
#>   x
#> 1 a
#> 2 b
#> 3 c
#> 4 d
#> 5 e

minio_server$interrupt()
#> [1] FALSE
Sys.sleep(1)
stopifnot(!minio_server$is_alive())

Created on 2022-01-10 by the reprex package (v2.0.1)

@JasperSch (Author) commented Jan 10, 2022

> Thanks for the report @JasperSch. Just to confirm: do you get any problems printing the retrieved data in the last step, or is it just at the point where you're running aws.s3::save_object()?

Just a problem with aws.s3::save_object(). So basically, all arrow functions work without problems; it is only when I try to download the files written by arrow using aws.s3::save_object() that I get an error.

@JasperSch (Author)

@paleolimbot Thank you for the example.
I'll try to get this running.

By the way, I just noticed that my MWE was not fully reproducible;
I have edited s3ObjectURI to paste0 in the example above.

@JasperSch (Author)

@paleolimbot

I ran into some issues installing minio, but eventually managed to set it up in a Docker container.
Two problems I ran into:

  • some_subdir was not accepted as a bucket name
  • I had to use minio.s3::save_object, since I could not get aws.s3::save_object to work.

The example below should be very close to what you proposed.

devtools::install_github("nagdevAmruthnath/minio.s3")

# make sure we can connect
s3_uri <- "s3://minioadmin:minioadmin@?scheme=http&endpoint_override=localhost%3A9000"
bucket <- arrow::s3_bucket(s3_uri)
bucket$ls("bucket")
# > [1] "bucket/test"

# write a dataset to minio
data <- data.frame(x = letters[1:5])

arrow::write_dataset(
    dataset = data,
    path = bucket$path("bucket/test")
)

Sys.setenv("AWS_ACCESS_KEY_ID" = "minioadmin", # enter your credentials
    "AWS_SECRET_ACCESS_KEY" = "minioadmin", # enter your credentials
    "AWS_DEFAULT_REGION" = "eu-west-1",
    "AWS_S3_ENDPOINT" = "localhost:9000")   

minio.s3::save_object(
    object = "test/part-0.parquet",
    bucket = "bucket",
    file = "test",
    use_https = F
)
# Error: 'PAR1���2�4L�
# ���' does not exist in current working directory

So, in your example, I think you could try running:

minio.s3::save_object(
    object = "test/part-0.parquet",
    bucket = "some_subdir",
    file = "test",
    use_https = F
)

@paleolimbot (Member)

Thanks for making this example easy for me to reproduce!

You're right, this example fails for me in the same way that it fails for you. Based on the stack trace of the error, it looks like this is coming from the minio.s3 library (and the aws.s3 library in your previous example). From examining the local file that was saved, it doesn't appear that the arrow package wrote an invalid file... rather, it looks like the minio.s3 and aws.s3 packages are interpreting the content of the file as a file path somewhere. Would it be reasonable to open an issue in either or both of those repositories to fix that code?
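
For reference, a quick way to double-check that the locally saved copy is a valid parquet file, under the same setup as the examples above (the downloaded file is named "test" in the save_object() calls), might be to read it straight back with arrow:

# Sketch: read the file that save_object() downloaded (saved as "test" in the
# examples above) directly with arrow. If the parquet file were actually
# corrupt, read_parquet() would error; here it is expected to return the
# five-row data frame that was written.
local_copy <- arrow::read_parquet("test")
print(local_copy)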

@JasperSch (Author)

@paleolimbot

Yes, that would be reasonable. I decided to open it here in the first place since I have the feeling that the root cause of the issue lies in the way arrow::write_dataset writes the files to S3.

Below is an extended version of my example above.
Please ignore the implementation of put_object; I had to fix it since the installed version of minio.s3 threw some errors.
The example also still holds with an AWS backend and aws.s3::put_object.

Also (not shown here, but sketched just below), creating the files locally with arrow::write_dataset and then uploading them to S3 using aws.s3::put_object allows you to download the files afterwards with aws.s3::save_object without errors.
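
A minimal sketch of that write-locally-then-upload workaround (not run in this thread; it reuses `data` from the MWE, and the bucket "xxx" and prefix "yyy" are the placeholders from the original MWE):

# Write the dataset to a local directory first, then upload the resulting
# parquet file with aws.s3::put_object(). Paths and names are placeholders.
local_dir <- file.path(tempdir(), "test_parquet")
arrow::write_dataset(dataset = data, path = local_dir)

aws.s3::put_object(
    file = file.path(local_dir, "part-0.parquet"),
    object = "yyy/test_parquet/part-0.parquet",
    bucket = "xxx"
)

# Downloading this manually uploaded copy no longer errors.
aws.s3::save_object(
    object = "yyy/test_parquet/part-0.parquet",
    bucket = "xxx",
    file = "test"
)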

My assumption is therefore that arrow::write_dataset puts files on S3 in a different way than aws.s3::put_object does, and that something about the resulting objects later triggers (unneeded) errors when downloading the (perfectly valid) files. Maybe it's something in the metadata about the objects? Indexing? ...?

So, to me it's still a question whether arrow::write_dataset or aws.s3::save_object should be fixed.
Maybe it's best to understand this first and rule arrow::write_dataset out before opening an issue there?

# make sure we can connect
s3_uri <- "s3://minioadmin:minioadmin@?scheme=http&endpoint_override=localhost%3A9000"
bucket <- arrow::s3_bucket(s3_uri)
bucket$ls("bucket")
# > [1] "bucket/test"

# write a dataset to minio
data <- data.frame(x = letters[1:5])

arrow::write_dataset(
    dataset = data,
    path = bucket$path("bucket/test")
)


Sys.setenv("AWS_ACCESS_KEY_ID" = "minioadmin", # enter your credentials
    "AWS_SECRET_ACCESS_KEY" = "minioadmin", # enter your credentials
    "AWS_DEFAULT_REGION" = "eu-west-1",
    "AWS_S3_ENDPOINT" = "localhost:9000")   

setwd(tempdir())
minio.s3::save_object(
    object = "test/part-0.parquet",
    bucket = "bucket",
    file = "test",
    use_https = F
)
# Error: 'PAR1���2�4L�
# ���' does not exist in current working directory

system("ls")
# test

# FIX for minio.s3 put_object function.
put_object <- function(file, 
    object, 
    bucket, 
    multipart = FALSE, 
    acl = c("private", "public-read", "public-read-write", 
        "aws-exec-read", "authenticated-read", 
        "bucket-owner-read", "bucket-owner-full-control"),
    headers = list(),
    base_url,
    region,
    key,
    secret,
    ...) {
  
  if (missing(base_url)) {
    base_url = Sys.getenv("AWS_S3_ENDPOINT")
  } 
  
  
  if (missing(region)) {
    region = Sys.getenv("AWS_DEFAULT_REGION")
  } 
  
  if (missing(key)) {
    key = Sys.getenv("AWS_ACCESS_KEY_ID")
  } 
  
  if (missing(secret)) {
    secret = Sys.getenv("AWS_SECRET_ACCESS_KEY")
  }      
  
  
  
  acl <- match.arg(acl)
  headers <- c(list(`x-amz-acl` = acl), headers)
  if (isTRUE(multipart)) {
    if (is.character(file) && file.exists(file)) {
      file <- readBin(file, what = "raw")
    }
    size <- length(file)
    partsize <- 1e8 # 100 MB
    nparts <- ceiling(size/partsize)
    
    # if file is small, there is no need for multipart upload
    if (size < partsize) {
      put_object(file = file, object = object, bucket = bucket, multipart = FALSE, headers = headers, ...)
      return(TRUE)
    }
    
    # function to call abort if any part fails
    abort <- function(id) delete_object(object = object, bucket = bucket, query = list(uploadId = id), ...)
    
    # split object into parts
    seqparts <- seq_len(partsize)
    parts <- list()
    for (i in seq_len(nparts)) {
      parts[[i]] <- head(file, partsize)
      if (i < nparts) {
        file <- file[-seqparts]
      }
    }
    
    # initialize the upload
    initialize <- post_object(file = NULL, object = object, bucket = bucket, query = list(uploads = ""), headers = headers, ...)
    id <- initialize[["UploadId"]]
    
    # loop over parts
    partlist <- list(Number = character(length(parts)),
        ETag = character(length(parts)))
    for (i in seq_along(parts)) {
      query <- list(partNumber = i, uploadId = id)
      r <- try(put_object(file = parts[[i]], object = object, bucket = bucket, 
              multipart = FALSE, headers = headers, query = query), 
          silent = FALSE)
      if (inherits(r, "try-error")) {
        abort(id)
        stop("Multipart upload failed.")
      } else {
        partlist[["Number"]][i] <- i
        partlist[["ETag"]][i] <- attributes(r)[["ETag"]]
      }
    }
    
    # complete
    complete_parts(object = object, bucket = bucket, id = id, parts = partlist, ...)
    return(TRUE)
  } else {
    r <- minio.s3::s3HTTP(verb = "PUT", 
        bucket = bucket,
        path = paste0('/', object),
        headers = c(headers, list(
                `Content-Length` = ifelse(is.character(file) && file.exists(file), 
                    file.size(file), length(file))
            )), 
        request_body = file,
        write_disk = NULL,
        accelerate = FALSE,
        dualstack = FALSE,
        parse_response = TRUE, 
        check_region = FALSE,
        url_style = c("path", "virtual"),
        base_url = base_url,
        verbose = getOption("verbose", FALSE),
        region = region, 
        key = key, 
        secret = secret, 
        session_token = NULL,
        use_https = FALSE)
    return(TRUE)
  }
}

put_object(
    object = "test/part-0.parquet",
    bucket = "bucket",
    file = "test",
    use_https = T
)

minio.s3::save_object(
    object = "test/part-0.parquet",
    bucket = "bucket",
    file = "test",
    use_https = F
)
# No error anymore!


@paleolimbot (Member)

Thank you for your response! I have a feeling the folks who maintain aws.s3 and/or minio.s3 will have a better handle on the mode of failure. I'd suggest opening an issue there and/or submitting your fix as a pull request... the maintainers there may have a suggestion as to whether or not we should be writing to S3 in a different way.

@westonpace (Member)

I poked around at this a bit. The error seems to be that write_dataset is creating files with the application/xml content type and then minio.s3::save_object is trying to parse the object as XML because of this content type. I'm not entirely sure why application/xml is being set (I'm pretty sure we default in Arrow to not setting the content type at all) so I'll look into that a bit more.

If I hardcode the C++ to set the content-type to something else (application/parquet) then minio.s3::save_object works fine.

(base) pace@pace-desktop:~$ mc stat myminio/bucket/test/part-0.parquet 
Name      : part-0.parquet
Date      : 2022-01-11 09:41:14 HST 
Size      : 1.0 KiB 
ETag      : 6b320c21546ccf5bdb5920a709562598-1 
Type      : file 
Metadata  :
  Content-Type: application/xml 
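
For anyone without the mc client, a rough equivalent check from R might look like the sketch below. This is only a sketch: it assumes minio.s3::head_object() mirrors aws.s3::head_object() (i.e. it exists, accepts the same arguments, and exposes the HTTP response headers as attributes of its return value), and it relies on the minio credentials set via Sys.setenv() earlier in the thread.

# Assumption: minio.s3::head_object() behaves like aws.s3::head_object() and
# returns the response headers as attributes.
h <- minio.s3::head_object(
    object = "test/part-0.parquet",
    bucket = "bucket",
    use_https = FALSE
)
attr(h, "content-type")
# For the file written by arrow::write_dataset(), this should show something
# like "application/xml", matching the mc stat output above.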

@westonpace (Member)

It appears that the AWS SDK forces a content-type. If one isn't set, it will use application/xml (which is rather unfortunate). That being said, I don't understand why minio.s3::save_object would be trying to interpret the content-type at all. That seems to happen here: https://github.com/nagdevAmruthnath/minio.s3/blob/4ae635168ee57bf783314d95f8ae71d08831c0d8/R/s3HTTP.R#L188

So I would argue it is a bug in both libraries. I opened https://issues.apache.org/jira/browse/ARROW-15306, which should be pretty straightforward to fix if everyone agrees it is a good thing to do.

@thisisnic (Member)

I'm closing this, as it appears that the Arrow ticket is resolved and a ticket has been opened against the AWS SDK; if this problem persists, feel free to reopen.
