Skip to content

Commit

Permalink
fix bug
Browse files: browse the repository at this point in the history
  • Loading branch information
windreamer committed Sep 19, 2016
1 parent aafdb14 commit 04501d1
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 76 deletions.
27 changes: 16 additions & 11 deletions pymesos/executor.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,9 +18,7 @@ class MesosExecutorDriver(Process, ExecutorDriver):
def __init__(self, executor):
env = os.environ
self.local = bool(env.get('MESOS_LOCAL'))
slave_pid = env.get('MESOS_SLAVE_PID')
assert '@' in slave_pid
addr = slave_pid.split('@', 2)[1]
agent_endpoint = env['MESOS_AGENT_ENDPOINT']
framework_id = env['MESOS_FRAMEWORK_ID']
assert framework_id
self.framework_id = dict(value=framework_id)
Expand All @@ -41,7 +39,7 @@ def __init__(self, executor):
self.tasks = {}
self.updates = {}
self._conn = None
super(MesosExecutorDriver, self).__init__(master=addr)
super(MesosExecutorDriver, self).__init__(master=agent_endpoint)

def _delay_kill(self):
def _():
Expand Down Expand Up @@ -86,12 +84,17 @@ def on_close(self):
self.abort()

def on_event(self, event):
logging.error('event:%s', event)

if 'type' in event:
_type = event['type'].lower()
if _type == 'shutdown':
self.on_shutdown()
return

if _type == 'heartbeat':
return

