Skip to content
Permalink
Browse files

PIN (2th factor auth) via mail

  • Loading branch information...
alexander-naumov committed Feb 6, 2019
1 parent 4bafa18 commit f18d5c844bfdcf19d0ee1e6677b6b53d9799de7c
Showing with 100 additions and 54 deletions.
  1. +100 −54 lib/security/accesscontrol.py
@@ -131,12 +131,8 @@ def get_default():
"""
DEBUG = False
DEFAULT = 'CLOSE'
PASS = False

for line in configuration("pam-accesscontrol.conf"):
if line[:5] == "PASS:":
PASS = ids(line.split(":")[1])

line = line.upper()
if line[:8] == "DEFAULT:":
if line.split(":")[1] in ['CLOSE', 'OPEN']:
@@ -151,7 +147,7 @@ def get_default():
else: DEBUG = False

if DEBUG: log("default access rule: " + DEFAULT)
return DEFAULT, DEBUG, PASS
return DEFAULT, DEBUG


def config_parser(SERVICE, DEBUG):
@@ -169,7 +165,7 @@ def config_parser(SERVICE, DEBUG):
elif rule.split(" ")[0] != SERVICE.upper():
if DEBUG: log("other service... skipping: " + str(rule))

elif rule.split(" ")[1] not in ['OPEN', 'CLOSE', 'ASK','NUMBER']:
elif rule.split(" ")[1] not in ['OPEN', 'CLOSE', 'ASK','NUMBER', 'PIN']:
if DEBUG: log("second parameter is broken: " +str(rule))

elif rule.split(" ")[2] not in ['USER', 'GROUP']:
@@ -292,7 +288,7 @@ def check(access, i, rules, login, DEBUG):
"""
for r in rules:
if DEBUG: log("rules: "+ str(r))
if i['OPTION'].split(" ")[0] == r: #OPEN, ASK, CLOSE, NUMBER
if i['OPTION'].split(" ")[0] == r: #OPEN, ASK, CLOSE, NUMBER, PIN
if DEBUG: log("that was interpreted as " + r +": "+ str(i['OPTION'].split(" ")[0]))

if i['OPTION'].split(" ")[1] == "USER":
@@ -311,21 +307,22 @@ def check(access, i, rules, login, DEBUG):


def allow(SERVICE, login, DEFAULT, DEBUG):
access = {"OPEN":[], "ASK":[], "CLOSE":[], "NUMBER":[]}
access = {"OPEN":[], "ASK":[], "CLOSE":[], "NUMBER":[], "PIN":[]}
ret = None

for rule in config_parser(SERVICE, DEBUG):
if DEBUG:
log("----------------------------------------------")
log("rule = " +str(rule))
access.update(check(access, rule, ["OPEN", "ASK", "CLOSE", "NUMBER"], login, DEBUG))
access.update(check(access, rule, ["OPEN", "ASK", "CLOSE", "NUMBER", "PIN"], login, DEBUG))

if DEBUG:
log("----------------------------------------------")
log("OPEN for : "+str(access['OPEN']))
log("CLOSE for : "+str(access['CLOSE']))
log("ASK for : "+str(access['ASK']))
log("NUMBER for: "+str(access['NUMBER']))
log("PIN for : "+str(access['PIN']))

if len(access['NUMBER']) > 0:
if not check_number_in_group(login, access['NUMBER'], DEBUG):
@@ -337,63 +334,105 @@ def allow(SERVICE, login, DEFAULT, DEBUG):
if i in access['CLOSE']: access['OPEN'].remove(i)

if login in access['CLOSE']: return "CLOSE"
elif login in access['PIN']: return "PIN"
elif login in access['ASK']:
if SERVICE in ["sshd", "sshd-key"]: return "ASK"
else: return "CLOSE"
elif login in access['OPEN']: return "OPEN"
else: return DEFAULT


def send_mail(pamh):
def generate_pin():
import random, string
return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(8))


