/
core.clj
134 lines (105 loc) · 3.32 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
(ns onyx-repl.core
  (:refer-clojure :exclude [map mapcat filter remove])
  (:require [clj-sockets.core :refer [create-socket write-line close-socket]]
            [clojure.pprint])
  (:import [java.util UUID]))
(defn invoke-map
  "Applies `f` to the `:value` of `segment` and returns a new segment map
  that contains only the result, stored under `:value`."
  [f segment]
  (let [result (f (:value segment))]
    {:value result}))
(defn invoke-mapcat
  "Applies `f` to the `:value` of `segment`; `f` must return a collection
  (or nil). Returns a vector holding one `{:value v}` segment per element
  of that collection, in order.

  Previously the whole collection was stored in a single segment, which
  made this identical to `invoke-map` apart from the wrapping vector."
  [f segment]
  ;; `mapv` rather than `map`: this namespace excludes clojure.core/map and
  ;; defines its own `map` macro, and a vector keeps the return type eager.
  (mapv (fn [v] {:value v})
        (f (:value segment))))
(defn invoke-filter
  "Keeps `segment` when `f` returns a truthy value for its `:value`;
  otherwise returns an empty vector."
  [f segment]
  (let [keep? (f (:value segment))]
    (if-not keep?
      []
      segment)))
(defn invoke-remove
  "Drops `segment` (returning an empty vector) when `f` returns a truthy
  value for its `:value`; otherwise returns `segment` unchanged."
  [f segment]
  (let [drop? (f (:value segment))]
    (if-not drop?
      segment
      [])))
(defn new-task-name
  "Returns a fresh, unqualified keyword of the form `:task-<random-uuid>`."
  []
  (->> (UUID/randomUUID)
       (str "task-")
       (keyword)))
