Skip to content

Commit

Permalink
Move some utility functions into their own file
Browse files Browse the repository at this point in the history
  • Loading branch information
Doug Greiman committed Jun 16, 2017
1 parent 36d0969 commit 6f2275b
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 166 deletions.
96 changes: 12 additions & 84 deletions scripts/local_cloudbuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

import yaml

import validation_utils


# Exclude non-printable control characters (including newlines)
PRINTABLE_REGEX = re.compile(r"""^[^\x00-\x1f]*$""")
Expand All @@ -59,10 +61,6 @@
)
""")

# For easier development, we allow redefining builtins like
# --substitutions=PROJECT_ID=foo even though gcloud doesn't.
KEY_VALUE_REGEX = re.compile(r'^([A-Z_][A-Z0-9_]*)=(.*)$')

# Default builtin substitutions
DEFAULT_SUBSTITUTIONS = {
'BRANCH_NAME': '',
Expand Down Expand Up @@ -149,51 +147,6 @@ def sub(match):
quoted_s = shlex.quote(substituted_s)
return quoted_s

def get_field_value(container, field_name, field_type):
"""Fetch a field from a container with typechecking and default values.
The field value is coerced to the desired type. If the field is
not present, a instance of `field_type` is constructed with no
arguments and used as the default value.
Args:
container (dict): Object decoded from yaml
field_name (str): Field that should be present in `container`
field_type (type): Expected type for field value
Returns:
Any: Fetched or default value of field
Raises:
ValueError: if field value cannot be converted to the desired type
"""
try:
value = container[field_name]
except (IndexError, KeyError):
return field_type()

msg = 'Expected "{}" field to be of type "{}", but found type "{}"'
if not isinstance(value, field_type):
# list('some string') is a successful type cast as far as Python
# is concerned, but doesn't exactly produce the results we want.
# We have a whitelist of conversions we will attempt.
whitelist = (
(float, str),
(int, str),
(str, float),
(str, int),
(int, float),
)
if (type(value), field_type) not in whitelist:
raise ValueError(msg.format(field_name, field_type, type(value)))

try:
value = field_type(value)
except ValueError as e:
e.message = msg.format(field_name, field_type, type(value))
raise
return value


def get_cloudbuild(raw_config, args):
"""Read and validate a cloudbuild recipe
Expand All @@ -210,7 +163,7 @@ def get_cloudbuild(raw_config, args):
'Expected {} contents to be of type "dict", but found type "{}"'.
format(args.config, type(raw_config)))

raw_steps = get_field_value(raw_config, 'steps', list)
raw_steps = validation_utils.get_field_value(raw_config, 'steps', list)
if not raw_steps:
raise ValueError('No steps defined in {}'.format(args.config))

