Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up grid_data endpoint by 10x #24284

Merged
merged 4 commits into from
Jun 15, 2022

Conversation

ashb
Copy link
Member

@ashb ashb commented Jun 7, 2022

These changes make the endpoint go from almost 20s down to 1.5s and the
changes are two fold:

  1. Keep datetimes as objects for as long as possible

    Previously we were converting start/end dates for a task group to a
    string, and then in the parent parsing it back to a datetime to find
    the min and max of all the child nodes.

    The fix for that was to leave it as a datetime (or a
    pendulum.DateTime technically) and use the existing
    AirflowJsonEncoder class to "correctly" encode these objects on
    output.

  2. Reduce the number of DB queries from 1 per task to 1.

    The removed get_task_summaries function was called for each task,
    and was making a query to the database to find info for the given
    DagRuns.

    The helper function now makes just a single DB query for all
    tasks/runs and constructs a dict to efficiently look up the ti by
    run_id.

@boring-cyborg boring-cyborg bot added the area:webserver Webserver related Issues label Jun 7, 2022
@ashb ashb requested review from jedcunningham and removed request for ryanahamilton June 7, 2022 12:14
@ashb ashb added this to the Airflow 2.3.3 milestone Jun 7, 2022
Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️ it

@ashb
Copy link
Member Author

ashb commented Jun 7, 2022

(I'm still not thrilled at it taking 1.5s mind you, but it's a darn sight better)

@ashb
Copy link
Member Author

ashb commented Jun 7, 2022

Tests were done with a dag with 2000 dummy task grouped into 100 task groups

@potiuk
Copy link
Member

potiuk commented Jun 7, 2022

I guess just transferring and parsing the resulting json will take most of the time now. BTW. Are we using deflate there :D? AnNd there are quite a few tricks with Javascript VM optimization for JSON which we MIGHT take a look at :)

@ashb
Copy link
Member Author

ashb commented Jun 7, 2022

There are many much much slower points to optomize on the front end before we have to worry about time spent parsing date times on the client side.

@ashb ashb requested a review from ephraimbuddy June 7, 2022 12:37
@github-actions
Copy link

github-actions bot commented Jun 7, 2022

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Jun 7, 2022
Copy link
Contributor

@norm norm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love it.

@bbovenzi
Copy link
Contributor

I'm getting this error during auto-refresh:

TypeError: '<' not supported between instances of 'datetime.datetime' and 'NoneType'

@ashb
Copy link
Member Author

ashb commented Jun 13, 2022

Stack trace for that?

I didn't test with running DAGs. Oops! (Good thing we can add tests now)

@bbovenzi
Copy link
Contributor

Stack trace for that?

I didn't test with running DAGs. Oops! (Good thing we can add tests now)

TypeError
TypeError: '<' not supported between instances of 'NoneType' and 'NoneType'

Traceback (most recent call last)
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2464, in __call__
 
    def __call__(self, environ, start_response):
        """The WSGI server calls the Flask application object as the
        WSGI application. This calls :meth:`wsgi_app` which can be
        wrapped to applying middleware."""
        return self.wsgi_app(environ, start_response)
 
    def __repr__(self):
        return "<%s %r>" % (self.__class__.__name__, self.name)
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2450, in wsgi_app
            try:
                ctx.push()
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1867, in handle_exception
            # if we want to repropagate the exception, we can attempt to
            # raise it with the whole traceback in case we can do that
            # (the function was actually called from the except part)
            # otherwise, we just raise the error again
            if exc_value is e:
                reraise(exc_type, exc_value, tb)
            else:
                raise e
 
        self.log_exception((exc_type, exc_value, tb))
        server_error = InternalServerError()
File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
    import collections.abc as collections_abc
 
    def reraise(tp, value, tb=None):
        if value.__traceback__ is not tb:
            raise value.with_traceback(tb)
        raise value
 
    implements_to_string = _identity
 
