Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue/95/scheduler-calls-missing: Implement missing scheduler calls #96

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 185 additions & 125 deletions src/meson/client/impl/master/scheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
(defn start
""
([this]
(start this {}))
(start this {}))
([this opts]
(log/debug "Starting connection manager for the scheduler ...")
(->> opts
(into default-scheduler-opts)
(http-conn-mgr/make-reusable-conn-manager)
(assoc this :conn-mgr))))
(log/debug "Starting connection manager for the scheduler ...")
(->> opts
(into default-scheduler-opts)
(http-conn-mgr/make-reusable-conn-manager)
(assoc this :conn-mgr))))

(defn stop
""
Expand All @@ -35,187 +35,247 @@
Some API endpoints may break the 1 to 1 pattern of :type
and payload key."
[^Keyword type]
(condp = type
:request (str (util/keyword->lower type) "s")
(util/keyword->lower type)))
(condp = type
:request (str (util/keyword->lower type) "s")
(util/keyword->lower type)))

(comment
(payload-key :request) ; plural expected
(payload-key :accept) ; singular expected
(payload-key :subscribe)) ; singular expected
(payload-key :subscribe) ; singular expected
(payload-key :acknowledge_operation_status))

(defn call
([this ^Keyword type]
(call this type nil nil))
(call this type nil nil))
([this ^Keyword type payload framework-id]
(call this type payload framework-id http/json-content-type))
(call this type payload framework-id http/json-content-type))
([this ^Keyword type payload framework-id content-type]
(call this type payload framework-id content-type {}))
(call this type payload framework-id content-type {}))
([this ^Keyword type payload framework-id content-type opts]
(let [data {:type (util/keyword->upper type)
(payload-key type) payload}]
(http/post
this
scheduler-path
:body (json/write-str (merge data (or framework-id {})))
:opts (into opts {:content-type content-type
:accept content-type})))))
(let [data {:type (util/keyword->upper type)
(payload-key type) payload}]
(http/post
this
scheduler-path
:body (json/write-str (merge data (or framework-id {})))
:opts (into opts {:content-type content-type
:accept content-type})))))

(defn accept
([this payload stream-id framework-id]
(accept this payload stream-id framework-id http/json-content-type))
(accept this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
(call
this
:accept
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))
(call
this
:accept
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn acknowledge
([this payload stream-id framework-id]
(acknowledge this payload stream-id framework-id http/json-content-type))
(acknowledge this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
(call
this
:acknowledge
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))
(call
this
:acknowledge
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn acknowledge-operation-status
([this payload stream-id framework-id]
(acknowledge-operation-status this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
(call
this
:acknowledge_operation_status
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn decline
([this payload stream-id framework-id]
(decline this payload stream-id framework-id http/json-content-type))
(decline this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
(call
this
:decline
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))
(call
this
:decline
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn kill-task
([this payload stream-id framework-id]
(kill-task this payload stream-id framework-id http/json-content-type))
(kill-task this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
(call
this
:kill
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))
(call
this
:kill
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn message
([this payload stream-id framework-id]
(message this payload stream-id framework-id http/json-content-type))
(message this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
(call
this
:message
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))
(call
this
:message
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn reconcile
([this payload stream-id framework-id]
(reconcile this payload stream-id framework-id http/json-content-type))
(reconcile this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
(call
this
:reconcile
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))
(call
this
:reconcile
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn reconcile-operations
([this payload stream-id framework-id]
(reconcile-operations this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
(call
this
:reconcile_operations
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn request
([this payload stream-id framework-id]
(request this payload stream-id framework-id http/json-content-type))
(request this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
(call
this
:request ; type is request, json keyword is requests (plural)
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))
(call
this
:request ; type is request, json keyword is requests (plural)
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn revive
([this payload stream-id framework-id]
(revive this payload stream-id framework-id http/json-content-type))
(revive this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
(call
this
:revive
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))
(call
this
:revive
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn shutdown-executor
([this payload stream-id framework-id]
(shutdown-executor this payload stream-id framework-id http/json-content-type))
(shutdown-executor this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
(call
this
:shutdown
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))
(call
this
:shutdown
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn subscribe
([this payload]
(subscribe this payload http/json-content-type))
(subscribe this payload http/json-content-type))
([this payload content-type]
(call
this
:subscribe
payload
nil
content-type
{:as :stream
:streaming? true
:chunked? true
:connection http/keep-alive
:connection-manager (:conn-mgr this)})))
(call
this
:subscribe
payload
nil
content-type
{:as :stream
:streaming? true
:chunked? true
:connection http/keep-alive
:connection-manager (:conn-mgr this)})))

(defn suppress
([this payload stream-id framework-id]
(suppress this payload stream-id framework-id http/json-content-type))
([this payload stream-id framework-id content-type]
(call
this
:suppress
payload
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn teardown
([this stream-id framework-id]
(teardown this stream-id framework-id http/json-content-type))
(teardown this stream-id framework-id http/json-content-type))
([this stream-id framework-id content-type]
(call
this
:teardown
nil
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))
(call
this
:teardown
nil
framework-id
content-type
{:connection-manager (:conn-mgr this)
:headers {:mesos-stream-id stream-id}})))

(defn update-framework
([this payload]
(update-framework this payload http/json-content-type))
([this payload content-type]
(call
this
:update_framework
payload
nil
content-type
{:as :stream
:streaming? true
:chunked? true
:connection http/keep-alive
:connection-manager (:conn-mgr this)})))

(def behaviour
{:accept accept
:acknowledge acknowledge
:acknowledge-operation-status acknowledge-operation-status
:decline decline
:kill-task kill-task
:message message
:reconcile reconcile
:reconcile-operations reconcile-operations
:request request
:revive revive
:subscribe subscribe
:shutdown-executor shutdown-executor
:teardown teardown})
:subscribe subscribe
:suppress suppress
:teardown teardown
:update-framework update-framework})
Loading