Merge remote-tracking branch 'upstream/develop' into develop
* upstream/develop: (37 commits)
  Use ipaddress to validate ipv4/ipv6 (saltstack#33956)
  Updated winrepo_test (saltstack#34227)
  Whitespace fix for saltstack#34235 (saltstack#34250)
  message_format was not set in the correct function (saltstack#34235)
  Typo in dockerio doc (saltstack#34244)
  Fixup test failure
  Pass through update_holds to pkg.install
  fix regression from saltstack#33681 which causes pulling a list of s3 objects via s3.query to fail (saltstack#34208)
  fix regression in s3.query from saltstack#33599 that causes pulling a file list from s3 to fail (saltstack#34207)
  utils/parsers.py: disable minion multiprocessing logging if only running one process
  Fix a pair of gitfs bugs (saltstack#34218)
  rsync state: Removed source existence check saltstack#25251
  Linted saltstack#34200
  Allow specifying memory, cpu and vcpu as kwargs for OpenNebula (saltstack#34203)
  Fixes saltstack#34181 no more newlines in long yaml encodes
  Clarify pkg.list_repo_pkgs docstring for held packages (saltstack#34188)
  Update saltutil.wheel docs to specify remote vs local minion behavior
  fix regression in s3.query from saltstack#33682
  Change target for dockerng assuming default status to Nitrogen release (saltstack#34206)
  Correct the docstrings formatting in pkgbuild modules and state (saltstack#34194)
  ...
jojohans committed Jun 23, 2016
2 parents 59df394 + 8f5fa5c commit 405e355
Showing 29 changed files with 917 additions and 612 deletions.
6 changes: 1 addition & 5 deletions salt/cli/daemons.py
@@ -585,11 +585,7 @@ def prepare(self):
# Late import so logging works correctly
import salt.minion
self.daemonize_if_required()
# if its a multisyndic, do so
if isinstance(self.config.get('master'), list):
self.syndic = salt.minion.MultiSyndic(self.config)
else:
self.syndic = salt.minion.Syndic(self.config)
self.syndic = salt.minion.SyndicManager(self.config)
self.set_pidfile()

def start(self):
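
The isinstance() branch that chose between MultiSyndic and Syndic is no longer needed here because the new SyndicManager accepts either form of the master option and normalizes it itself, as the salt/minion.py hunk further down shows. A minimal sketch of that normalization (names illustrative, not the committed code):

# Sketch: a single 'master: foo' entry behaves like a one-item list,
# so the daemon no longer has to pick a class based on the option's type.
def spawn_syndics(opts):
    masters = opts['master']
    if not isinstance(masters, list):
        masters = [masters]
    return {master: dict(opts, master=master) for master in masters}

print(spawn_syndics({'master': 'master1'}))
print(spawn_syndics({'master': ['master1', 'master2']}))
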
31 changes: 27 additions & 4 deletions salt/cloud/clouds/opennebula.py
@@ -847,12 +847,28 @@ def create(vm_):
vm\_
The dictionary used to create a VM.
Optional vm_ dict options for overwriting template:
region_id
Optional - OpenNebula Zone ID
memory
Optional - In MB
cpu
Optional - Percent of host CPU to allocate
vcpu
Optional - Amount of vCPUs to allocate
CLI Example:
.. code-block:: bash
salt-cloud -p my-opennebula-profile vm_name
salt-cloud -p my-opennebula-profile vm_name memory=16384 cpu=2.5 vcpu=16
'''
try:
# Check for required profile parameters before sending any API calls.
@@ -901,17 +917,24 @@ def create(vm_):
{'kwargs': kwargs},
)

region = ''
if kwargs['region_id'] is not None:
region = 'SCHED_REQUIREMENTS="ID={0}"'.format(kwargs['region_id'])
template = []
if kwargs.get('region_id'):
template.append('SCHED_REQUIREMENTS="ID={0}"'.format(kwargs.get('region_id')))
if vm_.get('memory'):
template.append('MEMORY={0}'.format(vm_.get('memory')))
if vm_.get('cpu'):
template.append('CPU={0}'.format(vm_.get('cpu')))
if vm_.get('vcpu'):
template.append('VCPU={0}'.format(vm_.get('vcpu')))
template_args = "\n".join(template)
try:
server, user, password = _get_xml_rpc()
auth = ':'.join([user, password])
cret = server.one.template.instantiate(auth,
int(kwargs['template_id']),
kwargs['name'],
False,
region)
template_args)
if not cret[0]:
log.error(
'Error creating {0} on OpenNebula\n\n'
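
Where the old code passed only an optional SCHED_REQUIREMENTS string, the new code joins any of region_id, memory, cpu and vcpu into one newline-separated template-override string before calling server.one.template.instantiate(). An illustrative standalone sketch of that assembly (the vm_ and kwargs values are made up):

# Sketch of the template_args string built above; values are illustrative.
vm_ = {'memory': 16384, 'cpu': 2.5, 'vcpu': 16}
kwargs = {'region_id': 3}

template = []
if kwargs.get('region_id'):
    template.append('SCHED_REQUIREMENTS="ID={0}"'.format(kwargs['region_id']))
for attr in ('memory', 'cpu', 'vcpu'):
    if vm_.get(attr):
        template.append('{0}={1}'.format(attr.upper(), vm_[attr]))

template_args = "\n".join(template)
print(template_args)
# SCHED_REQUIREMENTS="ID=3"
# MEMORY=16384
# CPU=2.5
# VCPU=16
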
12 changes: 4 additions & 8 deletions salt/config/__init__.py
@@ -560,9 +560,9 @@
'preserve_minion_cache': bool,
'syndic_master': (string_types, list),

# The behaviour of the multisyndic when connection to a master of masters failed. Can specify
# 'random' (default) or 'ordered'. If set to 'random' masters will be iterated in random order
# if 'ordered' the configured order will be used.
# The behaviour of the multimaster syndic when connection to a master of masters failed. Can
# specify 'random' (default) or 'ordered'. If set to 'random' masters will be iterated in random
# order if 'ordered' the configured order will be used.
'syndic_failover': str,
'runner_dirs': list,
'client_acl': dict,
@@ -737,9 +737,6 @@
# The number of seconds for a syndic to poll for new messages that need to be forwarded
'syndic_event_forward_timeout': float,

# The number of seconds for the syndic to spend polling the event bus
'syndic_max_event_process_time': float,

# The length that the syndic event queue must hit before events are popped off and forwarded
'syndic_jid_forward_cache_hwm': int,

@@ -1285,7 +1282,6 @@
'transport': 'zeromq',
'gather_job_timeout': 10,
'syndic_event_forward_timeout': 0.5,
'syndic_max_event_process_time': 0.5,
'syndic_jid_forward_cache_hwm': 100,
'ssh_passwd': '',
'ssh_port': '22',
@@ -2920,7 +2916,7 @@ def get_id(opts, cache_minion_id=False):
log.debug('Found minion id from generate_minion_id(): {0}'.format(newid))
if cache_minion_id and opts.get('minion_id_caching', True):
_cache_id(newid, id_cache)
is_ipv4 = newid.count('.') == 3 and not any(c.isalpha() for c in newid)
is_ipv4 = salt.utils.network.is_ipv4(newid)
return newid, is_ipv4


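
Per the "Use ipaddress to validate ipv4/ipv6" commit, the dot-count heuristic on newid is replaced with salt.utils.network.is_ipv4. That helper's body is not part of this diff; a hedged sketch of the stdlib check it is assumed to wrap:

# Sketch only: salt.utils.network.is_ipv4 is assumed to do something
# equivalent to this ipaddress-based test.
import ipaddress

def is_ipv4(addr):
    try:
        return isinstance(ipaddress.ip_address(addr), ipaddress.IPv4Address)
    except ValueError:
        return False

assert is_ipv4('192.0.2.10')
assert not is_ipv4('2001:db8::1')        # IPv6, not IPv4
assert not is_ipv4('web01.example.com')  # hostname with dots and letters
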
8 changes: 4 additions & 4 deletions salt/engines/sqs_events.py
@@ -78,7 +78,7 @@ def __virtual__():
log = logging.getLogger(__name__)


def _get_sqs_conn(profile, region=None, key=None, keyid=None, message_format=None):
def _get_sqs_conn(profile, region=None, key=None, keyid=None):
'''
Get a boto connection to SQS.
'''
@@ -91,8 +91,6 @@ def _get_sqs_conn(profile, region=None, key=None, keyid=None, message_format=Non
keyid = _profile.get('keyid', None)
region = _profile.get('region', None)

if not message_format:
message_format = __opts__.get('sqs.message_format', None)
if not region:
region = __opts__.get('sqs.region', 'us-east-1')
if not key:
@@ -121,6 +119,8 @@ def start(queue, profile=None, tag='salt/engine/sqs'):
else:
fire_master = None

message_format = __opts__.get('sqs.message_format', None)

def fire(tag, msg):
if fire_master:
fire_master(msg, tag)
@@ -140,7 +140,7 @@ def fire(tag, msg):
continue
msgs = q.get_messages(wait_time_seconds=20)
for msg in msgs:
if sqs.message_format == "json":
if message_format == "json":
fire(tag, {'message': json.loads(msg.get_body())})
else:
fire(tag, {'message': msg.get_body()})
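
The fix moves the sqs.message_format lookup out of _get_sqs_conn() into start(), so the polling loop compares against the configured value captured by the enclosing scope rather than the stray sqs.message_format attribute it referenced before. A simplified sketch of that closure pattern (opts stands in for the real __opts__ dictionary; names are illustrative):

# Sketch: the format is read once in start() and captured by the
# nested handler that processes each SQS message body.
import json

def start(opts, fire):
    message_format = opts.get('sqs.message_format', None)

    def handle(body):
        if message_format == 'json':
            fire({'message': json.loads(body)})
        else:
            fire({'message': body})

    return handle

handler = start({'sqs.message_format': 'json'}, print)
handler('{"event": "deploy", "host": "web01"}')
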
124 changes: 16 additions & 108 deletions salt/minion.py
@@ -2205,7 +2205,7 @@ def timeout_handler(*args):
callback=lambda _: None,
**kwargs)

def _fire_master_syndic_start(self):
def fire_master_syndic_start(self):
# Send an event to the master that the minion is live
self._fire_master(
'Syndic {0} started at {1}'.format(
@@ -2224,45 +2224,6 @@ def _fire_master_syndic_start(self):
sync=False,
)

# Syndic Tune In
@tornado.gen.coroutine
def tune_in(self, start=True):
'''
Lock onto the publisher. This is the main event loop for the syndic
'''
log.debug('Syndic \'{0}\' trying to tune in'.format(self.opts['id']))

if start:
self.sync_connect_master()

# Instantiate the local client
self.local = salt.client.get_local_client(
self.opts['_minion_conf_file'], io_loop=self.io_loop)
self.local.event.subscribe('')
self.local.opts['interface'] = self._syndic_interface

# add handler to subscriber
self.pub_channel.on_recv(self._process_cmd_socket)

# register the event sub to the poller
self._reset_event_aggregation()
self.local.event.set_event_handler(self._process_event)

# forward events every syndic_event_forward_timeout
self.forward_events = tornado.ioloop.PeriodicCallback(self._forward_events,
self.opts['syndic_event_forward_timeout'] * 1000,
io_loop=self.io_loop)
self.forward_events.start()

# Send an event to the master that the minion is live
self._fire_master_syndic_start()

# Make sure to gracefully handle SIGUSR1
enable_sigusr1_handler()

if start:
self.io_loop.start()

# TODO: clean up docs
def tune_in_no_block(self):
'''
@@ -2285,49 +2246,6 @@ def _process_cmd_socket(self, payload):
# In the future, we could add support for some clearfuncs, but
# the syndic currently has no need.

def _reset_event_aggregation(self):
self.jids = {}
self.raw_events = []

def _process_event(self, raw):
# TODO: cleanup: Move down into event class
mtag, data = self.local.event.unpack(raw, self.local.event.serial)
event = {'data': data, 'tag': mtag}
log.trace('Got event {0}'.format(event['tag'])) # pylint: disable=no-member
tag_parts = event['tag'].split('/')
if len(tag_parts) >= 4 and tag_parts[1] == 'job' and \
salt.utils.jid.is_jid(tag_parts[2]) and tag_parts[3] == 'ret' and \
'return' in event['data']:
if 'jid' not in event['data']:
# Not a job return
return
jdict = self.jids.setdefault(event['data']['jid'], {})
if not jdict:
jdict['__fun__'] = event['data'].get('fun')
jdict['__jid__'] = event['data']['jid']
jdict['__load__'] = {}
fstr = '{0}.get_load'.format(self.opts['master_job_cache'])
# Only need to forward each load once. Don't hit the disk
# for every minion return!
if event['data']['jid'] not in self.jid_forward_cache:
jdict['__load__'].update(
self.mminion.returners[fstr](event['data']['jid'])
)
self.jid_forward_cache.add(event['data']['jid'])
if len(self.jid_forward_cache) > self.opts['syndic_jid_forward_cache_hwm']:
# Pop the oldest jid from the cache
tmp = sorted(list(self.jid_forward_cache))
tmp.pop(0)
self.jid_forward_cache = set(tmp)
if 'master_id' in event['data']:
# __'s to make sure it doesn't print out on the master cli
jdict['__master_id__'] = event['data']['master_id']
jdict[event['data']['id']] = event['data']['return']
else:
# Add generic event aggregation here
if 'retcode' not in event['data']:
self.raw_events.append(event)

@tornado.gen.coroutine
def _return_pub_multi(self, values):
for value in values:
@@ -2336,19 +2254,6 @@ def _return_pub_multi(self, values):
timeout=self._return_retry_timer(),
sync=False)

def _forward_events(self):
log.trace('Forwarding events') # pylint: disable=no-member
if self.raw_events:
events = self.raw_events
self.raw_events = []
self._fire_master(events=events,
pretag=tagify(self.opts['id'], base='syndic'),
sync=False)
if self.jids and (self.pub_future is None or self.pub_future.done()):
values = self.jids.values()
self.jids = {}
self.pub_future = self._return_pub_multi(values)

@tornado.gen.coroutine
def reconnect(self):
if hasattr(self, 'pub_channel'):
@@ -2382,11 +2287,10 @@ def destroy(self):
self.forward_events.stop()


# TODO: consolidate syndic classes together?
# need a way of knowing if the syndic connection is busted
class MultiSyndic(MinionBase):
# TODO: need a way of knowing if the syndic connection is busted
class SyndicManager(MinionBase):
'''
Make a MultiSyndic minion, this minion will handle relaying jobs and returns from
Make a MultiMaster syndic minion, this minion will handle relaying jobs and returns from
all minions connected to it to the list of masters it is connected to.
Modes (controlled by `syndic_mode`:
Expand All @@ -2409,7 +2313,7 @@ class MultiSyndic(MinionBase):

def __init__(self, opts, io_loop=None):
opts['loop_interval'] = 1
super(MultiSyndic, self).__init__(opts)
super(SyndicManager, self).__init__(opts)
self.mminion = salt.minion.MasterMinion(opts)
# sync (old behavior), cluster (only returns and publishes)
self.syndic_mode = self.opts.get('syndic_mode', 'sync')
@@ -2443,7 +2347,10 @@ def _spawn_syndics(self):
Spawn all the coroutines which will sign in the syndics
'''
self._syndics = OrderedDict() # mapping of opts['master'] -> syndic
for master in self.opts['master']:
masters = self.opts['master']
if not isinstance(masters, list):
masters = [masters]
for master in masters:
s_opts = copy.copy(self.opts)
s_opts['master'] = master
self._syndics[master] = self._connect_syndic(s_opts)
@@ -2466,6 +2373,10 @@ def _connect_syndic(self, opts):
yield syndic.connect_master()
# set up the syndic to handle publishes (specifically not event forwarding)
syndic.tune_in_no_block()

# Send an event to the master that the minion is live
syndic.fire_master_syndic_start()

log.info('Syndic successfully connected to {0}'.format(opts['master']))
break
except SaltClientError as exc:
@@ -2563,10 +2474,6 @@ def iter_master_options(self, master_id=None):
break
master_id = masters.pop(0)

def _reset_event_aggregation(self):
self.job_rets = {}
self.raw_events = []

# Syndic Tune In
def tune_in(self):
'''
@@ -2578,10 +2485,11 @@ def tune_in(self):
self.opts['_minion_conf_file'], io_loop=self.io_loop)
self.local.event.subscribe('')

log.debug('MultiSyndic \'{0}\' trying to tune in'.format(self.opts['id']))
log.debug('SyndicManager \'{0}\' trying to tune in'.format(self.opts['id']))

# register the event sub to the poller
self._reset_event_aggregation()
self.job_rets = {}
self.raw_events = []
self.local.event.set_event_handler(self._process_event)

# forward events every syndic_event_forward_timeout
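
The per-jid load forwarding removed here from the single-master Syndic._process_event is bounded by syndic_jid_forward_cache_hwm so the syndic does not hit the master job cache on disk for every minion return. A reduced sketch of the high-water-mark set shown in the deleted lines (function name and the sample jid are illustrative):

# Sketch: each jid's load is forwarded at most once; past the high-water
# mark the oldest jid is evicted (jids sort chronologically as strings).
def should_forward_load(jid, cache, hwm=100):
    if jid in cache:
        return False                     # load already forwarded for this jid
    cache.add(jid)
    if len(cache) > hwm:
        oldest = sorted(cache)[0]
        cache.discard(oldest)
    return True

cache = set()
print(should_forward_load('20160623112233445566', cache))  # True, first sighting
print(should_forward_load('20160623112233445566', cache))  # False, skip the disk hit
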
17 changes: 14 additions & 3 deletions salt/modules/cmdmod.py
@@ -2309,10 +2309,21 @@ def exec_code_all(lang, code, cwd=None):
salt '*' cmd.exec_code_all ruby 'puts "cheese"'
'''
codefile = salt.utils.mkstemp()
with salt.utils.fopen(codefile, 'w+t') as fp_:
powershell = lang.lower().startswith("powershell")

if powershell:
codefile = salt.utils.mkstemp(suffix=".ps1")
else:
codefile = salt.utils.mkstemp()

with salt.utils.fopen(codefile, 'w+t', binary=False) as fp_:
fp_.write(code)
cmd = [lang, codefile]

if powershell:
cmd = [lang, "-File", codefile]
else:
cmd = [lang, codefile]

ret = run_all(cmd, cwd=cwd, python_shell=False)
os.remove(codefile)
return ret
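
PowerShell only runs a script passed with -File and carrying a .ps1 extension, so the new branch writes the temp file with that suffix and adjusts the command line, while every other interpreter keeps the old [lang, codefile] form. A standalone sketch of just the command construction (tempfile stands in for salt.utils.mkstemp; file cleanup omitted):

# Sketch: PowerShell scripts get a .ps1 temp file and a '-File' invocation,
# everything else keeps the positional script argument.
import os
import tempfile

def build_cmd(lang, code):
    powershell = lang.lower().startswith('powershell')
    fd, codefile = tempfile.mkstemp(suffix='.ps1' if powershell else '')
    with os.fdopen(fd, 'w') as fp_:
        fp_.write(code)
    return [lang, '-File', codefile] if powershell else [lang, codefile]

print(build_cmd('powershell', 'Write-Output "cheese"'))
print(build_cmd('python', 'print("cheese")'))
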