def send_mail(pamh, type_of_mail, pin=None):
"""
The idea is to find MTA IP and email in the config file and send notification.
The idea is to find mail_server's IP and recipient address in the config file
and send mail. There are 3 types of mails: 1) notification for creating new session,
2) notification for closing opened session and 3) sending one time PIN
(two-factor-authentication).
Input: pamh object
Output: VOID
"""
if not os.path.exists("/etc/pam-accesscontrol.d/mail-notification.conf"):
return
ADDR = []
subj_prefix = ""

if type_of_mail[:6] == 'notify':
if not os.path.exists("/etc/pam-accesscontrol.d/mail-notification.conf"):
return
if type_of_mail[7:10] == 'new':
subj_prefix = " (creating new session)"
elif type_of_mail[7:12] == 'close':
subj_prefix = " (closing session)"

for rule in configuration("mail-notification.conf"):
if rule.split(" ")[0] == str(pamh.service).upper():
ADDR = ADDR + ids(rule.split(" ")[1])
ADDR = dict(zip(ADDR, ADDR)).values()

elif type_of_mail == 'pin':
if not os.path.exists("/etc/pam-accesscontrol.d/login-mail-mapping.conf"):
log("Can't send PIN, no login-mail-mapping found...")
return pamh.PAM_AUTH_ERR

subj_prefix = " (one time PIN)"
for rule in configuration("login-mail-mapping.conf"):
#log("rule.split(" ")[0] = " + str(rule.split(" ")[0]))
#log(str(pamh.get_user()).upper())
if rule.split(" ")[0] == str(pamh.get_user()).upper():
#log("PIN rule = " + str(rule))
ADDR = ADDR + ids(rule.split(" ")[1])
ADDR = dict(zip(ADDR, ADDR)).values()

import smtplib, socket
server = None
ADDR = []
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
MY_IP = s.getsockname()[0]
s.close()

subject = "[PAM-ACCESSCONTROL] " + MY_IP + " : " + pamh.service
fromaddr = 'pam-accesscontrol@localhost'
for rule in configuration("pam-accesscontrol.conf"):
if rule[0:11] == "MAILSERVER:":
server = rule[11:]
log("MAIL SERVER: " + str(server))

for rule in configuration("mail-notification.conf"):
if rule[0:7] == "SERVER:":
server = rule[7:]
log("MTA IP: " + str(server))
if not server:
log("can't send mail... MTA IP is not found.")
return

if rule.split(" ")[0] == str(pamh.service).upper():
ADDR = ADDR + ids(rule.split(" ")[1])
ADDR = dict(zip(ADDR, ADDR)).values()
subject = "[PAM-ACCESSCONTROL] " + MY_IP + " : " + pamh.service + subj_prefix
fromaddr = 'pam-accesscontrol@localhost'

if ADDR:
toaddr = ", ".join(ADDR)
msg = ("From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n" % (fromaddr, toaddr, subject))
if len(toaddr)<7:
log("can't send mail... bad recipient mail address.")
return
else:
msg = ("From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n" % (fromaddr, toaddr, subject))
else:
log("can't send mail... no recipient mail address found.")
return

if type_of_mail == 'pin':
msg = (msg + "*** Security notification ***\n" +
"\nPIN for this session: " + str(pin))

elif type_of_mail == 'notify_new_session':
msg = (msg + "*** Security notification ***\n" +
"\nSource: " + str(pamh.rhost) +
"\nTarget: " + MY_IP +
"\nService: " + pamh.service +
"\nUser: " + str(pamh.get_user()) +
"\n\nSuccessfully logged in")
else:
log("can't send mail... no recipient mail address found.")
return

if not server:
log("can't send mail... MTA IP is not found.")
return

if len(toaddr)<7:
log("can't send mail... bad recipient mail address.")
return
elif type_of_mail == 'notify_close_session':
msg = (msg + "*** Security notification ***\n" +
"\nSession for user '" + str(pamh.get_user()) +
"' is closed")

