Skip to content

Commit

Permalink
#41: Added endpoint to list all pipelines (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
amexboy authored and lispyclouds committed Oct 20, 2019
1 parent 09319ad commit 426bc2c
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 36 deletions.
32 changes: 32 additions & 0 deletions integration-tests/bob-tests.strest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,38 @@ requests:
- jsonpath: "content.message"
expect: "failed"

getPipelines:
request:
url: "http://bob:7777/api/pipelines"
method: "GET"
validate:
- jsonpath: "status"
expect: 200
- jsonpath: "content[0].name"
expect: "dev:test"
- jsonpath: "content[1].name"
expect: "dev:this-fails"

getPipelinesByStatus:
request:
url: "http://bob:7777/api/pipelines?status=failed"
method: "GET"
validate:
- jsonpath: "status"
expect: 200
- jsonpath: "content[0].name"
expect: "dev:this-fails"

getPipelinesByGroupAndName:
request:
url: "http://bob:7777/api/pipelines?group=dev&name=test"
method: "GET"
validate:
- jsonpath: "status"
expect: 200
- jsonpath: "content[0].name"
expect: "dev:test"

pipelineArtifacts:
request:
url: "http://bob:7777/api/pipelines/groups/dev/names/test/number/1/artifacts/store/local/name/afile"
Expand Down
14 changes: 14 additions & 0 deletions resources/sql/pipeline.sql
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,17 @@ DO UPDATE SET "content" = "logs"."content" || E'\n' || EXCLUDED.content;
-- :name logs-of :query :1
SELECT "content" FROM "logs"
WHERE "run"=:run-id;

-- :name get-pipelines :query :many
SELECT p.name, p.image
FROM "pipelines" p
WHERE 1 = 1
--~ (when (not-empty (:pipeline params)) "AND p.name = :pipeline")
/*~ (when (not-empty (:status params)) "
AND EXISTS (SELECT r.status
FROM runs r
WHERE r.pipeline = p.name
AND :status = r.status
LIMIT 1)")
~*/

5 changes: 5 additions & 0 deletions resources/sql/resource.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,8 @@ VALUES :tuple*:params;
-- :name invalid-external-resources :query :many
SELECT * FROM "resource_providers"
WHERE "name"=:name AND "url" IS NOT NULL;

-- :name resources-by-pipeline :query :many
SELECT * FROM "resources"
WHERE "pipeline"=:pipeline;

9 changes: 9 additions & 0 deletions src/bob/api/routes.clj
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@
:summary "Deletes a pipeline in a group with the specified name."
(p/remove-pipeline group name))

(rest/GET "/pipelines" []
:return schema/PipelinesResponse
:query-params [{group :- String nil}
{name :- String nil}
{status :- String nil}]
:summary "Returns all defined Pipelines. Search params are case sensitive :-)"
(p/get-pipelines group name status))


(rest/POST "/external-resources/:name" []
:return schema/SimpleResponse
:path-params [name
Expand Down
4 changes: 4 additions & 0 deletions src/bob/api/schemas.clj
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
(s/optional-key :vars) {Keyword String}
(s/optional-key :resources) [Resource]})

(s/defschema PipelinesResponse (s/either {:message String}
[{:name String
:data Pipeline}]))

(s/defschema LogsResponse (s/either {:message [String]}
SimpleResponse))

Expand Down
120 changes: 84 additions & 36 deletions src/bob/pipeline/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
; along with Bob. If not, see <http://www.gnu.org/licenses/>.

