-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.clj
222 lines (187 loc) · 10.8 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
(ns cljblpapiwrapper.core
(:gen-class)
(:import
(java.time LocalDate ZonedDateTime)
(java.time.format DateTimeFormatter)
(com.bloomberglp.blpapi CorrelationID Session SessionOptions Subscription SubscriptionList MessageIterator Event$EventType$Constants SessionOptions$ClientMode Event Message Element Request NotFoundException)))
;; Useful functions, not Bloomberg add-in dependent ;;
(defn- date->yyyyMMdd
"This will convert all of LocalDate, ZonedDateTime and yyyy-MM-dd into yyyyMMdd"
[date]
(condp = (type date)
LocalDate (.format date (DateTimeFormatter/ofPattern "yyyyMMdd"))
ZonedDateTime (.format date (DateTimeFormatter/ofPattern "yyyyMMdd"))
String (clojure.string/replace date #"-" "")))
(defn bdh-result->records
"This is useful for e.g. vega-lite display"
[res]
(apply concat (for [[k v] res] (mapv #(assoc % :security k) v))))
(defn bdh-result->field
[res field]
(assert (apply = (map count (vals res))) "Error, series misaligned!")
(sort-by :date
(into [] (for [[d v] (group-by :date (bdh-result->records res))]
(into {:date d} (for [r v] [(r :security) (r field)]))))))
(defn bdh-result->date
[res date]
(filter #(= (:date %) date) (bdh-result->records res)))
(defn bdh-result->date-field
[res date field]
(into {} (for [r (bdh-result->date res date)] [(r :security) (r field)])))
;; Session functions
(def default-local-host "localhost")
(def default-local-port 8194)
(defn sapi-session
"SAPI authentication
- host-ip and host-port are for the server
- uuid is the UUID of a user who's creating the request and is logged into Bloomberg desktop
- local-ip is the ip of the user"
[^String host-ip ^Long host-port ^Long uuid ^String local-ip]
(let [session-options (doto
(SessionOptions.)
(.setClientMode SessionOptions$ClientMode/SAPI)
(.setServerHost host-ip)
(.setServerPort host-port))
session (doto (Session. session-options) (.start) (.openService "//blp/apiauth"))
bbgidentity (.createIdentity session)
api-auth-svc (.getService session "//blp/apiauth")
auth-req (doto (.createAuthorizationRequest api-auth-svc) (.set "uuid" (str uuid)) (.set "ipAddress" local-ip))
corr (CorrelationID. uuid)]
(.sendAuthorizationRequest session auth-req bbgidentity corr)
(loop [s session]
(let [event (.nextEvent s)]
(if (= (.intValue (.eventType event)) Event$EventType$Constants/RESPONSE)
[session (= (subs (.toString (.next (.messageIterator event))) 0 20) "AuthorizationSuccess")]
(recur s))))))
;; Response handling ;;
(defn- handle-response-event [event]
(loop [iter (.messageIterator ^Event event)]
(let [res (.next iter)]
(if (.hasNext iter) (recur iter) res))))
(defn- handle-other-event [event] nil) ;(log/info "non-event")
(defn- read-spot-response
"Returns {sec1 {field1 value1 field2 value2} {sec2 {field1 value1 field2 value2}"
[message fields]
(let [msg (.getElement ^Message message "securityData") ;;message
fieldscoll (if (coll? fields) fields [fields])]
(into {} (for [secid (range (.numValues msg))]
(let [o (.getValueAsElement msg secid)]
[(.getValueAsString (.getElement o "security"))
(let [fieldres (.getElement o "fieldData")]
(into {} (for [f fieldscoll] [(keyword f)
(let [v (.getElement ^Element fieldres f)]
(if (zero? (.numValues v)) nil (.getValueAsString v)))])))])))))
(defn- read-historical-response
"Returns {security [{field1 value1 field2 value2 :date date-id}}"
[message fields]
(let [blparray (.getElement (.getElement message "securityData") "fieldData")
fieldscoll (if (coll? fields) fields [fields])]
{(.getValueAsString (.getElement (.getElement message "securityData") "security") 0)
(into [] (for [i (range (.numValues blparray))]
(let [x (.getValueAsElement blparray i)]
(into {:date (.getElementAsString x "date")} (for [f fieldscoll] [(keyword f) (try (.getElementAsFloat64 x f) (catch NotFoundException e nil))])))))}))
(defn- wait-for-response
"This will loop indefinitely if no more events"
[session spot-or-history fields]
(letfn [(assoc-response [acc event fields]
(let [response ((if (= spot-or-history :spot) read-spot-response read-historical-response) (handle-response-event event) fields)]
(merge acc response)))]
(loop [s session acc {}]
(let [event (.nextEvent s)]
(condp = (.intValue (.eventType event))
Event$EventType$Constants/RESPONSE (assoc-response acc event fields)
Event$EventType$Constants/PARTIAL_RESPONSE (recur s (assoc-response acc event fields)) ; (assoc-response acc event fields)
(do (handle-other-event event) (recur s acc)))))))
;; BDP definition ;;
(defn clj-bdp-session
"We either take the session as an input (SAPI) or create a
local session, which will only work locally on a computer that is connected to Bloomberg"
([securitiescoll fieldscoll override-map session-input]
(let [session (if session-input session-input (doto (Session. (doto (SessionOptions.) (.setServerHost default-local-host) (.setServerPort default-local-port))) (.start)))]
(.openService session "//blp/refdata")
(let [request-id (CorrelationID. 1)
ref-data-service (.getService session "//blp/refdata")
request (.createRequest ref-data-service "ReferenceDataRequest")]
(doseq [s securitiescoll] (.append ^Request request "securities" s))
(doseq [f fieldscoll] (.append ^Request request "fields" f))
(when override-map
(doseq [[k v] override-map]
(doto (.appendElement (.getElement request "overrides"))
(.setElement "fieldId" k)
(.setElement "value" v))))
(.sendRequest session request request-id)
session))))
(defn bdp [securities fields & {:keys [session override-map] :or {session nil override-map nil}}]
(let [securitiescoll (if (coll? securities) securities [securities])
fieldscoll (map name (if (coll? fields) fields [fields]))]
(wait-for-response (clj-bdp-session securitiescoll fieldscoll override-map session) :spot fieldscoll)))
(defn bdp-simple
"One security and one field, one override- will return a string"
[security field & {:keys [override-field override-value] :or {override-field nil override-value nil}}]
(get-in
(if (and override-field override-value)
(bdp security field :override-map {override-field override-value})
(bdp security field))
[security (keyword field)]))
;; BDH definition ;;
(defn- clj-bdh-session [securitiescoll fieldscoll start-date end-date adjustment-split periodicity session-input]
(let [session (if session-input session-input (doto (Session. (doto (SessionOptions.) (.setServerHost default-local-host) (.setServerPort default-local-port))) (.start)))]
(.openService session "//blp/refdata")
(let [request-id (CorrelationID. 1)
ref-data-service (.getService session "//blp/refdata")
request (doto
(.createRequest ref-data-service "HistoricalDataRequest")
(.set "startDate" start-date)
(.set "endDate" end-date)
(.set "adjustmentSplit" (if adjustment-split "TRUE" "FALSE"))
(.set "periodicitySelection" periodicity))]
(doseq [s securitiescoll] (.append request "securities" s))
(doseq [f fieldscoll] (.append request "fields" f))
(.sendRequest session request request-id)
session)))
(defn bdh [securities fields start-date end-date & {:keys [adjustment-split periodicity session] :or {adjustment-split false periodicity "DAILY" session nil}}]
(let [securitiescoll (if (coll? securities) securities [securities])
fieldscoll (map name (if (coll? fields) fields [fields]))]
(wait-for-response (clj-bdh-session securitiescoll fieldscoll (date->yyyyMMdd start-date) (date->yyyyMMdd end-date) adjustment-split periodicity session) :history fieldscoll)))
;Examples
;(def out1 (bdh ["AAPL US Equity" "GOOG US Equity" "FB US Equity"] ["PX_OPEN" "PX_HIGH" "PX_LOW" "PX_LAST"] "20190101" "20190120"))
;(def out2 (bdh ["AAPL US Equity" "GOOG US Equity" "FB US Equity"] ["PX_OPEN" "PX_HIGH" "PX_LOW" "PX_LAST"] "20190101" "20190120" :adjustment-split true :periodicity "WEEKLY"))
;(def out3 (bdp-simple "AAPL US Equity" "PX_LAST"))
;(def out4 (bdp-simple "US900123AL40 Corp" "YAS_BOND_YLD" :override-field "YAS_BOND_PX" :override-value 100.))
;(def out4bis (bdp ["XS1713469911 Corp"] ["BETA_ADJ_OVERRIDABLE"] :override-map {"BETA_OVERRIDE_REL_INDEX" "JBCDCOMP Index" "BETA_OVERRIDE_PERIOD" "D" "BETA_OVERRIDE_START_DT","20210101"}))
;(def out5 (bdp ["AAPL US Equity" "GOOG US Equity" "FB US Equity"] ["PX_OPEN" "PX_HIGH" "PX_LOW" "PX_LAST"]))
;(def out6 (bdh-result->field out1 :PX_OPEN))
;(def out7 (bdh-result->date out1 "2019-01-18"))
;(def out8 (bdh-result->date-field out1 "2019-01-18" :PX_OPEN))
;(def out9 (bdh-result->records out1))
;
;; Subscription ;;
(defn clj-bdp-subscribe [securities fields session-input atom-map]
(let [session (if session-input session-input (doto (Session. (doto (SessionOptions.) (.setServerHost default-local-host) (.setServerPort default-local-port))) (.start)))]
(.openService session "//blp/mktdata")
(let [subscriptions (SubscriptionList.)
securitiescoll (if (coll? securities) securities [securities])
fieldscoll (if (coll? fields) fields [fields])
fieldstring (clojure.string/join "," fieldscoll)
corrmap (into {} (map-indexed vector securitiescoll))]
(doseq [[c s] corrmap]
(.add subscriptions (Subscription. s fieldstring (CorrelationID. c))))
(Thread.
(fn []
(try
(.subscribe session subscriptions)
(while true
(let [event (.nextEvent session)]
(if (= (.intValue (.eventType event)) Event$EventType$Constants/SUBSCRIPTION_DATA)
(let [iter (.messageIterator event) msg (.next iter) s (corrmap (.object (.correlationID msg)))]
(doseq [f fieldscoll]
(when (.hasElement msg f) (swap! atom-map assoc-in [s f] (.getValueAsString (.getElement msg f)))))))))
(catch InterruptedException e
(.stop session)
(println (.getMessage e)))))))))
;Examples
;(def m (atom nil))
;(def t (clj-bdp-subscribe ["AAPL US Equity" "GOOG US Equity"] ["LAST_PRICE"] nil m))
;(.start t)
;(log/info @m)
;(.stop t)