# Testing with R

In [1]:
install.packages("sparklyr")

Installing package into ‘/usr/local/spark-3.0.1-bin-hadoop3.2/R/lib’
(as ‘lib’ is unspecified)



In [2]:
install.packages("tidyverse")

Installing package into ‘/usr/local/spark-3.0.1-bin-hadoop3.2/R/lib’
(as ‘lib’ is unspecified)



In [3]:
install.packages("nycflights13")

Installing package into ‘/usr/local/spark-3.0.1-bin-hadoop3.2/R/lib’
(as ‘lib’ is unspecified)



### Import Libraries

In [4]:
library(sparklyr)
library(tidyverse)

── [1mAttaching packages[22m ─────────────────────────────────────── tidyverse 1.3.0 ──

[32m✔[39m [34mggplot2[39m 3.3.2     [32m✔[39m [34mpurrr  [39m 0.3.4
[32m✔[39m [34mtibble [39m 3.0.4     [32m✔[39m [34mdplyr  [39m 1.0.2
[32m✔[39m [34mtidyr  [39m 1.1.2     [32m✔[39m [34mstringr[39m 1.4.0
[32m✔[39m [34mreadr  [39m 1.4.0     [32m✔[39m [34mforcats[39m 0.5.0

── [1mConflicts[22m ────────────────────────────────────────── tidyverse_conflicts() ──
[31m✖[39m [34mdplyr[39m::[32mfilter()[39m masks [34mstats[39m::filter()
[31m✖[39m [34mpurrr[39m::[32minvoke()[39m masks [34msparklyr[39m::invoke()
[31m✖[39m [34mdplyr[39m::[32mlag()[39m    masks [34mstats[39m::lag()



In [5]:
# Default sparklyr connection configuration; individual settings are
# overridden before connecting.
conf <- sparklyr::spark_config()

In [6]:
# Local-mode resources: number of local cores and driver memory.
conf[["sparklyr.cores.local"]] <- 6
conf[["sparklyr.shell.driver-memory"]] <- "12G"

In [7]:
# Open a local Spark connection using the configuration built above.
sc <- spark_connect(
  master = "local",
  config = conf
)

### Load Parquet Frames

In [8]:
# List what matches data/* (the glob is expanded by the shell, so this
# assumes a Unix-like host with `ls`).
system(command = "ls data/*", intern = TRUE)

In [9]:
# Read every Parquet path matching data/* into the Spark table "logdata".
txt <- spark_read_parquet(
  sc,
  name = "logdata",
  path = "data/*"
)

### Build UDFs

In [10]:
test <- "there/are/a/lot/of/words"
# Split on "/" into a one-row matrix and keep its final cell.
str_split(test, "/", simplify = TRUE) %>%
  dplyr::last()

In [11]:
# Return the final "/"-separated component of each element of `string`
# (e.g. the object name of an S3 key).
#
# string: character vector of keys/paths.
# Returns a character vector the same length as `string`, one value per
# element: the text after the last "/", the whole element when it contains
# no "/", "" when it ends with "/", and NA for NA.
get_object <- function(string) {
  # Greedy ".*/" consumes everything up to and including the last "/".
  sub(".*/", "", string)
}

In [12]:
get_object(string = test)

In [13]:
splitted <- str_split(test, "/", simplify = TRUE)
# Keep every column except the last (the object name) and re-join with "/".
paste(splitted[, seq_len(ncol(splitted) - 1)], collapse = "/")

In [14]:
# Return everything before the last "/" in each element of `string`
# (e.g. the "directory" part of an S3 key).
#
# string: character vector of keys/paths.
# Returns a character vector the same length as `string`, one value per
# element: the text before the last "/", "" when the element contains no "/",
# and NA for NA.
get_prefix <- function(string) {
  # "/?" lets the pattern remove the final separator together with the last
  # component, and also match a lone component with no "/" (leaving "").
  sub("/?[^/]*$", "", string)
}

In [15]:
get_prefix(string = test)

In [39]:
# Row-wise key splitter intended for sparklyr::spark_apply.
#
# Adds `prefix` and `object` columns to a local data frame by splitting each
# value of the key column at its last "/".
#
# df:      a local data frame (under spark_apply, presumably one partition).
# colName: name (string) of the character column holding the keys.
# Returns `df` with two extra columns:
#   prefix - text before the last "/" ("" when the key has no "/")
#   object - text after the last "/" (the whole key when it has no "/")
split_key <- function(df, colName) {
  keys <- df[[colName]]
  # Base R only, so nothing needs to be attached wherever this runs; both
  # regexes are vectorised, giving one value per row.
  df$prefix <- sub("/?[^/]*$", "", keys)
  df$object <- sub(".*/", "", keys)
  df
}

### Process Dataframes

In [42]:
# Keep only rows whose `key` contains "dmp" (instr() is passed through to
# Spark SQL).
txt2 <- filter(txt, instr(key, "dmp") > 0)
txt2

[38;5;246m# Source: spark<?> [?? x 28][39m
   bucketowner bucket requestdatetime remoteip requester requestid operation
   [3m[38;5;246m<chr>[39m[23m       [3m[38;5;246m<chr>[39m[23m  [3m[38;5;246m<chr>[39m[23m           [3m[38;5;246m<chr>[39m[23m    [3m[38;5;246m<chr>[39m[23m     [3m[38;5;246m<chr>[39m[23m     [3m[38;5;246m<chr>[39m[23m    
[38;5;250m 1[39m 7409ed49be… alpha… 13/Jun/2020:04… 10.121.… arn:aws:… 15F94453… REST.HEA…
[38;5;250m 2[39m 7409ed49be… alpha… 13/Jun/2020:04… 10.121.… arn:aws:… DC0D21AA… REST.HEA…
[38;5;250m 3[39m 7409ed49be… alpha… 13/Jun/2020:04… 10.121.… arn:aws:… 637FF3AD… REST.GET…
[38;5;250m 4[39m 7409ed49be… alpha… 13/Jun/2020:04… 10.121.… arn:aws:… DEC9747E… REST.HEA…
[38;5;250m 5[39m 7409ed49be… alpha… 13/Jun/2020:04… 10.121.… arn:aws:… A0B7D291… REST.HEA…
[38;5;250m 6[39m 7409ed49be… alpha… 13/Jun/2020:04… 10.121.… arn:aws:… E916FC6B… REST.HEA…
[38;5;250m 7[39m 7409ed49be… alpha… 13/Jun/2020:04… 10.121.… arn:a

In [66]:
# Earlier spark_apply attempt (did not work); kept for reference.
#spark_apply(txt2, split_key, context = { colName <- "key" })
variable <- "test"
as_label(variable)

In [70]:
# stringr::str_split can't be used here: dplyr verbs on a Spark table are
# translated to Spark SQL, so the extraction has to be a SQL function.
get_object_spark <- function(df, col, newColName) {
  # Add column `newColName` holding the text after the last "/" in column
  # `col` (the whole value when there is no "/", "" when it ends with "/").
  # df: a Spark table; col, newColName: column names given as strings.
  df %>%
    mutate(!!newColName := regexp_extract(!!as.name(col), "([^/]*)$", 1L))
}

In [91]:
# We can just use Hive's regexp_extract to split the key inside Spark SQL.
# object: text after the last "/" (the whole key when there is no "/").
# prefix: text before the last "/" ("" when there is no "/"). Capture group 1
#         is required here; group 0 would return the entire match, i.e. the
#         whole key.
txt2 %>%
  select(key, requestid, operation) %>%
  mutate(
    object = regexp_extract(key, "([^/]*)$", 1L),
    prefix = regexp_extract(key, "^(.*)/[^/]*$", 1L)
  )
#sdf_separate_column("object")

# sdf_separate_column(object, 1)

[38;5;246m# Source: spark<?> [?? x 5][39m
   key                 requestid   operation  object         prefix             
   [3m[38;5;246m<chr>[39m[23m               [3m[38;5;246m<chr>[39m[23m       [3m[38;5;246m<chr>[39m[23m      [3m[38;5;246m<chr>[39m[23m          [3m[38;5;246m<chr>[39m[23m              
[38;5;250m 1[39m dmp/rawcleansed/ed… 15F944538A… REST.HEAD… part-00123-8a… dmp/rawcleansed/ed…
[38;5;250m 2[39m dmp/edh_fs_config/… DC0D21AA4F… REST.HEAD… part-00000-33… dmp/edh_fs_config/…
[38;5;250m 3[39m dmp/rawcleansed/ed… 637FF3AD75… REST.GET.… part-00010-86… dmp/rawcleansed/ed…
[38;5;250m 4[39m dmp/edh_fs_config/… DEC9747EEC… REST.HEAD… part-00000-33… dmp/edh_fs_config/…
[38;5;250m 5[39m dmp/rawcleansed/ed… A0B7D2916D… REST.HEAD… part-00054-f9… dmp/rawcleansed/ed…
[38;5;250m 6[39m dmp/edh_fs_config/… E916FC6B86… REST.HEAD… part-00000-33… dmp/edh_fs_config/…
[38;5;250m 7[39m dmp/rawcleansed/ed… A558C5A885… REST.GET.… part-00191-42… dmp/rawcle