Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 54 additions & 54 deletions docker/auth/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

import base64
import fileinput
import json
import logging
import os
Expand Down Expand Up @@ -132,78 +131,79 @@ def parse_auth(entries):
return conf


def find_config_file(config_path=None):
environment_path = os.path.join(
os.environ.get('DOCKER_CONFIG'),
os.path.basename(DOCKER_CONFIG_FILENAME)
) if os.environ.get('DOCKER_CONFIG') else None

paths = [
config_path, # 1
environment_path, # 2
os.path.join(os.path.expanduser('~'), DOCKER_CONFIG_FILENAME), # 3
os.path.join(
os.path.expanduser('~'), LEGACY_DOCKER_CONFIG_FILENAME
) # 4
]

for path in paths:
if path and os.path.exists(path):
return path
return None


def load_config(config_path=None):
"""
Loads authentication data from a Docker configuration file in the given
root directory or if config_path is passed use given path.
Lookup priority:
explicit config_path parameter > DOCKER_CONFIG environment variable >
~/.docker/config.json > ~/.dockercfg
"""
conf = {}
data = None

# Prefer ~/.docker/config.json.
config_file = config_path or os.path.join(os.path.expanduser('~'),
DOCKER_CONFIG_FILENAME)

log.debug("Trying {0}".format(config_file))

if os.path.exists(config_file):
try:
with open(config_file) as f:
for section, data in six.iteritems(json.load(f)):
if section != 'auths':
continue
log.debug("Found 'auths' section")
return parse_auth(data)
log.debug("Couldn't find 'auths' section")
except (IOError, KeyError, ValueError) as e:
# Likely missing new Docker config file or it's in an
# unknown format, continue to attempt to read old location
# and format.
log.debug(e)
pass
else:
log.debug("File doesn't exist")

config_file = config_path or os.path.join(os.path.expanduser('~'),
LEGACY_DOCKER_CONFIG_FILENAME)

log.debug("Trying {0}".format(config_file))
config_file = find_config_file(config_path)

if not os.path.exists(config_file):
log.debug("File doesn't exist - returning empty config")
if not config_file:
log.debug("File doesn't exist")
return {}

log.debug("Attempting to parse as JSON")
try:
with open(config_file) as f:
return parse_auth(json.load(f))
except Exception as e:
data = json.load(f)
if data.get('auths'):
log.debug("Found 'auths' section")
return parse_auth(data)
else:
log.debug("Couldn't find 'auths' section")
f.seek(0)
return parse_auth(json.load(f))
except (IOError, KeyError, ValueError) as e:
# Likely missing new Docker config file or it's in an
# unknown format, continue to attempt to read old location
# and format.
log.debug(e)
pass

# If that fails, we assume the configuration file contains a single
# authentication token for the public registry in the following format:
#
# auth = AUTH_TOKEN
# email = email@domain.com
log.debug("Attempting to parse legacy auth file format")
try:
data = []
for line in fileinput.input(config_file):
data.append(line.strip().split(' = ')[1])
if len(data) < 2:
# Not enough data
raise errors.InvalidConfigFile(
'Invalid or empty configuration file!')
with open(config_file) as f:
for line in f.readlines():
data.append(line.strip().split(' = ')[1])
if len(data) < 2:
# Not enough data
raise errors.InvalidConfigFile(
'Invalid or empty configuration file!'
)

username, password = decode_auth(data[0])
conf[INDEX_NAME] = {
'username': username,
'password': password,
'email': data[1],
'serveraddress': INDEX_URL,
return {
INDEX_NAME: {
'username': username,
'password': password,
'email': data[1],
'serveraddress': INDEX_URL,
}
}
return conf
except Exception as e:
log.debug(e)
pass
Expand Down
33 changes: 30 additions & 3 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2387,7 +2387,7 @@ def test_load_config(self):
f.write('auth = {0}\n'.format(auth_))
f.write('email = sakuya@scarlet.net')
cfg = docker.auth.load_config(dockercfg_path)
self.assertTrue(docker.auth.INDEX_NAME in cfg)
assert docker.auth.INDEX_NAME in cfg
self.assertNotEqual(cfg[docker.auth.INDEX_NAME], None)
cfg = cfg[docker.auth.INDEX_NAME]
self.assertEqual(cfg['username'], 'sakuya')
Expand All @@ -2412,17 +2412,44 @@ def test_load_config_with_random_name(self):
}

with open(dockercfg_path, 'w') as f:
f.write(json.dumps(config))
json.dump(config, f)

cfg = docker.auth.load_config(dockercfg_path)
self.assertTrue(registry in cfg)
assert registry in cfg
self.assertNotEqual(cfg[registry], None)
cfg = cfg[registry]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('auth'), None)

def test_load_config_custom_config_env(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)

dockercfg_path = os.path.join(folder, 'config.json')
registry = 'https://your.private.registry.io'
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
config = {
registry: {
'auth': '{0}'.format(auth_),
'email': 'sakuya@scarlet.net'
}
}

with open(dockercfg_path, 'w') as f:
json.dump(config, f)

with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
cfg = docker.auth.load_config(None)
assert registry in cfg
self.assertNotEqual(cfg[registry], None)
cfg = cfg[registry]
self.assertEqual(cfg['username'], 'sakuya')
self.assertEqual(cfg['password'], 'izayoi')
self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
self.assertEqual(cfg.get('auth'), None)

def test_tar_with_excludes(self):
dirs = [
'foo',
Expand Down