-
Notifications
You must be signed in to change notification settings - Fork 5
/
clojask_aggre.clj
131 lines (114 loc) · 4.56 KB
/
clojask_aggre.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
(ns clojask.clojask-aggre
(:require [onyx.peer.function :as function]
[onyx.plugin.protocols :as p]
[clojure.java.io :as io]
[taoensso.timbre :refer [debug info] :as timbre]
[clojure.string :as string]
[clojask.api.aggregate :refer [start]]
[clojask.utils :as u])
(:import [java.io BufferedReader FileReader BufferedWriter FileWriter]
[com.clojask.exception ExecutionException]))
(def df
  "Atom holding the dataframe passed to `inject-dataframe`; nil until then."
  (atom nil))

(def aggre-func
  "Atom holding the second argument passed to `inject-dataframe` (the
  aggregation specs later handed to the output record); nil until then."
  (atom nil))

(def select
  "Atom holding the third argument passed to `inject-dataframe` (the selector
  later handed to the output record); nil until then."
  (atom nil))
(defn inject-dataframe
  "Stores the job configuration in this namespace's atoms so that the
  lifecycle hooks and the `output` builder can read it later.

  dataframe - stored in `df`
  a         - stored in `aggre-func`
  b         - stored in `select`

  Returns `b`, the value of the last reset."
  [dataframe a b]
  (doseq [[target value] [[df dataframe]
                          [aggre-func a]]]
    (reset! target value))
  ;; Kept as the final expression so the return value is still `b`.
  (reset! select b))
(defn c-count
  "Returns the number of elements of `a` when it is a collection, and 1 for
  any other value (nil and strings included, since neither satisfies `coll?`)."
  [a]
  (if-not (coll? a)
    1
    (count a)))
(defn- inject-into-eventmap
  "Lifecycle hook registered under :lifecycle/before-task-start in
  `writer-calls`.

  Opens an appending writer on the file named by the lifecycle entry's
  :buffered-wtr/filename key and returns a map with:
    :clojask/wtr        - the opened writer
    :clojask/aggre-func - the result of .getAggreFunc on the :row-info of the
                          dataframe currently held in `df`

  `event` is not read."
  [event lifecycle]
  ;; Resolve the aggregation functions before touching the file system: if
  ;; this lookup throws, no writer has been opened yet, so no file handle is
  ;; leaked.
  (let [aggre-fns (.getAggreFunc (:row-info (deref df)))
        wtr (io/writer (:buffered-wtr/filename lifecycle) :append true)]
    {:clojask/wtr wtr
     :clojask/aggre-func aggre-fns}))
(defn- close-writer
  "Lifecycle hook registered under :lifecycle/after-task-stop in
  `writer-calls`. Closes the writer stored under :clojask/wtr in the event
  map. `lifecycle` is not read. Returns nil."
  [event lifecycle]
  (let [wtr (:clojask/wtr event)]
    (.close wtr)))
(def writer-calls
  "Lifecycle calls required to use this plugin. Users will generally have to
  include these in their lifecycle calls when submitting the job.

  :lifecycle/before-task-start -> `inject-into-eventmap` (opens the writer)
  :lifecycle/after-task-stop   -> `close-writer` (closes it again)"
  {:lifecycle/before-task-start inject-into-eventmap
   :lifecycle/after-task-stop close-writer})
;; Onyx output plugin that folds every incoming message into a set of running
;; accumulators and emits the aggregated rows when the plugin is stopped.
;;
;; Fields:
;;   memo        - volatile holding one accumulator per entry of `aggre-func`
;;   aggre-func  - indexable collection of [aggregate-fn column-index] pairs;
;;                 aggregate-fn is called as (aggregate-fn accumulator value)
;;   select      - selector handed to `u/gets` for every output row
;;   output-func - called as (output-func writer rows) to emit rows
(defrecord ClojaskOutput
  [memo
   aggre-func
   select
   output-func]

  p/Plugin
  (start [this event]
    ;; Nothing to initialise; the record is returned unchanged.
    this)

  (stop [this event]
    ;; Emit the aggregation result through `output-func`, using the writer
    ;; stored under :clojask/wtr in the event map.
    ;; Every accumulator is treated as a column: scalar results are wrapped in
    ;; a one-element vector so that all columns can be transposed into rows.
    (let [columns (mapv (fn [result] (if (coll? result) result [result]))
                        (deref memo))
          wtr (:clojask/wtr event)]
      ;; With no accumulators there is nothing to write. The guard also avoids
      ;; calling `=` and `map` with zero collections, which would fail with an
      ;; unrelated error.
      (when (seq columns)
        (when-not (apply = (map count columns))
          (throw (ExecutionException. "aggregation result is not of the same length")))
        ;; Transpose columns into rows and write them one row at a time.
        (doseq [row (apply map vector columns)]
          (output-func wtr [(u/gets row select)]))))
    this)

  p/Checkpointed
  ;; Nothing is required here. Checkpointing is normally only useful in input
  ;; plugins.
  (checkpoint [this])
  (recover! [this replica-version checkpoint])
  (checkpointed! [this epoch])

  p/BarrierSynchronization
  (synced? [this epoch]
    ;; Nothing is written asynchronously, so the plugin always reports synced.
    true)
  (completed? [this]
    ;; Nothing is written asynchronously, so the plugin always reports
    ;; completed.
    true)

  p/Output
  (prepare-batch [this event replica messenger]
    ;; No per-batch preparation is needed before write-batch is called.
    true)

  (write-batch [this {:keys [onyx.core/write-batch]} replica messenger]
    ;; Fold every message of the batch into the accumulators held in `memo`.
    ;; Messages whose :d entry is nil carry no row and are skipped.
    (doseq [msg write-batch]
      (when-some [data (:d msg)]
        (vreset! memo
                 (doall
                  (map-indexed
                   (fn [idx acc]
                     ;; The idx-th accumulator is updated by the idx-th
                     ;; [aggregate-fn column-index] pair.
                     (let [[aggregate col] (nth aggre-func idx)]
                       (aggregate acc (nth data col))))
                   (deref memo))))))
    true))
(defn output
  "Builder function for the output plugin; instantiates a `ClojaskOutput`
  record from the values previously stored by `inject-dataframe`.

  The record receives:
    - a volatile holding one `start` value (from clojask.api.aggregate) per
      entry of the current `aggre-func` atom value, used as the initial
      accumulators
    - the current values of the `aggre-func` and `select` atoms
    - the result of .getOutput on the dataframe held in `df`

  `pipeline-data` is not read."
  [pipeline-data]
  (let [funcs (deref aggre-func)
        ;; One initial accumulator per aggregation entry, fully realised.
        initial-memo (->> (repeat start)
                          (take (count funcs))
                          doall)]
    (->ClojaskOutput (volatile! initial-memo)
                     funcs
                     (deref select)
                     (.getOutput (deref df)))))