# Chapter 4

This notebook contains the commands that are shown in the lecture 3.

In [1]:
library(tidyverse)
library(lubridate)

Registered S3 methods overwritten by 'ggplot2':
  method         from 
  [.quosures     rlang
  c.quosures     rlang
  print.quosures rlang
Registered S3 method overwritten by 'rvest':
  method            from
  read_xml.response xml2
── Attaching packages ─────────────────────────────────────── tidyverse 1.2.1 ──
✔ ggplot2 3.1.1       ✔ purrr   0.3.2  
✔ tibble  2.1.1       ✔ dplyr   0.8.0.1
✔ tidyr   0.8.3       ✔ stringr 1.4.0  
✔ readr   1.3.1       ✔ forcats 0.4.0  
── Conflicts ────────────────────────────────────────── tidyverse_conflicts() ──
✖ dplyr::filter() masks stats::filter()
✖ dplyr::lag()    masks stats::lag()

Attaching package: ‘lubridate’

The following object is masked from ‘package:base’:

    date



## Different resources involved in data analysis pipelines

### Processors as a resource

In [2]:
n_zeros <- 10000
ntimes <- 1000

z <- numeric(n_zeros)

time_for_1 <- Sys.time()
for (t in seq(ntimes)) {
    for (i in seq(1,n_zeros)) {
        z[i] <- z[i] + 1
    }
}
time_for_2 <- Sys.time()

time_for <- time_for_2 - time_for_1

z <- numeric(n_zeros)

time_vec_1 <- Sys.time()
for (t in seq(ntimes)) {
    z <- z + 1
}
time_vec_2 <- Sys.time()

time_vec <- time_vec_2 - time_vec_1

cat(sprintf("Time taken:\n\nFor loop: %.2g\nVectorized operation: %.2g\n\nSpeedup: %.2f", time_for, time_vec, time_for/as.double(time_vec, unit='secs')))

Time taken:

For loop: 0.61
Vectorized operation: 0.028

Speedup: 21.83

### RAM as a resource

Bootstrapping pipeline from chapter 3.

In [3]:
load_filesizes <- function(filesizes_file){
    filesizes <- read_table2(filesizes_file, col_names=c('Bytes','MonthsTo2021', 'Files'), col_types=cols())
    
    filesizes <- filesizes %>%
        # Remove empty files
        filter(Bytes != 0) %>%
        # Create a column for log2 of bytes
        mutate(BytesLog2 = log2(Bytes)) %>%
        # Determine total space S used by N files of size X during date D: S=N*X 
        mutate(SpaceUsage = Bytes*Files) %>%
        # Determine file year and month from the MonthsTo2021-column
        mutate(
            TotalMonths = 2021*12 - MonthsTo2021 - 1,
            Year = TotalMonths %/% 12,
            Month = TotalMonths %% 12 +1,
            Day = 1
        )

     # Set year for really old files and files with incorrect timestamps
    invalid_years = c((filesizes['Year'] < 2010) | (filesizes['Year'] > 2020))
    filesizes[invalid_years, c('Year','Month')] <- NaN
    
    # Get month names for the correct ordering of Month categories
    month_names <- month(seq(1,12), label=TRUE, locale='C')
    filesizes <- filesizes %>%
        mutate(
            # Create Date and get the name for the month
            Date = make_datetime(Year, Month, Day),
            # Set Month 
            Month=month(Month, label=TRUE, locale='C'),
            # Set Month to be an ordered categorical with predefined levels 
            Month=factor(Month, ordered=TRUE, levels=month_names))
    filesizes <- filesizes %>%
        # Sort data based on Date and BytesLog2
        arrange(Date, BytesLog2) %>%
        # Remove old columns
        select(-MonthsTo2021,-TotalMonths,-Day)
    return(filesizes)
}

aggregate_filesize_data <- function(data, grouping, target, agg_function) {
    data_relevant <- data %>%
        # Drop rows with NaNs (invalid years)
        drop_na() %>%
        # Pick relevant columns
        select_at(vars(c(grouping, target))) %>%
        # Change grouping to category for prettier plotting
        mutate_at(vars(grouping), as.factor)

    # Aggregate data
    data_aggregated <- data_relevant %>%
        group_by_at((grouping)) %>%
        summarize_at(vars(target), agg_function) %>%
        ungroup()

    return(data_aggregated)
}

get_bootstrapped_means <- function(dataset, target_col, weight_col, n_means=1000) {
    # Pick relevant columns
    # Pick target data column and convert it to integer
    target_data <- as.numeric(as.character(dataset[[target_col]]))
    # Pick weight data column
    weight_data <- dataset[[weight_col]]
    weight_data <- weight_data/sum(weight_data)

    # Create means vector
    means <- numeric(n_means)
    for (i in seq(n_means)) {
        # Calculate resampled mean
        means[[i]] <- mean(sample(target_data, 100, replace=TRUE, prob=weight_data))
    }
    return(means)
}

bootstrap_byteslog2_mean <- function(dataset, group_variable, target_variable, n_means=1000) {
    
    bootstrapping_function <- function(x) get_bootstrapped_means(x, 'BytesLog2', target_variable, n_means=n_means)
    
    bootstrapped_means <- dataset %>%
        group_by_at(vars(group_variable)) %>%
        nest() %>%
        mutate(
            SampledMeans=map(data, bootstrapping_function),
            Mean=map(SampledMeans, mean)
        ) %>%
        select(-data)
    
    return(bootstrapped_means)
}

In [36]:
chapter3_pipeline <- function(n_means=10000) {

    filesizes <- load_filesizes('../data/filesizes_timestamps.txt')

    yearly_bytes_sum <- aggregate_filesize_data(filesizes, c('Year','BytesLog2'), c('Files', 'SpaceUsage'), sum)

    bootstrapped_yearly_means <- yearly_bytes_sum %>%
        bootstrap_byteslog2_mean('Year', 'Files', n_means=n_means) %>%
        select(Year, Mean)

    return(bootstrapped_yearly_means)
}

head(chapter3_pipeline(n_means=100))

Year,Mean
2010,13.0364
2011,14.0331
2012,10.7298
2013,13.3426
2014,13.9998
2015,11.7389


In [5]:
filesizes <- load_filesizes('../data/filesizes_timestamps.txt')
yearly_bytes_sum <- aggregate_filesize_data(filesizes, c('Year','BytesLog2'), c('Files', 'SpaceUsage'), sum)

print_column_sizes <- function(dataset) {
    map(colnames(dataset), function(x) print(sprintf('column: %12s size: %d', x, object.size(dataset[x]))))
    invisible(NULL)
}

print('filesizes:')
print_column_sizes(filesizes)

print('yearly_bytes_sum:')
print_column_sizes(yearly_bytes_sum)

filesizes_size <- object.size(filesizes)
summarized_size <- object.size(yearly_bytes_sum)

cat(sprintf("
Original data: %d bytes
Summarized data: %d bytes

Reduction ratio: %.2f
", filesizes_size, summarized_size, filesizes_size/summarized_size))


[1] "filesizes:"
[1] "column:        Bytes size: 70384"
[1] "column:        Files size: 70384"
[1] "column:    BytesLog2 size: 70392"
[1] "column:   SpaceUsage size: 70392"
[1] "column:         Year size: 70384"
[1] "column:        Month size: 36872"
[1] "column:         Date size: 70896"
[1] "yearly_bytes_sum:"
[1] "column:         Year size: 3728"
[1] "column:    BytesLog2 size: 5744"
[1] "column:        Files size: 4336"
[1] "column:   SpaceUsage size: 4344"

Original data: 455320 bytes
Summarized data: 15920 bytes

Reduction ratio: 28.60


In [6]:
memory_scope_test <- function(){
    memory_scope_variable = runif(1000)
    print(object.size(memory_scope_variable))
}
memory_scope_test()
print(object.size(memory_scope_variable))

8048 bytes


ERROR: Error in structure(.Call(C_objectSize, x), class = "object_size"): object 'memory_scope_variable' not found


In [7]:
library(pryr)

memtest_nocollect <- function(n=1000) {

    print(mem_used())
    
    A <- runif(n*n)
    A_mean <- mean(A)
    
    print('No garbage collection done.')
    Sys.sleep(5)

    B <- matrix(runif(n*n), ncol=n)
    B <- B %*% t(B)
    B_inv <- solve(B)

    print(mem_used())

    return(max(B %*% B_inv))
}

memtest_collect <- function(n=1000){

    print(mem_used())

    A <- runif(n*n)
    A_mean <- mean(A)

    rm(A)
    print(gc())
    Sys.sleep(5)

    B <- matrix(runif(n*n), ncol=n)
    B <- B %*% t(B)
    B_inv <- solve(B)

    print(mem_used())
    
    return(max(B %*% B_inv))
}

memtest_nocollect(3000)
memtest_collect(3000)

Registered S3 method overwritten by 'pryr':
  method      from
  print.bytes Rcpp

Attaching package: ‘pryr’

The following objects are masked from ‘package:purrr’:

    compose, partial



62.3 MB
[1] "No garbage collection done."
278 MB


62.5 MB
          used (Mb) gc trigger  (Mb) max used  (Mb)
Ncells  883676 47.2    1748677  93.4  1218704  65.1
Vcells 1627736 12.5   37149069 283.5 46638798 355.9
207 MB


## Parallelization strategies
### Using internal parallelization provided by libraries

In [11]:
cat("
A <- matrix(runif(4000*4000), ncol=4000)
A <- A %*% t(A)

time_1 <- Sys.time()
A_inv <- solve(A)
time_2 <- Sys.time()
print(as.double(time_2 - time_1))
", file="omp_test.R")

Sys.setenv(OMP_NUM_THREADS="1")
output <- system('Rscript omp_test.R', intern=TRUE)
time_1thread <- as.numeric(str_extract(output, '\\d.\\d+'))

Sys.setenv(OMP_NUM_THREADS="4")
output <- system('Rscript omp_test.R', intern=TRUE)
time_4thread <- as.numeric(str_extract(output, '\\d.\\d+'))

cat(sprintf("
Time taken:

1 thread: %.2f
4 threads: %.2f

Speedup: %.2f", time_1thread, time_4thread, time_1thread/time_4thread))


Time taken:

1 thread: 4.49
4 threads: 1.75

Speedup: 2.56

In [80]:
library(furrr)

x_squared <- function(x) {
    return(x*x)
}

data <- tibble(x=seq(100))

print(head(data))

# Set up our parallel pool
plan(multisession, workers = 4)

data <- data %>%
    # Run parallel map (future_map) from furrr
    mutate(y=future_map(x, x_squared)) %>%
    # Turn resulting list into a vector of integers
    mutate(y=flatten_int(y))

glimpse(data)

# A tibble: 6 x 1
      x
  <int>
1     1
2     2
3     3
4     4
5     5
6     6
Observations: 100
Variables: 2
$ x <int> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, …
$ y <int> 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256,…


In [83]:
chapter3_pipeline_parallel <- function(n_means=10000, n_workers=1) {

    filesizes <- load_filesizes('../data/filesizes_timestamps.txt')

    yearly_bytes_sum <- aggregate_filesize_data(filesizes, c('Year','BytesLog2'), c('Files', 'SpaceUsage'), sum)

    bootstrapping_function <- function(x) get_bootstrapped_means(x, 'BytesLog2', 'Files', n_means=n_means)
    
    # Actual parallel part

    # Initialize a parallel pool with n_workers workers
    plan(multisession, workers = n_workers)    
    
    bootstrapped_yearly_means <- yearly_bytes_sum %>%
        group_by(Year) %>%
        nest() %>%
        mutate(
            # Map a function to each dataset. Output is a list of numeric vectors.
            SampledMeans=future_map(data, bootstrapping_function, .options=furrr_options(seed=TRUE)),
            Mean=future_map(SampledMeans, mean),
        ) %>%
        select(-data) %>%
        select(Year, Mean)
    
    return(bootstrapped_yearly_means)
}

# Measure performance and verify results 
time1 <- Sys.time()
means_orig <- chapter3_pipeline(n_means=100000) %>%
    mutate(Mean=flatten_dbl(Mean))
means_orig_means <- flatten_dbl(means_orig)
time2 <- Sys.time()

orig_time <- time2-time1

print(sprintf('Original pipeline: %.2f',orig_time))
head(means_orig, 20)

for (n_workers in seq(1,4)) {
    time1 <- Sys.time()
    means <- chapter3_pipeline_parallel(n_means=100000, n_workers=n_workers) %>%
        mutate(Mean=flatten_dbl(Mean))
    time2 <- Sys.time()
    print(sprintf('Time taken by %d workers: %.2f Speedup was: %.2f', n_workers, time2 - time1, orig_time/as.double(time2-time1)))
    print(sprintf('Maximum difference between calculated means: %f', max(abs(means['Mean']-means_orig['Mean']))))
}

[1] "Original pipeline: 11.92"


Year,Mean
2010,12.97936
2011,14.04265
2012,10.66918
2013,13.41251
2014,14.03964
2015,11.74544
2016,13.54507
2017,11.97751
2018,13.27919
2019,13.69971


[1] "Time taken by 1 workers: 12.42 Speedup was: 0.96"
[1] "Maximum difference between calculated means: 0.003174"
[1] "Time taken by 2 workers: 7.49 Speedup was: 1.59"
[1] "Maximum difference between calculated means: 0.005813"
[1] "Time taken by 3 workers: 6.04 Speedup was: 1.97"
[1] "Maximum difference between calculated means: 0.005813"
[1] "Time taken by 4 workers: 5.47 Speedup was: 2.18"
[1] "Maximum difference between calculated means: 0.005813"


In [10]:
# Initiate profiler
profile_tempfile <- tempfile()
Rprof(profile_tempfile, memory.profiling=TRUE)

# Run the pipeline
glimpse(chapter3_pipeline(10000))

# Stop profiling
Rprof()

# Print top 20 function calls by cumulative time
summaryRprof(profile_tempfile, memory='both')['by.self']

# Remove profiling file
unlink(profile_tempfile)

Observations: 11
Variables: 2
$ Year <fct> 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020
$ Mean <list> [12.97623, 14.04605, 10.67142, 13.41688, 14.04602, 11.74686, 13…


Unnamed: 0,self.time,self.pct,total.time,total.pct,mem.total
"""sample.int""",0.62,40.79,0.64,42.11,345.4
"""mean""",0.4,26.32,1.38,90.79,791.4
"""sample""",0.22,14.47,0.86,56.58,484.3
"""mean.default""",0.1,6.58,0.12,7.89,79.9
"""length""",0.06,3.95,0.06,3.95,34.1
"""factor""",0.04,2.63,0.04,2.63,4.7
"""get_bootstrapped_means""",0.02,1.32,1.4,92.11,791.4
"""lazyLoadDBfetch""",0.02,1.32,0.02,1.32,2.8
"""names""",0.02,1.32,0.02,1.32,2.2
"""parent.env""",0.02,1.32,0.02,1.32,2.3
