# Measuring pipeline performance

## Queue

In [1]:
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <vector>
#include <chrono>
#include <cstddef>
#include <cmath>
#include <string>
#include <functional>

The queue itself. It is defined as a template so that we can define queues with different item types:

In [2]:
template <typename T>
class myqueue
{
private:
  std::mutex              d_mutex;
  std::condition_variable d_condition;
  std::deque<T>           d_queue;
public:

  myqueue(std::string s) { std::cout << "Created " << s << " queue " << std::endl;  }
  myqueue() {}
  
  void push(T const& value) {
    {
      std::unique_lock<std::mutex> lock(d_mutex);
      d_queue.push_front(value);
    }
    this->d_condition.notify_one();
  }
  
  T pop() {
    std::unique_lock<std::mutex> lock(d_mutex);
    d_condition.wait(lock, [this]{ return !d_queue.empty(); });  // block while the queue is empty
    T rc(std::move(this->d_queue.back()));       // move the oldest item out instead of copying it
    d_queue.pop_back();                   // remove the moved-from item from the back of the queue
    return rc;
  }
};
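
The queue is FIFO because `push` inserts at the front of the `std::deque` while `pop` removes from the back. A minimal single-threaded sketch of that ordering, using a bare `std::deque` rather than `myqueue` (the helper name `fifo_order` is hypothetical and not part of the notebook):

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical helper: inserts 1, 2, 3 the way myqueue::push does (push_front)
// and extracts them the way myqueue::pop does (back followed by pop_back).
std::vector<int> fifo_order() {
  std::deque<int> d;
  for (int v : {1, 2, 3}) d.push_front(v);  // the newest item sits at the front
  assert(d.size() == 3);                    // sanity check: three items are queued
  std::vector<int> out;
  while (!d.empty()) {
    out.push_back(d.back());                // the oldest item sits at the back
    d.pop_back();
  }
  return out;                               // items in insertion order
}
```

Calling `fifo_order()` returns the items in the order they were inserted, which is the property the pipeline stages rely on.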


Finally, we need a value that marks the end of a stream modelled with the queue. The queues will carry non-negative integers, so we use a negative number as the marker:

In [3]:
#define EOS -1

## Drain stage definition

This function reads items from a queue of integers and prints them. It is used as the thread body of the last pipeline stage.
A sleep simulates the time spent processing each item.

In [4]:
#include <chrono>
#include <thread>
using namespace std::chrono_literals;

The second parameter is the simulated delay for processing an item, in milliseconds:

In [5]:
void drain(myqueue<int> &q, std::chrono::milliseconds msecs) {
  std::cout << "Drain started" << std::endl;
  auto e = q.pop();
  
  while(e != EOS) {
    std::this_thread::sleep_for(msecs);
    std::cout << "received " << e << std::endl;
    e = q.pop();
  }
    
  return;
}

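## Source stage definition

The source pushes the integers `0` to `n-1` into the queue, sleeping for `msecs` before each push, and then sends `EOS`. Note that its progress message is labelled "Drain emitted" although it is printed by the source, so those lines in the outputs below come from this stage.

In [6]: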
void source(myqueue<int> &q, int n, std::chrono::milliseconds msecs) {
    for(int i=0; i<n; i++){
        std::this_thread::sleep_for(msecs);
        q.push(i);
        std::cout << "Drain emitted " << i << std::endl; 
    }
    q.push(EOS);
    std::cout << "sent EOS" << std::endl;
    return;
}

## Generic stage definition (computes int -> int function)

In [7]:
void genericstage(myqueue<int> &inq, myqueue<int> &outq, std::chrono::milliseconds msecs) {
  auto e = inq.pop();
  
  while(e != EOS) {
    std::this_thread::sleep_for(msecs);
    auto res = e+1;
    outq.push(res);
    e = inq.pop();
  }
  outq.push(EOS);
  return;
}
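
The stage above hard-codes `e+1`. A sketch of a variant that takes the `int -> int` function as a parameter is shown here. The names `intqueue`, `fstage`, `run_stage` and `kEos` are hypothetical, `intqueue` is only a minimal stand-in for `myqueue<int>` to keep the example self-contained, and the simulated delay is omitted for brevity:

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

constexpr int kEos = -1;  // same end-of-stream convention as EOS

// Minimal stand-in for myqueue<int> (assumption: same push/pop semantics).
class intqueue {
  std::mutex m;
  std::condition_variable cv;
  std::deque<int> d;
public:
  void push(int v) {
    { std::lock_guard<std::mutex> lock(m); d.push_front(v); }
    cv.notify_one();
  }
  int pop() {
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [this] { return !d.empty(); });  // block while empty
    int v = d.back();
    d.pop_back();
    return v;
  }
};

// Hypothetical variant of genericstage: applies f to every item.
void fstage(intqueue &inq, intqueue &outq, std::function<int(int)> f) {
  for (int e = inq.pop(); e != kEos; e = inq.pop())
    outq.push(f(e));
  outq.push(kEos);  // forward the end-of-stream marker
}

// Feeds `input` through one fstage thread and collects the results.
std::vector<int> run_stage(const std::vector<int> &input,
                           std::function<int(int)> f) {
  intqueue in, out;
  std::thread t(fstage, std::ref(in), std::ref(out), f);
  for (int v : input) {
    assert(v != kEos);  // the marker itself is not a valid item
    in.push(v);
  }
  in.push(kEos);
  std::vector<int> result;
  for (int e = out.pop(); e != kEos; e = out.pop()) result.push_back(e);
  t.join();
  return result;
}
```

With a lambda such as `[](int x) { return x + 1; }`, `run_stage({0, 1, 2}, ...)` returns `{1, 2, 3}`. Taking a `std::function` keeps the thread body independent of the computation, at the cost of one indirect call per item.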

## Queue declarations

In [8]:
myqueue<int> source2one, one2drain;

## Pipeline set up

We will implement the following pipeline, in which the generic stage increments each item:
* pipeline(source, genericstage(inc), drain) 

In [9]:
std::thread s1(source, std::ref(source2one), 10, 1000ms);

Drain emitted 0
Drain emitted 1


In [10]:
// Note: inc is not passed to genericstage, which hard-codes the same e+1 computation.
auto inc = [](int x) { return(x+1);};

Drain emitted 2


In [11]:
std::thread s2(genericstage, std::ref(source2one), std::ref(one2drain), 1000ms);

Drain emitted 3


In [12]:
std::thread s3(drain, std::ref(one2drain), 1000ms);

Drain started
Drain emitted 4
received 1
Drain emitted 5
received 2
Drain emitted 6
received 3
Drain emitted 7
received 4
Drain emitted 8
received 5
Drain emitted 9
sent EOS


The pipeline is now running as expected.

In [13]:
s1.join();

received 6


In [14]:
s2.join();

received 7
received 8


In [15]:
s3.join();

received 9
received 10


If you get here, every stage has terminated.

# Exercise

Use combinations of the generic stage to set up a two-stage pipeline:
* Vary the delay given to the stages (source, generic and drain)
* Measure the time spent from start to end
* Figure out whether the formula given for the completion time is exact

Feel free to use the right number and type of queues.
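
As a reference point for the comparison, a common approximation for a pipeline processing m items is the sum of the stage delays plus (m - 1) times the slowest stage delay. This notebook does not reproduce the formula from the lesson, so treating it as this approximation is an assumption. A sketch computing the estimate (the function name `estimated_completion_ms` is hypothetical):

```cpp
#include <algorithm>
#include <cassert>
#include <numeric>
#include <vector>

// Estimate: the first item needs the sum of all stage delays to traverse the
// pipeline; each further item then completes one slowest-stage delay later.
long estimated_completion_ms(const std::vector<long> &stage_ms, long items) {
  assert(!stage_ms.empty() && items > 0);
  long fill = std::accumulate(stage_ms.begin(), stage_ms.end(), 0L);
  long slowest = *std::max_element(stage_ms.begin(), stage_ms.end());
  return fill + (items - 1) * slowest;
}
```

For three stages with a 1000 ms delay each and 10 items, as in the pipeline above, the estimate is 3000 + 9 * 1000 = 12000 ms. A measured value can be expected to differ slightly because of thread start-up and queue overheads.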

Suggestion: 
* use the following code to measure time, as seen during the lesson
* include in the utimer scope both thread start and thread join operation(s)

In [16]:
#include <iostream>
#include <chrono>
#include <string>

class utimer {
  std::chrono::system_clock::time_point start;
  std::chrono::system_clock::time_point stop;
  std::string message; 
  using usecs = std::chrono::microseconds;
  using msecs = std::chrono::milliseconds;

private:
  long * us_elapsed;
  
public:

  utimer(const std::string m) : message(m),us_elapsed(nullptr) {
    start = std::chrono::system_clock::now();
  }
    
  utimer(const std::string m, long * us) : message(m),us_elapsed(us) {
    start = std::chrono::system_clock::now();
  }

  ~utimer() {
    stop =
      std::chrono::system_clock::now();
    std::chrono::duration<double> elapsed =
      stop - start;
    auto musec =
      std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
    
    std::cout << message << " computed in " << musec << " usec " << std::endl;
    if(us_elapsed != nullptr)
      (*us_elapsed) = musec;
  }
};


As an example, to see how much time a source takes to send its data items to the output queue:

In [17]:
myqueue<int> qq;
{
    utimer t1("time send");
    std::thread tid(source, std::ref(qq), 5, 1000ms);
    tid.join();
}

Drain emitted 0
Drain emitted 1
Drain emitted 2
Drain emitted 3
Drain emitted 4
sent EOS
time send computed in 5001761 usec 
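
`utimer` reads `std::chrono::system_clock`, which may jump if the system time is adjusted during a measurement. A minimal alternative sketch based on the monotonic `std::chrono::steady_clock` follows. The helper names `elapsed_usecs` and `time_sleeping_thread` are hypothetical and not part of the lesson code. In line with the suggestion above, the measured callable both starts and joins the thread:

```cpp
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical helper: runs `work` and returns the elapsed time in microseconds.
template <typename F>
long long elapsed_usecs(F &&work) {
  auto start = std::chrono::steady_clock::now();
  work();
  auto stop = std::chrono::steady_clock::now();
  return std::chrono::duration_cast<std::chrono::microseconds>(stop - start)
      .count();
}

// Example workload: a thread that sleeps for `ms` milliseconds, started and
// joined inside the measured region.
long long time_sleeping_thread(int ms) {
  assert(ms >= 0);
  return elapsed_usecs([ms] {
    std::thread t([ms] {
      std::this_thread::sleep_for(std::chrono::milliseconds(ms));
    });
    t.join();
  });
}
```

Returning the value instead of printing it from a destructor makes it easier to compare several runs with different delays, which is what the exercise asks for.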
