Skip to content

Commit

Permalink
keyrings/cryptfile/file{,_base}.py: from keyring
Browse files Browse the repository at this point in the history
  • Loading branch information
Hans-Peter Jansen committed Sep 29, 2020
1 parent 8d06a0e commit ae871c0
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 73 deletions.
83 changes: 37 additions & 46 deletions keyrings/cryptfile/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,38 @@
import sys
import json
import getpass

import configparser

from keyring.util import properties
from .escape import escape as escape_for_ini

from keyrings.cryptfile.file_base import Keyring, decodebytes, encodebytes

from keyrings.cryptfile.file_base import (
Keyring, decodebytes, encodebytes,
)
from keyrings.cryptfile._escape import escape as escape_for_ini

class PlaintextKeyring(Keyring):
"""Simple File Keyring with no encryption"""

priority = .5
priority = 0.5
"Applicable for all platforms, but not recommended"

filename = 'keyring_pass.cfg'
scheme = 'no encyption'
version = '1.0'

def encrypt(self, password, assoc = None):
"""Directly return the password itself, ignore associated data.
"""
def encrypt(self, password, assoc=None):
"""Directly return the password itself, ignore associated data."""
return password

def decrypt(self, password_encrypted, assoc = None):
"""Directly return encrypted password, ignore associated data.
"""
def decrypt(self, password_encrypted, assoc=None):
"""Directly return encrypted password, ignore associated data."""
return password_encrypted


class Encrypted(object):
"""
PyCrypto-backed Encryption support
"""

scheme = '[PBKDF2] AES256.CFB'
version = '1.0'
block_size = 32
Expand All @@ -49,15 +46,15 @@ def _create_cipher(self, password, salt, IV):
"""
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Cipher import AES

pw = PBKDF2(password, salt, dkLen=self.block_size)
return AES.new(pw[:self.block_size], AES.MODE_CFB, IV)
return AES.new(pw[: self.block_size], AES.MODE_CFB, IV)

def _get_new_password(self):
while True:
password = getpass.getpass(
"Please set a password for your new keyring: ")
password = getpass.getpass("Please set a password for your new keyring: ")
confirm = getpass.getpass('Please confirm the password: ')
if password != confirm: # pragma: no cover
if password != confirm: # pragma: no cover
sys.stderr.write("Error: Your passwords didn't match\n")
continue
if '' == password.strip(): # pragma: no cover
Expand All @@ -81,12 +78,11 @@ def priority(self):
__import__('Crypto.Cipher.AES')
__import__('Crypto.Protocol.KDF')
__import__('Crypto.Random')
except ImportError: # pragma: no cover
except ImportError: # pragma: no cover
raise RuntimeError("PyCrypto required")
if not json: # pragma: no cover
raise RuntimeError("JSON implementation such as simplejson "
"required.")
return .6
if not json: # pragma: no cover
raise RuntimeError("JSON implementation such as simplejson required.")
return 0.6

@properties.NonDataProperty
def keyring_key(self):
Expand All @@ -104,15 +100,11 @@ def _init_file(self):
self.keyring_key = self._get_new_password()
# set a reference password, used to check that the password provided
# matches for subsequent checks.
self.set_password('keyring-setting',
'password reference',
'password reference value')
self._write_config_value('keyring-setting',
'scheme',
self.scheme)
self._write_config_value('keyring-setting',
'version',
self.version)
self.set_password(
'keyring-setting', 'password reference', 'password reference value'
)
self._write_config_value('keyring-setting', 'scheme', self.scheme)
self._write_config_value('keyring-setting', 'version', self.version)

def _check_file(self):
"""
Expand All @@ -125,8 +117,7 @@ def _check_file(self):
config.read(self.file_path)
try:
config.get(
escape_for_ini('keyring-setting'),
escape_for_ini('password reference'),
escape_for_ini('keyring-setting'), escape_for_ini('password reference')
)
except (configparser.NoSectionError, configparser.NoOptionError):
return False
Expand All @@ -146,8 +137,7 @@ def _check_scheme(self, config):
"""
try:
scheme = config.get(
escape_for_ini('keyring-setting'),
escape_for_ini('scheme'),
escape_for_ini('keyring-setting'), escape_for_ini('scheme')
)
except (configparser.NoSectionError, configparser.NoOptionError):
raise AttributeError("Encryption scheme missing")
Expand All @@ -157,8 +147,10 @@ def _check_scheme(self, config):
scheme = scheme[9:]

if scheme != self.scheme:
raise ValueError("Encryption scheme mismatch "
"(exp.: %s, found: %s)" % (self.scheme, scheme))
raise ValueError(
"Encryption scheme mismatch "
"(exp.: %s, found: %s)" % (self.scheme, scheme)
)

def _check_version(self, config):
"""
Expand All @@ -169,8 +161,7 @@ def _check_version(self, config):
"""
try:
self.file_version = config.get(
escape_for_ini('keyring-setting'),
escape_for_ini('version'),
escape_for_ini('keyring-setting'), escape_for_ini('version')
)
except (configparser.NoSectionError, configparser.NoOptionError):
return False
Expand All @@ -182,7 +173,8 @@ def _unlock(self):
user.
"""
self.keyring_key = getpass.getpass(
'Please enter password for encrypted keyring: ')
'Please enter password for encrypted keyring: '
)
try:
ref_pw = self.get_password('keyring-setting', 'password reference')
assert ref_pw == 'password reference value'
Expand All @@ -196,30 +188,29 @@ def _lock(self):
"""
del self.keyring_key

def encrypt(self, password, assoc = None):
def encrypt(self, password, assoc=None):
# encrypt password, ignore associated data
from Crypto.Random import get_random_bytes

salt = get_random_bytes(self.block_size)
from Crypto.Cipher import AES

IV = get_random_bytes(AES.block_size)
cipher = self._create_cipher(self.keyring_key, salt, IV)
password_encrypted = cipher.encrypt(self.pw_prefix + password)
# Serialize the salt, IV, and encrypted password in a secure format
data = dict(
salt=salt, IV=IV, password_encrypted=password_encrypted,
)
data = dict(salt=salt, IV=IV, password_encrypted=password_encrypted)
for key in data:
# spare a few bytes: throw away newline from base64 encoding
data[key] = encodebytes(data[key]).decode()[:-1]
return json.dumps(data).encode()

def decrypt(self, password_encrypted, assoc = None):
def decrypt(self, password_encrypted, assoc=None):
# unpack the encrypted payload, ignore associated data
data = json.loads(password_encrypted.decode())
for key in data:
data[key] = decodebytes(data[key].encode())
cipher = self._create_cipher(self.keyring_key, data['salt'],
data['IV'])
cipher = self._create_cipher(self.keyring_key, data['salt'], data['IV'])
plaintext = cipher.decrypt(data['password_encrypted'])
assert plaintext.startswith(self.pw_prefix)
return plaintext[3:]
Expand Down
46 changes: 19 additions & 27 deletions keyrings/cryptfile/file_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,13 @@

import os
import abc
import base64

import configparser
from base64 import encodebytes, decodebytes

from keyring.errors import PasswordDeleteError
from keyring.backend import KeyringBackend
from keyring.util import platform_, properties

from keyrings.cryptfile._escape import escape as escape_for_ini

try:
encodebytes = base64.encodebytes
except AttributeError: # pragma: no cover
encodebytes = base64.encodestring

try:
decodebytes = base64.decodebytes
except AttributeError: # pragma: no cover
decodebytes = base64.decodestring
from .escape import escape as escape_for_ini


class FileBacked(object):
Expand Down Expand Up @@ -60,8 +48,10 @@ def file_version(self):
return None

def __repr__(self):
tmpl = "<{self.__class__.__name__} with {self.scheme} " \
"v.{self.version} at {self.file_path}>"
tmpl = (
"<{self.__class__.__name__} with {self.scheme} "
"v.{self.version} at {self.file_path}>"
)
return tmpl.format(**locals())


Expand All @@ -76,7 +66,7 @@ class Keyring(FileBacked, KeyringBackend):
"""

