-
Notifications
You must be signed in to change notification settings - Fork 1
/
answersprocessor.py
56 lines (47 loc) · 1.9 KB
/
answersprocessor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from argparse import Namespace
from typing import Sequence, List
from pip._vendor.packaging.markers import Op
import puppeter
from puppeter import container
from puppeter.domain.facter import Facter
from puppeter.domain.gateway.answers import AnswersProcessor
from puppeter.domain.gateway.installer import InstallerGateway
from puppeter.domain.model import osfacts
from puppeter.domain.model.answers import Answers
from puppeter.domain.model.configurer import Configurer
from puppeter.presentation.app import Options
class AnswersProcessorImpl(AnswersProcessor):
def __init__(self, options):
self.options = options # type: Options
self.__log = puppeter.get_logger(AnswersProcessorImpl)
def process(self, answers):
configurers = []
configurers.extend(self.__perform_installation(answers))
commands = self.__collect_commands(configurers)
for line in commands:
if self.options.execute():
self.__log.warning('EXECUTING: %s', line)
else:
print(line)
@staticmethod
def __perform_installation(answers):
# type: (Answers) -> Sequence[Configurer]
osfamily = Facter.get(osfacts.OsFamily)
gateway = container.get_named(InstallerGateway, osfamily.name.lower())
return gateway.configurers(answers.installer())
@staticmethod
def __collect_commands(configurers):
# type: (List[Configurer]) -> Sequence[str]
sorted_cfg = sorted(configurers, key=AnswersProcessorImpl.__sort_by_order)
commands = []
for configurer in sorted_cfg:
commands.extend(configurer.produce_commands())
return commands
@staticmethod
def __sort_by_order(configurer):
# type: (Configurer) -> int
try:
# noinspection PyUnresolvedReferences
return configurer.order()
except AttributeError:
return 500