# Parallel and Distributed Systems

Federico Matteoni

A.A. 2021/22

# Index

|   | 0.1  | Introduction                            |  |  |  |  |  |  |  |  |
|---|------|-----------------------------------------|--|--|--|--|--|--|--|--|
| 1 | Gen  | neral Paradigms of Parallel Programming |  |  |  |  |  |  |  |  |
|   | 1.1  | Measures                                |  |  |  |  |  |  |  |  |
|   |      | 1.1.1 Base Measurements                 |  |  |  |  |  |  |  |  |
|   |      | 1.1.2 Derived Measurements              |  |  |  |  |  |  |  |  |
| ] | 1.2  | Patterns                                |  |  |  |  |  |  |  |  |
|   |      | 1.2.1 Data Parallel Patterns            |  |  |  |  |  |  |  |  |
|   |      | 1.2.2 Stream Parallel Patterns          |  |  |  |  |  |  |  |  |
|   |      | 1.2.3 Composing                         |  |  |  |  |  |  |  |  |
|   | 1.3  | Technicalities                          |  |  |  |  |  |  |  |  |
|   |      | 1.3.1 Threads                           |  |  |  |  |  |  |  |  |
|   |      | 1.3.2 Synchronization                   |  |  |  |  |  |  |  |  |
|   |      | 1.3.3 Async                             |  |  |  |  |  |  |  |  |
|   |      | 1.3.4 Packaged Tasks                    |  |  |  |  |  |  |  |  |
|   |      | 1.3.5 Promises                          |  |  |  |  |  |  |  |  |
|   |      | 1.3.6 Overhead                          |  |  |  |  |  |  |  |  |
|   | 1.4  | Load Balancing                          |  |  |  |  |  |  |  |  |
|   |      | 1.4.1 Static Techniques                 |  |  |  |  |  |  |  |  |
|   |      | 1.4.2 Dynamic Techniques                |  |  |  |  |  |  |  |  |
|   | 1.5  | Vectorization                           |  |  |  |  |  |  |  |  |
|   | 1.6  | OpenMP                                  |  |  |  |  |  |  |  |  |
|   | 1.7  | FastFlow                                |  |  |  |  |  |  |  |  |
|   | 1.8  | Parsec                                  |  |  |  |  |  |  |  |  |
|   | 1.9  | Stateful Computations                   |  |  |  |  |  |  |  |  |
|   | 1.10 | MPI                                     |  |  |  |  |  |  |  |  |
|   | 1.11 | Accelerators                            |  |  |  |  |  |  |  |  |
|   |      | 1.11.1 GPUs                             |  |  |  |  |  |  |  |  |
|   |      | 1.11.2 FPGAs                            |  |  |  |  |  |  |  |  |

#### 0.1 Introduction

Prof.: Marco Danelutto

**Program** Techniques for both parallel (single system, many core) and distributed (clusters of systems) systems. Principles of parallel programming, structured parallel programming, parallel programming lab with standard and advanced (general purpose) **parallel programming frameworks**.

**Technical Introduction** Each machine has more cores, perhaps multithreaded cores, but also GPUs (maybe with AVX support, which support operations floating point operations, **flops**, in a single instruction).

Between 1950 and 2000 the VLSI technology arised, integrated circuits which nowadays are in the order of 7nm (moving towards 2nm): printed circuits!

In origin, everything happened in a single clock cycle: fetch, decode, execute, write results in registers, with perhaps some memory accesses. The we had more complex control where in a single clock cycle we do just one of the phases (fetch or decode or...), like a **pipeline**. More components are used the higher the frequency but the more power we need to dissipate, and we're coming to a point were the power we need to dissipate is too much and risks to melt the circuit, so we're reaching a **physical limit** in chip miniaturization. But temperature and computing power do not go in tandem: computing power is proportional to the chip dimensions, while temperature is proportional to the area. So it's better to put more processors (**cores**) and let them work together rather than make a bigger single processor. An approach is to have few powerful cores and more less powerful cores (for example, in the Xeon Phi processors). Now, the processors follow this architecture, with the performance of a single core decreasing a bit with every generation but it's leveled by adding more cores.

Up to the 2000, during the single core era, code written years before will run faster on newer machines. Now, code could run slower due to not exploiting more cores and the decreasing in performance of the single core. With accelerators the situation is even more different: for example GPUs, accelerator for graphics libraries, with their own memory and specialized in certain kinds of operations. This can require the transfer of data between the accelerator's memory and the main memory, so the architecture of the accelerator is impactful on the overall performance.

Telegram: Ofexed 2 Github: fexed

## Capitolo 1

# General Paradigms of Parallel Programming

Parallelism Execution of different parts of a program on different computing devices at the same time. We can imagine different flows of control (sequences of instruction) that all together are a program and are executed on different computing devices. Note that more flows on a single computing device is **concurrency**, not parallelism.

Concurrency It's a similar concept: things that *may* happen in parallel with respect to the ordering between elements. So given more flows of control, any schedule of these flows can be executed with interleaving.

Computing Devices Generic term that defines anything capable of computation:

Threads, implying shared memory

Processes, implying separated memory

GPU Cores

Hardware Layouts on a FPGA (Field Programmable Gate Array)

. . .

#### **Basic Concepts**

Sequential Task A "program" with its own input data that can be executed by a single computing entity.

Overhead Actions required to organize the computation but that are not included in the program. For example: time spent in organizing the result or dispatching the work. Basically, time spent orchestrating the parallel computation and not present in the sequential computation.

Speedup This is the fundamental result we're looking for. It's the ratio between the sequential time and the parallel time. Assuming the best sequential time, it is the following

$$SpeedUp = \frac{Sequential\ time}{Parallel\ time}$$

Scalability Slightly different measure that tells how suitable a particular program is to be used with more devices.

$$\label{eq:Scalability} Scalability = \frac{Parallel~time~with~1~computing~device}{Parallel~time}$$

Remember the fundamental difference between scalability and speedup, although those terms can be used interchangeabily in literature. Speedup is measured with the best theoretical sequential time, while scalability is measured with the time achieved by the particular program at hand.

**Stream of tasks** We may not want to consider just one computation but it may be useful considering a number of computations and wanting to optimize a set of tasks.

**Example: Book Translation** With m = 600 pages, for example. Let's assume I can translate a page in  $t_p = 0.5$ h. The sequential task is: take the book and spend time until I can deliver the translated book. The time is circa  $m \cdot t_p = 300$ h.

In parallel, ideally every page can be translated independently so I can split the book in two pieces of  $\frac{m}{2}$  pages each (introducing some overhead), giving each half to a person. Both can translate at the same time, so ideally the time required is  $\frac{m}{2} \cdot t_p$  for each, producing the translated halves. At this point I get the halves and produce the translated version (introducing some other overhead). Ideally the time required is more or less  $\frac{m}{2} \cdot t_p$ , with "more or less" given by the time spent in splitting the book and reuniting the two halves. So the exact time is  $T = T_{split} + \frac{m}{2} \cdot t_p + T_{merge}$ . What if the two person have different  $t_p$ s? For example  $t_1 > t_2$ . When a translator finishes, it spends some time synchronizing its work with me. With nw "workers" (translators, in this instance)  $T = nw \cdot T_{split} + nw \cdot T_{merge} + \frac{m}{nw} T_{work}$  with  $nw \cdot T_{split}$  time spent delivering work to each worker and  $nw \cdot T_{merge}$  time in merging each result.



Init is the time where every worker has work to do, and finish is the time where the last worker finished working. So the exact formula is

$$T = nw \cdot T_{split} + \frac{m}{nw} \cdot T_{work} + T_{merge}$$

So  $\frac{m}{nw}T_{work}$  is the time that needs to happen, found in the sequential computation too, whereas the other two factors are **overhead**.

$$SpeedUp = \frac{Best \ sequential \ time}{Parallel \ time}$$

but the parallel time depends on the nw so

$$\mathrm{SpeedUp}(nw) = \frac{\mathrm{Best \ sequential \ time}}{\mathrm{Parallel \ time}(nw)} \simeq \frac{\cancel{m} \cdot \cancel{t_p}}{\frac{\cancel{t_h}}{nw} \cdot \cancel{t_p}} = nw$$

This not taking into account the overhead. It's a realistic assumption because usually the time splitting the work is very small. But we have to take into account that, in case it's not negligible.

$$SpeedUp(nw) = \frac{m \cdot t_p}{\frac{m}{nw} \cdot t_p + nw \cdot T_{split} + T_{merge}}$$

**Example: Conference Bag**  $T_{bag} = t_{bag} + t_{pen} + t_{paper} + t_{proc}$  and with m bags we have  $T = m \cdot T_{bag}$  We could build a pipeline, a building chain, with 4 people and each person does one task:

One takes the bag and gives to the next

One puts the pen into the bag and passes it

One puts the paper into the bag and passes it

One puts the proceedings into the bag

So  $w_b, w_{pen}, w_{paper}, w_{proc}$  workers. When the first worker has passed the bag, it could begin taking the next bag. Same for the others.



So in sequential we have  $m \cdot (t_{bag} + t_{pen} + t_{paper} + t_{proc})$ , and in parallel per 1 bag we have  $t_{bag} + t_{comm} + t_{pen} + t_{comm} + t_{pen} + t_{comm} + t_{pen} + t_{comm} + t_{pen} + t_{comm}$  with  $t_{comm}$  spent passing the bag from one to the other, so total of  $m \cdot T_{seq} + m \cdot t_{comm}$ . But that's not correct, because we work in parallel: ideally we have a parallelogram of  $m \cdot (t_{proc} + t_{comm})$  base, and we require  $t_{bag} + t_{pen} + t_{paper} + 3 \cdot t_{comm}$  time to get up to speed and "fill the pipeline". But this required time is negligible, and in the end the overall time is given by the base of the parallelogram.

**Pipeline** With m tasks and nw stages, with the completion of the stage i required in stage i + 1. So the output is  $f_{nw}(f_{nw-1}(\dots f_1(x_i)\dots))$ . With t time required for each stage.



We spend (nw-1)t to get the last stage working and  $m \cdot t$  time spent by the last stage to complete all the tasks.

$$T_{par}(nw) = (nw - 1) \cdot t + m \cdot t$$

$$\mathrm{SpeedUp}(nw) = \frac{(nw \cdot t) \cdot m}{(nw - 1) \cdot t + m \cdot t}$$

So the higher the m is, the lower is the impact of the time required to get up to speed. So  $m >> nw \Rightarrow T_{par}(nw) \simeq m \cdot t$ 

Throughput Number of tasks completed per unit of time.

#### 1.1 Measures

On one side we can have **more speed with more resources** (computing devices).

On the other side we can employ **more complex applications with more resources**. For example more precise computations, so extra resources not for improving the time but to improve the quality of the computations. A recent perspective is to **aim at computing results with less energy** thanks to parallelism.

We've seen the SpeedUp(n) =  $\frac{T_{seq}}{T_{par}(n)}$ , where the plot has to lie below the bisection of the cartesian graph.

#### 1.1.1 Base Measurements

Latency L Measure of the wall-clock time between the start and end of the single task.

Service Time  $T_s$  It's related to the possibility of executing more tasks. It's the measure of the time between the delivery of two consecutive results, for example between  $f(x_i)$  and  $f(x_{i+1})$ 

Even if  $x_i$  and  $x_{i+1}$  arrive at the same time, f would still be computing  $f(x_i)$  so it'll start computing  $f(x_{i+1})$  when it has finished.

In a sequential system,  $T_s = L$ .

Completion Time  $T_c$  The latency related to a number of tasks.

With a sequential system, we have  $T_c = L \cdot m$  for  $x_m, \ldots, x_1$  inputs.

