
WindowFunctions

John Mount October 13, 2019

This is a tutorial on how to use window functions in either the R rquery package or the Python data_algebra package (R example here, Python example here).

(Note: these examples require at least rqdatatable 1.2.3 and rquery 1.3.9, which may not be on CRAN yet.)
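Because these minimum versions matter, it can help to check them before running anything. Below is a minimal sketch in base R. `meets_min_version()` is a hypothetical helper of our own, not part of rquery, and it is built only on `requireNamespace()` and `utils::packageVersion()`.

```r
# Hypothetical helper (not part of rquery or rqdatatable).
# Returns TRUE when `pkg` is installed at version `min_version` or later,
# and FALSE when it is missing or older.
meets_min_version <- function(pkg, min_version) {
  if (!requireNamespace(pkg, quietly = TRUE)) {
    return(FALSE)
  }
  utils::packageVersion(pkg) >= min_version
}

# Minimum versions taken from the note above.
meets_min_version("rqdatatable", "1.2.3")
meets_min_version("rquery", "1.3.9")
```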

The rquery package provides a simplified (though verbose) unified interface to data.table (via rqdatatable) and SQL data transforms, including window functions. (Note: for a Python version of this, please see here.)

Let’s work through an example. First, we bring in our packages.

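```r
library(wrapr)        # the %.>% dot-arrow pipe
library(rquery)       # operator pipelines and SQL generation
library(rqdatatable)  # data.table implementation of rquery operators
```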

Now some example data.

```r
d <- data.frame(
  g = c('a', 'b', 'b', 'c', 'c', 'c'),
  x = c(1, 4, 5, 7, 8, 9),
  v = c(10, 40, 50, 70, 80, 90),
  stringsAsFactors = FALSE)

knitr::kable(d)
```

| g | x | v |
|:--|--:|--:|
| a | 1 | 10 |
| b | 4 | 40 |
| b | 5 | 50 |
| c | 7 | 70 |
| c | 8 | 80 |
| c | 9 | 90 |

Now we can run a number of ordered and unordered window functions. The distinction is whether an orderby argument is present.

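```r
# describe the data frame d as a table for rquery pipelines
table_description <- local_td(d)
# make data.table's shift() (a lag function) available as shift()
shift <- data.table::shift

ops <- table_description %.>%
  # ordered block: computed in order of x within each group g
  extend(.,
         row_number := row_number(),
         v_shift := shift(v),
         cumsum_v := cumsum(v),
         orderby = 'x',
         partitionby = 'g') %.>%
  # unordered block: whole-group quantities
  extend(.,
         ngroup := ngroup(),
         size := n(),
         max_v := max(v),
         min_v := min(v),
         sum_v := sum(v),
         mean_v := mean(v),
         partitionby = 'g')

d %.>%
  ops %.>%
  knitr::kable(.)
```

| g | x | v | row_number | v_shift | cumsum_v | ngroup | size | max_v | min_v | sum_v | mean_v |
|:--|--:|--:|--:|--:|--:|--:|--:|--:|--:|--:|--:|
| a | 1 | 10 | 1 | NA | 10 | 1 | 1 | 10 | 10 | 10 | 10 |
| b | 4 | 40 | 1 | NA | 40 | 2 | 2 | 50 | 40 | 90 | 45 |
| b | 5 | 50 | 2 | 40 | 90 | 2 | 2 | 50 | 40 | 90 | 45 |
| c | 7 | 70 | 1 | NA | 70 | 3 | 3 | 90 | 70 | 240 | 80 |
| c | 8 | 80 | 2 | 70 | 150 | 3 | 3 | 90 | 70 | 240 | 80 |
| c | 9 | 90 | 3 | 80 | 240 | 3 | 3 | 90 | 70 | 240 | 80 |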
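This base R sketch shows concretely what these window functions compute. It reproduces several of the columns above with `stats::ave()`, which applies a function within each group and returns results aligned to the input rows. It only illustrates the semantics; it is not how rquery or rqdatatable implement the operators.

```r
# Same example data as above.
d <- data.frame(
  g = c('a', 'b', 'b', 'c', 'c', 'c'),
  x = c(1, 4, 5, 7, 8, 9),
  v = c(10, 40, 50, 70, 80, 90),
  stringsAsFactors = FALSE)

# Ordered window functions need rows sorted by the orderby column (x)
# within each partition (g).
d_sorted <- d[order(d$g, d$x), ]

# row_number(): position of each row within its group.
d_sorted$row_number <- ave(d_sorted$x, d_sorted$g, FUN = seq_along)
# shift(v), i.e. SQL LAG(v): previous v in the group, NA for the first row.
d_sorted$v_shift <- ave(d_sorted$v, d_sorted$g,
                        FUN = function(z) c(NA, z[-length(z)]))

# Unordered window functions: one value per group, repeated on every row.
d_sorted$size <- ave(d_sorted$v, d_sorted$g, FUN = length)
d_sorted$max_v <- ave(d_sorted$v, d_sorted$g, FUN = max)
d_sorted$mean_v <- ave(d_sorted$v, d_sorted$g, FUN = mean)

d_sorted
```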

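Note: we take care to separate the ordered block of operations from the unordered block. In databases, the presence of an order constraint in a window function often switches the operation to a cumulative mode. For example, `SUM(v) OVER (PARTITION BY g ORDER BY x)` is a running total (this is how `cumsum_v` is computed), while `SUM(v) OVER (PARTITION BY g)` is the group total.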
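The difference between cumulative and whole-group results is easy to see in base R. In this sketch, `stats::ave()` again does the per-group work, and rows are sorted by `x` within each group. A running sum stands in for `SUM(v) OVER (PARTITION BY g ORDER BY x)` and a group total for `SUM(v) OVER (PARTITION BY g)`. Under the SQL default frame, rows with tied `x` values would share one running total. That case does not arise in this data, and plain `cumsum()` does not model it.

```r
d <- data.frame(
  g = c('a', 'b', 'b', 'c', 'c', 'c'),
  x = c(1, 4, 5, 7, 8, 9),
  v = c(10, 40, 50, 70, 80, 90),
  stringsAsFactors = FALSE)
d <- d[order(d$g, d$x), ]

# Ordered window: the frame runs from the start of the partition through
# the current row, giving a running total (x has no ties here).
cumulative <- ave(d$v, d$g, FUN = cumsum)

# Unordered window: the frame is the whole partition, so every row
# gets the group total.
total <- ave(d$v, d$g, FUN = sum)

data.frame(d, cumsum_v = cumulative, sum_v = total)
```

These match the `cumsum_v` and `sum_v` columns in the table above.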

One benefit of rquery is that the operations are stored in an object, which we can print, inspect, and reuse.

```r
cat(format(ops))
```

```
## mk_td("d", c(
##   "g",
##   "x",
##   "v")) %.>%
##  extend(.,
##   row_number := row_number(),
##   v_shift := shift(v),
##   cumsum_v := cumsum(v),
##   partitionby = c('g'),
##   orderby = c('x'),
##   reverse = c()) %.>%
##  extend(.,
##   ngroup := ngroup(),
##   size := n(),
##   max_v := max(v),
##   min_v := min(v),
##   sum_v := sum(v),
##   mean_v := mean(v),
##   partitionby = c('g'),
##   orderby = c(),
##   reverse = c())
```

We can also present a diagram of the operator chain.

```r
ops %.>%
  op_diagram(.) %.>%
  DiagrammeR::grViz(.)
```

These operations can be reused and even exported to SQL, including large-scale SQL systems such as PostgreSQL, Apache Spark, or Google BigQuery.

For a simple demonstration, we will use small-scale SQL as provided by SQLite.

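```r
# an in-memory SQLite database
raw_connection <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")
# load RSQLite's extension functions (such as extra math functions)
RSQLite::initExtension(raw_connection)
# wrap the connection, probing it for supported SQL features
db <- rquery_db_info(
  connection = raw_connection,
  is_dbi = TRUE,
  connection_options = rq_connection_tests(raw_connection))
```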

```r
# same pipeline as ops, but without the ngroup() column
ops_db <- table_description %.>%
  extend(.,
         row_number := row_number(),
         v_shift := shift(v),
         cumsum_v := cumsum(v),
         orderby = 'x',
         partitionby = 'g') %.>%
  extend(.,
         size := n(),
         max_v := max(v),
         min_v := min(v),
         sum_v := sum(v),
         mean_v := mean(v),
         partitionby = 'g')
```

```r
# copy d into the database as table 'd'
rq_copy_to(db, 'd',
           d,
           temporary = TRUE,
           overwrite = TRUE)
```

```
## [1] "mk_td(\"d\", c( \"g\", \"x\", \"v\"))"
```

```r
# translate the pipeline into SQL for this database
sql1 <- to_sql(ops_db, db)

cat(sql1)
```

```
## SELECT
##  `g`,
##  `x`,
##  `v`,
##  `row_number`,
##  `v_shift`,
##  `cumsum_v`,
##  COUNT ( 1 ) OVER (  PARTITION BY `g` ) AS `size`,
##  max ( `v` ) OVER (  PARTITION BY `g` ) AS `max_v`,
##  min ( `v` ) OVER (  PARTITION BY `g` ) AS `min_v`,
##  sum ( `v` ) OVER (  PARTITION BY `g` ) AS `sum_v`,
##  AVG ( `v` ) OVER (  PARTITION BY `g` ) AS `mean_v`
## FROM (
##  SELECT
##   `g`,
##   `x`,
##   `v`,
##   row_number ( ) OVER (  PARTITION BY `g` ORDER BY `x` ) AS `row_number`,
##   LAG ( `v` ) OVER (  PARTITION BY `g` ORDER BY `x` ) AS `v_shift`,
##   SUM ( `v` ) OVER (  PARTITION BY `g` ORDER BY `x` ) AS `cumsum_v`
##  FROM (
##   SELECT
##    `g`,
##    `x`,
##    `v`
##   FROM
##    `d`
##   ) tsql_14658958129420030500_0000000000
##  ) tsql_14658958129420030500_0000000001
```

We can execute this SQL in two ways. The first materializes a remote result, which involves no data movement because we send the SQL commands to the database rather than moving data to or from R. The second brings a result back from the database to R.

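```r
res1_db <- execute(db, ops_db)

knitr::kable(res1_db)
```

| g | x | v | row_number | v_shift | cumsum_v | size | max_v | min_v | sum_v | mean_v |
|:--|--:|--:|--:|--:|--:|--:|--:|--:|--:|--:|
| a | 1 | 10 | 1 | NA | 10 | 1 | 10 | 10 | 10 | 10 |
| b | 4 | 40 | 1 | NA | 40 | 2 | 50 | 40 | 90 | 45 |
| b | 5 | 50 | 2 | 40 | 90 | 2 | 50 | 40 | 90 | 45 |
| c | 7 | 70 | 1 | NA | 70 | 3 | 90 | 70 | 240 | 80 |
| c | 8 | 80 | 2 | 70 | 150 | 3 | 90 | 70 | 240 | 80 |
| c | 9 | 90 | 3 | 80 | 240 | 3 | 90 | 70 | 240 | 80 |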

Notice we didn’t calculate the group id ngroup in the SQL version. It is a much less common window function and is rarely used in applications. It is also only interesting with a composite key; otherwise the single key column already serves as the per-group id. As a result, not all rquery pipelines can run in all environments. However, we can compute (arbitrary) group IDs in a domain-independent manner as follows.

```r
# one row per group, numbered in order of g
id_ops_a <- table_description %.>%
  project(.,
          groupby = 'g') %.>%
  extend(.,
         ngroup := row_number(),
         orderby = 'g')

# attach the group id to every row of the original table
id_ops_b <- table_description %.>%
  natural_join(.,
               id_ops_a, by = 'g', jointype = 'LEFT')

cat(format(id_ops_b))
```

```
## mk_td("d", c(
##   "g",
##   "x",
##   "v")) %.>%
##  natural_join(.,
##   mk_td("d", c(
##     "g",
##     "x",
##     "v")) %.>%
##    project(., ,
##     groupby = c('g')) %.>%
##    extend(.,
##     ngroup := row_number()),
##   jointype = "LEFT", by = c('g'))
```

Here we land the result in the database, without moving data through R.

```r
table_2 <- materialize(db, id_ops_b, 'remote_result')

table_2
```

```
## [1] "mk_td(\"remote_result\", c( \"g\", \"x\", \"v\", \"ngroup\"))"
```

Later, we copy it back to R to look at it.

```r
res2_db <- execute(db, table_2)

knitr::kable(res2_db)
```

| g | x | v | ngroup |
|:--|--:|--:|--:|
| a | 1 | 10 | 1 |
| b | 4 | 40 | 2 |
| b | 5 | 50 | 2 |
| c | 7 | 70 | 3 |
| c | 8 | 80 | 3 |
| c | 9 | 90 | 3 |

And we can use the same pipeline in R.

```r
d %.>%
  id_ops_b %.>%
  knitr::kable(.)
```

| g | v | x | ngroup |
|:--|--:|--:|--:|
| a | 10 | 1 | 1 |
| b | 40 | 4 | 2 |
| b | 50 | 5 | 2 |
| c | 70 | 7 | 3 |
| c | 80 | 8 | 3 |
| c | 90 | 9 | 3 |
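The group-labeling pipeline is three relational steps: find the distinct keys, number them in a fixed order, and left-join the numbers back onto the rows. As noted earlier, this is most interesting with a composite key. The base R sketch below follows the same recipe using hypothetical data with a two-column key (`g1`, `g2`). Here `merge()` stands in for the join. This is an illustration of the idea, not rquery's implementation.

```r
# Hypothetical data with a composite key (g1, g2); not from the example above.
d2 <- data.frame(
  g1 = c('a', 'a', 'b', 'b', 'b'),
  g2 = c('x', 'y', 'x', 'x', 'y'),
  v  = c(1, 2, 3, 4, 5),
  stringsAsFactors = FALSE)

# Step 1 (like project(groupby = ...)): one row per distinct key.
keys <- unique(d2[, c('g1', 'g2')])
# Step 2 (like extend(ngroup := row_number(), orderby = ...)):
# number the keys in a deterministic order.
keys <- keys[order(keys$g1, keys$g2), ]
keys$ngroup <- seq_len(nrow(keys))
# Step 3 (like natural_join(jointype = 'LEFT')): attach the id to every row.
# Note: merge() does not promise to keep the original row order.
labeled <- merge(d2, keys, by = c('g1', 'g2'), all.x = TRUE, sort = FALSE)

labeled
```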

And we can diagram the group labeling operation.

```r
id_ops_b %.>%
  op_diagram(., merge_tables = TRUE) %.>%
  DiagrammeR::grViz(.)
```

Or all the steps in one sequence.

```r
all_ops <- id_ops_b %.>%
  extend(.,
         row_number := row_number(),
         v_shift := shift(v),
         cumsum_v := cumsum(v),
         orderby = 'x',
         partitionby = 'g') %.>%
  extend(.,
         size := n(),
         max_v := max(v),
         min_v := min(v),
         sum_v := sum(v),
         mean_v := mean(v),
         partitionby = 'g')

all_ops %.>%
  op_diagram(., merge_tables = TRUE) %.>%
  DiagrammeR::grViz(.)
```

And we can run this whole sequence with data.table.

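```r
d %.>%
  all_ops %.>%
  knitr::kable(.)
```

| g | v | x | ngroup | row_number | v_shift | cumsum_v | size | max_v | min_v | sum_v | mean_v |
|:--|--:|--:|--:|--:|--:|--:|--:|--:|--:|--:|--:|
| a | 10 | 1 | 1 | 1 | NA | 10 | 1 | 10 | 10 | 10 | 10 |
| b | 40 | 4 | 2 | 1 | NA | 40 | 2 | 50 | 40 | 90 | 45 |
| b | 50 | 5 | 2 | 2 | 40 | 90 | 2 | 50 | 40 | 90 | 45 |
| c | 70 | 7 | 3 | 1 | NA | 70 | 3 | 90 | 70 | 240 | 80 |
| c | 80 | 8 | 3 | 2 | 70 | 150 | 3 | 90 | 70 | 240 | 80 |
| c | 90 | 9 | 3 | 3 | 80 | 240 | 3 | 90 | 70 | 240 | 80 |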

Or in the database (via automatic SQL generation).

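```r
all_ops %.>%
  execute(db, .) %.>%
  knitr::kable(.)
```

| g | x | v | ngroup | row_number | v_shift | cumsum_v | size | max_v | min_v | sum_v | mean_v |
|:--|--:|--:|--:|--:|--:|--:|--:|--:|--:|--:|--:|
| a | 1 | 10 | 1 | 1 | NA | 10 | 1 | 10 | 10 | 10 | 10 |
| b | 4 | 40 | 2 | 1 | NA | 40 | 2 | 50 | 40 | 90 | 45 |
| b | 5 | 50 | 2 | 2 | 40 | 90 | 2 | 50 | 40 | 90 | 45 |
| c | 7 | 70 | 3 | 1 | NA | 70 | 3 | 90 | 70 | 240 | 80 |
| c | 8 | 80 | 3 | 2 | 70 | 150 | 3 | 90 | 70 | 240 | 80 |
| c | 9 | 90 | 3 | 3 | 80 | 240 | 3 | 90 | 70 | 240 | 80 |

```r
# clean up
DBI::dbDisconnect(raw_connection)
```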