779 changes: 0 additions & 779 deletions reprotest/lib/adt_testbed.py
Expand Up @@ -211,8 +211,6 @@ def _opened(self, pl):
if c.startswith('suggested-normal-user='):
self.user = c.split('=', 1)[1]

self.run_setup_commands()

# determine testbed architecture
self.dpkg_arch = self.check_exec(['dpkg', '--print-architecture'], True).strip()
adtlog.info('testbed dpkg architecture: ' + self.dpkg_arch)
Expand Down Expand Up @@ -241,95 +239,6 @@ def close(self):
self.command('close')
self.shared_downtmp = None

def reboot(self, prepare_only=False):
'''Reboot the testbed'''

self.command('reboot', prepare_only and ('prepare-only', ) or ())
self.post_boot_setup()

def run_setup_commands(self):
'''Run --setup-commmands and --copy'''

if not self.setup_commands and not self.add_apt_pockets and not self.copy_files:
return

adtlog.info('@@@@@@@@@@@@@@@@@@@@ test bed setup')
for (host, tb) in self.copy_files:
adtlog.debug('Copying file %s to testbed %s' % (host, tb))
Path(self, host, tb, os.path.isdir(host)).copydown()

# create apt sources for --apt-pocket
for pocket in self.add_apt_pockets:
pocket = pocket.split('=', 1)[0] # strip off package list
script = '''sed -rn 's/^(deb|deb-src) +(\[.*\] *)?([^ ]*(ubuntu.com|debian.org|ftpmaster|file:\/\/\/tmp\/testarchive)[^ ]*) +([^ -]+) +(.*)$/\\1 \\2\\3 \\5-%s \\6/p' /etc/apt/sources.list `ls /etc/apt/sources.list.d/*.list 2>/dev/null|| true)` > /etc/apt/sources.list.d/%s.list; for retry in 1 2 3; do apt-get --no-list-cleanup -o Dir::Etc::sourcelist=/etc/apt/sources.list.d/%s.list -o Dir::Etc::sourceparts=/dev/null update 2>&1 && break || sleep 15; done''' % (pocket, pocket, pocket)
self.check_exec(['sh', '-ec', script])

# create apt pinning for --apt-pocket with package list
for pocket in self.add_apt_pockets:
# do we have a package list?
try:
(pocket, pkglist) = pocket.split('=', 1)
except ValueError:
continue
self._create_apt_pinning_for_packages(pocket, pkglist)

# record the mtimes of dirs affecting the boot
boot_dirs = '/boot /etc/init /etc/init.d /etc/systemd/system /lib/systemd/system'
self.check_exec(['bash', '-ec',
'for d in %s; do [ ! -d $d ] || touch -r $d %s/${d//\//_}.stamp; done' % (
boot_dirs, self.scratch)])

xenv = ['ADT_IS_SETUP_COMMAND=1']
if self.user:
xenv.append('ADT_NORMAL_USER=' + self.user)

for c in self.setup_commands:
rc = self.execute(['sh', '-ec', c], xenv=xenv, kind='install')[0]
if rc:
self.bomb('testbed setup commands failed with status %i' % rc)

# if the setup commands affected the boot, then reboot
if self.setup_commands and 'reboot' in self.caps:
boot_affected = self.execute(
['bash', '-ec', '[ ! -e /run/autopkgtest_no_reboot.stamp ] || exit 0;'
'for d in %s; do s=%s/${d//\//_}.stamp;'
' [ ! -d $d ] || [ `stat -c %%Y $d` = `stat -c %%Y $s` ]; done' % (
boot_dirs, self.scratch)])[0]
if boot_affected:
adtlog.info('rebooting testbed after setup commands that affected boot')
self.reboot()

def reset(self, deps_new, with_recommends):
'''Reset the testbed, if possible and necessary'''

adtlog.debug('testbed reset: modified=%s, deps_installed=%s(r: %s), deps_new=%s(r: %s)' %
(self.modified, self.deps_installed, self.recommends_installed,
deps_new, with_recommends))
if 'revert' in self.caps and (
self.modified or self.recommends_installed != with_recommends or
[d for d in self.deps_installed if d not in deps_new]):
adtlog.debug('testbed reset')
pl = self.command('revert', (), 1)
self._opened(pl)
self.modified = False

