Permalink
Fetching contributors…
Cannot retrieve contributors at this time
386 lines (330 sloc) 13 KB
#!/usr/bin/python
import logging
import subprocess
import os
import urllib
import sys
import datetime
import syslog
import platform
import json
from distutils.version import LooseVersion
import FoundationPlist
from Foundation import NSDate
from Foundation import CFPreferencesAppSynchronize
from Foundation import CFPreferencesCopyAppValue
from Foundation import CFPreferencesSetValue
from Foundation import kCFPreferencesAnyUser
from Foundation import kCFPreferencesCurrentHost
from SystemConfiguration import SCDynamicStoreCopyConsoleUser
BUNDLE_ID = 'com.grahamgilbert.crypt'
LOG_FILE = '/var/log/crypt.log'
logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %I:%M:%S %p',
level=logging.DEBUG,
filename=LOG_FILE)
stdout_logging = logging.StreamHandler()
stdout_logging.setFormatter(logging.Formatter())
logging.getLogger().addHandler(stdout_logging)
def get_console_user():
"""returns the current console user via PyObjc"""
cfuser = SCDynamicStoreCopyConsoleUser(None, None, None)
return cfuser[0]
def get_os_version(only_major_minor=True, as_tuple=False):
"""Returns an OS version.
Args:
only_major_minor: Boolean. If True, only include major/minor versions.
as_tuple: Boolean. If True, return a tuple of ints, otherwise a string.
100%, completely stolen from Munki.
"""
os_version_tuple = platform.mac_ver()[0].split('.')
if only_major_minor:
os_version_tuple = os_version_tuple[0:2]
if as_tuple:
return tuple(map(int, os_version_tuple))
else:
return '.'.join(os_version_tuple)
def set_pref(pref_name, pref_value):
"""Sets a preference, writing it to
/Library/Preferences/com.grahamgilbert.crypt.plist.
This should normally be used only for 'bookkeeping' values;
values that control the behavior of crypt may be overridden
elsewhere (by MCX, for example)"""
try:
CFPreferencesSetValue(
pref_name, pref_value, BUNDLE_ID,
kCFPreferencesAnyUser, kCFPreferencesCurrentHost)
CFPreferencesAppSynchronize(BUNDLE_ID)
except Exception:
pass
def pref(pref_name):
"""Return a preference. Since this uses CFPreferencesCopyAppValue,
Preferences can be defined several places. Precedence is:
- MCX
- /var/root/Library/Preferences/com.grahamgilbert.crypt.plist
- /Library/Preferences/com.grahamgilbert.crypt.plist
- default_prefs defined here.
"""
default_prefs = {
'RemovePlist': True,
'RotateUsedKey': True,
'OutputPath': '/private/var/root/crypt_output.plist',
'ValidateKey': True,
'KeyEscrowInterval': 1
}
pref_value = CFPreferencesCopyAppValue(pref_name, BUNDLE_ID)
if pref_value is None:
pref_value = default_prefs.get(pref_name)
# we're using a default value. We'll write it out to
# /Library/Preferences/<BUNDLE_ID>.plist for admin
# discoverability
set_pref(pref_name, pref_value)
if isinstance(pref_value, NSDate):
# convert NSDate/CFDates to strings
pref_value = str(pref_value)
return pref_value
def GetMacName():
"""
Returns the name of the mac
"""
theprocess = ['scutil', '--get', 'ComputerName']
thename = subprocess.Popen(theprocess, stdin=subprocess.PIPE,
stdout=subprocess.PIPE).communicate()[0]
thename = thename.strip()
return thename
def escrow_key(plist):
logging.info('Attempting to Escrow Key...')
server_url = pref('ServerURL')
logging.debug('ServerURL Pref set to: {0}...'.format(server_url))
if server_url is None:
return False
if server_url.endswith("/"):
theurl = server_url+"checkin/"
else:
theurl = server_url+"/checkin/"
# In the future, we're going to submit the whole plist, but for now...
serial = plist['SerialNumber']
key = plist['RecoveryKey']
username = plist['EnabledUser']
macname = GetMacName()
mydata = [
('serial', serial), ('recovery_password', key),
('username', username), ('macname', macname)
]
mydata = urllib.urlencode(mydata)
cmd = ['/usr/bin/curl', '-fsSL', '--data', mydata, theurl]
task = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
(output, error) = task.communicate()
if task.returncode == 0:
logging.info('Key escrow successful.')
server_initiated_rotation(output)
return True
else:
logging.error('Key escrow unsuccessful.')
return False
def server_initiated_rotation(output):
"""
Rotate the key if the server tells us to.
We need the old key to be present on disk and RotateUsedKey to be True
"""
try:
json_output = json.loads(output)
except ValueError:
return ''
if not pref('RotateUsedKey') or pref('RemovePlist'):
# Don't do anything if we don't care about the good stuff
return ''
output_plist = pref('OutputPath')
if not os.path.isfile(output_plist):
# Need this to be here too (which it should, but you never know..)
return ''
if json_output.get('rotation_required', False):
logging.info('Removing output plist for rotation at next login.')
os.remove(output_plist)
post_run_command()
def using_recovery_key():
"""Check if FileVault is currently unlocked using
the recovery key.
"""
cmd = ['/usr/bin/fdesetup', 'usingrecoverykey']
try:
using_key = subprocess.check_output(cmd).strip()
except Exception:
logging.warning('fdesetup usingrecoverykey failed to run correctly')
return False
if using_key == 'true':
logging.warning('Detected Recovery Key use.')
return True
else:
return False
def post_run_command():
run_command = pref('PostRunCommand')
output_plist = pref('OutputPath')
if run_command and not os.path.isfile(output_plist):
logging.info('Running {}'.format(run_command))
try:
output = subprocess.check_output(run_command)
logging.info(output)
except subprocess.CalledProcessError as e:
logging.error('Failed to run PostRunCommand: {}'.format(e))
def get_recovery_key(key_location):
"""Returns recovery key as a string... If we failed
to get the proper information, returns an empty string"""
# checks to see if recovery key preference is set
try:
keyplist = FoundationPlist.readPlist(key_location)
recovery_key = keyplist['RecoveryKey'].strip()
return recovery_key
except FoundationPlist.NSPropertyListSerializationException:
logging.info(
'We had trouble getting info from {0}...'.format(key_location))
return False
except KeyError:
logging.warning(
'Problem with Key: "RecoveryKey" in {0}...'.format(key_location))
return False
def rotate_invalid_key(plist_path):
"""
Will send the key (if present) for validation. If validation fails,
it will remove the plist so the key can be regenerated at next login.
Due to the bug that restricts the number of validations before reboot
in versions of macOS prior to 10.12.5, this will only run there.
"""
# a work aroud for https://github.com/grahamgilbert/crypt/issues/68
if not get_console_user():
logging.info('Skipping Validation, no user is logged in.')
return True
macos_version = get_os_version(only_major_minor=False, as_tuple=False)
if LooseVersion('10.12.5') > LooseVersion(macos_version):
logging.warning('macOS version is too old to run reliably')
return False
if os.path.isfile(plist_path):
recovery_key = get_recovery_key(plist_path)
else:
logging.warning('Recovery key is not present on disk')
return False
if recovery_key is not False:
key_is_valid = validate_key(recovery_key)
else:
logging.warning('Could not retrieve recovery key from plist')
return False
if not key_is_valid:
logging.info('Stored recovery key is not valid, removing from disk')
os.remove(plist_path)
return False
logging.info('Stored recovery key is valid.')
return True
def validate_key(current_key):
"""Validates the given recovery key against FileVault, returns True
or False accordingly"""
key = {'Password': current_key}
input_plist = FoundationPlist.writePlistToString(key)
cmd = subprocess.Popen(['/usr/bin/fdesetup', 'validaterecovery',
'-inputplist'],
stdout=subprocess.PIPE, stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout_data, err = cmd.communicate(input=input_plist)
if err:
logging.error(err)
if stdout_data.strip() == 'true':
return True
else:
logging.error('Recovery Key could not be validated.')
logging.error('Failed with Error: {}'.format(stdout_data))
return False
def rotate_key(current_key, plist):
"""This rotates the recovery key to something new
by using the current recovery key"""
rotate_inputplist = {'Password': current_key}
input_plist = FoundationPlist.writePlistToString(rotate_inputplist)
cmd = subprocess.Popen(['/usr/bin/fdesetup', 'changerecovery', '-personal',
'-outputplist', '-inputplist'],
stdout=subprocess.PIPE, stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout_data, err = cmd.communicate(input=input_plist)
logging.info('Attempting to rotate Recovery Key.')
try:
output_plist = FoundationPlist.readPlistFromString(stdout_data)
FoundationPlist.writePlist(output_plist, plist)
logging.info('Recovery Key rotated.')
except Exception:
if err:
logging.warning('Encountered error Key Rotation: {0}.'.format(err))
def get_enabled_user():
"""Crypt needs an enabled user in its plist that our normal output
doesn't give us so we need to add a user to the plist"""
if pref('SkipUsers'):
nonusers = pref('SkipUsers')
else:
nonusers = []
fde_users = subprocess.check_output(
["/usr/bin/fdesetup", "list"]).split('\n')
for user in fde_users:
if not user.split(',')[0] in nonusers:
cryptuser = user.split(',')[0]
break
return cryptuser
def rotate_if_used(key_path):
"""Checks to see if the recovery key was used to unlock the machine
if it was then use our current key to rotate it"""
if not using_recovery_key():
return ''
if not os.path.isfile(key_path):
logging.warning('Could not locate {0}'.format(key_path))
rotate_message = 'Recovery Key has been used.. Attempting to Rotate'
logging.info(rotate_message)
current_key = get_recovery_key(key_path)
valid_key = validate_key(current_key)
if not valid_key:
logging.error('Our current key is not valid')
return ''
rotate_key(current_key, key_path)
def main():
plist_path = pref('OutputPath')
logging.info('OutputPath Pref is set to: {}'.format(plist_path))
if pref('RotateUsedKey'):
rotate_if_used(plist_path)
if pref('RotateUsedKey') and pref('ValidateKey') and \
not pref('RemovePlist'):
rotate_invalid_key(plist_path)
post_run_command()
if os.path.isfile(plist_path):
plist = FoundationPlist.readPlist(plist_path)
# Exit if we've run this within the last hour
try:
enableduser = plist['EnabledUser']
except KeyError as e:
enableduser = get_console_user()
skippedusers = ['root', '_mbsetupuser']
if not enableduser or enableduser in skippedusers:
enableduser = get_enabled_user()
plist['EnabledUser'] = enableduser
if 'last_run' in plist:
try:
escrow_interval = int(pref('KeyEscrowInterval'))
except Exception:
escrow_interval = 1
logging.info('KeyEscrowInterval set to: {} hour(s)...'.format(
escrow_interval))
now = datetime.datetime.now()
hour_ago = now - datetime.timedelta(hours=escrow_interval)
if plist['last_run'] > hour_ago:
logging.info(
'We escrowed less than {} hour(s) ago. Skipping...'.format(
escrow_interval))
sys.exit(0)
escrow_result = escrow_key(plist=plist)
if escrow_result and os.path.isfile(plist_path):
remove_plist = pref('RemovePlist')
plist['escrow_success'] = True
plist['last_run'] = datetime.datetime.now()
FoundationPlist.writePlist(plist, plist_path)
if remove_plist is True:
os.remove(plist_path)
logging.info('Removing plist due to configuration.')
else:
os.chmod(plist_path, 0600)
logging.info('Ensuring permissions on plist.')
if __name__ == '__main__':
main()