-
-
Notifications
You must be signed in to change notification settings - Fork 739
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Kirill Izotov
committed
Jul 29, 2016
1 parent
23497fb
commit 61d972d
Showing
4 changed files
with
206 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
--- | ||
name: "init" | ||
runner_type: "python-script" | ||
description: "Creates a pack and places it in the local content repository." | ||
enabled: true | ||
entry_point: "pack_mgmt/init.py" | ||
parameters: | ||
pack_name: | ||
type: "string" | ||
required: true | ||
abs_repo_base: | ||
type: "string" | ||
default: "/opt/stackstorm/packs/" | ||
immutable: true |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
# Licensed to the StackStorm, Inc ('StackStorm') under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import grp | ||
import os | ||
import stat | ||
|
||
from git.repo import Repo | ||
import yaml | ||
|
||
from st2actions.runners.pythonrunner import Action | ||
|
||
MANIFEST_FILE = 'pack.yaml' | ||
PACK_RESERVE_CHARACTER = '.' | ||
|
||
PACK_GROUP_CFG_KEY = 'pack_group' | ||
EXCHANGE_URL_KEY = 'exchange_url' | ||
|
||
|
||
class InitGitRepoAction(Action): | ||
def run(self, pack_name, abs_repo_base): | ||
if PACK_RESERVE_CHARACTER in pack_name: | ||
raise Exception('Pack name "%s" contains reserve character "%s"' % | ||
(pack_name, PACK_RESERVE_CHARACTER)) | ||
|
||
pack_path = os.path.join(abs_repo_base, pack_name) | ||
|
||
Repo.init(pack_path, mkdir=True) | ||
|
||
meta = { | ||
'name': pack_name | ||
} | ||
|
||
with open(os.path.join(pack_path, MANIFEST_FILE), 'w') as manifest_fp: | ||
yaml.dump(meta, manifest_fp) | ||
|
||
self._apply_pack_permissions(pack_path) | ||
|
||
def _apply_pack_permissions(self, pack_path): | ||
""" | ||
Will recursively apply permission 770 to pack and its contents. | ||
""" | ||
# -1 means don't change | ||
uid = -1 | ||
gid = -1 | ||
pack_group = self.config.get(PACK_GROUP_CFG_KEY, None) | ||
if pack_group: | ||
try: | ||
gid = grp.getgrnam(pack_group).gr_gid | ||
except KeyError: | ||
self.logger.warn('Group not found: %s', pack_group) | ||
# These mask is same as mode = 775 | ||
mode = stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH | ||
os.chmod(pack_path, mode) | ||
|
||
# Yuck! Since os.chmod does not support chmod -R walk manually. | ||
for root, dirs, files in os.walk(pack_path): | ||
for d in dirs: | ||
path = os.path.join(root, d) | ||
os.chown(path, uid, gid) | ||
os.chmod(path, mode) | ||
for f in files: | ||
path = os.path.join(root, f) | ||
os.chown(path, uid, gid) | ||
os.chmod(path, mode) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
import eventlet | ||
|
||
from kombu import Connection | ||
from kombu.mixins import ConsumerMixin | ||
|
||
from st2common.constants.action import LIVEACTION_COMPLETED_STATES | ||
from st2common import log as logging | ||
from st2common.models.api.execution import ActionExecutionAPI | ||
from st2common.transport import execution, publishers | ||
from st2common.transport import utils as transport_utils | ||
|
||
__all__ = [ | ||
'get_listener', | ||
'get_listener_if_set' | ||
] | ||
|
||
LOG = logging.getLogger(__name__) | ||
|
||
_listener = None | ||
|
||
|
||
class Listener(ConsumerMixin): | ||
|
||
def __init__(self, connection): | ||
self.connection = connection | ||
self.listeners = [] | ||
|
||
def get_consumers(self, consumer, channel): | ||
queue = execution.get_queue(routing_key=publishers.ANY_RK, | ||
exclusive=True) | ||
return [ | ||
consumer(queues=[queue], | ||
accept=['pickle'], | ||
callbacks=[self.processor]) | ||
] | ||
|
||
def processor(self, body, message): | ||
try: | ||
body = ActionExecutionAPI.from_model(body) | ||
|
||
for listener in self.listeners: | ||
if listener['id'] == body.id and body.status in LIVEACTION_COMPLETED_STATES: | ||
listener['event'].send(body) | ||
self.listeners.remove(listener) | ||
finally: | ||
message.ack() | ||
|
||
def listen(self, id, event): | ||
self.listeners.append({ | ||
'id': id, | ||
'event': event | ||
}) | ||
|
||
|
||
def listen(listener): | ||
try: | ||
listener.run() | ||
finally: | ||
listener.shutdown() | ||
This comment has been minimized.
Sorry, something went wrong. |
||
|
||
|
||
def get_listener(): | ||
global _listener | ||
if not _listener: | ||
with Connection(transport_utils.get_messaging_urls()) as conn: | ||
_listener = Listener(conn) | ||
eventlet.spawn_n(listen, _listener) | ||
return _listener | ||
|
||
|
||
def get_listener_if_set(): | ||
global _listener | ||
return _listener |
1 comment
on commit 61d972d
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This commit illustrates how we can deal with calling an action we realistically expect to fit 30 second window for API call. We're basically block request (green)thread until we get a response back from actionrunner.
@Kami WDYT? We don't really expect this connections to be long-living, so I don't see a problem of running a listener here and interrupt them once in a while if they timeout or the service needs to be restarted. But since you're the one who worked on moving st2stream away, I'd like to know your perspective.
Missed, we don't have\need shutdown anymore. Just a reminder for myself to remove it.