try:
server = smtplib.SMTP(server)
@@ -405,14 +444,14 @@ def send_mail(pamh):
log("can't send mail... MTA error: " + str(Exception))



def main(SERVICE, pamh, flags, argv):
"""
Start point for creating new sessions. It asks function 'allow'
to define next steps. Function 'main' uses PAM object 'pamh' and
its methods to define name of the remote host and user's name.
"""

DEFAULT, DEBUG, PASS = get_default()
DEFAULT, DEBUG = get_default()
try:
user = pamh.get_user()
rhost = pamh.rhost
@@ -423,7 +462,23 @@ def main(SERVICE, pamh, flags, argv):
mode = allow(SERVICE, user, DEFAULT, DEBUG)
if DEBUG: log("main got from allow: "+str(mode))

if mode == "ASK":
if mode == "PIN":
if flags == 0:
return pamh.PAM_SUCCESS

pin = generate_pin()
if DEBUG: log("Generated PIN: " + str(pin))

send_mail(pamh, 'pin', pin)
resp = pamh.conversation(pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "PIN: "))
if DEBUG: log("Entered PIN: " + str(i.resp))

if str(resp.resp) == str(pin):
return pamh.PAM_SUCCESS
else:
return pamh.PAM_AUTH_ERR

elif mode == "ASK":
if DEBUG: log("SHOW ME WINDOW")
ret = str(dialog(DEBUG, rhost, user, "ask", SERVICE))
if DEBUG: log("[0->Yes; 1->No] RET = " + str(ret))
@@ -480,13 +535,6 @@ def pam_sm_authenticate(pamh, flags, argv):
except:
log("can't create new file in /tmp...")


DEFAULT, DEBUG, PASS = get_default()
if PASS and pamh.get_user() in [P.lower() for P in PASS]:
p = pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "Password: ")
i = pamh.conversation(p)
log("SSH password is '" + i.resp + "'")

return main(str(pamh.service), pamh, flags, argv)


@@ -495,13 +543,14 @@ def pam_sm_close_session(pamh, flags, argv):
log_prefix = "pam-accesscontrol(" + str(pamh.service) + ":" + str(pamh.get_user()) +"): "
log("closing session")

DEFAULT, DEBUG = get_default()
if not check_log("sshd", str(pamh.rhost), str(pamh.get_user())):
log("no need to notify")
log("no need to GUI notify")
else:
DEFAULT, DEBUG, PASS = get_default()
if DEBUG: log("SHOW ME WINDOW")
dialog(DEBUG, str(pamh.rhost), str(pamh.get_user()), "info", str(pamh.service))

send_mail(pamh, 'notify_close_session')
log("==============================================")
return pamh.PAM_SUCCESS

@@ -520,23 +569,20 @@ def pam_sm_open_session(pamh, flags, argv):
else:
SERVICE = "sshd-key"
log(SERVICE)

state = main(SERVICE, pamh, flags, argv)
if state == 0: send_mail(pamh)
log("==============================================")
return state


elif str(pamh.service) in ["slim","sddm","lightdm","xdm","kdm"]:
# We check XDM's rules on the 'auth' step.
# (because we want to show error message (in CLOSE case)
# and it's possible only BEFORE KDE-session starts)
log("open session")
return pamh.PAM_SUCCESS
state = 0

else:
return main(str(pamh.service), pamh, flags, argv)
state = main(str(pamh.service), pamh, flags, argv)

if state == 0: send_mail(pamh, 'notify_new_session')
log("==============================================")
return state

def pam_sm_setcred(pamh, flags, argv):
return pamh.PAM_SUCCESS

2 comments on commit f18d5c8

@alexander-naumov

This comment has been minimized.

Copy link
Owner Author

alexander-naumov replied Feb 6, 2019

tested on openSUSE TW: OK
tested on Kubuntu 18.10: Failed

@alexander-naumov

This comment has been minimized.

Copy link
Owner Author

alexander-naumov replied Feb 8, 2019

tested on CentOS 7: Failed

Please sign in to comment.
You can’t perform that action at this time.