-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
README_cluster
102 lines (70 loc) · 3.73 KB
/
README_cluster
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
The implementation of Non-linear (Preconditioned) Conjugate Gradient,
LBFGS, online learning, and adaptive online learning works on clusters
(both Hadoop and otherwise) now.
To build the code, run make.
At a high level, the code operates by repeatedly executing something
equivalent to the MPI AllReduce function---adding up floats from all
nodes then broadcasting them back to each individual node. In order
to do this, a spanning tree over the nodes must be created. This is
done using the helper daemon 'allreduce_master'.
***********************************************************************
To run the code on non-Hadoop clusters, the script 'single_machine'
has the simplest possible invocation.
In general: start the span server on one of the cluster nodes:
./spanning_tree
Launch vw on each of the worker nodes:
./vw --span_server <location> --total <t> --node <n> --unique_id <u> -d <file>
where:
<location> is the host running spanning_tree
<t> is the total number of nodes
<n> is the node id number
<u> is a number shared by all nodes in the process
<file> is the input source file for that node
***********************************************************************
To run the code on Hadoop clusters:
Decide if you are going to control the number of tasks by:
(a) using gzip compressed files which cannot be broken up by Hadoop
(b) controlling the number of reducers.
We'll assume (a) below.
Start the span server for the Hadoop cluster:
./spanning_tree
Start the map-reduce job using Hadoop streaming:
hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-files vw,runvw.sh \
-Dmapred.job.map.memory.mb=2500 -input <input> -output <output> \
-mapper runvw.sh -reducer NONE
where <output> is the directory on HDFS where you want the trained
model to be saved. The trained model is saved to the file
<output>/model on HDFS and can be retreived by hadoop -get.
To modify the arguments to VW, edit the script runvw.sh. Arguments to
hadoop can be directly added in the hadoop streaming command.
See the 'mapscript.sh' which uses 'runvw.sh' for an advanced example
of running VW in a Hadoop enviornmnent.
************************************************************************
The files you need to know about:
runvw.sh: This is the mapper code. It takes as arguments:
The output directory. The trained model from the first mapper is
stored as the file "model" in the output directory.
The hostname of the cluster gateway, so that the mappers can connect
to the gateway
All the other standard VW options are currently hardcoded in the
script, feel free to mess around with them.
#########################################################################
spanning_tree.cc: This is the span server code which runs on the
gateway. You start it before the call to hadoop. The span server
backgrounds itself after starting and listens for incoming
connections. It sets up the topology on the mappers and then let them
communicate amongst themselves.
#########################################################################
allreduce.h: This is the header file for the nodes.
#########################################################################
allreduce.cc: This is the code for doing allreduce. It implement the
routine described above. all_reduce is implemented as a combination of
reduce and broadcast routines. reduce reads data from children, adds it
with local data and passes it up to the parent with a call to pass_up.
broadcast receives data from parent, and passes it down to children with
a call to pass_down.
#########################################################################
cg.cc, gd.cc, bfgs.cc: learning algorithms which use all_reduce
whenever communication is needed. Uses routines accumulate and
accumulate_scalar to reduce vectors and scalars resp.