def install_deps(self, deps_new, recommends):
'''Install dependencies into testbed'''
adtlog.debug('install_deps: deps_new=%s, recommends=%s' % (deps_new, recommends))

self.deps_installed = deps_new
self.recommends_installed = recommends
if not deps_new:
return
self.satisfy_dependencies_string(', '.join(deps_new), 'install-deps', recommends)

def needs_reset(self):
# show what caused a reset
(fname, lineno, function, code) = traceback.extract_stack(limit=2)[0]
adtlog.debug('needs_reset, previously=%s, requested by %s() line %i' %
(self.modified, function, lineno))
self.modified = True

def bomb(self, m, _type=adtlog.TestbedFailure):
adtlog.debug('%s %s' % (_type.__name__, m))
# self.stop()
Expand Down Expand Up @@ -475,691 +384,3 @@ def check_exec(self, argv, stdout=False, kind='short', xenv=[]):
self.bomb('"%s" failed with status %i' % (' '.join(argv), code),
adtlog.AutopkgtestError)
return out

def install_apt(self, deps, recommends=False, shell_on_failure=False):
'''Install dependencies with apt-get into testbed
This requires root privileges and a writable file system.
'''
# create a dummy deb with the deps
pkgdir = tempfile.mkdtemp(prefix='adt-satdep.')
debdir = os.path.join(pkgdir, 'DEBIAN')
os.chmod(pkgdir, 0o755)
os.mkdir(debdir)
os.chmod(debdir, 0o755)
with open(os.path.join(debdir, 'control'), 'w') as f:
f.write('''Package: adt-satdep
Section: oldlibs
Priority: extra
Maintainer: autogenerated
Version: 0
Architecture: %s
Depends: %s
Description: satisfy autopkgtest test dependencies
''' % (self.dpkg_arch, deps))

deb = TempPath(self, 'adt-satdep.deb')
subprocess.check_call(['dpkg-deb', '-b', pkgdir, deb.host],
stdout=subprocess.PIPE)
shutil.rmtree(pkgdir)
deb.copydown()

# install it and its dependencies in the tb; our apt pinning is not
# very clever wrt. resolving transitional dependencies in the pocket,
# so we might need to retry without pinning
download_fail_retries = 3
while True:
self.check_exec(['dpkg', '--unpack', deb.tb], stdout=subprocess.PIPE)
# capture status-fd to stderr
(rc, _, serr) = self.execute(['/bin/sh', '-ec', '%s apt-get install '
'--assume-yes --fix-broken '
'-o APT::Status-Fd=3 '
'-o APT::Install-Recommends=%s '
'-o Debug::pkgProblemResolver=true 3>&2 2>&1' %
(' '.join(self.eatmydata_prefix), recommends)],
kind='install', stderr=subprocess.PIPE)
if rc != 0:
adtlog.debug('apt-get install failed; status-fd:\n%s' % serr)
# check if apt failed during package download, which might be a
# transient error, so retry
if 'dlstatus:' in serr and 'pmstatus:' not in serr:
download_fail_retries -= 1
if download_fail_retries > 0:
adtlog.warning('apt failed to download packages, retrying in 10s...')
time.sleep(10)
continue
else:
self.bomb('apt repeatedly failed to download packages')