else:
    iterkeys = lambda d: d.iterkeys()
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2447, in wsgi_app
        ctx = self.request_context(environ)
        error = None
        try:
            try:
                ctx.push()
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                error = sys.exc_info()[1]
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1952, in full_dispatch_request
            request_started.send(self)
            rv = self.preprocess_request()
            if rv is None:
                rv = self.dispatch_request()
        except Exception as e:
            rv = self.handle_user_exception(e)
        return self.finalize_request(rv)
 
    def finalize_request(self, rv, from_error_handler=False):
        """Given the return value from a view function this finalizes
        the request by converting it into a response and invoking the
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1821, in handle_user_exception
            return self.handle_http_exception(e)
 
        handler = self._find_error_handler(e)
 
        if handler is None:
            reraise(exc_type, exc_value, tb)
        return handler(e)
 
    def handle_exception(self, e):
        """Handle an exception that did not have an error handler
        associated with it, or that was raised from an error handler.
File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
    import collections.abc as collections_abc
 
    def reraise(tp, value, tb=None):
        if value.__traceback__ is not tb:
            raise value.with_traceback(tb)
        raise value
 
    implements_to_string = _identity
 
else:
    iterkeys = lambda d: d.iterkeys()
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1950, in full_dispatch_request
        self.try_trigger_before_first_request_functions()
        try:
            request_started.send(self)
            rv = self.preprocess_request()
            if rv is None:
                rv = self.dispatch_request()
        except Exception as e:
            rv = self.handle_user_exception(e)
        return self.finalize_request(rv)
 
    def finalize_request(self, rv, from_error_handler=False):
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1936, in dispatch_request
            getattr(rule, "provide_automatic_options", False)
            and req.method == "OPTIONS"
        ):
            return self.make_default_options_response()
        # otherwise dispatch to the handler for that endpoint
        return self.view_functions[rule.endpoint](**req.view_args)
 
    def full_dispatch_request(self):
        """Dispatches the request and on top of that performs request
        pre and postprocessing as well as HTTP exception catching and
        error handling.
File "/opt/airflow/airflow/www/auth.py", line 43, in decorated
 
            dag_id = (
                request.args.get("dag_id") or request.form.get("dag_id") or (request.json or {}).get("dag_id")
            )
            if appbuilder.sm.check_authorization(permissions, dag_id):
                return func(*args, **kwargs)
            elif not g.user.is_anonymous and not g.user.perms:
                return (
                    render_template(
                        'airflow/no_roles_permissions.html',
                        hostname=socket.getfqdn()
File "/opt/airflow/airflow/www/views.py", line 3622, in grid_data
 
            dag_runs = query.order_by(DagRun.execution_date.desc()).limit(num_runs).all()
            dag_runs.reverse()
            encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs]
            data = {
                'groups': dag_to_grid(dag, dag_runs, session),
                'dag_runs': encoded_runs,
            }
 
        # avoid spaces to reduce payload size
        return (
File "/opt/airflow/airflow/www/views.py", line 396, in dag_to_grid
            'children': children,
            'tooltip': task_group.tooltip,
            'instances': group_summaries,
        }
 
    return task_group_to_grid(dag.task_group, dag_runs, grouped_tis)
 
 
def task_group_to_dict(task_item_or_group):
    """
    Create a nested dict representation of this TaskGroup and its children used to construct
