From d9e44f69576062b096e8749a0ac26c54d01c2323 Mon Sep 17 00:00:00 2001 From: Bobby Calderwood Date: Thu, 24 Nov 2016 03:07:28 -0500 Subject: [PATCH] Consolidate api ns into commander ns --- dev/user.clj | 6 +- src/com/capitalone/commander.clj | 329 ++++++++++++++++- src/com/capitalone/commander/api.clj | 345 ------------------ src/com/capitalone/commander/grpc.clj | 2 +- .../commander/indexer/component/indexer.clj | 2 +- .../commander/rest/endpoint/commander.clj | 2 +- src/com/capitalone/commander/rest/system.clj | 2 +- 7 files changed, 335 insertions(+), 353 deletions(-) delete mode 100644 src/com/capitalone/commander/api.clj diff --git a/dev/user.clj b/dev/user.clj index b0146ef..0d8f861 100644 --- a/dev/user.clj +++ b/dev/user.clj @@ -22,17 +22,17 @@ [meta-merge.core :refer [meta-merge]] [reloaded.repl :refer [system init start stop go reset]] [io.pedestal.log :as log] - [com.capitalone.clojure.runtime :as runtime] + [com.capitalone.commander.util :as util] [com.capitalone.commander.rest.config :as rest-config] [com.capitalone.commander.rest.system :as rest-system] [com.capitalone.commander.indexer.config :as indexer-config] [com.capitalone.commander.indexer.system :as indexer-system] [com.capitalone.commander.database :as database] - [com.capitalone.commander.api :as api])) + [com.capitalone.commander :as commander])) (stest/instrument) -(runtime/set-default-uncaught-exception-handler! +(util/set-default-uncaught-exception-handler! (fn [thread ex] (log/error ::default-uncaught-exception-handler thread :exception ex))) diff --git a/src/com/capitalone/commander.clj b/src/com/capitalone/commander.clj index d8180bc..2708783 100644 --- a/src/com/capitalone/commander.clj +++ b/src/com/capitalone/commander.clj @@ -12,7 +12,13 @@ ;; See the License for the specific language governing permissions and limitations under the License. (ns com.capitalone.commander - (:require [clojure.spec :as s])) + (:require [clojure.spec :as s] + [clojure.core.async :as a] + [com.stuartsierra.component :as c] + [io.pedestal.log :as log] + [clj-uuid :as uuid] + [com.capitalone.commander.database :as d] + [com.capitalone.commander.event-log :as e])) (s/def ::action keyword?) (s/def ::data (s/keys)) @@ -32,3 +38,324 @@ (s/def ::parent uuid?) (s/def ::event (s/merge ::command (s/keys :req-un [::parent]))) + +(defprotocol CommandService + (-create-command [this command-params] + "Creates a command from the command-params and records to the + Log. Returns the created command") + (-create-command-sync [this command-params sync-timeout-ms] + "Creates a command from the command-params and records to the Log. + Returns the newly created command, with a :children key whose + value is a vector containing the completion event id if + successful. If ") + (-list-commands [this offset limit] + "Returns a map of :commands, :limit, :offset, and :total, + where :commands is `limit` indexed commands, starting at `offset`. + If limit is 0, returns all indexed commands starting with + offset. :total is the total count of all commands.") + (-get-command-by-id [this id] + "Returns the indexed command with the given id, or nil if none + found.") + (-commands-ch [this ch] + "Returns ch, the given core.async channel that will convey all + commands (from time of call onward).")) + +(defprotocol CommandValidator + (-validate-command-params [this command-params] + "Returns true if valid, map of errors otherwise")) + +(defprotocol EventService + (-list-events [this offset limit] + "Returns a map of :events, :limit, :offset, and :total, + where :events is `limit` indexed events, starting at `offset`. + If limit is 0, returns all indexed events starting with + offset. :total is the total count of all events.") + (-get-event-by-id [this id] + "Returns the indexed event with the given id, or nil if none + found.") + (-events-ch [this ch] + "Returns ch, the given core.async channel that will convey all + events (from time of call onward).")) + +(defn create-command + "Creates a command by recording to the Log. If sync? is false (the + default if not given), writes to the Log and returns immediately. + If sync? is true, writes to the Log and waits for the command's + corresponding completion event to arrive on the Log before + returning. Returns the newly created command in either case." + ([api command-params] + (create-command api command-params false)) + ([api command-params sync?] + (log/info ::create-command [api command-params sync?]) + (if sync? + (-create-command-sync api command-params (:sync-timeout-ms api)) + (-create-command api command-params)))) + +(s/def ::CommandService (partial satisfies? CommandService)) + +(s/fdef create-command + :args (s/cat :api ::CommandService + :command-params ::command-params + :sync? (s/? (s/nilable boolean?))) + :ret ::command + :fn (s/and #(= (-> % :ret :action) (-> % :args :command-params :action)) + #(= (-> % :ret :data) (-> % :args :command-params :data)))) + +(defn list-commands + "Returns a map of :commands, :limit, :offset, and :total, + where :commands is `limit` indexed commands, starting at `offset`. + If limit is 0, returns all indexed commands starting with + offset. :total is the total count of all commands." + ([api] (list-commands api 0)) + ([api offset] (list-commands api offset 0)) + ([api offset limit] + (log/info ::list-commands [api offset limit]) + (-list-commands api (or offset 0) (or limit 0)))) + +(s/def ::commands (s/every ::command)) +(s/def ::total (s/int-in 0 Long/MAX_VALUE)) + +(s/fdef list-commands + :args (s/cat :api ::CommandService + :offset (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE))) + :limit (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE)))) + :ret (s/keys :req-un [::commands ::limit ::offset ::total]) + :fn #(let [limit (-> % :args :limit)] + (if (pos? limit) + (= (-> % :ret count) limit) + true))) + +(defn get-command-by-id + "Returns the indexed command with the given id, or nil if none + found." + [api id] + (log/info ::get-command-by-id [api id]) + (-get-command-by-id api id)) + +(s/fdef get-command-by-id + :args (s/cat :api ::CommandService + :id ::id) + :ret ::command) + +(defn command-map + [{:keys [key value topic partition offset timestamp] :as command}] + (log/debug ::command-map [command]) + (let [{:keys [action data]} value] + {:id key + :action action + :data data + :timestamp timestamp + :topic topic + :partition partition + :offset offset})) + +(defn commands-ch + "Returns a core.async channel (ch if given, a sliding-buffer channel + of size 10 otherwise) that will convey all commands arriving from + the time of the call onward." + ([api] + (commands-ch api (a/chan (a/sliding-buffer 10)))) + ([api ch] + (log/info ::commands-ch [api ch]) + (-commands-ch api ch) + ch)) + +(defn validate-command-params + "Returns true if valid, a map of errors otherwise." + [api command-params] + (-validate-command-params api command-params)) + +(s/fdef validate-command-params + :args (s/cat :api ::CommandService + :command-params ::command-params) + :ret (s/or :valid true? + :invalid (s/keys))) + +(s/def ::EventService (partial satisfies? EventService)) + +(defn list-events + "Returns a map of :events, :limit, :offset, and :total, + where :events is `limit` indexed events, starting at `offset`. + If limit is 0, returns all indexed events starting with + offset. :total is the total count of all events." + ([api] (list-events api 0)) + ([api offset] (list-events api offset 0)) + ([api offset limit] + (log/info ::list-events [api offset limit]) + (-list-events api (or offset 0) (or limit 0)))) + +(s/def ::events (s/every ::event)) +(s/fdef list-events + :args (s/cat :api ::EventService + :offset (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE))) + :limit (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE)))) + :ret (s/keys :req-un [::events ::limit ::offset ::total]) + :fn #(let [limit (-> % :args :limit)] + (if (pos? limit) + (= (-> % :ret count) limit) + true))) + +(defn get-event-by-id + "Returns the indexed event with the given id, or nil if none + found." + [api id] + (log/info ::get-event-by-id [api id]) + (-get-event-by-id api id)) + +(s/fdef get-event-by-id + :args (s/cat :api ::EventService + :id ::id) + :ret ::event) + +(defn event-map + [{:keys [key value topic partition offset timestamp] :as event}] + (log/debug ::event-map [event]) + (let [{:keys [action data parent]} value] + {:id key + :parent parent + :action action + :data data + :timestamp timestamp + :topic topic + :partition partition + :offset offset})) + +(defn events-ch + "Returns a core.async channel (ch if given, a sliding-buffer channel + of size 10 otherwise) that will convey all events arriving from + the time of the call onward." + ([api] + (events-ch api (a/chan (a/sliding-buffer 10)))) + ([api ch] + (log/info ::events-ch [api ch]) + (-events-ch api ch) + ch)) + +(defn- command-record + [topic id command] + {:topic topic + :key id + :value command}) + +(defn- send-command-and-await-result! + [kafka-producer command-topic id command] + (let [record (command-record command-topic id command) + ch (e/send! kafka-producer record)] + (if-some [ret (a/Commander {:commands-topic commands-topic + :events-topic events-topic + :sync-timeout-ms sync-timeout-ms})) diff --git a/src/com/capitalone/commander/api.clj b/src/com/capitalone/commander/api.clj deleted file mode 100644 index a762d48..0000000 --- a/src/com/capitalone/commander/api.clj +++ /dev/null @@ -1,345 +0,0 @@ -;; Copyright 2016 Capital One Services, LLC - -;; Licensed under the Apache License, Version 2.0 (the "License"); -;; you may not use this file except in compliance with the License. -;; You may obtain a copy of the License at - -;; http://www.apache.org/licenses/LICENSE-2.0 - -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and limitations under the License. - -(ns com.capitalone.commander.api - (:require [clojure.spec :as s] - [clojure.core.async :as a] - [com.stuartsierra.component :as c] - [io.pedestal.log :as log] - [clj-uuid :as uuid] - [com.capitalone.commander :as commander] - [com.capitalone.commander.database :as d] - [com.capitalone.commander.event-log :as e])) - -(set! *warn-on-reflection* true) - -(defprotocol CommandService - (-create-command [this command-params] - "Creates a command from the command-params and records to the - Log. Returns the created command") - (-create-command-sync [this command-params sync-timeout-ms] - "Creates a command from the command-params and records to the Log. - Returns the newly created command, with a :children key whose - value is a vector containing the completion event id if - successful. If ") - (-list-commands [this offset limit] - "Returns a map of :commands, :limit, :offset, and :total, - where :commands is `limit` indexed commands, starting at `offset`. - If limit is 0, returns all indexed commands starting with - offset. :total is the total count of all commands.") - (-get-command-by-id [this id] - "Returns the indexed command with the given id, or nil if none - found.") - (-commands-ch [this ch] - "Returns ch, the given core.async channel that will convey all - commands (from time of call onward).")) - -(defprotocol CommandValidator - (-validate-command-params [this command-params] - "Returns true if valid, map of errors otherwise")) - -(defprotocol EventService - (-list-events [this offset limit] - "Returns a map of :events, :limit, :offset, and :total, - where :events is `limit` indexed events, starting at `offset`. - If limit is 0, returns all indexed events starting with - offset. :total is the total count of all events.") - (-get-event-by-id [this id] - "Returns the indexed event with the given id, or nil if none - found.") - (-events-ch [this ch] - "Returns ch, the given core.async channel that will convey all - events (from time of call onward).")) - -(defn create-command - "Creates a command by recording to the Log. If sync? is false (the - default if not given), writes to the Log and returns immediately. - If sync? is true, writes to the Log and waits for the command's - corresponding completion event to arrive on the Log before - returning. Returns the newly created command in either case." - ([api command-params] - (create-command api command-params false)) - ([api command-params sync?] - (log/info ::create-command [api command-params sync?]) - (if sync? - (-create-command-sync api command-params (:sync-timeout-ms api)) - (-create-command api command-params)))) - -(s/def ::CommandService (partial satisfies? CommandService)) - -(s/fdef create-command - :args (s/cat :api ::CommandService - :command-params ::commander/command-params - :sync? (s/? (s/nilable boolean?))) - :ret ::commander/command - :fn (s/and #(= (-> % :ret :action) (-> % :args :command-params :action)) - #(= (-> % :ret :data) (-> % :args :command-params :data)))) - -(defn list-commands - "Returns a map of :commands, :limit, :offset, and :total, - where :commands is `limit` indexed commands, starting at `offset`. - If limit is 0, returns all indexed commands starting with - offset. :total is the total count of all commands." - ([api] (list-commands api 0)) - ([api offset] (list-commands api offset 0)) - ([api offset limit] - (log/info ::list-commands [api offset limit]) - (-list-commands api (or offset 0) (or limit 0)))) - -(s/def ::commands (s/every ::commander/command)) -(s/def ::total (s/int-in 0 Long/MAX_VALUE)) - -(s/fdef list-commands - :args (s/cat :api ::CommandService - :offset (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE))) - :limit (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE)))) - :ret (s/keys :req-un [::commands ::commander/limit ::commander/offset ::total]) - :fn #(let [limit (-> % :args :limit)] - (if (pos? limit) - (= (-> % :ret count) limit) - true))) - -(defn get-command-by-id - "Returns the indexed command with the given id, or nil if none - found." - [api id] - (log/info ::get-command-by-id [api id]) - (-get-command-by-id api id)) - -(s/fdef get-command-by-id - :args (s/cat :api ::CommandService - :id ::commander/id) - :ret ::commander/command) - -(defn command-map - [{:keys [key value topic partition offset timestamp] :as command}] - (log/debug ::command-map [command]) - (let [{:keys [action data]} value] - {:id key - :action action - :data data - :timestamp timestamp - :topic topic - :partition partition - :offset offset})) - -(defn commands-ch - "Returns a core.async channel (ch if given, a sliding-buffer channel - of size 10 otherwise) that will convey all commands arriving from - the time of the call onward." - ([api] - (commands-ch api (a/chan (a/sliding-buffer 10)))) - ([api ch] - (log/info ::commands-ch [api ch]) - (-commands-ch api ch) - ch)) - -(defn validate-command-params - "Returns true if valid, a map of errors otherwise." - [api command-params] - (-validate-command-params api command-params)) - -(s/fdef validate-command-params - :args (s/cat :api ::CommandService - :command-params ::commander/command-params) - :ret (s/or :valid true? - :invalid (s/keys))) - -(s/def ::EventService (partial satisfies? EventService)) - -(defn list-events - "Returns a map of :events, :limit, :offset, and :total, - where :events is `limit` indexed events, starting at `offset`. - If limit is 0, returns all indexed events starting with - offset. :total is the total count of all events." - ([api] (list-events api 0)) - ([api offset] (list-events api offset 0)) - ([api offset limit] - (log/info ::list-events [api offset limit]) - (-list-events api (or offset 0) (or limit 0)))) - -(s/def ::events (s/every ::commander/event)) -(s/fdef list-events - :args (s/cat :api ::EventService - :offset (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE))) - :limit (s/? (s/nilable (s/int-in 0 Long/MAX_VALUE)))) - :ret (s/keys :req-un [::events ::commander/limit ::commander/offset ::total]) - :fn #(let [limit (-> % :args :limit)] - (if (pos? limit) - (= (-> % :ret count) limit) - true))) - -(defn get-event-by-id - "Returns the indexed event with the given id, or nil if none - found." - [api id] - (log/info ::get-event-by-id [api id]) - (-get-event-by-id api id)) - -(s/fdef get-event-by-id - :args (s/cat :api ::EventService - :id ::commander/id) - :ret ::commander/event) - -(defn event-map - [{:keys [key value topic partition offset timestamp] :as event}] - (log/debug ::event-map [event]) - (let [{:keys [action data parent]} value] - {:id key - :parent parent - :action action - :data data - :timestamp timestamp - :topic topic - :partition partition - :offset offset})) - -(defn events-ch - "Returns a core.async channel (ch if given, a sliding-buffer channel - of size 10 otherwise) that will convey all events arriving from - the time of the call onward." - ([api] - (events-ch api (a/chan (a/sliding-buffer 10)))) - ([api ch] - (log/info ::events-ch [api ch]) - (-events-ch api ch) - ch)) - -(defn- command-record - [topic id command] - {:topic topic - :key id - :value command}) - -(defn- send-command-and-await-result! - [kafka-producer command-topic id command] - (let [record (command-record command-topic id command) - ch (e/send! kafka-producer record)] - (if-some [ret (a/Commander {:commands-topic commands-topic - :events-topic events-topic - :sync-timeout-ms sync-timeout-ms})) diff --git a/src/com/capitalone/commander/grpc.clj b/src/com/capitalone/commander/grpc.clj index dc4e8df..d531e0b 100644 --- a/src/com/capitalone/commander/grpc.clj +++ b/src/com/capitalone/commander/grpc.clj @@ -15,7 +15,7 @@ [io.pedestal.log :as log] [com.stuartsierra.component :as c] [com.capitalone.commander.util :as util] - [com.capitalone.commander.api :as api]) + [com.capitalone.commander :as api]) (:import java.util.UUID [com.capitalone.commander.grpc CommanderProtos diff --git a/src/com/capitalone/commander/indexer/component/indexer.clj b/src/com/capitalone/commander/indexer/component/indexer.clj index 60f3097..0eed6f0 100644 --- a/src/com/capitalone/commander/indexer/component/indexer.clj +++ b/src/com/capitalone/commander/indexer/component/indexer.clj @@ -16,7 +16,7 @@ [com.stuartsierra.component :as component] [io.pedestal.log :as log] [clojure.java.jdbc :as j] - [com.capitalone.commander.api :as api] + [com.capitalone.commander :as api] [com.capitalone.commander.database :as d] [com.capitalone.commander.event-log.kafka :as k]) (:import [org.apache.kafka.clients.consumer Consumer ConsumerRebalanceListener] diff --git a/src/com/capitalone/commander/rest/endpoint/commander.clj b/src/com/capitalone/commander/rest/endpoint/commander.clj index 9e4a5c0..8f46e85 100644 --- a/src/com/capitalone/commander/rest/endpoint/commander.clj +++ b/src/com/capitalone/commander/rest/endpoint/commander.clj @@ -28,7 +28,7 @@ [core :as papi] [helpers :refer [defhandler]]] [ring.util.response :as ring-resp] - [com.capitalone.commander.api :as api] + [com.capitalone.commander :as api] [com.capitalone.commander.rest.hiccup :as h])) (set! *warn-on-reflection* true) diff --git a/src/com/capitalone/commander/rest/system.clj b/src/com/capitalone/commander/rest/system.clj index 52d85f2..1b8b709 100644 --- a/src/com/capitalone/commander/rest/system.clj +++ b/src/com/capitalone/commander/rest/system.clj @@ -21,7 +21,7 @@ [com.capitalone.commander.grpc :refer [construct-grpc-server]] [com.capitalone.commander.database :refer [construct-jdbc-db]] [com.capitalone.commander.event-log.kafka :refer [construct-producer construct-consumer]] - [com.capitalone.commander.api :refer [construct-commander-api]])) + [com.capitalone.commander :refer [construct-commander-api]])) (set! *warn-on-reflection* true)