Permalink
| #!/usr/bin/env python | |
| # Copyright (c) 2003-2016 CORE Security Technologies | |
| # | |
| # This software is provided under under a slightly modified version | |
| # of the Apache Software License. See the accompanying LICENSE file | |
| # for more information. | |
| # | |
| # Description: Mini shell using some of the SMB funcionality of the library | |
| # | |
| # Author: | |
| # Alberto Solino (@agsolino) | |
| # | |
| # | |
| # Reference for: | |
| # SMB DCE/RPC | |
| # | |
| import sys | |
| import time | |
| import logging | |
| import argparse | |
| import cmd | |
| import os | |
| from impacket.examples import logger | |
| from impacket import version | |
| from impacket.dcerpc.v5 import samr, transport, srvs | |
| from impacket.dcerpc.v5.dtypes import NULL | |
| from impacket.smbconnection import * | |
| # If you wanna have readline like functionality in Windows, install pyreadline | |
| try: | |
| import pyreadline as readline | |
| except ImportError: | |
| import readline | |
| class MiniImpacketShell(cmd.Cmd): | |
| def __init__(self, smbClient,tcpShell=None): | |
| #If the tcpShell parameter is passed (used in ntlmrelayx), | |
| # all input and output is redirected to a tcp socket | |
| # instead of to stdin / stdout | |
| if tcpShell is not None: | |
| cmd.Cmd.__init__(self,stdin=tcpShell,stdout=tcpShell) | |
| sys.stdout = tcpShell | |
| sys.stdin = tcpShell | |
| sys.stderr = tcpShell | |
| self.use_rawinput = False | |
| self.shell = tcpShell | |
| else: | |
| cmd.Cmd.__init__(self) | |
| self.shell = None | |
| self.prompt = '# ' | |
| self.smb = smbClient | |
| self.username, self.password, self.domain, self.lmhash, self.nthash, self.aesKey, self.TGT, self.TGS = smbClient.getCredentials() | |
| self.tid = None | |
| self.intro = 'Type help for list of commands' | |
| self.pwd = '' | |
| self.share = None | |
| self.loggedIn = True | |
| self.last_output = None | |
| self.completion = [] | |
| def emptyline(self): | |
| pass | |
| def precmd(self,line): | |
| # switch to unicode | |
| return line.decode('utf-8') | |
| def onecmd(self,s): | |
| retVal = False | |
| try: | |
| retVal = cmd.Cmd.onecmd(self,s) | |
| except Exception, e: | |
| #import traceback | |
| #print traceback.print_exc() | |
| logging.error(e) | |
| return retVal | |
| def do_exit(self,line): | |
| if self.shell is not None: | |
| self.shell.close() | |
| return True | |
| def do_shell(self, line): | |
| output = os.popen(line).read() | |
| print output | |
| self.last_output = output | |
| def do_help(self,line): | |
| print """ | |
| open {host,port=445} - opens a SMB connection against the target host/port | |
| login {domain/username,passwd} - logs into the current SMB connection, no parameters for NULL connection. If no password specified, it'll be prompted | |
| kerberos_login {domain/username,passwd} - logs into the current SMB connection using Kerberos. If no password specified, it'll be prompted. Use the DNS resolvable domain name | |
| login_hash {domain/username,lmhash:nthash} - logs into the current SMB connection using the password hashes | |
| logoff - logs off | |
| shares - list available shares | |
| use {sharename} - connect to an specific share | |
| cd {path} - changes the current directory to {path} | |
| lcd {path} - changes the current local directory to {path} | |
| pwd - shows current remote directory | |
| password - changes the user password, the new password will be prompted for input | |
| ls {wildcard} - lists all the files in the current directory | |
| rm {file} - removes the selected file | |
| mkdir {dirname} - creates the directory under the current path | |
| rmdir {dirname} - removes the directory under the current path | |
| put {filename} - uploads the filename into the current path | |
| get {filename} - downloads the filename from the current path | |
| info - returns NetrServerInfo main results | |
| who - returns the sessions currently connected at the target host (admin required) | |
| close - closes the current SMB Session | |
| exit - terminates the server process (and this session) | |
| """ | |
| def do_password(self, line): | |
| if self.loggedIn is False: | |
| logging.error("Not logged in") | |
| return | |
| from getpass import getpass | |
| newPassword = getpass("New Password:") | |
| rpctransport = transport.SMBTransport(self.smb.getRemoteHost(), filename = r'\samr', smb_connection = self.smb) | |
| dce = rpctransport.get_dce_rpc() | |
| dce.connect() | |
| dce.bind(samr.MSRPC_UUID_SAMR) | |
| samr.hSamrUnicodeChangePasswordUser2(dce, '\x00', self.username, self.password, newPassword, self.lmhash, self.nthash) | |
| self.password = newPassword | |
| self.lmhash = None | |
| self.nthash = None | |
| def do_open(self,line): | |
| l = line.split(' ') | |
| port = 445 | |
| if len(l) > 0: | |
| host = l[0] | |
| if len(l) > 1: | |
| port = int(l[1]) | |
| if port == 139: | |
| self.smb = SMBConnection('*SMBSERVER', host, sess_port=port) | |
| else: | |
| self.smb = SMBConnection(host, host, sess_port=port) | |
| dialect = self.smb.getDialect() | |
| if dialect == SMB_DIALECT: | |
| logging.info("SMBv1 dialect used") | |
| elif dialect == SMB2_DIALECT_002: | |
| logging.info("SMBv2.0 dialect used") | |
| elif dialect == SMB2_DIALECT_21: | |
| logging.info("SMBv2.1 dialect used") | |
| else: | |
| logging.info("SMBv3.0 dialect used") | |
| self.share = None | |
| self.tid = None | |
| self.pwd = '' | |
| self.loggedIn = False | |
| self.password = None | |
| self.lmhash = None | |
| self.nthash = None | |
| self.username = None | |
| def do_login(self,line): | |
| if self.smb is None: | |
| logging.error("No connection open") | |
| return | |
| l = line.split(' ') | |
| username = '' | |
| password = '' | |
| domain = '' | |
| if len(l) > 0: | |
| username = l[0] | |
| if len(l) > 1: | |
| password = l[1] | |
| if username.find('/') > 0: | |
| domain, username = username.split('/') | |
| if password == '' and username != '': | |
| from getpass import getpass | |
| password = getpass("Password:") | |
| self.smb.login(username, password, domain=domain) | |
| self.password = password | |
| self.username = username | |
| if self.smb.isGuestSession() > 0: | |
| logging.info("GUEST Session Granted") | |
| else: | |
| logging.info("USER Session Granted") | |
| self.loggedIn = True | |
| def do_kerberos_login(self,line): | |
| if self.smb is None: | |
| logging.error("No connection open") | |
| return | |
| l = line.split(' ') | |
| username = '' | |
| password = '' | |
| domain = '' | |
| if len(l) > 0: | |
| username = l[0] | |
| if len(l) > 1: | |
| password = l[1] | |
| if username.find('/') > 0: | |
| domain, username = username.split('/') | |
| if domain == '': | |
| logging.error("Domain must be specified for Kerberos login") | |
| return | |
| if password == '' and username != '': | |
| from getpass import getpass | |
| password = getpass("Password:") | |
| self.smb.kerberosLogin(username, password, domain=domain) | |
| self.password = password | |
| self.username = username | |
| if self.smb.isGuestSession() > 0: | |
| logging.info("GUEST Session Granted") | |
| else: | |
| logging.info("USER Session Granted") | |
| self.loggedIn = True | |
| def do_login_hash(self,line): | |
| if self.smb is None: | |
| logging.error("No connection open") | |
| return | |
| l = line.split(' ') | |
| domain = '' | |
| if len(l) > 0: | |
| username = l[0] | |
| if len(l) > 1: | |
| hashes = l[1] | |
| else: | |
| logging.error("Hashes needed. Format is lmhash:nthash") | |
| return | |
| if username.find('/') > 0: | |
| domain, username = username.split('/') | |
| lmhash, nthash = hashes.split(':') | |
| self.smb.login(username, '', domain,lmhash=lmhash, nthash=nthash) | |
| self.username = username | |
| self.lmhash = lmhash | |
| self.nthash = nthash | |
| if self.smb.isGuestSession() > 0: | |
| logging.info("GUEST Session Granted") | |
| else: | |
| logging.info("USER Session Granted") | |
| self.loggedIn = True | |
| def do_logoff(self, line): | |
| if self.smb is None: | |
| logging.error("No connection open") | |
| return | |
| self.smb.logoff() | |
| del self.smb | |
| self.share = None | |
| self.smb = None | |
| self.tid = None | |
| self.pwd = '' | |
| self.loggedIn = False | |
| self.password = None | |
| self.lmhash = None | |
| self.nthash = None | |
| self.username = None | |
| def do_info(self, line): | |
| if self.loggedIn is False: | |
| logging.error("Not logged in") | |
| return | |
| rpctransport = transport.SMBTransport(self.smb.getRemoteHost(), filename = r'\srvsvc', smb_connection = self.smb) | |
| dce = rpctransport.get_dce_rpc() | |
| dce.connect() | |
| dce.bind(srvs.MSRPC_UUID_SRVS) | |
| resp = srvs.hNetrServerGetInfo(dce, 102) | |
| print "Version Major: %d" % resp['InfoStruct']['ServerInfo102']['sv102_version_major'] | |
| print "Version Minor: %d" % resp['InfoStruct']['ServerInfo102']['sv102_version_minor'] | |
| print "Server Name: %s" % resp['InfoStruct']['ServerInfo102']['sv102_name'] | |
| print "Server Comment: %s" % resp['InfoStruct']['ServerInfo102']['sv102_comment'] | |
| print "Server UserPath: %s" % resp['InfoStruct']['ServerInfo102']['sv102_userpath'] | |
| print "Simultaneous Users: %d" % resp['InfoStruct']['ServerInfo102']['sv102_users'] | |
| def do_who(self, line): | |
| if self.loggedIn is False: | |
| logging.error("Not logged in") | |
| return | |
| rpctransport = transport.SMBTransport(self.smb.getRemoteHost(), filename = r'\srvsvc', smb_connection = self.smb) | |
| dce = rpctransport.get_dce_rpc() | |
| dce.connect() | |
| dce.bind(srvs.MSRPC_UUID_SRVS) | |
| resp = srvs.hNetrSessionEnum(dce, NULL, NULL, 10) | |
| for session in resp['InfoStruct']['SessionInfo']['Level10']['Buffer']: | |
| print "host: %15s, user: %5s, active: %5d, idle: %5d" % ( | |
| session['sesi10_cname'][:-1], session['sesi10_username'][:-1], session['sesi10_time'], | |
| session['sesi10_idle_time']) | |
| def do_shares(self, line): | |
| if self.loggedIn is False: | |
| logging.error("Not logged in") | |
| return | |
| resp = self.smb.listShares() | |
| for i in range(len(resp)): | |
| print resp[i]['shi1_netname'][:-1] | |
| def do_use(self,line): | |
| if self.loggedIn is False: | |
| logging.error("Not logged in") | |
| return | |
| self.share = line | |
| self.tid = self.smb.connectTree(line) | |
| self.pwd = '\\' | |
| self.do_ls('', False) | |
| def complete_cd(self, text, line, begidx, endidx): | |
| return self.complete_get(text, line, begidx, endidx, include = 2) | |
| def do_cd(self, line): | |
| if self.tid is None: | |
| logging.error("No share selected") | |
| return | |
| p = string.replace(line,'/','\\') | |
| oldpwd = self.pwd | |
| if p[0] == '\\': | |
| self.pwd = line | |
| else: | |
| self.pwd = ntpath.join(self.pwd, line) | |
| self.pwd = ntpath.normpath(self.pwd) | |
| # Let's try to open the directory to see if it's valid | |
| try: | |
| fid = self.smb.openFile(self.tid, self.pwd, creationOption = FILE_DIRECTORY_FILE \ | |
| , desiredAccess = FILE_READ_DATA | FILE_LIST_DIRECTORY \ | |
| , shareMode = FILE_SHARE_READ | FILE_SHARE_WRITE \ | |
| ) | |
| self.smb.closeFile(self.tid,fid) | |
| except SessionError: | |
| self.pwd = oldpwd | |
| raise | |
| def do_lcd(self, s): | |
| print s | |
| if s == '': | |
| print os.getcwd() | |
| else: | |
| os.chdir(s) | |
| def do_pwd(self,line): | |
| if self.loggedIn is False: | |
| logging.error("Not logged in") | |
| return | |
| print self.pwd | |
| def do_ls(self, wildcard, display = True): | |
| if self.loggedIn is False: | |
| logging.error("Not logged in") | |
| return | |
| if self.tid is None: | |
| logging.error("No share selected") | |
| return | |
| if wildcard == '': | |
| pwd = ntpath.join(self.pwd,'*') | |
| else: | |
| pwd = ntpath.join(self.pwd, wildcard) | |
| self.completion = [] | |
| pwd = string.replace(pwd,'/','\\') | |
| pwd = ntpath.normpath(pwd) | |
| for f in self.smb.listPath(self.share, pwd): | |
| if display is True: | |
| print "%crw-rw-rw- %10d %s %s" % ( | |
| 'd' if f.is_directory() > 0 else '-', f.get_filesize(), time.ctime(float(f.get_mtime_epoch())), | |
| f.get_longname()) | |
| self.completion.append((f.get_longname(), f.is_directory())) | |
| def do_rm(self, filename): | |
| if self.tid is None: | |
| logging.error("No share selected") | |
| return | |
| f = ntpath.join(self.pwd, filename) | |
| file = string.replace(f,'/','\\') | |
| self.smb.deleteFile(self.share, file) | |
| def do_mkdir(self, path): | |
| if self.tid is None: | |
| logging.error("No share selected") | |
| return | |
| p = ntpath.join(self.pwd, path) | |
| pathname = string.replace(p,'/','\\') | |
| self.smb.createDirectory(self.share,pathname) | |
| def do_rmdir(self, path): | |
| if self.tid is None: | |
| logging.error("No share selected") | |
| return | |
| p = ntpath.join(self.pwd, path) | |
| pathname = string.replace(p,'/','\\') | |
| self.smb.deleteDirectory(self.share, pathname) | |
| def do_put(self, pathname): | |
| if self.tid is None: | |
| logging.error("No share selected") | |
| return | |
| src_path = pathname | |
| dst_name = os.path.basename(src_path) | |
| fh = open(pathname, 'rb') | |
| f = ntpath.join(self.pwd,dst_name) | |
| finalpath = string.replace(f,'/','\\') | |
| self.smb.putFile(self.share, finalpath, fh.read) | |
| fh.close() | |
| def complete_get(self, text, line, begidx, endidx, include = 1): | |
| # include means | |
| # 1 just files | |
| # 2 just directories | |
| p = string.replace(line,'/','\\') | |
| if p.find('\\') < 0: | |
| items = [] | |
| if include == 1: | |
| mask = 0 | |
| else: | |
| mask = 0x010 | |
| for i in self.completion: | |
| if i[1] == mask: | |
| items.append(i[0]) | |
| if text: | |
| return [ | |
| item for item in items | |
| if item.upper().startswith(text.upper()) | |
| ] | |
| else: | |
| return items | |
| def do_get(self, filename): | |
| if self.tid is None: | |
| logging.error("No share selected") | |
| return | |
| filename = string.replace(filename,'/','\\') | |
| fh = open(ntpath.basename(filename),'wb') | |
| pathname = ntpath.join(self.pwd,filename) | |
| try: | |
| self.smb.getFile(self.share, pathname, fh.write) | |
| except: | |
| fh.close() | |
| os.remove(filename) | |
| raise | |
| fh.close() | |
| def do_close(self, line): | |
| self.do_logoff(line) | |
| def main(): | |
| # Init the example's logger theme | |
| logger.init() | |
| print version.BANNER | |
| parser = argparse.ArgumentParser(add_help = True, description = "SMB client implementation.") | |
| parser.add_argument('target', action='store', help='[[domain/]username[:password]@]<targetName or address>') | |
| parser.add_argument('-file', type=argparse.FileType('r'), help='input file with commands to execute in the mini shell') | |
| parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON') | |
| group = parser.add_argument_group('authentication') | |
| group.add_argument('-hashes', action="store", metavar = "LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH') | |
| group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)') | |
| group.add_argument('-k', action="store_true", help='Use Kerberos authentication. Grabs credentials from ccache file ' | |
| '(KRB5CCNAME) based on target parameters. If valid credentials ' | |
| 'cannot be found, it will use the ones specified in the command ' | |
| 'line') | |
| group.add_argument('-aesKey', action="store", metavar = "hex key", help='AES key to use for Kerberos Authentication ' | |
| '(128 or 256 bits)') | |
| group = parser.add_argument_group('connection') | |
| group.add_argument('-dc-ip', action='store', metavar="ip address", | |
| help='IP Address of the domain controller. If ommited it use the domain part (FQDN) specified in ' | |
| 'the target parameter') | |
| group.add_argument('-target-ip', action='store', metavar="ip address", | |
| help='IP Address of the target machine. If ommited it will use whatever was specified as target. ' | |
| 'This is useful when target is the NetBIOS name and you cannot resolve it') | |
| group.add_argument('-port', choices=['139', '445'], nargs='?', default='445', metavar="destination port", | |
| help='Destination port to connect to SMB Server') | |
| if len(sys.argv)==1: | |
| parser.print_help() | |
| sys.exit(1) | |
| options = parser.parse_args() | |
| if options.debug is True: | |
| logging.getLogger().setLevel(logging.DEBUG) | |
| else: | |
| logging.getLogger().setLevel(logging.INFO) | |
| import re | |
| domain, username, password, address = re.compile('(?:(?:([^/@:]*)/)?([^@:]*)(?::([^@]*))?@)?(.*)').match( | |
| options.target).groups('') | |
| #In case the password contains '@' | |
| if '@' in address: | |
| password = password + '@' + address.rpartition('@')[0] | |
| address = address.rpartition('@')[2] | |
| if options.target_ip is None: | |
| options.target_ip = address | |
| if domain is None: | |
| domain = '' | |
| if password == '' and username != '' and options.hashes is None and options.no_pass is False and options.aesKey is None: | |
| from getpass import getpass | |
| password = getpass("Password:") | |
| if options.aesKey is not None: | |
| options.k = True | |
| if options.hashes is not None: | |
| lmhash, nthash = options.hashes.split(':') | |
| else: | |
| lmhash = '' | |
| nthash = '' | |
| try: | |
| smbClient = SMBConnection(address, options.target_ip, sess_port=int(options.port)) | |
| if options.k is True: | |
| smbClient.kerberosLogin(username, password, domain, lmhash, nthash, options.aesKey, options.dc_ip ) | |
| else: | |
| smbClient.login(username, password, domain, lmhash, nthash) | |
| shell = MiniImpacketShell(smbClient) | |
| if options.file is not None: | |
| logging.info("Executing commands from %s" % options.file.name) | |
| for line in options.file.readlines(): | |
| if line[0] != '#': | |
| print "# %s" % line, | |
| shell.onecmd(line) | |
| else: | |
| print line, | |
| else: | |
| shell.cmdloop() | |
| except Exception, e: | |
| #import traceback | |
| #print traceback.print_exc() | |
| logging.error(str(e)) | |
| if __name__ == "__main__": | |
| main() | |