/
util.rb
163 lines (142 loc) · 4.57 KB
/
util.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/ruby
module Gearman
# = Util
#
# == Description
# Static helper methods and data used by other classes.
class Util
# Map from Integer representations of commands used in the network
# protocol to more-convenient symbols.
COMMANDS = {
1 => :can_do, # W->J: FUNC
23 => :can_do_timeout, # W->J: FUNC[0]TIMEOUT
2 => :cant_do, # W->J: FUNC
3 => :reset_abilities, # W->J: --
22 => :set_client_id, # W->J: [RANDOM_STRING_NO_WHITESPACE]
4 => :pre_sleep, # W->J: --
6 => :noop, # J->W: --
7 => :submit_job, # C->J: FUNC[0]UNIQ[0]ARGS
21 => :submit_job_high, # C->J: FUNC[0]UNIQ[0]ARGS
18 => :submit_job_bg, # C->J: FUNC[0]UNIQ[0]ARGS
8 => :job_created, # J->C: HANDLE
9 => :grab_job, # W->J: --
10 => :no_job, # J->W: --
11 => :job_assign, # J->W: HANDLE[0]FUNC[0]ARG
12 => :work_status, # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR
13 => :work_complete, # W->J/C: HANDLE[0]RES
14 => :work_fail, # W->J/C: HANDLE
15 => :get_status, # C->J: HANDLE
20 => :status_res, # C->J: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM
16 => :echo_req, # ?->J: TEXT
17 => :echo_res, # J->?: TEXT
19 => :error, # J->?: ERRCODE[0]ERR_TEXT
}
# Map e.g. 'can_do' => 1
NUMS = COMMANDS.invert
@@debug = false
##
# Enable or disable debugging output (off by default).
#
# @param v print debugging output
def Util.debug=(v)
@@debug = v
end
##
# Construct a request packet.
#
# @param type_name command type's name (see COMMANDS)
# @param arg optional data to pack into the command
# @return packet (as a string)
def Util.pack_request(type_name, arg='')
type_num = NUMS[type_name.to_sym]
raise InvalidArgsError, "Invalid type name '#{type_name}'" unless type_num
"\0REQ" + [type_num, arg.size].pack('NN') + arg
end
##
# Return a Task based on the passed-in arguments.
#
# @param args either a single Task object or the arguments accepted by
# Task.new
# @return Task object
def Util.get_task_from_args(*args)
if args[0].class == Task
return args[0]
elsif args.size <= 3
return Task.new(*args)
else
raise InvalidArgsError, 'Incorrect number of args to get_task_from_args'
end
end
##
# Read a response packet from a socket.
#
# @param sock Socket connected to a job server
# @param timeout timeout in seconds, 0 to disable (grr, doesn't work)
# @return array consisting of integer packet type and data
def Util.read_response(sock, timeout=0)
# FIXME: use a non-blocking socket and do the work ourselves, i guess...
#sock.setsockopt(Socket::IPPROTO_TCP, Socket::SO_RCVTIMEO, timeout)
head = sock.recv(12)
magic, type, len = head.unpack('a4NN')
raise ProtocolError, "Invalid magic '#{magic}'" unless magic == "\0RES"
buf = (len > 0) ? sock.recv(len) : ''
type = COMMANDS[type]
raise ProtocolError, "Invalid packet type #{type}" unless type
[type, buf]
end
##
# Send a request packet over a socket.
#
# @param sock Socket connected to a job server
# @param req request packet to send
def Util.send_request(sock, req)
len = sock.write(req)
if len != req.size
raise NetworkError, "Wrote #{len} instead of #{req.size}"
end
end
##
# Add default ports to a job server or list of servers.
#
# @param servers a server hostname or "host:port" or array of servers
# @return an array of "host:port" strings
def Util.normalize_job_servers(servers)
if servers.class == String or servers.class == Symbol
servers = [ servers.to_s ]
end
servers.map {|s| s =~ /:/ ? s : "#{s}:#{DEFAULT_PORT}" }
end
##
# Convert job server info and a handle into a string.
#
# @param hostport "host:port" of job server
# @param handle job server-returned handle for a task
# @return "host:port//handle"
def Util.handle_to_str(hostport, handle)
"#{hostport}//#{handle}"
end
##
# Reverse Util.handle_to_str.
#
# @param str "host:port//handle"
# @return [hostport, handle]
def Util.str_to_handle(str)
str =~ %r{^([^:]+:\d+)//(.+)}
return [$1, $3]
end
##
# Log a message if debugging is enabled.
#
# @param str message to log
def Util.log(str, force=false)
puts "#{Time.now.strftime '%Y%m%d %H%M%S'} #{str}" if force or @@debug
end
##
# Log a message no matter what.
#
# @param str message to log
def Util.err(str)
log(str, true)
end
end
end