;; stream_graph.clj
;; Copyright (c) 2013-2014 Michael S. Klishin, Alex Petrov, and the ClojureWerkz Team.
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns clojurewerkz.meltdown.stream-graph
(:require [clojurewerkz.meltdown.streams :as ms]))
;; Local aliases for two values from clojurewerkz.meltdown.streams (required
;; above as `ms`), so callers of this namespace can use them without requiring
;; the streams namespace themselves.
;; Each `def` binds whatever `ms/create` / `ms/accept` evaluates to when this
;; namespace is loaded.
(def create ms/create)
(def accept ms/accept)
(defmacro graph
  "Entry point of a stream processing graph.

  Given only `channel`, expands to `channel` itself. Given additional
  `downstreams` forms, binds `channel` to the anaphoric symbol `upstream`,
  evaluates every downstream form in order inside that binding, and returns
  the value bound to `upstream`."
  ([channel]
   channel)
  ([channel & downstreams]
   ;; `upstream` is intentionally unhygienic (~'upstream): the stage macros in
   ;; this namespace expand to code that refers to that exact symbol.
   (let [body (concat downstreams ['upstream])]
     `(let [~'upstream ~channel]
        ~@body))))
(defmacro detach
  "Turns `body` into a one-argument function so that it can be wired into a
  graph later (see `attach`).

  The function's argument is visible to `body` under the anaphoric name
  `upstream`, the same symbol the other graph macros in this namespace refer
  to. The function returns whatever `body` evaluates to."
  [body]
  ;; The parameter itself is the unhygienic `upstream` symbol.
  `(fn [~'upstream]
     ~body))
(defmacro attach
  "Plugs a previously detached part back into the graph.

  Expands to a call of `detached` with the anaphoric `upstream` symbol as its
  single argument, so it must be used where `upstream` is bound."
  [detached]
  (list detached 'upstream))
(defmacro map*
  "Graph stage built on `ms/map*`.

  With only `f`, expands to `(ms/map* f upstream)`, where `upstream` is the
  anaphoric symbol bound by the enclosing graph macro. With additional
  `downstreams` forms, rebinds `upstream` to the result of that call,
  evaluates the forms in order inside the new binding, and returns the
  rebound `upstream`."
  ([f]
   (list `ms/map* f 'upstream))
  ([f & downstreams]
   (let [stage (list `ms/map* f 'upstream)]
     `(let [~'upstream ~stage]
        ~@downstreams
        ~'upstream))))
(defmacro fmap*
  "Graph stage built on `ms/fmap*`.

  With only `f`, expands to `(ms/fmap* f upstream)`, where `upstream` is the
  anaphoric symbol bound by the enclosing graph macro. With additional
  `downstreams` forms, rebinds `upstream` to the result of that call,
  evaluates the forms in order inside the new binding, and returns the
  rebound `upstream`."
  ([f]
   (list `ms/fmap* f 'upstream))
  ([f & downstreams]
   (let [stage (list `ms/fmap* f 'upstream)]
     `(let [~'upstream ~stage]
        ~@downstreams
        ~'upstream))))
(defmacro fold*
  "Graph stage built on `ms/fold*`.

  With only `f`, expands to `(ms/fold* f upstream)`, where `upstream` is the
  anaphoric symbol bound by the enclosing graph macro. With additional
  `downstreams` forms, rebinds `upstream` to the result of that call,
  evaluates the forms in order inside the new binding, and returns the
  rebound `upstream`."
  ([f]
   (list `ms/fold* f 'upstream))
  ([f & downstreams]
   (let [stage (list `ms/fold* f 'upstream)]
     `(let [~'upstream ~stage]
        ~@downstreams
        ~'upstream))))
(defmacro mappend*
  "Graph stage built on `ms/mappend*`.

  * `(mappend* monoid)` expands to `(ms/mappend* monoid upstream)`.
  * `(mappend* monoid condition)` expands to
    `(ms/mappend* monoid condition upstream)`.
  * `(mappend* monoid condition & downstreams)` rebinds the anaphoric
    `upstream` to the result of the three-argument call, evaluates the
    downstream forms in order inside the new binding, and returns the rebound
    `upstream`.

  A second argument is always taken as `condition`, so downstream forms can
  only be supplied together with both `monoid` and `condition`."
  ([monoid]
   (list `ms/mappend* monoid 'upstream))
  ([monoid condition]
   (list `ms/mappend* monoid condition 'upstream))
  ([monoid condition & downstreams]
   (let [stage (list `ms/mappend* monoid condition 'upstream)]
     `(let [~'upstream ~stage]
        ~@downstreams
        ~'upstream))))
(defmacro filter*
  "Graph stage built on `ms/filter*`.

  With only `f`, expands to `(ms/filter* f upstream)`, where `upstream` is the
  anaphoric symbol bound by the enclosing graph macro. With additional
  `downstreams` forms, rebinds `upstream` to the result of that call,
  evaluates the forms in order inside the new binding, and returns the
  rebound `upstream`."
  ([f]
   (list `ms/filter* f 'upstream))
  ([f & downstreams]
   (let [stage (list `ms/filter* f 'upstream)]
     `(let [~'upstream ~stage]
        ~@downstreams
        ~'upstream))))
(defmacro batch*
  "Graph stage built on `ms/batch*`.

  With only `f`, expands to `(ms/batch* f upstream)`, where `upstream` is the
  anaphoric symbol bound by the enclosing graph macro. With additional
  `downstreams` forms, rebinds `upstream` to the result of that call,
  evaluates the forms in order inside the new binding, and returns the
  rebound `upstream`."
  ([f]
   (list `ms/batch* f 'upstream))
  ([f & downstreams]
   (let [stage (list `ms/batch* f 'upstream)]
     `(let [~'upstream ~stage]
        ~@downstreams
        ~'upstream))))
(defmacro reduce*
  "Graph stage built on `ms/reduce*`.

  With `f` and `default-value`, expands to
  `(ms/reduce* f default-value upstream)`, where `upstream` is the anaphoric
  symbol bound by the enclosing graph macro. With additional `downstreams`
  forms, rebinds `upstream` to the result of that call, evaluates the forms
  in order inside the new binding, and returns the rebound `upstream`."
  ([f default-value]
   (list `ms/reduce* f default-value 'upstream))
  ([f default-value & downstreams]
   (let [stage (list `ms/reduce* f default-value 'upstream)]
     `(let [~'upstream ~stage]
        ~@downstreams
        ~'upstream))))
(defmacro consume
  "Terminal graph stage: expands to `(ms/consume upstream f)`, passing the
  anaphoric `upstream` symbol bound by the enclosing graph macro first and
  `f` second."
  [f]
  (list `ms/consume 'upstream f))