Skip to content

Commit

Permalink
Complete support for ARG statement handling
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick J. McNerthney <pat@mcnerthney.com>
  • Loading branch information
Patrick J. McNerthney authored and iciclespider committed Jun 15, 2020
1 parent cfee00f commit ce3a377
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 147 deletions.
222 changes: 148 additions & 74 deletions dockerfile_parse/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,92 +15,87 @@
import re
from contextlib import contextmanager
from six import string_types
from six.moves import shlex_quote as quote

from .constants import DOCKERFILE_FILENAME, COMMENT_INSTRUCTION
from .util import (b2u, extract_labels_or_envs, get_key_val_dictionary,
from .util import (b2u, extract_key_values, get_key_val_dictionary,
u2b, Context, WordSplitter)

try:
# py3
from shlex import quote
except ImportError:
from pipes import quote


logger = logging.getLogger(__name__)


class Labels(dict):
class KeyValues(dict):
"""
A class for allowing direct write access to Dockerfile labels, e.g.:
Abstract base class for allowing direct write access to Dockerfile
instructions which result in a set of key value pairs.
parser.labels['label'] = 'value'
Subclasses must override the `parser_attr` value.
"""
parser_attr = None

def __init__(self, labels, parser):
super(Labels, self).__init__(labels)
def __init__(self, key_values, parser):
super(KeyValues, self).__init__(key_values)
self.parser = parser

def __delitem__(self, key):
super(Labels, self).__delitem__(key)
self.parser.labels = dict(self)
super(KeyValues, self).__delitem__(key)
setattr(self.parser, self.parser_attr, dict(self))

def __setitem__(self, key, value):
super(Labels, self).__setitem__(key, value)
self.parser.labels = dict(self)
super(KeyValues, self).__setitem__(key, value)
setattr(self.parser, self.parser_attr, dict(self))

def __eq__(self, other):
if not isinstance(other, dict):
return False
return dict(self) == other

def __hash__(self):
return hash(json.dumps(self, separators=(',', ':'), sort_keys=True))


class Envs(dict):
class Labels(KeyValues):
"""
A class for allowing direct write access to Dockerfile env. vars., e.g.:
A class for allowing direct write access to Dockerfile labels, e.g.:
parser.envs['variable_name'] = 'value'
parser.labels['label'] = 'value'
"""
parser_attr = 'labels'

def __init__(self, envs, parser):
super(Envs, self).__init__(envs)
self.parser = parser

def __delitem__(self, key):
super(Envs, self).__delitem__(key)
self.parser.envs = dict(self)
class Envs(KeyValues):
"""
A class for allowing direct write access to Dockerfile env. vars., e.g.:
def __setitem__(self, key, value):
super(Envs, self).__setitem__(key, value)
self.parser.envs = dict(self)
parser.envs['variable_name'] = 'value'
"""
parser_attr = 'envs'


class Args(dict):
class Args(KeyValues):
"""
A class for allowing direct write access to Dockerfile args, e.g.:
A class for allowing direct write access to Dockerfile build args, e.g.:
parser.args['variable_name'] = 'value'
"""

def __init__(self, args, parser):
super(Args, self).__init__(args)
self.parser = parser

def __delitem__(self, key):
super(Args, self).__delitem__(key)
self.parser.envs = dict(self)

def __setitem__(self, key, value):
super(Args, self).__setitem__(key, value)
self.parser.envs = dict(self)
parser_attr = 'args'


class DockerfileParser(object):
def __init__(self, path=None,
cache_content=False,
env_replace=True,
parent_env=None,
build_args=None,
fileobj=None):
"""
Initialize source of Dockerfile
:param path: path to (directory with) Dockerfile
:param cache_content: cache Dockerfile content inside DockerfileParser
:param env_replace: return content with variables replaced
:param parent_env: python dict of inherited env vars from parent image
:param build_args: python dict of build args used when building image
:param fileobj: seekable file-like object containing Dockerfile content
as bytes (will be truncated on write)
"""
Expand Down Expand Up @@ -139,6 +134,13 @@ def __init__(self, path=None,
logger.debug("Setting inherited parent image ENV vars: %s", parent_env)
self.parent_env = parent_env

if build_args is None:
self.build_args = {}
else:
assert isinstance(build_args, dict)
logger.debug("Setting build args: %s", build_args)
self.build_args = build_args

@contextmanager
def _open_dockerfile(self, mode):
if self.fileobj is not None:
Expand Down Expand Up @@ -318,17 +320,26 @@ def parent_images(self):
"""
:return: list of parent images -- one image per each stage's FROM instruction
"""
in_stage = False
top_args = {}
parents = []
for instr in self.structure:
if instr['instruction'] != 'FROM':
continue
image, _ = image_from(instr['value'])
if image is not None:
# replace any ARG keys with values
for key in self.args.keys():
pattern = '\\$'+key+'|'+'\\${'+key+'}'
image = re.sub(pattern, self.args[key], image)
parents.append(image)
if instr['instruction'] == 'ARG':
if not in_stage:
key_val_list = extract_key_values(
env_replace=False,
args={}, envs={},
instruction_value=instr['value'])
for key, value in key_val_list:
if key in self.build_args:
value = self.build_args[key]
top_args[key] = value
elif instr['instruction'] == 'FROM':
in_stage = True
image, _ = image_from(instr['value'])
if image is not None:
image = WordSplitter(image, args=top_args).dequote()
parents.append(image)
return parents

@parent_images.setter
Expand All @@ -348,7 +359,7 @@ def parent_images(self, parents):
continue

old_image, stage = image_from(instr['value'])
if not old_image:
if old_image is None:
continue # broken FROM, fixing would just confuse things
if not parents:
raise RuntimeError("not enough parents to match build stages")
Expand Down Expand Up @@ -384,7 +395,14 @@ def baseimage(self, new_image):
"""
change image of final stage FROM instruction
"""
images = self.parent_images or [None]
images = []
for instr in self.structure:
if instr['instruction'] == 'FROM':
image, _ = image_from(instr['value'])
if image is not None:
images.append(image)
if not images:
raise RuntimeError('No stage defined to set base image on')
images[-1] = new_image
self.parent_images = images

Expand Down Expand Up @@ -448,28 +466,45 @@ def args(self):

def _instruction_getter(self, name, env_replace):
"""
Get LABEL or ENV instructions with environment replacement
Get LABEL or ENV or ARG instructions with environment replacement
:param name: e.g. 'LABEL' or 'ENV'
:param name: e.g. 'LABEL' or 'ENV' or 'ARG'
:param env_replace: bool, whether to perform ENV substitution
:return: Labels instance or Envs instance
"""
supported_instrs = ('LABEL', 'ENV', 'ARG')
if name not in supported_instrs:
raise ValueError("Unsupported instruction '%s'", name)
if name not in ('LABEL', 'ENV', 'ARG'):
raise ValueError("Unsupported instruction '{0}'".format(name))
in_stage = False
top_args = {}
instructions = {}
args = {}
envs = {}

for instruction_desc in self.structure:
this_instruction = instruction_desc['instruction']
if this_instruction == 'FROM':
in_stage = True
instructions.clear()
args = {}
envs = self.parent_env.copy()
elif this_instruction in supported_instrs:
elif this_instruction in (name, 'ENV', 'ARG'):
logger.debug("%s value: %r", name.lower(), instruction_desc['value'])
key_val_list = extract_labels_or_envs(env_replace=env_replace,
envs=envs,
instruction_value=instruction_desc['value'])
key_val_list = extract_key_values(
env_replace=this_instruction != 'ARG' and env_replace,
args=args, envs=envs,
instruction_value=instruction_desc['value'])
for key, value in key_val_list:
if this_instruction == 'ARG':
if in_stage:
if key in top_args:
value = top_args[key]
elif key in self.build_args:
value = self.build_args[key]
args[key] = value
else:
if key in self.build_args:
value = self.build_args[key]
top_args[key] = value
if this_instruction == name:
instructions[key] = value
logger.debug("new %s %r=%r", name.lower(), key, value)
Expand Down Expand Up @@ -500,6 +535,14 @@ def envs(self, envs):
"""
self._instructions_setter('ENV', envs)

@args.setter
def args(self, args):
"""
Setter for ARG instruction, i.e. sets ARGs per input param.
:param args: dictionary of arg names & values to be set
"""
self._instructions_setter('ARG', args)

def _instructions_setter(self, name, instructions):
if not isinstance(instructions, dict):
raise TypeError('instructions needs to be a dictionary {name: value}')
Expand All @@ -508,6 +551,10 @@ def _instructions_setter(self, name, instructions):
existing = self.labels
elif name == 'ENV':
existing = self.envs
elif name == 'ARG':
existing = self.args
else:
raise ValueError("Unexpected instruction '%s'" % name)

logger.debug("setting %s instructions: %r", name, instructions)

Expand All @@ -533,6 +580,9 @@ def _modify_instruction_label(self, label_key, instr_value):
def _modify_instruction_env(self, env_var_key, env_var_value):
self._modify_instruction_label_env('ENV', env_var_key, env_var_value)

def _modify_instruction_arg(self, arg_key, arg_value):
self._modify_instruction_label_env('ARG', arg_key, arg_value)

def _modify_instruction_label_env(self, instruction, instr_key, instr_value):
"""
set <INSTRUCTION> instr_key to instr_value
Expand All @@ -544,6 +594,8 @@ def _modify_instruction_label_env(self, instruction, instr_key, instr_value):
instructions = self.labels
elif instruction == 'ENV':
instructions = self.envs
elif instruction == 'ARG':
instructions = self.args
else:
raise ValueError("Unknown instruction '%s'" % instruction)

Expand All @@ -563,8 +615,8 @@ def _modify_instruction_label_env(self, instruction, instr_key, instr_value):
for candidate in candidates:
words = list(WordSplitter(candidate['value']).split(dequote=False))

# LABEL/ENV syntax is one of two types:
if '=' not in words[0]: # LABEL/ENV name value
# LABEL/ENV/ARG syntax is one of two types:
if '=' not in words[0]: # LABEL/ENV/ARG name value
# Remove quotes from key name and see if it's the one
# we're looking for.
if WordSplitter(words[0]).dequote() == instr_key:
Expand All @@ -581,7 +633,7 @@ def _modify_instruction_label_env(self, instruction, instr_key, instr_value):
startline = candidate['startline']
endline = candidate['endline']
break
else: # LABEL/ENV "name"="value"
else: # LABEL/ENV/ARG "name"="value"
for index, token in enumerate(words):
key, _ = token.split("=", 1)
if WordSplitter(key).dequote() == instr_key:
Expand Down Expand Up @@ -626,6 +678,9 @@ def _delete_instructions(self, instruction, value=None):
if instruction == 'ENV' and value:
self._modify_instruction_env(value, None)
return
if instruction == 'ARG' and value:
self._modify_instruction_arg(value, None)
return

lines = self.lines
deleted = False
Expand All @@ -643,7 +698,7 @@ def _add_instruction(self, instruction, value):
:param instruction: instruction name to be added
:param value: instruction value
"""
if (instruction == 'LABEL' or instruction == 'ENV') and len(value) == 2:
if instruction in ('LABEL', 'ENV', 'ARG') and len(value) == 2:
new_line = instruction + ' ' + '='.join(map(quote, value)) + '\n'
else:
new_line = '{0} {1}\n'.format(instruction, value)
Expand Down Expand Up @@ -745,23 +800,42 @@ def add_lines_at(self, anchor, *lines, **kwargs):
def context_structure(self):
"""
:return: list of Context objects
(Contains info about labels and environment variables for each line.)
(Contains info about build arguments, labels, and environment variables for each line.)
"""
in_stage = False
top_args = {}
instructions = []
last_context = Context()
for instr in self.structure:
instruction_type = instr['instruction']
if instruction_type == "FROM": # reset per stage
last_context = Context()
in_stage = True
last_context = Context(envs=dict(self.parent_env))

context = Context(envs=dict(last_context.envs),
context = Context(args=dict(last_context.args),
envs=dict(last_context.envs),
labels=dict(last_context.labels))

if instruction_type in ["ENV", "LABEL"]:
val = get_key_val_dictionary(instruction_value=instr['value'],
env_replace=self.env_replace,
envs=last_context.envs)
context.set_line_value(context_type=instruction_type, value=val)
if instruction_type in ('ARG', 'ENV', 'LABEL'):
values = get_key_val_dictionary(
instruction_value=instr['value'],
env_replace=instruction_type != 'ARG' and self.env_replace,
args=last_context.args,
envs=last_context.envs)
if instruction_type == 'ARG' and self.env_replace:
if in_stage:
for key in list(values.keys()):
if key in top_args:
values[key] = top_args[key]
elif key in self.build_args:
values[key] = self.build_args[key]
else:
for key, value in list(values.items()):
if key in self.build_args:
value = self.build_args[key]
top_args[key] = value
values[key] = value
context.set_line_value(context_type=instruction_type, value=values)

instructions.append(context)
last_context = context
Expand Down
Loading

0 comments on commit ce3a377

Please sign in to comment.