-
Notifications
You must be signed in to change notification settings - Fork 3
/
jobs.clj
156 lines (139 loc) · 5.35 KB
/
jobs.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
(ns mescal.agave-de-v2.jobs
(:use [clojure.java.io :only [file]]
[medley.core :only [remove-vals]])
(:require [clj-time.core :as t]
[clj-time.format :as tf]
[clojure.string :as string]
[mescal.agave-de-v2.app-listings :as app-listings]
[mescal.agave-de-v2.job-params :as params]
[mescal.agave-de-v2.constants :as c]
[mescal.util :as util]))
(def ^:private timestamp-formatter
(tf/formatter "yyyy-MM-dd-HH-mm-ss.S"))
(defn- add-param-prefix
[prefix param]
(if-not (string/blank? (str prefix))
(keyword (str prefix "_" (name param)))
param))
(defn- params-for
([config param-prefix app-section]
(params-for config param-prefix app-section identity))
([config param-prefix app-section preprocessing-fn]
(let [get-param-val (comp preprocessing-fn config (partial add-param-prefix param-prefix))]
(->> (map (comp keyword :id) app-section)
(map (juxt identity get-param-val))
(into {})
(remove-vals nil?)))))
(defn- prepare-params
[agave app param-prefix config]
{:inputs (params-for config param-prefix (app :inputs) #(.agaveUrl agave %))
:parameters (params-for config param-prefix (app :parameters) #(if (map? %) (:value %) %))})
(def ^:private submitted "Submitted")
(def ^:private running "Running")
(def ^:private failed "Failed")
(def ^:private completed "Completed")
(def ^:private job-status-translations
{"ACCEPTED" submitted
"PENDING" submitted
"STAGING_INPUTS" submitted
"CLEANING_UP" running
"ARCHIVING" running
"STAGING_JOB" submitted
"FINISHED" completed
"KILLED" failed
"FAILED" failed
"STOPPED" failed
"RUNNING" running
"PAUSED" running
"BLOCKED" running
"QUEUED" submitted
"SUBMITTING" submitted
"STAGED" submitted
"PROCESSING_INPUTS" submitted
"ARCHIVING_FINISHED" completed
"ARCHIVING_FAILED" failed})
(defn- job-notifications
[callback-url]
[{:url callback-url
:event "*"
:persistent true}])
(defn- build-job-name
[submission]
(format "%s_%04d" (:job_id submission) (:step_number submission 1)))
(defn prepare-submission
[agave app submission]
(->> (assoc (prepare-params agave app (:paramPrefix submission) (:config submission))
:name (build-job-name submission)
:appId (:app_id submission)
:archive true
:archivePath (.agaveFilePath agave (:output_dir submission))
:archiveSystem (.storageSystem agave)
:notifications (job-notifications (:callbackUrl submission)))
(remove-vals nil?)))
(defn- app-enabled?
[statuses jobs-enabled? listing]
(and jobs-enabled?
(:available listing)
(= "up" (statuses (:executionHost listing)))))
(defn- get-result-folder-id
[agave job]
(when-let [agave-path (or (:archivePath job) (get-in job [:_links :archiveData :href]))]
(.irodsFilePath agave agave-path)))
(defn format-job*
[agave app-id app-name app-description job]
{:id (str (:id job))
:app_id app-id
:app_description app-description
:app_name app-name
:description ""
:enddate (or (util/to-utc (:endTime job)) "")
:system_id c/hpc-system-id
:name (:name job)
:raw_status (:status job)
:resultfolderid (get-result-folder-id agave job)
:startdate (or (util/to-utc (:startTime job)) "")
:status (job-status-translations (:status job) "")
:wiki_url ""})
(defn format-job
([agave jobs-enabled? app-info-map {app-id :appId :as job}]
(let [app-info (app-info-map app-id {})]
(format-job* agave
app-id
(app-listings/get-app-name app-info)
(app-listings/get-app-description app-info)
job)))
([agave jobs-enabled? statuses app-info-map {app-id :appId :as job}]
(let [app-info (app-info-map app-id {})]
(assoc (format-job agave jobs-enabled? app-info-map job)
:app-disabled (not (app-enabled? statuses jobs-enabled? app-info))))))
(defn format-job-history
[job-status-updates]
(for [update job-status-updates]
{:status (:status update)
:message (:description update)
:timestamp (str (util/to-millis (:created update)))}))
(defn format-job-submisison-response
[agave submission job]
(format-job* agave
(:appId submission)
(:appName submission)
(:appDescription submission)
job))
(defn translate-job-status
[status]
(get job-status-translations status))
(defn regenerate-job-submission
[agave job]
(let [app-id (:appId job)
app (.getApp agave app-id)
job-params (:parameters (params/format-params agave job app-id app))
cfg-entry (juxt (comp keyword :param_id) (comp :value :param_value))]
{:system_id c/hpc-system-id
:app_id app-id
:name (:name job)
:debug false
:notify false
:output_dir (get-result-folder-id agave job)
:create_output_subdir true
:description ""
:config (into {} (map cfg-entry job-params))}))