if _type not in event:
logger.error(
'Missing `%s` in event %s' %
Expand All @@ -108,14 +111,11 @@ def on_event(self, event):
else:
logger.error('Unknown event:%s' % (event,))

def on_heartbeat(self, _):
pass

def on_subscribed(self, info):
executor_info = info['executor_info']
framework_info = info['framework_info']
agent_info = info.get('agent_info'. info['slave_info'])
assert executor_info['id'] == self.executor_id
agent_info = info['agent_info']
assert executor_info['executor_id'] == self.executor_id
assert framework_info['id'] == self.framework_id

if self.executor_info is None or self.framework_info is None:
Expand All @@ -128,7 +128,7 @@ def on_subscribed(self, info):

def on_launch(self, event):
task_info = event['task']
task_id = task_info['id']['value']
task_id = task_info['task_id']['value']
assert task_id not in self.tasks
self.tasks[task_id] = task_info
self.executor.launchTask(self, task_info)
Expand Down Expand Up @@ -191,7 +191,9 @@ def _send(self, body, path='/api/v1/executor', method='POST', headers={}):
raise

if resp.status < 200 or resp.status >= 300:
raise RuntimeError('Failed to send request %s' % (data,))
raise RuntimeError('Failed to send request code=%s, message=%s' % (
resp.status, resp.read()
))

result = resp.read()
if not result:
Expand All @@ -209,6 +211,9 @@ def sendStatusUpdate(self, status):
if 'uuid' not in status:
status['uuid'] = b2a_base64(uuid.uuid4().bytes)

if 'source' not in status:
status['source'] = 'SOURCE_EXECUTOR'

body = dict(
type='UPDATE',
executor_id=self.executor_id,
Expand Down
40 changes: 23 additions & 17 deletions pymesos/process.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -108,22 +108,24 @@ def read(self):

captured = m.group(0)
length = int(captured.strip())
if len(self._response) >= len(captured) + length:
data = self._response[
len(captured):len(captured) + length]
self._response = self._response[
len(captured) + length:]
try:
event = json.loads(data.decode('utf-8'))
except Exception:
logger.exception('Failed parse json %s', data)
return False

try:
self._callback.on_event(event)
except Exception:
logger.exception('Failed to process event')
return False
if len(self._response) < len(captured) + length:
break

data = self._response[
len(captured):len(captured) + length]
self._response = self._response[
len(captured) + length:]
try:
event = json.loads(data.decode('utf-8'))
except Exception:
logger.exception('Failed parse json %s', data)
return False

try:
self._callback.on_event(event)
except Exception:
logger.exception('Failed to process event')
return False

if self._parser.is_message_complete():
return False
Expand Down Expand Up @@ -182,6 +184,10 @@ def stream_id(self, _stream_id):
with self._lock:
self._stream_id = _stream_id

@property
def connected(self):
return self.stream_id is not None

def gen_request(self):
raise NotImplementedError

Expand Down Expand Up @@ -309,7 +315,7 @@ def start(self):
self._io_thread.start()

def abort(self):
self.stop(failover=False)
self.stop()

def stop(self):
with self._lock:
Expand Down
30 changes: 11 additions & 19 deletions pymesos/scheduler.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -20,10 +20,6 @@ def __init__(self, sched, framework, master_uri):
self._conn = None
self.version = None

@property
def connected(self):
return self.stream_id is not None

@property
def framework_id(self):
return self.framework.get('id')
Expand Down Expand Up @@ -192,6 +188,9 @@ def declineOffer(self, offer_ids, filters=None):
self._send(body)

def reviveOffers(self):
if not self.connected:
return

framework_id = self.framework_id
assert framework_id
body = dict(
Expand Down Expand Up @@ -220,11 +219,7 @@ def acknowledgeStatusUpdate(self, status):
framework_id = self.framework_id
assert framework_id
acknowledge = dict()
if 'agent_id' in status:
acknowledge['agent_id'] = status['agent_id']
else:
acknowledge['slave_id'] = status['slave_id']

acknowledge['agent_id'] = status['agent_id']
acknowledge['task_id'] = status['task_id']
acknowledge['uuid'] = status['uuid']
body = dict(
Expand Down Expand Up @@ -260,10 +255,7 @@ def sendFrameworkMessage(self, executor_id, agent_id, data):
data=a2b_base64(data),
)
version = map(int, version.split('.')[:2])
if version >= (1, 0):
message['agent_id'] = agent_id
else:
message['slave_id'] = agent_id
message['agent_id'] = agent_id

body = dict(
type='MESSAGE',
Expand Down Expand Up @@ -350,7 +342,7 @@ def on_subscribed(self, info):
def on_offers(self, event):
offers = event['offers']
for offer in offers:
agent_id = offer.get('agent_id', offer['slave_id'])['value']
agent_id = offer['agent_id']['value']
self.savedOffers[offer['id']['value']] = agent_id

self.sched.resourceOffers(self, offers)
Expand All @@ -367,12 +359,12 @@ def on_update(self, event):

def on_message(self, message):
executor_id = message['executor_id']
agent_id = message.get('agent_id', message['slave_id'])
agent_id = message['agent_id']
data = message['data']
self.sched.frameworkMessage(self, executor_id, agent_id, data)

def on_failure(self, failure):
agent_id = failure.get('agent_id', failure['slave_id'])
agent_id = failure['agent_id']
if 'executor_id' not in failure:
self.sched.slaveLost(self, agent_id)
else:
Expand All @@ -384,12 +376,12 @@ def on_error(self, event):
message = event['message']
self.sched.error(self, message)

def on_heartbeat(self, _):
pass

def on_event(self, event):
if 'type' in event:
_type = event['type'].lower()
if _type == 'heartbeat':
return

if _type not in event:
logger.error(
'Missing `%s` in event %s' %
Expand Down
5 changes: 2 additions & 3 deletions pymesos/subprocess/executor.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -35,13 +35,12 @@ def abort(self):
Executor.abort(self)
self.cond.notify()

def reply_status(self, driver, proc_id, status, message='', data=tuple()):
def reply_status(self, driver, proc_id, state, message='', data=tuple()):
update = dict(
task_id=dict(value=str(proc_id)),
agent_id=self.agent_id,
slave_id=self.agent_id,
timestamp=time.time(),
status=status,
state=state,
)

if message:
Expand Down
50 changes: 24 additions & 26 deletions pymesos/subprocess/scheduler.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self):
self.driver = MesosSchedulerDriver(self, self.framework, self.master)
self.procs_pending = {}
self.procs_launched = {}
self.slave_to_proc = {}
self.agent_to_proc = {}
self._lock = RLock()

def _init_framework(self):
Expand Down Expand Up @@ -96,10 +96,7 @@ def _init_task(self, proc, offer):
],
)

if 'agent_id' in offer:
task['agent_id'] = offer['agent_id']
else:
task['slave_id'] = offer['slave_id']
task['agent_id'] = offer['agent_id']

return task

Expand All @@ -123,9 +120,9 @@ def resourceOffers(self, driver, offers):
def get_resources(offer):
cpus, mem = 0.0, 0.0
for r in offer['resources']:
if r.name == 'cpus':
if r['name'] == 'cpus':
cpus = float(r['scalar']['value'])
elif r.name == 'mem':
elif r['name'] == 'mem':
mem = float(r['scalar']['value'])
return cpus, mem

Expand All @@ -136,7 +133,7 @@ def get_resources(offer):
logger.debug('Reject offers forever for no pending procs, '
'offers=%s' % (offers, ))
driver.declineOffer(
offer['id'], [], self._filters(FOREVER))
offer['id'], self._filters(FOREVER))
continue

cpus, mem = get_resources(offer)
Expand All @@ -154,7 +151,8 @@ def get_resources(offer):
logger.info('Accept offer for procs, offer=%s, '
'procs=%s, filter_time=%s' % (
offer,
[int(t.task_id.value) for t in tasks],
[int(t['task_id']['value'])
for t in tasks],
seconds))
driver.launchTasks(
offer['id'], tasks, self._filters(seconds))
Expand All @@ -164,14 +162,14 @@ def get_resources(offer):
offer, seconds))
driver.declineOffer(offer['id'], self._filters(seconds))

