-
-
Notifications
You must be signed in to change notification settings - Fork 36
/
ExpressionParser.py
225 lines (190 loc) · 9.24 KB
/
ExpressionParser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# The MIT License (MIT)
#
# Copyright (c) 2016 Adam Schubert
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import re
from .Exception import MissingFieldException, FormatException
class ExpressionParser(object):
    """Splits a cron expression string into seven normalized parts.

    The parts are positional: seconds (0), minutes (1), hours (2),
    day of month (3), month (4), day of week (5) and year (6). Parts that
    are absent from the input are left as empty strings.
    """

    _expression = ''
    _options = None

    # Day-of-week number -> three-letter name
    _cron_days = {
        0: 'SUN',
        1: 'MON',
        2: 'TUE',
        3: 'WED',
        4: 'THU',
        5: 'FRI',
        6: 'SAT'
    }

    # Month number -> three-letter name
    _cron_months = {
        1: 'JAN',
        2: 'FEB',
        3: 'MAR',
        4: 'APR',
        5: 'MAY',
        6: 'JUN',
        7: 'JUL',
        8: 'AUG',
        9: 'SEP',
        10: 'OCT',
        11: 'NOV',
        12: 'DEC'
    }

    # Per-part "start/" prefix that is equivalent to "*/": seconds, minutes
    # and hours start at 0; DOM, month, DOW and year start at 1.
    _step_start_prefixes = ("0/", "0/", "0/", "1/", "1/", "1/", "1/")

    # Upper bound used when a bare "start/step" value is expanded into a
    # "start-end/step" range, keyed by part index (month, DOW, year).
    _step_range_ends = {
        4: "12",
        5: "6",
        6: "9999"
    }

    # A part that ends in four digits is treated as a literal year.
    _year_regex = re.compile(r"\d{4}$")

    def __init__(self, expression, options):
        """Initializes a new instance of the ExpressionParser class

        Args:
            expression: The cron expression string
            options: Parsing options; `day_of_week_start_index_zero` is read
                from it during normalization
        """
        self._expression = expression
        self._options = options

    def parse(self):
        """Parses the cron expression string

        Returns:
            A 7 part string array, one part for each component of the cron
            expression (seconds, minutes, etc.)
        Raises:
            MissingFieldException: if _expression is empty or None
            FormatException: if _expression has fewer than 5 or more than 7
                whitespace-separated parts
        """
        if not self._expression:
            raise MissingFieldException("ExpressionDescriptor.expression")

        tokens = self._expression.split()
        token_count = len(tokens)

        if token_count < 5:
            raise FormatException(
                "Error: Expression only has {0} parts. At least 5 part are required.".format(
                    token_count
                )
            )
        if token_count > 7:
            raise FormatException(
                "Error: Expression has too many parts ({0}). Expression must not have more than 7 parts.".format(
                    token_count
                )
            )

        if token_count == 7:
            parsed = tokens
        else:
            # A 5 part cron has no seconds element, so it is shifted past it.
            # A 6 part cron is shifted the same way only when it carries a
            # year instead of seconds.
            shift = 1 if token_count == 5 or self._is_year_without_seconds(tokens) else 0
            parsed = [''] * 7
            parsed[shift:shift + token_count] = tokens

        self.normalize_expression(parsed)
        return parsed

    def _is_year_without_seconds(self, tokens):
        """Tells whether a 6 part expression ends with a year and has no seconds part.

        Ways we detect:
            1. Last part is a literal year (i.e. 2020)
            2. 3rd or 5th part is specified as "?" (DOM or DOW)

        Args:
            tokens: The 6 whitespace-separated parts of the expression
        Returns:
            True when the first part should be treated as minutes
        """
        return bool(self._year_regex.search(tokens[5])) or "?" in (tokens[4], tokens[2])

    def _adjust_dow_match(self, match):
        """Rewrites the day-of-week digit inside one regex match.

        When `day_of_week_start_index_zero` is set only "7" is changed (to
        "0"); otherwise the number is decremented by one.

        Args:
            match: A regex match holding a DOW digit, possibly preceded by a
                separator character
        Returns:
            The matched text with its digit adjusted
        """
        matched_text = match.group()
        digits = re.sub(r'\D', "", matched_text)
        if self._options.day_of_week_start_index_zero:
            adjusted = "0" if digits == "7" else digits
        else:
            adjusted = str(int(digits) - 1)
        return matched_text.replace(digits, adjusted)

    def normalize_expression(self, expression_parts):
        """Converts cron expression components into consistent, predictable formats.

        The list is modified in place.

        Args:
            expression_parts: A 7 part string array, one part for each component of the cron expression
        Returns:
            None
        """
        # convert ? to * only for DOM and DOW
        expression_parts[3] = expression_parts[3].replace("?", "*")
        expression_parts[5] = expression_parts[5].replace("?", "*")

        # convert a leading 0/ or 1/ to */ -- only the first occurrence, so
        # that later list items such as the "30/10" in "0/5,30/10" stay intact
        for index, prefix in enumerate(self._step_start_prefixes):
            if expression_parts[index].startswith(prefix):
                expression_parts[index] = expression_parts[index].replace(prefix, "*/", 1)

        # Adjust DOW based on dayOfWeekStartIndexZero option. Digits that
        # follow '#' or '/' are not day numbers and are left alone.
        expression_parts[5] = re.sub(r'(^\d)|([^#/\s]\d)', self._adjust_dow_match, expression_parts[5])

        # convert SUN-SAT format to 0-6 format
        day_of_week = expression_parts[5].upper()
        for day_number, day_name in self._cron_days.items():
            day_of_week = day_of_week.replace(day_name, str(day_number))
        expression_parts[5] = day_of_week

        # convert JAN-DEC format to 1-12 format
        month = expression_parts[4].upper()
        for month_number, month_name in self._cron_months.items():
            month = month.replace(month_name, str(month_number))
        expression_parts[4] = month

        # convert 0 second to (empty)
        if expression_parts[0] == "0":
            expression_parts[0] = ''

        # If time interval is specified for seconds or minutes and next time part is single item, make it a "self-range" so
        # the expression can be interpreted as an interval 'between' range.
        #     For example:
        #     0-20/3 9 * * * => 0-20/3 9-9 * * * (9 => 9-9)
        #     */5 3 * * * => */5 3-3 * * * (3 => 3-3)
        seconds_has_interval = any(token in expression_parts[0] for token in ('*', '/'))
        minutes_has_interval = any(token in expression_parts[1] for token in ('*', '/'))
        hours_is_single_item = not any(token in expression_parts[2] for token in ('*', '-', ',', '/'))
        if hours_is_single_item and (seconds_has_interval or minutes_has_interval):
            expression_parts[2] += '-{}'.format(expression_parts[2])

        # Loop through all parts and apply global normalization
        for index, part in enumerate(expression_parts):
            if part == "*/1":
                # convert all '*/1' to '*'
                expression_parts[index] = "*"
            elif "/" in part and not any(token in part for token in ('*', '-', ',')):
                # Convert Month,DOW,Year step values with a starting value (i.e. not '*') to between expressions.
                # This allows us to reuse the between expression handling for step values.
                # For Example:
                #  - month part '3/2' will be converted to '3-12/2' (every 2 months between March and December)
                #  - DOW part '3/2' will be converted to '3-6/2' (every 2 days between Tuesday and Saturday)
                range_end = self._step_range_ends.get(index)
                if range_end is not None:
                    pieces = part.split('/')
                    expression_parts[index] = "{0}-{1}/{2}".format(pieces[0], range_end, pieces[1])