(ns bob.pipeline.core
(:require [ring.util.response :as res]
(:require [ring.util.http-response :as res]
[manifold.deferred :as d]
[failjure.core :as f]
[taoensso.timbre :as log]
Expand Down Expand Up @@ -55,9 +55,9 @@
_ (doseq [resource resources]
(let [{:keys [name params type provider]} resource]
(log/debugf "Inserting resource:
name: %s
type: %s
provider: %s"
name: %s
type: %s
provider: %s"
name
type
provider)
Expand All @@ -77,9 +77,9 @@
artifact-path :path
artifact-store :store} :produces_artifact} step]
(log/debugf "Inserting step:
cmd: %s
needs resource: %s
produces artifact: %s"
cmd: %s
needs resource: %s
produces artifact: %s"
cmd
needs-resource
(:produces_artifact step))
Expand All @@ -90,14 +90,14 @@
:artifact_path artifact-path
:artifact_store artifact-store
:pipeline pipeline})))]
(u/respond "Ok")
(f/when-failed [err]
(log/errorf "Pipeline creation failed: %s." (f/message err))
;; TODO: See if this can be done in a txn instead
(when-not (clojure.string/includes? (f/message err) "duplicate key")
(f/try* (db/delete-pipeline states/db {:name pipeline})))
(res/bad-request {:message "Pipeline creation error: Check params or if its already created"})))]
result))
(u/respond "Ok")
(f/when-failed [err]
(log/errorf "Pipeline creation failed: %s." (f/message err))
;; TODO: See if this can be done in a txn instead
(when-not (clojure.string/includes? (f/message err) "duplicate key")
(f/try* (db/delete-pipeline states/db {:name pipeline})))
(res/bad-request {:message "Pipeline creation error: Check params or if its already created"})))]
result))

(defn start
"Asynchronously starts a pipeline in a group by name.
Expand All @@ -110,27 +110,27 @@
vars (->> (db/evars-by-pipeline states/db
{:pipeline pipeline})
(map #(hash-map
(keyword (:key %)) (:value %)))
(keyword (:key %)) (:value %)))
(into {}))]
(do (log/infof "Starting pipeline %s" pipeline)
(p/exec-steps image steps pipeline vars)
(u/respond "Ok"))
(f/when-failed [err]
(log/errorf "Error starting pipeline: %s" (f/message err))
(res/bad-request
{:message (f/message err)})))]
result))
(do (log/infof "Starting pipeline %s" pipeline)
(p/exec-steps image steps pipeline vars)
(u/respond "Ok"))
(f/when-failed [err]
(log/errorf "Error starting pipeline: %s" (f/message err))
(res/bad-request
{:message (f/message err)})))]
result))

(defn stop
"Stops a running pipeline with SIGKILL.
Returns Ok or any stopping errors."
[group name number]
(d/let-flow [pipeline (u/name-of group name)
result (p/stop-pipeline pipeline number)]
(if (nil? result)
(do (log/warn "Attempt to stop an invalid pipeline")
(res/not-found {:message "Pipeline not running"}))
(u/respond result))))
(if (nil? result)
(do (log/warn "Attempt to stop an invalid pipeline")
(res/not-found {:message "Pipeline not running"}))
(u/respond result))))

(defn status
"Fetches the status of a particular run of a pipeline.
Expand All @@ -142,10 +142,10 @@
:number number})
(:status)
(keyword)))]
(if (nil? status)
(do (log/warn "Attempt to fetch status for an invalid pipeline")
(res/not-found {:message "No such pipeline"}))
(u/respond status))))
(if (nil? status)
(do (log/warn "Attempt to fetch status for an invalid pipeline")
(res/not-found {:message "No such pipeline"}))
(u/respond status))))

(defn remove-pipeline
"Removes a pipeline.
Expand All @@ -155,7 +155,7 @@
_ (log/debugf "Deleting pipeline %s" pipeline)
_ (f/try* (db/delete-pipeline states/db
{:name pipeline}))]
(u/respond "Ok")))
(u/respond "Ok")))

(defn logs-of
"Handler to fetch logs for a particular run of a pipeline.
Expand All @@ -164,9 +164,57 @@
[group name number offset lines]
(d/let-flow [pipeline (u/name-of group name)
result (p/pipeline-logs pipeline number offset lines)]
(if (f/failed? result)
(res/bad-request {:message (f/message result)})
(u/respond result))))
(if (f/failed? result)
(res/bad-request {:message (f/message result)})
(u/respond result))))

(defn make-step
"Convertes step from the database to conform with the schema"
[{:keys [cmd needs_resource produces_artifact artifact_path artifact_store]}]
(merge {:cmd cmd}
(when (some? needs_resource) {:needs_resource needs_resource})
(when (some? produces_artifact)
{:produces_artifact
{:name produces_artifact
:path artifact_path
:store artifact_store}})))

(defn make-resource
"Convert and enrich resource from the databse to comform with schema"
[{:keys [name type provider pipeline]}]
{:name name
:type type
:provider provider
:params (ri/get-resource-params pipeline name)})

