Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions airflow/models/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from cryptography.fernet import InvalidToken as InvalidFernetToken
from sqlalchemy import Boolean, Column, Integer, String, Text
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import synonym
from sqlalchemy.orm import Session, synonym

from airflow.models.base import ID_LEN, Base
from airflow.models.crypto import get_fernet
Expand All @@ -32,6 +32,10 @@


class Variable(Base, LoggingMixin):
"""
Variables are a generic way to store and retrieve arbitrary content or settings
as a simple key value store within Airflow.
"""
__tablename__ = "variable"
__NO_DEFAULT_SENTINEL = object()

Expand All @@ -45,29 +49,37 @@ def __repr__(self):
return '{} : {}'.format(self.key, self._val)

def get_val(self):
"""
Get Airflow Variable from Metadata DB and decode it using the Fernet Key
"""
if self._val is not None and self.is_encrypted:
try:
fernet = get_fernet()
return fernet.decrypt(bytes(self._val, 'utf-8')).decode()
except InvalidFernetToken:
self.log.error("Can't decrypt _val for key=%s, invalid token or value", self.key)
return None
except Exception:
except Exception: # pylint: disable=broad-except
self.log.error("Can't decrypt _val for key=%s, FERNET_KEY configuration missing", self.key)
return None
else:
return self._val

def set_val(self, value):
"""
Encode the specified value with Fernet Key and store it in Variables Table.
"""
if value is not None:
fernet = get_fernet()
self._val = fernet.encrypt(bytes(value, 'utf-8')).decode()
self.is_encrypted = fernet.is_encrypted

@declared_attr
def val(cls):
return synonym('_val',
descriptor=property(cls.get_val, cls.set_val))
def val(cls): # pylint: disable=no-self-argument
"""
Get Airflow Variable from Metadata DB and decode it using the Fernet Key
"""
return synonym('_val', descriptor=property(cls.get_val, cls.set_val))

@classmethod
def setdefault(cls, key, default, deserialize_json=False):
Expand Down Expand Up @@ -102,6 +114,13 @@ def get(
default_var: Any = __NO_DEFAULT_SENTINEL,
deserialize_json: bool = False,
):
"""
Sets a value for an Airflow Key

:param key: Variable Key
:param default_var: Default value of the Variable if the Variable doesn't exists
:param deserialize_json: Deserialize the value to a Python dict
"""
var_val = get_variable(key=key)
if var_val is None:
if default_var is not cls.__NO_DEFAULT_SENTINEL:
Expand All @@ -121,24 +140,39 @@ def set(
key: str,
value: Any,
serialize_json: bool = False,
session=None
session: Session = None
):
"""
Sets a value for an Airflow Variable with a given Key

:param key: Variable Key
:param value: Value to set for the Variable
:param serialize_json: Serialize the value to a JSON string
:param session: SQL Alchemy Sessions
"""

if serialize_json:
stored_value = json.dumps(value, indent=2)
else:
stored_value = str(value)

Variable.delete(key, session=session)
session.add(Variable(key=key, val=stored_value)) # type: ignore
session.add(Variable(key=key, val=stored_value)) # type: ignore # pylint: disable=E1123
session.flush()

@classmethod
@provide_session
def delete(cls, key, session=None) -> int:
def delete(cls, key: str, session: Session = None) -> int:
"""
Delete an Airflow Variable for a given key

:param key: Variable Key
:param session: SQL Alchemy Sessions
"""
return session.query(cls).filter(cls.key == key).delete()

def rotate_fernet_key(self):
""" Rotate Fernet Key """
fernet = get_fernet()
if self._val and self.is_encrypted:
self._val = fernet.rotate(self._val.encode('utf-8')).decode()
1 change: 0 additions & 1 deletion scripts/ci/pylint_todo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
./airflow/models/pool.py
./airflow/models/slamiss.py
./airflow/models/taskinstance.py
./airflow/models/variable.py
./airflow/models/xcom.py
./airflow/stats.py
./airflow/www/blueprints.py
Expand Down