Commas and quote characters in data mess things up when doing dbWrite #50

Closed
OssiLehtinen opened this issue Dec 13, 2019 · 20 comments
Assignees: DyfanJones
Labels: bug

@OssiLehtinen commented Dec 13, 2019

Issue Description

Since data.table::fwrite tries to handle special characters in its own way, that is, escaping field separators and quote characters etc., and quoting strings when necessary, things get weird when Athena tries to deal with such source files.

Reproducible Example

library(RAthena)
library(DBI)
library(dplyr)    # for tbl()
library(dbplyr)   # for in_schema()

con <- dbConnect(RAthena::athena(),
                 profile_name = "rathena")

withquote <- data.frame(col1 = c("abcd,", "15 \"", "\\n"),
                        col2 = c(1, 2, 3),
                        col3 = Sys.time(),
                        col4 = Sys.Date())
dbWriteTable(con, "temp.quote_test", withquote)
tbl(con, in_schema("temp", "quote_test"))

(The datetimes and dates are there for later.)

Dealing with a comma in the data can be done by using the tsv file type. However, a tab in the data would then cause problems instead.

The quote character will be problematic in either case. The default behaviour of fwrite is '"double" (default, same as write.csv), in which case the double quote is doubled with another one', and then the whole entry is enclosed in another set of quotes, which Athena has no idea how to deal with. The conditional quoting also takes place when a field contains a comma in csv or a tab in tsv.
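For illustration, a minimal sketch of that default behaviour (quote = "auto", qmethod = "double"); the example values and temp file are made up:

library(data.table)
dt <- data.table(col1 = c("abcd,", "15 \""), col2 = 1:2)
tmp <- tempfile(fileext = ".csv")
fwrite(dt, tmp)                  # defaults: quote = "auto", qmethod = "double"
cat(readLines(tmp), sep = "\n")
# col1,col2
# "abcd,",1
# "15 """,2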

My (wholly inelegant) solution has been to use

quote = F, row.names = F, col.names = F, sep = "|", na = ""

when writing the file, and telling Athena to use "|" as the separator. Additionally, I've simply removed all "|"s from the data. The point is that the pipe character is not that common in my data.
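For reference, a hedged sketch of that workaround (`df` stands in for the data frame being written and the output path is a placeholder):

library(data.table)
# strip pipes from character columns so the separator stays unambiguous
df[] <- lapply(df, function(x) if (is.character(x)) gsub("|", " ", x, fixed = TRUE) else x)
fwrite(df, "out.csv", quote = FALSE, row.names = FALSE, col.names = FALSE, sep = "|", na = "")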

This is obviously not at all optimal. I think the nicest thing would be to have the file with escaped special characters, but without the enclosing quotes. However, I don't think fwrite can accomplish this.

The other solution would be to enclose everything in double quotes, using quote=TRUE, qmethod='escape', col.names=F, but then one needs to use another SerDe in Athena. Now, parsing dates and datetimes gets complicated and I have not been able to get those to work.

The way around this is to

  1. Define a temp table as, e.g.:
create external table temp.col_test (col1 string, col2 string, col3 string, col4 string)

ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'

WITH SERDEPROPERTIES (
   'separatorChar' = ',',
   'quoteChar'     = '"',
   'escapeChar'    = '\\'
)
LOCATION 's3://bucket/path_to_data/'
  2. Create the final table with
create table col_test2 as (SELECT 
try_cast(col1 as varchar) as col1,
try_cast(col2 as integer) as col2,
try_cast(col3 as timestamp) as col3,
try_cast(col4 as date) as col4

FROM "temp"."col_test")

or if one wishes to append to an existing table:

insert into temp.col_test2 (SELECT 
try_cast(col1 as varchar) as col1,
try_cast(col2 as integer) as col2,
try_cast(col3 as timestamp) as col3,
try_cast(col4 as date) as col4

FROM "temp"."col_test")

I know we are back at using Athena to create the final tables, but this is the only way so far I've been able to get everything to work at the same time. Well, an unescaped new line will still mess things up...
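For reference, a hedged sketch of driving those two steps from R through DBI (using `con` from the example above; the bucket path and table names are placeholders):

DBI::dbExecute(con, "
CREATE EXTERNAL TABLE temp.col_test (col1 string, col2 string, col3 string, col4 string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES ('separatorChar' = ',', 'quoteChar' = '\"', 'escapeChar' = '\\\\')
LOCATION 's3://bucket/path_to_data/'")

DBI::dbExecute(con, "
CREATE TABLE temp.col_test2 AS
SELECT try_cast(col1 AS varchar)   AS col1,
       try_cast(col2 AS integer)   AS col2,
       try_cast(col3 AS timestamp) AS col3,
       try_cast(col4 AS date)      AS col4
FROM temp.col_test")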

I know this is a bit of a horror story, but on the other hand these are situations that at least I have had to deal with when working with 'real data'.

@DyfanJones (Owner) commented Dec 13, 2019

If a user has the special character "," in their data and they are using the default "csv" file type, I would suggest that they have chosen the wrong file type. If the basic flat files don't work, then setting parquet is always an option.
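A minimal sketch of that option (requires the arrow package; reuses `con` and `withquote` from the example above):

dbWriteTable(con, "temp.quote_test", withquote, overwrite = TRUE, file.type = "parquet")
tbl(con, in_schema("temp", "quote_test"))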

I am reluctant to change the sep to "|" as the list data type currently uses "|", for example:

list(1,2,3) -> 1|2|3

I wasn't aware that data.table would conditionally quote character fields.

@OssiLehtinen (Author) commented Dec 13, 2019

I see the problem with "|". Well, I guess any character can cause trouble if no escaping is used. The risk with comma and tab is that one cannot always fully control the contents of incoming data. Maybe some data cleaning step, such as replacing tabs with spaces, would be wise.

@DyfanJones (Owner) commented Dec 13, 2019

That could possibly be done in sqlData: if sep is passed to it, it could edit the special character.

In the meantime, have you tried using the parquet file format?

@OssiLehtinen (Author) commented Dec 13, 2019

I don't have arrow set up on my box at the moment, so I couldn't test Parquet with that. Doing the Parquet conversion with Athena, as described above, seems to work fine. I'm on the road again right now, so I can't continue with arrow...

But yeah, I guess one option would be to do some automatic replacement as you propose. I think that would be better than the current 'meltdown' that can happen.

@DyfanJones (Owner) commented Dec 13, 2019

I will play around with some ideas once I have the dbRemoveTable branch merged.

@DyfanJones added the bug label Dec 14, 2019
@DyfanJones self-assigned this Dec 14, 2019
@DyfanJones (Owner) commented Dec 14, 2019

The current issue is that when AWS Athena queries data containing special characters, it returns a quoted csv:

"col1","col2","col3","col4","col5"
"abcd.","1.0","2019-12-14 16:05:04.539","2019-12-14","abcd."
"15 ""","2.0","2019-12-14 16:05:04.539","2019-12-14","15 """
"n	","3.0","2019-12-14 16:05:04.539","2019-12-14","n	"

This makes it very difficult to split, and may result in doubled " characters coming back. I don't think this is avoidable.

@DyfanJones (Owner) commented Dec 14, 2019

To fix this, sqlData can prepare the data as follows:

setMethod("sqlData", "AthenaConnection", 
          function(con, value, row.names = NA, file.type = c("csv", "tsv", "parquet"),...) {
  stopifnot(is.data.frame(value))
  
  file.type = match.arg(file.type)
  
  Value <- copy(value)
  Value <- sqlRownamesToColumn(Value, row.names)
  field_names <- gsub("\\.", "_", make.names(names(Value), unique = TRUE))
  DIFF <- setdiff(field_names, names(Value))
  names(Value) <- field_names
  if (length(DIFF) > 0) message("Info: data.frame colnames have been converted to align with Athena DDL naming conventions: \n", paste0(DIFF, collapse = ",\n"))
  
  # get R col types
  col_types <- sapply(Value, class)
  
  # preprocess POSIXct format
  posixct_cols <- names(Value)[sapply(col_types, function(x) "POSIXct" %in% x)]
  # create timestamp in athena format: https://docs.aws.amazon.com/athena/latest/ug/data-types.html
  for (col in posixct_cols) set(Value, j=col, value=strftime(Value[[col]], format="%Y-%m-%d %H:%M:%OS3"))
  
  # preprocess list format
  list_cols <- names(Value)[sapply(col_types, function(x) "list" %in% x)]
  for (col in list_cols) set(Value, j=col, value=sapply(Value[[col]], paste, collapse = "|"))
  
  # handle special characters in character and factor column types
  special_char <- names(Value)[col_types %in% c("character", "factor")]
  switch(file.type,
         csv = {for (col in special_char) set(Value, j=col, value=gsub("," , "\\.", Value[[col]]))
                for (col in special_char) set(Value, j=col, value=gsub("\"" , "'", Value[[col]]))},
         tsv = {for (col in special_char) set(Value, j=col, value=gsub("\t" , " ", Value[[col]]))
                for (col in special_char) set(Value, j=col, value=gsub("\"" , "'", Value[[col]]))})
  
  Value
})
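As a hedged sanity check of the draft above against the reproducible example (reusing `con` and `withquote` from the first comment):

sqlData(con, withquote, file.type = "csv")
# in the csv branch "," in character columns becomes "." and '"' becomes "'" before fwrite is called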

However, the user will need to be notified about the changes to character strings. I am still not 100% comfortable with the replacement of "," with ".". Happy for other suggestions.

@DyfanJones (Owner) commented Dec 14, 2019

Another issue will have to be addressed. The following special characters will cause issues with delimited files (a small illustration follows the list):

\n : new line termination
\\ : escape
, : fields terminated (csv separator)
\t : fields terminated (tsv separator) 
" : quotation
@DyfanJones (Owner) commented Dec 14, 2019

Part of the issue is down to the escape character \\ within the DDL. R and Athena treat this differently. Removing the escape parameter seems to resolve this.

@OssiLehtinen (Author) commented Dec 14, 2019

One quick point: replacing " with ' can be problematic. In my (retail-related) data I might, for example, have some sizes in inches (like 2"), and the meaning gets lost with the substitution.

@DyfanJones (Owner) commented Dec 14, 2019

OK cool, I will see if I can get around that :)

@DyfanJones (Owner) commented Dec 14, 2019

@OssiLehtinen " is now not converted to ' and only the separator value is handled. Please feel free to see if the sqlData branch solves this issue :) I will add more special characters to the unit tests to make sure this issue is prevented in the future.

@OssiLehtinen (Author) commented Dec 16, 2019

Seems to work with my tests, except for a minor tweak:

value <- sqlData(conn, value, row.names = row.names)

should be

value <- sqlData(conn, value, row.names = row.names, file.type = file.type),

I think.

Btw, I would set tsv as the default file.type, as replacing tabs with spaces feels much less intrusive than replacing commas with dots. Would using ";" as the substitute be less weird?

I mean, if there is some written text in the data, replacing commas with dots can make the text pretty difficult to read. Or what do you think?

Still one thought: the message about replacing tabs with spaces could be clearer, e.g.:

Info: Tab characters have been converted to single spaces to help with Athena reading file format tsv

@DyfanJones (Owner) commented Dec 16, 2019

Thanks for the spot, will make that change now. I agree with changing the default to tsv. Plus it will help with any support for json column types mapping to the Athena map type (a possible future plan).

@DyfanJones (Owner) commented Dec 16, 2019

The problem with making tsv the default is that it is a breaking change, as other users are currently using csv as the default file type.

Would setting file.type = "tsv" in dbWriteTable and file_type in copy_to be sufficient? I can add comments in the documentation for what users need to watch out for.
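A minimal sketch of what that per-call opt-in might look like (argument names taken from the comment above; reuses `con` and `withquote` from the first comment):

dbWriteTable(con, "temp.quote_test", withquote, overwrite = TRUE, file.type = "tsv")
dplyr::copy_to(con, withquote, "quote_test", file_type = "tsv")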

@OssiLehtinen (Author) commented Dec 16, 2019

That's right. I guess problems would arise when appending to a csv-based table, or is there some other situation which would cause trouble?

Should there, btw, be a check that the formats match when using append = T? Or perhaps it is there already, and I'm just missing it.

@DyfanJones (Owner) commented Dec 16, 2019

Currently no format check has been added. Will have to check whether AWS Glue will give this information.

@DyfanJones (Owner) commented Dec 16, 2019

Going to add a file format and compression check when appending to an existing AWS Athena table. Will have to review the code at a later date to tidy it up (if necessary):

Athena_write_table <-
  function(conn, name, value, overwrite=FALSE, append=FALSE,
           row.names = NA, field.types = NULL, 
           partition = NULL, s3.location = NULL, file.type = c("csv","tsv", "parquet"),
           compress = FALSE, max.batch = Inf,...) {
    # variable checks
    stopifnot(is.character(name),
              is.data.frame(value),
              is.logical(overwrite),
              is.logical(append),
              is.null(s3.location) || is.s3_uri(s3.location),
              is.null(partition) || is.character(partition) || is.list(partition),
              is.logical(compress))
    
    sapply(tolower(names(partition)), function(x){if(x %in% tolower(names(value))){
      stop("partition ", x, " is a variable in data.frame ", deparse(substitute(value)), call. = FALSE)}})
    
    file.type = match.arg(file.type)
    
    if(max.batch <= 0) stop("`max.batch` has to be greater than 0", call. = F)
    
    if(!is.infinite(max.batch) && file.type == "parquet") message("Info: parquet format is splittable and AWS Athena can read parquet format ",
                                                                  "in parallel. `max.batch` is used for the compressed `gzip` format, which is not splittable.")
    
    # use default s3_staging directory if s3.location isn't provided
    if (is.null(s3.location)) s3.location <- conn@info$s3_staging
    
    # make everything lower case due to an AWS Athena issue: https://aws.amazon.com/premiumsupport/knowledge-center/athena-aws-glue-msck-repair-table/
    name <- tolower(name)
    s3.location <- tolower(s3.location)
    if(!is.null(partition) && is.null(names(partition))) stop("partition parameter requires to be a named vector or list", call. = FALSE)
    if(!is.null(partition)) {names(partition) <- tolower(names(partition))}
    
    if(!grepl("\\.", name)) Name <- paste(conn@info$dbms.name, name, sep = ".") 
    else{Name <- name
         name <- gsub(".*\\.", "", name)}
    
    if (overwrite && append) stop("overwrite and append cannot both be TRUE", call. = FALSE)

    if(append && is.null(partition)) stop("Athena requires the table to be partitioned to append", call. = FALSE)
    
    # Check if table already exists in the database
    found <- dbExistsTable(conn, Name)
    
    if (found && !overwrite && !append) {
      stop("Table ", Name, " exists in database, and both overwrite and",
           " append are FALSE", call. = FALSE)
    }
    
    if(!found && append){
      stop("Table ", Name, " does not exist in database and append is set to TRUE", call. = T)
    }
    
    if (found && overwrite) {
      dbRemoveTable(conn, Name, confirm = TRUE)
    }
    
    # Check file format if appending
    if(found && append){
      dbms.name <- gsub("\\..*", "" , Name)
      Table <- gsub(".*\\.", "" , Name)
      
      glue <- conn@ptr$client("glue")
      tbl_info <- glue$get_table(DatabaseName = dbms.name,
                                 Name = Table)$Table
      
      # Return correct file format when appending onto existing AWS Athena table
      file.type <- switch(tbl_info$StorageDescriptor$SerdeInfo$SerializationLibrary, 
                         "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe" = switch(tbl_info$StorageDescriptor$SerdeInfo$Parameters$field.delim, 
                                                                                       "," = "csv",
                                                                                       "\t" = "tsv",
                                                                                       stop("RAthena currently only supports csv and tsv delimited format")),
                         "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe" = "parquet",
                         stop("Unable to append onto table: ", Name,"\n", tbl_info$StorageDescriptor$SerdeInfo$SerializationLibrary,
                              ": Is currently not supported by RAthena", call. = F))

      # Return if existing files are compressed or not
      compress = switch(file.type,
                        "parquet" = {if(is.null(tbl_info$Parameters$parquet.compress)) FALSE else {
                                        if(tolower(tbl_info$Parameters$parquet.compress) == "snappy") TRUE else 
                                          stop("RAthena currently only supports SNAPPY compression for parquet", call. = F)}
                                    },
                        "tsv" = {if(is.null(tbl_info$Parameters$compressionType)) FALSE else {
                                    if(tolower(tbl_info$Parameters$compressionType) == "gzip") TRUE else
                                      stop("RAthena currently only supports gzip compression for tsv", call. = F)}},
                        "csv" = {if(is.null(tbl_info$Parameters$compressionType)) FALSE else {
                                    if(tolower(tbl_info$Parameters$compressionType) == "gzip") TRUE else
                                      stop("RAthena currently only supports gzip compression for csv", call. = F)}})
    }
    
    # return original Athena Types
    if(is.null(field.types)) field.types <- dbDataType(conn, value)
    value <- sqlData(conn, value, row.names = row.names, file.type = file.type)
    
    # check if arrow is installed before attempting to create parquet
    if(file.type == "parquet"){
      # compress file
      t <- tempfile()
      FileLocation <- paste(t, Compress(file.type, compress), sep =".")
      
      if(!requireNamespace("arrow", quietly=TRUE))
        stop("The package arrow is required for R to utilise Apache Arrow to create parquet files.", call. = FALSE)
      else {
        cp <- if(compress) "snappy" else NULL
        arrow::write_parquet(value, FileLocation, compression = cp)}
    }
    
    # writes out csv/tsv, uses data.table for extra speed
    if (file.type == "csv"){
      FileLocation <- split_data(value, max.batch = max.batch, compress = compress, file.type = file.type)
    }
    
    if (file.type == "tsv"){
      FileLocation <- split_data(value, max.batch = max.batch, compress = compress, file.type = file.type, sep = "\t")
    }

    # send data over to s3 bucket
    upload_data(conn, FileLocation, name, partition, s3.location, file.type, compress)
    
    if (!append) {
      sql <- sqlCreateTable(conn, table = Name, fields = value, field.types = field.types, 
                            partition = names(partition),
                            s3.location = s3.location, file.type = file.type,
                            compress = compress)
      
      # create Athena table
      rs <- dbExecute(conn, sql)
      dbClearResult(rs)}
    
    # Repair table
    res <- dbExecute(conn, paste0("MSCK REPAIR TABLE ", Name))
    dbClearResult(res)
    
    on.exit({lapply(FileLocation, unlink)
      if(!is.null(conn@info$expiration)) time_check(conn@info$expiration)})
    
    invisible(TRUE)
  }
DyfanJones added a commit that referenced this issue Dec 16, 2019
Handle special characters #50
@DyfanJones (Owner) commented Dec 16, 2019

@OssiLehtinen just merged PR #51. Closing this issue; if the problem persists, please feel free to re-open it.

If there are any more issues/features you come across, please let me know. I am planning to put these new changes live on CRAN by the end of the week. Many thanks for all your help.

@DyfanJones closed this Dec 16, 2019
@OssiLehtinen (Author) commented Dec 16, 2019

Thanks for your hard work on the package! It's shaping up to be an excellent tool!
