Permalink
Browse files

Added ability to cancel queries

  • Loading branch information...
1 parent 9371197 commit a3d21b87f6a45b5e3af82a1d4d98df2cf77cdf67 @ankane committed Sep 2, 2016
@@ -25,15 +25,41 @@ $( function () {
});
});
+function uuid() {
+ return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
+ var r = Math.random()*16|0, v = c == 'x' ? r : (r&0x3|0x8);
+ return v.toString(16);
+ });
+}
+
function cancelQuery(runningQuery) {
runningQuery.canceled = true;
var xhr = runningQuery.xhr;
if (xhr) {
xhr.abort();
}
+ remoteCancelQuery(runningQuery);
queryComplete();
}
+function csrfProtect(payload) {
+ var param = $("meta[name=csrf-param]").attr("content");
+ var token = $("meta[name=csrf-token]").attr("content");
+ if (param && token) payload[param] = token;
+ return new Blob([JSON.stringify(payload)], {type : "application/json; charset=utf-8"});
+}
+
+function remoteCancelQuery(runningQuery) {
+ var path = window.cancelQueriesPath;
+ var data = {run_id: runningQuery.run_id, data_source: runningQuery.data_source};
+ if (navigator.sendBeacon) {
+ navigator.sendBeacon(path, csrfProtect(data));
+ } else {
+ // TODO make sync
+ $.post(path, data);
+ }
+}
+
var queriesQueue = [];
var runningQueries = 0;
var maxQueries = 3;
@@ -61,6 +87,9 @@ function queryComplete() {
function runQuery(data, success, error, runningQuery) {
queueQuery( function () {
+ runningQuery = runningQuery || {};
+ runningQuery.run_id = data.run_id = uuid();
+ runningQuery.data_source = data.data_source;
return runQueryHelper(data, success, error, runningQuery);
});
}
@@ -103,9 +103,9 @@ def run
continue_run
end
elsif @success
- @run_id = Blazer.async ? SecureRandom.uuid : nil
+ @run_id = blazer_run_id
- options = {user: blazer_user, query: @query, refresh_cache: params[:check], run_id: @run_id}
+ options = {user: blazer_user, query: @query, refresh_cache: params[:check], run_id: @run_id, async: Blazer.async}
if Blazer.async && request.format.symbol != :csv
result = []
Blazer::RunStatementJob.perform_async(result, @data_source, @statement, options)
@@ -176,6 +176,11 @@ def schema
@schema = Blazer.data_sources[params[:data_source]].schema
end
+ def cancel
+ Blazer.data_sources[params[:data_source]].cancel(blazer_run_id)
+ render json: {}
+ end
+
private
def continue_run
@@ -307,5 +312,9 @@ def blazer_time_value(data_source, k, v)
data_source.local_time_suffix.any? { |s| k.ends_with?(s) } ? v.to_s.sub(" UTC", "") : v.in_time_zone(Blazer.time_zone)
end
helper_method :blazer_time_value
+
+ def blazer_run_id
+ params[:run_id].to_s.gsub(/[^a-z0-9\-]/i, "")
+ end
end
end
@@ -131,6 +131,7 @@
function queryDone() {
+ runningQuery = null
$("#run").removeClass("hide")
$("#cancel").addClass("hide")
}
@@ -144,6 +145,12 @@
$("#results").html("")
})
+ $(window).unload(function() {
+ if (runningQuery) {
+ remoteCancelQuery(runningQuery)
+ }
+ })
+
$("#run").click( function (e) {
e.preventDefault();
@@ -9,6 +9,7 @@
<%= javascript_include_tag "blazer/application" %>
<script>
var runQueriesPath = <%= run_queries_path.to_json.html_safe %>;
+ var cancelQueriesPath = <%= cancel_queries_path.to_json.html_safe %>;
</script>
<% if blazer_maps? %>
<%= stylesheet_link_tag "https://api.mapbox.com/mapbox.js/v2.4.0/mapbox.css" %>
View
@@ -1,6 +1,7 @@
Blazer::Engine.routes.draw do
resources :queries do
post :run, on: :collection # err on the side of caution
+ post :cancel, on: :collection
post :refresh, on: :member
get :tables, on: :collection
get :schema, on: :collection
@@ -35,6 +35,10 @@ def explain(statement)
# optional
end
+ def cancel(run_id)
+ # optional
+ end
+
protected
def settings
@@ -72,8 +72,23 @@ def explain(statement)
nil
end
+ def cancel(run_id)
+ if postgresql?
+ execute("SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND query LIKE '%,run_id:#{run_id}%'")
+ elsif redshift?
+ first_row = connection_model.connection.select_all("SELECT pid FROM stv_recents WHERE status = 'Running' AND query LIKE '%,run_id:#{run_id}%'").first
+ if first_row
+ execute("CANCEL #{first_row["pid"].to_i}")
+ end
+ end
+ end
+
protected
+ def execute(statement)
+ connection_model.connection.execute(statement)
+ end
+
def postgresql?
["PostgreSQL", "PostGIS"].include?(adapter_name)
end
@@ -6,7 +6,7 @@ class DataSource
attr_reader :id, :settings, :adapter, :adapter_instance
- def_delegators :adapter_instance, :schema, :tables, :preview_statement, :reconnect, :cost, :explain
+ def_delegators :adapter_instance, :schema, :tables, :preview_statement, :reconnect, :cost, :explain, :cancel
def initialize(id, settings)
@id = id
@@ -109,6 +109,7 @@ def delete_results(run_id)
def run_statement(statement, options = {})
run_id = options[:run_id]
+ async = options[:async]
result = nil
if cache_mode != "off" && !options[:refresh_cache]
result = read_cache(statement_cache_key(statement))
@@ -129,7 +130,10 @@ def run_statement(statement, options = {})
if options[:check]
comment << ",check_id:#{options[:check].id},check_emails:#{options[:check].emails}"
end
- result = run_statement_helper(statement, comment, options[:run_id])
+ if options[:run_id]
+ comment << ",run_id:#{options[:run_id]}"
+ end
+ result = run_statement_helper(statement, comment, async ? options[:run_id] : nil)
end
result

0 comments on commit a3d21b8

Please sign in to comment.