From b7318d2ccd9ceab4d294c48a3a7cf2d67c2c7c59 Mon Sep 17 00:00:00 2001 From: Alan F Date: Fri, 4 Jun 2021 00:53:50 +0100 Subject: [PATCH] tests for session locking option --- lib/tlitest/config.py | 7 +++-- lib/tlitest/misc.py | 10 ++++--- lib/tlitest/test_tlog_rec_session.py | 39 ++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 5 deletions(-) diff --git a/lib/tlitest/config.py b/lib/tlitest/config.py index 5176cbbe..91a312d7 100644 --- a/lib/tlitest/config.py +++ b/lib/tlitest/config.py @@ -19,7 +19,7 @@ DEFAULT_TLOG_REC_SESSION_SHELL = "/bin/bash" DEFAULT_TLOG_REC_SESSION_NOTICE = "ATTENTION: Your session is being recorded!" DEFAULT_TLOG_REC_SESSION_WRITER = "journal" - +DEFAULT_TLOG_REC_SESSION_SESSION_LOCKING = True DEFAULT_TLOG_PLAY_READER = "file" DEFAULT_TLOG_PLAY_PERSIST = False DEFAULT_TLOG_PLAY_LAX = False @@ -198,9 +198,11 @@ def __init__(self, shell=DEFAULT_TLOG_REC_SESSION_SHELL, journal_priority=DEFAULT_TLOG_REC_JOURNAL_PRIORITY, journal_augment=DEFAULT_TLOG_REC_JOURNAL_AUGMENT, syslog_facility=DEFAULT_TLOG_REC_SYSLOG_FACILITY, - syslog_priority=DEFAULT_TLOG_REC_SYSLOG_PRIORITY): + syslog_priority=DEFAULT_TLOG_REC_SYSLOG_PRIORITY, + session_locking=DEFAULT_TLOG_REC_SESSION_SESSION_LOCKING): self.shell = shell self.notice = notice + self.session_locking = session_locking super().__init__(latency, payload, log_input, log_output, log_window, limit_rate, limit_burst, limit_action, writer, file_writer_path, journal_priority, @@ -212,6 +214,7 @@ def _setup_base_session_config(self): tlog_rec_session_config = { "shell": self.shell, "notice": self.notice, + "session_locking": self.session_locking } return tlog_rec_session_config diff --git a/lib/tlitest/misc.py b/lib/tlitest/misc.py index aa5b1a9b..c93a3453 100644 --- a/lib/tlitest/misc.py +++ b/lib/tlitest/misc.py @@ -99,12 +99,16 @@ def check_recording_missing(shell, pattern, filename=None): assert out == 0 -def ssh_pexpect(username, password, hostname, encoding='utf-8'): +def ssh_pexpect(username, password, hostname, encoding='utf-8', control_persist=False): """ Setup an ssh connection to remote host """ - ssh = pxssh(options={ + opts = { "StrictHostKeyChecking": "no", "UserKnownHostsFile": "/dev/null" - }, encoding=encoding, codec_errors='replace') + } + if control_persist: + opts['ControlMaster'] = 'auto', + opts['ControlPath'] = "~/.ssh/control-%h-%p-%r" + ssh = pxssh(options=opts, encoding=encoding, codec_errors='replace') ssh.force_password = True ssh.login(hostname, username, password) ssh.sendline('echo loggedin') diff --git a/lib/tlitest/test_tlog_rec_session.py b/lib/tlitest/test_tlog_rec_session.py index eba34d83..413f7532 100644 --- a/lib/tlitest/test_tlog_rec_session.py +++ b/lib/tlitest/test_tlog_rec_session.py @@ -59,6 +59,45 @@ def test_session_record_to_journal(self): check_recording(shell, myname) shell.close() + @pytest.mark.tier1 + def test_session_record_to_journal_locking_enabled(self): + """ + Check tlog-rec-session preserves session in journal + """ + myname = inspect.stack()[0][3] + logfile = mklogfile(self.tempdir) + sessionclass = TlogRecSessionConfig(writer="file", file_writer_path=logfile, session_locking=False) + sessionclass.generate_config(SYSTEM_TLOG_REC_SESSION_CONF) + shell = ssh_pexpect(self.user, 'Secret123', 'localhost', control_persist=True) + shell.sendline('echo {}_shell0'.format(myname)) + shell.sendline('exit') + check_recording(shell, "{}_shell0".format(myname, logfile)) + shell.close() + shell = ssh_pexpect(self.user, 'Secret123', 'localhost', control_persist=True) + shell.sendline('echo {}_shell1'.format(myname)) + shell.sendline('exit') + check_recording_missing(shell, "{}_shell1".format(myname), logfile) + shell.close() + + @pytest.mark.tier1 + def test_session_record_to_journal_locking_disabled(self): + """ + Check tlog-rec-session preserves session in journal + """ + myname = inspect.stack()[0][3] + sessionclass = TlogRecSessionConfig(writer="journal") + sessionclass.generate_config(SYSTEM_TLOG_REC_SESSION_CONF) + shell = ssh_pexpect(self.user, 'Secret123', 'localhost', control_persist=True) + shell.sendline('echo {}_shell0'.format(myname)) + shell.sendline('exit') + check_recording(shell, "{}_shell0".format(myname)) + shell.close() + shell = ssh_pexpect(self.user, 'Secret123', 'localhost', control_persist=True) + shell.sendline('echo {}_shell1'.format(myname)) + shell.sendline('exit') + check_recording(shell, "{}_shell1".format(myname)) + shell.close() + @pytest.mark.tier1 def test_session_record_to_syslog(self): """