With a parallel system, we have  $T_c \simeq m \cdot T_s$  because we reason at regime.

**Example** A 3 stage pipeline, with each node being sequential and with latency  $L_i$  for node i.

At  $t_0$  the first stage  $N_1$  gets the first tasks and computes it in  $L_1$ , then  $N_2$  computes in  $L_2$  and  $N_3$  computes in  $L_3$  so a total of  $t_0 + L_1 + L_2 + L_3$ .

When the pipeline is filled,  $T_s$  is dominated by the longest  $L_i$ , so  $T_s = \max\{L_1, L_2, L_3\}$  and  $T_c = \sum L_i + (m-1)T_s$ If m is large with respect to n = number of stages, the "base of the parallelogram" would be very long, so  $m >> n \Leftrightarrow T_c = m \cdot T_s$ 

#### 1.1.2 Derived Measurements

#### SpeedUp

SpeedUp(n) = 
$$\frac{T_{seq}}{T_{par}(n)}$$

Could be latencies, service times...depending on what we want to measure the speedup of.

#### **Scalability**

Scalability(n) = 
$$\frac{T_{par}(1)}{T_{par}(n)}$$

#### Efficiency

$$\text{Efficiency}(n) = \frac{\text{Ideal parallel time}(n)}{T_{par}(n)} = \frac{\frac{T_{seq}}{n}}{T_{par}(n)} = \frac{T_{seq}}{n \cdot T_{par}(n)} = \frac{\text{SpeedUp}(n)}{n}$$

Measures the tradeoff between what you gain with the speedup and the cost of the speedup.

#### Throughput

Throughput = 
$$\frac{1}{T_s}$$

Amdahl Law Taken the total time of a computation,  $T_{seq}$ , it can be divided into something that can and something that cannot be computed in parallel (for example, dividing the book is a sequential activity). So we can say that  $T_{seq} = \text{serial fraction} + \text{parallel fraction}$  and the **serial fraction cannot be parallelized**.  $f \in [0,1] \mid f \cdot T_{seq}$  is the serial fraction.

$$T_{seq} = f \cdot T_{seq} + (1 - f) \cdot T_{seq}$$

The parallel fraction can be splitted between the workers, but we would have to compute the serial fraction too. By splitting more and more and more, we have that

$$\lim_{n \to \infty} T_{par}(n) = f \cdot T_{seq}$$

$$\operatorname{SpeedUp}(n) = \frac{T_{seq}}{f \cdot T_{seq}} = \frac{1}{f}$$

So we have a very low upper bound on the achievable speedup. This is referred to as **strong scaling**: strong meaning using more resources to get the computation faster.

#### Gustaffson Law

$$SpeedUp(n) = n - S \cdot (n-1)$$

With S being the serial fraction. This comes from the fact that we're considering a different perspective: Gustaffson assumes that the computation increases with the parallelism, something that's called **weak scaling**: keeping the size of the computation the same for all the concurrent agents that we use. Given nw workers I want  $nw \cdot N$  results.

**Cores** In modern computers, we have a main memory (slow), a disk (even slower) and the memory is connected to at least 3 levels of cache. At the bottom we have some cores (4, 8...), each one has its own level 1 cache (usually split in data and instruction cache).

With an activity with a working set that fills the cache, in case of strong scaling splitting the computation across cores we process less data per core because the size of the problem is the same.

With weak scaling, we assume that the data increases so by using more cores we process the same data on all cores but the data grows so we could have extra overhead because of the working set size.

We will have patterns of parallel computation that differentiate in how we process the data.

**Application as Graphs** The applications can be seen as graphs of sequential nodes with dependencies. The **work span** model defines two components:

Work: the amount of time required on a single computing device, of course respecting the dependencies

Span: the length of the longest route from source to end, in terms of computation time

The maximum speedup is  $\frac{\text{work}}{\text{span}}$ , because in every case I need to go from the first to the goal node. I take the longest one because at least the longest path must be computed, and all the rest can be done in parallel and I assume to have enough resources to compute the rest in the time of the span.

We cannot use this model for reasoning about multiple tasks, because it does not take into account the parallelism achieved in exploiting different tasks in the same program but only takes into account what happens for a single task. But it can be used, e.g., to reason about a single node of a pipeline.

#### 1.2 Patterns

Computations with common and particular shapes and semantics that can be understood and implemented depending on the situations, not linked to languages and technicalities. Patterns are a useful concepts, allow programmers to reuse experience of other programmers an not reinventing the wheel.

Parallel patterns can be of two kinds: data parallel, stream parallel. For each pattern there are more than one name used in literature.

#### 1.2.1 Data Parallel Patterns

Parallelism comes from data: we split the data in pieces, compute a set of results that can be combined into a single final result. The book translation example is a situation where we can apply data parallel patterns. What matters is the latency L, we want to be able to provide the result faster once I have the full collection available. The general pattern is:

Decomposition

Partial results

Recomposition

Different choices in each phase yield different data patterns.

#### Map Pattern Also called applytoall:

Decomposition:  $\forall$  item of the collection, which is of course the minimum decomposition of a collection

Partial results: f(item), the result of a function f applied to each item

Recomposition:  $\forall f(\text{item})$ , yielding an isomorphic collection to the original one.

#### Reduce Pattern Also called fold:

Decompose again  $\forall$  item of the collection

Partial results: apply a binary function  $\oplus(x,y)$  (e.g. the +)

Recombination:  $\oplus(\oplus(a,b),\oplus(c,d))$ 

#### Stencil Pattern

Decompose into partially overlapping partitions of the data, e.g. of a matrix or an image

Partial results: the application of a function f(item)

 $\forall$  f(item), again an isomorphic collection

Different kind of problems: overlapping positions will yield the new value, so we have to account for that.

#### "Google" mapreduce

Again decompose ∀ items

Compute  $f(\text{item}) + \oplus(\text{item}_i, \text{item}_i)$ 

With  $f \mapsto \langle K, V \rangle$  key-value pair and the  $\oplus$  function that sums the values of the corresponding keys.

A single item

What I apply to each item is an f that maps to  $\langle \text{key, value} \rangle$  and  $\oplus$  applies the sum to each value.

For example in a document,  $f(\text{word}) = \langle w, 1 \rangle$  and  $\oplus (\langle w_k, v_1 \rangle, \langle w_k, v_2 \rangle) = \langle w_k, \oplus (v_1, v_2) \rangle$ 

"The lesson given by the professor", f will output  $\langle \text{the}, 1 \rangle$ ,  $\langle \text{lesson}, 1 \rangle \dots$  and  $\oplus$  will for example output  $\langle \text{the}, 2 \rangle$ .

We can apply the map function over all the data distributed in various databases, for example.

This is map(f) and  $reduce(\oplus)$ , but we want something like  $map(reduce(\oplus))$ . Combining elementary patterns to achieve more complex results. Like two nested fors.

So I want building blocks, something that guarantees correctness of implementation that can be used to build upon. For example map(function<A(B)>, collection<B>)

So build a bag of tools that we can combine to undertake common situations with good efficiency, speedup, scalability...

#### 1.2.2 Stream Parallel Patterns

Stream of data, flowing in time. In data parallel we process a data collection, while in stream parallel we don't have data appearing all at the same time. So stream as a collection with items appearing at different times. We want to take the single items and try to process in parallel, parallel execution of f over different items of the stream.

What matters is the service time  $T_S$ , to be able to sustain the flow of data and process the next available item as soon as it arrives.

#### **Pipeline**



Inside the pipeline we have  $f_1 \to f_2 \to \ldots \to f_k$  with each  $f_i$  corresponding to a phase, with  $f_i$  taking input from  $f_{i-1}$ .

