-
Notifications
You must be signed in to change notification settings - Fork 25
/
libtv.rb
257 lines (221 loc) · 5.42 KB
/
libtv.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
#! /usr/bin/env ruby
require 'gserver'
require 'helper'
require 'sqlop/tv_view_count'
require 'fileutils'
require 'termcast_config'
require 'thread'
module TV
@@tv_args = nil
@channel_server = !!ENV['TV_CHANNEL_SERVER']
TV_QUEUE_DIR = 'tmp/tv'
TV_QUEUE_FILE = 'tv.queue'
TV_LOCK_FILE = 'tv.queue.lock'
TV_LOG_FILE = 'tv.queue.log'
def self.queue_dir
TV_QUEUE_DIR
end
def self.queue_dir_file(file)
FileUtils.mkdir_p(self.queue_dir)
File.join(self.queue_dir, file)
end
def self.queue_file
queue_dir_file(TV_QUEUE_FILE)
end
def self.lock_file
queue_dir_file(TV_LOCK_FILE)
end
def self.log_file
queue_dir_file(TV_LOG_FILE)
end
def self.channel_server?
@channel_server
end
def self.as_channel_server
old_channel_server = @channel_server
old_env = ENV['TV_CHANNEL_SERVER']
begin
@channel_server = true
ENV['TV_CHANNEL_SERVER'] = 'y'
yield
ensure
@channel_server = old_channel_server
ENV['TV_CHANNEL_SERVER'] = old_env
end
end
class SyncQueue < Array
attr_reader :mutex, :c
def initialize(*args)
super
@mutex = Mutex.new
@c = ConditionVariable.new
end
def wait
c.wait(mutex)
end
def signal
c.signal
end
def synchronize(&block)
mutex.synchronize(&block)
end
end
# Serves TV requests to FooTV instances.
class TVServ < GServer
def initialize(port = 21976, host = "0.0.0.0")
puts "Starting TV notification server."
@started = Time.now.strftime("%s").to_i
@clients = []
@mutex = Mutex.new
@monitor = nil
super(port, host, Float::MAX, $stderr, true)
end
def bootstrap_client
client_request_queue = SyncQueue.new
@mutex.synchronize do
@clients << client_request_queue
unless @monitor
@monitor = Thread.new { run_monitor }
end
end
client_request_queue
end
def run_monitor
begin
while true
open(TV.queue_file, 'r+') do |af|
TV.flock(af, File::LOCK_EX) do |f|
lines = f.readlines
f.truncate(0)
new_lines = lines.find_all do |line|
if line =~ /^(\d+) .*/
start = $1.to_i
start >= @started
end
end
clients = @mutex.synchronize { @clients }
clients.each do |c|
c.synchronize do
c.push(*new_lines)
c.signal
end
end
end
end
sleep 3
end
rescue
puts "Monitor: #$!"
end
end
def serve(sock)
client_queue = nil
begin
client_queue = bootstrap_client()
while true
client_queue.synchronize do
client_queue.wait
client_queue.each do |q|
sock.write(q)
sock.flush
end
client_queue.clear
end
end
rescue
puts "Ack: #$!"
ensure
if client_queue
@mutex.synchronize do
@clients.delete_if { |q| q.equal?(client_queue) }
end
end
end
end
end
def self.flock(file, mode)
success = file.flock(mode)
if success
begin
res = yield file
return res
ensure
file.flock(File::LOCK_UN)
end
end
nil
end
def self.oflock(filename, mode)
open(filename, 'w') do |of|
flock(of, mode) do |f|
return yield(f)
end
end
nil
end
def self.launch_daemon()
return if fork()
begin
Process.setsid
ensure
end
# Try for a lock, but do not block
oflock(TV.lock_file, File::LOCK_EX | File::LOCK_NB) do |f|
# Be a good citizen:
logfile = File.open(TV.log_file, 'w')
logfile.sync = true
STDOUT.reopen(logfile)
STDERR.reopen(logfile)
STDIN.close()
# Start the notification server and wait on it.
tv = TVServ.new
tv.start()
tv.join()
end
exit 0
end
def self.request_game(g)
# Launch a daemon that keeps a server socket open for interested
# parties (i.e. C-SPLAT) to listen in.
launch_daemon()
open(TV.queue_file, 'a') do |file|
flock(file, File::LOCK_EX) do |f|
# Make sure we're really at eof.
f.seek(0, IO::SEEK_END)
stripped = g
f.puts "#{Time.now.strftime('%s')} #{munge_game(stripped)}"
end
end
end
def self.tv_description(tv)
"#{tv}: #{TermcastConfig.client_urls.join(' or ')}"
end
def self.request_game_verbosely(n, g, who, tv_opt)
puts(self.request_game_msg(n, g, who, tv_opt))
end
def self.request_game_msg(n, g, who, tv_opt)
msg = nil
summary = short_game_summary(g)
if tv_opt && tv_opt[:channel]
tv_opt[:channel] = tv_opt.channel_name(g)
end
tv = (tv_opt && tv_opt[:channel]) || 'FooTV'
unless TV.channel_server?
if tv_opt && tv_opt[:nuke]
msg = "#{tv} playlist clear requested by #{who}."
else
suffix = tv_opt && tv_opt[:cancel] ? ' cancel' : ''
msg = "#{n}. #{summary}#{suffix} requested for #{tv_description(tv)}."
end
Sqlop::TVViewCount.increment(g)
g['req'] = ARGV[1]
end
g = g.merge(tv_opt.opts) if tv_opt
if TV.channel_server?
return "#{n}. :#{munge_game(g)}:"
else
request_game(g)
msg
end
end
end