diff --git a/.travis.yml b/.travis.yml index 27279dd..6507bae 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,8 +8,6 @@ env: - DB=sqlite - DB=postgres before_script: - - bundle exec bin/kaede dbus-policy $USER > kaede.conf - - sudo mv kaede.conf /etc/dbus-1/system.d/kaede.conf - psql -c "CREATE ROLE kaede WITH LOGIN" -U postgres - psql -c "CREATE DATABASE kaede_test WITH OWNER = kaede ENCODING = 'UTF-8' TEMPLATE = template0" -U postgres matrix: diff --git a/README.md b/README.md index 617268a..e0c2222 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,6 @@ Or install it yourself as: ### Requirements - SQLite3 or PostgreSQL - redis -- dbus - recpt1 - b25 - [statvfs](https://github.com/eagletmt/eagletmt-recutils/tree/master/statvfs) @@ -35,9 +34,6 @@ Some of them should be optional, though. ### Setup ```sh -kaede dbus-policy $KAEDE_USER > kaede.conf -sudo mv kaede.conf /etc/dbus-1/system.d/kaede.conf - cp kaede.rb.sample kaede.rb vim kaede.rb gem install pg # gem install sqlite3 diff --git a/kaede.gemspec b/kaede.gemspec index 6203df5..8128de9 100644 --- a/kaede.gemspec +++ b/kaede.gemspec @@ -29,10 +29,10 @@ Gem::Specification.new do |spec| spec.add_development_dependency "vcr" spec.add_development_dependency "webmock" spec.add_dependency "fluent-logger" + spec.add_dependency "grpc" spec.add_dependency "nokogiri" spec.add_dependency "redis" spec.add_dependency "retryable", ">= 2.0.0" - spec.add_dependency "ruby-dbus" spec.add_dependency "sequel" spec.add_dependency "sleepy_penguin" spec.add_dependency "thor" diff --git a/kaede.rb.sample b/kaede.rb.sample index b26597c..04861e8 100644 --- a/kaede.rb.sample +++ b/kaede.rb.sample @@ -1,6 +1,10 @@ require 'redis' require 'twitter' +GRPC.extend(Logging.globally) +Logging.logger.root.appenders = Logging.appenders.stdout +Logging.logger.root.level = :debug + Kaede.configure do |config| config.b25 = '/usr/bin/b25' config.recpt1 = '/usr/bin/recpt1' diff --git a/lib/kaede/cli.rb b/lib/kaede/cli.rb index 9f1bab4..3263f50 100644 --- a/lib/kaede/cli.rb +++ b/lib/kaede/cli.rb @@ -62,13 +62,6 @@ def update Kaede::Updater.new(db, syobocal).update end - desc 'dbus-policy USER', 'Generate dbus policy file' - def dbus_policy(user) - require 'kaede/dbus/generator' - - puts DBus::Generator.new.generate_policy(user) - end - desc 'db-prepare', 'Create tables' def db_prepare require 'kaede/database' diff --git a/lib/kaede/config.rb b/lib/kaede/config.rb index fb2d7c5..df3dd38 100644 --- a/lib/kaede/config.rb +++ b/lib/kaede/config.rb @@ -3,7 +3,7 @@ module Kaede class Config - attr_accessor :database_url, :redis, :redis_queue, :fluent_tag_prefix, :fluent_host, :fluent_port + attr_accessor :database_url, :redis, :redis_queue, :fluent_tag_prefix, :fluent_host, :fluent_port, :grpc_port path_attrs = [:b25, :recpt1, :assdumper, :clean_ts, :statvfs, :record_dir, :cache_dir, :cabinet_dir] attr_reader *path_attrs @@ -26,6 +26,7 @@ def initialize self.cabinet_dir = basedir.join('cabinet') self.redis = Redis.new self.redis_queue = 'jobs' + self.grpc_port = 'localhost:4195' end end end diff --git a/lib/kaede/dbus.rb b/lib/kaede/dbus.rb deleted file mode 100644 index 5913e32..0000000 --- a/lib/kaede/dbus.rb +++ /dev/null @@ -1,5 +0,0 @@ -module Kaede - module DBus - DESTINATION = 'cc.wanko.kaede1' - end -end diff --git a/lib/kaede/dbus/generator.rb b/lib/kaede/dbus/generator.rb deleted file mode 100644 index 1c35250..0000000 --- a/lib/kaede/dbus/generator.rb +++ /dev/null @@ -1,32 +0,0 @@ -require 'nokogiri' -require 'kaede/dbus' - -module Kaede - module DBus - class Generator - def generate_policy(user) - Nokogiri::XML::Builder.new do |xml| - xml.comment 'Put this policy configuration file into /etc/dbus-1/system.d' - xml.doc.create_internal_subset( - 'busconfig', - '-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN', - 'http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd', - ) - xml.busconfig do - xml.policy(user: 'root') do - xml.allow(own: DESTINATION) - end - xml.policy(user: user) do - xml.allow(own: DESTINATION) - end - - xml.policy(context: 'default') do - xml.allow(send_destination: DESTINATION) - xml.allow(receive_sender: DESTINATION) - end - end - end.to_xml - end - end - end -end diff --git a/lib/kaede/dbus/main.rb b/lib/kaede/dbus/main.rb deleted file mode 100644 index 6747723..0000000 --- a/lib/kaede/dbus/main.rb +++ /dev/null @@ -1,73 +0,0 @@ -# Based on DBus::Main in ruby-dbus gem. -# Stop the main loop as quick as possible. -require 'sleepy_penguin' - -module Kaede - module DBus - class Main - def initialize - @buses = Hash.new - @quit_event = SleepyPenguin::EventFD.new(0, :SEMAPHORE) - end - - def <<(bus) - @buses[bus.message_queue.socket] = bus - end - - def quit - @quit_event.incr(1) - end - - def loop - flush_buffers - - epoll = prepare_epoll - begin - while !@buses.empty? - epoll.wait do |events, io| - if io == @quit_event - io.value - return - end - handle_socket(io) - end - end - ensure - epoll.close - end - end - - def handle_socket(socket) - bus = @buses[socket] - begin - bus.message_queue.buffer_from_socket_nonblock - rescue EOFError, SystemCallError - @buses.delete(socket) # this bus died - return - end - while message = bus.message_queue.message_from_buffer_nonblock - bus.process(message) - end - end - - def flush_buffers - # before blocking, empty the buffers - # https://bugzilla.novell.com/show_bug.cgi?id=537401 - @buses.each_value do |b| - while m = b.message_queue.message_from_buffer_nonblock - b.process(m) - end - end - end - - def prepare_epoll - SleepyPenguin::Epoll.new.tap do |epoll| - epoll.add(@quit_event, [:IN]) - @buses.each_key do |socket| - epoll.add(socket, [:IN]) - end - end - end - end - end -end diff --git a/lib/kaede/dbus/program.rb b/lib/kaede/dbus/program.rb deleted file mode 100644 index 553ffd0..0000000 --- a/lib/kaede/dbus/program.rb +++ /dev/null @@ -1,65 +0,0 @@ -require 'dbus' -require 'nokogiri' -require 'time' -require 'kaede/dbus/properties' - -module Kaede - module DBus - class Program < ::DBus::Object - PATH = '/cc/wanko/kaede1/program' - PROPERTY_INTERFACE = 'org.freedesktop.DBus.Properties' - INTROSPECT_INTERFACE = 'org.freedesktop.DBus.Introspectable' - PROGRAM_INTERFACE = 'cc.wanko.kaede1.Program' - - def initialize(program, enqueued_at) - super("#{PATH}/#{program.pid}") - @program = program - @enqueued_at = enqueued_at - end - - include Properties - properties_for PROGRAM_INTERFACE, :properties - define_properties - - def to_xml - Nokogiri::XML::Builder.new do |xml| - xml.doc.create_internal_subset( - 'node', - '-//freedesktop//DTD D-BUS Object Introspection 1.0//EN', - 'http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd', - ) - xml.node do - xml.interface(name: INTROSPECT_INTERFACE) do - xml.method_(name: 'Introspect') do - xml.arg(name: 'data', direction: 'out', type: 's') - end - end - - xml_for_properties(xml) - end - end.to_xml - end - - dbus_interface PROGRAM_INTERFACE do - end - - def properties - @properties ||= { - 'Pid' => @program.pid, - 'Tid' => @program.tid, - 'StartTime' => @program.start_time.iso8601, - 'EndTime' => @program.end_time.iso8601, - 'ChannelName' => @program.channel_name, - 'ChannelForSyoboi' => @program.channel_for_syoboi, - 'ChannelForRecorder' => @program.channel_for_recorder, - 'Count' => @program.count, - 'StartOffset' => @program.start_offset, - 'SubTitle' => @program.subtitle, - 'Title' => @program.title, - 'Comment' => @program.comment, - 'EnqueuedAt' => @enqueued_at.iso8601, - } - end - end - end -end diff --git a/lib/kaede/dbus/properties.rb b/lib/kaede/dbus/properties.rb deleted file mode 100644 index 5e8e060..0000000 --- a/lib/kaede/dbus/properties.rb +++ /dev/null @@ -1,96 +0,0 @@ -module Kaede - module DBus - module Properties - PROPERTY_INTERFACE = 'org.freedesktop.DBus.Properties' - - def self.included(base) - base.extend(ClassMethods) - end - - module ClassMethods - def properties_method - @properties_method ||= {} - end - - def properties_for(iface, method_sym) - self.properties_method[iface] = method_sym - end - - def define_properties - dbus_interface PROPERTY_INTERFACE do - dbus_method :Get, 'in interface:s, in property:s, out value:v' do |iface, prop| - get_property(iface, prop) - end - - dbus_method :GetAll, 'in interface:s, out properties:a{sv}' do |iface| - get_properties(iface) - end - - dbus_method :Set, 'in interface:s, in property:s, in value:v' do |iface, prop, val| - raise_access_denied! - end - end - end - end - - def xml_for_dbus_properties(xml) - xml.interface(name: PROPERTY_INTERFACE) do - xml.method_(name: 'GetAll') do - xml.arg(name: 'interface', direction: 'in', type: 's') - xml.arg(name: 'properties', direction: 'out', type: 'a{sv}') - end - - helper = lambda do |verb, dir| - xml.method_(name: verb) do - xml.arg(name: 'interface', direction: 'in', type: 's') - xml.arg(name: 'property', direction: 'in', type: 's') - xml.arg(name: 'value', direction: dir, type: 'v') - end - end - helper.call('Get', 'out') - helper.call('Set', 'in') - end - end - - def xml_for_properties(xml) - xml_for_dbus_properties(xml) - self.class.properties_method.each do |iface, method_sym| - xml.interface(name: iface) do - send(method_sym).each_key do |key| - xml.property(name: key, type: 's', access: 'read') - end - end - end - end - - def get_property(iface, prop) - props = get_properties(iface).first - if props.has_key?(prop) - [props[prop]] - else - raise_unknown_property! - end - end - - def get_properties(iface) - if sym = self.class.properties_method[iface] - [send(sym)] - else - unknown_interface! - end - end - - def raise_unknown_interface! - raise ::DBus.error('org.freedesktop.DBus.Error.UnknownInterface') - end - - def raise_unknown_property! - raise ::DBus.error('org.freedesktop.DBus.Error.UnknownProperty') - end - - def raise_access_denied! - raise ::DBus.error('org.freedesktop.DBus.Error.AccessDenied') - end - end - end -end diff --git a/lib/kaede/dbus/scheduler.rb b/lib/kaede/dbus/scheduler.rb deleted file mode 100644 index ddbafde..0000000 --- a/lib/kaede/dbus/scheduler.rb +++ /dev/null @@ -1,28 +0,0 @@ -require 'dbus' - -module Kaede - module DBus - class Scheduler < ::DBus::Object - PATH = '/cc/wanko/kaede1/scheduler' - SCHEDULER_INTERFACE = 'cc.wanko.kaede1.Scheduler' - - def initialize(reload_event, stop_event) - super(PATH) - @reload_event = reload_event - @stop_event = stop_event - end - - dbus_interface SCHEDULER_INTERFACE do - dbus_method :Reload do - @reload_event.incr(1) - nil - end - - dbus_method :Stop do - @stop_event.incr(1) - nil - end - end - end - end -end diff --git a/lib/kaede/grpc/kaede.proto b/lib/kaede/grpc/kaede.proto new file mode 100644 index 0000000..d7ab186 --- /dev/null +++ b/lib/kaede/grpc/kaede.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +package kaede.grpc; + +import "google/protobuf/timestamp.proto"; + +service Scheduler { + rpc Reload(SchedulerReloadInput) returns (SchedulerReloadOutput) {} + rpc Stop(SchedulerStopInput) returns (SchedulerStopOutput) {} + rpc GetPrograms(GetProgramsInput) returns (GetProgramsOutput) {} +} + +message SchedulerReloadInput {} +message SchedulerReloadOutput {} +message SchedulerStopInput {} +message SchedulerStopOutput {} +message GetProgramsInput {} +message GetProgramsOutput { + repeated Program programs = 1; +} + +message Program { + uint32 pid = 1; + uint32 tid = 2; + google.protobuf.Timestamp start_time = 3; + google.protobuf.Timestamp end_time = 4; + string channel_name = 5; + uint32 channel_for_syoboi = 6; + uint32 channel_for_recorder = 7; + string count = 8; + int32 start_offset = 9; + string subtitle = 10; + string title = 11; + string comment = 12; + google.protobuf.Timestamp enqueued_at = 13; +} diff --git a/lib/kaede/grpc/kaede_pb.rb b/lib/kaede/grpc/kaede_pb.rb new file mode 100644 index 0000000..724571d --- /dev/null +++ b/lib/kaede/grpc/kaede_pb.rb @@ -0,0 +1,48 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: kaede/grpc/kaede.proto + +require 'google/protobuf' + +require 'google/protobuf/timestamp_pb' +Google::Protobuf::DescriptorPool.generated_pool.build do + add_message "kaede.grpc.SchedulerReloadInput" do + end + add_message "kaede.grpc.SchedulerReloadOutput" do + end + add_message "kaede.grpc.SchedulerStopInput" do + end + add_message "kaede.grpc.SchedulerStopOutput" do + end + add_message "kaede.grpc.GetProgramsInput" do + end + add_message "kaede.grpc.GetProgramsOutput" do + repeated :programs, :message, 1, "kaede.grpc.Program" + end + add_message "kaede.grpc.Program" do + optional :pid, :uint32, 1 + optional :tid, :uint32, 2 + optional :start_time, :message, 3, "google.protobuf.Timestamp" + optional :end_time, :message, 4, "google.protobuf.Timestamp" + optional :channel_name, :string, 5 + optional :channel_for_syoboi, :uint32, 6 + optional :channel_for_recorder, :uint32, 7 + optional :count, :string, 8 + optional :start_offset, :int32, 9 + optional :subtitle, :string, 10 + optional :title, :string, 11 + optional :comment, :string, 12 + optional :enqueued_at, :message, 13, "google.protobuf.Timestamp" + end +end + +module Kaede + module Grpc + SchedulerReloadInput = Google::Protobuf::DescriptorPool.generated_pool.lookup("kaede.grpc.SchedulerReloadInput").msgclass + SchedulerReloadOutput = Google::Protobuf::DescriptorPool.generated_pool.lookup("kaede.grpc.SchedulerReloadOutput").msgclass + SchedulerStopInput = Google::Protobuf::DescriptorPool.generated_pool.lookup("kaede.grpc.SchedulerStopInput").msgclass + SchedulerStopOutput = Google::Protobuf::DescriptorPool.generated_pool.lookup("kaede.grpc.SchedulerStopOutput").msgclass + GetProgramsInput = Google::Protobuf::DescriptorPool.generated_pool.lookup("kaede.grpc.GetProgramsInput").msgclass + GetProgramsOutput = Google::Protobuf::DescriptorPool.generated_pool.lookup("kaede.grpc.GetProgramsOutput").msgclass + Program = Google::Protobuf::DescriptorPool.generated_pool.lookup("kaede.grpc.Program").msgclass + end +end diff --git a/lib/kaede/grpc/kaede_services_pb.rb b/lib/kaede/grpc/kaede_services_pb.rb new file mode 100644 index 0000000..e81cab3 --- /dev/null +++ b/lib/kaede/grpc/kaede_services_pb.rb @@ -0,0 +1,26 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: kaede/grpc/kaede.proto for package 'kaede.grpc' + +require 'grpc' +require 'kaede/grpc/kaede_pb' + +module Kaede + module Grpc + module Scheduler + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'kaede.grpc.Scheduler' + + rpc :Reload, SchedulerReloadInput, SchedulerReloadOutput + rpc :Stop, SchedulerStopInput, SchedulerStopOutput + rpc :GetPrograms, GetProgramsInput, GetProgramsOutput + end + + Stub = Service.rpc_stub_class + end + end +end diff --git a/lib/kaede/scheduler.rb b/lib/kaede/scheduler.rb index 555ef2a..72bc551 100644 --- a/lib/kaede/scheduler.rb +++ b/lib/kaede/scheduler.rb @@ -1,11 +1,8 @@ -require 'dbus' require 'thread' require 'sleepy_penguin' -require 'kaede/dbus' -require 'kaede/dbus/main' -require 'kaede/dbus/program' -require 'kaede/dbus/scheduler' +require 'logging' require 'kaede/notifier' +require 'kaede/scheduler_service' module Kaede module Scheduler @@ -77,14 +74,14 @@ def prepare_epoll def start_epoll epoll = prepare_epoll puts "Loaded #{@timerfds.size} schedules" - start_dbus + start_grpc catch(:reload) do epoll_loop(epoll) end ensure epoll.close - stop_dbus + stop_grpc end def epoll_loop(epoll) @@ -110,70 +107,32 @@ def epoll_loop(epoll) end end - def start_dbus - bus = ::DBus.system_bus - service = bus.request_service(DBus::DESTINATION) - dbus_export_programs(service) - service.export(DBus::Scheduler.new(@reload_event, @stop_event)) - - @dbus_thread = start_dbus_loop(bus) + def start_grpc + service = SchedulerService.new(@reload_event, @stop_event) + load_grpc_programs(service) + @grpc_thread = start_grpc_loop(service) end - def dbus_export_programs(service) + def load_grpc_programs(service) programs = @db.get_programs(@timerfds.values.map { |_, pid| pid }) + now = Time.now @timerfds.each_value do |tfd, pid| _, value = tfd.gettime program = programs[pid] - obj = DBus::Program.new(program, Time.now + value) - service.export(obj) - - # ruby-dbus doesn't emit properties when Introspect is requested. - # Kaede manually creates Introspect XML so that `gdbus introspect` outputs properties. - node = service.get_node(obj.path) - node.singleton_class.class_eval do - define_method :to_xml do - obj.to_xml - end - end + service.add_program(programs[pid], now + value) end end - def start_dbus_loop(bus) - @dbus_main = DBus::Main.new - @dbus_main << bus - Thread.start do - max_retries = 10 - retries = 0 - begin - @dbus_main.loop - rescue ::DBus::Connection::NameRequestError => e - puts "#{e.class}: #{e.message}" - if retries < max_retries - retries += 1 - sleep 1 - retry - end - end - end + def start_grpc_loop(service) + @rpc_server = GRPC::RpcServer.new + @rpc_server.add_http2_port(Kaede.config.grpc_port, :this_port_is_insecure) + @rpc_server.handle(service) + Thread.start { @rpc_server.run } end - DBUS_STOP_TIMEOUT = 5 - def stop_dbus - return unless @dbus_main - @dbus_main.quit - begin - unless @dbus_thread.join(DBUS_STOP_TIMEOUT) - @dbus_thread.kill - end - rescue Exception => e - $stderr.puts "Exception on DBus thread: #{e.class}: #{e.message}" - e.backtrace.each do |bt| - $stderr.puts " #{bt}" - end - end - @dbus_main = nil - @dbus_thread = nil - ::DBus.system_bus.proxy.ReleaseName(DBus::DESTINATION) + def stop_grpc + @rpc_server.stop + @grpc_thread.join end def spawn_recorder(pid) diff --git a/lib/kaede/scheduler_service.rb b/lib/kaede/scheduler_service.rb new file mode 100644 index 0000000..c3fe33a --- /dev/null +++ b/lib/kaede/scheduler_service.rb @@ -0,0 +1,54 @@ +require 'kaede/grpc/kaede_services_pb' + +module Kaede + class SchedulerService < Grpc::Scheduler::Service + def initialize(reload_event, stop_event) + super() + @reload_event = reload_event + @stop_event = stop_event + @programs = [] + end + + def reload(_input, _call) + @reload_event.incr(1) + Grpc::SchedulerReloadOutput.new + end + + def stop(_input, _call) + @stop_event.incr(1) + Grpc::SchedulerStopOutput.new + end + + def get_programs(_input, _call) + Grpc::GetProgramsOutput.new( + programs: @programs, + ) + end + + def add_program(program, enqueued_at) + @programs.push( + Grpc::Program.new( + pid: program.pid, + tid: program.tid, + start_time: encode_timestamp(program.start_time), + end_time: encode_timestamp(program.end_time), + channel_name: program.channel_name, + channel_for_syoboi: program.channel_for_syoboi, + channel_for_recorder: program.channel_for_recorder, + count: program.count, + start_offset: program.start_offset, + subtitle: program.subtitle, + title: program.title, + comment: program.comment, + enqueued_at: encode_timestamp(enqueued_at), + ) + ) + end + + private + + def encode_timestamp(time) + Google::Protobuf::Timestamp.new(seconds: time.to_i, nanos: time.nsec) + end + end +end diff --git a/lib/kaede/updater.rb b/lib/kaede/updater.rb index 54a6c5b..249eac3 100644 --- a/lib/kaede/updater.rb +++ b/lib/kaede/updater.rb @@ -1,7 +1,5 @@ -require 'dbus' require 'set' -require 'kaede/dbus' -require 'kaede/dbus/scheduler' +require 'kaede/grpc/kaede_services_pb' module Kaede class Updater @@ -52,10 +50,8 @@ def update_job_for(program) end def reload_scheduler - service = ::DBus.system_bus.service(DBus::DESTINATION) - scheduler = service.object(DBus::Scheduler::PATH) - scheduler.introspect - scheduler.Reload + stub = Grpc::Scheduler::Stub.new(Kaede.config.grpc_port, :this_channel_is_insecure) + stub.reload(Grpc::SchedulerReloadInput.new) end end end diff --git a/spec/kaede/scheduler_spec.rb b/spec/kaede/scheduler_spec.rb index 8eead3c..2aba9bd 100644 --- a/spec/kaede/scheduler_spec.rb +++ b/spec/kaede/scheduler_spec.rb @@ -6,7 +6,6 @@ require 'kaede/database' require 'kaede/recorder' require 'kaede/scheduler' -require 'kaede/dbus' describe Kaede::Scheduler do let(:db) { Kaede::Database.new(DatabaseHelper.database_url) }