Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for tranferring thread variables #4

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
require "bundler/gem_tasks"
require "rake/testtask"

Rake::TestTask.new do |t|
t.test_files = FileList["tests/**/test_*.rb"]
t.libs.push "test"
t.pattern = "test/**/*_test.rb"
t.warning = true
t.verbose = true
task :test do
exec "bundle exec pry-test test/bg"
end

task default: :test
task :default => :test
2 changes: 1 addition & 1 deletion bg.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ Gem::Specification.new do |gem|
gem.add_dependency "concurrent-ruby", ">= 1.0"

gem.add_development_dependency "rake"
gem.add_development_dependency "purdytest"
gem.add_development_dependency "coveralls"
gem.add_development_dependency "pry"
gem.add_development_dependency "pry-nav"
gem.add_development_dependency "pry-stack_explorer"
gem.add_development_dependency "pry-test"
end
10 changes: 4 additions & 6 deletions lib/bg.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
require "bg/version"
require "bg/asyncable"
require "bg/deferrable"

module Bg
end
require_relative "bg/version"
require_relative "bg/thread_variables"
require_relative "bg/asyncable"
require_relative "bg/deferrable"
81 changes: 42 additions & 39 deletions lib/bg/asyncable.rb
Original file line number Diff line number Diff line change
@@ -1,57 +1,60 @@
require "active_record"
require "concurrent"

module Bg
class Asyncable
class Wrapper
include ::Concurrent::Async

def initialize(object, wait: 0)
# IMPORTANT: call super without any arguments
# https://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Async.html
super()
@object = object
@wait = wait.to_f
end
class Bg::Asyncable
include Bg::ThreadVariables

class Wrapper
include Bg::ThreadVariables
include Concurrent::Async

def invoke_method(name, *args)
sleep @wait if @wait > 0
base = self.is_a?(::ActiveRecord::Base) ? self.class : ::ActiveRecord::Base
def initialize(object, wait: 0)
# IMPORTANT: call super without any arguments
# https://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Async.html
super()
@object = object
@wait = wait.to_f
end

def invoke_method(name, args, thread_variables)
sleep @wait if @wait > 0
base = self.is_a?(ActiveRecord::Base) ? self.class : ActiveRecord::Base
with_thread_variables do
base.connection_pool.with_connection do
@object.send name, *args
end
end
end
end

module Behavior
def async(wait: 0)
::Bg::Asyncable.new(self, wait: wait.to_f)
end
module Behavior
def async(wait: 0)
Bg::Asyncable.new(self, wait: wait.to_f)
end
end

def initialize(object, wait: 0)
@object = object
@wait = wait.to_f
end
def initialize(object, wait: 0)
@object = object
@wait = wait.to_f
end

def method_missing(name, *args)
if @object.respond_to? name
raise ::ArgumentError.new("blocks are not supported") if block_given?
begin
wrapped = ::Bg::Asyncable::Wrapper.new(@object, wait: @wait)
wrapped.async.invoke_method name, *args
rescue ::StandardError => e
raise ::ArgumentError.new("Failed to execute method asynchronously! <#{@object.class.name}##{name}> #{e.message}")
ensure
return
end
def method_missing(name, *args)
if @object.respond_to? name
raise ArgumentError.new("blocks are not supported") if block_given?
begin
wrapped = Bg::Asyncable::Wrapper.new(@object, wait: @wait)
wrapped.async.invoke_method name, *args, thread_variables
rescue StandardError => e
raise ArgumentError.new("Failed to execute method asynchronously! <#{@object.class.name}##{name}> #{e.message}")
ensure
return
end
super
end
super
end

def respond_to?(name)
return true if @object.respond_to? name
super
end
def respond_to?(name)
return true if @object.respond_to? name
super
end
end
88 changes: 43 additions & 45 deletions lib/bg/deferrable.rb
Original file line number Diff line number Diff line change
@@ -1,58 +1,56 @@
require "globalid"
require "bg/deferred_method_call_job"
require_relative "deferred_method_call_job"

