-
Notifications
You must be signed in to change notification settings - Fork 24
/
cron_workflow.rb
107 lines (92 loc) · 4.1 KB
/
cron_workflow.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# Copyright 2014-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
require 'parse-cron'
require_relative 'cron_activity'
require_relative '../cron_utils'
# CronWorkflow defines the workflow for the Cron sample. Each execution
# schedules the invocations of a cron job that fall within one interval and
# then continues as new to handle the next interval.
class CronWorkflow
  extend AWS::Flow::Workflows

  # Use the workflow method to define the workflow entry point. In this case
  # run is the entry point.
  workflow :run do
    {
      version: CronUtils::WF_VERSION,
      default_task_list: CronUtils::WF_TASKLIST,
      default_execution_start_to_close_timeout: 600,
    }
  end

  # Create an activity client using the activity_client method to schedule
  # activities
  activity_client(:activity) { { from_class: "CronActivity" } }

  # This is the workflow entry point. It determines the times at which to run
  # the job and then schedules them.
  # interval_length is defaulted to a prime number (601) to avoid overlapping
  # with periodicity of the job. 601 is the closest prime to 600 (which equals
  # 10 minutes)
  #
  # NOTE(review): the Time.now default is evaluated whenever run is invoked
  # without an explicit base_time; confirm this is acceptable under Flow's
  # replay model.
  #
  # @param (see #get_schedule_times)
  def run(job, base_time = Time.now, interval_length = 601)
    puts "Workflow has started" unless is_replaying?

    # Get a list of offsets (seconds from base_time) at which the job needs to
    # be scheduled
    times_to_schedule = get_schedule_times(job, base_time, interval_length)

    # get_schedule_times returns [] for an empty job. There is nothing to
    # schedule and no last offset to advance base_time by, so stop here
    # instead of failing on `base_time += nil` below.
    return if times_to_schedule.empty?

    # Schedule all invocations of the job asynchronously
    puts "Scheduling activity invocations" unless is_replaying?
    times_to_schedule.each do |time|
      async_create_timer(time) do
        activity.run_job(job[:func], *job[:args])
      end
    end

    # Update base_time to move to the next interval of time. The next interval
    # starts at the last scheduled invocation, not at base_time +
    # interval_length.
    last_offset = times_to_schedule.last
    base_time += last_offset
    create_timer(last_offset)

    # Call the continue_as_new method that is available to all Workflow classes
    # that extend AWS::Flow::Workflows so that this workflow will be called
    # again once complete (after the interval is over)
    puts "Workflow is continuing as new" unless is_replaying?
    continue_as_new(job, base_time, interval_length)
  end

  # This is a utility function that determines the schedule times for a cron
  # job that lie within the current interval and creates a list of those
  # schedule times
  #
  # @param job [Hash] information about the job that needs to be run. It
  #   contains a cron string, the function to call (in activity.rb), and the
  #   function call's arguments
  # @param base_time [Time] time to start the cron workflow
  # @param interval_length [Integer] how often to reset history (seconds)
  # @return [Array<Integer>] offsets in whole seconds from base_time at which
  #   to invoke the job; empty when job is empty
  # @raise [ArgumentError] if job is not empty but no invocation falls within
  #   the interval
  def get_schedule_times(job, base_time, interval_length)
    return [] if job.empty?

    # Generate a cron_parser for the job
    cron_parser = CronParser.new(job[:cron])
    interval_end = base_time + interval_length

    # Store the offsets at which this job will be called within the given
    # interval
    times_to_schedule = []
    next_time = cron_parser.next(base_time)
    while base_time <= next_time && next_time < interval_end
      times_to_schedule.push((next_time - base_time).to_i)
      next_time = cron_parser.next(next_time)
    end

    # Checks if the interval_length is less than the periodicity of the task
    if times_to_schedule.empty?
      raise ArgumentError, "interval length should be longer than periodicity"
    end

    # Return the list of offsets at which the job needs to be scheduled
    times_to_schedule
  end

  # Helper method to check if Flow is replaying the workflow. This is used to
  # avoid duplicate log messages
  def is_replaying?
    decision_context.workflow_clock.replaying
  end
end
# When this file is executed directly (rather than required), start a
# WorkflowWorker to work on the CronWorkflow tasks
if __FILE__ == $PROGRAM_NAME
  CronUtils.new.workflow_worker.start
end