Skip to content

Commit

Permalink
RQ: Missing currently executing queries view (#4558)
Browse files Browse the repository at this point in the history
* add meta information to executing queries

* add a table for running queries

* add pagination to queues table

* sort the queues table

* add pagination to all tables
  • Loading branch information
Omer Lachish committed Feb 3, 2020
1 parent 2de3895 commit 7a34a76
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 10 deletions.
59 changes: 56 additions & 3 deletions client/app/components/admin/RQStatus.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,19 @@ CounterCard.defaultProps = {

// Tables

const queryJobsColumns = [
{ title: "Queue", dataIndex: "origin" },
{ title: "Query ID", dataIndex: "meta.query_id" },
{ title: "Org ID", dataIndex: "meta.org_id" },
{ title: "Data Source ID", dataIndex: "meta.data_source_id" },
{ title: "User ID", dataIndex: "meta.user_id" },
Columns.custom(scheduled => scheduled.toString(), { title: "Scheduled", dataIndex: "meta.scheduled" }),
Columns.timeAgo({ title: "Start Time", dataIndex: "started_at" }),
Columns.timeAgo({ title: "Enqueue Time", dataIndex: "enqueued_at" }),
];

const otherJobsColumns = [
{ title: "Queue", dataIndex: "queue" },
{ title: "Queue", dataIndex: "origin" },
{ title: "Job Name", dataIndex: "name" },
Columns.timeAgo({ title: "Start Time", dataIndex: "started_at" }),
Columns.timeAgo({ title: "Enqueue Time", dataIndex: "enqueued_at" }),
Expand Down Expand Up @@ -88,13 +99,55 @@ export function WorkersTable({ loading, items }) {
WorkersTable.propTypes = TablePropTypes;

export function QueuesTable({ loading, items }) {
return <Table loading={loading} columns={queuesColumns} rowKey="name" dataSource={items} />;
return (
<Table
loading={loading}
columns={queuesColumns}
rowKey="name"
dataSource={items}
pagination={{
defaultPageSize: 25,
pageSizeOptions: ["10", "25", "50"],
showSizeChanger: true,
}}
/>
);
}

QueuesTable.propTypes = TablePropTypes;

export function QueryJobsTable({ loading, items }) {
return (
<Table
loading={loading}
columns={queryJobsColumns}
rowKey="id"
dataSource={items}
pagination={{
defaultPageSize: 25,
pageSizeOptions: ["10", "25", "50"],
showSizeChanger: true,
}}
/>
);
}

QueryJobsTable.propTypes = TablePropTypes;

export function OtherJobsTable({ loading, items }) {
return <Table loading={loading} columns={otherJobsColumns} rowKey="id" dataSource={items} />;
return (
<Table
loading={loading}
columns={otherJobsColumns}
rowKey="id"
dataSource={items}
pagination={{
defaultPageSize: 25,
pageSizeOptions: ["10", "25", "50"],
showSizeChanger: true,
}}
/>
);
}

OtherJobsTable.propTypes = TablePropTypes;
13 changes: 10 additions & 3 deletions client/app/pages/admin/Jobs.jsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { flatMap, values } from "lodash";
import { partition, flatMap, values } from "lodash";
import React from "react";
import { axios } from "@/services/axios";

Expand All @@ -7,7 +7,7 @@ import Tabs from "antd/lib/tabs";
import * as Grid from "antd/lib/grid";
import routeWithUserSession from "@/components/ApplicationArea/routeWithUserSession";
import Layout from "@/components/admin/Layout";
import { CounterCard, WorkersTable, QueuesTable, OtherJobsTable } from "@/components/admin/RQStatus";
import { CounterCard, WorkersTable, QueuesTable, QueryJobsTable, OtherJobsTable } from "@/components/admin/RQStatus";

import location from "@/services/location";
import recordEvent from "@/services/recordEvent";
Expand Down Expand Up @@ -79,6 +79,10 @@ class Jobs extends React.Component {

render() {
const { isLoading, error, queueCounters, startedJobs, overallCounters, workers, activeTab } = this.state;
const [startedQueryJobs, otherStartedJobs] = partition(startedJobs, [
"name",
"redash.tasks.queries.execution.execute_query",
]);

const changeTab = newTab => {
location.setHash(newTab);
Expand Down Expand Up @@ -108,8 +112,11 @@ class Jobs extends React.Component {
<Tabs.TabPane key="workers" tab="Workers">
<WorkersTable loading={isLoading} items={workers} />
</Tabs.TabPane>
<Tabs.TabPane key="queries" tab="Queries">
<QueryJobsTable loading={isLoading} items={startedQueryJobs} />
</Tabs.TabPane>
<Tabs.TabPane key="other" tab="Other Jobs">
<OtherJobsTable loading={isLoading} items={startedJobs} />
<OtherJobsTable loading={isLoading} items={otherStartedJobs} />
</Tabs.TabPane>
</Tabs>
</React.Fragment>
Expand Down
9 changes: 5 additions & 4 deletions redash/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,15 @@ def rq_job_ids():
return flatten(started_jobs + queued_jobs)


def fetch_jobs(queue, job_ids):
def fetch_jobs(job_ids):
return [
{
"id": job.id,
"name": job.func_name,
"queue": queue.name,
"origin": job.origin,
"enqueued_at": job.enqueued_at,
"started_at": job.started_at,
"meta": job.meta,
}
for job in Job.fetch_many(job_ids, connection=rq_redis_connection)
if job is not None
Expand All @@ -88,10 +89,10 @@ def rq_queues():
return {
q.name: {
"name": q.name,
"started": fetch_jobs(q, StartedJobRegistry(queue=q).get_job_ids()),
"started": fetch_jobs(StartedJobRegistry(queue=q).get_job_ids()),
"queued": len(q.job_ids),
}
for q in Queue.all()
for q in sorted(Queue.all(), key=lambda q: q.name)
}


Expand Down
7 changes: 7 additions & 0 deletions redash/tasks/queries/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ def enqueue_query(
scheduled_query_id=scheduled_query_id,
is_api_key=is_api_key,
job_timeout=time_limit,
meta={
"data_source_id": data_source.id,
"org_id": data_source.org_id,
"scheduled": scheduled_query_id is not None,
"query_id": metadata.get("Query ID"),
"user_id": user_id,
}
)

logging.info("[%s] Created new job: %s", query_hash, job.id)
Expand Down

0 comments on commit 7a34a76

Please sign in to comment.