<a href="https://colab.research.google.com/github/Thomas-Fabbris/parallel-computing-polimi/blob/main/PTHREADS/PThreads.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **POSIX Threads**
Low-level management of parallelism in C: the programmer has to manage thread creation and synchronization manually.

## **Setup**

In [None]:
%%capture
!apt install clang
!mkdir /home/pthread
%cd /home/pthread

## **Glossary**

PThreads are extremely well documented in plenty of sources, so there is no need for me to write a glossary this time!

A source I recommend is this: https://www.cs.fsu.edu/~baker/realtime/restricted/notes/pthreads.html

One extra piece of handy information: the expected signature of a thread's start routine is:
<br>
`void *(*start_routine)(void *);`
<br>
Which means:
- the function must accept a single argument of type `void *` (so you can pass any kind of data to the thread);
- the function must return a `void *` (so the thread can return any type of result back to the caller);

This gives you maximum flexibility in passing and returning arbitrary data between threads: such is (a fraction of) the power of pointers to void in C!
<br>
Remember to cast to `void *` and back!
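
To make the casting concrete, here is a minimal sketch (not part of the original exercises). The names `range_t`, `sum_range` and `sum_in_thread` are made up for illustration. The argument travels in as a `void *` pointing to a struct, and the result travels back as a heap-allocated `long` collected through `pthread_join`:

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical argument type: the range of integers a thread should sum. */
typedef struct {
  long first;
  long last; /* inclusive */
} range_t;

/* Start routine with the required signature: void * in, void * out. */
static void *sum_range(void *arg) {
  range_t *range = (range_t *) arg;          /* cast the argument back */
  long *sum = (long *) malloc(sizeof *sum);  /* the result must outlive the thread */
  if (sum == NULL) return NULL;
  *sum = 0;
  for (long i = range->first; i <= range->last; i++) *sum += i;
  return sum;                                /* converted to void * on return */
}

/* Runs sum_range in a new thread and collects its result through pthread_join.
   Returns -1 if the thread cannot be created or the allocation fails. */
long sum_in_thread(long first, long last) {
  range_t range = { first, last };
  pthread_t thread;
  void *ret = NULL;
  if (pthread_create(&thread, NULL, sum_range, &range) != 0) return -1;
  pthread_join(thread, &ret);                /* ret receives the returned pointer */
  if (ret == NULL) return -1;
  long result = *(long *) ret;               /* cast the result back */
  free(ret);
  return result;
}
```

For example, `sum_in_thread(1, 100)` yields 5050. The result lives on the heap because a pointer to a local variable of the thread would dangle once the thread returns.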

## **Hello, World!**

### **Exercise 1**

Multiple concurrent threads printing "Hello, World!" (plus some meaningless computation to make the threads run longer):

In [None]:
%%writefile /home/pthread/hello_world_1.cpp
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <unistd.h>
#define NUM_THREADS	5

void * printHello(void *threadid) {
  int i;
  double result = 0.0;
  sleep(3);
  for (i = 0; i < 10000; i++) {
    result = result + sin(i) * tan(i);
  }
  printf("%ld: Hello World!\n", (long) threadid);
  pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
  pthread_t threads[NUM_THREADS];
  int rc;
  long t;
  for (t = 0; t < NUM_THREADS; t++) {
    printf("In main: creating thread %ld\n", t);
    rc = pthread_create(&threads[t], NULL, printHello, (void *)t);
    if (rc) {
      printf("ERROR; return code from pthread_create() is %d\n", rc);
      exit(-1);
    }
  }
}

Overwriting /home/pthread/hello_world_1.cpp


Questions:

- What is missing to ensure that all the work is completed before main() ends? <!--all the 'pthread_join's-->
- Is the order of execution always the same? <!--hah, you wish it was!-->

Compile:

In [None]:
%cd /home/pthread
!clang hello_world_1.cpp -lpthread -lm -o hello_world_1

/home/pthread


Execute:

In [None]:
!./hello_world_1

In main: creating thread 0
In main: creating thread 1
In main: creating thread 2
In main: creating thread 3
In main: creating thread 4


### **Exercise 2**

Use of mutexes and condition variables to enforce execution order:

In [None]:
%%writefile /home/pthread/hello_world_2.cpp
#include <pthread.h>
#include <stdio.h>

pthread_mutex_t mux;
pthread_cond_t cond;
int actual_condition = 0;

void * printMessage(void * threadIndex) {
  if (*((unsigned int *) threadIndex) == 0) {
    pthread_mutex_lock(&mux);
    printf("Hello, ");
    actual_condition = 1;
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mux);
  } else {
    pthread_mutex_lock(&mux);
    while (actual_condition == 0) {
      pthread_cond_wait(&cond, &mux);
    }
    printf("World!\n");
    pthread_mutex_unlock(&mux);
  }
  return 0;
}

int main() {
  /* Thread data structures */
  unsigned int idx[2];
  pthread_t threads[2];
  pthread_attr_t attr;

  /* Initialize the attribute(s) for the threads creation */
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

  pthread_mutex_init(&mux, NULL);
  pthread_cond_init(&cond, NULL);

  for (unsigned int ii = 0; ii < 2; ii++) {
    idx[ii] = ii;
    pthread_create(&threads[ii], &attr, printMessage, (void *) &idx[ii]);
  }
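  /* Join both threads: thread 1 cannot finish before thread 0 has signaled */
  pthread_join(threads[0], NULL);
  pthread_join(threads[1], NULL);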

  /* Deallocate the structures */
  pthread_mutex_destroy(&mux);
  pthread_cond_destroy(&cond);
  pthread_attr_destroy(&attr);

  return 0;
}

Overwriting /home/pthread/hello_world_2.cpp


Questions:
- What execution order do the conditions enforce (i.e. which thread returns first)? <!--thread 0 will always return first-->
- Could there be a deadlock? <!--no, thanks to "actual_condition"-->

*Note: condition variables are not semaphores. A signal is not remembered: if a thread starts waiting only after the signal was sent, it will not wake up until the next signal! This is why the code checks `actual_condition` before waiting.*
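
The usual remedy is to pair the condition variable with a flag, as `actual_condition` does above. A minimal sketch of that pattern as a reusable one-shot event follows. `event_t` and the `event_*` functions are hypothetical names, not part of the pthread API:

```c
#include <pthread.h>

/* Hypothetical one-shot event: a flag guarded by a mutex, plus a condition variable. */
typedef struct {
  pthread_mutex_t lock;
  pthread_cond_t cond;
  int is_set; /* the predicate: remembers that the signal already happened */
} event_t;

void event_init(event_t *e) {
  pthread_mutex_init(&e->lock, NULL);
  pthread_cond_init(&e->cond, NULL);
  e->is_set = 0;
}

void event_set(event_t *e) {
  pthread_mutex_lock(&e->lock);
  e->is_set = 1;                    /* record the event first... */
  pthread_cond_broadcast(&e->cond); /* ...then wake any thread already waiting */
  pthread_mutex_unlock(&e->lock);
}

void event_wait(event_t *e) {
  pthread_mutex_lock(&e->lock);
  while (!e->is_set) {              /* a late arrival sees the flag and skips the wait */
    pthread_cond_wait(&e->cond, &e->lock);
  }
  pthread_mutex_unlock(&e->lock);
}

void event_destroy(event_t *e) {
  pthread_cond_destroy(&e->cond);
  pthread_mutex_destroy(&e->lock);
}
```

Calling `event_set` before `event_wait` does not hang, because the late waiter finds `is_set` already at 1 and never blocks. The `while` loop also covers spurious wakeups.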

Compile:

In [None]:
%cd /home/pthread
!clang hello_world_2.cpp -lpthread -o hello_world_2

/home/pthread


Execute:

In [None]:
%cd /home/pthread
!./hello_world_2

/home/pthread
Hello, World!


Inspect the generated LLVM IR:

In [None]:
%cd /home/pthread
!clang hello_world_2.cpp -S -emit-llvm -lpthread
!cat hello_world_2.ll

/home/pthread
; ModuleID = 'hello_world_2.cpp'
source_filename = "hello_world_2.cpp"
target datalayout = "e-m:e-p270:32:32-p271:32:32-p272:64:64-i64:64-f80:128-n8:16:32:64-S128"
target triple = "x86_64-pc-linux-gnu"

%union.pthread_mutex_t = type { %struct.__pthread_mutex_s }
%struct.__pthread_mutex_s = type { i32, i32, i32, i32, i32, i16, i16, %struct.__pthread_internal_list }
%struct.__pthread_internal_list = type { %struct.__pthread_internal_list*, %struct.__pthread_internal_list* }
%union.pthread_cond_t = type { %struct.__pthread_cond_s }
%struct.__pthread_cond_s = type { %union.__atomic_wide_counter, %union.__atomic_wide_counter, [2 x i32], [2 x i32], i32, i32, [2 x i32] }
%union.__atomic_wide_counter = type { i64 }
%union.pthread_attr_t = type { i64, [48 x i8] }
%union.pthread_mutexattr_t = type { i32 }
%union.pthread_condattr_t = type { i32 }

@mux = dso_local global %union.pthread_mutex_t zeroinitializer, align 8
@cond = dso_local global %union.pthread_cond_t zeroinitializer, a

## **Calculation of pi**

### **Exercise 3**

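Monte Carlo algorithm for the calculation of pi: generate a set of random points in a square with side *n*, check whether each one falls inside the inscribed circle with diameter *n*, and compute the ratio between the points inside the circle and the total number of points. That ratio approximates pi/4, so multiplying it by 4 gives an estimate of pi.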

<img align="middle" src="https://drive.google.com/uc?id=16bV6NuIUSkB5GNiVWEVlXxcjYp_BAEBb">

<img src="https://drive.google.com/uc?id=1aq0iENGBsqKQ2f0RqlgVGoQZRlbbkk1E">
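
As a short derivation of why this works, assuming the points are drawn uniformly in the square, the fraction landing inside the circle approaches the ratio of the two areas:

$$
\frac{N_{\text{inside}}}{N_{\text{total}}} \approx \frac{\pi (n/2)^2}{n^2} = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx 4\,\frac{N_{\text{inside}}}{N_{\text{total}}}
$$

The code below samples only the quarter of the circle that lies in the unit square (the `x*x + y*y <= 1` test with radius 1), which gives the same ratio: $(\pi \cdot 1^2 / 4) / 1^2 = \pi/4$.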

Each thread accumulates into its own private counter and merges it into the shared total only once, under the mutex, at the end: this is just another, very simple, instance of privatization.

In [None]:
%%writefile /home/pthread/pi_montecarlo.cpp
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/*Number of points*/
unsigned int num_points = 100000;

/*Number of threads*/
unsigned int num_threads = 16;

/*Points inside the circle*/
unsigned int inside_points = 0;

/*Mutex */
pthread_mutex_t mutex;

/*Barrier */
pthread_barrier_t barrier;

/**
 * Check if some random points are inside or outside a circle with radius 1
 * @param data is a pointer to the index identifying the thread
 */
void * check_points(void * data) {
   /* The index of the current thread */
   unsigned int index = *((unsigned int *) data);
   unsigned int seed = index;

   /* The number of points to be examined by this thread */
   unsigned int num_points_per_thread = num_points/num_threads + (index < num_points%num_threads ? 1 : 0);
   unsigned int i;
   unsigned int local_count = 0;
   /* The core of the application */
   for(i = 0; i < num_points_per_thread; i++) {
      double x = (rand_r(&seed)) / (double) RAND_MAX;
      double y = (double) rand_r(&seed) / (double) RAND_MAX;
      if(x*x + y*y <= 1)
         local_count++;
   }
   printf("Thread %u - Inside points %u/%u\n", index, local_count, num_points_per_thread);
   fflush(stdout);

   /* Lock and update global result */
   pthread_mutex_lock(&mutex);
   inside_points += local_count;
   pthread_mutex_unlock(&mutex);
   pthread_barrier_wait(&barrier);

   return 0;
}

int main(int argc, char ** argv) {
   /* Thread data structure */
   pthread_t threads[num_threads];
   unsigned int indexes[num_threads];
   pthread_attr_t attr;

   /* Initialize the mutex */
   pthread_mutex_init(&mutex, NULL);

   /* Initialize the barrier */
   pthread_barrier_init(&barrier, NULL, num_threads + 1);

   /* Create the threads */
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
   unsigned int index;
   for(index = 0; index < num_threads; index++) {
      indexes[index] = index;
      pthread_create(&threads[index], &attr, check_points, (void *) &indexes[index]);
   }
   /* Wait for all the threads to reach the barrier */
   pthread_barrier_wait(&barrier);

   /* Compute the final result */
   double pi = (4.0 * inside_points) / num_points;
   fprintf(stdout, "Computed pi is %f\n", pi);

   /* Deallocate structures */
   pthread_attr_destroy(&attr);
   pthread_mutex_destroy(&mutex);
   pthread_barrier_destroy(&barrier);
   pthread_exit(NULL);
   return 0;
}

Writing /home/pthread/pi_montecarlo.cpp


Questions:

* How is concurrent access to the shared variable handled? <!--mutex-->
* Why is the index argument passed like that? <!--the answer is twofold: a pointer to the integer is cast to void * and back to comply with the generic pthread start-routine signature, and the id is passed manually because automatically assigned thread ids are not contiguous values starting from zero-->
* Why is the barrier needed? <!--to ensure all threads finished and thus reading 'inside_points' from the master thread is safe-->

Compile:

In [None]:
%cd /home/pthread/
!clang pi_montecarlo.cpp -lpthread -o pi

/home/pthread


Execute:

In [None]:
!./pi

Thread 0 - Inside points 4940/6250
Thread 2 - Inside points 4884/6250
Thread 1 - Inside points 4867/6250
Thread 3 - Inside points 4934/6250
Thread 7 - Inside points 4879/6250
Thread 8 - Inside points 4867/6250
Thread 4 - Inside points 4948/6250
Thread 9 - Inside points 4887/6250
Thread 10 - Inside points 4887/6250
Thread 14 - Inside points 4897/6250
Thread 5 - Inside points 4885/6250
Thread 11 - Inside points 4954/6250
Thread 12 - Inside points 4870/6250
Thread 13 - Inside points 4927/6250
Thread 6 - Inside points 4934/6250
Thread 15 - Inside points 4855/6250
Computed pi is 3.136600


## **Pipelines**

### **Exercise 4**

The following program implements a data processing pipeline with four stages (a FIR filter, a downsampler, an upsampler, and a second FIR filter); each thread executes one stage and all threads synchronize on a barrier. Note how each thread has to find the correct portion of data in the buffer.

Threads loop forever but synchronize on a barrier between iterations. Each intermediate buffer exists in two copies that are alternated (ping-pong) between iterations, so a stage can write one copy while the next stage reads the other.

<img align="middle" src="https://drive.google.com/uc?id=1BGoqfst9LHfvEuS1LYWNQBw8xPVb3SCA">

In [None]:
%%writefile /home/pthread/pipeline.cpp

#define _POSIX_C_SOURCE 200112L

#define NUM_THREADS 4
#define FILTER_SIZE 32
#define WINDOW_SIZE 32
#define BUFFER_SIZE (WINDOW_SIZE + FILTER_SIZE - 1 + WINDOW_SIZE)
#define SAMPLE_RATIO 8
#define INPUT_SIZE (65536 * 16)

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

/* First filter */
float first_filter[FILTER_SIZE];

/* Second filter */
float second_filter[FILTER_SIZE];

/* Buffers between first and second thread */
float even_buffer_1[BUFFER_SIZE], odd_buffer_1[BUFFER_SIZE];

/* Buffers between second and third thread */
float even_buffer_2[(WINDOW_SIZE + WINDOW_SIZE)/4 + 1], odd_buffer_2[(WINDOW_SIZE + WINDOW_SIZE)/4 + 1];

/* Buffers between third and fourth thread */
float even_buffer_3[BUFFER_SIZE], odd_buffer_3[BUFFER_SIZE];

/* Input */
float input[INPUT_SIZE];

/* Output */
float output[WINDOW_SIZE];

/* The pthread barrier used to synchronize the threads */
pthread_barrier_t barr;

/* Apply a finite impulse response filter */
void * convolution_1(void * local_data)
{
   /* The position in the output circular buffers */
   unsigned int output_buffer_position = 0;

   /* The position in the input */
   unsigned int input_position = 0;

   /* The loop iteration counter */
   unsigned int local_count = 0;

   while(1)
   {
      unsigned int index;

      /* Pick the output buffer according to the iteration counter */
      float * out_buffer = (local_count % 2) == 0 ? even_buffer_1 : odd_buffer_1;

      for(index = 0; index < WINDOW_SIZE && local_count < (INPUT_SIZE / WINDOW_SIZE); index++)
      {
         unsigned int coefficient_index = 0;
         for(coefficient_index = 0; coefficient_index < FILTER_SIZE; coefficient_index++)
         {
            out_buffer[output_buffer_position] += first_filter[coefficient_index] * input[local_count * WINDOW_SIZE + index];
         }
         output_buffer_position++;
         if(output_buffer_position == BUFFER_SIZE)
         {
            output_buffer_position = 0;
         }
      }

      local_count++;

      pthread_barrier_wait(&barr);
   }
   return 0;
}

/* Apply a finite impulse response filter */
void * convolution_2(void * local_data)
{
   /* The position in the circular buffers */
   int buffer_position = (-3 * WINDOW_SIZE);

   /* The loop iteration counter */
   unsigned int local_count = 0;

   while(1)
   {
      unsigned int index;

      /* Pick the input buffer according to the iteration counter */
      float * in_buff = (local_count % 2) != 0 ? even_buffer_3 : odd_buffer_3;

      for(index = 0; index < WINDOW_SIZE && local_count < (INPUT_SIZE / WINDOW_SIZE) + 3; index++)
      {
         if(buffer_position >= 0)
            output[buffer_position] = 0;
         unsigned int coefficient_index = 0;
         for(coefficient_index = 0; coefficient_index < FILTER_SIZE; coefficient_index++)
         {
            /* Compute real position in circular buffer */
            int real_input_position = buffer_position - coefficient_index;
            if(real_input_position < 0)
            {
               real_input_position += WINDOW_SIZE;
            }
            if(buffer_position >= 0 && real_input_position >= 0)
               output[buffer_position] += second_filter[coefficient_index] * in_buff[real_input_position];
         }
         buffer_position++;
         if(buffer_position == WINDOW_SIZE)
         {
            buffer_position = 0;
         }
      }

      local_count++;

      pthread_barrier_wait(&barr);
   }
   return 0;
}

/* Perform the downsample */
void * downsample(void * local_data)
{
   /* Index of the current sample */
   unsigned int sample_counter = 0;

   /* The position in the input circular buffer */
   int input_buffer_position = -1 * WINDOW_SIZE;

   /* The position in the output circular buffer */
   int output_buffer_position = (-1 * WINDOW_SIZE) / SAMPLE_RATIO;

   /* The number of times the following loop was executed */
   unsigned int local_count = 0;

   while(1)
   {
      /* The element in the current window */
      unsigned int index = 0;

      /* Pick the input/output buffers according to the iteration counter */
      float * in_buff = (local_count % 2) != 0 ? even_buffer_1 : odd_buffer_1;
      float * out_buff = (local_count % 2) != 0 ? even_buffer_2 : odd_buffer_2;

      for(index = 0; index < WINDOW_SIZE && local_count < (INPUT_SIZE / WINDOW_SIZE) + 1; index++)
      {
         if(sample_counter % SAMPLE_RATIO == 0)
         {
            sample_counter = 0;
            if(output_buffer_position >= 0 && input_buffer_position >= 0)
               out_buff[output_buffer_position] = in_buff[input_buffer_position];
            output_buffer_position++;
            if(output_buffer_position == (WINDOW_SIZE + WINDOW_SIZE)/4 + 1)
            {
               output_buffer_position = 0;
            }
         }
         sample_counter++;
         input_buffer_position++;
         if(input_buffer_position == BUFFER_SIZE)
         {
            input_buffer_position = 0;
         }
      }

      local_count++;
      pthread_barrier_wait(&barr);
   }
   return 0;
}

/* Perform the upsample */
void * upsample(void * local_data)
{
   /* Index of the current sample */
   unsigned int sample_counter = 0;

   /* The position in the input circular buffer */
   int input_buffer_position = (-2 * WINDOW_SIZE) / SAMPLE_RATIO;

   /* The position in the output circular buffer */
   int output_buffer_position = (-2 * WINDOW_SIZE);

   /* The number of times the following loop was executed */
   unsigned int local_count = 0;

   while(1)
   {
      /* The element in the current window */
      unsigned int index = 0;

      /* Pick the input/output buffers according to the iteration counter */
      float * in_buff = (local_count % 2) == 0 ? even_buffer_2 : odd_buffer_2;
      float * out_buff = (local_count % 2) == 0 ? even_buffer_3 : odd_buffer_3;

      for(index = 0; index < WINDOW_SIZE && local_count < (INPUT_SIZE / WINDOW_SIZE) + 2; index++)
      {
         if(sample_counter % SAMPLE_RATIO == 0)
         {
            if(output_buffer_position >= 0 && input_buffer_position >= 0)
               out_buff[output_buffer_position] = in_buff[input_buffer_position];
            sample_counter = 0;
            input_buffer_position++;
            if(input_buffer_position == (WINDOW_SIZE + WINDOW_SIZE)/4 + 1)
            {
               input_buffer_position = 0;
            }
         }
         else
         {
            if(output_buffer_position >= 0)
               out_buff[output_buffer_position] = 0;
         }
         sample_counter++;
         output_buffer_position++;
         if(output_buffer_position == BUFFER_SIZE)
         {
            output_buffer_position = 0;
         }
      }

      local_count++;
      pthread_barrier_wait(&barr);

   }
   return 0;
}

int main(int argc, char * * argv)
{
   unsigned int index;

   /* Generate filters */
   for (index = 0; index < FILTER_SIZE; index++)
   {
      first_filter[index] = index * FILTER_SIZE + index + 1;
      second_filter[index] = index;
   }

   srand(0);
   /* Generate the input */
   for (index = 0; index < INPUT_SIZE; index++)
   {
      input[index] = ((float) rand() / (float) RAND_MAX) * 64.0f;
   }

   /* The current iteration */
   unsigned int current_iteration = 0;

   /* Thread data structure */
   pthread_t threads[NUM_THREADS];
   pthread_attr_t attr;

   /* Initialize the barrier */
   pthread_barrier_init(&barr, NULL, NUM_THREADS + 1);

   /* Create the threads */
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

   pthread_create(&threads[0], &attr, convolution_1, NULL);
   pthread_create(&threads[1], &attr, downsample, NULL);
   pthread_create(&threads[2], &attr, upsample, NULL);
   pthread_create(&threads[3], &attr, convolution_2, NULL);

   /* Counts the loop iterations */
   unsigned int iteration_count = 0;

   /* 3 iterations are added to the loop to allow values to be passed from the first thread to the last */
   while(iteration_count < INPUT_SIZE/WINDOW_SIZE + 3)
   {
      pthread_barrier_wait(&barr);
      iteration_count++;
   }

   for(int count = 0; count < WINDOW_SIZE; count++)
      printf("%f\n", output[count]);

   return 0;
}

Writing /home/pthread/pipeline.cpp


In [None]:
%cd /home/pthread
!clang pipeline.cpp -pthread -o pipeline

/home/pthread


In [None]:
%cd /home/pthread
!./pipeline

/home/pthread
182131851264.000000
193716436992.000000
205301022720.000000
216885608448.000000
135859027968.000000
147443613696.000000
159028199424.000000
170612785152.000000
182197370880.000000
193781956608.000000
205366558720.000000
216951128064.000000
136464654336.000000
148049231872.000000
159633833984.000000
171218419712.000000
182803005440.000000
194387591168.000000
205972176896.000000
217556779008.000000
136295645184.000000
147880214528.000000
159464816640.000000
171049402368.000000
182633988096.000000
194218573824.000000
205803159552.000000
217387761664.000000
228972331008.000000
147378094080.000000
158962679808.000000
170547265536.000000


Questions:

- How can you implement the same program using a mutex? <!--one copy of each intermediate buffer protected by a lock and a flag, if the reading thread gets the lock and the flag is not set, it releases it and tries again, if the writing thread gets the lock, it writes and sets the flag before releasing the lock-->
- How can you obtain the same result using a condition variable? <!--one copy of each intermediate buffer protected by a lock and a condition variable. Same logic as Exercise 2. If the reading thread gets the lock and the flag is not set, it waits on the condition variable; if the writing thread gets the lock, it writes, sets the flag, signals the condition variable, and then releases the lock-->
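
As a starting point for the second question, here is one possible sketch of a single-slot handoff guarded by a mutex, a flag and a condition variable. It is an illustration under assumptions, not the reference solution: `slot_t`, `slot_put`, `slot_get` and `sum_through_slot` are hypothetical names, and a single `float` stands in for a whole buffer.

```c
#include <pthread.h>

/* Hypothetical single-slot channel between two pipeline stages. */
typedef struct {
  pthread_mutex_t lock;
  pthread_cond_t changed;
  int full;    /* flag: 1 while the slot holds an unread value */
  float value;
} slot_t;

void slot_init(slot_t *s) {
  pthread_mutex_init(&s->lock, NULL);
  pthread_cond_init(&s->changed, NULL);
  s->full = 0;
  s->value = 0.0f;
}

/* Writer side: wait until the slot is empty, then fill it. */
void slot_put(slot_t *s, float v) {
  pthread_mutex_lock(&s->lock);
  while (s->full) pthread_cond_wait(&s->changed, &s->lock);
  s->value = v;
  s->full = 1;
  pthread_cond_broadcast(&s->changed);
  pthread_mutex_unlock(&s->lock);
}

/* Reader side: wait until the slot is full, then empty it. */
float slot_get(slot_t *s) {
  pthread_mutex_lock(&s->lock);
  while (!s->full) pthread_cond_wait(&s->changed, &s->lock);
  float v = s->value;
  s->full = 0;
  pthread_cond_broadcast(&s->changed);
  pthread_mutex_unlock(&s->lock);
  return v;
}

typedef struct { slot_t *slot; int count; } producer_arg_t;

/* Producer thread: pushes the values 1..count through the slot. */
static void *produce(void *arg) {
  producer_arg_t *p = (producer_arg_t *) arg;
  for (int i = 1; i <= p->count; i++) slot_put(p->slot, (float) i);
  return NULL;
}

/* Pushes 1..count from a second thread and sums them in the caller. */
float sum_through_slot(int count) {
  slot_t slot;
  slot_init(&slot);
  producer_arg_t arg = { &slot, count };
  pthread_t producer;
  pthread_create(&producer, NULL, produce, &arg);
  float sum = 0.0f;
  for (int i = 0; i < count; i++) sum += slot_get(&slot);
  pthread_join(producer, NULL);
  pthread_cond_destroy(&slot.changed);
  pthread_mutex_destroy(&slot.lock);
  return sum;
}
```

`sum_through_slot(10)` returns 55. Note that the writer also has to wait while the slot is still full, otherwise it would overwrite data the reader has not consumed yet.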