-
Notifications
You must be signed in to change notification settings - Fork 5
/
outer_output.clj
95 lines (80 loc) · 3.13 KB
/
outer_output.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
(ns clojask.join.outer-output
(:require [onyx.peer.function :as function]
[onyx.plugin.protocols :as p]
[clojure.java.io :as io]
[taoensso.timbre :refer [debug info] :as timbre]
[clojure.string :as string])
(:import (java.io BufferedReader FileReader BufferedWriter FileWriter)))
;; Holds the function the output plugin uses to write data.
;; Starts out as nil until `inject-write-func` stores a function in it.
(def write-func
  (atom nil))

(defn inject-write-func
  "Stores `func` in the `write-func` atom, replacing whatever was there.
  Returns `func` (the new value of the atom)."
  [func]
  (reset! write-func func))
(defn- inject-into-eventmap
  "Lifecycle hook: opens an appending writer on the file named by
  :buffered-wtr/filename in `lifecycle` and returns it in a map under
  :clojask/wtr. `event` is not consulted."
  [event lifecycle]
  (let [path (:buffered-wtr/filename lifecycle)]
    ;; :append true keeps any content already present in the file.
    {:clojask/wtr (io/writer path :append true)}))
(defn- close-writer
  "Lifecycle hook: closes the writer stored under :clojask/wtr in `event`.

  Does nothing when no writer is present (e.g. the writer was never created),
  so a missing writer does not raise a NullPointerException during shutdown.
  Always returns an empty map, so the hook's result can be merged into the
  event map. `lifecycle` is not consulted."
  [event lifecycle]
  (when-some [wtr (:clojask/wtr event)]
    (.close wtr))
  ;; Return a map rather than the nil produced by .close.
  {})
;; Map of lifecycle calls that are required to use this plugin.
;; Users will normally need to include these in their lifecycle calls
;; when submitting the job.
(def writer-calls
;; Before the task starts: inject-into-eventmap opens the output writer and
;; returns it under :clojask/wtr.
{:lifecycle/before-task-start inject-into-eventmap
;; After the task stops: close-writer closes the writer found under
;; :clojask/wtr in the event map.
:lifecycle/after-task-stop close-writer})
;; Output plugin record.
;; The `write-func` field (which shadows the namespace-level atom of the same
;; name inside this form) is invoked as (write-func writer data) for every
;; message in a batch whose :d value is non-nil.
(defrecord ClojaskOutput [write-func]
  p/Plugin
  ;; No plugin-level state is set up or torn down; both hooks return the
  ;; record unchanged.
  (start [this event] this)
  (stop [this event] this)

  p/Checkpointed
  ;; Checkpointing is a no-op for this plugin: all three methods return nil.
  (checkpoint [this] nil)
  (recover! [this replica-version checkpoint] nil)
  (checkpointed! [this epoch] nil)

  p/BarrierSynchronization
  ;; Writes happen synchronously inside write-batch, so both checks simply
  ;; answer true.
  (synced? [this epoch] true)
  (completed? [this] true)

  p/Output
  ;; No per-batch preparation is needed.
  (prepare-batch [this event replica messenger] true)

  ;; Hands the :d payload of each message in the batch to `write-func`,
  ;; together with the writer taken from :clojask/wtr in the event map.
  ;; Messages whose :d is nil are skipped. Always returns true.
  (write-batch [this event replica messenger]
    (let [{batch :onyx.core/write-batch
           sink  :clojask/wtr} event]
      (doseq [msg batch
              :let [payload (:d msg)]
              :when (some? payload)]
        (write-func sink payload))
      true)))
;; Builder function for your output plugin.
;; Instantiates a record.
;; It is highly recommended you inject and pre-calculate frequently used data
;; from your task-map here, in order to improve the performance of your plugin
;; Extending the function below is likely good for most use cases.
(defn output
  "Builds a ClojaskOutput record whose write function is the value currently
  held in the `write-func` atom. The atom is read once, at construction time.
  `pipeline-data` is not consulted."
  [pipeline-data]
  (let [current-fn @write-func]
    (->ClojaskOutput current-fn)))