# Distributed file loading and processing

In this short exercise we will use `DistributedArrays` to load and process four data files in parallel. This can be useful in a scenario where the total data doesn't fit into the memory of a single computer. The idea is to let different workers hold different batches of the data (one file per worker) and load and process the data in parallel on the workers.

(In the following, we will of course only mimic such a situation. Our workers will live on the same machine.)

## Preparation

### Create fake data

Before we turn to the actual tasks, we need some fake data. We will create four binary files holding 1 million random floating point numbers each.

In [11]:
using Serialization
for i in 1:4
    serialize("$i.bin", rand(1000000)) # 8 MB files
end
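Each file should hold 10^6 × 8 bytes = 8 MB of raw `Float64` data plus a small serialization header. The minimal sketch below checks both the size and the round trip through `serialize`/`deserialize`. It writes to a temporary directory so that it does not touch the exercise files. The file name `demo.bin` is only for illustration.

```julia
using Serialization

dir  = mktempdir()                 # scratch directory, deleted when Julia exits
path = joinpath(dir, "demo.bin")   # illustrative file name, not one of the exercise files

data = rand(1_000_000)             # 10^6 Float64 values, 8 bytes each
serialize(path, data)

loaded = deserialize(path)
println(loaded == data)            # the round trip preserves the values exactly
println(filesize(path))            # slightly more than 8_000_000 bytes (header overhead)
```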

Let's check that the files have been created (`readdir()` lists all files and folders in the current directory):

In [12]:
fp = filter(endswith("bin"), readdir())

4-element Array{String,1}:
 "1.bin"
 "2.bin"
 "3.bin"
 "4.bin"
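With a single argument, `endswith(suffix)` returns a predicate function (the curried form, available since Julia 1.5). That is why it can be passed directly to `filter`. The small sketch below runs on a hard-coded list of names (made up for illustration). It also shows that including the dot in the suffix is a bit stricter.

```julia
candidates = ["1.bin", "2.bin", "notes.txt", "cabin"]   # hypothetical file names

# endswith(suffix) returns a function x -> endswith(x, suffix)
loose  = filter(endswith("bin"), candidates)    # also matches "cabin"
strict = filter(endswith(".bin"), candidates)   # only names with a .bin extension

println(loose)
println(strict)
```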

### Start worker processes

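Let's start four worker processes that use the same Julia environment as our main session (the workshop environment, two directories up from the current one). Setting the `JULIA_PROJECT` environment variable while `addprocs` runs means that the new processes inherit it and activate that environment on startup.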

In [13]:
using Distributed, Pkg

In [14]:
withenv("JULIA_PROJECT"=>joinpath(pwd(),"../..")) do
    addprocs(4)
end

4-element Array{Int64,1}:
 6
 7
 8
 9
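To double-check that the workers picked up the intended environment, we can ask each process for its active project file. The sketch below uses `remotecall_fetch` together with `Base.active_project`, an internal but widely used function. If no workers have been added, `workers()` returns only the master process `[1]`, so the loop still runs.

```julia
using Distributed

# Compare the active project of each worker with the one of the master
master_project = Base.active_project()
for w in workers()
    p = remotecall_fetch(Base.active_project, w)
    println("process $w: ", p, p == master_project ? "  (same as master)" : "  (differs!)")
end
```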

In [15]:
@everywhere using DistributedArrays, Serialization

## Tasks

1. Distribute the array of file names (see the array `fp` above) across the workers. Afterwards, apply the `deserialize` function to load the (fake) data from the four data files in parallel.

2. Convince yourself that the resulting array, i.e. the loaded data, is distributed evenly among the workers (each worker holds the numbers from a single data file).

3. Our goal is to compute the sum of the cosines of all the numbers. First compute the partial sums of the cosines on the workers in parallel (each worker processes the batch of numbers it holds). Afterwards compute the sum of the partial sums on the master.
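Task 3 relies on the decomposition S = S_1 + S_2 + S_3 + S_4, where S_k is the sum of the cosines of the numbers in batch k. The serial sketch below checks this identity up to floating-point rounding. It uses small in-memory batches of arbitrary size and involves no workers.

```julia
# Four small stand-in batches, one per "worker"
batches = [rand(1_000) for _ in 1:4]

# Step 1: one partial sum per batch (what each worker would compute locally)
partials = [sum(cos, b) for b in batches]

# Step 2: combine the partial sums (what the master would do)
total = sum(partials)

# Same result as summing over all numbers at once, up to rounding
println(total ≈ sum(cos, reduce(vcat, batches)))
```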

In [16]:
fp

4-element Array{String,1}:
 "1.bin"
 "2.bin"
 "3.bin"
 "4.bin"

In [17]:
# Task 1: distribute the file names, one per worker
dfp = distribute(fp)

4-element DArray{String,1,Array{String,1}}:
 "1.bin"
 "2.bin"
 "3.bin"
 "4.bin"

In [18]:
xs = map(deserialize, dfp);
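`map(deserialize, dfp)` applies `deserialize` on each worker to the part of `dfp` that the worker holds, so every file is read by the process that will store its contents. For comparison, here is a sketch of the same idea that uses only the `Distributed` standard library (`remotecall`, `remotecall_fetch` and `Future`s) instead of `DistributedArrays`. It is not how DistributedArrays implements `map`, and the demo files, their sizes and the round-robin assignment are assumptions made for illustration.

```julia
using Distributed
nprocs() == 1 && addprocs(2)        # assumption: add two local workers only if none exist yet
@everywhere using Serialization

# Hypothetical demo files in a scratch directory; absolute paths, so every process finds them
dir = mktempdir()
files = [joinpath(dir, "demo$i.bin") for i in 1:4]
for f in files
    serialize(f, rand(1_000))
end

# Task 1 analogue: load each file on a worker (round-robin). The data stays on
# that worker; the master only holds Future handles pointing to it.
ws = workers()
refs = [remotecall(deserialize, ws[mod1(i, length(ws))], f) for (i, f) in enumerate(files)]

# Task 2 analogue: r.where is the id of the process that owns the data
println([r.where for r in refs])

# Task 3 analogue: reduce where the data lives and ship back one Float64 per file
partials = [remotecall_fetch(r -> sum(cos, fetch(r)), r.where, r) for r in refs]
total = sum(partials)
```

The main design point is the same as with `DArray`: only the small partial sums travel back to the master, not the data itself.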

In [19]:
# Task 2
@show typeof(xs)
@everywhere println(localindices($xs))

typeof(xs) = DArray{Array{Float64,1},1,Array{Array{Float64,1},1}}
(1:0,)
      From worker 8:	(3:3,)
      From worker 6:	(1:1,)
      From worker 7:	(2:2,)
      From worker 9:	(4:4,)
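
The master process (whose output has no prefix) holds the empty range `1:0`, i.e. no part of `xs`. Each of the workers 6 to 9 holds exactly one element of `xs`: the vector of numbers loaded from one file.
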
In [20]:
# Task 3: partial sums of the cosines, computed on the workers
result_dist = map(x->sum(cos.(x)), xs)

4-element DArray{Float64,1,Array{Float64,1}}:
 841325.5329400531
 841505.2839542432
 841459.3108902681
 841546.2474295361
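`sum(cos.(x))` first builds a temporary array holding all the cosines (another 8 MB per file here) and then sums it. Passing the function to `sum` computes the same quantity without the temporary. The local sketch below compares both forms. The results can differ in the last few bits because the summation order is not guaranteed to be identical. In the distributed setting, the second form would read `map(x -> sum(cos, x), xs)`.

```julia
x = rand(1_000_000)

s_broadcast = sum(cos.(x))   # allocates an intermediate Vector{Float64}
s_mapped    = sum(cos, x)    # applies cos element by element inside the reduction

println(s_broadcast ≈ s_mapped)
```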

In [21]:
@show typeof(result_dist)

typeof(result_dist) = DArray{Float64,1,Array{Float64,1}}

In [22]:
sum(result_dist)

3.3658363752141003e6
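As a sanity check, consider a number U drawn uniformly from [0, 1). The expected value of cos U is ∫₀¹ cos u du = sin 1 ≈ 0.8415, so the sum over 4 × 10^6 numbers should be close to 4 × 10^6 · sin 1 ≈ 3.3659 × 10^6. This agrees with the result above to within about 50. That gap is small compared with the typical fluctuation of such a sum: its standard deviation is about 280, since Var[cos U] = 1/2 + sin(2)/4 − sin²(1) ≈ 0.0193. The sketch below computes the reference value and repeats the experiment in memory on a single process, without files or workers.

```julia
expected = 4 * 10^6 * sin(1)     # 4e6 numbers, each with mean cosine sin(1)
println(expected)                # ≈ 3.3659e6

# Repeat the whole experiment in memory on one process
simulated = sum(cos, rand(4 * 10^6))
println(abs(simulated - expected))   # typically a few hundred or less
```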