(defn define-remote-named-function!
  "Writes a `defn` form for `task-name` to `socket`.

  `f` is the source form of a function literal, e.g. `(fn [x] (* 2 x))`.
  Its first element is dropped and replaced by `defn <task-name>`, so the
  line written for that example is `(defn <task-name> [x] (* 2 x))`.
  Returns whatever `write-line` returns.

  The previous body ignored `f` and referred to an undefined `local-inc`
  symbol, which prevented this namespace from compiling."
  [socket task-name f]
  ;; The pr-str/read-string round trip is kept from the original; for a
  ;; plain quoted form it yields an equal form.
  (let [f-body (rest (read-string (pr-str f)))
        ;; conj on a seq prepends, so the name goes on first and `defn` last.
        f-def  (pr-str (conj f-body (symbol (name task-name)) 'defn))]
    (write-line socket f-def)))
(defn define-remote-anonymous-function!
  "Writes the line `(def <task-name> <f>)` to `socket`, where `<task-name>`
  is the name part of `task-name` and `<f>` is `f` rendered by `format`'s
  `%s`. Returns whatever `write-line` returns."
  [socket task-name f]
  (let [var-name (name task-name)
        def-line (format "(def %s %s)" var-name f)]
    (write-line socket def-line)))
(defmacro defn-remote
  "Expands to `(clojure.core/defn ...)` applied to `body` unchanged, so it
  currently behaves exactly like `defn`."
  [& body]
  (list* `defn body))
(defn link-workflow-and-head
  "Appends the edge `[current-head task-name]` to the job's `:workflow`
  and makes `task-name` the new `:head`. Returns the updated job map."
  [job task-name]
  (let [previous-head (:head job)
        edge          [previous-head task-name]
        with-edge     (update-in job [:workflow] conj edge)]
    (assoc with-edge :head task-name)))
(defn map-catalog-entry
  "Builds the catalog entry map for a `:function` task named `task-name`
  that runs this namespace's `invoke-map`. `:onyx-repl/fn` is the keyword
  named after the task in the `user` namespace."
  [task-name]
  (let [remote-fn (keyword "user" (name task-name))]
    {:onyx/name       task-name
     :onyx/type       :function
     :onyx/fn         ::invoke-map
     :onyx/batch-size 20
     :onyx-repl/fn    remote-fn}))
(defn mapcat-catalog-entry
  "Builds the catalog entry map for a `:function` task named `task-name`
  that runs this namespace's `invoke-mapcat`. `:onyx-repl/fn` is the
  keyword named after the task in the `user` namespace."
  [task-name]
  (let [remote-fn (keyword "user" (name task-name))]
    {:onyx/name       task-name
     :onyx/type       :function
     :onyx/fn         ::invoke-mapcat
     :onyx/batch-size 20
     :onyx-repl/fn    remote-fn}))
(defn filter-catalog-entry
  "Builds the catalog entry map for a `:function` task named `task-name`
  that runs this namespace's `invoke-filter`. `:onyx-repl/fn` is the
  keyword named after the task in the `user` namespace."
  [task-name]
  (let [remote-fn (keyword "user" (name task-name))]
    {:onyx/name       task-name
     :onyx/type       :function
     :onyx/fn         ::invoke-filter
     :onyx/batch-size 20
     :onyx-repl/fn    remote-fn}))
(defn remove-catalog-entry
  "Builds the catalog entry map for a `:function` task named `task-name`
  that runs this namespace's `invoke-remove`. `:onyx-repl/fn` is the
  keyword named after the task in the `user` namespace."
  [task-name]
  (let [remote-fn (keyword "user" (name task-name))]
    {:onyx/name       task-name
     :onyx/type       :function
     :onyx/fn         ::invoke-remove
     :onyx/batch-size 20
     :onyx-repl/fn    remote-fn}))
(defmacro repl-function
  "Adds a new function task to `job` and ships the source form `f` over
  the job's `:socket`.

  Arguments:
    job       - expression yielding a job map (`:socket`, `:head`,
                `:workflow`, `:catalog`); evaluated exactly once.
    f         - the function, as written at the call site. It is sent as
                source and is not evaluated locally.
    catalog-f - function of the new task name returning its catalog entry.

  A literal `(fn ...)` / `#(...)` form is sent with
  `define-remote-named-function!`, which rewrites it into a `defn`. Any
  other form (a symbol, a call such as `(partial + 1)`, ...) is sent with
  `define-remote-anonymous-function!` as `(def <task> <form>)`.

  The choice is made from the shape of the form at expansion time. The
  earlier runtime `(fn? f)` check was true for symbols and calls that
  evaluate to functions as well, sending them down the `defn` rewrite,
  which only makes sense for `(fn ...)` forms.

  Returns the job with the new workflow edge, head and catalog entry."
  [job f catalog-f]
  (let [fn-literal? (and (seq? f)
                         (contains? #{'fn 'fn* 'clojure.core/fn} (first f)))
        define!     (if fn-literal?
                      `define-remote-named-function!
                      `define-remote-anonymous-function!)]
    `(let [job#       ~job
           task-name# (new-task-name)]
       (~define! (:socket job#) task-name# '~f)
       (-> job#
           (link-workflow-and-head task-name#)
           (update-in [:catalog] conj (~catalog-f task-name#))))))
(defmacro map
  "Adds a task to `job` that applies `f` to each segment's `:value`,
  using `map-catalog-entry` for the catalog entry. Expands to
  `repl-function`; returns the updated job."
  [job f]
  ;; Refer to the catalog builder by its (auto-qualified) symbol. Unquoting
  ;; it would splice the function object itself into the expanded code.
  `(repl-function ~job ~f map-catalog-entry))
(defmacro mapcat
  "Adds a task to `job` that runs `f` through `invoke-mapcat`, using
  `mapcat-catalog-entry` for the catalog entry. Expands to
  `repl-function`; returns the updated job."
  [job f]
  ;; Refer to the catalog builder by its (auto-qualified) symbol. Unquoting
  ;; it would splice the function object itself into the expanded code.
  `(repl-function ~job ~f mapcat-catalog-entry))
(defmacro filter
  "Adds a task to `job` that keeps segments whose `:value` satisfies `f`,
  using `filter-catalog-entry` for the catalog entry. Expands to
  `repl-function`; returns the updated job."
  [job f]
  ;; Refer to the catalog builder by its (auto-qualified) symbol. Unquoting
  ;; it would splice the function object itself into the expanded code.
  `(repl-function ~job ~f filter-catalog-entry))
(defmacro remove
  "Adds a task to `job` that drops segments whose `:value` satisfies `f`,
  using `remove-catalog-entry` for the catalog entry. Expands to
  `repl-function`; returns the updated job."
  [job f]
  ;; Refer to the catalog builder by its (auto-qualified) symbol. Unquoting
  ;; it would splice the function object itself into the expanded code.
  `(repl-function ~job ~f remove-catalog-entry))
(defn remote-onyx-connection
  "Opens a socket to `host` on `port` with `create-socket` and returns it.

  The arguments used to be ignored in favour of a hard-coded
  \"localhost\" / 5555; they are now passed through."
  [host port]
  (create-socket host port))
(defn onyx-input
  "Creates the initial job map for the socket `conn`: a freshly named
  core.async input task as `:head` and sole `:catalog` entry, and an
  empty `:workflow`."
  [conn]
  (let [input-task  (new-task-name)
        input-entry {:onyx/name       input-task
                     :onyx/plugin     :onyx.plugin.core-async/input
                     :onyx/type       :input
                     :onyx/medium     :core.async
                     :onyx/batch-size 20
                     :onyx/max-peers  1}]
    {:socket   conn
     :head     input-task
     :workflow []
     :catalog  [input-entry]}))
(defn explain
  "Returns `job` restricted to the keys `:workflow`, `:catalog`,
  `:flow-conditions`, `:lifecycles`, `:windows` and `:triggers`; keys
  absent from `job` are absent from the result."
  [job]
  (let [job-keys [:workflow :catalog :flow-conditions
                  :lifecycles :windows :triggers]]
    (select-keys job job-keys)))
(defn-remote foo
  "Returns `x` incremented by one."
  [x]
  (inc x))
;; Example session, run when the namespace loads: connect, build a small
;; job out of two `map` steps, and pretty-print the resulting job map.
(let [conn (remote-onyx-connection "localhost" 5555)]
  (try
    (-> (onyx-input conn)
        (map (fn [x] (* 2 x)))
        ;; Was `local-inc`, which is not defined anywhere in this file.
        ;; `foo` above is the increment function — TODO confirm it is the
        ;; intended one.
        (map foo)
        (clojure.pprint/pprint))
    (finally
      ;; Release the socket even if one of the steps above throws.
      (close-socket conn))))
;; NOTE(review): `defn-remote` expands to `(defn ~@body)`, so this form
;; becomes `(defn peers foo (fn [x] (inc x)))`. `foo` then sits where `defn`
;; expects a docstring, attribute map or parameter vector, which `defn` does
;; not accept. This looks like a sketch of a not-yet-implemented
;; "define foo on peers" syntax — TODO confirm the intent, then either
;; implement it in `defn-remote` or remove this form.
(defn-remote peers foo
(fn [x]
(inc x)))
(defn-remote hello
  "Takes no arguments and returns the keyword `:bye`."
  []
  :bye)