Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-3383] Rotate fernet keys. #4225

Merged
merged 1 commit into from Jan 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 19 additions & 1 deletion airflow/bin/cli.py
Expand Up @@ -600,6 +600,17 @@ def next_execution(args):
print(None)


@cli_utils.action_logging
def rotate_fernet_key(args):
session = settings.Session()
for conn in session.query(Connection).filter(
Connection.is_encrypted | Connection.is_extra_encrypted):
conn.rotate_fernet_key()
for var in session.query(Variable).filter(Variable.is_encrypted):
var.rotate_fernet_key()
session.commit()


@cli_utils.action_logging
def list_dags(args):
dagbag = DagBag(process_subdir(args.subdir))
Expand Down Expand Up @@ -2068,7 +2079,14 @@ class CLIFactory(object):
'func': next_execution,
'help': "Get the next execution datetime of a DAG.",
'args': ('dag_id', 'subdir')
}
},
{
'func': rotate_fernet_key,
'help': 'Rotate all encrypted connection credentials and variables; see '
'https://airflow.readthedocs.io/en/stable/howto/secure-connections.html'
'#rotating-encryption-keys.',
'args': (),
},
)
subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
dag_subparsers = (
Expand Down
12 changes: 10 additions & 2 deletions airflow/models/__init__.py
Expand Up @@ -159,7 +159,7 @@ def get_fernet():
if _fernet:
return _fernet
try:
from cryptography.fernet import Fernet, InvalidToken
from cryptography.fernet import Fernet, MultiFernet, InvalidToken
global InvalidFernetToken
InvalidFernetToken = InvalidToken

Expand All @@ -178,7 +178,10 @@ def get_fernet():
)
_fernet = NullFernet()
else:
_fernet = Fernet(fernet_key.encode('utf-8'))
_fernet = MultiFernet([
Fernet(fernet_part.encode('utf-8'))
for fernet_part in fernet_key.split(',')
])
_fernet.is_encrypted = True
except (ValueError, TypeError) as ve:
raise AirflowException("Could not create Fernet object: {}".format(ve))
Expand Down Expand Up @@ -4477,6 +4480,11 @@ def set(cls, key, value, serialize_json=False, session=None):
session.add(Variable(key=key, val=stored_value))
session.flush()

def rotate_fernet_key(self):
fernet = get_fernet()
if self._val and self.is_encrypted:
self._val = fernet.rotate(self._val.encode('utf-8')).decode()


