-
Notifications
You must be signed in to change notification settings - Fork 2
/
client.clj
136 lines (121 loc) · 4.5 KB
/
client.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
(ns clj-faktory.client
(:require [cheshire.core :as cheshire]
[clojure.string :as string]
[clojure.tools.logging :as log]
[clj-sockets.core :as sockets])
(:import [clojure.lang IDeref]
[java.net InetAddress URI]
[java.security MessageDigest]))
(defprotocol Connectable
(connect [this])
(reconnect [this]))
(defn- bytes->hex
"Convert Byte Array to Hex String"
^String
[^"[B" data]
(let [len (alength data)
^"[B" hex-chars (byte-array (.getBytes "0123456789abcdef" "UTF-8"))
^"[B" buffer (byte-array (* 2 len))]
(loop [i 0]
(when (< i len)
(let [b (aget data i)]
(aset buffer (* 2 i) (aget hex-chars (bit-shift-right (bit-and b 0xF0) 4)))
(aset buffer (inc (* 2 i)) (aget hex-chars (bit-and b 0x0F))))
(recur (inc i))))
(String. buffer "UTF-8")))
(defn- read-and-parse-response [conn]
(let [response (sockets/read-line conn)
_ (log/debug "<<<" response)
[resp-type message] (-> response
((juxt first rest))
(update 1 (partial apply str)))]
(case resp-type
\+ (some-> message
(string/split #" ")
(second)
(cheshire/parse-string true))
\$ (when-not (= message "-1")
(cheshire/parse-string (sockets/read-line conn) true))
\- (throw (Exception. message))
(throw (ex-info "Unknown conn error" {:type ::conn-error})))))
(defn- command-str [[verb & segments]]
(->> segments
(cons (string/upper-case (name verb)))
(map #(if (map? %)
(cheshire/generate-string %)
%))
(string/join " ")))
(defn- send-command* [socket command]
(log/debug ">>>" (command-str command))
(sockets/write-line socket (command-str command))
(read-and-parse-response socket))
(defn- send-command [socket command]
(loop [retry-ms [1000 10000 30000]]
(let [[status result] (try
[:success (send-command* socket command)]
(catch Exception e
(if (and (seq retry-ms)
(= (:type (ex-data e)) ::conn-error))
[:failure]
(do (log/warn e)
(throw e)))))]
(case status
:success result
:failure (let [wait-ms (first retry-ms)]
(log/warn (str "Connection error. Retrying in " wait-ms " ms."))
(Thread/sleep wait-ms)
(recur (rest retry-ms)))))))
(defn- hash-password
[password salt iterations]
(let [digest (MessageDigest/getInstance "SHA-256")]
(loop [i iterations
bs (.getBytes (str password salt))]
(if (= i 0)
(bytes->hex bs)
(recur (dec i)
(.digest digest bs))))))
(defn fail [conn jid e]
(send-command conn [:fail {:jid jid
:message (.getMessage e)
:errtype (str (class e))
:backtrace (map #(.toString %) (.getStackTrace e))}]))
(defn ack [conn jid]
(send-command conn [:ack {:jid jid}]))
(defn fetch [conn queues]
(send-command conn (cons :fetch queues)))
(defn beat [conn wid]
(send-command conn [:beat {:wid wid}]))
(defn push [conn job]
(send-command conn [:push job]))
(defn- connect* [uri worker-info]
(let [uri (URI. uri)
host (.getHost uri)
port (.getPort uri)
conn (sockets/create-socket host port)]
(.setKeepAlive conn true)
(let [{version :v
salt :s
iterations :i} (read-and-parse-response conn)]
(if salt
(if-let [hashed-password (some-> (.getUserInfo uri)
(string/split #":")
(last)
(hash-password salt iterations))]
(send-command conn [:hello (assoc worker-info :pwdhash hashed-password)])
(throw (Exception. "Server requires password, but none has been configured")))
(send-command conn [:hello worker-info])))
conn))
(deftype Connection [uri worker-info conn-atom]
IDeref
(deref [this]
@conn-atom)
Connectable
(connect [this]
(reset! conn-atom (connect* uri worker-info))
this)
(reconnect [this]
(when-let [conn @conn-atom]
(.close conn))
(connect this)))
(defn connection [uri worker-info]
(Connection. uri worker-info (atom nil)))