if shell_on_failure:
self.run_shell()
else:
# apt-get -f may succeed, but its solution might remove
# adt-satdep, which is still a failure
rc = self.execute(['dpkg', '--status', 'adt-satdep'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)[0]

if rc != 0:
if self.apt_pin_for_pockets:
pocket = self.apt_pin_for_pockets.pop()
adtlog.warning('Test dependencies are unsatisfiable with using apt pinning. '
'Retrying with using all packages from %s' % pocket)
self.check_exec(['/bin/sh', '-ec', 'rm /etc/apt/preferences.d/autopkgtest-*-' + pocket])
continue

self.badpkg('Test dependencies are unsatisfiable. A common reason is '
'that your testbed is out of date with respect to the '
'archive, and you need to use a current testbed or run '
'apt-get update or use -U.')
break

# remove adt-satdep to avoid confusing tests, but avoid marking our
# test dependencies for auto-removal
out = self.check_exec(['apt-get', '--simulate', '--quiet',
'-o', 'APT::Get::Show-User-Simulation-Note=False',
'--auto-remove',
'purge', 'adt-satdep'],
True)
test_deps = []
for line in out.splitlines():
if not line.startswith('Purg '):
continue
pkg = line.split()[1]
if pkg != 'adt-satdep':
test_deps.append(pkg)
if test_deps:
adtlog.debug('Marking test dependencies as manually installed: %s' %
' '.join(test_deps))
# avoid overly long command lines
batch = 0
while batch < len(test_deps):
self.check_exec(['apt-mark', 'manual', '-qq'] + test_deps[batch:batch + 20])
batch += 20

self.execute(['dpkg', '--purge', 'adt-satdep'])

def install_tmp(self, deps, recommends=False):
'''Unpack dependencies into temporary directory
This is a fallback if the testbed does not have root privileges or a
writable file system, and will only work for packages that can be
used from a different directory with PATH, LD_LIBRARY_PATH, PYTHONPATH
etc. set.
Sets/updates self.install_tmp_env to necessary variables.
'''
unsupported = []
pkg_constraints = {} # pkg -> (relation, version)

# parse deps into pkg_constraints
dep_re = re.compile(
r'(?P<p>[a-z0-9+-.]+)\s*'
r'(\((?P<r><<|<=|>=|=|>>)\s*(?P<v>[^\)]*)\))?$')
for dep in deps.split(','):
dep = dep.strip()
if not dep:
continue # trailing comma
m = dep_re.match(dep)
if not m:
unsupported.append(dep)
continue
pkg_constraints[m.group('p')] = (m.group('r'), m.group('v'))

adtlog.debug('install_tmp: "%s" -> %s, unsupported: %s' %
(deps, pkg_constraints, unsupported))

if unsupported:
adtlog.warning('The following dependencies cannot be handled in '
'reduced "unpack to temporary directory" mode: ' +
', '.join(unsupported))

# simulate installation, grab packages, and check constraints
(rc, out, _) = self.execute(['apt-get', '--quiet', '--simulate', '--no-remove',
'-o', 'Debug::pkgProblemResolver=true',
'-o', 'Debug::NoLocking=true',
'-o', 'APT::Install-Recommends=%s' % recommends,
'-o', 'APT::Get::Show-User-Simulation-Note=False',
'install'] + list(pkg_constraints),
stdout=subprocess.PIPE)
if rc != 0:
self.badpkg('Test dependencies are unsatisfiable. A common reason is '
'that your testbed is out of date with respect to the '
'archive, and you need to use a current testbed, or '
'try "--setup-commands ro-apt-update".')

def check_constraint(pkg, ver):
constraint = pkg_constraints.get(pkg, (None, None))
if constraint[0] is None:
return True
comp = debian_support.version_compare(ver, constraint[1])
if constraint[0] == '<<':
return comp < 0
if constraint[0] == '<=':
return comp <= 0
if constraint[0] == '==':
return comp == 0
if constraint[0] == '>=':
return comp >= 0
if constraint[0] == '>>':
return comp > 0
raise ValueError('invalid dependency version relation %s' % constraint[0])

to_install = []
for line in out.splitlines():
if not line.startswith('Inst '):
continue
fields = line.split()
pkg = fields[1]
if fields[2].startswith('('):
ver = fields[2][1:]
elif fields[3].startswith('('):
ver = fields[3][1:]
else:
raise ValueError('Cannot parse line: %s' % line)
# ignore Python 2 stuff, with PYTHONPATH we can only support one
# Python major version (3)
if pkg.startswith('python-') or pkg.startswith('libpython-') or \
'python2.' in pkg or pkg == 'python':
adtlog.warning('Ignoring Python 2.x dependency %s, not '
'supported in unpack only mode' % pkg)
continue
if not check_constraint(pkg, ver):
self.badpkg('test dependency %s (%s %s) is unsatisfiable: available version %s' %
(pkg, pkg_constraints[pkg][0], pkg_constraints[pkg][1], ver))
to_install.append(pkg)

adtlog.debug('install_tmp: packages to install: %s' % ' '.join(to_install))

if not to_install:
# we already have everything, all good
return

adtlog.warning('virtualisation system does not offer root or writable '
'testbed; unpacking dependencies to temporary dir, '
'which will only work for some packages')

# download and unpack all debs
script = '''d=%(t)s/deps
mkdir -p $d; cd $d
apt-get download %(pkgs)s >&2
for p in *.deb; do dpkg-deb --extract $p .; rm $p; done
# executables
echo PATH=$d/sbin:$d/bin:$d/usr/sbin:$d/usr/bin:$d/usr/games:$PATH
# shared libraries / Qt plugins
l=""
q=""
for candidate in $(find $d -type d \( -name 'lib' -o -path '*/lib/*-linux-*' \)); do
[ -z "$(ls $candidate/*.so $candidate/*.so.* 2>/dev/null)" ] || l="$candidate:$l"
[ -z "$(ls $candidate/lib*qt*.so* 2>/dev/null)" ] || q="$candidate:$q"
done
[ -z "$l" ] || echo LD_LIBRARY_PATH=$l${LD_LIBRARY_PATH:-}
[ -z "$q" ] || echo QT_PLUGIN_PATH="$q"
# ImageMagick needs some hacks to make python[3]-wand find its library
l=""
for ml in $(ls usr/lib/*-linux-*/libMagick*.so.* 2>/dev/null); do
if [ -L $ml ]; then continue; fi
l=$(dirname $ml)
ln -sf $(basename "$ml") "${ml%%.so.*}.so"
done
if [ -n "$l" ]; then
[ -d "$l/lib" ] || ln -sf . "$l/lib"
echo MAGICK_HOME="$d/$l"
fi
# Python modules
p=""
for candidate in $d/usr/lib/python3*/dist-packages; do
[ ! -d $candidate ] || p="$candidate:$p"
done
[ -z "$p" ] || echo PYTHONPATH=$p${PYTHONPATH:-}
# Perl modules
p=""
for candidate in $d/usr/share/perl* $d/usr/lib/perl5 $d/usr/lib/*/perl5/*; do
[ ! -d $candidate ] || p="$candidate:$p"
done
[ -z "$p" ] || echo PERL5LIB=$p${PERL5LIB:-}
# gobject-introspection
l=""
if [ -d $d/usr/lib/girepository-1.0 ]; then
l=$d/usr/lib/girepository-1.0
fi
for candidate in $(find $d -type d -path '*/usr/lib/*/girepository-*'); do
[ -z "$(ls $candidate/*.typelib 2>/dev/null)" ] || l="$candidate:$l"
done
[ -z "$l" ] || echo GI_TYPELIB_PATH="$l:${GI_TYPELIB_PATH:-}"
# udev rules
if [ -n "$(ls $d/lib/udev/rules.d/*.rules 2>/dev/null)" ] && [ -w /run/udev ]; then
mkdir -p /run/udev/rules.d
cp $d/lib/udev/rules.d/*.rules /run/udev/rules.d/
udevadm control --reload
udevadm trigger || true
fi
''' % {'t': self.scratch, 'pkgs': ' '.join(to_install)}
(rc, out, _) = self.execute(['sh', '-euc', script],
stdout=subprocess.PIPE, kind='install')
if rc != 0:
self.bomb('failed to download and unpack test dependencies')
self.install_tmp_env = [l.strip() for l in out.splitlines() if l]
adtlog.debug('install_tmp: env is now %s' % self.install_tmp_env)

def install_click(self, clickpath):
# copy click into testbed
tp = Path(self, clickpath, os.path.join(
self.scratch, os.path.basename(clickpath)))
tp.copydown()
# install it
clickopts = ['--all-users']
if 'ADT_CLICK_NO_FRAMEWORK_CHECK' in os.environ:
# this is mostly for testing
clickopts.append('--force-missing-framework')
if 'root-on-testbed' in self.caps:
rc = self.execute(['click', 'install', '--allow-unauthenticated'] +
clickopts + [tp.tb], kind='install')[0]
else:
rc = self.execute(['pkcon', 'install-local', '--allow-untrusted',
tp.tb], kind='install')[0]
if rc != 0:
self.badpkg('click install failed with status %i' % rc)

# work around https://launchpad.net/bugs/1333215
# we don't want su -l here which resets the environment from
# self.execute(); so emulate the parts that we want
# FIXME: move "run as user" as an argument of execute()/check_exec() and run with -l
self.check_exec(['su', '--shell=/bin/sh', self.user, '-c',
('export USER=%s;' % self.user) +
'. /etc/profile >/dev/null 2>&1 || true; '
' . ~/.profile >/dev/null 2>&1 || true; '
'[ -z "$UPSTART_SESSION" ] || /sbin/initctl --user start click-user-hooks'])

def apparmor_click(self, clickpkgs, installed_clicks):
'''Update AppArmor rules for click tests
Return True if anything was modified and apparmor_restore_click()
needs to be called.
'''
# check if we are in a click+AppArmor environment
if self.execute(['sh', '-ec',
'[ -d /var/cache/apparmor -a -d /var/lib/apparmor/clicks -a ! -e /var/cache/apparmor/click-ap.rules ] && '
'type aa-clickhook >/dev/null 2>&1'])[0] != 0:
adtlog.debug('testbed does not have AppArmor/click or already has Autopilot click rules, no need to adjust rules')
return False
adtlog.debug('testbed has AppArmor/click')

if 'root-on-testbed' not in self.caps:
adtlog.warning('Cannot adjust AppArmor rules without root/sudo '
'privileges; Autopilot tests will fail and test '
'dependencies will not be available!')
return False

rules = 'dbus (receive, send) bus=session path=/com/canonical/Autopilot/**,'
for e in self.install_tmp_env:
if e.startswith('QT_PLUGIN_PATH='):
for p in e.split('=', 1)[1].split(':'):
p = p.strip()
if p:
rules += ' %s/** r,' % p
break

script = '''echo '%s' > /var/cache/apparmor/click-ap.rules; ''' % rules

if clickpkgs or installed_clicks:
adtlog.info('Updating AppArmor rules to allow autopilot introspection for tested clicks')
script += 'for c in %s; do ' \
' info=$(click info %s %s/$(basename "$c")); ' \
''' name=$(echo "$info" | sed -rn '/"name"/ {s/^.*: *"([^"]+)",/\\1/; p}'); ''' \
''' version=$(echo "$info" | sed -rn '/"version"/ {s/^.*: *"([^"]+)",/\\1/; p}'); ''' \
' touch -h /var/lib/apparmor/clicks/${name}_*_${version}.json >/dev/null || true; '\
'done; ' \
'for c in %s; do ' \
' touch -h /var/lib/apparmor/clicks/${c}_*.json 2>/dev/null || true; ' \
'done; ' \
'aa-clickhook --include=/var/cache/apparmor/click-ap.rules' % (
' '.join(clickpkgs),
self.user and ('--user ' + self.user) or '',
self.scratch,
' '.join(installed_clicks))
else:
adtlog.info('Updating AppArmor rules to allow autopilot introspection for all clicks (will take a minute)...')
script += 'aa-clickhook --force --include=/var/cache/apparmor/click-ap.rules'

if self.execute(['sh', adtlog.verbosity >= 2 and '-exc' or '-ec', script], kind='install')[0] != 0:
self.bomb('Failed to update click AppArmor rules')

return True

def apparmor_restore_click(self, clickpkgs, installed_clicks):
'''Restore AppArmor rules after click tests'''

adtlog.info('Restoring click package AppArmor rules')
# if we only modified some clicks above, --force will be fast, so it's
# ok to always do that
script = 'rm -f /var/cache/apparmor/click-ap.rules; aa-clickhook --force'
if self.execute(['sh', adtlog.verbosity >= 2 and '-exc' or '-ec', script], kind='install')[0] != 0:
self.bomb('Failed to update click AppArmor rules')

def satisfy_dependencies_string(self, deps, what, recommends=False,
build_dep=False, shell_on_failure=False):
'''Install dependencies from a string into the testbed'''

adtlog.debug('%s: satisfying %s' % (what, deps))

# ignore ":native" tags, apt cannot parse them and deps_parse() does
# not seem to have an option to get rid of them; we always test on the
# native platform
deps = deps.replace(':native', '')

# resolve arch specific dependencies; don't use universal_newlines
# here, it's broken for stdin on Python 3.2
if build_dep:
extra_args = ', reduce_profiles => $supports_profiles, build_dep => 1'
else:
extra_args = ''
perl = subprocess.Popen(['perl', '-'], stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
code = '''use Dpkg::Deps;
$supports_profiles = ($Dpkg::Deps::VERSION gt '1.04' or 0);
$origdeps = '%s';
$origdeps =~ s/(^|,)[^<,]+<[^!,>]+>//g if (!$supports_profiles);
$dep = deps_parse($origdeps, reduce_arch => 1, host_arch => '%s' %s);
$out = $dep->output();
# fall back to ignoring build profiles
$out =~ s/ <![^ >]+>//g if (!$supports_profiles);
print $out, "\\n";
''' % (deps, self.dpkg_arch, extra_args)
deps = perl.communicate(code.encode('UTF-8'))[0].decode('UTF-8').strip()
if perl.returncode != 0:
self.bomb('failed to run perl for parsing dependencies')
adtlog.debug('%s: architecture resolved: %s' % (what, deps))

# check if we can use apt-get
can_apt_get = False
if 'root-on-testbed' in self.caps:
rc = self.execute(['test', '-w', '/var/lib/dpkg/status'])[0]
if rc == 0:
can_apt_get = True
adtlog.debug('can use apt-get on testbed: %s' % can_apt_get)

if can_apt_get:
self.install_apt(deps, recommends, shell_on_failure)
else:
self.install_tmp(deps, recommends)

def run_shell(self, cwd=None, extra_env=[]):
'''Run shell in testbed for debugging tests'''

adtlog.info(' - - - - - - - - - - running shell - - - - - - - - - -')
self.command('shell', [cwd or '/'] + self.install_tmp_env + extra_env)

def run_test(self, tree, test, extra_env=[], shell_on_failure=False,
shell=False, build_parallel=None):
'''Run given test in testbed
tree (a Path) is the source tree root.
'''
def _info(m):
adtlog.info('test %s: %s' % (test.name, m))

self.last_test_name = test.name

if test.path and not os.path.exists(os.path.join(tree.host, test.path)):
self.badpkg('%s does not exist' % test.path)

for c in test.clicks:
self.install_click(c)
need_click_restore = self.apparmor_click(test.clicks, test.installed_clicks)

# record installed package versions
if self.output_dir and self.execute(['which', 'dpkg-query'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)[0] == 0:
pkglist = TempPath(self, test.name + '-packages.all', autoclean=False)
self.check_exec([
'sh', '-ec', "dpkg-query --show -f '${Package}\\t${Version}\\n' > %s" % pkglist.tb])
pkglist.copyup()

# filter out packages from the base system
with open(pkglist.host[:-4], 'w') as out:
join = subprocess.Popen(['join', '-v2', '-t\t',
os.path.join(self.output_dir, 'testbed-packages'), pkglist.host],
stdout=out, env={})
join.communicate()
if join.returncode != 0:
self.badpkg('failed to call join for test specific package list, code %d' % join.returncode)
os.unlink(pkglist.host)

# ensure our tests are in the testbed
tree.copydown(check_existing=True)

# stdout/err files in testbed
so = TempPath(self, test.name + '-stdout', autoclean=False)
se = TempPath(self, test.name + '-stderr', autoclean=False)

# create script to run test
test_artifacts = '%s/%s-artifacts' % (self.scratch, test.name)
adttmp = '%s/adttmp' % (self.scratch)
assert self.nproc is not None
script = 'set -e; ' \
'export USER=`id -nu`; ' \
'. /etc/profile >/dev/null 2>&1 || true; ' \
' . ~/.profile >/dev/null 2>&1 || true; ' \
'buildtree="%(t)s"; ' \
'mkdir -p -m 1777 -- "%(a)s"; ' \
'export ADT_ARTIFACTS="%(a)s"; ' \
'mkdir -p -m 755 "%(tmp)s"; export ADTTMP="%(tmp)s" ' \
'export DEBIAN_FRONTEND=noninteractive; ' \
'export LANG=C.UTF-8; ' \
'''export DEB_BUILD_OPTIONS=parallel=%(cpu)s; ''' \
'unset LANGUAGE LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE '\
' LC_MONETARY LC_MESSAGES LC_PAPER LC_NAME LC_ADDRESS '\
' LC_TELEPHONE LC_MEASUREMENT LC_IDENTIFICATION LC_ALL;' \
'rm -f /tmp/adt_test_script_pid; set -C; echo $$ > /tmp/adt_test_script_pid; set +C; ' \
'trap "rm -f /tmp/adt_test_script_pid" EXIT INT QUIT PIPE; '\
'cd "$buildtree"; '\
% {'t': tree.tb, 'a': test_artifacts, 'tmp': adttmp,
'cpu': build_parallel or self.nproc}

for e in extra_env:
script += 'export \'%s\'; ' % e
# there's no way to tell su to not reset $PATH, for install-tmp mode;
# we also need it to amend fixed values in /etc/environment
for e in self.install_tmp_env:
script += 'export %s; ' % e
# if we have an user upstart session, poke the environment into it
if self.install_tmp_env:
script += 'if [ -n "$UPSTART_SESSION" ]; then '
for e in self.install_tmp_env:
script += ' initctl --user set-env "%s"; ' % e
script += 'fi; '

if test.path:
test_cmd = os.path.join(tree.tb, test.path)
script += 'chmod +x %s; ' % test_cmd
else:
test_cmd = "bash -ec '%s'" % test.command

script += 'touch %(o)s %(e)s; ' \
'%(t)s 2> >(tee -a %(e)s >&2) > >(tee -a %(o)s); ' \
% {'t': test_cmd, 'o': so.tb, 'e': se.tb}

if 'needs-root' not in test.restrictions and self.user is not None:
if 'root-on-testbed' not in self.caps:
self.bomb('cannot change to user %s without root-on-testbed' % self.user,
adtlog.AutopkgtestError)
# we don't want -l here which resets the environment from
# self.execute(); so emulate the parts that we want
# FIXME: move "run as user" as an argument of execute()/check_exec() and run with -l
test_argv = ['su', '-s', '/bin/bash', self.user, '-c']

if 'rw-build-tree' in test.restrictions:
self.check_exec(['chown', '-R', self.user, tree.tb])
else:
# this ensures that we have a PAM/logind session for root tests as
# well; with some interfaces like ttyS1 or lxc_attach we don't log
# in to the testbed
if 'root-on-testbed' in self.caps:
test_argv = ['su', '-s', '/bin/bash', 'root', '-c']
else:
test_argv = ['bash', '-c']

# run test script
if test.command:
_info(test.command)
_info('[-----------------------')

# tests may reboot, so we might need to run several times
self.last_reboot_marker = ''
timeout = False
while True:
if self.last_reboot_marker:
script_prefix = 'export ADT_REBOOT_MARK="%s"; ' % self.last_reboot_marker
else:
script_prefix = ''
try:
rc = self.execute(test_argv + [script_prefix + script], kind='test')[0]
except VirtSubproc.Timeout:
rc = 1
timeout = True
break

# did the test invoke autopkgtest-reboot?
if os.WIFSIGNALED(rc) and os.WTERMSIG(rc) == signal.SIGKILL and 'reboot' in self.caps:
adtlog.debug('test process SIGKILLed, checking for reboot marker')
(code, reboot_marker, err) = self.execute(
['cat', '/run/autopkgtest-reboot-mark'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if code == 0:
self.last_reboot_marker = reboot_marker.strip()
adtlog.info('test process requested reboot with marker %s' % self.last_reboot_marker)
self.reboot()
continue

adtlog.debug('test process SIGKILLed, checking for prepare-reboot marker')
(code, reboot_marker, err) = self.execute(
['cat', '/run/autopkgtest-reboot-prepare-mark'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if code == 0:
self.last_reboot_marker = reboot_marker.strip()
adtlog.info('test process requested preparation for reboot with marker %s' % self.last_reboot_marker)
self.reboot(prepare_only=True)
continue

adtlog.debug('no reboot marker, considering a failure')
break

# give the setup_trace() cats some time to catch up
sys.stdout.flush()
sys.stderr.flush()
time.sleep(0.3)
_info('-----------------------]')
adtlog.debug('testbed executing test finished with exit status %i' % rc)

# copy stdout/err files to host
so.copyup()
se.copyup()
se_size = os.path.getsize(se.host)

# avoid mixing up stdout (from report) and stderr (from logging) in output
sys.stdout.flush()
sys.stderr.flush()
time.sleep(0.1)

_info(' - - - - - - - - - - results - - - - - - - - - -')

if timeout:
test.failed('timed out')
elif rc != 0:
test.failed('non-zero exit status %d' % rc)
elif se_size != 0 and 'allow-stderr' not in test.restrictions:
with open(se.host, encoding='UTF-8') as f:
stderr_top = f.readline().rstrip('\n \t\r')
test.failed('stderr: %s' % stderr_top)
else:
test.passed()

sys.stdout.flush()
sys.stderr.flush()

if os.path.getsize(so.host) == 0:
# don't produce empty -stdout files in --output-dir
so.autoclean = True

if se_size != 0 and 'allow-stderr' not in test.restrictions:
# give tee processes some time to catch up, to avoid mis-ordered logs
time.sleep(0.2)
_info(' - - - - - - - - - - stderr - - - - - - - - - -')
with open(se.host, 'rb') as f:
while True:
block = f.read1(1000000)
if not block:
break
sys.stderr.buffer.write(block)
sys.stderr.buffer.flush()
else:
# don't produce empty -stderr files in --output-dir
if se_size == 0:
se.autoclean = True

# copy artifacts to host, if we have --output-dir
if self.output_dir:
ap = Path(self, os.path.join(self.output_dir, 'artifacts'),
test_artifacts, is_dir=True)
ap.copyup()
# don't keep an empty artifacts dir around
if not os.listdir(ap.host):
os.rmdir(ap.host)

if shell or (shell_on_failure and not test.result):
self.run_shell(tree.tb, ['ADT_ARTIFACTS="%s"' % test_artifacts,
'ADTTMP="%s"' % adttmp])

# clean up artifacts and ADTTMP dirs
self.check_exec(['rm', '-rf', test_artifacts, adttmp])

if need_click_restore:
self.apparmor_restore_click(test.clicks, test.installed_clicks)
else:
adtlog.debug('no need to restore click AppArmor profiles')

#
# helper methods
#

def _create_apt_pinning_for_packages(self, pocket, pkglist):
'''Create apt pinning for --apt-pocket=pocket=pkglist'''

# sort pkglist into source and binary packages
binpkgs = []
srcpkgs = []
for i in pkglist.split(','):
i = i.strip()
if i.startswith('src:'):
srcpkgs.append(i[4:])
else:
binpkgs.append(i)

# get release name
script = 'SRCS=$(ls /etc/apt/sources.list /etc/apt/sources.list.d/*.list 2>/dev/null|| true); '
script += '''REL=$(sed -rn '/^(deb|deb-src) .*(ubuntu.com|debian.org|ftpmaster|file:\/\/\/tmp\/testarchive)/ { s/^[^ ]+ +(\[.*\] *)?[^ ]* +([^ -]+) +.*$/\\2/p}' $SRCS | head -n1); '''

script += 'mkdir -p /etc/apt/preferences.d; '
script += 'PKGS="%s"; ' % ' '.join(binpkgs)

# translate src:name entries into binaries of that source
if srcpkgs:
script += 'PKGS="$PKGS $(apt-cache showsrc %s | ' \
'''awk '/^Package-List:/ { show=1; next } (/^ / && show==1) { print $1; next } { show=0 }' |''' \
'''sort -u | tr '\\n' ' ')"; ''' % \
' '.join(srcpkgs)

# prefer given packages from pocket, other packages from
# default $REL (prio 900), but make $REL-pocket available for
# dependency resolution (prio 800)
script += 'printf "Package: $PKGS\\nPin: release a=${REL}-%(pocket)s\\nPin-Priority: 990\\n\\nPackage: *\\nPin: release a=$REL\\nPin-Priority: 900\\n\\nPackage: *\\nPin: release a=${REL}-updates\\nPin-Priority: 900\\n\\nPackage: *\\nPin: release a=${REL}-%(pocket)s\\nPin-Priority: 800\\n" > /etc/apt/preferences.d/autopkgtest-${REL}-%(pocket)s; ' % \
{'pocket': pocket}
self.check_exec(['sh', '-ec', script])
self.apt_pin_for_pockets.append(pocket)