From a0597f0802babe0f9d5e9f3abb3c22bf9a44c5e1 Mon Sep 17 00:00:00 2001 From: Nathan Hopkins Date: Thu, 17 May 2018 17:18:53 -0600 Subject: [PATCH] Add support for tranferring thread variables --- Rakefile | 11 +--- bg.gemspec | 2 +- lib/bg.rb | 10 ++-- lib/bg/asyncable.rb | 81 +++++++++++++------------ lib/bg/deferrable.rb | 88 ++++++++++++++-------------- lib/bg/deferred_method_call_job.rb | 10 ++-- lib/bg/thread_variables.rb | 23 ++++++++ test/bg/asyncable_test.rb | 8 +-- test/bg/deferrable_test.rb | 42 +++++++------ test/bg/deferred_method_call_test.rb | 65 ++++++++++---------- test/test_helper.rb | 33 ++++++++--- 11 files changed, 203 insertions(+), 170 deletions(-) create mode 100644 lib/bg/thread_variables.rb diff --git a/Rakefile b/Rakefile index 620b4ce..dafa807 100644 --- a/Rakefile +++ b/Rakefile @@ -1,12 +1,7 @@ require "bundler/gem_tasks" -require "rake/testtask" -Rake::TestTask.new do |t| - t.test_files = FileList["tests/**/test_*.rb"] - t.libs.push "test" - t.pattern = "test/**/*_test.rb" - t.warning = true - t.verbose = true +task :test do + exec "bundle exec pry-test test/bg" end -task default: :test +task :default => :test diff --git a/bg.gemspec b/bg.gemspec index acdb588..a993a3f 100644 --- a/bg.gemspec +++ b/bg.gemspec @@ -18,9 +18,9 @@ Gem::Specification.new do |gem| gem.add_dependency "concurrent-ruby", ">= 1.0" gem.add_development_dependency "rake" - gem.add_development_dependency "purdytest" gem.add_development_dependency "coveralls" gem.add_development_dependency "pry" gem.add_development_dependency "pry-nav" gem.add_development_dependency "pry-stack_explorer" + gem.add_development_dependency "pry-test" end diff --git a/lib/bg.rb b/lib/bg.rb index 3e2d7cf..96a3bcc 100644 --- a/lib/bg.rb +++ b/lib/bg.rb @@ -1,6 +1,4 @@ -require "bg/version" -require "bg/asyncable" -require "bg/deferrable" - -module Bg -end +require_relative "bg/version" +require_relative "bg/thread_variables" +require_relative "bg/asyncable" +require_relative "bg/deferrable" diff --git a/lib/bg/asyncable.rb b/lib/bg/asyncable.rb index 97b43a8..91ddb44 100644 --- a/lib/bg/asyncable.rb +++ b/lib/bg/asyncable.rb @@ -1,57 +1,60 @@ require "active_record" require "concurrent" -module Bg - class Asyncable - class Wrapper - include ::Concurrent::Async - - def initialize(object, wait: 0) - # IMPORTANT: call super without any arguments - # https://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Async.html - super() - @object = object - @wait = wait.to_f - end +class Bg::Asyncable + include Bg::ThreadVariables + + class Wrapper + include Bg::ThreadVariables + include Concurrent::Async - def invoke_method(name, *args) - sleep @wait if @wait > 0 - base = self.is_a?(::ActiveRecord::Base) ? self.class : ::ActiveRecord::Base + def initialize(object, wait: 0) + # IMPORTANT: call super without any arguments + # https://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Async.html + super() + @object = object + @wait = wait.to_f + end + + def invoke_method(name, args, thread_variables) + sleep @wait if @wait > 0 + base = self.is_a?(ActiveRecord::Base) ? self.class : ActiveRecord::Base + with_thread_variables do base.connection_pool.with_connection do @object.send name, *args end end end + end - module Behavior - def async(wait: 0) - ::Bg::Asyncable.new(self, wait: wait.to_f) - end + module Behavior + def async(wait: 0) + Bg::Asyncable.new(self, wait: wait.to_f) end + end - def initialize(object, wait: 0) - @object = object - @wait = wait.to_f - end + def initialize(object, wait: 0) + @object = object + @wait = wait.to_f + end - def method_missing(name, *args) - if @object.respond_to? name - raise ::ArgumentError.new("blocks are not supported") if block_given? - begin - wrapped = ::Bg::Asyncable::Wrapper.new(@object, wait: @wait) - wrapped.async.invoke_method name, *args - rescue ::StandardError => e - raise ::ArgumentError.new("Failed to execute method asynchronously! <#{@object.class.name}##{name}> #{e.message}") - ensure - return - end + def method_missing(name, *args) + if @object.respond_to? name + raise ArgumentError.new("blocks are not supported") if block_given? + begin + wrapped = Bg::Asyncable::Wrapper.new(@object, wait: @wait) + wrapped.async.invoke_method name, *args, thread_variables + rescue StandardError => e + raise ArgumentError.new("Failed to execute method asynchronously! <#{@object.class.name}##{name}> #{e.message}") + ensure + return end - super end + super + end - def respond_to?(name) - return true if @object.respond_to? name - super - end + def respond_to?(name) + return true if @object.respond_to? name + super end end diff --git a/lib/bg/deferrable.rb b/lib/bg/deferrable.rb index d8cd7bf..1200f8a 100644 --- a/lib/bg/deferrable.rb +++ b/lib/bg/deferrable.rb @@ -1,58 +1,56 @@ require "globalid" -require "bg/deferred_method_call_job" +require_relative "deferred_method_call_job" -module Bg - class Deferrable - module Behavior - # Enqueues the method call to be executed by a DeferredMethodCallJob background worker. - def defer(queue: :default, wait: 0) - ::Bg::Deferrable.new self, queue: queue, wait: wait - end +class Bg::Deferrable + module Behavior + # Enqueues the method call to be executed by a DeferredMethodCallJob background worker. + def defer(queue: :default, wait: 0) + Bg::Deferrable.new self, queue: queue, wait: wait end + end - def self.make_enqueable(value) - case value - when ::Hash then - value.each.with_object({}) do |(key, val), memo| - memo[key.to_s] = make_enqueable(val) - end - when ::Array then - value.map { |val| make_enqueable val } - when ::Symbol then - value.to_s - when ::Date, ::Time, ::DateTime then - value.respond_to?(:iso8601) ? value.iso8601 : value.to_s - else - value + def self.make_enqueable(value) + case value + when Hash then + value.each.with_object({}) do |(key, val), memo| + memo[key.to_s] = make_enqueable(val) end + when Array then + value.map { |val| make_enqueable val } + when Symbol then + value.to_s + when Date, Time, DateTime then + value.respond_to?(:iso8601) ? value.iso8601 : value.to_s + else + value end + end - def initialize(object, queue: :default, wait: 0) - raise ::ArgumentError unless object.is_a?(::GlobalID::Identification) - @object = object - @queue = queue || :default - @wait = wait.to_i - end + def initialize(object, queue: :default, wait: 0) + raise ArgumentError unless object.is_a?(GlobalID::Identification) + @object = object + @queue = queue || :default + @wait = wait.to_i + end - def method_missing(name, *args) - if @object.respond_to? name - raise ::ArgumentError.new("blocks are not supported") if block_given? - begin - queue_args = { queue: @queue } - queue_args[:wait] = @wait if @wait > 0 - job = ::Bg::DeferredMethodCallJob.set(**queue_args).perform_later @object, name.to_s, *self.class.make_enqueable(args) - rescue ::StandardError => e - raise ::ArgumentError.new("Failed to background method call! <#{@object.class.name}##{name}> #{e.message}") - ensure - return job - end + def method_missing(name, *args) + if @object.respond_to? name + raise ArgumentError.new("blocks are not supported") if block_given? + begin + queue_args = { queue: @queue } + queue_args[:wait] = @wait if @wait > 0 + job = Bg::DeferredMethodCallJob.set(**queue_args).perform_later @object, name.to_s, *self.class.make_enqueable(args) + rescue StandardError => e + raise ArgumentError.new("Failed to background method call! <#{@object.class.name}##{name}> #{e.message}") + ensure + return job end - super end + super + end - def respond_to?(name) - return true if @object.respond_to? name - super - end + def respond_to?(name) + return true if @object.respond_to? name + super end end diff --git a/lib/bg/deferred_method_call_job.rb b/lib/bg/deferred_method_call_job.rb index 3a9587e..db021cc 100644 --- a/lib/bg/deferred_method_call_job.rb +++ b/lib/bg/deferred_method_call_job.rb @@ -1,11 +1,9 @@ require "active_job" -module Bg - class DeferredMethodCallJob < ::ActiveJob::Base - queue_as :default +class Bg::DeferredMethodCallJob < ActiveJob::Base + queue_as :default - def perform(object, method, *args) - object.send method, *args - end + def perform(object, method, *args) + object.send method, *args end end diff --git a/lib/bg/thread_variables.rb b/lib/bg/thread_variables.rb new file mode 100644 index 0000000..6b6ab4a --- /dev/null +++ b/lib/bg/thread_variables.rb @@ -0,0 +1,23 @@ +module Bg::ThreadVariables + def get_thread_variables + Thread.current.thread_variables.each_with_object({}) do |key, memo| + memo[key] = Thread.current.thread_variable_get(key) + end + end + + def set_thread_variables(thread_variables={}) + thread_variables.each do |key, value| + Thread.current.thread_variable_set key, value + end + end + + def with_thread_variables(thread_variables={}) + previous_thread_variables = get_thread_variables + set_thread_variables thread_variables + begin + yield + ensure + set_thread_variables previous_thread_variables + end + end +end diff --git a/test/bg/asyncable_test.rb b/test/bg/asyncable_test.rb index d1a1b4b..6ae5964 100644 --- a/test/bg/asyncable_test.rb +++ b/test/bg/asyncable_test.rb @@ -1,13 +1,11 @@ require_relative "../test_helper" -class Bg::AsyncableTest < ::ActiveSupport::TestCase - +class Bg::AsyncableTest < PryTest::Test test "slow io bound method invocations run in parallel" do - obj = ::Bg::BackgroundableObject.new(:example) - obj.eigen.send :include, ::Bg::Asyncable::Behavior + obj = Bg::BackgroundableObject.new(:example) + obj.eigen.send :include, Bg::Asyncable::Behavior start = Time.now 10.times { obj.async.wait 1 } assert (Time.now - start) <= 1.1 end - end diff --git a/test/bg/deferrable_test.rb b/test/bg/deferrable_test.rb index ae0e74d..c002e14 100644 --- a/test/bg/deferrable_test.rb +++ b/test/bg/deferrable_test.rb @@ -1,72 +1,71 @@ require_relative "../test_helper" -class Bg::DeferrableTest < ::ActiveSupport::TestCase - - setup do - @deferrable = ::Bg::Deferrable.new(::Bg::BackgroundableObject.new(:example)) +class Bg::DeferrableTest < PryTest::Test + before do + @deferrable = Bg::Deferrable.new(Bg::BackgroundableObject.new(:example)) end test ".make_enqueable with Symbol" do - value = ::Bg::Deferrable.make_enqueable(:foo) + value = Bg::Deferrable.make_enqueable(:foo) assert value == "foo" end test ".make_enqueable with Date" do - date = ::Date.today - value = ::Bg::Deferrable.make_enqueable(date) + date = Date.today + value = Bg::Deferrable.make_enqueable(date) assert value == date.iso8601 end test ".make_enqueable with Time" do - time = ::Time.now - value = ::Bg::Deferrable.make_enqueable(time) + time = Time.now + value = Bg::Deferrable.make_enqueable(time) assert value == time.iso8601 end test ".make_enqueable with DateTime" do - date_time = ::DateTime.now - value = ::Bg::Deferrable.make_enqueable(date_time) + date_time = DateTime.now + value = Bg::Deferrable.make_enqueable(date_time) assert value == date_time.iso8601 end test ".make_enqueable with Array" do - date_time = ::DateTime.now + date_time = DateTime.now list = [:foo, "bar", true, date_time] - value = ::Bg::Deferrable.make_enqueable(list) + value = Bg::Deferrable.make_enqueable(list) assert value == ["foo", "bar", true, date_time.iso8601] end test ".make_enqueable with nested Array" do - date_time = ::DateTime.now + date_time = DateTime.now list = [:foo, "bar", true, date_time] list << list.dup - value = ::Bg::Deferrable.make_enqueable(list) + value = Bg::Deferrable.make_enqueable(list) expected = ["foo", "bar", true, date_time.iso8601] expected << expected.dup assert value == expected end test ".make_enqueable with Hash" do - date_time = ::DateTime.now + date_time = DateTime.now hash = { a: :foo, b: "bar", c: true, d: date_time } - value = ::Bg::Deferrable.make_enqueable(hash) + value = Bg::Deferrable.make_enqueable(hash) assert value == { "a" => "foo", "b" => "bar", "c" => true, "d" => date_time.iso8601 } end test ".make_enqueable with nested Hash" do - date_time = ::DateTime.now + date_time = DateTime.now hash = { a: :foo, b: "bar", c: true, d: date_time } hash[:e] = hash.dup - value = ::Bg::Deferrable.make_enqueable(hash) + value = Bg::Deferrable.make_enqueable(hash) expected = { "a" => "foo", "b" => "bar", "c" => true, "d" => date_time.iso8601 } expected["e"] = expected.dup assert value == expected end test ".make_enqueable with complex Hash" do - time = ::Time.now + time = Time.now hash = { a: :foo, b: time, c: { a: :bar, b: time.dup }, d: [:baz, time.dup, {a: :wat, b: time.dup, c: [a: time.dup]}] } - value = ::Bg::Deferrable.make_enqueable(hash) + value = Bg::Deferrable.make_enqueable(hash) expected = { "a" => "foo", "b" => time.iso8601, @@ -75,5 +74,4 @@ class Bg::DeferrableTest < ::ActiveSupport::TestCase } assert value == expected end - end diff --git a/test/bg/deferred_method_call_test.rb b/test/bg/deferred_method_call_test.rb index 081f349..bbb8c9a 100644 --- a/test/bg/deferred_method_call_test.rb +++ b/test/bg/deferred_method_call_test.rb @@ -1,44 +1,49 @@ require_relative "../test_helper" -require "active_job/test_helper" - -class Bg::DeferredMethodCallJobTest < ::ActiveJob::TestCase +class Bg::DeferredMethodCallJobTest < PryTest::Test test "enqueues with no args" do - assert_enqueued_with job: ::Bg::DeferredMethodCallJob do - obj = ::Bg::BackgroundableObject.new(:example) - obj.eigen.send :include, ::Bg::Deferrable::Behavior - obj.defer.update - end + obj = Bg::BackgroundableObject.new(:example) + obj.eigen.send :include, Bg::Deferrable::Behavior + object, method, args = obj.defer.update + assert object == obj + assert method == "update" + assert args == [] end test "enqueues with simple args" do - assert_enqueued_with job: ::Bg::DeferredMethodCallJob do - obj = ::Bg::BackgroundableObject.new(:example) - obj.eigen.send :include, ::Bg::Deferrable::Behavior - obj.defer.update foo: true, bar: "baz" - end + obj = Bg::BackgroundableObject.new(:example) + obj.eigen.send :include, Bg::Deferrable::Behavior + object, method, args = obj.defer.update(foo: true, bar: "baz") + assert object == obj + assert method == "update" + assert args == [{"foo" => true, "bar" => "baz"}] end test "enqueues with globalid args" do - assert_enqueued_with job: ::Bg::DeferredMethodCallJob do - parent = ::Bg::BackgroundableObject.new(:parent) - parent.eigen.send :include, ::Bg::Deferrable::Behavior - parent.defer.update child: ::Bg::BackgroundableObject.new(:child) - end + parent = Bg::BackgroundableObject.new(:parent) + child = Bg::BackgroundableObject.new(:child) + parent.eigen.send :include, Bg::Deferrable::Behavior + object, method, args = parent.defer.update(child: child) + assert object == parent + assert method == "update" + assert args == [{"child"=>"gid://test/Bg::BackgroundableObject/child"}] end test "enqueues with complex args" do - assert_enqueued_with job: ::Bg::DeferredMethodCallJob do - parent = ::Bg::BackgroundableObject.new(:parent) - parent.eigen.send :include, ::Bg::Deferrable::Behavior - parent.defer.update children: [::Bg::BackgroundableObject.new(:child1), ::Bg::BackgroundableObject.new(:child2)], - foo: { bar: [:baz, Date.new, Time.new, DateTime.new] } - end - end - - test "#perform_now properly invokes the method" do - obj = ::Bg::BackgroundableObject.new(:example) - assert ::Bg::DeferredMethodCallJob.perform_now(obj, :update) + parent = Bg::BackgroundableObject.new(:parent) + parent.eigen.send :include, Bg::Deferrable::Behavior + a = Date.new + b = Time.new + c = DateTime.new + object, method, args = parent.defer.update( + children: [Bg::BackgroundableObject.new(:child1), Bg::BackgroundableObject.new(:child2)], + foo: { bar: [:baz, a, b, c] } + ) + assert object == parent + assert method == "update" + assert args == [{ + "children"=>["gid://test/Bg::BackgroundableObject/child1", "gid://test/Bg::BackgroundableObject/child2"], + "foo"=>{"bar"=>["baz", a.iso8601, b.iso8601, c.iso8601]} + }] end - end diff --git a/test/test_helper.rb b/test/test_helper.rb index 6c7a51b..313ced6 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -1,16 +1,11 @@ +require "pry-test" require "coveralls" Coveralls.wear! require_relative "../lib/bg" require_relative "backgroundable_object" -require "minitest/autorun" -require "purdytest" -require "pry" -require "pry-nav" -require "pry-stack_explorer" -::ActiveSupport::TestCase.test_order = :random -::GlobalID.app = "test" -::ActiveRecord::Base = Class.new do +GlobalID.app = "test" +ActiveRecord::Base = Class.new do def connection_pool Class.new do def with_connection @@ -19,3 +14,25 @@ def with_connection end end end + +class Bg::DeferredMethodCallJob + class << self + def set(**args) + self + end + + def perform_later(object, method, *args) + [object, method, args.map { |arg| argument arg }] + end + + private + + def argument(arg) + case arg + when Array then arg.map { |a| argument a } + when Hash then arg.each_with_object({}) { |(key, val), memo| memo[key] = argument(val) } + else arg.respond_to?(:to_gid) ? arg.to_gid.to_s : arg + end + end + end +end