-
Notifications
You must be signed in to change notification settings - Fork 0
/
knn_stl.cpp
153 lines (124 loc) · 3.99 KB
/
knn_stl.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/*
* Parallel and Distributed Systems 2020/2021
* @author Fabio Murgese
*
* Parallel k-nearest-neighbours (k-NN) implemented with C++ standard library threads only.
*
* Usage: ./knn_stl input.csv 10 4
* computes the 10 nearest neighbours of every point in input.csv using 4 workers (+1 writer thread)
* */
#include <pthread.h>
#include <sched.h>

#include <algorithm>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "read_file.cpp"
#include "utils.cpp"
#include "myqueue.cpp"
#include "utimer.cpp"
using namespace std;
// function for the single worker to execute
void job(vector<pair<float,float>> &data, int k, vector<myqueue<string>*> &out,
int nw, int threadid, int size) {
myqueue<string>* myqueue = out[threadid];
for (int i=threadid; i<size; i+=nw) {
// list where will be stored neighbors for element i
vector<pair<float, int>> neighbors;
{
utimer t0("Prova");
// compute and store distance and item in the vector
for (int j=0; j<data.size(); j++) {
if (i==j)
continue;
neighbors.push_back(move(make_pair(euclidean_distance(data[i], data[j]), j)));
}
}
{
utimer t1("heap");
// sort neighbors by distance
make_heap(neighbors.begin(), neighbors.end());
sort_heap(neighbors.begin(), neighbors.end());
}
myqueue->push(to_string(i+1) + ": { " + write_best_neighbors(neighbors, k) + " }\n");
}
// notify termination status
myqueue->push("");
}
/**
 * Writer routine: drains the workers' queues into "output_stl.txt".
 *
 * The queues are polled in round-robin order. Every non-empty string popped is
 * written to the file as is; an empty string marks the end of that worker's
 * stream and removes its queue from the rotation. The function returns once
 * every queue has delivered its end-of-stream marker, after writing a final
 * newline.
 *
 * The caller's vector is only read: the rotation works on a private copy of
 * the pointers, so worker threads that index `out` concurrently are not
 * affected and the caller keeps the pointers it needs to free the queues.
 *
 * @param out queues to drain, one per worker
 * @param nw  number of workers; at most the first `nw` entries of `out` are used
 */
void print(vector<myqueue<string>*> &out, int nw) {
    ofstream outfile("output_stl.txt");
    if (!outfile) {
        // Keep draining anyway so the queues are emptied; writes are no-ops.
        cerr << "Error opening output_stl.txt for writing\n";
    }

    // Private rotation list, clamped so an inconsistent nw cannot index past out.
    vector<myqueue<string>*> active(out);
    active.resize(min(active.size(), static_cast<size_t>(max(nw, 0))));

    size_t current = 0;
    while (!active.empty()) {
        const string chunk = active[current]->pop();
        if (chunk.empty()) {
            // End of stream: drop this queue. The next one slides into
            // `current`, so the index must not advance here.
            active.erase(active.begin() + current);
        } else {
            // Stream straight to the file instead of buffering everything.
            outfile << chunk;
            ++current;
        }
        if (current >= active.size()) {
            current = 0;
        }
    }

    // trailing newline; the file is flushed and closed by the destructor
    outfile << '\n';
}
int main(int argc, char* argv[]) {
if (argc < 4) {
cout << "Usage: " + string (argv[0]) + " input_file k nw" << endl;
return -1;
}
char* name = argv[1]; // input file
int k = atoi(argv[2]); // k parameter
int nw = atoi(argv[3]); // number of workers
vector<thread> threads; // workers
vector<pair<float, float>> data; // points
vector<myqueue<string>*> out(nw);
{
utimer reader("Reading input file");
data = ReadFile(name);
}
int size = data.size();
{
utimer knn("Knn");
// number of workers as nw
for (int i=0; i<nw; ++i) {
// output queue for the single worker
out[i] = new myqueue<string>();
threads.push_back(move(thread([&data, k, &out, nw, i, size](){
job(data, k, out, nw, i, size);
})));
// create a cpu_set_t object representing a set of CPUs
// clear it and mark only CPU i as set
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i, &cpuset);
// Give the native handler a mask that tells on which core the thread should run
int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
if (rc != 0) { // possible failure by hardware limitations
std::cerr << "Error calling pthread_setaffinity_np: " << rc << "\n";
}
}
// writer to clear out the queues
threads.push_back(move(thread([&out, nw]() {
print(out, nw);
})));
// waiting for the threads to finish execution
for (auto &t: threads) {
t.join();
}
}
return 0;
}