# EP2 - Cálculo do Conjunto de Mandelbrot em Paralelo com CUDA e OpenMPI

30 de Junho de 2020

## Membros do grupo

| Nome | NUSP |
|------|------|
| Carolina Marques | 10737101 |
| Daniela Favero | 10277443 |
| Miguel Ostrowski | 10723610 |
| Raphael Ribeiro | 10281601 |

## Pacotes Julia

Instalando os pacotes necessários que estão listados no arquivo `Project.toml`:

In [29]:
] up



[32m[1m   Updating[22m[39m registry at `~/.julia/registries/General`
[32m[1m   Updating[22m[39m git-repo `https://github.com/JuliaRegistries/General.git`
[32m[1m   Updating[22m[39m `~/Documentos/USP/concorrente-e-paralela/EP2/Project.toml`
[90m [no changes][39m
[32m[1m   Updating[22m[39m `~/Documentos/USP/concorrente-e-paralela/EP2/Manifest.toml`
[90m [no changes][39m


Verificando o status dos pacotes:

In [30]:
] st

[32m[1mStatus[22m[39m `~/Documentos/USP/concorrente-e-paralela/EP2/Project.toml`
 [90m [336ed68f][39m[37m CSV v0.6.2[39m
 [90m [a93c6f00][39m[37m DataFrames v0.21.2[39m
 [90m [31c24e10][39m[37m Distributions v0.23.4[39m
 [90m [7073ff75][39m[37m IJulia v1.21.2[39m
 [90m [8314cec4][39m[37m PGFPlotsX v1.2.7[39m
 [90m [1a8c2f83][39m[37m Query v0.12.2[39m
 [90m [f3b207a7][39m[37m StatsPlots v0.14.6[39m


## Compilando

Compilando e executando os códigos C pelo *modo shell*: 


In [3]:
; make all

make: Nothing to be done for 'all'.


In [4]:
; ./Cuda/mandelbrot_cuda -0.188 -0.012 0.554 0.754 4096 4 256

31.371680


Além de imprimir o tempo levado, o programa em C gera a seguinte imagem (gerada no formato ppm, tomamos liberdade para convertê-la para png e exibir aqui):

<img src="output_cuda.png" alt="mandelbrot_cuda" width="400"/>

In [5]:
; mpirun --host localhost:4 ./MPI/mandelbrot_openmpi -0.188 -0.012 0.554 0.754 4096

7.629372


Imagem gerada, novamente a Triple Spiral Valley, dessa vez usando 4 processos com auxílio de OpenMPI:

<img src="output_openmpi.png" alt="mandelbrot_openmpi" width="400"/>

## Experimentos

### Funções úteis

A função abaixo recebe parâmetros `size`, com tamanho da imagem, `file`, com o nome do programa a ser executado e `threads`, com o número de threads do programa paralelo. A função executa o programa `file` com os parâmetros dados e devolve um `DataFrame` com os resultados.

In [38]:
using DataFrames, Query, StatsPlots, Statistics

function measure_mandelbrot(size, file; processes=1, threads=1, blocks=1)
    if file == "MPI/mandelbrot_openmpi"
        results = parse.(Float64,
            split(chomp(read(`mpirun --host localhost:$processes $file -2.5 1.5 -2.0 2.0 $size`, String)), ", "))
        return DataFrame(size = size,
            file = file,
            processes = processes, 
            duration = results[1])
    elseif file == "Cuda/mandelbrot_cuda"
        results = parse.(Float64,
            split(chomp(read(`./$file -2.5 1.5 -2.0 2.0 $size $blocks $threads`, String)), ", "))
        return DataFrame(size = size,
            file = file,
            threads = threads,
            blocks = blocks,
            duration = results[1])
    elseif file == "MPI+CUDA/mandelbrot_openmpi+cuda"
        results = parse.(Float64,
            split(chomp(read(`mpirun --host localhost:$processes $file -2.5 1.5 -2.0 2.0 $size $blocks $threads`, String)), ", "))
        return DataFrame(size = size,
            file = file,
            processes = processes, 
            threads = threads,
            blocks = blocks,
            duration = results[1])
    elseif file == "MPI+OMP/mandelbrot_openmpi+omp"
        results = parse.(Float64,
            split(chomp(read(`mpirun --host localhost:$processes $file -2.5 1.5 -2.0 2.0 $size $threads`, String)), ", "))
        return DataFrame(size = size,
            file = file,
            processes = processes, 
            threads = threads,
            duration = results[1])
    else
        results = parse.(Float64,
            split(chomp(read(`./$file -2.5 1.5 -2.0 2.0 $size $threads`, String)), ", "))
        return DataFrame(size = size,
            file = file,
            threads=threads,
            duration = results[1])
    end
end

measure_mandelbrot (generic function with 1 method)

##TODO: Explicar melhor

A função `run_experiments` recebe os mesmos parâmetros `size`, `file`, e `threads`, e um parâmetro adicional `repetitions`, com o número de repetições de cada experimento com um dado número de `threads`. A função devolve um `DataFrame` com todos os experimentos.

Função adaptada para cuda

In [39]:
using DataFrames
function run_experiments(size, file, threads, repetitions)
    run(`make all`)
    
    results = DataFrame(size = Int[],
        file = String[],
        threads = Int[],
        duration = Float64[])  
    
    for t in threads
        for r in 1:repetitions
            append!(results,
                measure_mandelbrot(size, file, threads=t))    
        end
    end
    return results
end

run_experiments (generic function with 1 method)

In [40]:
function run_experiments_cuda(size, file, threads, blocks, repetitions)
    run(`make all`)
    
    results = DataFrame(size = Int[],
        file = String[],
        threads = Int[],
        blocks = Int[],
        duration = Float64[])  
    
    for t in threads
        for b in blocks
            for r in 1:repetitions
                append!(results,
                    measure_mandelbrot(size, file, threads=t, blocks=b)) 
            end
        end
    end
    return results
end

run_experiments_cuda (generic function with 1 method)

In [41]:
function run_experiments_mpi(size, file, processes)
    run(`make all`)
    
    results = DataFrame(size = Int[],
        file = String[],
        processes = Int[],
        duration = Float64[])  
    
    for p in processes
        for r in 1:repetitions
            append!(results,
                measure_mandelbrot(size, file, processes=p)) 
        end
    end
    return results
end

run_experiments_mpi (generic function with 1 method)

In [42]:
function run_experiments_mpicuda(size, file, processes, threads, blocks)
    run(`make all`)
    
    results = DataFrame(size = Int[],
        file = String[],
        processes = Int[],
        threads = Int[],
        blocks = Int[],
        duration = Float64[])  
    
    for p in processes
        for t in threads
            for b in blocks
                for r in 1:repetitions
                    append!(results,
                        measure_mandelbrot(size, file, processes=p, threads=t, blocks=b)) 
                end
            end
        end
    end
    return results
end

run_experiments_mpicuda (generic function with 1 method)

In [43]:
function run_experiments_mpiomp(size, file, processes, threads)
    run(`make all`)
    
    results = DataFrame(size = Int[],
        file = String[],
        processes = Int[],
        threads = Int[],
        duration = Float64[])  
    
    for p in processes
        for t in threads
            for r in 1:repetitions
                append!(results,
                    measure_mandelbrot(size, file, processes=p, threads=t)) 
            end
        end
    end
    return results
end

run_experiments_mpiomp (generic function with 1 method)

A função `parse_results` recebe um `DataFrame` de resultados, e produzido pela função `run_experiments`. A função devolve um `DataFrame` com a média e o intervalo de confiança da média a 95% dos tempos de execução, agrupados por número de threads.

In [44]:
function parse_results_threads(results)
    parsed_results = results |>
                    @groupby(_.threads) |>
                    @map({threads = key(_),
                          mean_duration = mean(_.duration),
                          ci_duration = 1.96 * std(_.duration)}) |>
                    DataFrame
    
    return parsed_results
end

parse_results_threads (generic function with 1 method)

In [45]:
function parse_results_processes(results)
    parsed_results = results |>
                    @groupby(_.processes) |>
                    @map({processes = key(_),
                          mean_duration = mean(_.duration),
                          ci_duration = 1.96 * std(_.duration)}) |>
                    DataFrame
    
    return parsed_results
end

parse_results_processes (generic function with 1 method)

In [46]:
function parse_results_blocks(results)
    parsed_results = results |>
                    @groupby(_.blocks) |>
                    @map({blocks = key(_),
                          mean_duration = mean(_.duration),
                          ci_duration = 1.96 * std(_.duration)}) |>
                    DataFrame
    
    return parsed_results
end

parse_results_blocks (generic function with 1 method)

A função `save_csv_results`recebe um `DataFrame` e um nome de arquivo, e escreve o `DataFrame` em disco, no formato `.csv`, com o nome passado no argumento. A função `read_csv_results` recebe um nome de arquivo e lê o arquivo correspondente, devolvendo um `DataFrame`.

In [47]:
using CSV

function save_csv_results(data_frame, file)
    CSV.write(file, data_frame)
end

save_csv_results (generic function with 1 method)

In [48]:
using CSV

function read_csv_results(file)
    return CSV.read(file)
end

read_csv_results (generic function with 1 method)

A função `plot_results` faz dois tipos de gráfico: ela pode mostrar todos os resultados de um experimento, marcando pontos no plano cartesiano; e também pode mostrar o resultado dado pela média e traçar uma linha vertical delimitando o intervalo de confiança do conjunto de amostras recebido.

In [49]:
pgfplotsx()

function plot_results(x, y, xlabel, ylabel; yerror=[], max_block_power=7)
    if yerror != []
        p = scatter(x,
            y,
            xaxis = :log2,
            xlabel = xlabel,
            xticks = [2 ^ x for x in 0:max_block_power],
            yerror = yerror,
            alpha = 0.6,
            labels = ylabel,
            legend = :topright)
    else
        p = scatter(x,
            y,
            xaxis = :log2,
            xlabel = xlabel,
            xticks = [2 ^ x for x in 0:max_block_power],
            alpha = 0.6,
            labels = ylabel,
            legend = :topright)
    end
    return p
end

plot_results (generic function with 1 method)

### Sequencial

Realizando os experimentos rodando a célula abaixo.

In [50]:
threads = 1
file = "Sequential/mandelbrot_seq"
size = 4096
repetitions = 15

results = run_experiments(size, file, threads, repetitions)

save_csv_results(results, "results_seq.csv")

make: Nothing to be done for 'all'.


"results_seq.csv"

In [51]:
results_seq = read_csv_results("results_seq.csv")

Unnamed: 0_level_0,size,file,threads,duration
Unnamed: 0_level_1,Int64,String,Int64,Float64
1,4096,Sequential/mandelbrot_seq,1,2.33688
2,4096,Sequential/mandelbrot_seq,1,2.32141
3,4096,Sequential/mandelbrot_seq,1,2.34468
4,4096,Sequential/mandelbrot_seq,1,2.35436
5,4096,Sequential/mandelbrot_seq,1,2.34227
6,4096,Sequential/mandelbrot_seq,1,2.36013
7,4096,Sequential/mandelbrot_seq,1,2.35207
8,4096,Sequential/mandelbrot_seq,1,2.3578
9,4096,Sequential/mandelbrot_seq,1,2.35791
10,4096,Sequential/mandelbrot_seq,1,2.35993


Veja no gráfico:

### PThreads
Realizando os experimentos rodando a célula abaixo, variando em número de threads.

In [52]:
size = 4096
file = "Pthreads/mandelbrot_pth"
threads = [2 ^ x for x in 0:7]
repetitions = 15

results = run_experiments(size, file, threads, repetitions)
parsed_results = parse_results_threads(results)

save_csv_results(results, "CSV/results_pth_t.csv")
save_csv_results(parsed_results, "CSV/parsed_results_pth_t.csv")

make: Nothing to be done for 'all'.


"CSV/parsed_results_pth_t.csv"

In [53]:
results_pth_t = read_csv_results("CSV/results_pth_t.csv")
parsed_results_pth_t = read_csv_results("CSV/parsed_results_pth_t.csv")

Unnamed: 0_level_0,threads,mean_duration,ci_duration
Unnamed: 0_level_1,Int64,Float64,Float64
1,1,2.34531,0.0254834
2,2,1.1846,0.00571533
3,4,1.11951,0.014977
4,8,0.913057,0.0343542
5,16,0.638148,0.0605451
6,32,0.534529,0.0420827
7,64,0.498689,0.0302804
8,128,0.49348,0.0336628


In [54]:
results_pth_t

Unnamed: 0_level_0,size,file,threads,duration
Unnamed: 0_level_1,Int64,String,Int64,Float64
1,4096,Pthreads/mandelbrot_pth,1,2.33911
2,4096,Pthreads/mandelbrot_pth,1,2.33898
3,4096,Pthreads/mandelbrot_pth,1,2.32467
4,4096,Pthreads/mandelbrot_pth,1,2.35651
5,4096,Pthreads/mandelbrot_pth,1,2.35674
6,4096,Pthreads/mandelbrot_pth,1,2.3386
7,4096,Pthreads/mandelbrot_pth,1,2.33687
8,4096,Pthreads/mandelbrot_pth,1,2.34902
9,4096,Pthreads/mandelbrot_pth,1,2.36344
10,4096,Pthreads/mandelbrot_pth,1,2.36032


Veja no gráfico:

In [57]:
plot_results(results_pth_t.threads,
    results_pth_t.duration,
    "Threads",
    "Duration",
    max_block_power = 7)

MethodError: MethodError: Cannot `convert` an object of type Pair{String,RGBA{Float64}} to an object of type OrderedCollections.OrderedDict{Any,Any}
Closest candidates are:
  convert(::Type{OrderedCollections.OrderedDict{K,V}}, !Matched::OrderedCollections.OrderedDict{K,V}) where {K, V} at /home/locus/.julia/packages/OrderedCollections/P6ntV/src/ordered_dict.jl:113
  convert(::Type{OrderedCollections.OrderedDict{K,V}}, !Matched::AbstractDict) where {K, V} at /home/locus/.julia/packages/OrderedCollections/P6ntV/src/ordered_dict.jl:99
  convert(::Type{T}, !Matched::T) where T<:AbstractDict at abstractdict.jl:486
  ...

In [None]:
plot_results_thread(parsed_results.threads,
    parsed_results.mean_duration,
    "Threads",
    "Mean Duration + CI",
    yerror = parsed_results.ci_duration,
    max_thread_power = 7)

### OpenMP
Realizando os experimentos rodando a célula abaixo, variando em número de threads.

In [37]:
size = 4096
file = "OMP/mandelbrot_omp"
threads = [2 ^ x for x in 0:7]
repetitions = 1

results = run_experiments(size, file, threads, repetitions)
parsed_results = parse_results_threads(results)

save_csv_results(results, "CSV/results_omp_t.csv")
save_csv_results(parsed_results, "CSV/parsed_results_omp_t.csv")

make: Nothing to be done for 'all'.


UndefVarError: UndefVarError: Dataframe not defined

Veja no gráfico:

In [None]:
plot_results(results.threads,
    results.duration,
    "Threads",
    "Duration",
    max_thread_power = 7)

In [None]:
plot_results(parsed_results.threads,
    parsed_results.mean_duration,
    "Threads",
    "Mean Duration + CI",
    yerror = parsed_results.ci_duration,
    max_thread_power = 7)

### CUDA
Realizando os experimentos rodando a célula abaixo, variando em número de blocos e mantendo numero de threads por bloco em 256

In [None]:
size = 4096
file = "Cuda/mandelbrot_cuda"
blocks = [2 ^ x for x in 0:6]
threads = 256
repetitions = 3

results = run_experiments_cuda(size, file, threads, repetitions,blocks)
parsed_results = parse_results_threads(results)

save_csv_results(results, "CSV/results_cuda_blocks.csv")

In [None]:
plot_results(results.threads,
    results.duration,
    "Threads",
    "Duration",
    max_thread_power = 7)

In [None]:
plot_results_thread(parsed_results.threads,
    parsed_results.mean_duration,
    "Threads",
    "Mean Duration + CI",
    yerror = parsed_results.ci_duration,
    max_thread_power = 7)

Agora mantemos fixo numero de blocos igual a 4 e variamos o numero de threads por bloco

In [None]:
size = 4096
file = "CUDA/mandelbrot_cuda"
blocks = 4
threads = [2 ^ x for x in 0:6]
repetitions = 15

results = run_experiments(size, file, processes, blocks, threads, repetitions)
parsed_results = parse_results_threads(results)

save_csv_results(results, "CSV/results_cuda_threads.csv")

In [None]:
plot_results(results.threads,
    results.duration,
    "Threads",
    "Duration",
    max_thread_power = 7)

In [None]:
plot_results_thread(parsed_results.threads,
    parsed_results.mean_duration,
    "Threads",
    "Mean Duration + CI",
    yerror = parsed_results.ci_duration,
    max_thread_power = 7)

### OpenMPI
Realizando os experimentos rodando a célula abaixo, variando em número de processos.

In [None]:
size = 4096
file = "mandelbrot_openmpi"
processes = [2 ^ x for x in 0:6]
repetitions = 15

results = run_experiments(size, file, processes=processes, repetitions=repetitions)
parsed_results = parse_results_processes(results)

save_csv_results(results, "CSV/results_openmpi_t.csv")
results_omp_t = read_csv_results("CSV/results_openmpi_t.csv")

Veja no gráfico:

In [None]:
plot_results(results.processes,
    results.duration,
    "Processes",
    "Duration",
    max_thread_power = 6)

In [None]:
plot_results_thread(parsed_results.processes,
    parsed_results.mean_duration,
    "Processes",
    "Mean Duration + CI",
    yerror = parsed_results.ci_duration,
    max_thread_power = 6)

### OpenMPI + OMP
Realizando os experimentos rodando a célula abaixo, variando em número de procesos, mantendo fixo numero de threads em 4

In [None]:
size = 4096
file = "MPI+OMP/mandelbrot_openmpi+omp"
processes = [2 ^ x for x in 0:6]
threads = 4
repetitions = 15

results = run_experiments(size, file, processes=processes, threads=threads, repetitions=repetitions)
parsed_results = parse_results_processes(results)

save_csv_results(results, "CSV/results_mpi+omp_processes.csv")
results_omp_t = read_csv_results("CSV/results_mpi+omp_processes.csv")

In [None]:
plot_results(results.processes,
    results.duration,
    "Processes",
    "Duration",
    max_thread_power = 6)

In [None]:
plot_results_thread(parsed_results.processes,
    parsed_results.mean_duration,
    "Processes",
    "Mean Duration + CI",
    yerror = parsed_results.ci_duration,
    max_thread_power = 6)

Agora mantemos fixo o número de processes em 2 e variamos o número de threads

In [None]:
size = 4096
file = "MPI+OMP/mandelbrot_openmpi+omp"
processes = [2 ^ x for x in 0:6]
threads = [2 ^ x for x in 0:7]
repetitions = 15

results = run_experiments(size, file, processes=processes, threads=threads, repetitions=repetitions)
parsed_results = parse_results_threads(results)

save_csv_results(results, "CSV/results_mpi+omp_threads.csv")
results_omp_t = read_csv_results("CSV/results_mpi+omp_threads.csv")

In [None]:
plot_results(results.threads,
    results.duration,
    "Threads",
    "Duration",
    max_thread_power = 7)

In [None]:
plot_results_thread(parsed_results.threads,
    parsed_results.mean_duration,
    "Threads",
    "Mean Duration + CI",
    yerror = parsed_results.ci_duration,
    max_thread_power = 7)

### OpenMPI + CUDA
Realizando os experimentos rodando a célula abaixo, variando em número de processos, mantendo fixo numero de blocos igual a 4 e numero de threads por bloco em 512.

In [None]:
size = 4096
file = "MPI+Cuda/mandelbrot_openmpi+cuda"
processes = [2 ^ x for x in 0:6]
blocos = 4
threads = 512
repetitions = 15

results = run_experiments(size, file, processes=processes, threads=threads, repetitions=repetitions)
parsed_results = parse_results_processes(results)

save_csv_results(results, "CSV/results_mpi+cuda_processes.csv")
results_omp_t = read_csv_results("CSV/results_mpi+cuda_processes.csv")

In [None]:
plot_results(results.processes,
    results.duration,
    "Processes",
    "Duration",
    max_thread_power = 6)

In [None]:
plot_results_thread(parsed_results.threads,
    parsed_results.mean_duration,
    "Threads",
    "Mean Duration + CI",
    yerror = parsed_results.ci_duration,
    max_thread_power = 7)

Agora mantemos fixo o numero de processos em 2, numero de threads por bloco em 512, e variamos o numero de blocos

In [None]:
size = 4096
file = "MPI+Cuda/mandelbrot_openmpi+cuda"
processes = 2
blocos = [2 ^ x for x in 0:6]
threads = 512
repetitions = 15

results = run_experiments(size, file, processes=processes, threads=threads, repetitions=repetitions)
parsed_results = parse_results_blocks(results)

save_csv_results(results, "CSV/results_mpi+cuda_blocos.csv")

In [None]:
results_mpicuda_b = read_csv_results("CSV/results_mpi+cuda_blocos.csv")

In [None]:
plot_results(results_mpicuda_b.blocks,
    results_mpicuda_b.duration,
    "Blocks",
    "Duration",
    max_thread_power = 6)

In [None]:
plot_results_thread(parsed_results.threads,
    parsed_results.mean_duration,
    "Threads",
    "Mean Duration + CI",
    yerror = parsed_results.ci_duration,
    max_thread_power = 7)

Agora mantemos fixo o numero de processos em 2, numero de blocos em 4, e variamos o numero de threads por bloco.

In [None]:
size = 4096
file = "MPI+Cuda/mandelbrot_openmpi+cuda"
processes = 2
blocos = 4
threads = [2 ^ x for x in 0:11]
repetitions = 15

results = run_experiments(size, file, processes=processes, threads=threads, repetitions=repetitions)
parsed_results_mpicuda_t = parse_results_threads(results)
save_csv_results(parsed_results_mpicuda_t, "CSV/parsed_results_mpicuda_t.csv")
save_csv_results(results, "CSV/results_mpi+cuda_threads.csv")

In [None]:
results_mpicuda_t = read_csv_results("CSV/results_mpi+cuda_threads.csv")
parsed_results_mpicuda_t = read_csv_results("CSV/parsed_results_mpicuda_t.csv")

In [None]:
plot_results(results_mpicuda_t.threads,
    results_mpicuda_t.duration,
    "Threads",
    "Duration",
    max_thread_power = 11)

In [None]:
plot_results_thread(parsed_results_mpicuda_t.threads,
    parsed_results_mpicuda_t.mean_duration,
    "Threads",
    "Mean Duration + CI",
    yerror = parsed_results_mpicuda_t.ci_duration,
    max_thread_power = 11)

## Discussão

### Comportamento conforme as variações

**O tamanho da entrada**  
O tempo de execução do programa cresce de forma quadrática quando se aumenta o tamanho da entrada, pois é necessario rodar o método NxN vezes, sendo N o tamanho da entrada. E isso acontece nos 3 casos (sequencial, Pthreads, OpenMP) já que isso depende do método de Mandelbrot.

**O número de threads**  
Tanto utilizando Pthreads quanto OpenMP, é possível observar que com o aumento do número de threads existe uma diminuição do tempo de execução do programa. No entanto, a partir de certo ponto o overhead operacional supera o ganho da paralelização.

### Operações I/O e alocação de memória
O impacto das operações de I/O e alocação de memória no tempo de execução deveria se mostrar mais significativo no OpenMP porque elas são feitas de forma implícita, de modo que a medição de tempo não pode desconsiderá-las. Surpreendentemente, o tempo do programa se mantém mais ou menos parecido (utilizando o mesmo número de threads) no Pthreads e no OpenMP. Supomos que isso ocorre porque o OpenMP deve conter uma série de otimizações que permite compensar o overhead.

### Perguntas interessantes
**Por que o tempo continua diminuindo mesmo após superar o número de núcleos do computador (com Pthreads)?**  
Isso se dá devido ao modo como o sistema operacional gerencia o tempo de execução de cada processo. O paralelismo simulado com concorrência, permite que, mesmo com o número de threads superando o número de núcleos, a eficiência do programa continue aumentando.