<a href="https://colab.research.google.com/github/AmadeusZhang/LabALGA/blob/main/AACAP_Es11.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Posix Threads
Low-level management of parallelism in C: the programmer has to manage thread creation and synchronization manually.

In [None]:
!mkdir /home/pthread
%cd /home/pthread

/home/pthread


## Hello, World!
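First of all, let's write a program in which two threads cooperate to print the classic test message: thread 0 prints "Hello, " and thread 1, synchronized with it through a mutex and a condition variable, prints "World!" only afterwards.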

In [None]:
%%writefile /home/pthread/hello_world.c

#include <pthread.h>
#include <stdio.h>

/* Synchronization shared by the two threads: thread 1 must not print before thread 0 */
pthread_mutex_t mux;
pthread_cond_t cond;
/* Set to 1 (while holding mux) once "Hello, " has been printed */
int actual_condition = 0;

/*
 * Thread body. Thread 0 prints "Hello, " and then signals cond; any other
 * thread waits on cond until actual_condition is set, then prints "World!".
 * Both branches run entirely while holding mux.
 * @param threadIndex points to an unsigned int holding the thread index
 * @return always NULL
 */
void * printMessage(void * threadIndex) {
  const unsigned int self = *(const unsigned int *) threadIndex;

  pthread_mutex_lock(&mux);
  if (self != 0) {
    /* Re-check in a loop: pthread_cond_wait may return spuriously */
    while (!actual_condition) {
      pthread_cond_wait(&cond, &mux);
    }
    printf("World!\n");
  } else {
    printf("Hello, ");
    actual_condition = 1;
    pthread_cond_signal(&cond);
  }
  pthread_mutex_unlock(&mux);

  return NULL;
}

/*
 * Start the two printing threads, wait for both of them and release the
 * synchronization objects.
 * @return 0 on success, 1 if a thread could not be created
 */
int main(void) {

  /* Thread data structures */
  unsigned int idx[2];
  pthread_t threads[2];
  pthread_attr_t attr;

  /* Initialize the attribute(s) for the threads creation */
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

  pthread_mutex_init(&mux, NULL);
  pthread_cond_init(&cond, NULL);

  /* Stop at the first failure: if thread 0 is missing, thread 1 would wait forever */
  unsigned int created = 0;
  while (created < 2) {
    idx[created] = created;
    if (pthread_create(&threads[created], &attr, printMessage, (void *) &idx[created]) != 0) {
      fprintf(stderr, "pthread_create failed\n");
      break;
    }
    created++;
  }

  /* Join every thread that was created, not only thread 1. Thread 0 may
   * still be inside pthread_mutex_unlock() when thread 1 finishes, so the
   * mutex and condition variable must not be destroyed before it returns.
   * Joining also releases the resources of each joinable thread. */
  for (unsigned int ii = 0; ii < created; ii++) {
    pthread_join(threads[ii], NULL);
  }

  /* Deallocate the structures */
  pthread_mutex_destroy(&mux);
  pthread_cond_destroy(&cond);
  pthread_attr_destroy(&attr);

  return created == 2 ? 0 : 1;
}

Compile the source with the proper flags (the program must be linked against the pthread library):

In [None]:
%cd /home/pthread
!clang hello_world.c -lpthread -o hello_world

and run the resulting binary:

In [None]:
%cd /home/pthread
!./hello_world

What happened to our original code? Let's inspect the LLVM IR that clang generates for it:

In [None]:
%cd /home/pthread
!clang hello_world.c -S -emit-llvm -lpthread
!cat hello_world.ll

## Merge Sort

In [None]:
%%writefile /home/pthread/merge_sort.c

#define _POSIX_C_SOURCE 200112L

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* When compiled with -DSILENT every printf call (debug traces and the final
 * result alike) expands to nothing. */
#ifdef SILENT
#define printf(...)
#endif

/* The two ping-pong buffers: each merge pass reads one of them and writes the
 * other. odd_data initially holds the random input (see random_initialize). */
int * even_data, * odd_data;

/* Number of elements to sort (parsed from argv[1] in main) */
unsigned int array_size = 0;

/* Number of worker threads (parsed from argv[2] in main) */
unsigned int num_threads = 0;

/* Barrier shared by all the worker threads; initialized in main with a count of num_threads */
pthread_barrier_t barrier;

/* NOTE(review): not referenced by any of the code visible here — possibly unused */
unsigned int current_round = 0;

/* Value handed back by thread 0 of sort_array through pthread_exit():
 * 1 if the sorted data ends up in odd_data, 0 if it ends up in even_data */
unsigned int return_thread;

/**
 * Fill this thread's share of odd_data with pseudo-random values.
 * Thread i writes cells i, i + num_threads, i + 2*num_threads, ... and then
 * waits on the barrier, so no thread returns before all of them are done.
 * @param temp_index points to the unsigned int index of the calling thread
 * @return always NULL
 */
void * random_initialize(void * temp_index)
{
   const unsigned int first = *(const unsigned int *) temp_index;

   /* rand_r keeps its state in this local seed, so threads share no RNG state */
   unsigned int seed = first;

   for(unsigned int cell = first; cell < array_size; cell += num_threads)
      odd_data[cell] = rand_r(&seed);

   pthread_barrier_wait(&barrier);
   return NULL;
}

/**
 * Merge two adjacent, already sorted runs of input_data into output_data.
 * The left run is [starting_cell, starting_cell + size/2) and the right run is
 * [starting_cell + size/2, starting_cell + size). Both are clamped to
 * array_size, so a partial or empty right run at the end of the array is
 * handled (the left run is then simply copied).
 * @param input_data is the array containing the two runs to be merged
 * @param starting_cell is the index of the first cell of the left run
 * @param size is the sum of the sizes of the two runs
 * @param output_data is where the merged run is written (same index range)
 */
void bottom_up_merge(int * input_data, int starting_cell, int size, int * output_data)
{
   if(starting_cell > array_size)
      return;

   /* Last position (exclusive) to be written, clamped to the end of the array */
   const int last_cell = (starting_cell + size <= array_size) ? starting_cell + size : array_size;

   printf("Sorting cell %d %d\n", starting_cell, last_cell);

   /* Read cursors in the left and right runs */
   int left_index = starting_cell;
   int right_index = starting_cell + size/2;

   /* Last position (exclusive) of the left run */
   const int last_left = (right_index < last_cell) ? right_index : last_cell;

   for(int index = starting_cell; index < last_cell; index++)
   {
      /* Trace a cursor only while it is inside its run. Once a run is
       * exhausted (or the right run starts past the end of the array) its
       * cursor can point at or beyond array_size, and reading it would be an
       * out-of-bounds access. */
      if(left_index < last_left)
      {
         printf("%d %d %d\n", left_index, last_left, input_data[left_index]);
      }
      if(right_index < last_cell)
      {
         printf("%d %d %d\n", right_index, last_cell, input_data[right_index]);
      }

      /* Take from the left run while it has elements and either the right run
       * is exhausted or the left head is not greater (keeps the sort stable) */
      if(left_index < last_left && (right_index >= last_cell || input_data[left_index] <= input_data[right_index]))
      {
         output_data[index] = input_data[left_index];
         left_index++;
      }
      else
      {
         output_data[index] = input_data[right_index];
         right_index++;
      }
      printf("%d\n", output_data[index]);
   }
   printf("\n");
}

/**
 * Bottom-up merge sort of the run [starting_cell, starting_cell + size).
 * size is expected to be a power of two (sort_array guarantees it). Passes
 * alternate direction: pass 0 reads odd_data and writes even_data, pass 1
 * reads even_data and writes odd_data, and so on.
 * @param starting_cell is the index of the first cell of the run
 * @param size is the number of cells in the run
 * @return true if the sorted run ends up in odd_data, false if in even_data
 */
bool bottom_up_sort(unsigned int starting_cell, unsigned int size)
{
   printf("Sorting cell %d %d\n", starting_cell, starting_cell + size);

   /* Number of completed passes; its parity tells which buffer is current */
   int pass = 0;

   for(int run = 2; run < size*2; run *= 2)
   {
      int * source = (pass % 2 == 0) ? odd_data : even_data;
      int * destination = (pass % 2 == 0) ? even_data : odd_data;

      for(int chunk = 0; chunk < size/run; chunk++)
         bottom_up_merge(source, starting_cell + chunk * run, run, destination);

      pass++;
   }
   return pass % 2 == 0;
}

/**
 * Thread body: sort one slice of the array, then take part in the
 * tree-shaped merge of the sorted slices.
 *
 * Each thread sorts a slice of subarray_size cells. subarray_size is
 * ceil(array_size / num_threads) rounded up to a power of two. In each
 * following round (active_ratio = 2, 4, 8, ...), every thread whose index is
 * a multiple of active_ratio merges the two adjacent runs that start at its
 * own slice. A barrier separates the rounds.
 *
 * @param local_data points to the unsigned int index of this thread
 * @return NULL for every thread except thread 0, which instead terminates
 *         with pthread_exit(&return_thread); return_thread is 1 if the sorted
 *         data is in odd_data and 0 if it is in even_data
 */
void * sort_array(void * local_data)
{
   printf("Start\n");
   /* The index of the current thread */
   const unsigned int thread_index = *((unsigned int *) local_data);

   /* ceil(array_size / num_threads) */
   unsigned int subarray_size = array_size/num_threads + (array_size%num_threads ? 1 : 0);

   /* Round the size up to the next power of two, since bottom_up_sort()
    * only handles power-of-two run sizes */
   subarray_size--;
   subarray_size |= subarray_size >> 1;
   subarray_size |= subarray_size >> 2;
   subarray_size |= subarray_size >> 4;
   subarray_size |= subarray_size >> 8;
   subarray_size |= subarray_size >> 16;
   subarray_size++;

   /* The starting cell */
   const unsigned int starting_cell = thread_index * subarray_size;
   printf("Thread %u - Sorting cell %u %u\n", thread_index, starting_cell, starting_cell + subarray_size);

   /* Sort the assigned array portion */
   bool odd = bottom_up_sort(starting_cell, subarray_size);

   /* Keep merging until the runs produced by the previous round
    * (active_ratio / 2 slices each) span all num_threads slices. Stopping at
    * active_ratio > num_threads would skip the last round whenever
    * num_threads is not a power of two: with 3 threads, slices {0,1} and {2}
    * would never be merged together. Every thread executes the same number
    * of rounds, so the barrier (sized num_threads) is always matched. */
   unsigned int active_ratio = 2;
   while(active_ratio / 2 < num_threads)
   {
      /* Wait until every thread has finished the previous round */
      pthread_barrier_wait(&barrier);

      /* True if this thread has to merge its run with the one on its right */
      const bool merge = thread_index % active_ratio == 0;
      if(merge)
      {
         if(odd)
            bottom_up_merge(odd_data, starting_cell, subarray_size * active_ratio, even_data);
         else
            bottom_up_merge(even_data, starting_cell, subarray_size * active_ratio, odd_data);
      }
      /* All threads flip their view of the current buffer, merging or not */
      odd = !odd;
      /* Halve the active threads */
      active_ratio *= 2;
   }
   if(thread_index == 0)
   {
      return_thread = odd;
      printf("Exiting from thread 0\n");
      fflush(stdout);
      pthread_exit(&return_thread);
   }
   return NULL;
}

/**
 * Entry point: merge_sort <array_size> <num_threads>
 * Fills odd_data with random values in parallel, sorts it in parallel and
 * prints the sorted array.
 * @return EXIT_SUCCESS, or EXIT_FAILURE on bad arguments or resource errors
 */
int main(int argc, char ** argv)
{
   if(argc != 3)
   {
      fprintf(stderr, "Wrong number of parameters\nUsage: merge_sort <array_size> <num_threads>\n");
      return EXIT_FAILURE;
   }

   /* atoi() reports no errors, so read into signed ints first. Negative
    * values are then rejected instead of wrapping to huge unsigned sizes.
    * A zero thread count would divide by zero in sort_array() and is
    * rejected by pthread_barrier_init(). */
   const int requested_size = atoi(argv[1]);
   const int requested_threads = atoi(argv[2]);
   if(requested_size < 0 || requested_threads <= 0)
   {
      fprintf(stderr, "array_size must be >= 0 and num_threads must be > 0\n");
      return EXIT_FAILURE;
   }
   array_size = (unsigned int) requested_size;
   num_threads = (unsigned int) requested_threads;

   int * even_local_data = malloc(array_size * sizeof *even_local_data);
   int * odd_local_data = malloc(array_size * sizeof *odd_local_data);
   /* malloc(0) may legitimately return NULL, so only a non-empty request can fail */
   if(array_size > 0 && (even_local_data == NULL || odd_local_data == NULL))
   {
      fprintf(stderr, "Cannot allocate %u elements\n", array_size);
      free(even_local_data);
      free(odd_local_data);
      return EXIT_FAILURE;
   }
   even_data = even_local_data;
   odd_data = odd_local_data;

   /* Receives the pointer thread 0 passes to pthread_exit() in sort_array() */
   void * return_value = NULL;

   /* Thread data structure */
   pthread_t threads[num_threads];
   unsigned int indexes[num_threads];
   pthread_attr_t attr;

   /* Initialize the barrier shared by all the workers */
   pthread_barrier_init(&barrier, NULL, num_threads);

   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

   unsigned int index;
   for(index = 0; index < num_threads; index++)
   {
      indexes[index] = index;
      if(pthread_create(&threads[index], &attr, random_initialize, (void *) &indexes[index]) != 0)
      {
         /* Threads already started would wait forever on the barrier;
          * returning from main terminates the whole process. */
         fprintf(stderr, "Cannot create thread %u\n", index);
         return EXIT_FAILURE;
      }
   }
   /* Join every worker, not only the first. No joinable thread is leaked,
    * and threads[] and indexes[] can be safely reused below. */
   for(index = 0; index < num_threads; index++)
   {
      pthread_join(threads[index], NULL);
   }
   printf("Created random input data\n");

   for(index = 0; index < num_threads; index++)
   {
      indexes[index] = index;
      if(pthread_create(&threads[index], &attr, sort_array, (void *) &indexes[index]) != 0)
      {
         fprintf(stderr, "Cannot create thread %u\n", index);
         return EXIT_FAILURE;
      }
   }

   /* Thread 0 performs the last merge and reports which buffer holds the result */
   pthread_join(threads[0], &return_value);
   for(index = 1; index < num_threads; index++)
   {
      pthread_join(threads[index], NULL);
   }
   const int * output_data = *((const unsigned int *) return_value) == 1 ? odd_data : even_data;

   /* Print the final result */
   for(index = 0; index < array_size; index++)
   {
      printf("%d ", output_data[index]);
   }
   printf("\n");

   /* Release everything that was set up above */
   pthread_attr_destroy(&attr);
   pthread_barrier_destroy(&barrier);
   free(even_local_data);
   free(odd_local_data);
   return EXIT_SUCCESS;
}

Writing /home/pthread/merge_sort.c


In [None]:
%cd /home/pthread

!clang merge_sort.c -pthread -o merge_sort

/home/pthread


In [None]:
%cd /home/pthread

!clang merge_sort.c -pthread -S -emit-llvm
!cat merge_sort.ll

In [None]:
%cd /home/pthread

!./merge_sort 80 4

# Pipelines in PThreads
This piece of code creates a fixed number of threads (4), one per pipeline stage. It synchronizes them, together with the main thread, using a barrier.

The same result could be obtained, for example, with a simple mutex or, even better, with a condition variable. As an exercise, try modifying the code accordingly.

Tip: you may want to use pthread_cond_broadcast(), but any other solution is welcome!

In [None]:
%%writefile /home/pthread/pipeline.c

#define _POSIX_C_SOURCE 200112L

/* Pipeline geometry. Compound expressions are fully parenthesized so the
 * macros expand safely inside larger expressions (e.g. BUFFER_SIZE * 2). */
#define NUM_THREADS 4
#define FILTER_SIZE 32
#define WINDOW_SIZE 32
#define BUFFER_SIZE (WINDOW_SIZE + FILTER_SIZE - 1 + WINDOW_SIZE)
#define SAMPLE_RATIO 8
#define INPUT_SIZE (65536 * 16)

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

/* Coefficients used by convolution_1 (stage 1); filled in by main */
float first_filter[FILTER_SIZE];

/* Coefficients used by convolution_2 (stage 4); filled in by main */
float second_filter[FILTER_SIZE];

/* Double buffers between convolution_1 (stage 1) and downsample (stage 2).
 * The producer writes one of the pair in a barrier round while the consumer
 * reads the other, chosen by round parity. */
float even_buffer_1[BUFFER_SIZE], odd_buffer_1[BUFFER_SIZE];

/* Double buffers between downsample (stage 2) and upsample (stage 3) */
float even_buffer_2[(WINDOW_SIZE + WINDOW_SIZE)/4 + 1], odd_buffer_2[(WINDOW_SIZE + WINDOW_SIZE)/4 + 1];

/* Double buffers between upsample (stage 3) and convolution_2 (stage 4) */
float even_buffer_3[BUFFER_SIZE], odd_buffer_3[BUFFER_SIZE];

/* Input signal, generated by main */
float input[INPUT_SIZE];

/* Output of convolution_2; each round overwrites all WINDOW_SIZE cells */
float output[WINDOW_SIZE];

/* Barrier shared by the NUM_THREADS stage threads and the main thread; one wait per round */
pthread_barrier_t barr;

/* Apply a finite impulse response filter */
/*
 * Pipeline stage 1 (threads[0] in main): processes one window of WINDOW_SIZE
 * input samples per barrier round, for INPUT_SIZE / WINDOW_SIZE rounds.
 *
 * Round r writes into even_buffer_1 when r is even and into odd_buffer_1 when
 * r is odd. Positions advance circularly modulo BUFFER_SIZE. downsample()
 * reads the same buffer in round r + 1.
 *
 * NOTE(review): the inner loop multiplies every coefficient by the same sample
 * input[local_count * WINDOW_SIZE + index]. Each step therefore adds
 * (sum of first_filter) * x[n] rather than computing the FIR sum h[k] * x[n - k]
 * that the title suggests — confirm intent.
 * NOTE(review): results are accumulated with `+=` into cells that are never
 * cleared. Positions wrap modulo BUFFER_SIZE, so the same cell is written again
 * in later rounds and keeps its previous value — confirm intent.
 *
 * @param local_data unused
 * @return never returns: the loop runs forever. After the input is exhausted
 *         the inner loop does nothing, but the thread keeps joining barrier
 *         rounds.
 */
void * convolution_1(void * local_data)
{
   /* The position in the output circular buffers */
   unsigned int output_buffer_position = 0;

   /* The position in the input */
   /* NOTE(review): never read or updated below — apparently unused */
   unsigned int input_position = 0;

   /* The loop iteration counter */
   unsigned int local_count = 0;

   while(1)
   {
      unsigned int index;

      /* Pick the output buffer according to the iteration counter */
      float * out_buffer = (local_count % 2) == 0 ? even_buffer_1 : odd_buffer_1;

      for(index = 0; index < WINDOW_SIZE && local_count < (INPUT_SIZE / WINDOW_SIZE); index++)
      {
         unsigned int coefficient_index = 0;
         for(coefficient_index = 0; coefficient_index < FILTER_SIZE; coefficient_index++)
         {
            out_buffer[output_buffer_position] += first_filter[coefficient_index] * input[local_count * WINDOW_SIZE + index];
         }
         output_buffer_position++;
         if(output_buffer_position == BUFFER_SIZE)
         {
            output_buffer_position = 0;
         }
      }

      local_count++;

      /* End of round: hand the filled buffer over to downsample() */
      pthread_barrier_wait(&barr);
   }
   return 0;
}

/* Apply a finite impulse response filter */
/*
 * Pipeline stage 4 (threads[3] in main): filters the upsampled signal with
 * second_filter and writes the result into output[].
 *
 * In round r it reads even_buffer_3 when r is odd and odd_buffer_3 when r is
 * even, i.e. the buffer upsample() filled in round r - 1. buffer_position
 * starts at -3 * WINDOW_SIZE, so the first three rounds produce no output
 * (pipeline fill). The loop runs for INPUT_SIZE / WINDOW_SIZE + 3 rounds, and
 * each round rewrites all WINDOW_SIZE cells of output[].
 *
 * NOTE(review): buffer_position and the wrap of real_input_position use
 * WINDOW_SIZE, while upsample() writes even/odd_buffer_3 at positions that
 * wrap modulo BUFFER_SIZE. Only cells [0, WINDOW_SIZE) are ever read — confirm
 * intent.
 *
 * @param local_data unused
 * @return never returns (infinite loop; keeps joining barrier rounds)
 */
void * convolution_2(void * local_data)
{
   /* The position in the circular buffers */
   int buffer_position = (-3 * WINDOW_SIZE);

   /* The loop iteration counter */
   unsigned int local_count = 0;

   while(1)
   {
      unsigned int index;

      /* Pick the input buffer according to the iteration counter */
      float * in_buff = (local_count % 2) != 0 ? even_buffer_3 : odd_buffer_3;

      for(index = 0; index < WINDOW_SIZE && local_count < (INPUT_SIZE / WINDOW_SIZE) + 3; index++)
      {
         if(buffer_position >= 0)
            output[buffer_position] = 0;
         unsigned int coefficient_index = 0;
         for(coefficient_index = 0; coefficient_index < FILTER_SIZE; coefficient_index++)
         {
            /* Compute real position in circular buffer */
            /* NOTE(review): int - unsigned is evaluated in unsigned arithmetic and
             * converted back to int; relies on the usual two's-complement wrap */
            int real_input_position = buffer_position - coefficient_index;
            if(real_input_position < 0)
            {
               real_input_position += WINDOW_SIZE;
            }
            if(buffer_position >= 0 && real_input_position >= 0)
               output[buffer_position] += second_filter[coefficient_index] * in_buff[real_input_position];
         }
         buffer_position++;
         if(buffer_position == WINDOW_SIZE)
         {
            buffer_position = 0;
         }
      }

      local_count++;

      pthread_barrier_wait(&barr);
   }
   return 0;
}

/* Perform the downsample */
/*
 * Pipeline stage 2 (threads[1] in main): keeps one sample out of every
 * SAMPLE_RATIO (the first of each group) and drops the others.
 *
 * In round r it reads even_buffer_1 when r is odd and odd_buffer_1 when r is
 * even, i.e. the buffer convolution_1() filled in round r - 1. It writes
 * even_buffer_2 in odd rounds and odd_buffer_2 in even rounds.
 * Input positions wrap modulo BUFFER_SIZE. Output positions wrap modulo
 * (WINDOW_SIZE + WINDOW_SIZE)/4 + 1, the size of even/odd_buffer_2.
 * Both positions start negative, which delays output by one round.
 *
 * @param local_data unused
 * @return never returns (infinite loop; keeps joining barrier rounds)
 */
void * downsample(void * local_data)
{
   /* Index of the current sample */
   unsigned int sample_counter = 0;

   /* The position in the input circular buffer */
   int input_buffer_position = -1 * WINDOW_SIZE;

   /* The position in the output circular buffer */
   int output_buffer_position = (-1 * WINDOW_SIZE) / SAMPLE_RATIO;

   /* The number of times the following loop was executed */
   unsigned int local_count = 0;

   while(1)
   {
      /* The element in the current window */
      unsigned int index = 0;

      /* Pick the input/output buffers according to the iteration counter */
      float * in_buff = (local_count % 2) != 0 ? even_buffer_1 : odd_buffer_1;
      float * out_buff = (local_count % 2) != 0 ? even_buffer_2 : odd_buffer_2;

      for(index = 0; index < WINDOW_SIZE && local_count < (INPUT_SIZE / WINDOW_SIZE) + 1; index++)
      {
         /* Keep the first sample of every group of SAMPLE_RATIO */
         if(sample_counter % SAMPLE_RATIO == 0)
         {
            sample_counter = 0;
            if(output_buffer_position >= 0 && input_buffer_position >= 0)
               out_buff[output_buffer_position] = in_buff[input_buffer_position];
            output_buffer_position++;
            if(output_buffer_position == (WINDOW_SIZE + WINDOW_SIZE)/4 + 1)
            {
               output_buffer_position = 0;
            }
         }
         sample_counter++;
         input_buffer_position++;
         if(input_buffer_position == BUFFER_SIZE)
         {
            input_buffer_position = 0;
         }
      }

      local_count++;
      pthread_barrier_wait(&barr);
   }
   return 0;
}

/* Perform the upsample */
/*
 * Pipeline stage 3 (threads[2] in main): for every input sample it writes that
 * sample followed by SAMPLE_RATIO - 1 zeros.
 *
 * In round r it reads even_buffer_2 when r is even and odd_buffer_2 when r is
 * odd, i.e. the buffer downsample() filled in round r - 1. It writes
 * even_buffer_3 in even rounds and odd_buffer_3 in odd rounds.
 * Input positions wrap modulo (WINDOW_SIZE + WINDOW_SIZE)/4 + 1. Output
 * positions wrap modulo BUFFER_SIZE. Both start two windows "in the past",
 * so the first two rounds write nothing.
 *
 * @param local_data unused
 * @return never returns (infinite loop; keeps joining barrier rounds)
 */
void * upsample(void * local_data)
{
   /* Index of the current sample */
   unsigned int sample_counter = 0;

   /* The position in the input circular buffer */
   int input_buffer_position = (-2 * WINDOW_SIZE) / SAMPLE_RATIO;

   /* The position in the output circular buffer */
   int output_buffer_position = (-2 * WINDOW_SIZE);

   /* The number of times the following loop was executed */
   unsigned int local_count = 0;

   while(1)
   {
      /* The element in the current window */
      unsigned int index = 0;

      /* Pick the input/output buffers according to the iteration counter */
      float * in_buff = (local_count % 2) == 0 ? even_buffer_2 : odd_buffer_2;
      float * out_buff = (local_count % 2) == 0 ? even_buffer_3 : odd_buffer_3;

      for(index = 0; index < WINDOW_SIZE && local_count < (INPUT_SIZE / WINDOW_SIZE) + 2; index++)
      {
         /* First output of each group of SAMPLE_RATIO: copy the next input sample */
         if(sample_counter % SAMPLE_RATIO == 0)
         {
            if(output_buffer_position >= 0 && input_buffer_position >= 0)
               out_buff[output_buffer_position] = in_buff[input_buffer_position];
            sample_counter = 0;
            input_buffer_position++;
            if(input_buffer_position == (WINDOW_SIZE + WINDOW_SIZE)/4 + 1)
            {
               input_buffer_position = 0;
            }
         }
         /* Remaining outputs of the group are zero-filled */
         else
         {
            if(output_buffer_position >= 0)
               out_buff[output_buffer_position] = 0;
         }
         sample_counter++;
         output_buffer_position++;
         if(output_buffer_position == BUFFER_SIZE)
         {
            output_buffer_position = 0;
         }
      }

      local_count++;
      pthread_barrier_wait(&barr);

   }
   return 0;
}

/*
 * Generate the filters and the input, start one thread per pipeline stage,
 * drive the pipeline for INPUT_SIZE / WINDOW_SIZE + 3 barrier rounds and
 * print the last window of output.
 * @return 0 on success, EXIT_FAILURE if a stage thread cannot be created
 */
int main(int argc, char * * argv)
{
   (void) argc;
   (void) argv;

   unsigned int index;

   /* Generate filters */
   for (index = 0; index < FILTER_SIZE; index++)
   {
      first_filter[index] = index * FILTER_SIZE + index + 1;
      second_filter[index] = index;
   }

   /* Generate a reproducible pseudo-random input in [0, 64] */
   srand(0);
   for (index = 0; index < INPUT_SIZE; index++)
   {
      input[index] = ((float) rand() / (float) RAND_MAX) * 64.0f;
   }

   /* Thread data structure */
   pthread_t threads[NUM_THREADS];
   pthread_attr_t attr;

   /* The main thread also waits on the barrier every round, hence NUM_THREADS + 1 */
   pthread_barrier_init(&barr, NULL, NUM_THREADS + 1);

   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

   /* One thread per pipeline stage, in data-flow order */
   void * (* const stages[NUM_THREADS])(void *) = { convolution_1, downsample, upsample, convolution_2 };
   for (index = 0; index < NUM_THREADS; index++)
   {
      /* A missing stage would leave the barrier one party short, and the
       * loop below would block forever, so fail loudly instead. */
      if (pthread_create(&threads[index], &attr, stages[index], NULL) != 0)
      {
         fprintf(stderr, "Failed to create pipeline thread %u\n", index);
         return EXIT_FAILURE;
      }
   }
   /* The attribute object is no longer needed once the threads exist */
   pthread_attr_destroy(&attr);

   /* 3 extra rounds let the last window travel from the first stage to the last one */
   for (unsigned int iteration_count = 0; iteration_count < INPUT_SIZE/WINDOW_SIZE + 3; iteration_count++)
   {
      pthread_barrier_wait(&barr);
   }

   for (int count = 0; count < WINDOW_SIZE; count++)
      printf("%f\n", output[count]);

   /* The stage threads loop forever and are now blocked in the barrier.
    * Returning from main terminates the process and, with it, those threads.
    * For the same reason the barrier is deliberately not destroyed. */
   return 0;
}

In [None]:
%cd /home/pthread
!clang pipeline.c -pthread -o pipeline

In [None]:
%cd /home/pthread
!./pipeline

In [None]:
%cd /home/pthread
!clang pipeline.c -pthread -S -emit-llvm
!cat pipeline.ll

# Task graphs exercises

Considerato il programma seguente:


*   Disegnare il task graph
*   Dire quali task possono essere eseguiti in parallelo al task 7
*   Trovare i valori minimi e massimi che la printf può stampare



In [None]:
%%writefile /home/pthread/graph_1.c

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>

/* Shared data read and written by the tasks without any lock. When tasks run
 * concurrently, the printed values depend on the thread interleaving
 * (presumably the point of the exercise). */
int data1 = 0;
int data2 = 0;
int data3 = 0;

/* Thread handles: main creates thread1 (task2), thread2 (task4) and
 * thread4 (task7); task2 creates thread3 (task6). */
pthread_t thread1, thread2, thread3, thread4;
/* Joinable-thread attribute, initialized in main and also used by task2 */
pthread_attr_t attr;

/*Other global declarations */

/* Receives thread results from pthread_join (passed by task7 and by main) */
void * return_value;

/* Initializes the shared data; called directly by main before any thread exists */
void * task1(void * input){
    data1 = 2;
    data2 = 3;
    data3 = 1;
    return NULL;
}

/* Runs in thread3, which task2 starts after writing data1 */
void * task6(void * input){
    data1 = data1 + 6;
    return NULL;
}

/* Runs in thread1; writes data1 and then starts task6 in thread3 */
void * task2(void * input){
    data1= data2 + 2;
    pthread_create(&thread3, &attr, task6, NULL);
    return NULL;
}
/* Called directly by main */
void * task3(void * input){
    data2 = data2 + 3;
    return NULL;
}
/* Runs in thread2 */
void * task4(void * input){
    data3 = data1 + 4;
    return NULL;
}
/* Called directly by main after its joins */
void * task5(void * input){
    data2 = data2 + 5;
    return NULL;
}

/* Runs in thread4; waits for thread2 (task4) before updating data3 */
void * task7(void * input){
    pthread_join(thread2, &return_value);
    data3 = data2 + data3;
    return NULL;
}

/*
 * Build the task graph: task1 and task3 run in main; task2, task4 and task7
 * run in threads created here; task6 runs in a thread created by task2;
 * task5 runs in main after the joins. Prints the final shared values.
 */
int main(int argc, char ** argv)
{
 /* Declaration of data structure and initialization */
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    task1(NULL);
    pthread_create(&thread1, &attr, task2, NULL);
    pthread_create(&thread2, &attr, task4, NULL);
    task3(NULL);
    pthread_create(&thread4, &attr, task7, NULL);
    /* thread3 is created by task2 inside thread1, so it may not exist yet;
     * joining an uninitialized handle is undefined behavior. Join thread1
     * first: after that, thread3 holds a valid handle. This adds no edge to
     * the task graph, since task6 already follows task2 and task2 does
     * nothing after creating it. Passing NULL instead of &return_value
     * avoids racing with task7, which writes return_value concurrently. */
    pthread_join(thread1, NULL);
    pthread_join(thread3, NULL);
    pthread_join(thread4, NULL);
    task5(NULL);
    printf("%d %d %d\n", data1, data2, data3);
    /* Every thread that used attr has terminated by now */
    pthread_attr_destroy(&attr);
    return 0;
}

Overwriting /home/pthread/graph_1.c


In [None]:
%cd /home/pthread
!clang graph_1.c -pthread -o graph_1

/home/pthread


In [None]:
%cd /home/pthread
!./graph_1

Considerato il programma seguente:


*   Disegnare il task graph
*   Dire quali task possono essere eseguiti in parallelo al task 6
*   Trovare i valori minimi e massimi che la printf può stampare

In [None]:
%%writefile /home/pthread/graph_2.c

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>

/* Shared data read and written by the tasks without any lock. When tasks run
 * concurrently, the printed values depend on the thread interleaving
 * (presumably the point of the exercise). */
int data1 = 0;
int data2 = 0;
int data3 = 0;

/* Thread handles: main creates thread1 (task2); task4 creates
 * thread3 (task5) and thread4 (task6). thread2 is not used. */
pthread_t thread1, thread2, thread3, thread4;
/* Joinable-thread attribute, initialized in main and also used by task4 */
pthread_attr_t attr;

/*Other global declarations */

/* Receives the result of main's join on thread3 */
void * return_value;

/* Initializes the shared data; called directly by main before any thread exists */
void * task1(void * input){
    data1 = 0;
    data2 = 2;
    data3 = 4;
    return NULL;
}

/* Runs in thread4; may run concurrently with task5 */
void * task6(void * input){
    data3 = data1 + 1;
    return NULL;
}

/* Runs in thread3; may run concurrently with task6 and task7 */
void * task5(void * input){
    data1 = data3 + 1;
    return NULL;
}

/* Called from task3 (i.e. in main's thread); writes data3, then starts
 * task5 in thread3 and task6 in thread4 */
void * task4(void * input){
    data3 = data2 + 4;
    pthread_create(&thread3, &attr, task5, NULL);
    pthread_create(&thread4, &attr, task6, NULL);
    return NULL;
}

/* Runs in thread1; its update of data2 may race with task3's */
void * task2(void * input){
    data2 = data2 + 1;
    return NULL;
}
/* Called directly by main: updates data2, waits for thread1 (task2),
 * then runs task4 in the calling thread */
void * task3(void * input){
    data2 = data2 + 3;
    pthread_join(thread1, NULL);
    task4(NULL);
    return NULL;
}

/* Called directly by main after it has joined thread4 (task6) */
void * task7(void * input){
    data3 = data2 + data3;
    return NULL;
}

/*
 * Build the task graph and print the final shared values. task3 (and,
 * through it, task4) runs in main's own thread, so thread3 and thread4
 * already exist when they are joined below. task7 runs after task6 (join on
 * thread4) but may overlap with task5, which is joined only afterwards.
 */
int main(int argc, char ** argv)
{
 /* Declaration of data structure and initialization */
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    task1(NULL);
    pthread_create(&thread1, &attr, task2, NULL);
    task3(NULL);
    pthread_join(thread4, NULL);
    task7(NULL);
    pthread_join(thread3, &return_value);
    printf("%d %d %d\n", data1, data2, data3);
    return 0;
}

Overwriting /home/pthread/graph_2.c


In [None]:
%cd /home/pthread
!clang graph_2.c -pthread -o graph_2

/home/pthread


In [None]:
%cd /home/pthread
!./graph_2

/home/pthread
8 6 7
