Merge pull request #42 from DyfanJones/sql_translate_env
add custom athena `dplyr::sql_translate_env`
Larefly committed Dec 2, 2019
2 parents 19edbc9 + 56fada9 commit e6036e9
Showing 10 changed files with 172 additions and 10 deletions.
5 changes: 3 additions & 2 deletions DESCRIPTION
@@ -1,7 +1,7 @@
Package: noctua
Type: Package
Title: Connect to 'AWS Athena' using R 'AWS SDK' 'paws' ('DBI' Interface)
Version: 1.2.1
Version: 1.2.1.9000
Authors@R: person("Dyfan", "Jones", email="dyfan.r.jones@gmail.com",
role= c("aut", "cre"))
Description: Designed to be compatible with the 'R' package 'DBI' (Database Interface)
@@ -18,7 +18,7 @@ Imports:
Suggests:
arrow,
bit64,
dplyr,
dplyr (>= 0.7.0),
dbplyr,
testthat
Depends: R (>= 3.2.0)
@@ -37,5 +37,6 @@ Collate:
'Table.R'
'athena_low_api.R'
'dplyr_integration.R'
'sql_translate_env.R'
'utils.R'
'zzz.R'
1 change: 1 addition & 0 deletions NAMESPACE
@@ -9,6 +9,7 @@ export(delete_work_group)
export(get_session_token)
export(get_work_group)
export(list_work_groups)
export(sql_translate_env.AthenaConnection)
export(tag_options)
export(update_work_group)
exportClasses(AthenaConnection)
7 changes: 7 additions & 0 deletions NEWS.md
@@ -1,3 +1,10 @@
# noctua 1.2.1.9000
### Bug Fix
Thanks to @OssiLehtinen for identifying an issue with `sql_translate_env`. Previously `noctua` fell back on the default `dplyr::sql_translate_env`; it now ships a custom method built on the data types from https://docs.aws.amazon.com/athena/latest/ug/data-types.html and the window functions from https://docs.aws.amazon.com/athena/latest/ug/functions-operators-reference-section.html

### Unit tests
* `dplyr::sql_translate_env`: tests that R functions are correctly translated into Athena SQL syntax.

# noctua 1.2.1
### New Features:
* Parquet files can now be compressed with snappy compression when writing data to S3.
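
For illustration, a minimal sketch of what the NEWS entry above describes, assuming an open `AthenaConnection` called `con` (the exact calls are exercised by the new unit tests at the bottom of this commit):

library(DBI)
library(dplyr)
library(dbplyr)

# con is assumed to come from dbConnect(noctua::athena())
# R casts now map onto Athena data types rather than dplyr's defaults:
translate_sql(as.integer64(1.2), con = con)
#> <SQL> CAST(1.2 AS BIGINT)
translate_sql(pmax(1, 2), con = con)
#> <SQL> GREATEST(1.0, 2.0)
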
4 changes: 4 additions & 0 deletions R/Connection.R
@@ -5,7 +5,11 @@ NULL
#'
#' Implementations of pure virtual functions defined in the `DBI` package
#' for AthenaConnection objects.
#' @slot ptr a list of connection objects from the paws SDK package.
#' @slot info a list of metadata objects
#' @slot connection contains the \code{AthenaConnection} class object
#' @name AthenaConnection
#' @keywords internal
#' @inheritParams methods::show
NULL

80 changes: 80 additions & 0 deletions R/sql_translate_env.R
@@ -0,0 +1,80 @@
#' @include Connection.R
NULL

#' AWS Athena backend dbplyr
#'
#' Creates an S3 implementation of \code{sql_translate_env} for the AWS Athena SQL translation environment, based on
#' \href{https://docs.aws.amazon.com/athena/latest/ug/data-types.html}{Athena Data Types} and
#' \href{https://docs.aws.amazon.com/athena/latest/ug/functions-operators-reference-section.html}{DML Queries, Functions, and Operators}
#' @param con An \code{\linkS4class{AthenaConnection}} object, produced by
#' [DBI::dbConnect()]
#' @name sql_translate_env
NULL

athena_window_functions <- function() {
# using pkg_method to retrieve external pkg methods
base_win <- pkg_method('base_win', "dbplyr")
sql_translator <- pkg_method('sql_translator', "dbplyr")
win_absent <- pkg_method('win_absent', "dbplyr")
win_recycled <- pkg_method('win_recycled', "dbplyr")
return(sql_translator(
.parent=base_win,
all=win_recycled('bool_and'),
any=win_recycled('bool_or'),
n_distinct=win_absent('n_distinct'),
sd=win_recycled("stddev_samp")
))
}


#' @rdname sql_translate_env
#' @export
sql_translate_env.AthenaConnection <- function(con) {
sql_variant <- pkg_method("sql_variant", "dbplyr")
sql_translator <- pkg_method("sql_translator", "dbplyr")
sql_prefix <- pkg_method("sql_prefix", "dbplyr")
sql_cast <- pkg_method('sql_cast', "dbplyr")
sql <- pkg_method('sql', "dplyr")
build_sql <- pkg_method('build_sql', "dbplyr")
base_scalar <- pkg_method('base_scalar', "dbplyr")
base_agg <- pkg_method('base_agg', "dbplyr")

sql_variant(
sql_translator(.parent = base_scalar,
ifelse = sql_prefix("IF"),
as = function(column, type) {
sql_type <- toupper(dbDataType(athena(), type)) # using toupper to keep dependencies low
build_sql('CAST(', column, ' AS ', sql(sql_type), ')')
},
as.character = sql_cast("STRING"), # using STRING from DataTypes.R conversion from R to Athena
as.numeric = sql_cast("DOUBLE"),
as.double = sql_cast("DOUBLE"),
as.integer = sql_cast("INT"), # using INT from DataTypes.R conversion from R to Athena
as.integer64 = sql_cast("BIGINT"), # as.integer64 reflects bigint for athena
as.Date = sql_cast("DATE"),
as.logical = sql_cast("BOOLEAN"),
as.raw = sql_cast("VARBINARY"),
tolower = sql_prefix("LOWER"),
toupper = sql_prefix("UPPER"),
pmax = sql_prefix("GREATEST"),
pmin = sql_prefix("LEAST"),
is.finite = sql_prefix("IS_FINITE"),
is.infinite = sql_prefix("IS_INFINITE"),
is.nan = sql_prefix("IS_NAN"),
`[[` = function(x, i) {
if (is.numeric(i) && isTRUE(all.equal(i, as.integer(i)))) { # isTRUE(): all.equal returns a string, not FALSE, on mismatch
i <- as.integer(i)
}
build_sql(x, "[", i, "]")
}
),
sql_translator(.parent = base_agg,
n = function() sql("COUNT(*)"),
sd = sql_prefix("STDDEV_SAMP"),
var = sql_prefix("VAR_SAMP"),
all = sql_prefix("BOOL_AND"),
any = sql_prefix("BOOL_OR")
),
athena_window_functions()
)
}
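
The `pkg_method()` helper used throughout this file lives in `utils.R` and is not part of this diff; a minimal sketch of such a helper, assuming it does nothing more than look a function up in a suggested package's namespace at call time, would be:

pkg_method <- function(fun, pkg) {
  if (!requireNamespace(pkg, quietly = TRUE)) {
    stop("Please install the ", pkg, " package and try again.", call. = FALSE)
  }
  # lazy lookup keeps dplyr/dbplyr in Suggests rather than Imports
  utils::getFromNamespace(fun, pkg)
}

Resolving the functions lazily is what lets `dplyr` and `dbplyr` stay in `Suggests` in the DESCRIPTION above.
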
1 change: 1 addition & 0 deletions R/zzz.R
@@ -3,6 +3,7 @@
register_s3_method("dbplyr", "db_compute", "AthenaConnection")
register_s3_method("dplyr", "db_save_query", "AthenaConnection")
register_s3_method("dbplyr", "db_copy_to", "AthenaConnection")
register_s3_method("dbplyr", "sql_translate_env", "AthenaConnection")
}

register_s3_method <- function(pkg, generic, class, fun = NULL) {
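
`register_s3_method()` is truncated above. The usual soft-registration pattern such a helper follows (a sketch, not necessarily the package's exact code) registers the method immediately when the target package is already loaded, and re-registers it whenever that package loads:

register_s3_method <- function(pkg, generic, class, fun = NULL) {
  stopifnot(is.character(pkg), length(pkg) == 1L)
  stopifnot(is.character(generic), length(generic) == 1L)
  if (is.null(fun)) {
    # default to a method defined in the calling package,
    # e.g. sql_translate_env.AthenaConnection
    fun <- get(paste0(generic, ".", class), envir = parent.frame())
  }
  if (pkg %in% loadedNamespaces()) {
    registerS3method(generic, class, fun, envir = asNamespace(pkg))
  }
  # also hook the registration onto the target package's onLoad event
  setHook(
    packageEvent(pkg, "onLoad"),
    function(...) registerS3method(generic, class, fun, envir = asNamespace(pkg))
  )
}
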
10 changes: 2 additions & 8 deletions cran-comments.md
@@ -1,11 +1,5 @@
## Release Summary
This update focuses on fixing a bug with `dbWriteTable`.

**Bug Fix**
The helper function `sqlCreateTable` had its generic parameters `table` and `fields` set to NULL. This is not an issue for more recent versions of R, but for older versions it raises an error.

**New Feature**
The parquet file format can now be compressed with snappy compression when uploading to Amazon S3.
TBC

## Examples Note:
* All R examples with `\dontrun` & `\donttest` have been given a note warning users that `AWS credentials` are required to run them
@@ -22,7 +16,7 @@ parquet file format now can be compressed using snappy compression when uploadin
0 errors ✔ | 0 warnings ✔ | 0 notes ✔

## unit tests (using testthat) results
* OK: 37
* OK: 52
* Failed: 0
* Warnings: 0
* Skipped: 0
11 changes: 11 additions & 0 deletions man/AthenaConnection.Rd


18 changes: 18 additions & 0 deletions man/sql_translate_env.Rd


45 changes: 45 additions & 0 deletions tests/testthat/test-dplyr-sql-trans-env.R
@@ -0,0 +1,45 @@
context("dplyr sql_translate_env")

# NOTE System variable format returned for Unit tests:
# Sys.getenv("noctua_arn"): "arn:aws:sts::123456789012:assumed-role/role_name/role_session_name"
# Sys.getenv("noctua_s3_query"): "s3://path/to/query/bucket/"
# Sys.getenv("noctua_s3_tbl"): "s3://path/to/bucket/"

library(dbplyr)
test_that("Check RAthena s3 dplyr sql_translate_env method",{
skip_if_no_env()
# Test connection is using AWS CLI to set profile_name
con <- dbConnect(athena())

t1 <- translate_sql(as.character(1), con = con)
t2 <- translate_sql(as.numeric("1"), con = con)
t3 <- translate_sql(as.double("1.2"), con = con)
t4 <- translate_sql(as.integer(1.2), con = con)
t5 <- translate_sql(as.integer64(1.2), con = con)
t6 <- translate_sql(as.Date("2019-01-01"), con = con)
t7 <- translate_sql(as.logical("true"), con = con)
t8 <- translate_sql(as.raw(10), con = con)
t9 <- translate_sql(tolower("HELLO"), con = con)
t10 <- translate_sql(toupper("hello"), con = con)
t11 <- translate_sql(pmax(1, 2), con = con)
t12 <- translate_sql(pmin(1, 2), con = con)
t13 <- translate_sql(is.finite("var1"), con = con)
t14 <- translate_sql(is.infinite("var1"), con = con)
t15 <- translate_sql(is.nan("var1"), con = con)

expect_equal(t1, sql("CAST(1.0 AS STRING)"))
expect_equal(t2, sql("CAST('1' AS DOUBLE)"))
expect_equal(t3, sql("CAST('1.2' AS DOUBLE)"))
expect_equal(t4, sql("CAST(1.2 AS INT)"))
expect_equal(t5, sql("CAST(1.2 AS BIGINT)"))
expect_equal(t6, sql("CAST('2019-01-01' AS DATE)"))
expect_equal(t7, sql("CAST('true' AS BOOLEAN)"))
expect_equal(t8, sql("CAST(10.0 AS VARBINARY)"))
expect_equal(t9, sql("LOWER('HELLO')"))
expect_equal(t10, sql("UPPER('hello')"))
expect_equal(t11, sql("GREATEST(1.0, 2.0)"))
expect_equal(t12, sql("LEAST(1.0, 2.0)"))
expect_equal(t13, sql("IS_FINITE('var1')"))
expect_equal(t14, sql("IS_INFINITE('var1')"))
expect_equal(t15, sql("IS_NAN('var1')"))
})