def _call_finished(self, proc_id, success, message, data, slave_id=None):
def _call_finished(self, proc_id, success, message, data, agent_id=None):
with self._lock:
proc = self.procs_launched.pop(proc_id)
if slave_id is not None:
if slave_id in self.slave_to_proc:
self.slave_to_proc[slave_id].remove(proc_id)
if agent_id is not None:
if agent_id in self.agent_to_proc:
self.agent_to_proc[agent_id].remove(proc_id)
else:
for slave_id, procs in self.slave_to_proc.iteritems():
for agent_id, procs in self.agent_to_proc.iteritems():
if proc_id in procs:
procs.remove(proc_id)

Expand All @@ -182,12 +180,12 @@ def statusUpdate(self, driver, update):
proc_id = int(update['task_id']['value'])
logger.info('Status update for proc, id=%s, state=%s' % (
proc_id, update['state']))
agent_id = update.get('agent_id', update['slave_id'])['value']
agent_id = update['agent_id']['value']
if update['state'] == 'TASK_RUNNING':
if agent_id in self.slave_to_proc:
self.slave_to_proc[agent_id].add(proc_id)
if agent_id in self.agent_to_proc:
self.agent_to_proc[agent_id].add(proc_id)
else:
self.slave_to_proc[agent_id] = set([proc_id])
self.agent_to_proc[agent_id] = set([proc_id])

proc = self.procs_launched[proc_id]
proc._started()
Expand All @@ -196,7 +194,7 @@ def statusUpdate(self, driver, update):
'TASK_STAGING', 'TASK_STARTING', 'TASK_RUNNING'
}:
success = (update['state'] == 'TASK_FINISHED')
message = update['message']
message = update.get('message')
data = update.get('data')
if data:
data = pickle.loads(a2b_base64(data))
Expand All @@ -213,9 +211,9 @@ def offerRescinded(self, driver, offer_id):
def slaveLost(self, driver, agent_id):
agent_id = agent_id['value']
with self._lock:
for proc_id in self.slave_to_proc.pop(agent_id, []):
for proc_id in self.agent_to_proc.pop(agent_id, []):
self._call_finished(
proc_id, False, 'Slave lost', None, agent_id)
proc_id, False, 'Agent lost', None, agent_id)

def error(self, driver, message):
with self._lock:
Expand Down Expand Up @@ -259,22 +257,22 @@ def cancel(self, proc):
del self.procs_launched[proc.id]
self.driver.killTask(dict(value=str(proc.id)))

for slave_id, procs in self.slave_to_proc.items():
for agent_id, procs in self.agent_to_proc.items():
procs.pop(proc.id)
if not procs:
del self.slave_to_proc[slave_id]
del self.agent_to_proc[agent_id]

def send_data(self, pid, type, data):
if self.driver.aborted:
raise RuntimeError('driver already aborted')

msg = b2a_base64(pickle.dumps((pid, type, data)))
for slave_id, procs in self.slave_to_proc.iteritems():
for agent_id, procs in self.agent_to_proc.iteritems():
if pid in procs:
self.driver.sendFrameworkMessage(
self.executor['executor_id'],
dict(value=slave_id),
dict(value=agent_id),
msg)
return

raise RuntimeError('Cannot find slave for pid %s' % (pid,))
raise RuntimeError('Cannot find agent for pid %s' % (pid,))

0 comments on commit 04501d1

Please sign in to comment.