So  $x_i \mapsto f_1(x_i) \mapsto f_2(f_1(x_i)) \mapsto \dots$ , and the parallelism is in the computation of different phases of different items (much like what we've seen with the CPU fetch-decode-execute pipeline).



**Farm** We have a number nw of instances of the same function f each processing one single item.



We have no interference between computations of  $x_i, x_j$  with  $i \neq j$ , no need for synchronization.

The completion time  $T_C$  is  $\simeq m \cdot T_S$  (number of tasks times service time). In the sequential stages,  $T_S = L$ . This means that given a stream of item to be computed in a parallel fashion, and I want to increase  $T_C$  or  $T_S$ , the I would

need to decrease the latency of the stages in my pipeline or the workers in my farm.

If we have pipe(seq(f), seq(g)) then by decreasing the latency of f we will already improve the performances of the overall computation. Keep in mind that **not every kind of computation can be parallelized**.

Two Tier Model Let's assume a grammar of patterns.

 $Pat = Seq(f) \mid DPP \mid SPP$ 

 $DPP = Map(Pat) \mid Reduce(Pat)$ 

 $SPP = Farm(Pat) \mid Pipe(Pat, Pat)$ 

This defines parallel computations and we aim at assuring that this can be done, a way of implementing this. So

can be a data parallel computation where on the single item we compute a 2-stages pipeline of f and g. But each element of the map is given to a single pipeline, or each pipeline receives a single item, so the stream parallelism is useless.

Farm(Map(Seq(f)))

Here we have a stream of items that will be processed by a Farm, each item splitted by Map and processed. This can deliver the result faster and access the next item.

So Data parallel with Stream parallel is not very good, Stream parallel with Data parallel is better: **two tier model**. So we have an initial part of the pattern which is Stream Parallel, the second part is Data Parallel and eventually the last stages (the leafs of the tree) which are sequential.

**Parallel Design Patterns** Also called **algorithm skeletons**: programming abstraction that model some pattern. The programmer has a framework, libraries, languages and that includes algorithm abstractions.

#### 1.2.3 Composing

These are building blocks, so we can **compose** them. Let's see how that works and what are the expected performances.

#### Pipeline

We have a number k of stages for m tasks: Pipeline $(s_1, \ldots, s_k)$  meaning that this is a composition yielding  $s_k(\ldots s_1()\ldots)$ 

Input stream 
$$\longrightarrow s_1 \to \ldots \to s_k \longrightarrow \text{Output stream}$$

With each  $\rightarrow$  being a stream  $s_i \rightarrow s_{i+1}$  and each  $s_i$  taking input from  $s_{i-1}$ .

The latency of the pipeline is the sum of the latencies of the stages

$$L(\text{Pipeline}(s_1,\ldots,s_k)) = \sum_{i=1}^k L(s_i)$$

We do not consider the time required to pass input to the next stage,  $t_{comm}$ , which would be based on size, nature of the computation...

The steady state is when all the stage are "filled": the longest of the stages will dominate the service time  $T_S$ 

$$T_S(\text{Pipeline}(s_1, \dots, s_k)) = \max_{i=1}^k \{T_S(s_i)\} = \max_{i=1}^k \{L(s_i)\}$$

The completion time is  $T_C$ 

$$T_C(\text{Pipeline}(s_1, \dots, s_k)) = \left(\sum_{i=1}^k L(s_i)\right) + (m-1) \max_{i=1}^k \{L_i\}$$

and when m >> k we can approximate it with

$$T_C = mT_S$$

because the number of tasks required, the "base of the parallelogram", will dominate the number of workers, the "height of the parallelogram".

What if I want to achieve a given performance? For example, given a Pipeline  $(S_1, S_2, S_3)$  where  $S_1$  and  $S_3$  both take 1s and  $S_2$  takes 2s, and we want a  $T_A = T_S = 1$ s. We can't do much, by necessity  $T_S = \max\{1s, 5s, 2s\} = 5s$ , and in general I don't know what's inside each stage of the pipeline so I can't perform optimization inside those. In this case, we can take the **bottleneck**, the slowest stage  $(S_2)$ , and shorten the  $T_S$  by adding a farm in its place: Pipeline  $(S_1, Farm(S_2, S_1), S_3)$ . This computes the very same result:

With the first form, Pipeline  $(S_1, S_2, S_3)$ , we compute  $S_3(S_2(S_1(x_i)))$ 

With the second form, Pipeline  $(S_1, \operatorname{Farm}(S_2), S_3)$ , we compute  $S_1(x_i)$  which delivers the result to the scheduler of  $\operatorname{Farm}(S_2, )$ , where each worker computes  $S_2$  and the scheduler selects which worker will compute it yielding  $S_2(S_1(x_i))$ . The farm then delivers the result to  $S_3$ , which will output the final result  $S_3(S_2(S_1(x_i)))$ 

This works well if every timing remains the same. But if a worker in the  $S_2$ 's farm takes longer we may encounter a situation where  $S_3$  receives a  $x_{i+1}$  before receiving  $x_i$ . This may be irrelevant or be unacceptable depending on the kind of application. With 5 workers in the farm, we end up with  $T_S = 1$ s.

Another approach would be to consider the original pipeline as a single task computing  $P(x) = S_3(S_2(S_1(x)))$  and building a farm with workers that each compute P. P(x) has a latency L = 1 + 5 + 1 = 7s, so with a farm composed by P we have a P we have a P we have a P we have a P where P is P we have P where P is P we have P is P where P is P is P in P in P in P is P in P

Which is better depends:

Pipeline $(S_1, S_2, S_3)$  has  $T_S = 5$ s and nw = 3

Pipeline( $S_1$ , Farm( $S_2$ , 5),  $S_3$ ) has  $T_S = 1$ s and  $nw = 7 \ (+2 \text{ for } E \text{ and } C)$ 

Farm(P,7) has  $T_S = 1s$  and nw = 7 (+2 for E and C)

Noting that each farm adds 2 workers. So by refactoring with the farm we need more workers, which can be a decision point in implementing this kind of refactoring.

**Boundary Conditions** We have to take into account the **interarrival time**  $T_A$ , time spent to get another item from the input stream, and the **interdeparture time**  $T_D$ , the time spent to get another item into the output stream. Let's suppose that  $L(s_i) = i$  seconds, so ideally  $T_S = k$  seconds: we process 1 item each k seconds. If  $T_A > L(s_i)$  we have to wait the second item when I finish processing the first, same thing for the next after the second: the interrarrival time looks like an interstage between  $s_{i-1}$  and  $s_i$ .

 $T_D$  behaves at the same way: we have to wait that  $T_D$  finishes before giving it out output, behaving like an interstage. So the previous behavior, analyzed before, happens  $\Leftrightarrow T_A < T_S$  and  $T_D < T_S$  something that we have always to take into account.

#### Farm

Sometimes we denote as Farm(s, nw), otherwise we omit the number of workers and simply write Farm(s).

We assume to know  $L_w$  and  $T_w$  of the workers. We have some scheduler (**emitter** E) that distributes the items from the input stream to the workers, and a gatherer (**collector** C) that gets the results from the workers and delivers them to the output stream. Those can simply be data structures: queues, for example.

$$L(\operatorname{Farm}(s, nw)) = t_E + L_w + t_C$$

This can appear as a pipeline of three stages, where the Emitter produces to the second stage (the workers) which produce to the third stage (the Collector)

$$T_S(\operatorname{Farm}(s, nw)) = \max\left\{t_E, \frac{T_w}{nw}, t_C\right\}$$

We assume to have m tasks

$$T_C(\operatorname{Farm}(s, nw)) = m \cdot T_S(\operatorname{Farm}(s, nw))$$

If we take into account the boundary conditions

$$T_S = \max\{T_s(Farm()), T_A, T_D\}$$

What if we want to achieve a given performance? Compute a nw suitable to achieve a wanted performance by inverting the very same formulas.

With a target  $T_S = T_A = 1$ s

$$T_S = 1s = \max\left\{t_E, t_C, \frac{10s}{nw}\right\}$$

But  $t_E, t_C$  are negligible so

$$\frac{10s}{nw} = 1s \Rightarrow nw = 10$$

#### Map

The map data parallel pattern is composed by three main phases:

Split: divide the collection into sub collections, a set of subcollections

Map: compute the set of subresults

Merge: produce the final collection results (usually in the same shape as the input collection)

With m dimension of the collection and  $t_f$  to compute the function of the map

$$L(\text{Map}) = \frac{m \cdot t_f}{nw} + t_{split} + t_{merge}$$

 $t_{split}$  and  $t_{merge}$  are non-negligible in distributed architectures, but are negligible in shared-memory architecture.

$$T_S(Map) = L(Map)$$

No concept of  $T_C$  because the map is applied to a single collection.

If I have multiple collections we can consider a 3-stage pipeline composed by a splitter node as  $s_1$ , the map as  $s_2$  and merger node as  $s_3$ , giving

$$T_S = \max\{t_{split}, t_{map}, t_{merge}\}$$
$$t_{map} = \frac{m}{nv}t_f$$

The structure of a map resembles that of a farm, thus to increment the performances we can increment the number of the workers that implement split/merge operations. This requires synchronization between the workers of the split/merge operations in order to avoid the repetition of elements.

#### Reduce

From a vector we want to output the sum: a scalar from a collection. We split in  $t_{split}$ , then each of the nw workers applies the function  $\oplus$  to its subcollection in  $\frac{m}{nw}t_{\oplus}$  and finally we merge in  $nw \cdot t_{\oplus}$  because we have to compute  $\oplus$  over all the nw subresults.

$$L = t_{split} + \left(\frac{m}{nw} - 1\right)t_{\oplus} + (nw - 1)t_{\oplus}$$
$$T_S = L$$

With multiple collections, same argument as before

$$T_S = \max\{\ldots\}$$

But the computation can be organized in a logarithmic tree, too. We would have  $log_2(m)$  phases each with half the activities of the previous phase.



But we have efficiency 1 only in the first phase, then half, then one fourth...

$$L = t_{split} + \lceil \log_2 m \rceil t_{\oplus}$$

provided that  $nw \ge \frac{m}{2}$ . If we use threads, the split phase is just telling the threads what they have to do, then we merge with a simple loop.

#### Stencil

For example computing the average of three neighbor items in a vector, with **necessary boundary conditions**. Split phase that produces for example two halves of the vector: when computing the right extreme of the first half, I need to add the first item of the second vector, same with the left extreme of the second half: we have some shared positions. No problem when reading, the **problem arises when we write the shared position**.

A first solution would be to **use 2 buffers**: we read from a buffer and write to another buffer, and we swap the buffer before restarting such that we use the new values to read and the old values to write.

Another solution is to use a small buffer to host modified values in the neighborhood. The result is kept in a buffer before being written to the array. The buffers are written when we move to such a position that the edited previous position are no longer read: the buffer would need to contain at most half the radius of the stencil operation.

$$L(Stencil) = t_{split} + \frac{m}{nw}t_{stencil} + t_{merge}$$

Where  $t_{stencil}$  includes buffer management with the second solution and  $t_{merge}$  includes swapping the buffers in the first solution.

**Composition** Given a Pipeline  $(s_1, s_2, s_3)$  with  $s_i$  sequential,  $s_2$  may be data parallel (map), m stream items and each being a vector of k items.

$$L = L_1 + L_2 + L_3$$
$$T_S = \max\{L_1, L_2, L_3\}$$
$$T_C \simeq m \cdot T_S$$

 $s_2$  is sequential but can be turned into a map, takes  $L_2$  so I can imagine  $t_f \simeq \frac{L_2}{k}$ 

$$T_S(\text{Pipeline}) = \max \left\{ L_1, L_3, \frac{L_2}{nw_{\text{map}}} \right\}$$

Note how the latency decreases. This of course if  $s_2$  can be turned into a map: I need to be able to check the code.

Another approach, as we saw before, is this: given that  $s_2$  is the slowest stage, we can use a Farm $(s_2, nw_{\text{farm}})$ . However in this case latency stays the same.

Single Owner Computes rule Is a more general pattern: each part of the data structure has a single owner who is in charge of performing the computing of the new values in that particular part.

#### 1.3 Technicalities

#### 1.3.1 Threads

To create a thread using the std::thread library we use auto tid = new std::thread(function, ...); which starts a new flow control that runs the passed function. We are used to writing instructions sequentially, now we fork another flow of computation: we get two flows that are executed together in the same address space, so the new thread inherits all the memory of the original thread.

Correctly handling the threads is fundamental, e.g. by joining them at the end of a computation or detaching them if needed: detach separates the thread from the process allowing the execution to continue independently (detached threads cannot be joined).

Another useful method is yield(), which signals to the scheduler that this thread can release the processor and be rescheduled later. This is useful in non-timesharing contexts or when a thread describes a very long computation.

#### 1.3.2 Synchronization

Two fundamental approached: controlling data sharing (mutexes) and waiting on events (condition variables).

Mutexes std::mutex objects are used to implement mutual exclusion with lock before and unlock after a certain block of code that needs to be executed by just one thread at a time. Thus mutex objects need to be shared.

Often the critical section that needs to be protected is very small and lasts for a small block of data. std::lock\_guard mechanism can be declared having a std::mutex as parameter. It tries to acquire the mutex's lock when declared and will automatically release it when its scope ends (by its object destroyer).

Last mechanism is the std::try\_lock function: it returns immediately with true if the lock has been acquired, false otherwise.

Condition variables std::condition\_variable objects provide the method wait that takes a mutex and optionally a predicate as parameters, and the methods notify\_one() and notify\_all().

We call the wait on some shared data, waiting for some kind of event. Once we call the wait, the thread is asleep and releases the lock on the mutex until some other thread calls one of the notify methods (and the relative unlock!). With notify\_one only one of the waiting threads is put in the ready list, whereas with notify\_all every waiting thread will be put in the ready list. When a thread is awaken by a notify method, it will try to reacquire the lock. Once a thread is awake and has reacquired the lock, it needs to re-check the condition it was waiting because there's no guarantee that, due to scheduling, the condition has not been invalidate between the notify event and the thread actually being put into execution. So usually the wait are put inside a while(condition) loop.

#### 1.3.3 Async

std::async are a bit different. When instantiating an std::thread it starts and forks the execution of the program. For std::async we simply tell the system that it *can* execute that portion concurrently, and it will be executed in a thread if and only if a thread is available. An async returns a future that can be used to know the status and return value of the computation.

#### 1.3.4 Packaged Tasks

std::packaged\_task decouples the moments of having a function to call from the moment of calling that function. With get\_future we can get the future relative to the packaged task and use it. To compute the packaged task we call it passing the arguments, e.g. the packaged task of the function t1 func(t2); is auto pt = new
packaged\_task(func); has future auto future<t1> ft = pt.get\_future(); and can be called with pt(arg); with
arg of type t2. The result can be obtained with t1 res = ft.get();.

#### 1.3.5 Promises

std::promise can be used in two ways: using its future like before or call set\_value on it. They can be viewed as placeholders.

#### 1.3.6 Overhead

Standard computer of course have a processor and a memory, connected by some kind of bus. This bus represents a bottleneck (the **Von Neumann Bottleneck**). We can improve by having two memories: an instruction memory and a data memory.

By exploding the Von Neumann model a bit, we have a **main memory**, a number of **cache** levels faster and smaller going from the main memory to the processor, and the **processor**. Usually the last level of cache, closest to the processor, takes 1 or 2  $\tau$  (clock cycles) to perform the operations, while the main memory takes around 80 to 100  $\tau$ . This follows the **locality principle** which is divided into **spatial locality** (if a given address i is accessed, then it's likely that addresses i+1 and so on will be accessed soon) and **temporal locality** (an accessed address is likely to be accessed again soon), enabling the concept of **working set** (the set of addresses that are likely of being accessed in a small amount of time).

In a multi-processor system, e.g. an 8-core processor, we have some levels of cache that are shared between cores (ideally the higher levels, closer to the main memory) and some that are independent per-core. In this case, a shared variable should keep the same value even when we have independent caches. This is achieved by an hardware component that implements the cache coherence protocol. This requires some time, of course. This is

Telegram: Offexed 13 Github: fexed

achieved by a bus (**snoopy bus**) that is placed between the last level of shared cache and the first level of independent caches. This bus detects operations on independent caches and propagates accesses to shared variables. Other systems implement a **directory of shared data** and polls it whenever shared data is accessed. The key takeaway here is that there is something that we need to do to keep the copies consistent.

This **cache coherency protocol** is applied to cache lines, which are the minimal units of operations on caches. So whenever the line containing the shared variable is changed, all the line is moved in all the other places. So it at least has to access the first shared level of cache, if not going up until the main memory. This has to be made for the whole working set of the thread.

**Example** If we use the stencil pattern to implement matrix multiplication, we may have a **cache line that is** "split" between two portions of the data, thus **two threads**. Each time a value is written inside that line, the cache coherence protocol runs. In a stencil computation this makes sense.

In, e.g., a map operation we may have a vector where  $x_i^t = f(x_i^{t-1})$  performed for several ts. If this cache line is shared with another core, this may trigger the cache coherence protocol for all the other items that may be never used, resulting in the **false sharing problem**. We must try to avoid as much as possible this situation, because **this adds significant overhead** to the computation.

We use **padding techniques** that adds pad values to the original vector such that the culprit cache line is filled with dummy values to avoid sharing it with other cores.

Another overhead problem occur when a thread is moved from a processor to another. The receiving processor in general does not possess any element of the incoming thread working set in its cache levels.

This switch is decided and performed by the operating system, but this decision may not be optimal for some programs, especially if we want to harness the full potential of all the cores on a machine. We can **force it to not perform this operation by pinning the threads** to some particular core, by specifying its identifying number.

Another source of overhead is **communication**, e.g. between stages in a pipeline: each stage spends some time to receive and some time to send, other than the time spent computing. With threads this may be ignored, because those read and write in shared memory, but this may not be negligible in distributed systems across a network. Even on fast networks with  $1\mu s$  latency, this would still be 3 orders of magnitude greater than the clock cycle time. This could result in receive and send times greater than the computation time. Sending data in the end is simply tell the network adapter the address of the data and the order. At this point, instead of waiting, the node could simply begin performing the next computation and be interrupted when the data transfer is completed (and handle errors if there are). Another solution is to have three threads and implement a kind of pipeline where the first thread receives into a buffer, the second thread computes in another buffer and the third thread sends from another other buffer. At the next stage, the first thread computes from its buffer, the second sends its result and the third receives inside its buffer. At regime, we spend  $\max\{t_{rec}, t_{comp}, t_{send}\} \cdot n$  time. This allows to hide communication times if and only if  $t_{comp}$  is larger than the other two times (receive and send).

We mention this technique to be useful in clusters of network-connected workstations, but a similar situation may be present in CPU-GPU communication. GPUs are accelerators, thus are connected to the processor by some kind of bus (the PCIe bus), and have their own memory. Let's say we have a map(f) with a time-consuming f, on a huge v. We can **move** v **to the GPU's memory**, the **GPU kernel computes** map(f) over v and the result v' is then brought back to the CPU memory. The key point here is that we have to **move data to and from the GPU memory**. GPUs provide 1 or 2 **DMA Engines** that are in charge of this GPU-CPU data transfer. This DMA engines work like the NIC mentioned before: and address is written on the bus and it will handle read/write operations without impacting performances.

Given a vector v and a map to be computed, we can consider chunks of the vector such that the computation time over the chunks is  $t_f \simeq t_{comm}$  the communication time. This technique is very common and can be achieved in two ways: moving data from CPU to GPU explicitly or use **streams** which are abstractions, streams of operations. Streams can be of three types: move CPU $\rightarrow$ GPU, move GPU $\rightarrow$ CPU or compute. We setup three streams.

Overhead related to memory allocations New objects are created when instantiating inputs and results, for example. This requires allocating memory in the heap, with corresponding mallocs and frees. So we may need to be smarter, with solutions that work on thread-level memories, small heaps where to allocate objects and releasing when no more needed.

As for threads, the operation **new** which creates a new thread, and **join** which waits for the completion of a thread, of course both take some time. There is a **tradeoff** between the time spent to setup the parallel activity and the time earned from it. If the time spent computing by a thread is smaller than the time required by creating and joining it then it is pointless to perform that computation in another thread.

Telegram: Offexed 14 Github: fexed

jmalloc library Used in BSD systems, FireFox and Facebook among others.

It manages **chunks of memory** called **arenas**, distributed in a round robin way per thread with each thread using one arena. A metaarena is used a common place.

Arenas A1, A2,..., Ak are assigned in round robin to th1, th2.... When some data comes from an arena, and we free that data we free the original arena not the local one. jmalloc has its own API, but other than that it uses the classical API: malloc and free, same as stdlib.

Taking a normal program a.out, with ./a.out p1 ...pk we will go with malloc and free of the stdlib. If we prefix with LD\_PRELOAD=libjmalloc.so ./a.out p1 ...pk then malloc and free will be loaded from the jmalloc library.

#### 1.4 Load Balancing

Giving the same amount of work to all the cores involved. Even if a single thread takes longer than all the other, we wait that it fishes so we have a lot of empty time in the other threads. The efficiency lowers a lot, and poor speedup too

One of the reasons could be that the computation per se is unbalanced.

#### 1.4.1 Static Techniques

Related to the usage of different splitting policies.

Chunk policy Take the vector and split into adjacent parts

Cyclic policy First item to first thread, second to second thread...e.g. with two threads: first to th1, second to th2, third to th1, fourth to th2...

Mix A cyclic distribution of blocks. Split into blocks and assign the first to a thread, the second to the next... following the cyclic policy.

#### 1.4.2 Dynamic Techniques

We can do much more, adapting to the situation giving more things to do to the thread that so far have done less.

**Autoscheduling** Threads are not assigned a block/item or a distribution, but each threads *asks* for something to be computed. Some code like

```
while (more work to do) {
         ask work
         compute
         deliver result
}
```

The threads that gets longer tasks stops asking for more tasks for a while, and more tasks will be executed by the other threads. When tasks are almost finished, it may happen that some thread gets the "last" long task still taking longer than all the other threads. But it's not as impactful as stated before, as it's the worst case. In general, prefetch  $k \simeq 2$  or 3 tasks

**Job Stealing** Bunch of tasks, cyclic static assignment. With nw threads and m tasks, each thread gets  $\frac{m}{nw}$  tasks. With job stealing, the thread that has finished its tasks and perceives that there's more to compute, steals a task from another thread. Problems: "size" of the steal, synchronize accesses, who to target... the solution is a random policy: threads that finishes their own assigned task steal a random number  $\in [0, nw]$  and steal that number of tasks.

#### Autoscheduling + variable size chunks

**Template Based Implementation** For each pattern (pipe, farm, map, stencil...) we have templates:

```
for each target architecture (multicore, cloud...) activity graph
```

performance modelling, also a way to have an idea of what we can achieve in terms of performance

Let's analyze a template for a pipeline on a shared memory multicore machine (template(pipeline, SMmulticore)):

$$\longrightarrow thread \to thread \to \ldots \to thread \longrightarrow$$

With the  $\rightarrow$  being communication channels (queues). Modelling the speedup $(m) \simeq m$  with m being the number of stages, so a linear speedup, if and only if  $\max\{t_i\} \simeq t_i$  so the times of each stage are more or less the same.

But we can have another template

$$x_m \dots x_2 \ x_1 \longrightarrow \text{queue}$$

Where in the queue we put "compute f over  $x_i$ " as  $\langle f, x_i \rangle$  for each i. Each worker gets  $\langle f, x \rangle$  and puts in the queue  $\langle g, f(x) \rangle$ . For  $\langle g, x \rangle$  returns  $\langle h, g(x) \rangle$  and for  $\langle h, x \rangle$  returns  $\langle h, g(x) \rangle$ .

So the queue will have different types of tasks: the input tasks are assigned to f but the intermediate tasks are assigned to g or h: it's a pipeline. Of course, the end tasks means outputting on the output queue.

A problem: the queue is a bottleneck. We can use some techniques: set of queues, local queues... Also the ordering, but i can keep the index i for each stage and use it at the end to keep in a buffer and output in the correct order. With modeling provided sufficient  $T_A$ , the speedup can be proportional to nw and not m.

Another template is a round-robin scheduler that assigns to some workers that deliver results to sorter with respect to the input order. The scheduler gives  $\langle i, x_i \rangle$  and the output is  $h(g(f(x_i)))$  ordered on i. Pipeline. Uses nw + 2 concurrent activities, with a queue for each worker plus a queue for the sorter. With  $T_A$  low, we have speedup circa nw.

Changing architecture, from shared memory multicore to a clustered workstation. I can use any template ported to a cluster. For example

$$workstation_1 \rightarrow_{TCP/IP} workstation_2 \dots$$

The speedup is circa the number of stages. But we can also have a number of workers in each workstation, with a scheduler and a merger per each. A kind of composed template, the third inside the stages of the first.

$$T_S \simeq \frac{\max\{T_i\}_{nw}}{nw}$$

The overhead is in the scheduling and merging parts of each stage. The scheduling is necessary, but in the ordering part we sort data on the input order, but we can do that just at the end of the last stage.

Macro Data Flow Implementation With Data Flow we denote the operations where the order of computations are dictated by the flow of data encountered by the program counter. So we have nodes that are composed of  $\langle$  operator, variables, output $\langle$ . For example,

$$(a+b)\cdot(c-d)$$

can be represented as



We have tokens on the inputs, that represent when data are available. When all data is available, the node becomes "fireable". This technique can be used to implement parallel patterns, Macro because instead of considering operations as nodes with functions an input/output nodes we consider macros with full portions of code instead of primitive functions.

Let's consider a program main.cpp that uses some pattern internally from a pattern library. First step: from a pattern tree we compile the MDF (Macro Data Flow graph). We have a graph repository and a pool of executors: anytime I have some input data I create a copy of the graph in the repository with the tokens that represent the data.



Each executor executes a loop where they get a fireable instruction, compute and deliver the output tokens.

Compiler From the Pattern Tree it outputs the MDF graph.

Compile(Pipe(f,g))  $\rightarrow \langle f, \text{ in, A} \rangle, \langle g, A, \text{ out} \rangle$ 

 $Compile(Seq(f)) \rightarrow \langle f, in, out \rangle$ 

 $\operatorname{Compile}(\operatorname{Farm}(f)) \to \operatorname{Compile}(f) \text{ for each instance of input}$ 

 $\operatorname{Compile}(\operatorname{Map}(f)) \to \operatorname{Compile}(f) \text{ for each element of the input collection}$ 



 $Compile(Reduce(f)) \rightarrow$ 



Each one of the graphs has one input token and one output token, mandatory to be able to compose the graph. The Farm parallelism disappears and third

#### **Problems**

Contention over repository

Token delivery

Maintain the list of fireable instructions

Reordering on the out stream

**Refactoring Rules** The grammar used to write our program is:

$$\operatorname{Pat} = \operatorname{Seq}() \mid \operatorname{Pipe}(\operatorname{Pat}_1, \operatorname{Pat}_2) \mid \operatorname{Comp}(\operatorname{Pat}_1, \operatorname{Pat}_2) \mid \operatorname{Farm}(\operatorname{Pat}, nw) \mid \operatorname{Map}(\operatorname{Pat}) \mid \operatorname{Reduce}(\operatorname{Map})$$



Whit functional semantics we mean what is computed. Non-functional semantics refers to how the result is computed.

So refactoring rules describe the equivalences in the **functional semantics**. So different performance, parallelism degrees...

Some examples:

Farm $(x, n) \equiv \text{Farm}(x, m)$  with  $n \neq m$ Pipe $(x, y) \equiv \text{Comp}(x, y)$ Pipe $(\text{Farm}(x, \_), \text{Farm}(y, \_)) \equiv \text{Farm}(\text{Pipe}(x, y) \_)$ 

How to figure out the better ones? We will consider a small set of rules:

**Pipe introduction/elimination**:  $Comp(x, y) \equiv Pipe(x, y)$ 

Farm introduction/elimination:  $x \equiv Farm(x)$ 

Map fusion:  $Map(Comp(x, y)) \equiv Comp(Map(x, y))$ 

**Pardegree change**:  $Farm(x, n) \equiv Farm(x, m)$  with  $n \neq m$ 

The idea is to write a program (so a tree  $T_0$ ) and apply the rules (via a tool) getting  $T_1, T_2, \ldots$  all functionally equivalent to  $T_0$ . Usually the goal is a low  $T_{S_i}$ . We take the produced trees, mapping each to its  $T_S$  and reducing to min to get the minimum  $T_{S_k}$ .

But maybe we compute worse trees. I can do another thing: taking  $T_0$  trying to find the path in the possible trees. I apply rules and measure the metric (in this example,  $T_S$ ). Then I don't go exploring all the subtrees, applying all applicable rules, but I just go one level down and pick the best one each time. We follow a path that always gives a better (or equivalent) performing solution. But this is **not possible**. With a lot of approximations and details ignored, I can figure out the best parallel architecture.

**Normal Form** Speaking of stream parallel computations. Optimizes the service time  $T_S$  (more precisely, the throughput).

We can derive the normal form from every pattern tree, only with stream parallel patterns

- 1. Get the frontier, set of all the sequential leaves in the tree, left to right.
- $2. \ \, {\rm Comp}({\rm frontier}),$  so sequential composition in the order they appear.
- 3. Farm(Comp(frontier))

This process always aims at optimizing the parallel computation. Unless there are some particular reasons. We take the whole set of things to compute an put them together (1 and 2), meaning creating a chunk of code that do the work: increase the grain of the computation.

This optimizes the service time taking also into account resources.

Performance Models Used to evaluate or to optimize.

**Evaluate** meaning that, having a tree, we evaluate a metric (e.g.  $T_S$ ) visiting the tree from the leaves. For each leaf we're interested in the sequential time ( $T_S = L$  for the sequential nodes). The we go up one step visiting the parents: we have the parameters of the subtree and can compute  $T_S$  of the node given its subtrees. We can repeat until the root.

The visit of a tree is also a pattern, so if I take that pattern (visiting a tree from the leaves upward) passing the tree as a parameter I can get a number of different **visitors**:  $T_S$  in this case is a visitor. For example, for Visitor(# resources) we would visit the tree in the same way but using a different formula for computing the contribution of each node.

Optimize the parameters, for example the parallel degrees of the stages. In rplsh we used optimize main with max resources. How it's implemented? We take the tree, with Pipe in the root, same as before. We turn the tree with the nw figured out, but we have a certain number of resources. For example we want to shrink the number of resources to match the number of cores.

#### **RPLShell**



**Rewriting Rules** are the set of factoring rules.

For example, we have the facts that allow me to rewrite a given tree t as Farm(t) or  $Pipe(t_1, t_2)$  as  $Comp(t_1, t_2)$ .

**Optimiziation Rules** the set of rules that for example transform a Farm(Farm(w)) into Farm(w) (because two farms linked are bad for performance), known as farmfarmopt, or the farmopt that is used when we have Farm(w) with a certain  $T_S$  of the w and a certain  $T_S$  target of the farm in order to set the nw of the farm to  $\frac{T_S}{T_{S-w}}$ .

**Visitors** the elements that visit the tree upwards.

#### Autonomic Management of Non Functional Features In long running computations.

For example the classical variations in network load between night and day. Autonomic because Born from industrial processes management, changing behaviors of the systems. Different architectures of the controller:

#### MAPE Loop (Monitor, Analyze, Plan, Execute loop)

Execute activities via actuators, and the system has a set of actuator objects that can be invoked to change the behavior of the system (for instance changing nw). The system has sensors, too, that provide data to the monitor part.

We have to devise a strategy to be used. E.g.: if  $T_A$  drops, most likely I could just increase nw and viceversa if  $T_A$  increases I can drop nw. Being more clever: let's suppose that  $T_A$  goes from  $T_{A_{\max}}$  to  $T_{A_{\min}}$  in a sin fashion, when I'm in the minimum I could take time in taking a decision to increase nw while  $T_A$  is already increasing. Same for the maximum, I could spend time making the decision and always arriving late. So our strategy can be update: I can observe the changes and avoid situations of alternating opposite changes (in the example, the adding and taking away workers constantly).

To implement these strategies, I need  ${\bf sensors}$  and  ${\bf actuators}.$ 

We need to observe the interarrival time  $T_A$ . Also need a data structure to observe the changes, by doesn't require sensors. These sensors would be in the emitter thread.

We need to increase or decrease nw, so the actuators must be able to achieve that. To add a worker, e.g. a thread, we need to know the queue for that worker, the collector...so the emitter sounds like a good place.

This is implemented in the control program of our manager. Very crude if (condition) do actuator;.

ECA Rules Set of Event, Condition, Action rules implemented in a rule system (e.g. jboss).

Event: triggering action, e.g.  $T_A$  change. Not the event per se to be taken into account.

Condition: predicates on monitored values, internal state and events, that state whenever this particular rule has to be fired or not.

Action: set of actions on the system or the state that must be executed after some triggering event when the condition holds true.

#### Example

|       | Event        | Condition                                          | Action                              |
|-------|--------------|----------------------------------------------------|-------------------------------------|
| $R_1$ | Change $T_A$ | Less than the previous one                         | Adding a worker                     |
| $R_2$ | Change $T_A$ | More than the previous one                         | Removing a worker                   |
| $R_3$ | Change $T_A$ | Less than the previous one AND $nw$ just decreased | $\mathtt{nop}, \mathtt{update} T_A$ |
| $R_4$ | Change $T_A$ | More than the previous one AND $nw$ just increased | nop, update $T_A$                   |

#### Contracts

- 1. Propagate the "contract" top-down and try to implement it "best-effort"
- 2. Manage to keep contracts satisfied

We're taking into accounts performance, but often power consumption is more important. These are both non-functional features. If we take into account both, we could implement a manager for each. But the managers may take decision that contrast with each other. A solution is to build a single manager that takes into account both features, into a single MAPE loop. But this requires that both knowledge are in a single place, bad for the separation of concerns. Another solution is to have two managers, one for each feature, each with its MAPE loop, and the two can communicate: the plan phase inform each other in the hope to have authorization for a change that the execute part needs, if we don't get the authorization we go for a nop.



#### 1.5 Vectorization

Very old technique. Given x, y, z as vectors of  $x_i, y_i, z_i$  from 0 to n-1 and  $\forall i \in [0, n-1]$   $z_i = f(x_i, y_i)$ , implemented e.g. as a for. If the computation  $f(x_i, y_i)$  is independent of the other loop iterations (that is,  $x_i$  is independent from all other  $x_i, j \neq i$ ) then this is easily parallelized. A classical example is matrix multiplication:

$$\forall i \ \forall j \ \forall k \ c_{ij} = a_{ik} \cdot b_{kj}$$

2 steps: we need to have zeroes in the first positions and We have ways to load/store vectors to

Hardware designed specifically, vector processors. Registers that can be seen as vectors of tot bits numbers: 256 bits vector registers seen as 4 64 bits numbers, or 8 32 bits numbers and so on. With special commands, for example VADD.132 R1, R2, R3 meaning R1 = R2 + R3 with I32 meaning 32 bit numbers with I index.

Conditions In g++ we can explicitly ask with -03 -ftree-vectorize and with -fopt-info-vec-[missed/all] ask what cannot be vectorize.

The conditions are:

Need to know the number of iterations.

for is ok, while(c<k) sometimes cannot be vectorized

Cannot call external code in the loop body, functions or libraries.

No conditional code.

It'd require compiling two paths, so complicates things.

No overlapping pointers.

#pragma Indications to the compiler

#pragma GCC ivdep tells the compiler whatever it thinks of the loop, to consider it as independent iterations.

#pragma GCC unroll n tells the compiler that the following loop should be unrolled n times. Useful for short loops.

So vectorization very useful for mathematical operations, such as differential equations. Requires to take a bit of care in the core to be able to vectorize. If we don't convince the compiler, we can use the pragma.

If we vectorize Farm(f, nw), the speedup with the vectorized f with respect to the non-vectorized f is smaller. Because the vectorized code in general is faster, so non-vectorized Speedup(nw) Z Vectorized Speedup(nw).

But vectorization can radically change the  $T_S$  of the stages of a Farm, for instance, and introduce waiting times for workers. You don't change the sequential fraction, just the non-sequential fraction.

**Libraries** Libraries include operations used often, e.g. Blas (basic linear algebra system) or Lapack or NKL. For instance blas provides gemm for very optimized matrix multiplications.

Use libraries because they are very optimized, exploiting all the possible optimizations.

**GRPPI** C++ library that implements common parallel patterns, using standard threads or other backends (e.g. omp, tdd or fastflow). The backend is only modeled by the execution parameter.

#### 1.6 OpenMP

MPi Directed to clusters and workstations, multicores

CDA/OpenCL For GPU

**OpenMP** Parallel for, targets shared memory multicores (and GPU/FPGA). Very dated, since 1997 the version 1.0: no GPU, even multicores were not very popular.

At the beginning very simple API that provided launching threads in parallel and something related to the parallel for. Each version improved on that, introducing: tasks, target accelerators, depend clauses on tasks (to tell that a task must be executed only after the completion of another task), memory management, task reduction (among values of different tasks), task affinity...

When we speak about shared memory multicores, we speak of some situation where in the board we have more than one sockets with multicore in each socket.

#### Concepts

**Directives**, which OpenMP is based on. Directives, in C/C++ environment, are kind of pragmas which can be understood as portions of compiler

OpenMP is compiler-based

As pros:

Few lines of code are required to run parallel code, opposed to writing all the logic by oneself

Keep sequential code, means we can perform functional debugging and then adding parallel pragmas

As main con we need to rewrite the compiler, not entirely (such as the parser), but everything related to the usage of the pragmas must be generated at compile time.

E.g. the map seen in the GRPPI library is written at higher level but in the library, doesn't touch the compiler.

We have something used to set up parallel activities and something different used to organize the parallel computation using the parallel activities available.

#pragma omp parallel This must be placed before a statement, it tells the compiler that the following block must be executed in nw copies (threads) if and only if nw is the number of resources/cores of the current machine. Most directives have clauses used to specify parameters. E.g. num\_threads(nw) tells to use the variable nw instead of the predefined one (the number of cores).

```
#include <iostream>
2 #include <omp.h>
4 using namespace std;
5 int main(int argc, char** argv) {
   int nw = 1;
   if (argc != 1)
     nw = atoi(argv[1]);
   #pragma omp parallel num_threads(nw) // clause
9
10
      auto id = omp_get_thread_num();
      cout << "Ciao from id " << id << endl;</pre>
12
   }
13
   return(0);
14
15 }
```

#pragma omp single Specifies that the following statement/block is to be executed by just a single thread.

```
#include <iostream>
2 #include <omp.h>
4 using namespace std;
5 int main(int argc, char** argv) {
   int nw = 1;
   if (argc != 1) {
     nw = atoi(argv[1]);
      #pragma omp parallel num_threads(nw)
        #pragma single // this will be executed by a single thread, just one enters the
     following line, use a block for more lines
        cout << "Id " << omp_get_thread_num() << " of " << omp_get_num_threads() endl;</pre>
     }
13
   } else {
14
      #pragma omp parallel
15
16
        auto id = omp_get_thread_num();
17
        cout << "Id " << id << " of " << omp_get_num_threads() endl;</pre>
18
19
   }
20
   return(0);
21
22 }
```

#pragma omp critical To execute the next code in mutual exclusion.

#### To use OpenMP Compile with flag -openmp.

The a.out compiled, when executed, uses the linked library which provides several things including a lock/mutex machanism. So instead of the #pragma omp critical we can use #pragma omp parallel and then just the lock/unlock:

```
1 omp_lock_t lock;
2 omp_init_lock(&lock);
3 #pragma omp parallel
4 {
5    // code
6    omp_set_lock(&lock);
7    // mutex code
8    omp_unset_lock(&lock);
9    // code
10 }
```

With the code/unlock declared with respect to the OpenMP library.

#### Different code for different threads We can do something like

```
1 #pragma omp parallel num_threads(2)
2 {
3    if (omp_get_thread_num() == 0) {
4         // code for thread 1
5    } else {
6         // code for thread 2
7    }
8 }
```

This is **SPMD** code: single program multiple data.

#### Sections directive

#pragma omp sections: identifies a command which is filled by other section directives, each executed possibly in parallel

#pragma omp section

#### Example

All the section may happen in parallel, how much is defined and can be inherited by other statements. For example I can have a #pragma omp parallel before the #pragma omp sections, giving the threads explained before (the number of cores as default, or as specified by num\_threads(nw).

**Variables** In most directives you can append clauses that specifies the behavior of variables. For example in parallel pragmas you can append the clauses

```
private(x) (non initialized local copy local to each thread)
```

firstprivate(x, y) (get a local copy initialized from previous value)

shared(y, z) (inherit value from the global environment)

lastprivate(x) (this one legal only in sections, not in parallel, local copy then last section copied to global environment, take care last one means in syntactic-fashion, the last syntactic section updating the variables copies to global environment)

#pragma omp for Most famous and used.

The default behavior is that the iterations, e.g. from i = 0 to i = n-1, are made into nw chunks given each to a thread. Obviously we need independent iterations. There are others scheduling policies, specified with clauses appended to the #pragma omp for:

static the default one, with the option of specifying chunksize(m) specifying the number of items in each chunk, so  $\frac{n-1}{m}$  chunks of m items, given to the threads in a round-robin fashion

dynamic where the threads ask for new chunks when they have finished, chunks are assigned on demand. Same of the static for other things

guided use a number of chunks smaller and smaller: large chunks in the first part of the array, then smaller chunks, then smaller and so on. Intended to be used for unbalanced computations: at the beginning we assign large chunks to each thread, but the threads may finish at different times and get smaller chunks to accommodate for load unbalancing

auto everything decided by the compiler

runtime keyword which inherits the scheduling policy from an environment variable which has to be defined before

You also have other clauses

nowait, removes the implicit barrier to wait all the threads, present in the parallel pragma

```
reduction([+,*,and,or], x)
```

#pragma omp task Kind of async, suggestion that if there are unusued threads use to compute the task otherwise it's sequential code.

Can define taskgroups to execute in parallel and wait with the implicit barrier at the end.

You can append tied or untied: with the first anytime you assign/reassign the execution of this task then it would be on the same thread.

The taskloop provides something similar to a map, or a pragma for, but subjecting to the rules of the tasks: could be executed in parallel given enough resources ecc.

**Example** Sum of a vector with a reduce(+), base case being a vector of length 1 so return the value v[0]. Otherwise the recursive case is a vector of n positions: compute the half and split in v[0, $\frac{n}{2}$ ] and v[ $\frac{n}{2}$ , n] recur. So we need a task for the left half, a task for the right part, checking not to be in the base case.

```
int sum (int* restricted v, int start, int end) {
   auto size = end - start;
   auto mid = size/2;
   int x, y; //shared for the subtasks, sum from left and right
   if (size == 1) return v[start]; //base case
   #pragma omp task shared(x) //to share the result with this "main" thread
9
     x = sum(v, start, start+mid);
10
   }
11
   #pragma omp task shared(y) //same as befor
12
13
     y = sum(v, end-mid, end);
14
15
   #pragma omp taskwait //wait for the termination of the tasks, a "barrier"
   //now both tasks have finished, we have both x and y
   return x+y;
19 }
20
21 int main(int argc, char** argv) {
   #pragma omp parallel num_threads(nw) //or from the CLI "export OMP_NUM_THREADS=nw"
22
   //...
23
   #pragma omp single //single or master
24
25
   sum(v, 0, n-1);
26 }
```

Timing routines Also present.

**Taskpool** The idea is to set up nw threads to execute tasks without awaiting any kind of results from the tasks. Some king of void function.

We can use auto out = bind(function, parameter);, with void function(float x);, then out will be a function(float()) so we can call out(); because the parameter is already given.

#### 1.7 FastFlow

Addresses the idea of structured parallel programming targeting shared-memory multicores. It's a open-source header only library (-03 std=c++17). It's a complete layered system:

Parallel Applications

High level parallel patterns

Parallel building blocks (pipeline...), and also a small library of transformers (to change shape of structures such as pipelines...)

Core library of FastFlow: sequential building blocks

Posix threads, RDMA libraries, OpenCL/CUDA

 ${
m HW}$  layer: multicore shared-memory, but also GPU and FPGA accelerators, also COW (cluster of workstations) with TCP/IP and MPI



ff\_node Computes some function and has two queues: a queue from where it takes items to be computed and a delivery queue.

FastFlow has stream computations and data-parallel computations.

Two nodes can be merged in a pipe ff\_node: this merges the output queue of the first node into the input queue of the second node.

The single nodes implement the business logic, the rest is managed by the framework.

In the svc method, being void\* both in parameters and in return type, you can specify whichever type you need to flow through the tasks. You can pass a task as parameter, process it and cast to void\* its result to return it. With ff\_node\_t<t1, t2> it's introduced the type checking at template level.

#### Stream



The stream flows through the program and it could be infinite, in principle. We have the interarrival time to consider, so we cannot look the entire stream as a whole data structures, at most you can see a certain window.

EOS, End Of Stream: value that signify that the stream has ended. Can be used as a condition to end loops (much like EOF for files)

ff\_send\_out sends items in the stream. E.g. to end the stream, ff\_send\_out(EOS)

Shared-memory multicore, so each activity is a thread basically.

In the queues we host pointers, we don't move data. We target the fastest communication possible, non-blocking channels (without condition variables and other mechanisms): the native FastFlow queues has latency of less than tens of nanoseconds, due to cache overhead. But there are other channels, e.g.: the emitter for a farm is a 1:n communication channel so we use a father thread, slowing down the process but it's inevitable. This can be changed at compile time to use blocking communication channels (structures with mutexes, variables...).

ff\_node\_t<tin, tout> Requires the type of the input and of the output. Requires to implement a methods

tout\* svc(tin\* task); the body, the function computed by the thread.

Has to return(tout\*), or a return(EOS) when it wants to end (passing to svc\_end, or a return(GO\_ON) that tells the framework that you finished the work but doesn't put anything in the output stream. Kinda like a "skip and give me the next item").

Other two method are optional, provided empty but can be rewritten:

int svc\_init(); to initialize data before the computation, executed before svc over the input stream, e.g. to open a file

void svc\_end(); executed at the end, e.g. to close the file

svc stands for service.

#### Lifecycle of ff\_node\_t

- 1. svc\_init called once when the thread is started
- 2.  $\mathtt{svc}$  called for each item appearing on the input stream Returns

tout\*

EOF

 $GO_ON$ 

Other special values

- 3. svc\_end called right before terminating the thread
- 4. EOSnotify() callback upon receiving EOS, to do something when it receives EOS because upon receiving that we don't call svc but go to EOSnotify() and then svc\_end

#### Definition of the program I can declare several ff\_node\_t

```
ff_node_t<vector<float>> t1;
ff_node_t<vector<float>> t2;
```

Then we can put in a pipeline

```
ff_Pipe p(t1, t2);
```

So far no execution, only declaration

Running We call the run\_and\_wait\_end() method, for example p.run\_and\_wait\_end(); which starts everything.

```
1 ff_node_t<vector<float>> t1;
2 ff_node_t<vector<float>> t2;
3 ff_Pipe p(t1, t2);
4
5 {
6   utimer to("program"); // to time the execution
7   p.run_and_wait_end();
8 }
```

Typical Patterns With capital letter are higher level, the lower letter are lower level patterns

```
ff_Farm(ff_node_t worker, int nw); build a farm with nw parallel degree of workers as nodes ff_Pipe(ff_node_t stage, ...);
```

```
1 #include <ff/ff.hpp>
3 struct source : ff_node_t < myTask > {
   myTask* svc(myTask* t) { // source started with null as parameter, so t can be omitted
     for(int i = 0; i < N; i++) {</pre>
        {\tt ff\_send\_out(new\ myTask(i,\ \ldots));} \ \ //\ {\tt whatever\ you\ need\ to\ output}
     return(EOS);
   }
9
10 };
12 struct sink : ff_node_t<myTask> {
myTask* svc(myTask* task) {
     cout << task << endl;</pre>
     return(GO_ON); // processed and ready for next
15
16
17 };
19 struct f : ff_node_t < myTask > {
   myTask* svc(myTask* task) {
     fun(t); // e.g. it works by side effects
      return(t); // whatever the process I need
   }
23
24 };
25
26 int main(int argc, char* argv[]) {
  // declaration
   source node1;
  f node2;
   sink node3;
   ff_Pipe<> pipeline(node1, node2, node3);
   // implies 3 threads, one for each node (parallelism degree = 3)
33
34
   // execution
35
   pipeline.run_and_wait_end();
36
   return 0;
37
```

Compile with at least -03 -pthread. This way uses non-blocking queues.

Pipeline ff\_Pipe<> pipeline(...);

Usage statistics Use -DTRACE\_FASTFLOW for debugging purposes, for usage statistics.

Creating tasks For example a vector of ff\_node pointers, with vecto<unique\_ptr<ff\_node>> W;. The unique\_ptr is an abstraction provided by C++ to associate a usage counter to a pointer, allocated by the system. We then add the workers to the vector with W.push\_back(make\_unique<funstageF>());.

```
1 float fun(float x) {
    this_thread::sleep_for(2*tf);
    return x/2.0 + 1.5;
4 }
6 struct funstageF : ff_node_t < myTask > {
    myTask* svc(myTask* task) {
      task \rightarrow x = fun(task \rightarrow x);
      return task;
    }
10
11 }
13 int main(int argc, char* argv[]) {
    int m = atoi(argv[1]);
14
    int d = atoi(argv[2]);
    source s1(m, d);
16
17
    vector < unique_ptr < ff_node >> W;
18
    for (int i = 0; i < 5; i++) {</pre>
19
20
      W.push_back(make_unique < funstageF >());
    cout << "Worker N. " << W.size() << endl;</pre>
22
    ff_Farm < myTask > f2(move(W));
23
    sing s4;
25
    ff_Pipe <> myPipe(s1, f2, s4);
26
    myPipe.run_and_wait_end();
27
    myPipe.ffStats(cout); // compile with -DTRACE_FASTFLOW
28
29
    return 0;
30
31 }
```

Other types of nodes You can use Multiple Output nodes ff\_monode\_t and Multiple Input nodes ff\_minode\_t. In a classical ff\_node\_t you may just return(something) or ff\_send\_out(something) something that goes in the output queue.

In the multiple output node you can use ff\_send\_out\_to(something, worker) where you specify which worker may receive the data.

In the multiple input node you can use ff\_get\_channel\_id() used to understand which channel provided the result. You also have from\_input() that can be used to distinguish that type of channel (feedback channel or not).

**RoundRobin** The default policy is round robin.

If compiled with  $\neg DFF\_BOUNDED\_BUFFER$  and  $\neg DDEFAULT\_BUFFER\_CAPACITY=k$ , program wide the queues will be bounded to k elements. When the emitter finds all the queues full, it simply waits, looking for empty spots in all the queues.

Scheduling In a farm, there's the method farm.set\_scheduling\_ondemand(). It's not a true on demand, meaning that a worker without work asks the emitter for something. Instead, this forces the input buffers to be short, kind of on demand because the queues will empty faster and the emitter will fill them faster, a good approximation of the worker asking for a task without the need of a feedback channel from each worker to the emitter.

**OFarm** ff\_OFarm has the same rules as the farm pattern but Ordered, meaning that the collector of this farm sorts the results.

Limitations: workers cannot change the length of the string (no GO\_ON or multiple ff\_send\_out upon receiving a task), and workers has to be sequential.

Master Worker Pattern Master directs tasks to workers and receives results. Can be achieved with a ff\_Farm playing with the parameters: eliminate collector, wrap-around (feedback loop) the workers. The problem is that with the wrap-around we have to be careful in handling the feedback loops.

The master also needs to discern the results of the workers (from the feedback loop) from the queue of tasks it has to send out coming from its queue. So we call the first time when there are no tasks in the queue, getting NULL as task in, and start the tasks returning GO\_ON, then waits for the results from the feedback channel.

```
for (int i = 0; i < m; i++) {</pre>
       TASK* t = new TASK{i, ((float) rand())/((float) INT_MAX), (rand()%100)};
       ff_send_out(t);
     }
     return GO_ON;
   } else { // getting tasks from feedback channels
     std::cout << "Feedback " << tin->taskno << " " << tin->x << std::endl;
     sum += tin -> x
     m --;
     if (m == 0) return EOS;
     else return GO_ON;
13
   }
14
15 }
 ParallelFor Needs #include <ff/parallel_for.hpp> because it's a higher level pattern.
#include <ff/parallel_for.hpp>
3 ParallelFor pf; //we'll see the parameters later
_{5} pf.parallel_for(0, // initial value of the iteration value, e.g. 0 if int i = 0
          n, // limit of the iteration value, e.g. n if i < n
          1, // step incremen, e.g. 1 if i++
          0, // chunk size
          [](const int i) { }, // lambda for the ith iteration
          nw); // parallel degree
 For example:
1 for (i = 0; i < n; i++) {</pre>
   v[i] = f(v[i]);
3 }
 becomes
pf.parallel_for(0, n, 1, 0, [&](const int i) { v[i] = f(v[i]); }, nw);
```

Chunksize of 0 means that the vector in nw blocks of equal size and each worker gets a contiguous chunk of  $\simeq$  #iterations/#workers (**static scheduling**). With a chunksize of > 0, we have **dynamic scheduling** with task granularity equal to chunksize (meaning that the vector is divided into blocks of chunksize elements, dynamically means that a idle worker gets a chunk), and with chunksize < 0 we have **static scheduling** with blocks of chunksize elements same as before.

With the #pragma omp for we can append many clauses, e.g. reduction(+:sum) (with an operator followed by a variable in the parentheses). For instance, in the following code

```
1 #pragma omp for reduction(+:sum)
2 for ( ) {
3    sum += x[i];
4 }
```

1 TASK\* svc(TASK\* tin) {

if (tin == NULL) { //first run, start tasks

the sum is managed in parallel exploiting the threads: each thread has its own  $sum_i$  variable and each intermediate sum are summed together (because we specified +) when the parallel for finishes.

The ParallelForReduce supports the name of the variable and the lambda function to apply. An example: need to compute dot product between a and b, so  $\sum_i a_i \cdot b_i$ . Usually would do (sequential code)

```
int sum = 0;
for (i = 0; i < n; i++) {
   sum += a[i] * b[i];
}
With parallel for

1// variable, zero value to initialize, starting from, ending to, increment, chunk size, lambda, final lambda, parallel degree</pre>
```

ParallelFor is a data parallel pattern. At the end of the for there's a barrier that waits for all the threads involved to finish.

Divide and Conquer Pattern ff\_DC(divide, combine, base, cond, input, outpu, nw)

divide splits the input

combine puts together partial results

base function applied to short portions/single elements

cond is a condition that tells if you are in a short portion/single element

input and output are the input and output vectors

nw is the parallel degree

Macro Data Flow Pattern Used to describe custom patterns: use tags to specify nodes and dependencies. An example: compute the following graph



With a function that computes the sum sum(a, b) and one for the multiplication mul(a, b). We setup some parameter info structures that tells the kind of usage of the parameters

```
1 // A = A+B
2 param_info _1 = {&A, input};
3 param_info _2 = {&B, input};
4 param_info _3 = {&A, output};
5 // we've described that A and B are read and A is a result
6 // mdf is the Macro Data Flow object initialized before somewhere
7 mdf.add_task(P, sum, &A, &B, size); // append the node specifying the task
```

FastFlow Building Blocks Few kind of blocks: sequential, parallel...

Sequential Building Blocks Different kind based on number of inputs and number of outputs.



These can be combined sequentially.

**Parallel Building Blocks** Pipelines with: single block, two blocks, two blocks with a feedback channel of the second toward the first.

Farm style building blocks: emitter which connect to a set of nodes (workers), collector which takes from the workers. The emitter and collector are sequential building blocks that can be linked to parallel building blocks as workers. All-to-all building blocks



Graph Transformers For example having a Pipeline with a Farm as intermediate stage, meaning ff.Pipe<int>mPipe(S1, Farm2, S3);, we can optimize it including the sequential computation of the first stage S1 in the emitter of the Farm and the sequential computation of the last stage S3 in the collector of the Farm. We can declare a optlevel opt; optimizer and then set opt.remove\_collector = true;, opt.merge\_with\_emitter = true; and optimize with optimize\_static(mPipe, opt);. All this at declaration phase.

Concurrency Throttling Useful to handle the pressure from the input, adding or removing workers accordingly. It's possible to declare a Farm with a given nw and freeze some of the workers. Meaning that the emitter only perceives the non-frozen workers, the frozen workers are "ghosts" and may be unfrozen when needed.

#### 1.8 Parsec

Benchmarks

Blockslides Portfolio of options, to compute the prices require partial differential equations

Cammeal

Simulated Annealing

Dedup

Ferret Pipeline

- 1. Come up with an initial solution Pipe(S1, S2, S3, S4, S5)
- 2. Refactoring techniques and possible explore the solution space to find the better solution Before coding! Insights on possible performance, possible solutions to discern the better one.
- 3. Implementation phase of the better solution: OMP...

In the project, figure out from the sequential code the values of interest (e.g. time spent in the stages) and once we have a clear idea, refactoring and then implement the best solution.

### 1.9 Stateful Computations

So far assumed that stateless stages/workers/components. E.g. in a Pipeline, the stages are functions  $f, g, h, \ldots$  where we move f(x) to g, g(f(x)) to  $h, \ldots$  and we just compute function, no interaction between the stages. In a stateful computation the computation depends on: input, state and the function. So not f(x) but f(x, state)



Telegram: Offexed 32 Github: fexed

We don't mind the order of accesses in the state but it has to be accessed in mutual exclusion. If the accesses to the state happen at the same time, then it would be difficult to compute the times. So we make another assumption  $t_i = t_{state_i} + t_{fun_i}$  with  $t_{state_i}$  being the mutex.

|                                        | S <sub>1</sub> | \$ <b>\$</b> \ | )    |     |                                         |     |
|----------------------------------------|----------------|----------------|------|-----|-----------------------------------------|-----|
|                                        | S2             |                |      |     |                                         |     |
|                                        | S <sub>3</sub> |                | · /# | *   |                                         |     |
| Sı                                     |                |                |      | 44  | + + + + + + + + + + + + + + + + + + + + |     |
| S,<br>S <sub>2</sub><br>S <sub>3</sub> | (1111)         | 14/            | lu   | 4,  | 141                                     |     |
| ತ್ಯ                                    |                |                | 1//  | 4/1 | 9,                                      | (4) |

**Accumulator** We have a parallel pattern and a **state pattern** (accumulator). Each parallel activity updates the pattern using a function f (associative and commutative). The result is the final value of the state. Characteristics are: type of global states and functions and whether or not the intermediate results means something.

#### **Successive Approximation**

**Example** An emitter and nw workers, followed by a collector.

Without state,  $t_S = \max\{t_e, t_c, \frac{t_w}{nw}\}$  because nw tasks, nw workers each  $t_w$  giving nw results. With state, nw tasks with  $t_f + nw \cdot t_s$  giving nw results. So  $\frac{t_f}{nw} + t_s$ . The Speedup $(nw) \simeq nw$ . With m tasks

$$\frac{m(t_f + t_s)}{m(\frac{t_f}{nw} + t_s)}$$

$$\lim_{nw\to\infty} = \frac{t_f+t_s}{t_s} = \frac{t_f}{t_s} + 1$$

With k stages, m tasks and  $t_f + t_s$  time in each stage per each task

$$k \cdot m(t_f + t_s) =$$

#### Common State Patterns

Resource: lock, replace, unlock

**Accumulator**:  $x_i, s = f(x_i, state)$  with f associative and commutative.

We can use the global value (only) or as local state and finally update the global

Owner Computes: the state is a vector and the ith position can only be updated by the ith concurrent activity. Attention: if using an actual vector this is a source of false sharing overhead, possibly resolved with padding techniques.

Read only: shared state is read only, we can just copy it

#### 1.10 MPI

Message Passing Interface Set of processes plus this MPI that allows to send/receive data or to act with more structured communications. So no pointers but buffers, that are copied.

**SPMD** Single Program Multiple Data:

**MPI** On clusters/network of workstations

**OpenMP** On shared-memory multicores.

Use the library With mpicc compiler.

MPI\_Init(&argc, &argv) at the beginning of the main.

MPI\_Finalize() at the end of the main.

Runs on all the instances.

Relies on the concept of **communicator**: set of processes capable of communicating. Simplest communicator: MPI\_COMM\_WORLD, hosts all the processes in MPI. Can be splitted in subcommunicators: scatter is a broadcast (everyone gets data including the one that scatters).

To know who I am: MPI\_Comm\_rank(communicator, &me) getting in me my ID (one of the numbers between 0-15 if called with mpirun -np 16)

To know how many we are: MPI\_Comm\_size(communicator, &me), returns the total number.

These calls can also be done into a subcommunicator. Use the subcommunicators for example for two set of workers.

Send/Receive Many ways to send and receive.

The message has an envelope. The message has a pointer, size of the data (send contiguous addresses) and type of data. The envelope has (from, to, tag). Tag can be used to discern normal messages, init messages, EOS messages...

MPI\_Send(void\* data, int count, MPI\_Datatype type, ind dest, int tag, MPI\_COMM communicator)
MPI\_Receive(void\* data, int count, MPI\_Datatype, int source, int tag, MPI\_COMM comm, MPI\_Status\* status)
They are blocking calls but the sender only waits until the message is copied in the buffer, and the received only waits
until the buffer is full.

Collective Distributing data. E.g. MPI\_Bcast, broadcast, one sending (root) and all receiving (sender included): the data in the buffer of the sender is copied in each receiver's buffer.

MPI\_Scatter is similar, but the buffer of the sender is divided and each receiver gets a portion. You specify the cell counts.

MPI\_Reduce takes the values in the buffers of the processes applying the operation (e.g. plus) and puts the result in the root's buffer.

MPI\_AllReduce is similar but the result is duplicated in each processe's buffer.

MPI\_Gather gets the values and concantenate them into the root's buffer, no operations. MPI\_AllGather for putting the results in each process's buffer. Remember that it's not shared memory, so with MPI\_Gather the result can be seen only by the root and there's no way of getting it from the other processes.

#### 1.11 Accelerators

#### 1.11.1 GPUs

Graphic Processing Units Aimed mainly at managing objects on the screen, mainly moving (copying) shapes on the screen. For each pixel, you can do a number of operations, typically in a data parallel fashion.

**GP-GPUs** General Purpose. We use the same devices engineered to operate in parallel, because the general idea is **vector processing**. They are composed of processors, each takes the instruction flow from the memory and sorts it into smaller children units. In CUDA terminology the processors are called SM (Streaming Processor) and the child units are Cores (powerful ALUs). It follows the SIMD (Single Instruction Multiple Data) model: single instruction to the processor and instructions with multiple data to the cores (vector processing).

GPU's memories have wider access paths that allow to access more addresses at the same time. Can use the most significant bit to select the memory module (each with  $k \times$  wordsize capacity), using a multiplexer with that bit as selector. If I want to read more than one position at the same time, I can select the other bits to get the "same"

address in each module at the same time. This in theory, with Coalesced Accesses: accessing blocks of memory that start with the same bits (e.g. changing only the last k bits).

Memory Most GPUs distinguish between global, read only and local memory.

Read only for historical reason, kind of RAM, e.g. storing fixed image patterns used many times.

Global memory is accessible by each core but its slower.

Local Memory is bind to the cores, accessed by clock cycle. Not like normal registers but close.

**External** GPU are used as accelerators, meaning that they are external devices with respect to the CPU, connected with the PCIe bus. It has its own memory, so when you need to do something on the GPU you need to transfer data to it. In order to compute, for example, a map(C, f) so  $\forall i \in C$  i' = f(i), we can do it on the CPU like we've done until now or on the GPU with three steps:

- 1. Assuming C is in the central memory M, we need to copy C from M to the GPU's global memory
- 2. Execute the map on the GPU that'll write the resulting collection C' somewhere in the GPU's global memory
- 3. So we need to copy C' from the GPU's memory to M

PCIe has a number of lanes, e.g. 16.

GP-GPU usually have a DMA device that are able to do independent transfers on the PCIe from/to the general memory of the CPU where the bus is attached. The DMA devices are often two, with a specific intent: moving data to/from the GPU while the GPU is computing something else. This means we have to program them.

#### Integrated GPUs Two kinds:

GPU on the same chip with a sort of dedicated PCIe express connecting directly to the CPU, otherwise same mechanisms as before

Still GPU on the same chip with a dedicated interconnection with the CPU and possibly with the memory (cache), and perhaps its own memory. This means faster connection because, being a dedicated link there's less overhead due to management protocols.

**Modern GPUs** Still GP-GPUs with the option of having dedicated interconnection between them (SLI) such that if we have n GPUs of x SM each, we see the whole assembly as a single nx SMs GPU.

#### Using the GPU

- 1. Manage data: moving data to/from GPU memory and also managing allocations in GPU memory.
- 2. Orchestrate parallel computation, meaning managing communication and synchronization: telling to streaming processors and to cores what to do, meaning what to compute and to which data.

  Usually vendors provide abstractions.

#### **Abstractions** nVidia terminology

Grids: groups of threads

Group of threads: a number of warps

Warps: groups of threads (usually 16) need to execute linear, non-diverging codes. Coalescent accesses, one per cycle

So a single activity is denoted by a triple  $\langle \operatorname{grid} \operatorname{component} \operatorname{number}, \operatorname{block} \operatorname{number}, \operatorname{thread} \operatorname{number} \rangle = \langle GC, B, th \rangle$  (at least block and thread number). This means that what we have to tell the GPU is what the single thread performs: the unit of computation is called **kernel** and tells what to do in the  $\langle x, y, z \rangle$  thread. The values are taken with abstractions provided by the GPU, and you only need to write the computation.

Telegram: Offexed 35 Github: fexed

```
1 cuda Malloc*
2 cuda Memcpy* //data from host to device
3 << >>() //kernel computation
4 cuda Memcpy* //device to host
5 cuda free*
```

We move O(m) data to the device, compute O(m) and then move back the O(m) resulting data. From the point of view of times: start at  $t_0$  setting up the bus, then send O(m) with a time depending on the speed of the bus, compute O(m) and take back O(m) results. The transfers to and from the GPU is overhead (from our point of view of parallel computation, it's in the range of setting up the data and collecting it, not useful computation). More overhead we have more parallel computation we need to amortize it. So the GPU is useful when the computation on O(m) data is far longer than the time needed to transfer the same O(m) data back and forth. E.g., matrix multiplication: transfer  $O(n^2)$ , compute  $O(n^3)$  operations and transfer  $O(n^2)$ . In both cases we can use the **stream**.

**Streams** Object we declare to send operations to the GPU, made of copies, in and out, and kernels. For example, with  $DMA_1$  copying from the GPU and  $DMA_2$  copying to the GPU, with the cores computing, we can have a situation like

$$Stream_1 = cpyin(ds_1), kernel, cpyout(ds_1)$$

$$Stream_2 = cpyin(ds_2), kernel, cpyout(ds_2)$$

$$Stream_3 = cpyin(ds_3), kernel, cpyout(ds_3)$$



```
#include <stdio.h>
2 #include <stdlib.h>
3 #include <time.h>
5 using namespace std;
void vecaddGPU(float *r, float *a, float *b, int n) {
s #pragma acc kernels loop copyin(a[0:n], b[0:n]) copyout(r[0:n])
   for (int i = 0; i < n; i++) r[0] = a[i] + b[i];
10 }
_{12} // note that the GPU memory is managed automatically and not explicitly by the programmer
13 int main(int argc, char* argv[]) {
int n; // vector length
  float * a, * b, * r, * e; // inputs, output and expected vectors
   int i, errs;
16
17
   if (argc > 1) n = atoi(argv[1]);
18
   else n = 100000; // default vector length
19
   if (n \le 0) n = 100000;
21
   // allocate vectors on the CPU
   a = (float*) malloc(n*sizeof(float));
23
  b = (float*) malloc(n*sizeof(float));
24
   r = (float*) malloc(n*sizeof(float));
   e = (float*) malloc(n*sizeof(float));
26
   // populate vectors
27
  for (i = 0; i < n; i++) {
28
    a[i] = (float) i+1;
29
    b[i] = (float) 1000*i;
30
31
32
   // compute on the GPU
33
  vecaddGPU(r, a, b, n);
34
35
   // compute on host to compare
36
   clock_t start = clock();
37
   for (i = 0; i < n; i++) e[i] = a[i] + b[i];</pre>
38
   clock_t stop = clock();
39
   printf("Seq time is: %f\n", ((double) (stop - start)/CLOCKS_PER_SEC));
40
   //compare results
42
   errs = 0;
43
   for (i = 0; i < n; i++) {
44
    if (r[i] != e[i]) errs++;
45
46
47
  printf("%d errors found\n", errs);
48
  return errs;
49
50 }
```

**Kernels** Even with annotated code, where we don't have to write all the surrounding logic regarding the parallel computations, you have to be aware that you're executing a kernel which is a for loop of independent iterations.

Given some C code, which we want to turn into a kernel, it's relatively easy to turn into an AST (Abstract Syntax Tree). The we can take the list of variables and see those that are read before being written, those are plausibly the parameters we're getting from somewhere else, and those that are written and then never read anymore, which plausibly are the outputs to be reflected on the main memory.

You need a lot of code to **specify everything**. In particular three kinds of commands that you need to take into account:

Figure out the configuration of the board

Define kernel

OCL: compile a string, e.g. a file read from somewhere and with a specific command produce a kernel from the string that is directed to the GPU

CUDA: special keywords that gets the code from the CUDA compiler, which is a wrapper on top of the C++ compiler

Direct command queue: copy out, copy in, kernel exec

OpenCL is both for CPU and GPU, so figuring out the config means how many cores on CPU too. On CPU, can do conditionals and divergent code in general, things you can't do on GPU. OpenCL can be used for FPGAs too.

#### 1.11.2 FPGAs

Devices with components that can be programmed to act as other components simply configuring them at the beginning, somehow. Basically: logical gates, small registers, small ALUs...

A FPGA is an array where the single cell can act as if it were:

a small (5-7bit input and 1bit output) boolean network: made with a kind of EPROM which uses the input bits as an address and stores a single bit output

a 1 bit register

a router redirecting to a neighbor cell

The columns can host some RAM blocks, 1KB memory for intermediate values, or DSP blocks, basically multiplying addresses (multiply two inputs, adds the result with a register, stores the result of the addition in that register while propagating it too).

These are organized in columns so that there are columns composed solely of cells interleaved with RAM/DSP columns: a cells column may implement a certain function that stores the result in a RAM column, that may be routed in a way to do come floating point operations in a DSP column...

RTL Register Transfer Level code used to program FPGA:

```
VHDL
```

```
Verilog
```

```
module Name (input [N-1:0]x, input [N-1:0]y, output [N:0]z);

// code example
assign z = x + y; // creates a block which computes the sum and outputs in z end module;
```

FPGAs are very good at executing pipelined code, with better performance than CPUs. For example

$$\forall i \ a_i = b_i + c_i$$

a FPGA may operate with a memory interface able to load  $a_i$  and  $b_i$  into registers, then an adder which outputs to another register that delivers to a memory interface outputting  $c_i$  in the memory. It costs a clock cycle per register level: first level of two registers, then adder and register and a final level with the memory interface which outputs  $c_i$ . Hence it costs 3 clock cycles.

On a CPU this would've took 2 loads, the add and the final store: 4 operations each of the fetch-decode-execute pipeline.

Also, this structure (which is a RAM column for the origin of  $a_i, b_i$ , a cells column for the memory interface, a DSP column for the add, another cells column for the memory interface and a final RAM column for the output  $c_i$ ) can be easily replicated in multiple copies obtaining an hardware parallel computation of something already much faster than an original CPU code.



To achieve this you need a complex workflow which can go multiple ways. The classical is:

Write some RTL code

Run the compiler, which solves many problems. In particular, FPGAs are graphs and a task is mapping between tasks which is NP-hard and may take hours to complete

You get a bit stream file that we send to the FPGA (in configuration mode): is basically the entire configuration of the chip, and takes  $\mu$ seconds.

This is simplified, usually the RTL code is firstly run into simulators to check everything from performance to results.

**Xilinx** Assumes a kernel code written in C with no pointers, no recursion... and other limitations, filled with pragmas with calls to a library of components (e.g. communication channels to interconnect different kernels to a buffer in hardware). With HLS (High Level Synthesis) tools, the "compiler", you get the bit stream.

To operate the bitstream it's a different story. You have .ocl code where normally you have the createProgram with the GPU specified, that outputs a kernel that you can submit for execution in a queue, instead of the GPU you specify the bitstream obtained.

Then, you can run on the FPGA or simulate it on CPU/GPU.