Skip to content

Commit

Permalink
HELP-31507_master: a stop command for tasks (#4064)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenollp authored and icehess committed Aug 15, 2017
1 parent 1389dec commit 7df92ec
Show file tree
Hide file tree
Showing 15 changed files with 709 additions and 159 deletions.
10 changes: 10 additions & 0 deletions applications/crossbar/doc/ref/tasks.md
Expand Up @@ -85,3 +85,13 @@ curl -v -X GET \
http://{SERVER}:8000/v2/accounts/{ACCOUNT_ID}/tasks/{TASK_ID}/input
```

#### Patch

> PATCH /v2/accounts/{ACCOUNT_ID}/tasks/{TASK_ID}/stop
```shell
curl -v -X PATCH \
-H "X-Auth-Token: {AUTH_TOKEN}" \
http://{SERVER}:8000/v2/accounts/{ACCOUNT_ID}/tasks/{TASK_ID}/stop
```

70 changes: 70 additions & 0 deletions applications/crossbar/doc/tasks.md
Expand Up @@ -475,6 +475,76 @@ curl -v -X PATCH \
}
```


#### Stop a running task

Tasks that are processing can be stopped.

Note that they cannot be started again.

> PATCH /v2/accounts/{ACCOUNT_ID}/tasks/{TASK_ID}/stop
```shell
curl -v -X PATCH \
-H "X-Auth-Token: {AUTH_TOKEN}" \
http://{SERVER}:8000/v2/accounts/{ACCOUNT_ID}/tasks/{TASK_ID}/stop
```

##### Success

```json
{
"auth_token": "{AUTH_TOKEN}",
"data": {
"_read_only": {
"account_id": "{ACCOUNT_ID}",
"action": "list_all",
"auth_account_id": "{AUTH_ACCOUNT_ID}",
"category": "number_management",
"created": 63669534312,
"end_timestamp": 63669534747,
"failure_count": 0,
"id": "{TASK_ID}",
"start_timestamp": 63669534746,
"status": "stopped",
"success_count": 0
}
},
"node": "{NODE}",
"page_size": 1,
"request_id": "{REQUEST_ID}",
"revision": "{REV}",
"status": "success",
"timestamp": "{TIMESTAMP}",
"version": "{VERSION}"
}
```

##### Task is not running

A task that was not yet started or that has already finished cannot be stopped.

```json
{
"auth_token": "{AUTH_TOKEN}",
"data": {
"cause": "{TASK_ID}",
"message": "bad identifier",
"reason": "task is not running"
},
"error": "404",
"message": "bad_identifier",
"node": "{NODE}",
"page_size": 1,
"request_id": "{REQUEST_ID}",
"status": "error",
"timestamp": "{TIMESTAMP}",
"version": "{VERSION}"
}
```



#### Retrieve a task's CSVs

When you `GET /v2/accounts/{ACCOUNT_ID}/tasks/{TASK_ID}`, the JSON will include a `"csvs":[...]" array with input and output CSVs as appropriate. Use the name(s) in the array to specify which you would like to receive.
Expand Down
68 changes: 68 additions & 0 deletions applications/crossbar/priv/api/swagger.json
Expand Up @@ -20934,6 +20934,54 @@
],
"type": "object"
},
"kapi.tasks.stop_req": {
"description": "AMQP API for tasks.stop_req",
"properties": {
"Event-Category": {
"enum": [
"tasks"
],
"type": "string"
},
"Event-Name": {
"enum": [
"stop_req"
],
"type": "string"
},
"Task-ID": {
"type": "string"
}
},
"required": [
"Task-ID"
],
"type": "object"
},
"kapi.tasks.stop_resp": {
"description": "AMQP API for tasks.stop_resp",
"properties": {
"Event-Category": {
"enum": [
"tasks"
],
"type": "string"
},
"Event-Name": {
"enum": [
"stop_resp"
],
"type": "string"
},
"Reply": {
"type": "string"
}
},
"required": [
"Reply"
],
"type": "object"
},
"kapi.xmpp.event": {
"description": "AMQP API for xmpp.event",
"properties": {
Expand Down Expand Up @@ -39409,6 +39457,26 @@
}
}
},
"/accounts/{ACCOUNT_ID}/tasks/{TASK_ID}/stop": {
"patch": {
"parameters": [
{
"$ref": "#/parameters/auth_token_header"
},
{
"$ref": "#/parameters/TASK_ID"
},
{
"$ref": "#/parameters/ACCOUNT_ID"
}
],
"responses": {
"200": {
"description": "request succeeded"
}
}
}
},
"/accounts/{ACCOUNT_ID}/temporal_rules": {
"get": {
"parameters": [
Expand Down
@@ -0,0 +1,26 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"_id": "kapi.tasks.stop_req",
"description": "AMQP API for tasks.stop_req",
"properties": {
"Event-Category": {
"enum": [
"tasks"
],
"type": "string"
},
"Event-Name": {
"enum": [
"stop_req"
],
"type": "string"
},
"Task-ID": {
"type": "string"
}
},
"required": [
"Task-ID"
],
"type": "object"
}
@@ -0,0 +1,26 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"_id": "kapi.tasks.stop_resp",
"description": "AMQP API for tasks.stop_resp",
"properties": {
"Event-Category": {
"enum": [
"tasks"
],
"type": "string"
},
"Event-Name": {
"enum": [
"stop_resp"
],
"type": "string"
},
"Reply": {
"type": "string"
}
},
"required": [
"Reply"
],
"type": "object"
}
48 changes: 35 additions & 13 deletions applications/crossbar/src/modules/cb_tasks.erl
Expand Up @@ -17,7 +17,7 @@
,content_types_provided/1, content_types_provided/2, content_types_provided/3
,validate/1, validate/2, validate/3
,put/1
,patch/2
,patch/2, patch/3
,delete/2

,to_csv/1
Expand All @@ -33,6 +33,7 @@
-define(RD_RECORDS, <<"records">>).
-define(RV_FILENAME, <<"file_name">>).

-define(PATH_STOP, <<"stop">>).
-define(PATH_OUTPUT, <<"output">>).
-define(PATH_INPUT, <<"input">>).

Expand Down Expand Up @@ -70,9 +71,7 @@ init() ->
%%--------------------------------------------------------------------
-spec authenticate(cb_context:context()) -> boolean().
authenticate(Context) ->
case {cb_context:req_verb(Context)
,cb_context:req_nouns(Context)
} of
case {cb_context:req_verb(Context), cb_context:req_nouns(Context)} of
{?HTTP_GET, [{<<"tasks">>, []}]} -> 'true';
{?HTTP_PUT, [{<<"tasks">>, []}]} ->
cb_context:is_superduper_admin(Context);
Expand Down Expand Up @@ -111,6 +110,8 @@ allowed_methods() ->
[?HTTP_GET, ?HTTP_PUT].
allowed_methods(_TaskId) ->
[?HTTP_GET, ?HTTP_PATCH, ?HTTP_DELETE].
allowed_methods(_TaskId, ?PATH_STOP) ->
[?HTTP_PATCH];
allowed_methods(_TaskId, ?PATH_INPUT) ->
[?HTTP_GET];
allowed_methods(_TaskId, ?PATH_OUTPUT) ->
Expand All @@ -129,6 +130,7 @@ allowed_methods(_TaskId, ?PATH_OUTPUT) ->
-spec resource_exists(path_token(), path_token()) -> 'true'.
resource_exists() -> 'true'.
resource_exists(_TaskId) -> 'true'.
resource_exists(_TaskId, ?PATH_STOP) -> 'true';
resource_exists(_TaskId, ?PATH_INPUT) -> 'true';
resource_exists(_TaskId, ?PATH_OUTPUT) -> 'true'.

Expand Down Expand Up @@ -234,16 +236,19 @@ download_filename(_Context, Name) ->
-spec validate(cb_context:context(), path_token(), path_token()) -> cb_context:context().
validate(Context) ->
validate_tasks(Context, cb_context:req_verb(Context)).

validate(Context, PathToken) ->
validate_tasks(Context, PathToken, cb_context:req_verb(Context)).

validate(Context, TaskId, ?PATH_STOP) ->
validate_tasks(Context, TaskId, cb_context:req_verb(Context));
validate(Context, TaskId, CSV) ->
CSVFile = csv_path_to_file(CSV),
QS = cb_context:query_string(Context),
AdjustedQS = kz_json:set_values([{<<"csv_name">>, CSVFile}
,{<<"accept">>, <<"text/csv">>}
]
,QS
),
Values = [{<<"csv_name">>, CSVFile}
,{<<"accept">>, <<"text/csv">>}
],
AdjustedQS = kz_json:set_values(Values, QS),
AdjustedContext = cb_context:set_query_string(Context, AdjustedQS),
validate(AdjustedContext, TaskId).

Expand Down Expand Up @@ -352,6 +357,8 @@ task_account_id(Context) ->
%% @end
%%--------------------------------------------------------------------
-spec patch(cb_context:context(), path_token()) -> cb_context:context().
-spec patch(cb_context:context(), path_token(), path_token()) -> cb_context:context().

patch(Context, TaskId) ->
Req = [{<<"Task-ID">>, TaskId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
Expand All @@ -371,6 +378,24 @@ patch(Context, TaskId) ->
crossbar_util:response(Task, Context)
end.

patch(Context, TaskId, ?PATH_STOP) ->
Req = [{<<"Task-ID">>, TaskId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
],
{ok, Resp} = kz_amqp_worker:call(Req
,fun kapi_tasks:publish_stop_req/1
,fun kapi_tasks:stop_resp_v/1
),
case kapi_tasks:reply(Resp) =:= <<"not_running">> of
false -> crossbar_util:response(kapi_tasks:reply(Resp), Context);
true ->
Msg = kz_json:from_list(
[{<<"reason">>, <<"task is not running">>}
,{<<"cause">>, TaskId}
]),
cb_context:add_system_error(bad_identifier, Msg, Context)
end.

%%--------------------------------------------------------------------
%% @public
%% @doc
Expand Down Expand Up @@ -508,10 +533,7 @@ read_attachment(TaskId, Context, AccountId) ->

-spec requested_attachment_name(cb_context:context()) -> ne_binary().
requested_attachment_name(Context) ->
cb_context:req_value(Context
,<<"csv_name">>
,?KZ_TASKS_ANAME_OUT
).
cb_context:req_value(Context, <<"csv_name">>, ?KZ_TASKS_ANAME_OUT).

-spec csv_path_to_file(ne_binary()) -> ne_binary().
csv_path_to_file(?PATH_INPUT) ->
Expand Down
2 changes: 0 additions & 2 deletions applications/tasks/priv/couchdb/views/tasks.json
Expand Up @@ -20,7 +20,6 @@
" file_name: doc.pvt_file_name,",
" created: doc.pvt_created,",
" status: doc.pvt_status,",
" node: doc.pvt_worker_node,",
" start_timestamp: doc.pvt_started_at,",
" end_timestamp: doc.pvt_ended_at,",
" failure_count: doc.pvt_total_rows_failed,",
Expand Down Expand Up @@ -49,7 +48,6 @@
" file_name: doc.pvt_file_name,",
" created: doc.pvt_created,",
" status: doc.pvt_status,",
" node: doc.pvt_worker_node,",
" start_timestamp: doc.pvt_started_at,",
" end_timestamp: doc.pvt_ended_at,",
" failure_count: doc.pvt_total_rows_failed,",
Expand Down
3 changes: 1 addition & 2 deletions applications/tasks/src/kz_task_worker.erl
Expand Up @@ -26,7 +26,7 @@
-type state() :: #state{}.

-define(IN, 'csv_in').
-define(OUT(TaskId), <<"/tmp/task_out.", (TaskId)/binary, ".csv">>).
-define(OUT(TaskId), kz_tasks_scheduler:output_path(TaskId)).

%%%===================================================================
%%% API
Expand Down Expand Up @@ -145,7 +145,6 @@ teardown(API, IterValue, #state{task_id = TaskId
_ = kz_tasks_scheduler:worker_finished(TaskId
,TotalSucceeded
,TotalFailed
,?OUT(TaskId)
,Columns
),
_ = erase(?IN),
Expand Down

0 comments on commit 7df92ec

Please sign in to comment.