module Bg
class Deferrable
module Behavior
# Enqueues the method call to be executed by a DeferredMethodCallJob background worker.
def defer(queue: :default, wait: 0)
::Bg::Deferrable.new self, queue: queue, wait: wait
end
class Bg::Deferrable
module Behavior
# Enqueues the method call to be executed by a DeferredMethodCallJob background worker.
def defer(queue: :default, wait: 0)
Bg::Deferrable.new self, queue: queue, wait: wait
end
end

def self.make_enqueable(value)
case value
when ::Hash then
value.each.with_object({}) do |(key, val), memo|
memo[key.to_s] = make_enqueable(val)
end
when ::Array then
value.map { |val| make_enqueable val }
when ::Symbol then
value.to_s
when ::Date, ::Time, ::DateTime then
value.respond_to?(:iso8601) ? value.iso8601 : value.to_s
else
value
def self.make_enqueable(value)
case value
when Hash then
value.each.with_object({}) do |(key, val), memo|
memo[key.to_s] = make_enqueable(val)
end
when Array then
value.map { |val| make_enqueable val }
when Symbol then
value.to_s
when Date, Time, DateTime then
value.respond_to?(:iso8601) ? value.iso8601 : value.to_s
else
value
end
end

def initialize(object, queue: :default, wait: 0)
raise ::ArgumentError unless object.is_a?(::GlobalID::Identification)
@object = object
@queue = queue || :default
@wait = wait.to_i
end
def initialize(object, queue: :default, wait: 0)
raise ArgumentError unless object.is_a?(GlobalID::Identification)
@object = object
@queue = queue || :default
@wait = wait.to_i
end

def method_missing(name, *args)
if @object.respond_to? name
raise ::ArgumentError.new("blocks are not supported") if block_given?
begin
queue_args = { queue: @queue }
queue_args[:wait] = @wait if @wait > 0
job = ::Bg::DeferredMethodCallJob.set(**queue_args).perform_later @object, name.to_s, *self.class.make_enqueable(args)
rescue ::StandardError => e
raise ::ArgumentError.new("Failed to background method call! <#{@object.class.name}##{name}> #{e.message}")
ensure
return job
end
def method_missing(name, *args)
if @object.respond_to? name
raise ArgumentError.new("blocks are not supported") if block_given?
begin
queue_args = { queue: @queue }
queue_args[:wait] = @wait if @wait > 0
job = Bg::DeferredMethodCallJob.set(**queue_args).perform_later @object, name.to_s, *self.class.make_enqueable(args)
rescue StandardError => e
raise ArgumentError.new("Failed to background method call! <#{@object.class.name}##{name}> #{e.message}")
ensure
return job
end
super
end
super
end

def respond_to?(name)
return true if @object.respond_to? name
super
end
def respond_to?(name)
return true if @object.respond_to? name
super
end
end
10 changes: 4 additions & 6 deletions lib/bg/deferred_method_call_job.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
require "active_job"

module Bg
class DeferredMethodCallJob < ::ActiveJob::Base
queue_as :default
class Bg::DeferredMethodCallJob < ActiveJob::Base
queue_as :default

def perform(object, method, *args)
object.send method, *args
end
def perform(object, method, *args)
object.send method, *args
end
end
23 changes: 23 additions & 0 deletions lib/bg/thread_variables.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
module Bg::ThreadVariables
def get_thread_variables
Thread.current.thread_variables.each_with_object({}) do |key, memo|
memo[key] = Thread.current.thread_variable_get(key)
end
end

def set_thread_variables(thread_variables={})
thread_variables.each do |key, value|
Thread.current.thread_variable_set key, value
end
end

def with_thread_variables(thread_variables={})
previous_thread_variables = get_thread_variables
set_thread_variables thread_variables
begin
yield
ensure
set_thread_variables previous_thread_variables
end
end
end
8 changes: 3 additions & 5 deletions test/bg/asyncable_test.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
require_relative "../test_helper"

class Bg::AsyncableTest < ::ActiveSupport::TestCase

class Bg::AsyncableTest < PryTest::Test
test "slow io bound method invocations run in parallel" do
obj = ::Bg::BackgroundableObject.new(:example)
obj.eigen.send :include, ::Bg::Asyncable::Behavior
obj = Bg::BackgroundableObject.new(:example)
obj.eigen.send :include, Bg::Asyncable::Behavior
start = Time.now
10.times { obj.async.wait 1 }
assert (Time.now - start) <= 1.1
end

end
42 changes: 20 additions & 22 deletions test/bg/deferrable_test.rb
Original file line number Diff line number Diff line change
@@ -1,72 +1,71 @@
require_relative "../test_helper"

class Bg::DeferrableTest < ::ActiveSupport::TestCase

setup do
@deferrable = ::Bg::Deferrable.new(::Bg::BackgroundableObject.new(:example))
class Bg::DeferrableTest < PryTest::Test
before do
@deferrable = Bg::Deferrable.new(Bg::BackgroundableObject.new(:example))
end

test ".make_enqueable with Symbol" do
value = ::Bg::Deferrable.make_enqueable(:foo)
value = Bg::Deferrable.make_enqueable(:foo)
assert value == "foo"
end

test ".make_enqueable with Date" do
date = ::Date.today
value = ::Bg::Deferrable.make_enqueable(date)
date = Date.today
value = Bg::Deferrable.make_enqueable(date)
assert value == date.iso8601
end

test ".make_enqueable with Time" do
time = ::Time.now
value = ::Bg::Deferrable.make_enqueable(time)
time = Time.now
value = Bg::Deferrable.make_enqueable(time)
assert value == time.iso8601
end

test ".make_enqueable with DateTime" do
date_time = ::DateTime.now
value = ::Bg::Deferrable.make_enqueable(date_time)
date_time = DateTime.now
value = Bg::Deferrable.make_enqueable(date_time)
assert value == date_time.iso8601
end

test ".make_enqueable with Array" do
date_time = ::DateTime.now
date_time = DateTime.now
list = [:foo, "bar", true, date_time]
value = ::Bg::Deferrable.make_enqueable(list)
value = Bg::Deferrable.make_enqueable(list)
assert value == ["foo", "bar", true, date_time.iso8601]
end

test ".make_enqueable with nested Array" do
date_time = ::DateTime.now
date_time = DateTime.now
list = [:foo, "bar", true, date_time]
list << list.dup
value = ::Bg::Deferrable.make_enqueable(list)
value = Bg::Deferrable.make_enqueable(list)
expected = ["foo", "bar", true, date_time.iso8601]
expected << expected.dup
assert value == expected
end

test ".make_enqueable with Hash" do
date_time = ::DateTime.now
date_time = DateTime.now
hash = { a: :foo, b: "bar", c: true, d: date_time }
value = ::Bg::Deferrable.make_enqueable(hash)
value = Bg::Deferrable.make_enqueable(hash)
assert value == { "a" => "foo", "b" => "bar", "c" => true, "d" => date_time.iso8601 }
end

test ".make_enqueable with nested Hash" do
date_time = ::DateTime.now
date_time = DateTime.now
hash = { a: :foo, b: "bar", c: true, d: date_time }
hash[:e] = hash.dup
value = ::Bg::Deferrable.make_enqueable(hash)
value = Bg::Deferrable.make_enqueable(hash)
expected = { "a" => "foo", "b" => "bar", "c" => true, "d" => date_time.iso8601 }
expected["e"] = expected.dup
assert value == expected
end

test ".make_enqueable with complex Hash" do
time = ::Time.now
time = Time.now
hash = { a: :foo, b: time, c: { a: :bar, b: time.dup }, d: [:baz, time.dup, {a: :wat, b: time.dup, c: [a: time.dup]}] }
value = ::Bg::Deferrable.make_enqueable(hash)
value = Bg::Deferrable.make_enqueable(hash)
expected = {
"a" => "foo",
"b" => time.iso8601,
Expand All @@ -75,5 +74,4 @@ class Bg::DeferrableTest < ::ActiveSupport::TestCase
}
assert value == expected
end

end
Loading