Skip to content
This repository
Browse code

Remove streams/delete-from-index and streams/update-index.

Nobody is using multiple indexes, and having 3 copies of these functions
was getting confusing. riemann.core deletes and updates a core's index,
and riemann.config proxies to it. Configuration files will be unaffected
by this change, unless they explicitly referenced
streams/delete-from-index or streams/update-index.
  • Loading branch information...
commit e14e1df6337458f31cd6b391398df03b97364690 1 parent 968768a
Kyle Kingsbury authored January 17, 2013
22  src/riemann/config.clj
@@ -20,7 +20,7 @@
20 20
         riemann.email
21 21
         [riemann.pagerduty :only [pagerduty]]
22 22
         [riemann.librato :only [librato-metrics]]
23  
-        [riemann.streams :exclude [update-index delete-from-index]])
  23
+        [riemann.streams])
24 24
   (:gen-class))
25 25
 
26 26
 (def core "The currently running core."
@@ -70,7 +70,7 @@
70 70
            (concat (:streams @next-core) things))))
71 71
 
72 72
 (defn index
73  
-  "Set the index used by this core."
  73
+  "Set the index used by this core. Returns the index."
74 74
   [& opts]
75 75
   (let [index (apply riemann.index/index opts)]
76 76
     (locking core 
@@ -81,12 +81,22 @@
81 81
   "Updates the given index with all events received. Also publishes to the
82 82
   index pubsub channel."
83 83
   [index]
84  
-  (fn [event] (core/update-index @core event)))
  84
+  (fn update [event] (core/update-index @core event)))
85 85
 
86 86
 (defn delete-from-index
87  
-  "Deletes any events that pass through from the index"
88  
-  [index]
89  
-  (fn [event] (core/delete-from-index @core event)))
  87
+  "Deletes any events that pass through from the index. By default, deletes
  88
+  events with the same host and service. If a field, or a list of fields, is
  89
+  given, deletes any events with matching values for all of those fields.
  90
+
  91
+  ; Delete all events in the index with the same host
  92
+  (delete-from-index :host event)
  93
+  
  94
+  ; Delete all events in the index with the same host and state.
  95
+  (delete-from-index [:host :state] event)"
  96
+  ([]
  97
+   (fn delete [event] (core/delete-from-index @core event)))
  98
+  ([fields]
  99
+   (fn delete [event] (core/delete-from-index @core fields event))))
90 100
 
91 101
 (defn periodically-expire
92 102
   "Sets up a reaper for this core. See core API docs."
20  src/riemann/core.clj
@@ -100,6 +100,20 @@
100 100
       (ps/publish registry "index" event))))
101 101
 
102 102
 (defn delete-from-index
103  
-  "Updates this core's index with an event."
104  
-  [core event]
105  
-  (index/delete (:index core) event))
  103
+  "Deletes similar events from the index. By default, deletes events with the
  104
+  same host and service. If a field, or a list of fields, is given, deletes any
  105
+  events with matching values for all of those fields.
  106
+  
  107
+  ; Delete all events in the index with the same host
  108
+  (delete-from-index index :host event)
  109
+  
  110
+  ; Delete all events in the index with the same host and state.
  111
+  (delete-from-index index [:host :state] event)"
  112
+  ([core event]
  113
+   (index/delete (:index core) event))
  114