@abc.abstractmethod
def encrypt(self, password, assoc = None):
def encrypt(self, password, assoc=None):
"""
Given a password (byte string) and assoc (byte string, optional),
return an encrypted byte string.
Expand All @@ -85,7 +75,7 @@ def encrypt(self, password, assoc = None):
"""

@abc.abstractmethod
def decrypt(self, password_encrypted, assoc = None):
def decrypt(self, password_encrypted, assoc=None):
"""
Given a password encrypted by a previous call to `encrypt`, and assoc
(byte string, optional), return the original byte string.
Expand Down Expand Up @@ -122,8 +112,12 @@ def get_password(self, service, username):
return password

def set_password(self, service, username, password):
"""Write the password in the file.
"""
"""Write the password in the file."""
if not username:
# https://github.com/jaraco/keyrings.alt/issues/21
raise ValueError("Username cannot be blank.")
if not isinstance(password, str):
raise TypeError("Password should be a unicode string, not bytes.")
assoc = self._generate_assoc(service, username)
# encrypt the password
password_encrypted = self.encrypt(password.encode('utf-8'), assoc)
Expand All @@ -133,10 +127,8 @@ def set_password(self, service, username, password):
self._write_config_value(service, username, password_base64)

def _generate_assoc(self, service, username):
"""Generate tamper resistant bytestring of associated data

This comment has been minimized.

Copy link
@frispete

frispete Nov 20, 2022

Owner

This commit caused great havoc: the innocent-looking change changed the separation of service and username from a simple null byte string '\0' to the multi-character string: backslash zero r'\0'. Mea culpa.

"""
return (escape_for_ini(service) + '\0' +
escape_for_ini(username)).encode()
"""Generate tamper resistant bytestring of associated data"""
return (escape_for_ini(service) + r'\0' + escape_for_ini(username)).encode()

def _write_config_value(self, service, key, value):
# ensure the file exists
Expand Down Expand Up @@ -164,7 +156,8 @@ def _ensure_file_path(self):
If it doesn't, create it with "go-rwx" permissions.
"""
storage_root = os.path.dirname(self.file_path)
if storage_root and not os.path.isdir(storage_root): # pragma: no cover
needs_storage_root = storage_root and not os.path.isdir(storage_root)
if needs_storage_root: # pragma: no cover
os.makedirs(storage_root)
if not os.path.isfile(self.file_path):
# create the file without group/world permissions
Expand All @@ -174,8 +167,7 @@ def _ensure_file_path(self):
os.chmod(self.file_path, user_read_write)

def delete_password(self, service, username):
"""Delete the password for the username of the service.
"""
"""Delete the password for the username of the service."""
service = escape_for_ini(service)
username = escape_for_ini(username)
config = configparser.RawConfigParser()
Expand Down

0 comments on commit ae871c0

Please sign in to comment.