-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
accumulate.cc
139 lines (116 loc) · 4.53 KB
/
accumulate.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
Copyright (c) by respective owners including Yahoo!, Microsoft, and
individual contributors. All rights reserved. Released under a BSD (revised)
license as described in the file LICENSE.
*/
/*
This implements the allreduce function of MPI. Code primarily by
Alekh Agarwal and John Langford, with help Olivier Chapelle.
*/
#include <sys/timeb.h>
#include <stdint.h>
#include <algorithm>
#include <cmath>
#include <iostream>
#include <vector>
#include "global_data.h"
#include "vw_allreduce.h"
using namespace std;
void add_float(float& c1, const float& c2) { c1 += c2; }
// Sums the weight component at index `offset` (within each feature's stride
// block) across all nodes via allreduce, then writes the reduced totals back
// into this node's weight table.
void accumulate(vw& all, parameters& weights, size_t offset)
{ // Widen before shifting: a plain `1 << num_bits` is an int shift and
  // overflows (UB) once num_bits >= 31.
  uint64_t length = (uint64_t)1 << all.num_bits; //This is size of gradient
  std::vector<float> local_grad(length); // RAII: released on every exit path
  if (weights.sparse)
    for (uint64_t i = 0; i < length; i++)
      local_grad[i] = (&(weights.sparse_weights[i << weights.sparse_weights.stride_shift()]))[offset];
  else
    for (uint64_t i = 0; i < length; i++)
      local_grad[i] = (&(weights.dense_weights[i << weights.dense_weights.stride_shift()]))[offset];
  all_reduce<float, add_float>(all, local_grad.data(), length); //TODO: modify to not use first()
  if (weights.sparse)
    for (uint64_t i = 0; i < length; i++)
      (&(weights.sparse_weights[i << weights.sparse_weights.stride_shift()]))[offset] = local_grad[i];
  else
    for (uint64_t i = 0; i < length; i++)
      (&(weights.dense_weights[i << weights.dense_weights.stride_shift()]))[offset] = local_grad[i];
}
// All-reduces a single scalar across nodes and returns the global sum.
float accumulate_scalar(vw& all, float local_sum)
{ float global_sum = local_sum;
  all_reduce<float, add_float>(all, &global_sum, 1);
  return global_sum;
}
// All-reduces the weight component at index `offset` across nodes and
// stores the per-node average (global sum / number of nodes) back locally.
void accumulate_avg(vw& all, parameters& weights, size_t offset)
{ // Widen before shifting: `1 << num_bits` is an int shift and overflows
  // (UB) for num_bits >= 31; also use uint64_t to match accumulate(),
  // where the old uint32_t truncated large weight tables.
  uint64_t length = (uint64_t)1 << all.num_bits; //This is size of gradient
  float numnodes = (float)all.all_reduce->total; // node count used for averaging
  std::vector<float> local_grad(length); // RAII: released on every exit path
  if (weights.sparse)
    for (uint64_t i = 0; i < length; i++)
      local_grad[i] = (&(weights.sparse_weights[i << weights.sparse_weights.stride_shift()]))[offset];
  else
    for (uint64_t i = 0; i < length; i++)
      local_grad[i] = (&(weights.dense_weights[i << weights.dense_weights.stride_shift()]))[offset];
  all_reduce<float, add_float>(all, local_grad.data(), length); //TODO: modify to not use first()
  if (weights.sparse)
    for (uint64_t i = 0; i < length; i++)
      (&(weights.sparse_weights[i << weights.sparse_weights.stride_shift()]))[offset] = local_grad[i] / numnodes;
  else
    for (uint64_t i = 0; i < length; i++)
      (&(weights.dense_weights[i << weights.dense_weights.stride_shift()]))[offset] = local_grad[i] / numnodes;
}
// Returns the largest element of arr.
// Precondition: length >= 1 (arr[0] was read unconditionally in the
// original contract; an empty range is undefined behavior either way).
// Ties keep the first occurrence, matching the original strict `>` loop.
float max_elem(float* arr, int length)
{ return *std::max_element(arr, arr + length);
}
// Returns the smallest element of arr strictly greater than 0.001.
// NOTE: the result is seeded with arr[0] unconditionally, so arr[0] is
// returned even when it is <= 0.001 and no later element beats it —
// this quirk is preserved from the original implementation.
float min_elem(float* arr, int length)
{ float smallest = arr[0];
  for (int idx = 1; idx < length; idx++)
  { const float candidate = arr[idx];
    if (candidate > 0.001 && candidate < smallest)
      smallest = candidate;
  }
  return smallest;
}
// Rescales each parameter in place so that the subsequent allreduce sum
// yields a weighted average, with per-node weights taken from the adaptive
// sums previously reduced into local_weights. On exit, local_weights[i]
// holds this node's rescaled contribution (weight[0] * ratio), or 0 when
// no node accumulated any mass for feature i.
template<class T>
void do_weighting(vw& all, uint64_t length, float* local_weights, T& weights)
{
  for (uint64_t idx = 0; idx < length; idx++)
  {
    float* w = &weights[idx << weights.stride_shift()];
    if (!(local_weights[idx] > 0))
    { // No mass anywhere for this feature: zero both sides out.
      local_weights[idx] = 0;
      *w = 0;
      continue;
    }
    const float ratio = w[1] / local_weights[idx];
    local_weights[idx] = w[0] * ratio;
    w[0] *= ratio;
    w[1] *= ratio; //A crude max
    if (all.normalized_updates)
      w[all.normalized_idx] *= ratio; //A crude max
  }
}
// Combines adaptive-gradient weights across nodes as a weighted average:
// each parameter is weighted by its per-node adaptive sum (the float at
// stride offset 1). Requires adaptive updates; dense weights only are
// supported for the final table-wide reduce.
void accumulate_weighted_avg(vw& all, parameters& weights)
{ if(!all.adaptive)
  { all.trace_message<<"Weighted averaging is implemented only for adaptive gradient, use accumulate_avg instead\n";
    return;
  }
  // Widen before shifting: `1 << num_bits` is an int shift and overflows
  // (UB) for num_bits >= 31; the previous uint32_t also truncated large
  // tables. uint64_t matches accumulate().
  uint64_t length = (uint64_t)1 << all.num_bits; //This is the number of parameters
  std::vector<float> local_weights(length); // RAII: released on every return path
  if (weights.sparse)
    for (uint64_t i = 0; i < length; i++)
      local_weights[i] = (&(weights.sparse_weights[i << weights.sparse_weights.stride_shift()]))[1];
  else
    for (uint64_t i = 0; i < length; i++)
      local_weights[i] = (&(weights.dense_weights[i << weights.dense_weights.stride_shift()]))[1];
  //First compute weights for averaging
  all_reduce<float, add_float>(all, local_weights.data(), length);
  if (weights.sparse)
    do_weighting(all, length, local_weights.data(), weights.sparse_weights);
  else
    do_weighting(all, length, local_weights.data(), weights.dense_weights);
  if (weights.sparse)
    cout << "sparse parameters not supported with parallel computation!" << endl;
  else
    // Reduce the whole dense table: `length` features times 2^stride_shift
    // floats each. The old `length * stride_shift()` multiplied by the
    // shift AMOUNT (see `i << stride_shift()` above) and so reduced too
    // few elements (none at all when stride_shift() == 0).
    all_reduce<float, add_float>(all, weights.dense_weights.first(), length << weights.stride_shift());
}