+  ([core fields event]
  115
+   (let [match-fn (if (coll? fields) (apply juxt fields) fields)
  116
+         match (match-fn event)
  117
+         index (:index core)]
  118
+       (doseq [event (filter #(= match (match-fn %)) index)]
  119
+         (index/delete-exactly index event)))))
20  src/riemann/streams.clj
@@ -1151,23 +1151,3 @@
1151 1151
     `(fn [~'event]
1152 1152
        (when-let [stream# (some split-match (list ~@clauses))]
1153 1153
          (call-rescue ~'event [stream#])))))
1154  
-
1155  
-(defn delete-from-index
1156  
-  "Deletes similar events from the index. By default, deletes events with the
1157  
-  same host and service. If a field, or a list of fields, is given, deletes any
1158  
-  events with matching values for all of those fields.
1159  
-  
1160  
-  ; Delete all events in the index with the same host
1161  
-  (delete-from-index index :host)
1162  
-  
1163  
-  ; Delete all events in the index with the same host and state.
1164  
-  (delete-from-index index [:host :state])"
1165  
-  ([index]
1166  
-   (fn [event]
1167  
-     (index/delete index event)))
1168  
-  ([index fields]
1169  
-   (let [match-fn (if (coll? fields) (apply juxt fields) fields)]
1170  
-     (fn [event]
1171  
-       (let [match (match-fn event)]
1172  
-         (doseq [event (filter #(= match (match-fn %)) index)]
1173  
-           (index/delete-exactly index event)))))))
23  test/riemann/test/config.clj
@@ -71,6 +71,29 @@
71 71
            (up {:service 1 :state "ok"})
72 72
            (is (= (seq i) [{:service 1 :state "ok"}]))))
73 73
 
  74
+(deftest delete-from-index-test
  75
+         (let [i (index)
  76
+               update (update-index i)
  77
+               delete (delete-from-index)
  78
+               states [{:host 1 :state "ok"}
  79
+                       {:host 2 :state "ok"}
  80
+                       {:host 1 :state "bad"}]]
  81
+           (apply!)
  82
+           (dorun (map update states))
  83
+           (delete {:host 1 :state "definitely not seen before"})
  84
+           (is (= (seq i) [{:host 2 :state "ok"}]))))
  85
+
  86
+(deftest delete-from-index-fields
  87
+         (let [i (index)
  88
+               update (update-index i)
  89
+               delete (delete-from-index [:host :state])]
  90
+           (apply!)
  91
+           (update {:host 1 :state "foo"})
  92
+           (update {:host 2 :state "bar"})
  93
+           (delete {:host 1 :state "not seen"})
  94
+           (delete {:host 2 :state "bar"})
  95
+           (is (= (seq i) [{:host 1 :state "foo"}]))))
  96
+
74 97
 (deftest subscribe-in-stream-test
75 98
          (let [received (promise)]
76 99
            (streams
34  test/riemann/test/streams.clj
@@ -803,40 +803,6 @@
803 803
            (doseq [m metrics] (r {:metric m}))
804 804
            (is (= expect (vec (map (fn [s] (:metric s)) (deref output)))))))
805 805
 
806  
-(deftest update-test
807  
-         (let [i (index/index)
808  
-               s (update-index i)
809  
-               states [{:host 1 :state "ok"} 
810  
-                       {:host 2 :state "ok"} 
811  
-                       {:host 1 :state "bad"}]]
812  
-           (doseq [state states] (s state))
813  
-           (is (= (set i)
814  
-                  #{{:host 1 :state "bad"}
815  
-                    {:host 2 :state "ok"}}))))
816  
-
817  
-(deftest delete-from-index-test
818  
-         (let [i (index/index)
819  
-               s (update-index i)
820  
-               d (delete-from-index i)
821  
-               states [{:host 1 :state "ok"} 
822  
-                       {:host 2 :state "ok"} 
823  
-                       {:host 1 :state "bad"}]]
824  
-           (doseq [state states] (s state))
825  
-           (doseq [state states] (d state))
826  
-           (is (= (vec (seq i)) []))))
827  
-
828  
-(deftest delete-by-test
829  
-         (let [index (index/index)
830  
-               update (update-index index)
831  
-               delete (delete-from-index index :host)
832  
-               states [{:host 1 :service "a" :state "ok"}
833  
-                       {:host 2 :service "a" :state "critical"}
834  
-                       {:host 1 :service "b" :state "warning"}]]
835  
-               (dorun (map update states))
836  
-               (delete {:host 1 :service "c"})
837  
-               (is (= (vec (seq index))
838  
-                   [{:host 2 :service "a" :state "critical"}]))))
839  
-
840 806
 (deftest ewma-timeless-test
841 807
          (test-stream (ewma-timeless 0)
842 808
                       (em 1 10 20 -100 4)

0 notes on commit e14e1df

Please sign in to comment.
Something went wrong with that request. Please try again.