From 60ace51611f0187829055e169ccc6f7855243ade Mon Sep 17 00:00:00 2001 From: feluelle Date: Thu, 11 Jul 2019 17:16:38 +0200 Subject: [PATCH] [AIRFLOW-1740] Fix xcom creation and update via RBAC UI --- airflow/models/xcom.py | 31 ++++++++++++++++++------------- airflow/www/views.py | 15 +++++++++++++++ 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py index ca049c837b2e5..034b69bc6ef99 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -102,19 +102,7 @@ def set( """ session.expunge_all() - enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling') - if enable_pickling: - value = pickle.dumps(value) - else: - try: - value = json.dumps(value).encode('UTF-8') - except ValueError: - log = LoggingMixin().log - log.error("Could not serialize the XCOM value into JSON. " - "If you are using pickles instead of JSON " - "for XCOM, then you need to enable pickle " - "support for XCOM in your airflow config.") - raise + value = XCom.serialize_value(value) # remove any duplicate XComs session.query(cls).filter( @@ -229,3 +217,20 @@ def delete(cls, xcoms, session=None): ) session.delete(xcom) session.commit() + + @staticmethod + def serialize_value(value): + # TODO: "pickling" has been deprecated and JSON is preferred. + # "pickling" will be removed in Airflow 2.0. + if configuration.getboolean('core', 'enable_xcom_pickling'): + return pickle.dumps(value) + + try: + return json.dumps(value).encode('UTF-8') + except ValueError: + log = LoggingMixin().log + log.error("Could not serialize the XCOM value into JSON. " + "If you are using pickles instead of JSON " + "for XCOM, then you need to enable pickle " + "support for XCOM in your airflow config.") + raise diff --git a/airflow/www/views.py b/airflow/www/views.py index 728c700f5b3ab..71786d6378fa4 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2043,6 +2043,13 @@ class XComModelView(AirflowModelView): base_filters = [['dag_id', DagFilter, lambda: []]] + formatters_columns = { + 'task_id': wwwutils.task_instance_link, + 'execution_date': wwwutils.datetime_f('execution_date'), + 'timestamp': wwwutils.datetime_f('timestamp'), + 'dag_id': wwwutils.dag_link, + } + @action('muldelete', 'Delete', "Are you sure you want to delete selected records?", single=False) def action_muldelete(self, items): @@ -2050,6 +2057,14 @@ def action_muldelete(self, items): self.update_redirect() return redirect(self.get_redirect()) + def pre_add(self, item): + item.execution_date = timezone.make_aware(item.execution_date) + item.value = XCom.serialize_value(item.value) + + def pre_update(self, item): + item.execution_date = timezone.make_aware(item.execution_date) + item.value = XCom.serialize_value(item.value) + class ConnectionModelView(AirflowModelView): route_base = '/connection'