(defn get-pipelines
"Handler to fetch list of defined piplies"
[group name status]
(log/debugf "Fetching list of defined pipelines for group=%s name=%s status=%s" group name status)
(d/let-flow [pipeline-query (if (every? not-empty [group name])
(u/name-of group name)
(some not-empty [group name]))
query-params {:pipeline pipeline-query :status status}
result (f/try-all [pipelines (db/get-pipelines states/db query-params)
_ (log/debugf "Found pipelines %s" (vec pipelines))
result (mapv (fn [{:keys [name image]}]
(f/try-all [filter {:pipeline name}
steps (mapv make-step (db/ordered-steps states/db filter))
resources (mapv make-resource
(rdb/resources-by-pipeline states/db filter))]
{:name name
:data {:image image
:steps steps
:resources resources}}))
pipelines)]
(do
(log/debugf "Fetched pipelines: %s" result)
(res/ok result))
(f/when-failed [err]
(let [error (format "Failed to fetch pipelines %s " (f/message err))]
(log/warn error err)
(u/respond error))))]
result))

(comment
(create "test"
Expand Down
12 changes: 12 additions & 0 deletions src/bob/resource/internals.clj
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,18 @@
(some? (db/invalid-external-resources states/db
{:name (:provider resource)})))

(defn get-resource-params
"Fetches list of parameters associated with the resource"
[pipeline name]
(reduce
(fn
[r {:keys [key value]}]
(assoc r (keyword key) value))
{}
(db/resource-params-of states/db
{:name name
:pipeline pipeline})))

(comment
(def resource {:name "my-source"
:type "external"
Expand Down
85 changes: 85 additions & 0 deletions test/bob/pipeline/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,88 @@
(-> @(logs-of "dev" "test" 1 0 100)
:body
:message))))))

(deftest get-pipeleines
(testing "Filter pipelines"
(with-redefs-fn {#'db/get-pipelines (fn [_ filter]
(tu/check-and-fail
#(= {:pipeline nil
:status nil}
filter))
[{:name "test:Test" :image "test 1.7"}])
#'db/ordered-steps (fn [_ filter]
(tu/check-and-fail
#(= {:pipeline "test:Test"}
filter))
[{
:id 1
:cmd "echo hello"
:pipeline "dev:test"
:needs_resource nil
:produces_artifact "afile"
:artifact_path "test.txt"
:artifact_store "local"}
{:cmd "mkdir"}])
#'rdb/resources-by-pipeline (fn [_ filter]
(tu/check-and-fail
#(= {:pipeline "test:Test"}
filter))
[{:id 1
:provider "git"
:name "src"
:pipeline "test:Test"}])
#'rdb/resource-params-of (fn [_ filter]
(tu/check-and-fail
#(= {:name "src"
:pipeline "test:Test"}
filter))
[{
:name "git"
:key "env"
:value "dev"
:pipeline "test:Test" }])}
#(is (= [{:name "test:Test",
:data
{:image "test 1.7",
:steps
[{:cmd "echo hello",
:produces_artifact
{:name "afile", :path "test.txt", :store "local"}}
{:cmd "mkdir"}],
:resources [{:name "src"
:type nil
:provider "git"
:params {:env "dev"}}]}}]
(-> @(get-pipelines nil nil nil)
:body)))))

(testing "Empty result returns empty "
(with-redefs-fn {#'db/get-pipelines (fn [_ filter]
nil)}
#(is (= []
(-> @(get-pipelines "dev" "test" nil)
:body)))))

(testing "Test filters created correctly"
(with-redefs-fn {#'db/get-pipelines (fn [_ filter-map]
(tu/check-and-fail
#(= {:pipeline "dev:test"
:status nil}
filter-map))
nil)}
#(get-pipelines "dev" "test" nil))
(with-redefs-fn {#'db/get-pipelines (fn [_ filter]
(tu/check-and-fail
#(= {:name "dev:test"
:status "pass"}
filter))
nil)}
#(get-pipelines "dev" "test" "pass"))
(with-redefs-fn {#'db/get-pipelines (fn [_ filter]
(tu/check-and-fail
#(= {:name "test"
:status "pass"}
filter))
nil)}
#(get-pipelines nil "test" "pass"))))

0 comments on commit 426bc2c

Please sign in to comment.