From 9f3a2a81649abba0553d0920710335f7d2268d0f Mon Sep 17 00:00:00 2001 From: CaptGreg Date: Sun, 3 Apr 2016 13:59:40 -0400 Subject: [PATCH] rework stringstream+string thread code, add thread bekery algorithm --- binarysearch.cpp | 55 +++-- sorts.cpp | 123 ++++++++++- thread-bakery.cpp | 490 ++++++++++++++++++++++++++++++++++++++++++++ thread-conqueue.cpp | 125 +++++++++++ thread-poison.cpp | 126 ++++++++++++ threadsafeq.cpp | 30 +-- threadsafequeue.cpp | 304 ++++++++++++++++++++++++++- unique_ptr-vec.cpp | 163 +++++++++++++++ weak-ptr-sutter.cpp | 79 +++++++ 9 files changed, 1446 insertions(+), 49 deletions(-) create mode 100644 thread-bakery.cpp create mode 100644 thread-conqueue.cpp create mode 100644 thread-poison.cpp create mode 100644 unique_ptr-vec.cpp create mode 100644 weak-ptr-sutter.cpp diff --git a/binarysearch.cpp b/binarysearch.cpp index 10f29fa..cf9cde5 100644 --- a/binarysearch.cpp +++ b/binarysearch.cpp @@ -1,26 +1,45 @@ // http://en.wikipedia.org/wiki/Binary_search_algorithm const int KEY_NOT_FOUND = -1; -int binary_search(int A[], int key, int imin, int imax) +int binary_search(int data[], int key, int min, int max) { - // continue searching while [imin,imax] is not empty - while (imax >= imin) - { - // calculate the midpoint for roughly equal partition - auto midpoint = [] (int imin, int imax) { return (imin + imax) / 2; }; - int imid = midpoint(imin, imax); - if(A[imid] == key) - // key found at index imid - return imid; - // determine which subarray to search - else if (A[imid] < key) - // change min index to search upper subarray - imin = imid + 1; - else - // change max index to search lower subarray - imax = imid - 1; + // continue searching while [min,max] is not empty + while (max >= min) { + int mid = (min + max) / 2; // calculate the midpoint for roughly equal partition + if(data[mid] == key) // key found at index mid + return mid; + // determine which half of the subarray to search + else if (data[mid] < key) + min = mid + 1; // change min index to search upper subdataay + else + max = mid - 1; // change max index to search lower subdataay + } + return KEY_NOT_FOUND; // key was not found +} + +// or equivalently: +// http://www.algolist.net/Algorithms/Binary_search +/* +* searches for a value in sorted array +* data is an array to search in +* value is searched value +* low is an index of low boundary +* high is an index of high boundary +* returns position of searched value, if it presents in the dataay +* or -1, if it is absent +*/ + +int binarySearch(int data[], int value, int low, int high) +{ + while (low <= high) { + int middle = (low + high) / 2; + if (data[middle] == value) + return middle; + else if (data[middle] > value) + high = middle - 1; + else + low = middle + 1; } - // key was not found return KEY_NOT_FOUND; } diff --git a/sorts.cpp b/sorts.cpp index de958f1..2fe7931 100644 --- a/sorts.cpp +++ b/sorts.cpp @@ -1,3 +1,9 @@ +// best reference: +// https://en.wikibooks.org/wiki/Special:Search/Algorithm_Implementation/Sorting/ + +// some sorts are also in see Numerical Recipes in C + +// GB original code hacked from // http://stackoverflow.com/questions/244252/a-good-reference-card-cheat-sheet-with-the-basic-sort-algorithms-in-c // GB ported C-code to C++ function templates, new/delete @@ -5,6 +11,7 @@ #include // memcpy #include +#include // for std::make_heap, std::sort_heap using namespace std; @@ -17,8 +24,70 @@ static void swap(T* a, T* b) { } } +template +void heap_sort(Iterator begin, Iterator end) +{ + std::make_heap(begin, end); + std::sort_heap(begin, end); +} + +template // function template +void heapsortCPP(T* a, int l) +{ + heap_sort(a, a+l); +} + +template // function template +void heapsortC(T arr[], unsigned int N) +{ + // https://en.wikibooks.org/wiki/Algorithm_Implementation/Sorting/Heapsort#C + // This is a fast implementation of heapsort in C, adapted from Numerical Recipes in C + // but designed to be slightly more readable and to index from 0. + + if(N==0) // check if heap is empty + return; + + T t; /* the temporary value */ + unsigned int n = N, parent = N/2, index, child; /* heap indexes */ + /* loop until array is sorted */ + while (1) { + if (parent > 0) { + /* first stage - Sorting the heap */ + t = arr[--parent]; /* save old value to t */ + } else { + /* second stage - Extracting elements in-place */ + n--; /* make the heap smaller */ + if (n == 0) { + return; /* When the heap is empty, we are done */ + } + t = arr[n]; /* save lost heap entry to temporary */ + arr[n] = arr[0]; /* save root entry beyond heap */ + } + /* insert operation - pushing t down the heap to replace the parent */ + index = parent; /* start at the parent index */ + child = index * 2 + 1; /* get its left child index */ + while (child < n) { + /* choose the largest child */ + if (child + 1 < n && arr[child + 1] > arr[child]) { + child++; /* right child exists and is bigger */ + } + /* is the largest child larger than the entry? */ + if (arr[child] > t) { + arr[index] = arr[child]; /* overwrite entry with child */ + index = child; /* move index to the child */ + child = index * 2 + 1; /* get the left child and go around again */ + } else { + break; /* t's place is found */ + } + } + /* store the temporary value at its new location */ + arr[index] = t; + } +} + template // function template -void bubblesort(T* a, int l) { +void bubblesort(T* a, int l) +{ int i, j; // GB cannot be unsigned (or size_t) for (i = l - 2; i >= 0; i--) // GB NOTE >=0 terminate condition @@ -27,7 +96,8 @@ void bubblesort(T* a, int l) { } template // function template -void selectionsort(T* a, size_t l) { +void selectionsort(T* a, size_t l) +{ size_t i, j, k; for (i = 0; i < l; i++) { for (j = (k = i) + 1; j < l; j++) { @@ -39,7 +109,8 @@ void selectionsort(T* a, size_t l) { } template // function template -static void hsort_helper(T* a, int i, int l) { +static void hsort_helper(T* a, int i, int l) +{ int j; for (j = 2 * i + 1; j < l; i = j, j = 2 * j + 1) { @@ -58,7 +129,8 @@ static void hsort_helper(T* a, int i, int l) { } template // function template -void heapsort(T* a, int l) { +void heapsort(T* a, int l) +{ // see Numerical Recipes in C int i; for (i = (l - 2) / 2; i >= 0; i--) @@ -71,7 +143,8 @@ void heapsort(T* a, int l) { } template // function template -static void msort_helper(T* a, T* b, size_t l) { +static void msort_helper(T* a, T* b, size_t l) +{ size_t i, j, k, m; switch (l) { @@ -89,7 +162,8 @@ static void msort_helper(T* a, T* b, size_t l) { } template // function template -void mergesort(T* a, size_t l) { +void mergesort(T* a, size_t l) +{ T *b; if (l < 0) @@ -104,7 +178,8 @@ void mergesort(T* a, size_t l) { } template // function template -static int pivot(T* a, size_t l) { +static int pivot(T* a, size_t l) +{ size_t i, j; for (i = j = 1; i < l; i++) @@ -117,7 +192,8 @@ static int pivot(T* a, size_t l) { } template // function template -void quicksort(T* a, size_t l) { +void quicksort(T* a, size_t l) +{ if (l <= 1) return; @@ -127,7 +203,8 @@ void quicksort(T* a, size_t l) { } template // function template -void btreesort(T* a, size_t l) { +void btreesort(T* a, size_t l) +{ size_t i; struct node { T value; @@ -187,6 +264,22 @@ void shellsort(T a[], const int size)// Nothing to do with shells. Named after } } +template // function template +void insertionsort(T a[], const int length) +// https://en.wikibooks.org/wiki/Algorithm_Implementation/Sorting/Insertion_sort +{ + int i, j; + T value; + + for(i = 1 ; i < length ; i++) + { + value = a[i]; + for (j = i - 1; j >= 0 && a[j] > value; j--) + a[j + 1] = a[j]; + a[j + 1] = value; + } +} + int main(int argc, char**argv) { int a[] = {42,19,2,66,1,33,8,5,19}; @@ -207,6 +300,14 @@ int main(int argc, char**argv) heapsort(a, l); cout << "heapsort "; for(auto e: a) cout << e << " "; cout << "\n"; + memcpy(a,b,sizeof(a)); + heapsortC(a, l); + cout << "heapsortC "; for(auto e: a) cout << e << " "; cout << "\n"; + + memcpy(a,b,sizeof(a)); + heapsortCPP(a, l); + cout << "heapsortCPP "; for(auto e: a) cout << e << " "; cout << "\n"; + memcpy(a,b,sizeof(a)); mergesort(a, l); cout << "mergesort "; for(auto e: a) cout << e << " "; cout << "\n"; @@ -222,4 +323,8 @@ int main(int argc, char**argv) memcpy(a,b,sizeof(a)); shellsort(a, l); cout << "shellsort "; for(auto e: a) cout << e << " "; cout << "\n"; + + memcpy(a,b,sizeof(a)); + insertionsort(a, l); + cout << "insertionsort "; for(auto e: a) cout << e << " "; cout << "\n"; } diff --git a/thread-bakery.cpp b/thread-bakery.cpp new file mode 100644 index 0000000..6340dcc --- /dev/null +++ b/thread-bakery.cpp @@ -0,0 +1,490 @@ +// https://matthewarcus.wordpress.com/2014/06/10/at-the-bakery/ + +// GB March 27, 2016 convert to C++11 threads +// GB see +// https://en.wikipedia.org/wiki/Lamport's_bakery_algorithm +// http://research.microsoft.com/en-us/um/people/lamport/pubs/pubs.html#bakery +// http://research.microsoft.com/en-us/um/people/lamport/pubs/bakery.pdf +// http://mykeepit.blogspot.ca/2013/11/study-and-implement-lamports-bakery.html +// http://mykeepit.blogspot.ca/2013/11/study-and-implement-lamports-bakery.html +// (Study and implement the Lamport’s Bakery Algorithm for Interprocess synchronization using C/C++ programming language) +// + +#include +class Timer { // use C++11 std::chrono features to create a stop-watch timer class + std::chrono::time_point start; + std::chrono::time_point stop; +public: + Timer() {} + void Start() { start = std::chrono::high_resolution_clock::now(); } + void Stop () { stop = std::chrono::high_resolution_clock::now(); } + // basic form to calculate time differences, illustrate with microseconds + uint64_t usecs() { + typedef std::chrono::duration microsecs_t ; + microsecs_t duration_get( std::chrono::duration_cast(stop-start) ) ; + uint64_t us = duration_get.count(); + return us; + } + // Now use a macro to return milli, micro, and nano seconds + #define RET(UNITS) uint64_t UNITS##secs() { \ + typedef std::chrono::duration UNITS##secs_t ; \ + UNITS##secs_t duration_get( std::chrono::duration_cast(stop-start) ) ; \ + uint64_t us = duration_get.count(); \ + return us; \ + } + RET(milli) // creates member function 'uint64_t millisecs()' - which returns 'stop-start' in millisecs + RET(micro) // creates member function 'uint64_t microsecs()' - which returns 'stop-start' in microsecs + RET(nano) // creates member function 'uint64_t nanosecs()' - which returns 'stop-start' in nanosecs +}; + + +// #define NUMBER1 +// #define NUMBER2 +#define NUMBER3 + +#ifdef NUMBER1 +#include // GB C++11 +#include // GB C++11 +#include +#include +#include +#include +#include +#include + +// Compile with: g++ -Wall -O3 bakery.cpp -pthread -o bakery + +static const int NTHREADS = 6; // GB On the 6-core AMD1100T processor, doesn't print if NTHREADS > 6 +volatile bool exitFlag = false; // GB shutdown threads + +// Some features to play with +// GB choose one of USE_PTHREAD or USE_CPP11THREAD +// #define USE_PTHREAD // GB +#define USE_CPP11THREAD // GB + +//#define NCHECK // Disable crucial check +//#define NSYNC // Disable memory barrier +//#define NVOLATILE // Disable volatile + +#if defined NVOLATILE +#define VOLATILE +#else +#define VOLATILE volatile +#endif + +VOLATILE bool entering[NTHREADS]; +VOLATILE unsigned number [NTHREADS]; +VOLATILE int count = 0; +VOLATILE int total = 0; + +unsigned getmax(int n) +{ + unsigned max = 0; + for (int i = 0; i < n; i++) { + if (number[i] > max) max = number[i]; + } + return max; +} + +bool check(int i, int j) +{ + return number[j] < number[i] || + (number[j] == number[i] && j < i); +} + +inline void synchronize() +{ +#if !defined NSYNC + // gcc builtin full memory barrier + __sync_synchronize(); +#endif +} + +void lock(int i) { + entering[i] = true; + synchronize(); + // Simulate non-atomic write + number[i] = rand(); // GB, mutex is about 2 x as fast, + // number[i] = 1; // rand(); // GB, mutex is about 2 x as fast, + // eliminating the rand() call dropped the wait time from 4100-4800 ns to 3600 ns + // mutex is 2300 ns + + synchronize(); + number[i] = 1 + getmax(NTHREADS); + assert(number[i] > 0); + entering[i] = false; + synchronize(); + for (int j = 0; j < NTHREADS; j++) { + // Wait until thread j receives its number: +#if !defined NCHECK + while (entering[j]) { /* nothing */ } +#endif + // At this point, we have read a false value for + // "entering[j]", therefore any number picked by j + // later will takes our choice into account, any value + // chosen earlier (and so might be less than ours) + // will be visible to us in the following test. + + // Wait until all threads with smaller numbers or with + // the same number, but with higher priority, finish + // their work: + while ((number[j] != 0) && check(i,j)) { /* nothing */ } + } +} + +void unlock(int i) +{ + number[i] = 0; +} + +void *threadfun(void *arg) +{ + Timer t; + uint64_t nanosecs =0; + uint64_t calls =0; + int i = *(int*)(arg); + while (true) { + if(exitFlag) break; + t.Start(); + lock(i); + t.Stop(); + nanosecs+= t.nanosecs(); + calls++; + total++; // global + if (total % 1000000 == 0) fprintf(stderr,"%c", 'a'+i); + assert(count==0); // Check we have exclusive access + count++; // global + // It's not clear that these syncs are unnecessary, + // but nothing seems to break if I remove them. + //synchronize(); + count--; // global + //synchronize(); + unlock(i); + // non-critical section... + } +t.Start(); + lock(i); +t.Stop(); + nanosecs+= t.nanosecs(); + calls++; + std::cout << "thread " << i + << " " << nanosecs << " nsec waiting" + << ", " << calls << " calls" + << ", " << nanosecs / calls << " nsec/call" + << "\n"; + unlock(i); + return NULL; +} + +std::mutex mtx; +void *threadfunMutex(void *arg) +{ + Timer t; + uint64_t nanosecs =0; + uint64_t calls =0; + int i = *(int*)(arg); + while (true) { + if(exitFlag) break; + t.Start(); + mtx.lock(); + t.Stop(); + nanosecs+= t.nanosecs(); + calls++; + total++; // global + if (total % 1000000 == 0) fprintf(stderr,"%c", 'a'+i); + assert(count==0); // Check we have exclusive access + count++; // global + // It's not clear that these syncs are unnecessary, + // but nothing seems to break if I remove them. + //synchronize(); + count--; // global + //synchronize(); + mtx.unlock(); + // non-critical section... + } +t.Start(); + mtx.lock(); +t.Stop(); + nanosecs+= t.nanosecs(); + calls++; + std::cout << "thread " << i + << " " << nanosecs << " nsec waiting" + << ", " << calls << " calls" + << ", " << nanosecs / calls << " nsec/call" + << "\n"; + mtx.unlock(); + return NULL; +} + + +#ifdef USE_PTHREAD +int main() +{ + pthread_t t[NTHREADS]; + int n[NTHREADS]; + for (int i = 0; i < NTHREADS; i++) { + n[i] = i; + pthread_create(&t[i], NULL, threadfun, (void*)&n[i]); + } + usleep(60*1000*1000); + exitFlag= true; + for (int i = 0; i < NTHREADS; i++) { + pthread_join(t[i], NULL); + } +} +#endif + +#ifdef USE_CPP11THREAD +int main() +{ + std::thread t[NTHREADS]; + int n[NTHREADS]; + for (int i = 0; i < NTHREADS; i++) { + n[i] = i; + t[i] = std::thread(threadfun, (void*)&n[i]); + } + usleep(60*1000*1000); + exitFlag= true; + std::cout << "\n"; + for (int i = 0; i < NTHREADS; i++) { + t[i].join(); + } + + // See what has less overhead: bakery or mutex + std::cout << "\n"; + exitFlag= false; + for (int i = 0; i < NTHREADS; i++) { + n[i] = i; + t[i] = std::thread(threadfunMutex, (void*)&n[i]); + } + usleep(60*1000*1000); + exitFlag= true; + std::cout << "\n"; + for (int i = 0; i < NTHREADS; i++) { + t[i].join(); + } + +} +#endif + +#endif +#ifdef NUMBER2 +// http://mykeepit.blogspot.ca/2013/11/study-and-implement-lamports-bakery.html +// (Study and implement the Lamport’s Bakery Algorithm for Interprocess synchronization using C/C++ programming language) +#include // GB +#include // GB +#include // GB C++11 threads +#include // GB for kill thread +#include // GB for kill thread (SIGTERM) +#include +#include +#include +volatile int NUM_THREADS = 10; +volatile int Number[11] = {0}; // GB bump to 11, indexing is base 0? looks like it might be base 1 in places +volatile int count_cs[11] = {0}; +volatile int Entering[11] = {0}; + +volatile bool exitFlag = false; + +int Max() +{ + int i = 0; + // int j = 0; + int maxvalue = 0; + for(i = 0; i < 10; i++) { + if ((Number[i]) > maxvalue) { + maxvalue = Number[i]; + } + } + return maxvalue; +} +void Lock(int i) +{ + Entering[i] = 1; + Number[i] = 1 + Max(); + Entering[i] = 0; + for (int j = 1; j <= NUM_THREADS; j++) { + while (Entering[j]) { } /* Do nothing */ + while ((Number[j] != 0) + && (Number[j] < Number[i] || + (Number[j] == Number[i] && j < i))) + { } + } +} +void Unlock(int i) { + Number[i] = 0; +} +void Thread(void* voidPtr) +{ + int i = *(int*) voidPtr; + while (1) { + if(exitFlag) break; + Lock(i); + count_cs[i+1] = count_cs[i+1] + 1 ; + //printf("critical section of %d\n", i+1); + Unlock(i); + } +} +int main() +{ + try { + int duration = 5000; + std::thread threads[NUM_THREADS]; + for(int t = 0; t < NUM_THREADS; t++){ + printf("In main: creating thread %d\n", t+1); + threads[t] = std::thread(Thread, (void*)&t); + } + usleep(duration*1000); + for(int t=0; t < NUM_THREADS; t++) { + printf("count of thread no %d is %d\n",t+1,count_cs[t+1]); + } + + usleep(duration*1000); + for(int t=0; t < NUM_THREADS; t++) { + printf("count of thread no %d is %d\n",t+1,count_cs[t+1]); + } + + exitFlag = true; + std::cout << " GB waiting for threads to exit\n"; + usleep(1000*1000); + std::cout << " GB kill threads\n"; + for(int t=0; t < NUM_THREADS; t++) + pthread_kill(threads[t].native_handle(), SIGTERM); + + std::cout << " GB join threads\n"; + for(int t=0; t < NUM_THREADS; t++) + threads[t].join(); + + } catch(const std::exception& e) { + std::cerr << e.what() << "\n"; + } + std::cout << " GB main finished.\n"; + return 0; +} +#endif + +#ifdef NUMBER3 +// http://www.csee.wvu.edu/~jdm/classes/cs550/notes/tech/mutex/Bakery.html +// A Novel N-Process Solution: Lamport's Bakery Algorithm +// The bakery algorithm is a very different approach proposed by Leslie Lamport. It's based on the "take-a-number" system used in bakeries and delicatessens. + +// CONCEPT: A process waiting to enter its critical section chooses a number. This number must be greater than all other numbers currently in use. There is a global shared array of current numbers for each process. The entering process checks all other processes sequentially, and waits for each one which has a lower number. Ties are possible; these are resolved using process IDs. + +#include +#include +#include +#include // usleep +#include // sigterm +#include // strsignal +#include + +const int NUM_THREADS = 4; +volatile bool choosing[NUM_THREADS]; +volatile int num[NUM_THREADS]; + +void lockInit() +{ + for (int j=0; j < NUM_THREADS; j++) { + num[j] = 0; + } +} + +void lock(int i) +{ + /* choose a number */ + choosing[i] = true; + // num[i] = max(num[0], ..., num[NUM_THREADS-1]) + 1; + int max = num[0]; + for(int i = 1; i < NUM_THREADS ; i++) if(num[i] > max) max = num[i]; + num[i] = max+1; + + choosing[i] = false; + + /* for all other processes */ + for (int j=0; j < NUM_THREADS; j++) { + + /* wait if the process is currently choosing */ + while (choosing[j]) + {std::this_thread::yield();} + + /* wait if the process has a number and comes ahead of us */ + if ((num[j] > 0) && + ((num[j] < num[i]) || + (num[j] == num[i]) && (j < i)) + ) { + while (num[j] > 0) + {std::this_thread::yield();} + } + } +} + +void unlock(int i) +{ + num[i] = 0; /* clear our number */ +} + +volatile int count_cs[11] = {0}; +volatile bool exitFlag = false; + +std::mutex m; +void Thread(void* voidPtr) +{ + int i = *(int*) voidPtr; + + // 1. capture 'i' as [i], [&], or [=] --- no error on capture + // 2. using 'i' generates syntax error + signal(SIGTERM, [&] (int sig) {m.lock();std::cout << "thread " << ": sig=" << sig << " " << strsignal(sig) << "\n";m.unlock(); exit(sig);}); + // signal(SIGTERM, [&] (int sig) {std::cout << "thread " << i << ": sig=" << sig << " (SIGTERM)\n"; exit(sig);}); + + // signal(SIGTERM, [&] (int sig) { exit(sig); } ); + + while (1) { + if(exitFlag) break; + lock(i); + count_cs[i+1] = count_cs[i+1] + 1 ; + // printf("critical section of %d\n", i+1); + unlock(i); + } +lock(i); + std::cout << "thread " << i << " exiting.\n"; +unlock(i); +} + +int main(int argc,char*argv[]) +{ + try { + int duration = 5000; + std::thread threads[NUM_THREADS]; + lockInit(); + for(int t = 0; t < NUM_THREADS; t++){ + printf("In main: creating thread %d\n", t+1); + threads[t] = std::thread(Thread, (void*)&t); + } + usleep(duration*1000); + for(int t=0; t < NUM_THREADS; t++) { + printf("count of thread no %d is %d\n",t+1,count_cs[t+1]); + } + + usleep(duration*1000); + for(int t=0; t < NUM_THREADS; t++) { + printf("count of thread no %d is %d\n",t+1,count_cs[t+1]); + } + + exitFlag = true; + std::cout << " GB waiting for threads to exit\n"; + usleep(1000*1000); + std::cout << " GB kill threads\n"; + for(int t=0; t < NUM_THREADS; t++) + pthread_kill(threads[t].native_handle(), SIGTERM); + + std::cout << " GB join threads\n"; + for(int t=0; t < NUM_THREADS; t++) + threads[t].join(); + + } catch(const std::exception& e) { + std::cerr << e.what() << "\n"; + } + std::cout << " GB main finished.\n"; + return 0; +} +#endif diff --git a/thread-conqueue.cpp b/thread-conqueue.cpp new file mode 100644 index 0000000..1c36e54 --- /dev/null +++ b/thread-conqueue.cpp @@ -0,0 +1,125 @@ +// https://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html + +// GB 2016-04-03 +// 1. Boost code ported to C++11 +// 2. add size() function +// 3. add emplace() function +// 4. test program + +#include +#include +#include +#include +#include + +template +class concurrent_queue +{ +private: + std::queue q; + mutable std::mutex m; // error if not mutable + // error: binding ‘const std::mutex’ to reference of type ‘std::lock_guard::mutex_type& {aka std::mutex&}’ discards qualifiers + std::condition_variable cv; +public: + void push(T const& data) + { + // either + m.lock(); + q.push(data); + m.unlock(); + + // or if there is chance q.push might throw + // std::unique_lock lck(m); + // q.push(data); + // lck.unlock(); + + cv.notify_one(); + } + + void emplace(const T&& data) + { + m.lock(); + q.emplace(std::move(data)); + m.unlock(); + + cv.notify_one(); + } + + size_t size() const // GB + { + std::lock_guard lck(m); + return q.size(); + } + + bool empty() const + { + std::lock_guard lck(m); // GB added lock + return q.empty(); + } + + bool try_pop(T& popped_value) + { + std::lock_guard lck(m); + if(q.empty()) + return false; + + popped_value=q.front(); + q.pop(); + return true; + } + + void wait_and_pop(T& popped_value) + { + std::unique_lock lck(m); + while(q.empty()) + cv.wait(lck); + + popped_value=q.front(); + q.pop(); + } +}; + +int main(int argc, char**argv) // GB test concurrent_queue +{ + concurrent_queue cq; + + int NUM_PROCS = std::thread::hardware_concurrency(); + + int size = 100; + std::thread tid[size]; + for(int i = 0 ; i < size ; i++) + tid[i] = std::thread([&cq] (int i) { cq.push(double(i)); }, i); + + for(int i = 0 ; i < size ; i++) + tid[i].join(); + + std::cout << "cq.size() = " << cq.size() << "\n"; + + std::mutex coutMutex; + for(int t = 0 ; t < NUM_PROCS ; t++) + tid[t] = std::thread( + [&cq,&coutMutex] (int t) + { + uint64_t count = 0; + uint64_t busy_count = 0; + double d; + while(true) { + if(cq.try_pop(d)) { + count++; + std::this_thread::yield(); // give cpu away + } else { + busy_count++; + } + if(cq.empty()) break; + } + std::lock_guard lck(coutMutex); + std::cout << "thread " << t << " popped " << count << " values, busy " << busy_count << " times, remaining q size " << cq.size() << ".\n"; + }, t); + + for(int t = 0 ; t < NUM_PROCS ; t++) + tid[t].join(); + + std::cout << "cq.size() = " << cq.size() << "\n"; + + return 0; +} diff --git a/thread-poison.cpp b/thread-poison.cpp new file mode 100644 index 0000000..b9d214e --- /dev/null +++ b/thread-poison.cpp @@ -0,0 +1,126 @@ +// rrdtool has a good discussion on thread-safe functions. +// rrd = round robin database. a circular data structure + +// rrd_is_thread_safe.h +` +#pragma GCC poison strtok asctime ctime gmtime localtime tmpnam strerror + +#if 0 +NOTES FOR RRD CONTRIBUTORS + +Some precautions must be followed when developing RRD from now on: + + Only use thread-safe functions in library code. Many often used libc functions + aren't thread-safe. Take care in the following situations or when using the + following library functions: + Direct calls to strerror() must be avoided: use rrd_strerror() instead, + it provides a per-thread error message. + + The getpw*, getgr*, gethost* function families (and some more get* + functions) are not thread-safe: use the *_r variants + + Time functions: asctime, ctime, gmtime, localtime: use *_r variants + + strtok: use strtok_r + + tmpnam: use tmpnam_r + + Many others (lookup documentation) + + A header file named rrd_is_thread_safe.h is provided that works with the + GNU C-preprocessor to "poison" some of the most common non-thread-safe + functions using the #pragma GCC poison directive. Just include this + header in source files you want to keep thread-safe. + + Do not introduce global variables! + + If you really, really have to use a global variable you may add a new + field to the rrd_context structure and modify rrd_error.c, + rrd_thread_safe.c and rrd_non_thread_safe.c + Do not use getopt or getopt_long in *_r (neither directly nor indirectly). + + getopt uses global variables and behaves badly in a multi-threaded + application when called concurrently. Instead provide a *_r function + taking all options as function parameters. You may provide argc and + **argv arguments for variable length argument lists. See rrd_update_r + as an example. + +#endif + +#include // strtok, strerr + + // char *strtok(char *str, const char *delim); + // char *strtok_r(char *str, const char *delim, char **saveptr); + // char *strerror(int errnum); + +#include // tmpnam + // char *tmpnam(char *s); + // Note: Avoid use of tmpnam(); use mkstemp(3) or tmpfile(3) instead. + // The tmpnam() function returns a pointer to a string that is a valid + // filename, and such that a file with this name did not exist at some + // point in time, so that naive programmers may think it a suitable name + // for a temporary file. If the argument s is NULL, this name is generated + // in an internal static buffer and may be overwritten by the next call + // to tmpnam(). If s is not NULL, the name is copied to the character + // array (of length at least L_tmpnam) pointed to by s and the value s is + // returned in case of success. + + // int mkstemp(char *template); + // int mkostemp(char *template, int flags); + // int mkstemps(char *template, int suffixlen); + // int mkostemps(char *template, int suffixlen, int flags); + + // DESCRIPTION + // The mkstemp() function generates a unique temporary filename from + // template, creates and opens the file, and returns an open file + // descriptor for the file. + + // The last six characters of template must be "XXXXXX" and these are + // replaced with a string that makes the filename unique. Since it + // will be modified, template must not be a string constant, but should + // be declared as a character array. + + // The file is created with permissions 0600, that is, read plus + // write for owner only. The returned file descriptor provides + // both read and write access to the file. The file is opened + // with the open(2) O_EXCL flag, guaranteeing that the caller + // is the process that creates the file. + + // The mkostemp() function is like mkstemp(), with the difference + // that the following bits—with the same meaning as for open(2)—may + // be specified in flags: O_APPEND, O_CLOEXEC, and O_SYNC. Note + // that when creating the file, mkostemp() includes the values + // O_RDWR, O_CREAT, and O_EXCL in the flags argument given to + // open(2); including these values in the flags argument given to + // mkostemp() is unnecessary, and produces errors on some systems. + + // The mkstemps() function is like mkstemp(), except that the + // string in template contains a suffix of suffixlen characters. + // Thus, template is of the form prefixXXXXXXsuffix, and the + // string XXXXXX is modified as for mkstemp(). + + // The mkostemps() function is to mkstemps() as mkostemp() is to mkstemp(). + + // RETURN VALUE + // On success, these functions return the file descriptor of the temporary file. On error, -1 is returned, and errno is set appropriately. + +#include // asctime, ctime, gmtine, localtime, + + // char *asctime(const struct tm *tm); + // char *asctime_r(const struct tm *tm, char *buf); + + // char *ctime(const time_t *timep); + // char *ctime_r(const time_t *timep, char *buf); + + // struct tm *gmtime(const time_t *timep); + // struct tm *gmtime_r(const time_t *timep, struct tm *result); + + // struct tm *localtime(const time_t *timep); + // struct tm *localtime_r(const time_t *timep, struct tm *result); + + // time_t mktime(struct tm *tm); + +int main(int argc, char*argv[]) +{ + char *tmp = tmpnam(NULL); +} diff --git a/threadsafeq.cpp b/threadsafeq.cpp index 073067c..0b9db44 100644 --- a/threadsafeq.cpp +++ b/threadsafeq.cpp @@ -4,12 +4,6 @@ #include #include // GB - needed for tsqueue Pop that waits if q empty -// includes for test program -#include -#include -#include -#include - // What is the difference between lock_guard and unique_lock? // The difference is that you can lock and unlock a std::unique_lock. // Must use a std::unique_lock for condition variables. @@ -78,6 +72,13 @@ class tsQueueWait : public tsQueue { // tsqueue Pop that waits for Push if q } }; +// includes for test program +#include +#include +#include +#include +#include + int main() { // runtime error: @@ -103,12 +104,12 @@ int main() int ii; while(q.tryPop(ii) == false) tries++; - std::ostringstream ss; - if(tries == 1) - ss << "thread " << i << " popped " << ii << "\n"; - else - ss << "thread " << i << " popped " << ii << " after " << tries << " tries" << "\n"; - std::cout << ss.str(); + std::string s = "thread " + std::to_string(i) + " popped " + std::to_string(ii); + if(tries != 1) { + s += " after " + std::to_string(tries) + " tries"; + } + s += "\n"; + std::cout << s; }; // test out the tryPop mechanism. @@ -139,9 +140,7 @@ int main() { int i; q.Pop(i); - std::ostringstream ss; - ss << "thread " << t << " popped " << i << " (after possible wait) " << "\n"; - std::cout << ss.str(); + std::cout << "thread " + std::to_string(t) + " popped " + std::to_string(i) + " (after possible wait)\n"; }; // Q N reads @@ -160,4 +159,5 @@ int main() } catch(...) { std::cerr << "program threw unspecified error" << "\n"; } + return 0; } diff --git a/threadsafequeue.cpp b/threadsafequeue.cpp index 7c390dd..a65ec93 100644 --- a/threadsafequeue.cpp +++ b/threadsafequeue.cpp @@ -6,7 +6,301 @@ // #include // random, compiles fine without cstdlib using namespace std; -#include "threadsafequeue.h" + +// #include "threadsafequeue.h" + +// +++++++++++++++++++++++++++++++++++++++++ +// + #include "threadsafequeue.h" START + +// +++++++++++++++++++++++++++++++++++++++++ +//#pragma once + +// http://gnodebian.blogspot.ca/2013/07/a-thread-safe-asynchronous-queue-in-c11.html + +#include +#include +#include +#include +// #include // GB not needed +#include + + + +/** A thread-safe asynchronous queue */ +template > +class SafeQueue +{ + + typedef typename Container::value_type value_type; + typedef typename Container::size_type size_type; + typedef Container container_type; + + public: + + /*! Create safe queue. */ + SafeQueue() = default; + SafeQueue (SafeQueue&& sq) + { + m_queue = std::move (sq.m_queue); + } + SafeQueue (const SafeQueue& sq) + { + std::lock_guard lock (sq.m_mutex); + m_queue = sq.m_queue; + } + + /*! Destroy safe queue. */ + ~SafeQueue() + { + std::lock_guard lock (m_mutex); + } + + /** + * Sets the maximum number of items in the queue. Defaults is 0: No limit + * \param[in] item An item. + */ + void set_max_num_items (unsigned int max_num_items) + { + m_max_num_items = max_num_items; + } + + /** + * Pushes the item into the queue. + * \param[in] item An item. + * \return true if an item was pushed into the queue + */ + bool push (const value_type& item) + { + std::lock_guard lock (m_mutex); + + if (m_max_num_items > 0 && m_queue.size() > m_max_num_items) + return false; + + m_queue.push (item); + m_condition.notify_one(); + return true; + } + + /** + * Pushes the item into the queue. + * \param[in] item An item. + * \return true if an item was pushed into the queue + */ + bool push (const value_type&& item) + { + std::lock_guard lock (m_mutex); + + if (m_max_num_items > 0 && m_queue.size() > m_max_num_items) + return false; + + m_queue.push (item); + m_condition.notify_one(); + return true; + } + + /** + * Pops item from the queue. If queue is empty, this function blocks until item becomes available. + * \param[out] item The item. + */ + void pop (value_type& item) + { + std::unique_lock lock (m_mutex); + m_condition.wait (lock, [this]() // Lambda funct + { return !m_queue.empty(); }); + item = m_queue.front(); + m_queue.pop(); + } + + /** + * Pops item from the queue using the contained type's move assignment operator, if it has one.. + * This method is identical to the pop() method if that type has no move assignment operator. + * If queue is empty, this function blocks until item becomes available. + * \param[out] item The item. + */ + void move_pop (value_type& item) + { + std::unique_lock lock (m_mutex); + m_condition.wait (lock, [this]() // Lambda funct + { return !m_queue.empty(); }); + item = std::move (m_queue.front()); + m_queue.pop(); + } + + /** + * Tries to pop item from the queue. + * \param[out] item The item. + * \return False is returned if no item is available. + */ + bool try_pop (value_type& item) + { + std::lock_guard lock (m_mutex); + + if (m_queue.empty()) + return false; + + item = m_queue.front(); + m_queue.pop(); + return true; + } + + /** + * Tries to pop item from the queue using the contained type's move assignment operator, if it has one.. + * This method is identical to the try_pop() method if that type has no move assignment operator. + * \param[out] item The item. + * \return False is returned if no item is available. + */ + bool try_move_pop (value_type& item) + { + std::lock_guard lock (m_mutex); + + if (m_queue.empty()) + return false; + + item = std::move (m_queue.front()); + m_queue.pop(); + return true; + } + + /** + * Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available. + * \param[out] t An item. + * \param[in] timeout The number of microseconds to wait. + * \return true if get an item from the queue, false if no item is received before the timeout. + */ + bool timeout_pop (value_type& item, std::uint64_t timeout) + { + std::unique_lock lock (m_mutex); + + if (m_queue.empty()) + { + if (timeout == 0) + return false; + + if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout) + return false; + } + + item = m_queue.front(); + m_queue.pop(); + return true; + } + + /** + * Pops item from the queue using the contained type's move assignment operator, if it has one.. + * If the queue is empty, blocks for timeout microseconds, or until item becomes available. + * This method is identical to the try_pop() method if that type has no move assignment operator. + * \param[out] t An item. + * \param[in] timeout The number of microseconds to wait. + * \return true if get an item from the queue, false if no item is received before the timeout. + */ + bool timeout_move_pop (value_type& item, std::uint64_t timeout) + { + std::unique_lock lock (m_mutex); + + if (m_queue.empty()) + { + if (timeout == 0) + return false; + + if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout) + return false; + } + + item = std::move (m_queue.front()); + m_queue.pop(); + return true; + } + + /** + * Gets the number of items in the queue. + * \return Number of items in the queue. + */ + size_type size() const + { + std::lock_guard lock (m_mutex); + return m_queue.size(); + } + + /** + * Check if the queue is empty. + * \return true if queue is empty. + */ + bool empty() const + { + std::lock_guard lock (m_mutex); + return m_queue.empty(); + } + + /** + * Swaps the contents. + * \param[out] sq The SafeQueue to swap with 'this'. + */ + void swap (SafeQueue& sq) + { + if (this != &sq) + { + std::lock_guard lock1 (m_mutex); + std::lock_guard lock2 (sq.m_mutex); + m_queue.swap (sq.m_queue); + + if (!m_queue.empty()) + m_condition.notify_all(); + + if (!sq.m_queue.empty()) + sq.m_condition.notify_all(); + } + } + + /*! The copy assignment operator */ + SafeQueue& operator= (const SafeQueue& sq) + { + if (this != &sq) + { + std::lock_guard lock1 (m_mutex); + std::lock_guard lock2 (sq.m_mutex); + std::queue temp {sq.m_queue}; + m_queue.swap (temp); + + if (!m_queue.empty()) + m_condition.notify_all(); + } + + return *this; + } + + /*! The move assignment operator */ + SafeQueue& operator= (SafeQueue && sq) + { + if (this != &sq) // GB added if + { + std::unique_lock lock (m_mutex); + m_queue = std::move (sq.m_queue); + lock.unlock(); + + if (!m_queue.empty()) m_condition.notify_all(); + } + + return *this; + } + + + private: + + std::queue m_queue; + mutable std::mutex m_mutex; + std::condition_variable m_condition; + unsigned int m_max_num_items = 0; +}; + +/*! Swaps the contents of two SafeQueue objects. */ +template +void swap (SafeQueue& q1, SafeQueue& q2) +{ + q1.swap (q2); +} + +// +++++++++++++++++++++++++++++++++++++ +// + #include "threadsafequeue.h" END + +// +++++++++++++++++++++++++++++++++++++ // http://gnodebian.blogspot.ca/2013/07/a-thread-safe-asynchronous-queue-in-c11.html @@ -33,9 +327,7 @@ int main() // GB made this test program this_thread::sleep_for (std::chrono::milliseconds(nap)); double d = arg + 0.1; my_queue.push(d); - stringstream ss; - ss << "thread " << arg << " napped " << nap << " msec + pushed " << d << "\n"; - cout << ss.str(); + cout << string("thread ") + to_string(arg) + " napped " + to_string(nap) + " msec + pushed " + to_string(d) + "\n"; }; for(int i = 0; i < 21; i++) @@ -47,9 +339,7 @@ int main() // GB made this test program auto pop = [&my_queue] (int arg) { double d; my_queue.pop(d); - stringstream ss; - ss << "thread " << arg << " popped " << d << "\n"; - cout << ss.str(); + cout << string("thread ") + to_string(arg) + " popped " + to_string(d) + "\n"; }; t.clear(); diff --git a/unique_ptr-vec.cpp b/unique_ptr-vec.cpp new file mode 100644 index 0000000..d52ebe2 --- /dev/null +++ b/unique_ptr-vec.cpp @@ -0,0 +1,163 @@ +// g++ (Ubuntu 5.2.1-22ubuntu2) 5.2.1 20151010 + +#include +#include +#include +#include +#include + +using namespace std; + +class X { +public: + int time; + + X() :time(0) { cout << "X()\n"; } + X(int i) :time(i) { cout << "X("<> q;\n"; + // create queue + queue> q; + + // add element + unique_ptr p (new int{123}); + q.push(move(p)); + + // try to grab the element + auto p2 = move(q.front()); + q.pop(); + cout << *p2 << "\n"; + } + + ////////////////////////////////////////////// + { + // this works + cout << "\nqueue> q;\n"; + queue> q; + + q.push(move( unique_ptr (new X(123)) ) ); + q.push(move( unique_ptr (new X(99)) ) ); + + unique_ptr up (new X(1234)); + q.push(move(up)); + up = move(q.front()); + q.pop(); + + up -> print (); + up -> operator<< (cout); + + q.push(move( unique_ptr (new X(99)) ) ); + up = move(q.front()); + + up -> print (); + up -> operator<< (cout); + } + + ////////////////////////////////////////////// + { + + cout << "\npriority_queue> > q;\n"; + + class CompareX { + public: + bool operator()(const X& l, const X& r) const { return l.time > r.time; } + }; + + class CompareXup { + public: + bool operator()(const unique_ptr l, const unique_ptr r) const { return l->time > r->time; } + }; + auto lambdaUp = [] (const unique_ptr l, const unique_ptr r) { return l->time > r->time; }; + auto lambdaX = [] (const X& l, const X& r) { return l.time > r.time; }; + + // priority_queue, vector> > q; // WORKS!, without compare function + // priority_queue, vector>, CompareX > q; // ERROR! + // priority_queue, vector>, CompareXup > q; // ERROR! + // priority_queue, vector>, decltype(lambdaUp) > q; // ERROR! + + priority_queue> > q; // WORKS!, without compare function + // priority_queue>, CompareX > q; // ERROR! + // priority_queue>, CompareXup > q; // ERROR! + // priority_queue>, decltype(lambdaUp) > q; // ERROR! + + q.push(move( unique_ptr (new X(123)) ) ); + q.top() -> print(); // works + + q.push(move( unique_ptr (new X(99)) ) ); + q.top() -> print(); // works + + unique_ptr up (new X(456)); + q.push(move(up)); + q.top() -> print(); // works + + q.top() -> operator<< (cout); + + // auto p2 = move(q.top()); // FAILS! top() is const. Cannot move(const) + // auto p2 = q.top().release(); // FAILS! + + q.pop(); + } + { + cout << "\npriority_queue pair, container, and compare function\n"; + // http://en.cppreference.com/w/cpp/container/priority_queue/priority_queue + using my_pair_t = std::pair; + + using my_container_t = std::vector; + + auto my_comp = + [](const my_pair_t& e1, const my_pair_t& e2) + { return e1.first > e2.first; }; + std::priority_queue queue(my_comp); + + queue.push(std::make_pair(5, true)); + queue.push(std::make_pair(3, false)); + queue.push(std::make_pair(7, true)); + + std::cout << std::boolalpha; // print bool as false/true not 0/1 + while(!queue.empty()) { + const auto& p = queue.top(); + std::cout << p.first << " " << p.second << "\n"; + queue.pop(); + } + } + { + cout << "\nclass X hack priority_queue pair, container, and compare function\n"; + // http://en.cppreference.com/w/cpp/container/priority_queue/priority_queue + using my_pair_t = X; + + using my_container_t = std::vector; + + auto my_comp = + [](const my_pair_t& e1, const my_pair_t& e2) + { return e1.time > e2.time; }; + std::priority_queue< my_pair_t, + my_container_t, + decltype(my_comp) + > queue(my_comp); // NOTE my_comp is used twice + + cout << "queue.push(X(5));\n"; + queue.push(X(5)); + + cout << "queue.push(X(3));\n"; + queue.push(X(3)); + + queue.push(X(7)); + cout << "queue.push(X(7));\n"; + + while(!queue.empty()) { + const auto& p = queue.top(); + p.print(); + queue.top().print(); + queue.pop(); + } + } +} // end main diff --git a/weak-ptr-sutter.cpp b/weak-ptr-sutter.cpp new file mode 100644 index 0000000..2ede560 --- /dev/null +++ b/weak-ptr-sutter.cpp @@ -0,0 +1,79 @@ +// http://flyingfrogblog.blogspot.ca/2013/10/herb-sutters-favorite-c-10-liner.html + +// a complete reference-counted object cache, or in other words a concurrent weak dictionary, specifically one with weak values. + +// GB - not sure this works or is useful. + + + #include + #include + #include + #include // shared_ptr/weak_ptr + + using namespace std; + + class widget { + int id; + public: + widget() :id(-1) { cout << "widget() " << (void*) this << "\n"; } + widget(int i) :id(i) { cout << "widget(" << id << ") " << (void*) this << "\n"; } + ~widget() { cout << "~widget() " << id << "," << (void*) this << "\n"; } + }; + +// In a recently-posted video, Herb Sutter (a prominent C++ expert) describes his favorite C++ 10-liner as “a thread-safe reference-counted object cache”: +// https://channel9.msdn.com/Events/GoingNative/2013/My-Favorite-Cpp-10-Liner + + +// isocpp.org + +widget& instance() { // Scott Meyers singleton + static widget w; + return w; +} + +shared_ptr load_widget(int id) +{ + + // 1. allocate memory as a smart pionter + shared_ptr sp(new widget(id)); + + // 2. read widget id from backing store + // ... + + // 3. return smart pointer + return sp; +} + +shared_ptr get_widget(int id) { // Sutter's fav 10 lines of code (auctually 7 lines) + static map> cache; // thread safe static variable + static mutex m; + + lock_guard hold(m); + auto sp = cache[id].lock(); + if (!sp) cache[id] = sp = load_widget(id); + return sp; +} + + +// This example is very interesting. Firstly, it manages to pull in reference counting, weak references and a mutex which are all very rare in modern programming. Secondly, it contains a memory leak that is difficult to fix in C++ because APIs are burdened with memory management details and this API is incapable of expressing deterministic cleanup because there is no facility for a widget's destructor to remove its entry in the map. Finally, the correct name for this data structure is a concurrent weak dictionary, specifically one with weak values. You'll find correct implementations of this data structure are widely available for C#, F# and Java such as the one here. + +// The obvious fix is to sweep stale entries from the map when get_widget is called but this leaves floating garbage in the map between calls to get_widget, is asymptotically less efficient and incurs unbounded pauses for an unbounded number of threads. + +// Update: Matthew Avery (from the USA) suggests altering the API and semantics of the functions involved so load_widget returns a shared_ptr with a custom deleter that removes the stale map entry as soon as a widget is destructed. If this idea can be made to work then it would be the only deterministic solution to have been proposed to date. + + +int main() +{ + shared_ptr sp; + + for(int i = 10; i < 15; i++) { + sp = get_widget(i); + cout << "main:sp=" << i << "," << (void*) sp.get() << "\n"; + } + + cout << "\n"; + for(int i = 10; i < 15; i++) { + sp = get_widget(i); + cout << "main:sp=" << i << "," << (void*) sp.get() << "\n"; + } +}