# Astro 528, Lab 7, Exercise 2
## Parallelization for Cluster using Distributed-Memory Model

In this lab exercise, we'll perform the calculations very similar to those of exercise 2 of lab 5.  However, instead of using a multi-core workstation, we will [run the calculations on the ICDS Roar ACI-B cluster](https://ics.psu.edu/computing-services/ics-aci-user-guide/#07-00-running-jobs-on-aci-b) using a distributed memory model.  

You're welcome to run this notebook one cell at a time as in other labs to see how it works.  However, the main point of this lab is to see how to run such a calculation in parallel over multiple processor cores that are not necessarily on the same processor.  Therefore, you'll use the command line interface to submit the jobs [ex2.pbs](https://github.com/PsuAstro528/lab6-start/blob/main/ex2.pbs).  Follow the instructions in the [lab's README](https://github.com/PsuAstro528/lab6-start/blob/main/README.md)

In [1]:
# Just in case you're running this from JupyterLab
using Pkg
Pkg.UPDATED_REGISTRY_THIS_SESSION[] = true   
Pkg.activate(".")
# Shouldn't need to instantiate the project, since 
# you'll have already run as part of exercise 1.
# Pkg.instantiate()

[32m[1m  Activating[22m[39m environment at `/storage/home/e/ebf11/Teach/Astro528/Fall2021/lab7-dev/Project.toml`


Remember, packages are installed to disk and the Roar nodes all see the same file systems, so you only want to run `instantiate` once on the delegator process.

### Getting setup for parallel computation

We'll load the CSV and DataFrames packages that only need to be loaded by the delegator process.
Then we'll load Julia's `Distributed` module that provides much of the needed functionality.

In [2]:
using CSV, DataFrames
using Distributed
println("# Julia is using ", nworkers(), " workers.")

# Julia is using 1 workers.


In [26]:
# Uncomments the line below, if you're running manually in a notebook and want to test with multiple processors.
if haskey(ENV,"PBS_NODEFILE") && (nworkers() == 1)
    procs_assigned = countlines(ENV["PBS_NODEFILE"])
    addprocs(procs_assigned)
end
println("# Now Julia is using ", nworkers(), " workers.")

# Now Julia is using 3 workers.


### Loading Packages and modules

Now, we can start loading other packages that we'll be using.  Since we want the packages to be in scope on each worker, then we need to add the @everywhere macro in front of the using statement.  Before we can do that we need to activate the project on each of the workers, so they know that they can use the packages in Project.toml.  We don't need to install or instantiate on each worker, since those write files to disk and all the workers have access to the same filesystem.

In [4]:
@everywhere using Pkg
@everywhere Pkg.UPDATED_REGISTRY_THIS_SESSION[] = true   # Tell Julia package manager not to download an updated registry for
@everywhere Pkg.activate(".")

[32m[1m  Activating[22m[39m environment at `/storage/home/e/ebf11/Teach/Astro528/Fall2021/lab7-dev/Project.toml`


      From worker 3:	[32m[1m  Activating[22m[39m environment at `/storage/home/e/ebf11/Teach/Astro528/Fall2021/lab7-dev/Project.toml`
      From worker 2:	[32m[1m  Activating[22m[39m environment at `/storage/home/e/ebf11/Teach/Astro528/Fall2021/lab7-dev/Project.toml`
      From worker 5:	[32m[1m  Activating[22m[39m environment at `/storage/home/e/ebf11/Teach/Astro528/Fall2021/lab7-dev/Project.toml`
      From worker 4:	[32m[1m  Activating[22m[39m environment at `/storage/home/e/ebf11/Teach/Astro528/Fall2021/lab7-dev/Project.toml`


In [5]:
@everywhere using Distributions
@everywhere using DistributedArrays

For this lab, I've written several functions that will be used to generate simulated spectra.  This serves a couple of purposes.
First, you'll use the code in the exercise, so you have a calculation that's big enough to be worth parallelizing.  For the purposes of this exercise, it's not essential that you review the code I provided in `.jl` files.  However, the second purpose of providing this is to demonstrate several of the programming patterns that we've discussed in class.  For example, the code in the `lab7` module
- is in the form of several small functions, each which does one specific task.  
- has been moved out of the Jupyter notebook and into `.jl` files in the `src` directory.
- creates objects to represent spectra and a convolution kernel.
- uses [abstract types](https://docs.julialang.org/en/v1/manual/types/#Abstract-Types-1) and [parametric types](https://docs.julialang.org/en/v1/manual/types/#Parametric-Types-1), so as to create type-stable functions. 
- has been  put into a Julia [module](https://docs.julialang.org/en/v1/manual/modules/index.html), so that it can be easily loaded and so as to limit potential for namespace conflicts.

You don't need to read all of this code right now.  But, when you're writing code for your class project, you're likely to want to make use of some of these same programming patterns. So, it may be useful to refer back to this code later to help see examples of how to apply these design patterns in practice.  
        
For now, let's include just the file that has the code for the `lab7` module.  `src/lab7.jl` includes the code from the other files.  We'll preface it with `@everywhere`, since we want all of the processors to be able to make use of these function and types.

In [6]:
@everywhere include("src/lab7.jl")

Now, we'll bring that module into scope.  Note that since this is not a package, we need to include a `.` to tell Julia that it can the module in the current namespace, rather than needing to load a package.

In [7]:
@everywhere using .lab7

### Initialize data to be analyzed
In this exercise, we're going to create a model spectrum consisting of continuum, stellar absorption lines, telluric absorption lines.  
The `lab7` module provides a `SimulatedSpectrum` type, but we'll need to initialize a variable with some specific parameter values.  The function does that for us.

In [8]:
"Create an object that provides a model for the raw spetrum (i.e., before entering the telescope)"
function make_spectrum_object(;lambda_min = 4500, lambda_max = 7500, flux_scale = 1.0,
        num_star_lines = 200, num_telluric_lines = 100, limit_line_effect = 10.0)

    continuum_param = flux_scale .* [1.0, 1e-5, -2e-8]
    
    star_line_locs = rand(Uniform(lambda_min,lambda_max),num_star_lines)
    star_line_widths = fill(1.0,num_star_lines)
    star_line_depths = rand(Uniform(0,1.0),num_star_lines)
    
    telluric_line_locs = rand(Uniform(lambda_min,lambda_max),num_telluric_lines)
    telluric_line_widths = fill(0.2,num_telluric_lines)
    telluric_line_depths = rand(Uniform(0,0.4),num_telluric_lines)

    SimulatedSpectrum(star_line_locs,star_line_widths,star_line_depths,telluric_line_locs,telluric_line_widths,telluric_line_depths,continuum_param=continuum_param,lambda_mid=0.5*(lambda_min+lambda_max),limit_line_effect=limit_line_effect)
end

make_spectrum_object

Next, we: 
1. create a set of wavelengths to observe the spectrum at, 
2. call the function above to create a spectrum object, 
3. create an object containing a model for the point spread function, and 
4. create an object that can compute the convolution of our spectral model with the point spread function model.

In [9]:
# 1.  Pick range of of wavelength to work on.
lambda_min = 4500
lambda_max = 7500
# You may want to adjust the num_lambda to make things more/less computationally intensive
num_lambda = 128*1024
lambdas = collect(range(lambda_min,stop=lambda_max, length=num_lambda));

# 2.  Create a model  spectrum that we'll analyze below
raw_spectrum = make_spectrum_object(lambda_min=lambda_min,lambda_max=lambda_max)

# 3.  Create a model for the point spread function (PSF)
psf_widths  = [0.5, 1.0, 2.0]
psf_weights = [0.8, 0.15, 0.05]
psf_model = GaussianMixtureConvolutionKernel(psf_widths,psf_weights)

# 4. Create a model for the the convolution of thte raw spectrum with the PDF model
conv_spectrum = ConvolvedSpectrum(raw_spectrum,psf_model)

ConvolvedSpectrum{SimulatedSpectrum{Float64, Float64, Float64, Float64, Float64, Float64, Float64, Float64, Float64}, GaussianMixtureConvolutionKernel{Float64, Float64}}(SimulatedSpectrum{Float64, Float64, Float64, Float64, Float64, Float64, Float64, Float64, Float64}([7123.2510334783165, 5483.477760805866, 6002.334435093768, 5767.2602747386745, 4557.4841698580585, 7203.671475375378, 7188.214672625796, 6182.495092613661, 6756.312359577489, 5079.533171064807  …  5950.613911593, 7215.595997924973, 5644.09105598876, 4861.282685818973, 6265.536872447527, 4603.201599635212, 5736.856545764558, 5415.761778383482, 4535.143387027149, 4921.1540830707545], [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0  …  1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], [0.9626911949875838, 0.8582260649562032, 0.09909292288325733, 0.49778039972922183, 0.9307830758410467, 0.5796482138470056, 0.7969162316211942, 0.00038276100656609024, 0.3319896557249724, 0.6473685409780343  …  0.46940150269793524, 0.59154387

### Benchmarking serial code

*If the current job has just one worker process,* then we'll benchmark the calculation of this spectrum on a single processor.  Based on performance results from lab 5, we'll make use of Julia's [dot syntax](https://docs.julialang.org/en/v1/manual/functions/#man-vectorized-1) to ["broadcast" and "fuse"](https://docs.julialang.org/en/v1/base/arrays/#Broadcast-and-vectorization-1) the array operation.    We'll run it just a few times (rather than using `@btime`).
Since the rest of the notebook is meant for distributed computing, we'll tell it to exit here if there's only a single worker.  Before we do, we'll flush the buffer, so any messages are written to `STDOUT` before the kernel exits.|

In [10]:
num_runs = 3
if nworkers() == 1    
    for i in 1:num_runs @time conv_spectrum.(lambdas); end
    flush(stdout) # flush buffer before the kernel exits
    sleep(1)      # wait for one second just to make sure
    exit(0)       # Exit the script since completed serial calculation
end

If you're running this in a Jupyter notebook and got a message about the kernel dieing, don't worry.  We used the `exit` command to cause the julia kernel to exit.  If you're in the notebook server and want to keep going, then you can rerun the previous cells above, and either add some processors (e.g., `addprocs(4)`) 

### Distributed Arrays
Here, we want to spread the work over processor cores than are not necessarily on the same node, so we need to use a [distributed memory system](https://en.wikipedia.org/wiki/Distributed_memory).  This would be necessary if you wanted your job to run on more cores than are available on a single node.  It could also be useful if you wanted to increase the chances that the scheduler starts your job more quickly, since asking for several processors cores that don't need to be on the same node is easier to accommodate than asking for the same number of cores on a single node.  Here we'll use Julia's [DistributedArrays.jl](https://juliaparallel.github.io/DistributedArrays.jl/latest/index.html) package to make programming for distributed memory systems relatively easy.  

Here we'll create a distributed array by simply applying `distribute` to our existing array of wavelengths.  (Remember that we could initialize a `DArray` more efficiently be letting each workers initializes its own data.  For convenience functions like `dzeros`, `dones`, `drand`, `drandn` and `dfill` act similarly to their counterparts without a `d` prefix, but create DArrays instead of regular Arrays.) 

In [11]:
lambdas_dist = distribute(lambdas);

As usual, the first time we call a function, it takes some extra time and memory to compile it.  So let's do that again, this time benchmarking the `distribute` operation.

In [12]:
println("# Timing calls to distribute.")
@time lambdas_dist = distribute(lambdas);

# Timing calls to distribute.
  0.001078 seconds (546 allocations: 1.025 MiB)


Below we will apply `map` to a `lambda_dist`, which is a `DArray`.  `map` will parallelize the calculation and return the results in a `DArray`.  Each worker operates on the subset of the array that is local to that worker process. 

In [13]:
println("# Timing calls to map(...).")
for i in 1:num_runs 
    @time map(conv_spectrum,lambdas_dist) 
end

# Timing calls to map(...).
  9.338440 seconds (1.99 M allocations: 124.369 MiB, 0.35% gc time, 3.58% compilation time)
  6.446841 seconds (1.01 k allocations: 85.938 KiB)
  6.385849 seconds (995 allocations: 85.031 KiB)


In the cell above, we left the results with the workers, rather than copying the results back to the delegator process.
In the cell below, we will call `collect` the data in order to copy all the data back to to the delegator process.  

In [14]:
println("# Timing calls to collect(map(...)).")
for i in 1:num_runs 
    @time collect(map(conv_spectrum,lambdas_dist)) 
end

# Timing calls to collect(map(...)).
 16.337607 seconds (8.44 M allocations: 291.844 MiB, 0.59% gc time, 1.21% compilation time)
 15.814595 seconds (7.90 M allocations: 259.293 MiB, 0.34% gc time)
 15.987923 seconds (7.90 M allocations: 259.293 MiB, 0.30% gc time)


Copying all that data back for the delegator process to access added a significant amount to the total time.
Sometimes you don't actually need to bring all the data back to the delegator process.  For example, you might have several calculations that can be done, each leaving the data distributed across many workers, until the very end.
Another common scenario is that you want to performing a task that can be phrased as a [`mapreduce`](https://en.wikipedia.org/wiki/MapReduce) programming pattern.  For example, imagine that we only wanted to compute the total flux over a filter band.  Then we could use code like the following to reduce the amount of communications overhead.

In [15]:
# Send the value lambda_min to each of the workers
for p in workers() remotecall_fetch(()->lambda_min, p); end

# Define a function on each of the workers
@everywhere is_in_filter_band(x) = (lambda_min < x < lambda_min+100) ? one(x) : zero(x)

# Run mapreduce, summing the product of the convolved spectrum and the filter's weight at each wavelength
mapreduce(x->is_in_filter_band(x)*conv_spectrum(x), +, lambdas_dist);

In [16]:
println("# Timing calls to mapreduce.")
for i in 1:num_runs 
    @time mapreduce(x->is_in_filter_band(x)*conv_spectrum(x), +, lambdas_dist) 
end

# Timing calls to mapreduce.
  6.537715 seconds (229.88 k allocations: 14.220 MiB, 1.66% compilation time)
  6.360973 seconds (647 allocations: 30.016 KiB)
  6.370328 seconds (645 allocations: 29.672 KiB)


### Write output (without interfering with output from other jobs)
Below, we'll demonstrate how to write the results to a file, making sure that different jobs don't overwrite each other

In [17]:
results = collect(map(conv_spectrum,lambdas_dist)) 

output_filename = "ex2_out.csv"
if haskey(ENV,"PBS_JOBID")         # see if PBS_JOBID is among environment variables
  m = match(r"^(\d+\[?\d*\]?)\.",ENV["PBS_JOBID"])  # 
  if m != nothing               
      jobid_num = m.captures[1]
      output_filename = "ex2_out_" * jobid_num * ".csv"
  end
end
CSV.write(output_filename,DataFrame(lambda=lambdas,result=results))

"ex2_out_31282870.csv"

## Benchmarking performance versus number of workers
To see how the performance scales on the number of workers, we'll gradually remove worker processes and compare the performance of one of our functions to compute the mean squared error between two spectra from Lab 6, exercise 2.

In [18]:
@everywhere function calc_mse_mapreduce(lambdas::AbstractArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum,  v::Number)
    c = lab7.speed_of_light
    z = v/c 
    spec2_shifted = doppler_shifted_spectrum(spec2,z)
    mse = mapreduce(x->(spec1(x)-spec2_shifted(x))^2, +, lambdas)
    mse /= length(lambdas)
end

In [19]:
# Force function to compile
@time calc_mse_mapreduce(lambdas_dist, conv_spectrum, conv_spectrum, 10.0) 

 13.385179 seconds (2.48 M allocations: 147.236 MiB, 0.25% gc time, 3.99% compilation time)


4.651876722818536e-10

In [20]:
num_workers_all = nworkers()
for nw in num_workers_all:-1:1
    println("# Timing calls to mapreduce on DArray with $nw workers.")
    @time calc_mse_mapreduce(lambdas_dist, conv_spectrum, conv_spectrum, 10.0) 
    if nw > 1
        rmprocs(last(workers()))            # Remove one worker
        lambdas_dist = distribute(lambdas)  # Redistribute wavelengths over remaining workers
    end
end

# Timing calls to mapreduce on DArray with 4 workers.
 12.765890 seconds (1.11 k allocations: 89.297 KiB)
# Timing calls to mapreduce on DArray with 3 workers.
 16.429501 seconds (1.08 k allocations: 76.000 KiB)
# Timing calls to mapreduce on DArray with 2 workers.
 23.167735 seconds (1.14 k allocations: 65.141 KiB)
# Timing calls to mapreduce on DArray with 1 workers.
 46.522344 seconds (1.72 k allocations: 69.672 KiB)


In [21]:
println("# Timing calls to mapreduce (serial).")
@time calc_mse_mapreduce(lambdas, conv_spectrum, conv_spectrum, 10.0) 

# Timing calls to mapreduce (serial).
 43.114487 seconds (119.16 M allocations: 8.323 GiB, 3.39% gc time, 2.77% compilation time)


4.651876722818536e-10