class XCom(Base, LoggingMixin):
"""
Expand Down
7 changes: 7 additions & 0 deletions airflow/models/connection.py
Expand Up @@ -173,6 +173,13 @@ def extra(cls):
return synonym('_extra',
descriptor=property(cls.get_extra, cls.set_extra))

def rotate_fernet_key(self):
fernet = get_fernet()
if self._password and self.is_encrypted:
self._password = fernet.rotate(self._password.encode('utf-8')).decode()
if self._extra and self.is_extra_encrypted:
self._extra = fernet.rotate(self._extra.encode('utf-8')).decode()

def get_hook(self):
try:
if self.conn_type == 'mysql':
Expand Down
15 changes: 15 additions & 0 deletions docs/howto/secure-connections.rst
Expand Up @@ -48,3 +48,18 @@ variable over the value in ``airflow.cfg``:

4. Restart Airflow webserver.
5. For existing connections (the ones that you had defined before installing ``airflow[crypto]`` and creating a Fernet key), you need to open each connection in the connection admin UI, re-type the password, and save it.

Rotating encryption keys
========================

Once connection credentials and variables have been encrypted using a fernet
key, changing the key will cause decryption of existing credentials to fail. To
rotate the fernet key without invalidating existing encrypted values, prepend
the new key to the ``fernet_key`` setting, run
``airflow rotate_fernet_key``, and then drop the original key from
``fernet_keys``:

1. Set ``fernet_key`` to ``new_fernet_key,old_fernet_key``.
2. Run ``airflow rotate_fernet_key`` to reencrypt existing credentials
with the new fernet key.
3. Set ``fernet_key`` to ``new_fernet_key``.
105 changes: 99 additions & 6 deletions tests/models.py
Expand Up @@ -38,6 +38,7 @@
from mock import ANY, Mock, mock_open, patch
from parameterized import parameterized
from freezegun import freeze_time
from cryptography.fernet import Fernet

from airflow import AirflowException, configuration, models, settings
from airflow.contrib.sensors.python_sensor import PythonSensor
Expand All @@ -50,6 +51,7 @@
from airflow.models import State as ST
from airflow.models import TaskReschedule as TR
from airflow.models import XCom
from airflow.models import Variable
from airflow.models import clear_task_instances
from airflow.models.connection import Connection
from airflow.operators.bash_operator import BashOperator
Expand Down Expand Up @@ -3083,27 +3085,118 @@ def test_xcom_get_many(self):
self.assertEqual(result.value, json_obj)


class VariableTest(unittest.TestCase):
def setUp(self):
models._fernet = None

def tearDown(self):
models._fernet = None

@patch('airflow.models.configuration.conf.get')
def test_variable_no_encryption(self, mock_get):
"""
Test variables without encryption
"""
mock_get.return_value = ''
Variable.set('key', 'value')
session = settings.Session()
test_var = session.query(Variable).filter(Variable.key == 'key').one()
self.assertFalse(test_var.is_encrypted)
self.assertEqual(test_var.val, 'value')

@patch('airflow.models.configuration.conf.get')
def test_variable_with_encryption(self, mock_get):
"""
Test variables with encryption
"""
mock_get.return_value = Fernet.generate_key().decode()
Variable.set('key', 'value')
session = settings.Session()
test_var = session.query(Variable).filter(Variable.key == 'key').one()
self.assertTrue(test_var.is_encrypted)
self.assertEqual(test_var.val, 'value')

@patch('airflow.models.configuration.conf.get')
def test_var_with_encryption_rotate_fernet_key(self, mock_get):
"""
Tests rotating encrypted variables.
"""
key1 = Fernet.generate_key()
key2 = Fernet.generate_key()

mock_get.return_value = key1.decode()
Variable.set('key', 'value')
session = settings.Session()
test_var = session.query(Variable).filter(Variable.key == 'key').one()
self.assertTrue(test_var.is_encrypted)
self.assertEqual(test_var.val, 'value')
self.assertEqual(Fernet(key1).decrypt(test_var._val.encode()), b'value')

# Test decrypt of old value with new key
mock_get.return_value = ','.join([key2.decode(), key1.decode()])
models._fernet = None
self.assertEqual(test_var.val, 'value')

# Test decrypt of new value with new key
test_var.rotate_fernet_key()
self.assertTrue(test_var.is_encrypted)
self.assertEqual(test_var.val, 'value')
self.assertEqual(Fernet(key2).decrypt(test_var._val.encode()), b'value')


class ConnectionTest(unittest.TestCase):
@patch.object(configuration, 'get')
def setUp(self):
models._fernet = None

def tearDown(self):
models._fernet = None

@patch('airflow.models.configuration.conf.get')
def test_connection_extra_no_encryption(self, mock_get):
"""
Tests extras on a new connection without encryption. The fernet key
is set to a non-base64-encoded string and the extra is stored without
encryption.
"""
mock_get.return_value = ''
test_connection = Connection(extra='testextra')
self.assertFalse(test_connection.is_extra_encrypted)
self.assertEqual(test_connection.extra, 'testextra')

@patch.object(configuration, 'get')
@patch('airflow.models.configuration.conf.get')
def test_connection_extra_with_encryption(self, mock_get):
"""
Tests extras on a new connection with encryption. The fernet key
is set to a base64 encoded string and the extra is encrypted.
Tests extras on a new connection with encryption.
"""
mock_get.return_value = Fernet.generate_key().decode()
test_connection = Connection(extra='testextra')
self.assertTrue(test_connection.is_extra_encrypted)
self.assertEqual(test_connection.extra, 'testextra')

@patch('airflow.models.configuration.conf.get')
def test_connection_extra_with_encryption_rotate_fernet_key(self, mock_get):
"""
Tests rotating encrypted extras.
"""
# 'dGVzdA==' is base64 encoded 'test'
mock_get.return_value = 'dGVzdA=='
key1 = Fernet.generate_key()
key2 = Fernet.generate_key()

mock_get.return_value = key1.decode()
test_connection = Connection(extra='testextra')
self.assertTrue(test_connection.is_extra_encrypted)
self.assertEqual(test_connection.extra, 'testextra')
self.assertEqual(Fernet(key1).decrypt(test_connection._extra.encode()), b'testextra')

# Test decrypt of old value with new key
mock_get.return_value = ','.join([key2.decode(), key1.decode()])
models._fernet = None
self.assertEqual(test_connection.extra, 'testextra')

# Test decrypt of new value with new key
test_connection.rotate_fernet_key()
self.assertTrue(test_connection.is_extra_encrypted)
self.assertEqual(test_connection.extra, 'testextra')
self.assertEqual(Fernet(key2).decrypt(test_connection._extra.encode()), b'testextra')

def test_connection_from_uri_without_extras(self):
uri = 'scheme://user:password@host%2flocation:1234/schema'
Expand Down