# Working with DataFrames.jl beyond CSV files

# Part 2: Using Parquet for data larger than RAM

## Bogumił Kamiński
### June 25, 2023

What is covered in part 2:
* how to iteratively create a Parquet data store that jointly holds more data than the available RAM
* how to manually process such data on a single machine (notebook-oriented process)

## Setup

In [1]:
using DataFrames

In [2]:
using Parquet2

In [3]:
using Random

In [4]:
using Statistics

In [5]:
using StatsBase

## Generate some large data

In [6]:
isdir("pq_experiment") && rm("pq_experiment"; recursive=true)

false

In [7]:
mkdir("pq_experiment")

"pq_experiment"

In [8]:
Random.seed!(1234);

Create 501 groups (range `0.0:0.002:1.0`), each having $2^{20}$ = 1,048,576 rows and two `Float64` columns (I could have made it larger, but this is enough for an example).

In [9]:
let # create local scope for more consistent variable scoping behavior and avoid temporary variable leakage
    i = 1
    df = DataFrame() # temporary data frame to store intermediate results
    maxsize = 10^8 # define size of one chunk of data written to disk
    for μ in 0.0:0.002:1.0
        result = DataFrame(mu=μ, x=randn(2^20) .+ μ)
        append!(df, result) # keep appending data from partial simulations
        if nrow(df) > maxsize # if our data gets too big, dump it to the next file
            @info "writing file #$i"
            Parquet2.writefile("pq_experiment/experiment_$i.parquet", @view df[1:maxsize, :])
            deleteat!(df, 1:maxsize) # drop the rows that were just written to disk
            i += 1
        end
    end
    if nrow(df) > 0 # if some data has not been saved yet, store it now
        @info "writing file #$i"
        Parquet2.writefile("pq_experiment/experiment_$i.parquet", df)
    end
end

[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mwriting file #1
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mwriting file #2
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mwriting file #3
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mwriting file #4
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mwriting file #5
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mwriting file #6


[34m✏ [39mParquet2.FileWriter{IOStream}(pq_experiment/experiment_6.parquet)

Note that reading the dataset is lazy: the actual data is not read yet.

In [10]:
pq_experiment = Parquet2.readfile("pq_experiment", load_initial=true)

[34m≔ [39mParquet2.Dataset (837645573 bytes)
	1. [33m"mu"[39m: [36mFloat64[39m
	2. [33m"x"[39m: [36mFloat64[39m


We have six chunks of data. Each corresponds to one file, because every file was written as a single row group:

In [11]:
length(pq_experiment)

6

In [12]:
Parquet2.filelist(pq_experiment)

6-element Vector{FilePathsBase.WindowsPath}:
 p"C:/WORK/dev/DataFramesTutorials/JuliaCon2023-Tutorial/pq_experiment/experiment_1.parquet"
 p"C:/WORK/dev/DataFramesTutorials/JuliaCon2023-Tutorial/pq_experiment/experiment_2.parquet"
 p"C:/WORK/dev/DataFramesTutorials/JuliaCon2023-Tutorial/pq_experiment/experiment_3.parquet"
 p"C:/WORK/dev/DataFramesTutorials/JuliaCon2023-Tutorial/pq_experiment/experiment_4.parquet"
 p"C:/WORK/dev/DataFramesTutorials/JuliaCon2023-Tutorial/pq_experiment/experiment_5.parquet"
 p"C:/WORK/dev/DataFramesTutorials/JuliaCon2023-Tutorial/pq_experiment/experiment_6.parquet"

Note that the last file has fewer rows than the rest:

In [13]:
nrow.(pq_experiment)

6-element Vector{Int64}:
 100000000
 100000000
 100000000
 100000000
 100000000
  25336576

The challenge with this dataset is that rows with the same key value (`mu` column) can be split across multiple files.

Assume we want to compute the mean of `x` for each key. We need to do it in two steps.
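Concretely, suppose chunk $i$ contains $n_{i,k}$ rows with key $k$, and the mean of `x` over those rows is $\bar{x}_{i,k}$. Then the mean for key $k$ over the whole dataset is the count-weighted average of the per-chunk means, where the sums run over the chunks that contain key $k$:

$$
\bar{x}_k = \frac{\sum_i n_{i,k}\,\bar{x}_{i,k}}{\sum_i n_{i,k}}
$$

An unweighted average of the per-chunk means would be wrong whenever the counts $n_{i,k}$ differ between chunks. This is why the first step below keeps the `nrow` column next to `x_mean`, and why the second step uses `Weights(sdf.nrow)`.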

This is a standard map-reduce pattern. In this tutorial we perform both steps manually on a single node:

In [14]:
agg1 = map(enumerate(pq_experiment)) do (i, rowset)
    @info "processing chunk of data #$i"
    df = DataFrame(rowset, copycols=false)
    gdf = groupby(df, :mu)
    return combine(gdf, :x => mean, nrow)
end

[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mprocessing chunk of data #1
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mprocessing chunk of data #2
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mprocessing chunk of data #3
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mprocessing chunk of data #4
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mprocessing chunk of data #5
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mprocessing chunk of data #6


6-element Vector{DataFrame}:
 [1m96×3 DataFrame[0m
[1m Row [0m│[1m mu      [0m[1m x_mean       [0m[1m nrow    [0m
     │[90m Float64 [0m[90m Float64      [0m[90m Int64   [0m
─────┼────────────────────────────────
   1 │   0.0    -0.000165159  1048576
   2 │   0.002   0.00181935   1048576
   3 │   0.004   0.00547356   1048576
   4 │   0.006   0.0053043    1048576
   5 │   0.008   0.00748101   1048576
   6 │   0.01    0.0105489    1048576
   7 │   0.012   0.0113697    1048576
   8 │   0.014   0.0133716    1048576
   9 │   0.016   0.0163869    1048576
  10 │   0.018   0.0192935    1048576
  11 │   0.02    0.0198156    1048576
  ⋮  │    ⋮          ⋮           ⋮
  87 │   0.172   0.171104     1048576
  88 │   0.174   0.175584     1048576
  89 │   0.176   0.176103     1048576
  90 │   0.178   0.178837     1048576
  91 │   0.18    0.182103     1048576
  92 │   0.182   0.182695     1048576
  93 │   0.184   0.181319     1048576
  94 │   0.186   0.186327     1048576
  95 │   0.188

In [15]:
agg2 = reduce(vcat, agg1)

 Row │ mu       x_mean        nrow
     │ Float64  Float64       Int64
─────┼────────────────────────────────
   1 │   0.0    -0.000165159  1048576
   2 │   0.002   0.00181935   1048576
   3 │   0.004   0.00547356   1048576
   4 │   0.006   0.0053043    1048576
   5 │   0.008   0.00748101   1048576
   6 │   0.01    0.0105489    1048576
   7 │   0.012   0.0113697    1048576
   8 │   0.014   0.0133716    1048576
   9 │   0.016   0.0163869    1048576
  10 │   0.018   0.0192935    1048576
  ⋮  │    ⋮          ⋮           ⋮


In [16]:
agg3 = combine(groupby(agg2, :mu)) do sdf
    return (; x_mean = mean(sdf.x_mean, Weights(sdf.nrow)))
end

 Row │ mu       x_mean
     │ Float64  Float64
─────┼───────────────────────
   1 │   0.0    -0.000165159
   2 │   0.002   0.00181935
   3 │   0.004   0.00547356
   4 │   0.006   0.0053043
   5 │   0.008   0.00748101
   6 │   0.01    0.0105489
   7 │   0.012   0.0113697
   8 │   0.014   0.0133716
   9 │   0.016   0.0163869
  10 │   0.018   0.0192935
  ⋮  │    ⋮          ⋮
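The same two-step logic can be checked without any files. The following is a minimal sketch in plain Julia that uses only the `Statistics` standard library. The small integer-keyed chunks are hypothetical stand-ins for the row groups, and the helper names `partial_aggregate` and `combine_partials` are hypothetical stand-ins for the `combine` calls above. Key `2` is deliberately split across both chunks.

```julia
using Statistics  # mean (standard library)

# Hypothetical in-memory stand-in for row groups: each chunk is (keys, values),
# and key 2 is deliberately split across the two chunks.
chunks = [
    ([1, 1, 2], [1.0, 3.0, 10.0]),
    ([2, 2, 3], [20.0, 30.0, 5.0]),
]

# Map step: per-chunk (mean, count) for every key,
# analogous to combine(groupby(df, :mu), :x => mean, nrow).
function partial_aggregate(ks::AbstractVector{K}, vs::AbstractVector{Float64}) where {K}
    acc = Dict{K, Tuple{Float64, Int}}()
    for k in unique(ks)
        v = vs[ks .== k]
        acc[k] = (mean(v), length(v))
    end
    return acc
end

partials = [partial_aggregate(ks, vs) for (ks, vs) in chunks]

# Reduce step: count-weighted mean of the per-chunk means,
# analogous to mean(sdf.x_mean, Weights(sdf.nrow)).
function combine_partials(partials)
    K = keytype(first(partials))
    num = Dict{K, Float64}()  # running sum of count * mean per key
    den = Dict{K, Int}()      # running total count per key
    for p in partials, (k, (m, n)) in p
        num[k] = get(num, k, 0.0) + n * m
        den[k] = get(den, k, 0) + n
    end
    return Dict(k => num[k] / den[k] for k in keys(num))
end

result = combine_partials(partials)
```

For key `2`, the weighted combination returns `20.0`, which is the mean of `[10.0, 20.0, 30.0]`. Averaging the two per-chunk means (`10.0` and `25.0`) without weights would instead give `17.5`.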


Other common scenarios:

* using multiple threads for in-core data
* using several machines in a cluster
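For the first of these scenarios, the map step can run on several threads. Below is a minimal sketch. It assumes the chunks are ordinary in-memory vectors (the data and chunk sizes are hypothetical), and for brevity it computes a single overall mean rather than per-key means.

```julia
using Random      # seed! (standard library)
using Statistics  # mean (standard library)

Random.seed!(1234)

# Hypothetical in-core data: three chunks that together fit in RAM.
chunks = [randn(1_000) .+ μ for μ in (0.0, 0.5, 1.0)]

# Map step: one task per chunk computes (mean, count). The tasks run in parallel only
# if Julia was started with several threads (e.g. `julia --threads=4`); otherwise they
# still produce the same result, just on a single thread.
tasks = map(chunks) do c
    Threads.@spawn (mean(c), length(c))
end
partials = fetch.(tasks)

# Reduce step: count-weighted mean of the per-chunk means.
total = sum(last, partials)
overall = sum(m * n for (m, n) in partials) / total
```

Each task returns only a small `(mean, count)` tuple, so the reduction on the main thread is cheap. A per-key version could follow the same pattern by running the grouped `combine` from the map step inside each task.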

What I presented above gives full flexibility, but requires handling the reduction step manually.

For common operations [DTables.jl](https://github.com/JuliaParallel/DTables.jl) provides distributed table structures and data manipulation operations built on top of Dagger.jl.

In part 3 we discuss some limitations of the Parquet format that one needs to keep in mind when working with it.

*Preparation of this workshop has been supported by the Polish National Agency for Academic Exchange under the Strategic Partnerships programme, grant number BPI/PST/2021/1/00069/U/00001.*

![SGH & NAWA](logo.png)