-
Notifications
You must be signed in to change notification settings - Fork 1
/
execute.c
180 lines (156 loc) · 5.03 KB
/
execute.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
/*
"Система обработки транзакций с использованием
распределенных колоночных индексов"
Модуль исполнения запросов.
Авторы: Иванова Е.В., Соколинский Л.Б.
*/
#include "execute.h"
#include "cicompress.h"
int execute()
{
int segment_num; // Счетчик сегментов
int i;
int *tmp_segmentR, *tmp_segmentS;
double start_t, end_t;
double start_calc_t, end_calc_t;
#ifdef USE_COMPRESS // Режим с использованием сжатия данных
start_t = omp_get_wtime(); // Замер времени
long int max_byte_segm_size = (long int)(MAX_SEGMENT_S_SIZE*sizeof(int)*2.4 + 12);
long int uncompress_size = max_byte_segm_size;
// Выделение памяти для временного хранения сегментов
tmp_segmentR = memalign(ALIGN, 2*max_byte_segm_size*sizeof(int));
tmp_segmentS = memalign(ALIGN, 2*max_byte_segm_size*sizeof(int));
if((tmp_segmentR == NULL) || (tmp_segmentS == NULL)) {
printf("ERROR: could not allocate memmory for tmp_segment R or S!\n");
}
end_t = omp_get_wtime(); // Замер времени
UNCOMPRESS_TIME += (end_t - start_t);
#endif
// Распределение сегментов по нитям
#pragma omp for schedule(dynamic, 1)
for(segment_num=0; segment_num<NUM_SEGMENTS; segment_num++) {
#ifdef USE_COMPRESS // Режим с использвоанием сжатия данных
#ifdef USE_UNCOMPRESS_TIME
//if(omp_get_thread_num() == 0)
start_t = omp_get_wtime(); // Замер времени
#endif
uncompress_size = max_byte_segm_size;
segment_uncompress(segmentR[segment_num].ptr, segmentR[segment_num].compress_size, tmp_segmentR, &uncompress_size);
uncompress_size = max_byte_segm_size;
segment_uncompress(segmentS[segment_num].ptr, segmentS[segment_num].compress_size, tmp_segmentS, &uncompress_size);
#ifdef USE_UNCOMPRESS_TIME
//if(omp_get_thread_num() == 0) {
end_t = omp_get_wtime(); // Замер времени
#pragma omp atomic
UNCOMPRESS_TIME += (end_t - start_t);
//}
#endif
#else
tmp_segmentR = (int *)segmentR[segment_num].ptr;
tmp_segmentS = (int *)segmentS[segment_num].ptr;
#endif
#ifdef USE_CALC_TIME
//if(omp_get_thread_num() == 0)
start_calc_t = omp_get_wtime(); // Замер времени
#endif
//merge_join(tmp_segmentR, segmentR[segment_num].size, tmp_segmentS, segmentS[segment_num].size, (int *)segmentRes[segment_num].ptr, &segmentRes[segment_num].size);
teta_join(tmp_segmentR, segmentR[segment_num].size, tmp_segmentS, segmentS[segment_num].size, (int *)segmentRes[segment_num].ptr, &segmentRes[segment_num].size);
#ifdef USE_CALC_TIME
//if(omp_get_thread_num() == 0) {
end_calc_t = omp_get_wtime(); // Замер времени
#pragma omp atomic
CALC_TIME += (end_calc_t - start_calc_t);
//}
#endif
}
#ifdef USE_COMPRESS // Режим с использвоанием сжатия данных
free(tmp_segmentR);
free(tmp_segmentS);
#endif
return 0;
}
int teta_join(int *Rbuf, int sizeR, int *Sbuf, int sizeS, int *Resbuf, int *sizeRes)
{
int startR, endR, startS, endS;
int iR, iS, i;
int cnt = 0;
startR = 0;
endR = sizeR;
startS = 0;
endS = sizeS;
for(iR=startR; iR<endR; iR++) {
for(iS=startS; iS<endS; iS++) {
if(Rbuf[db_index_value(sizeR, iR)] < Sbuf[db_index_value(sizeS, iS)]) {
Resbuf[res_index_key1(cnt)] = Rbuf[db_index_key(sizeR, iR)];
Resbuf[res_index_key2(cnt)] = Sbuf[db_index_key(sizeS, iS)];
cnt++;
for(i=iS+1; i<endS; i++) {
Resbuf[res_index_key1(cnt)] = Rbuf[db_index_key(sizeR, iR)];
Resbuf[res_index_key2(cnt)] = Sbuf[db_index_key(sizeS, i)];
cnt++;
}
//cnt += endS-iS+1;
startS = iS;
break;
}
}
}
*sizeRes = cnt;
return 0;
}
// Соединение слиянием
int merge_join(int *Rbuf, int sizeR, int *Sbuf, int sizeS, int *Resbuf, int *sizeRes)
{
int startR, endR, startS, endS;
int iR, iS;
int cnt = 0;
startR = 0;
endR = sizeR;
startS = 0;
endS = sizeS;
for(iR=startR; iR<endR; iR++) {
#ifdef USE_FLOPS
#pragma omp atomic
FLOPS++;
printf("+1\n");
#endif
for(iS=startS; iS<endS; iS++) {
#ifdef USE_FLOPS
#pragma omp atomic
FLOPS++;
printf("+1\n");
#endif
if(Rbuf[db_index_value(sizeR, iR)] == Sbuf[db_index_value(sizeS, iS)]) {
Resbuf[res_index_key1(cnt)] = Rbuf[db_index_key(sizeR, iR)];
Resbuf[res_index_key2(cnt)] = Sbuf[db_index_key(sizeS, iS)];
cnt++;
#ifdef USE_FLOPS
#pragma omp atomic
FLOPS += 4;
printf("+4\n");
#endif
}
else {
if(Sbuf[db_index_value(sizeS, iS)] < Rbuf[db_index_value(sizeR, iR)]) {
iS++;
#ifdef USE_FLOPS
#pragma omp atomic
FLOPS += 3;
printf("+3\n");
#endif
}
else {
startS = iS;
#ifdef USE_FLOPS
#pragma omp atomic
FLOPS += 2;
printf("+2\n");
#endif
break;
}
}
}
}
*sizeRes = cnt;
return 0;
}