File "/opt/airflow/airflow/www/views.py", line 357, in task_group_to_grid
 
        # Task Group
        task_group = item
 
        children = [
            task_group_to_grid(child, dag_runs, grouped_tis) for child in task_group.topological_sort()
        ]
 
        def get_summary(dag_run, children):
            child_instances = [child['instances'] for child in children if 'instances' in child]
            child_instances = [
File "/opt/airflow/airflow/www/views.py", line 357, in <listcomp>
 
        # Task Group
        task_group = item
 
        children = [
            task_group_to_grid(child, dag_runs, grouped_tis) for child in task_group.topological_sort()
        ]
 
        def get_summary(dag_run, children):
            child_instances = [child['instances'] for child in children if 'instances' in child]
            child_instances = [
File "/opt/airflow/airflow/www/views.py", line 341, in task_group_to_grid
                if record:
                    set_overall_state(record)
                    yield record
 
            if item.is_mapped:
                instances = list(_mapped_summary(grouped_tis.get(item.task_id, [])))
            else:
                instances = list(map(_get_summary, grouped_tis.get(item.task_id, [])))
 
            return {
                'id': item.task_id,
File "/opt/airflow/airflow/www/views.py", line 333, in _mapped_summary
                            'end_date': ti_summary.end_date,
                            'mapped_states': {ti_summary.state: ti_summary.state_count},
                            'state': None,  # We change this before yielding
                        }
                        continue
                    record['start_date'] = min(record['start_date'], ti_summary.start_date)
                    record['end_date'] = max(record['end_date'], ti_summary.end_date)
                    record['mapped_states'][ti_summary.state] = ti_summary.state_count
                if record:
                    set_overall_state(record)
                    yield record
TypeError: '<' not supported between instances of 'NoneType' and 'NoneType'

ashb added 3 commits June 14, 2022 15:36
These changes make the endpoint go from almost 20s down to 1.5s and the
changes are two fold:

1. Keep datetimes as objects for as long as possible

   Previously we were converting start/end dates for a task group to a
   string, and then in the parent parsing it back to a datetime to find
   the min and max of all the child nodes.

   The fix for that was to leave it as a datetime (or a
   pendulum.DateTime technically) and use the existing
   `AirflowJsonEncoder` class to "correctly" encode these objects on
   output.

2. Reduce the number of DB queries from 1 per task to 1.

   The removed `get_task_summaries` function was called for each task,
   and was making a query to the database to find info for the given
   DagRuns.

   The helper function now makes just a single DB query for all
   tasks/runs and constructs a dict to efficiently look up the ti by
   run_id.
Note that this possibly has incorrect behaviour, in that the end_date of
a TaskGroup is set to the max of all the children's end dates, even if
some are still running. (This is the existing behaviour and is not
changed or altered by this change - limiting it to just performance
fixes)
@ashb ashb force-pushed the quicker-grid-data-endpoint branch from d471f1f to b1bf967 Compare June 14, 2022 14:37
@ashb
Copy link
Member Author

ashb commented Jun 14, 2022

PTAL @bbovenzi I've fixed the issue

@ashb ashb requested review from potiuk and bbovenzi June 14, 2022 14:56
@bbovenzi
Copy link
Contributor

bbovenzi commented Jun 14, 2022

Looking good. But there are some linting issues. run yarn lint in airflow/www. They should have been caught in precommit. I'll look into that

@ashb
Copy link
Member Author

ashb commented Jun 15, 2022

Helm/Kube tests are not caused by this PR and are failing on main too. Merging.

@ashb ashb merged commit 451a6f4 into apache:main Jun 15, 2022
@ashb ashb deleted the quicker-grid-data-endpoint branch June 15, 2022 12:02
ephraimbuddy pushed a commit that referenced this pull request Jun 30, 2022
* Speed up grid_data endpoint by 10x

These changes make the endpoint go from almost 20s down to 1.5s and the
changes are two fold:

1. Keep datetimes as objects for as long as possible

   Previously we were converting start/end dates for a task group to a
   string, and then in the parent parsing it back to a datetime to find
   the min and max of all the child nodes.

   The fix for that was to leave it as a datetime (or a
   pendulum.DateTime technically) and use the existing
   `AirflowJsonEncoder` class to "correctly" encode these objects on
   output.

2. Reduce the number of DB queries from 1 per task to 1.

   The removed `get_task_summaries` function was called for each task,
   and was making a query to the database to find info for the given
   DagRuns.

   The helper function now makes just a single DB query for all
   tasks/runs and constructs a dict to efficiently look up the ti by
   run_id.

* Add support for mapped tasks in the grid data

* Don't fail when not all tasks have a finish date.

Note that this possibly has incorrect behaviour, in that the end_date of
a TaskGroup is set to the max of all the children's end dates, even if
some are still running. (This is the existing behaviour and is not
changed or altered by this change - limiting it to just performance
fixes)

(cherry picked from commit 451a6f4)
@ephraimbuddy ephraimbuddy added the type:bug-fix Changelog: Bug Fixes label Jun 30, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:webserver Webserver related Issues full tests needed We need to run full set of tests for this PR to merge type:bug-fix Changelog: Bug Fixes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants