Skip to content

Commit

Permalink
rdoc FTW
Browse files Browse the repository at this point in the history
  • Loading branch information
kirs committed Jul 27, 2018
1 parent a76168b commit db2b3fd
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 13 deletions.
3 changes: 3 additions & 0 deletions .yardopts
@@ -0,0 +1,3 @@
--no-private
--embed-mixins
--markup markdown
1 change: 1 addition & 0 deletions Gemfile
Expand Up @@ -22,3 +22,4 @@ gem 'pry'
gem 'mocha'

gem 'rubocop'
gem 'yard'
4 changes: 3 additions & 1 deletion Gemfile.lock
Expand Up @@ -88,6 +88,7 @@ GEM
unicode-display_width (1.4.0)
vegas (0.1.11)
rack (>= 1.0.0)
yard (0.9.15)

PLATFORMS
ruby
Expand All @@ -108,6 +109,7 @@ DEPENDENCIES
resque
rubocop
sidekiq
yard

BUNDLED WITH
1.16.1
1.16.3
3 changes: 2 additions & 1 deletion README.md
@@ -1,6 +1,7 @@
# Job Iteration API

[![Build Status](https://travis-ci.com/Shopify/job-iteration.svg?branch=master)](https://travis-ci.com/Shopify/job-iteration)
[![Build Status](https://travis-ci.com/Shopify/job-iteration.svg?branch=master)](https://travis-ci.com/Shopify/job-iteration)
[![rubydoc](https://img.shields.io/readthedocs/pip/stable.svg)](https://www.rubydoc.info/github/Shopify/job-iteration)

Meet Iteration, an extension for [ActiveJob](https://github.com/rails/rails/tree/master/activejob) that makes your jobs interruptible and resumable, saving all progress that the job has made (aka checkpoint for jobs).

Expand Down
2 changes: 1 addition & 1 deletion lib/job-iteration/active_record_cursor.rb
@@ -1,7 +1,7 @@
# frozen_string_literal: true

module JobIteration
class ActiveRecordCursor
class ActiveRecordCursor # @private
include Comparable

attr_reader :position
Expand Down
2 changes: 2 additions & 0 deletions lib/job-iteration/active_record_enumerator.rb
@@ -1,6 +1,8 @@
# frozen_string_literal: true
require_relative "./active_record_cursor"
module JobIteration
# Builds Enumerator based on ActiveRecord Relation. Supports enumerating on rows and batches.
# @see EnumeratorBuilder
class ActiveRecordEnumerator
def initialize(relation, columns: nil, batch_size: 100, cursor: nil)
@relation = relation
Expand Down
21 changes: 21 additions & 0 deletions lib/job-iteration/csv_enumerator.rb
@@ -1,7 +1,24 @@
# frozen_string_literal: true

module JobIteration
# CsvEnumerator makes it possible to write an Iteration job
# that uses CSV file as a collection to Iterate.
# @example
# def build_enumerator(cursor:)
# csv = CSV.open('tmp/files', { converters: :integer, headers: true })
# JobIteration::CsvEnumerator.new(csv).rows(cursor: cursor)
# end
#
# def each_iteration(row)
# ...
# end
class CsvEnumerator
# Constructs CsvEnumerator instance based on a CSV file.
# @param [CSV] csv An instance of CSV object
# @return [JobIteration::CsvEnumerator]
# @example
# csv = CSV.open('tmp/files', { converters: :integer, headers: true })
# JobIteration::CsvEnumerator.new(csv).rows(cursor: cursor)
def initialize(csv)
unless csv.instance_of?(CSV)
raise ArgumentError, "CsvEnumerator.new takes CSV object"
Expand All @@ -10,13 +27,17 @@ def initialize(csv)
@csv = csv
end

# Constructs a enumerator on CSV rows
# @return [Enumerator] Enumerator instance
def rows(cursor:)
@csv.lazy
.each_with_index
.drop(cursor.to_i)
.to_enum { count_rows_in_file }
end

# Constructs a enumerator on batches of CSV rows
# @return [Enumerator] Enumerator instance
def batches(batch_size:, cursor:)
@csv.lazy
.each_slice(batch_size)
Expand Down
6 changes: 3 additions & 3 deletions lib/job-iteration/integrations/resque.rb
Expand Up @@ -4,13 +4,13 @@

module JobIteration
module Integrations
# The trick is required in order to call shutdown? on a Resque::Worker instance
module ResqueIterationExtension
def initialize(*)
module ResqueIterationExtension # @private
def initialize(*) # @private
$resque_worker = self
super
end
end
# The patch is required in order to call shutdown? on a Resque::Worker instance
Resque::Worker.prepend(ResqueIterationExtension)

JobIteration.interruption_adapter = -> { $resque_worker.try!(:shutdown?) }
Expand Down
2 changes: 1 addition & 1 deletion lib/job-iteration/integrations/sidekiq.rb
Expand Up @@ -3,7 +3,7 @@
require 'sidekiq'

module JobIteration
module Integrations
module Integrations # @private
JobIteration.interruption_adapter = -> do
if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance
Sidekiq::CLI.instance.launcher.stopping?
Expand Down
6 changes: 3 additions & 3 deletions lib/job-iteration/iteration.rb
Expand Up @@ -53,22 +53,22 @@ def initialize(*arguments)
self.total_time = 0.0
end

def serialize
def serialize # @private
super.merge(
'cursor_position' => cursor_position,
'times_interrupted' => times_interrupted,
'total_time' => total_time,
)
end

def deserialize(job_data)
def deserialize(job_data) # @private
super
self.cursor_position = job_data['cursor_position']
self.times_interrupted = job_data['times_interrupted'] || 0
self.total_time = job_data['total_time'] || 0
end

def perform(*params)
def perform(*params) # @private
interruptible_perform(*params)
end

Expand Down
18 changes: 15 additions & 3 deletions lib/job-iteration/test_helper.rb
@@ -1,8 +1,9 @@
# frozen_string_literal: true

module JobIteration
# Include JobIteration::TestHelper to mock interruption when testing your jobs.
module TestHelper
class StoppingSupervisor
class StoppingSupervisor # @private
def initialize(stop_after_count)
@stop_after_count = stop_after_count
@calls = 0
Expand All @@ -14,24 +15,35 @@ def call
end
end

private

# Stubs interruption adapter to interrupt the job after every N iterations.
# @param [Integer] n_times Number of times before the job is interrupted
# @example
# test "this stuff interrupts" do
# iterate_exact_times(3.times)
# MyJob.perform_now
# end
def iterate_exact_times(n_times)
JobIteration.stubs(:interruption_adapter).returns(StoppingSupervisor.new(n_times.size))
end

# Stubs interruption adapter to interrupt the job after every sing iteration.
# @see #iterate_exact_times
def iterate_once
iterate_exact_times(1.times)
end

# Removes previous stubs and tells the job to iterate until the end.
def continue_iterating
stub_shutdown_adapter_to_return(false)
end

# Stubs the worker as already interrupted.
def mark_job_worker_as_interrupted
stub_shutdown_adapter_to_return(true)
end

private

def stub_shutdown_adapter_to_return(_value)
adapter = mock
adapter.stubs(:call).returns(false)
Expand Down

0 comments on commit db2b3fd

Please sign in to comment.