diff --git a/airflow/models/variable.py b/airflow/models/variable.py index ab7beb07828db..9369f26b5e54a 100644 --- a/airflow/models/variable.py +++ b/airflow/models/variable.py @@ -22,7 +22,7 @@ from cryptography.fernet import InvalidToken as InvalidFernetToken from sqlalchemy import Boolean, Column, Integer, String, Text from sqlalchemy.ext.declarative import declared_attr -from sqlalchemy.orm import synonym +from sqlalchemy.orm import Session, synonym from airflow.models.base import ID_LEN, Base from airflow.models.crypto import get_fernet @@ -32,6 +32,10 @@ class Variable(Base, LoggingMixin): + """ + Variables are a generic way to store and retrieve arbitrary content or settings + as a simple key value store within Airflow. + """ __tablename__ = "variable" __NO_DEFAULT_SENTINEL = object() @@ -45,6 +49,9 @@ def __repr__(self): return '{} : {}'.format(self.key, self._val) def get_val(self): + """ + Get Airflow Variable from Metadata DB and decode it using the Fernet Key + """ if self._val is not None and self.is_encrypted: try: fernet = get_fernet() @@ -52,22 +59,27 @@ def get_val(self): except InvalidFernetToken: self.log.error("Can't decrypt _val for key=%s, invalid token or value", self.key) return None - except Exception: + except Exception: # pylint: disable=broad-except self.log.error("Can't decrypt _val for key=%s, FERNET_KEY configuration missing", self.key) return None else: return self._val def set_val(self, value): + """ + Encode the specified value with Fernet Key and store it in Variables Table. + """ if value is not None: fernet = get_fernet() self._val = fernet.encrypt(bytes(value, 'utf-8')).decode() self.is_encrypted = fernet.is_encrypted @declared_attr - def val(cls): - return synonym('_val', - descriptor=property(cls.get_val, cls.set_val)) + def val(cls): # pylint: disable=no-self-argument + """ + Get Airflow Variable from Metadata DB and decode it using the Fernet Key + """ + return synonym('_val', descriptor=property(cls.get_val, cls.set_val)) @classmethod def setdefault(cls, key, default, deserialize_json=False): @@ -102,6 +114,13 @@ def get( default_var: Any = __NO_DEFAULT_SENTINEL, deserialize_json: bool = False, ): + """ + Sets a value for an Airflow Key + + :param key: Variable Key + :param default_var: Default value of the Variable if the Variable doesn't exists + :param deserialize_json: Deserialize the value to a Python dict + """ var_val = get_variable(key=key) if var_val is None: if default_var is not cls.__NO_DEFAULT_SENTINEL: @@ -121,8 +140,16 @@ def set( key: str, value: Any, serialize_json: bool = False, - session=None + session: Session = None ): + """ + Sets a value for an Airflow Variable with a given Key + + :param key: Variable Key + :param value: Value to set for the Variable + :param serialize_json: Serialize the value to a JSON string + :param session: SQL Alchemy Sessions + """ if serialize_json: stored_value = json.dumps(value, indent=2) @@ -130,15 +157,22 @@ def set( stored_value = str(value) Variable.delete(key, session=session) - session.add(Variable(key=key, val=stored_value)) # type: ignore + session.add(Variable(key=key, val=stored_value)) # type: ignore # pylint: disable=E1123 session.flush() @classmethod @provide_session - def delete(cls, key, session=None) -> int: + def delete(cls, key: str, session: Session = None) -> int: + """ + Delete an Airflow Variable for a given key + + :param key: Variable Key + :param session: SQL Alchemy Sessions + """ return session.query(cls).filter(cls.key == key).delete() def rotate_fernet_key(self): + """ Rotate Fernet Key """ fernet = get_fernet() if self._val and self.is_encrypted: self._val = fernet.rotate(self._val.encode('utf-8')).decode() diff --git a/scripts/ci/pylint_todo.txt b/scripts/ci/pylint_todo.txt index ed6d8a815e844..466d033b6e231 100644 --- a/scripts/ci/pylint_todo.txt +++ b/scripts/ci/pylint_todo.txt @@ -13,7 +13,6 @@ ./airflow/models/pool.py ./airflow/models/slamiss.py ./airflow/models/taskinstance.py -./airflow/models/variable.py ./airflow/models/xcom.py ./airflow/stats.py ./airflow/www/blueprints.py