Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Changes

* Added constants for session related flags under `ssh2.session`.
* Added `ssh2.session.Session.flag` function for enabling/disabling session flags like compression support.
* Added `ssh2.session.userauth_keyboardinteractive_callback` for authentication using Python callback function,
for example for Oauth and other two-factor (2FA) or more factor authentication. Thanks @MattCatz .


1.1.2
Expand Down
5 changes: 5 additions & 0 deletions ci/integration_tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from unittest.mock import MagicMock
import os
import socket
from unittest import skipUnless
Expand Down Expand Up @@ -335,3 +336,7 @@ def test_non_blocking_multi_chan(self):
self._wait_eagain(chan.wait_closed)
chan = self._wait_eagain(self.session.open_session)
self.assertIsInstance(chan, Channel)

def test_userauth_kb_with_callback(self):
my_cb = MagicMock()
self.assertRaises(AuthenticationError, self.session.userauth_keyboardinteractive_callback, self.user, my_cb)
61 changes: 61 additions & 0 deletions examples/keyboard_interactive_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/python

"""Example script for authentication with Python callback function using an OAuth token"""

from __future__ import print_function

import argparse
import socket
import os
import pwd
import functools


from ssh2.session import Session


USERNAME = pwd.getpwuid(os.geteuid()).pw_name

parser = argparse.ArgumentParser()

parser.add_argument('password', help="User password")
parser.add_argument('oauth', help="OAUTH key to use for authentication")
parser.add_argument('cmd', help="Command to run")
parser.add_argument('--host', dest='host',
default='localhost',
help='Host to connect to')
parser.add_argument('--port', dest='port', default=22, help="Port to connect on", type=int)
parser.add_argument('-u', dest='user', default=USERNAME, help="User name to authenticate as")


def oauth_handler(name, instruction, prompts, password, oauth):
responses = []

for prompt in prompts:
if "Password:" in prompt:
responses.append(password)
if "One-time password (OATH) for" in prompt:
responses.append(oauth)

return responses

def main():
args = parser.parse_args()

callback = functools.partial(oauth_handler,password=args.password,oauth=args.oauth)

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((args.host, args.port))
s = Session()
s.handshake(sock)
s.userauth_keyboardinteractive_callback(args.user, callback)
chan = s.open_session()
chan.execute(args.cmd)
size, data = chan.read()
while size > 0:
print(data)
size, data = chan.read()


if __name__ == "__main__":
main()
Loading