Expand All @@ -236,14 +189,14 @@ def get_step(raw_step):
raise ValueError(
'Expected step to be of type "dict", but found type "{}"'.
format(type(raw_step)))
raw_args = get_field_value(raw_step, 'args', list)
args = [get_field_value(raw_args, index, str)
raw_args = validation_utils.get_field_value(raw_step, 'args', list)
args = [validation_utils.get_field_value(raw_args, index, str)
for index in range(len(raw_args))]
dir_ = get_field_value(raw_step, 'dir', str)
raw_env = get_field_value(raw_step, 'env', list)
env = [get_field_value(raw_env, index, str)
dir_ = validation_utils.get_field_value(raw_step, 'dir', str)
raw_env = validation_utils.get_field_value(raw_step, 'env', list)
env = [validation_utils.get_field_value(raw_env, index, str)
for index in range(len(raw_env))]
name = get_field_value(raw_step, 'name', str)
name = validation_utils.get_field_value(raw_step, 'name', str)
return Step(
args=args,
dir_=dir_,
Expand Down Expand Up @@ -373,46 +326,21 @@ def local_cloudbuild(args):
subprocess.check_call(args)


def validate_arg_regex(flag_value, flag_regex):
"""Check a named command line flag against a regular expression"""
if not re.match(flag_regex, flag_value):
raise argparse.ArgumentTypeError(
'Value "{}" does not match pattern "{}"'.format(
flag_value, flag_regex.pattern))
return flag_value


def validate_arg_dict(flag_value):
"""Parse a command line flag as a key=val,... dict"""
if not flag_value:
return {}
entries = flag_value.split(',')
pairs = []
for entry in entries:
match = re.match(KEY_VALUE_REGEX, entry)
if not match:
raise argparse.ArgumentTypeError(
'Value "{}" should be a list like _KEY1=value1,_KEY2=value2"'.format(
flag_value))
pairs.append((match.group(1), match.group(2)))
return dict(pairs)


def parse_args(argv):
"""Parse and validate command line flags"""
parser = argparse.ArgumentParser(
description='Process cloudbuild.yaml locally to build Docker images')
parser.add_argument(
'--config',
type=functools.partial(
validate_arg_regex, flag_regex=PRINTABLE_REGEX),
validation_utils.validate_arg_regex, flag_regex=PRINTABLE_REGEX),
default='cloudbuild.yaml',
help='Path to cloudbuild.yaml file'
)
parser.add_argument(
'--output_script',
type=functools.partial(
validate_arg_regex, flag_regex=PRINTABLE_REGEX),
validation_utils.validate_arg_regex, flag_regex=PRINTABLE_REGEX),
help='Filename to write shell script to',
)
parser.add_argument(
Expand All @@ -423,7 +351,7 @@ def parse_args(argv):
)
parser.add_argument(
'--substitutions',
type=validate_arg_dict,
type=validation_utils.validate_arg_dict,
default={},
help='Parameters to be substituted in the build specification',
)
Expand Down
83 changes: 1 addition & 82 deletions scripts/local_cloudbuild_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,91 +34,10 @@
STAGING_DIR_REGEX = re.compile(
b'(?m)Copying source to staging directory (.+)$')

class ValidationUtilsTest(unittest.TestCase):

def test_get_field_value(self):
valid_cases = (
# Normal case, field present and correct type
({ 'present': 1 }, 'present', int, 1),
({ 'present': '1' }, 'present', str, '1'),
({ 'present': [1] }, 'present', list, [1]),
({ 'present': {1: 2} }, 'present', dict, {1: 2}),
# Missing field replaced by default
({}, 'missing', str, ''),
# Valid conversions
({ 'str_to_int': '1' }, 'str_to_int', int, 1),
({ 'int_to_str': 1 }, 'int_to_str', str, '1'),
)
for valid_case in valid_cases:
with self.subTest(valid_case=valid_case):
container, field_name, field_type, expected = valid_case
self.assertEqual(
local_cloudbuild.get_field_value(
container, field_name, field_type),
expected)

invalid_cases = (
# Type conversion failures
({ 'bad_list_to_dict': [1] }, 'bad_list_to_dict', dict),
({ 'bad_list_to_str': [1] }, 'bad_list_to_str', str),
({ 'bad_dict_to_list': {1: 2} }, 'bad_dict_to_list', list),
({ 'bad_str_to_int': 'not_an_int' }, 'bad_str_to_int', int),
({ 'bad_str_to_list': 'abc' }, 'bad_str_to_list', list),
)
for invalid_case in invalid_cases:
with self.subTest(invalid_case=invalid_case):
container, field_name, field_type = invalid_case
with self.assertRaises(ValueError):
local_cloudbuild.get_field_value(
container, field_name, field_type)

def test_validate_arg_regex(self):
self.assertEqual(
local_cloudbuild.validate_arg_regex('abc', re.compile('a[b]c')),
'abc')
with self.assertRaises(argparse.ArgumentTypeError):
local_cloudbuild.validate_arg_regex('abc', re.compile('a[d]c'))


def test_validate_arg_dict(self):
valid_cases = (
# Normal case, field present and correct type
('', {}),
('_A=1', {'_A':'1'}),
('_A=1,_B=2', {'_A':'1', '_B':'2'}),
# Repeated key is ok
('_A=1,_A=2', {'_A':'2'}),
# Extra = is ok
('_A=x=y=z,_B=2', {'_A':'x=y=z', '_B':'2'}),
# No value is ok
('_A=', {'_A':''}),
)
for valid_case in valid_cases:
with self.subTest(valid_case=valid_case):
s, expected = valid_case
self.assertEqual(
local_cloudbuild.validate_arg_dict(s),
expected)

invalid_cases = (
# No key
',_A',
'_A,',
# Invalid variable name
'_Aa=1',
'_aA=1',
'0A=1',
)
for invalid_case in invalid_cases:
with self.subTest(invalid_case=invalid_case):
with self.assertRaises(argparse.ArgumentTypeError):
local_cloudbuild.validate_arg_dict(invalid_case)


class LocalCloudbuildTest(unittest.TestCase):

def setUp(self):
self.testdata_dir = 'testdata'
self.testdata_dir = os.path.join(os.path.dirname(__file__), 'testdata') # Sigh
assert os.path.isdir(self.testdata_dir), 'Could not run test: testdata directory not found'

def test_sub_and_quote(self):
Expand Down
98 changes: 98 additions & 0 deletions scripts/validation_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/usr/bin/env python3

# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities for schema and command line validation"""

import argparse
import re


# For easier development, we allow redefining builtins like
# --substitutions=PROJECT_ID=foo even though gcloud doesn't.
KEY_VALUE_REGEX = re.compile(r'^([A-Z_][A-Z0-9_]*)=(.*)$')


def get_field_value(container, field_name, field_type):
"""Fetch a field from a container with typechecking and default values.
The field value is coerced to the desired type. If the field is
not present, a instance of `field_type` is constructed with no
arguments and used as the default value.
Args:
container (dict): Object decoded from yaml
field_name (str): Field that should be present in `container`
field_type (type): Expected type for field value
Returns:
Any: Fetched or default value of field
Raises:
ValueError: if field value cannot be converted to the desired type
"""
try:
value = container[field_name]
if value is None:
return field_type()
except (IndexError, KeyError):
return field_type()

msg = 'Expected "{}" field to be of type "{}", but found type "{}"'
if not isinstance(value, field_type):
# list('some string') is a successful type cast as far as Python
# is concerned, but doesn't exactly produce the results we want.
# We have a whitelist of conversions we will attempt.
whitelist = (
(float, str),
(int, str),
(str, float),
(str, int),
(int, float),
)
if (type(value), field_type) not in whitelist:
raise ValueError(msg.format(field_name, field_type, type(value)))

try:
value = field_type(value)
except ValueError as e:
e.message = msg.format(field_name, field_type, type(value))
raise
return value


def validate_arg_regex(flag_value, flag_regex):
"""Check a named command line flag against a regular expression"""
if not re.match(flag_regex, flag_value):
raise argparse.ArgumentTypeError(
'Value "{}" does not match pattern "{}"'.format(
flag_value, flag_regex.pattern))
return flag_value


def validate_arg_dict(flag_value):
"""Parse a command line flag as a key=val,... dict"""
if not flag_value:
return {}
entries = flag_value.split(',')
pairs = []
for entry in entries:
match = re.match(KEY_VALUE_REGEX, entry)
if not match:
raise argparse.ArgumentTypeError(
'Value "{}" should be a list like _KEY1=value1,_KEY2=value2"'.format(
flag_value))
pairs.append((match.group(1), match.group(2)))
return dict(pairs)

0 comments on commit 6f2275b

Please sign in to comment.