Skip to content

Commit

Permalink
Clean up session when session.start() throws exception
Browse files Browse the repository at this point in the history
**Description**
When session start() throws exception, session delete() is invoked when
session status is not in NOT_STARTED_SESSION_STATUS to clean up resource.
This makes it safe to add the session to session manager only when session
is started properly.

**Testing Done**
Added unit test cases.
Also tested in a notebook.
  • Loading branch information
edwardps committed Jun 7, 2021
1 parent 1b2288b commit b6c89eb
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 7 deletions.
3 changes: 3 additions & 0 deletions sparkmagic/sparkmagic/livyclientlib/livysession.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ def endpoint(self):
def is_final_status(status):
return status in constants.FINAL_STATUS

def is_posted(self):
return self.status != constants.NOT_STARTED_SESSION_STATUS

def delete(self):
session_id = self.id
self._spark_events.emit_session_deletion_start_event(self.guid, self.kind, session_id, self.status)
Expand Down
12 changes: 10 additions & 2 deletions sparkmagic/sparkmagic/livyclientlib/sparkcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,16 @@ def add_session(self, name, endpoint, skip_if_exists, properties):
return
http_client = self._http_client(endpoint)
session = self._livy_session(http_client, properties, self.ipython_display)
session.start()
self.session_manager.add_session(name, session)

try:
session.start()
except:
if session.is_posted():
session.delete()
raise
else:
self.session_manager.add_session(name, session)


def get_session_id_for_client(self, name):
return self.session_manager.get_session_id_for_client(name)
Expand Down
10 changes: 10 additions & 0 deletions sparkmagic/sparkmagic/tests/test_livysession.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,3 +623,13 @@ def test_spark_session_and_sql_context_unavailable(self):
self.http_client.get_statement.return_value = self.ready_statement_failed_json
session = self._create_session()
session.start()

def test_is_posted(self):
self.http_client.post_session.return_value = self.session_create_json
self.http_client.get_session.return_value = self.ready_sessions_json
self.http_client.get_statement.return_value = self.ready_statement_json

session = self._create_session()
assert not session.is_posted()
session.start()
assert session.is_posted()
73 changes: 68 additions & 5 deletions sparkmagic/sparkmagic/tests/test_sparkcontroller.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from mock import MagicMock, patch
from nose.tools import with_setup, assert_equals, raises
import json

from sparkmagic.livyclientlib.sparkcontroller import SparkController
from mock import MagicMock
from nose.tools import with_setup, assert_equals, raises
from sparkmagic.livyclientlib.endpoint import Endpoint
from sparkmagic.livyclientlib.exceptions import SessionManagementException, HttpClientException
import sparkmagic.utils.configuration as conf

from sparkmagic.livyclientlib.sparkcontroller import SparkController

client_manager = None
controller = None
Expand Down Expand Up @@ -260,3 +258,68 @@ def test_add_session_throws_when_session_start_fails():
assert str(ex) == str(e)
session.start.assert_called_once()
controller.session_manager.add_session.assert_not_called

@with_setup(_setup, _teardown)
def test_add_session_cleanup_when_timeouts_and_session_posted_to_livy():
pass

@with_setup(_setup, _teardown)
def test_add_session_cleanup_when_timeouts_and_session_posted_to_livy():
_do_test_add_session_cleanup_when_timeouts(is_session_posted_to_livy=True)

@with_setup(_setup, _teardown)
def test_add_session_cleanup_when_timeouts_and_session_not_posted_to_livy():
_do_test_add_session_cleanup_when_timeouts(is_session_posted_to_livy=False)

def _do_test_add_session_cleanup_when_timeouts(is_session_posted_to_livy):
name = "name"
properties = {"kind": "spark"}
endpoint = Endpoint("http://location:port", None)
session = MagicMock()

controller._livy_session = MagicMock(return_value=session)
controller._http_client = MagicMock(return_value=MagicMock())
e = RuntimeError("Time out while post session to livy")
session.start = MagicMock(side_effect=e)

if is_session_posted_to_livy:
session.is_posted = MagicMock(return_value=True)
else:
session.is_posted = MagicMock(return_value=False)

try:
controller.add_session(name, endpoint, False, properties)
assert False
except RuntimeError as ex:
assert str(ex) == str(e)
session.start.assert_called_once()
controller.session_manager.add_session.assert_not_called

if is_session_posted_to_livy:
session.delete.assert_called_once()
else:
session.delete.assert_not_called()

@with_setup(_setup, _teardown)
def test_add_session_cleanup_when_session_delete_throws():
name = "name"
properties = {"kind": "spark"}
endpoint = Endpoint("http://location:port", None)
session = MagicMock()

controller._livy_session = MagicMock(return_value=session)
controller._http_client = MagicMock(return_value=MagicMock())
e = RuntimeError("Time out while post session to livy")
session.start = MagicMock(side_effect=e)
session.is_posted = MagicMock(return_value=True)

error_from_cleanup = RuntimeError("Error while clean up session")
session.delete = MagicMock(side_effect=error_from_cleanup)
try:
controller.add_session(name, endpoint, False, properties)
assert False
except Exception as ex:
# in the exception chain mechanism, original exception will be set as context.
assert str(ex) == str(error_from_cleanup)
assert str(ex.__context__) == str(e)
controller.session_manager.add_session.assert_not_called

0 comments on commit b6c89eb

Please sign in to comment.