From dc0c450722a1218244b3a520472ef64d9bbb6f32 Mon Sep 17 00:00:00 2001 From: Brian Guarraci Date: Thu, 14 Dec 2023 15:01:14 -0700 Subject: [PATCH] add alpaca feed/feed_consumer stock examples --- examples/{ => stocks}/alpaca/update_data.kg | 0 examples/stocks/alpaca/ws/feed.kg | 76 +++++++++++++++++++ examples/stocks/alpaca/ws/feed_consumer.kg | 75 ++++++++++++++++++ .../{alpaca => stocks/alpaca/ws}/stream.kg | 12 ++- examples/{ => stocks}/polygon/update_data.kg | 0 examples/{ => stocks/polygon}/ws/feed.kg | 0 .../{ => stocks/polygon}/ws/feed_consumer.kg | 0 .../polygon/ws/stream.kg} | 6 ++ klongpy/sys_fn.py | 21 ++++- scripts/kgpy | 19 ++--- 10 files changed, 193 insertions(+), 16 deletions(-) rename examples/{ => stocks}/alpaca/update_data.kg (100%) create mode 100644 examples/stocks/alpaca/ws/feed.kg create mode 100644 examples/stocks/alpaca/ws/feed_consumer.kg rename examples/{alpaca => stocks/alpaca/ws}/stream.kg (76%) rename examples/{ => stocks}/polygon/update_data.kg (100%) rename examples/{ => stocks/polygon}/ws/feed.kg (100%) rename examples/{ => stocks/polygon}/ws/feed_consumer.kg (100%) rename examples/{ws/polygon.kg => stocks/polygon/ws/stream.kg} (83%) diff --git a/examples/alpaca/update_data.kg b/examples/stocks/alpaca/update_data.kg similarity index 100% rename from examples/alpaca/update_data.kg rename to examples/stocks/alpaca/update_data.kg diff --git a/examples/stocks/alpaca/ws/feed.kg b/examples/stocks/alpaca/ws/feed.kg new file mode 100644 index 0000000..679871f --- /dev/null +++ b/examples/stocks/alpaca/ws/feed.kg @@ -0,0 +1,76 @@ +.py("klongpy.ws") +.pyf("iso8601";"parse_date") + +.comment("****") + +feed type: +t trades (includes corrections and cancelErrors) +q quotes +b bars + +. + +e.g. + +b.MSFT minute bar for MSFT + +or + +b.* for all minute bars + +**** + +auth:::{["action" "auth"]} +auth,"key",,.os.env?"ALPACA_API_KEY" +auth,"secret",,.os.env?"ALPACA_SECRET_KEY" + +:" subscription handler " +mkbars::{{2_x}'x@&{#x?"b."}'x} +mktrades::{[]} +mkquotes::{[]} +psmsg::{[d];d:::{["action" "subscribe"]};d,"bars",,mkbars(x);d,"trades",,mktrades(x);d,"quotes",,mkquotes(x)} +unionValues::{[q];q:::{};{{q,x,1}'x@1}'x;{x@0}'q} +updSubscription::{[v q];.d("subscribing: ");v::unionValues(x);v::psmsg(v);.p(v);wsc(v)} + +:" Map of clients handles to their subscribed tickers " +clients:::{} +rmclient::{x_clients} + +:" Handle server auth request " +sendAuth::{.p("sending auth");.p(auth);x(auth)} +handleUnknown::{.d("unhandled");.p(y)} +handleLogin::{.p("logged in")} +handleControl::{[msg];msg::y?"msg";:[msg="connected";sendAuth(x);:[msg="authenticated";handleLogin(x;y);handleAuthFailure(x;y)]]} +handleSubscribed::{.p(y)} + +handlers:::{} +handlers,"success",handleControl +handlers,"subscription",handleSubscribed +handleStatus::{[h];.p(y);h::handlers?(y?"T");:[h;h(x;y);handleUnknown(x;y)]} + +:" convert the RFC-3339 formatted timestamp " +mkts::{[d ts];d::.pyc("parse_date";,x;:{});.pyc(d,,"timestamp";[];:{})} + +:" send the message to all subscribed clients " +send::{.d("sending ");.d(k);.d(" to client ");.p(y);y(:update,,z)} +match::{[ev k subscription];ev::x;k::y;subscription::z;all::ev,".*";:[subscription?all;1;:[subscription?k;1;0]]} +broadcast::{[ev k data];ev::x;k::y;data::z;{:[match(ev;k;x@1);send(k;x@0;data);0]}'clients} +handleData::{[ev sym k];.d("data: ");.p(y);ev::(y?"T");sym::(y?"S");y,"t",mkts(y?"t");k::ev,".",sym;broadcast(ev;k;y)} + +handleMsg::{:[(y?"T")="b";handleData(x;y);handleStatus(x;y)]} + +.ws.m::{[c];c::x;{handleMsg(c;x)}'y} + +wsuri:::[(#.os.argv);.os.argv@0;"wss://stream.data.alpaca.markets/v2/sip"] +.d("connecting to ");.p(wsuri) +wsc::.ws(wsuri) + +:" Called by clients to subscribe to ticker updates " +updClientSub::{clients,x,,(clients?x),,y;.d("clients: ");.p(clients)} +subscribe::{.d("subscribing client: ");.p(.cli.h,,x);updClientSub(.cli.h;x);updSubscription(clients);x} + +:" Setup the IPC server and callbacks " +.srv(8888) +.srv.o::{.d("client connected: ");.p(x);clients,x,,[]} +.srv.c::{.d("client disconnected: ");.p(x);rmclient(x);updSubscription(clients);.d("clients left: ");.p(#clients)} +.srv.e::{.d("error: ");.p(x);.p(y)} diff --git a/examples/stocks/alpaca/ws/feed_consumer.kg b/examples/stocks/alpaca/ws/feed_consumer.kg new file mode 100644 index 0000000..c7b85e9 --- /dev/null +++ b/examples/stocks/alpaca/ws/feed_consumer.kg @@ -0,0 +1,75 @@ +.py("klongpy.db") + +.comment("****") + +Feed consumer for listening to streaming stock bars from Alpaca. + +Data is received from the feed server (that's connected to Alpaca) and stored in the database. + +Sample bars data: + +{ + "T": "b", + "S": "SPY", + "o": 388.985, + "h": 389.13, + "l": 388.975, + "c": 389.12, + "v": 49378, + "t": "2021-02-22T19:15:00Z" (converted to timestamp on server) +} + +TODO: add delta sync against feed server + +**** + +time::{[t0];t0::.pc();x();.pc()-t0} +time1::{[t0];t0::.pc();x(y);.pc()-t0} + +tbs::.tables("/tmp/tables/consumer") + +:" Create a new table with appropriate columns for the Alpaca data " +cols::["S" "o" "h" "l" "c" "v" "t"] +colsFromData::{{(x@0),,[]}'x} +colsFromNames::{{x,,[]}'x} +newt::{.table(colsFromNames(cols))} + +:" Attempt to load the prices table from disk " +prices::tbs?"prices" +prices:::[:_prices;newt();prices] + +:" Create a database so we can inspect the data " +q:::{} +q,"prices",,prices +db::.db(q) + +stats::{[q];q::db("select count(*) from prices");.d("rows: ");.p(q);q} +lastCnt::stats() + +:" Flush the prices table every minute " +store::{[q];.p("");q::stats();:[q>lastCnt;tbs,"prices",prices;.p("no change")];lastCnt::q;1} +timeStore::{[r];r::time(store);.d("store ms: ");.p(r)} +.timer("store";60;timeStore) + +:"Connect to the broadcast server" +.p("connecting to server on port 8888") +cli::.cli(8888) + +cli(:subscribe,,["b.MSFT" "b.AAPL" "b.GOOG"]) + +:" A basic bollinger band strategy " +mean::{(+/x)%#x} +std::{((+/((x-y)^2))%#x)^0.5} +bollinger::{[m s];m::mean(x);s::std(x;mean(x));(m+2*s),(m-2*s)} +signal::{[upper lower p];upper::x@0;lower::x@1;p::y@0;:[pupper;"sell";"hold"]]} + +analyze::{[b c];c::db("select c from prices order by S desc limit 20");b::bollinger(c);.d("signal: ");.p(signal(b;c))} +timeAnalyze::{time(analyze)} +analyzeComplete::{.d("analyze ms: ");.p(x)} +asyncAnalyze::.async(timeAnalyze;analyzeComplete) + +updateDb::{[u d];u::x;d::{u?x}'cols;.d("insert: ");.d(d);.insert(prices;d)} +timeUpdateDb::{[ms];ms::time1(updateDb;x);.d(" ");.d(ms);.p(" ms")} + +:" Called by server when there is a subscription update." +update::{timeUpdateDb(x);asyncAnalyze()} diff --git a/examples/alpaca/stream.kg b/examples/stocks/alpaca/ws/stream.kg similarity index 76% rename from examples/alpaca/stream.kg rename to examples/stocks/alpaca/ws/stream.kg index 1523022..86eeb73 100644 --- a/examples/alpaca/stream.kg +++ b/examples/stocks/alpaca/ws/stream.kg @@ -1,6 +1,9 @@ .py("klongpy.ws") .comment("****") + +Simple example for connecting to Alpaca stock data websocket and subscribing to AAPL Trades. + Auth Volley: receive: [{"T":"success","msg":"connected"}] @@ -8,10 +11,9 @@ send: {"action": "auth", "key": "{KEY_ID}", "secret": "{SECRET}"} receive: [{"T":"success","msg":"authenticated"}] send: {"action":"subscribe","trades":["AAPL"],"quotes":["AMD","CLDR"],"bars":["*"]} -confirmation: [{"T":"subscription","trades":["AAPL"],"quotes":["AMD","CLDR"],"bars":["*"],"updatedBars":[],"dailyBars":["VOO"],"statuses":["*"],"lulds":[],"corrections":["AAPL"],"cancelErrors":["AAPL"]}] - -{"action": "unsubscribe", "bars": ["*"]} +receive: [{"T":"subscription","trades":["AAPL"],"quotes":["AMD","CLDR"],"bars":["*"],"updatedBars":[],"dailyBars":["VOO"],"statuses":["*"],"lulds":[],"corrections":["AAPL"],"cancelErrors":["AAPL"]}] +Trade sample: { "T": "t", "i": 96921, @@ -23,6 +25,9 @@ confirmation: [{"T":"subscription","trades":["AAPL"],"quotes":["AMD","CLDR"],"ba "c": ["@", "I"], "z": "C" } + +to unsubcribe: {"action": "unsubscribe", "bars": ["*"]} + **** auth:::{["action" "auth"]} @@ -31,7 +36,6 @@ auth,"secret",,.os.env?"ALPACA_SECRET_KEY" subscription:::{["action" "subscribe"]} subscription,"trades",,["AAPL"] -.p(subscription) subscribe::{x(subscription)} handleData::{.p(y)} diff --git a/examples/polygon/update_data.kg b/examples/stocks/polygon/update_data.kg similarity index 100% rename from examples/polygon/update_data.kg rename to examples/stocks/polygon/update_data.kg diff --git a/examples/ws/feed.kg b/examples/stocks/polygon/ws/feed.kg similarity index 100% rename from examples/ws/feed.kg rename to examples/stocks/polygon/ws/feed.kg diff --git a/examples/ws/feed_consumer.kg b/examples/stocks/polygon/ws/feed_consumer.kg similarity index 100% rename from examples/ws/feed_consumer.kg rename to examples/stocks/polygon/ws/feed_consumer.kg diff --git a/examples/ws/polygon.kg b/examples/stocks/polygon/ws/stream.kg similarity index 83% rename from examples/ws/polygon.kg rename to examples/stocks/polygon/ws/stream.kg index 5de6ca3..6cf6262 100644 --- a/examples/ws/polygon.kg +++ b/examples/stocks/polygon/ws/stream.kg @@ -1,5 +1,11 @@ .py("klongpy.ws") +.comment("****") + +Simple Stream example for connecting to Polygon stock data websocket. + +**** + auth:::{["action" "auth"]} KEY::.os.env?"POLYGON_API_KEY" auth::auth,"params",,KEY diff --git a/klongpy/sys_fn.py b/klongpy/sys_fn.py index a9d8fcb..0b573db 100644 --- a/klongpy/sys_fn.py +++ b/klongpy/sys_fn.py @@ -451,8 +451,23 @@ def eval_sys_python_call(klong, x, y, z): Example: - .pyc(obj;[1 2 3];:{"a":1,"b":2,"c":3}) - .pyc(obj,"method";[1 2 3];:{"a":1,"b":2,"c":3}) + Python objects may be directly called: + + .pyc(obj;[1 2 3];:{"a":1,"b":2,"c":3}) + + or a method on the object may be called: + + .pyc(obj,"method";[1 2 3];:{"a":1,"b":2,"c":3}) + + if the python object name is snake case, then the quote is not needed: + + Here, parse_date is a function in the iso8601 module: + + .pyf("iso8601";"parse_date") + + and it can be called via: + + .pyc("parse_date";,"2020-01-01";:{}) """ if is_list(x): @@ -467,7 +482,7 @@ def eval_sys_python_call(klong, x, y, z): if not callable(f): return f else: - f = x + f = klong[KGSym(x)] if isinstance(x,str) else x if not callable(f): return f if not is_list(y): diff --git a/scripts/kgpy b/scripts/kgpy index 4d538db..eca75af 100644 --- a/scripts/kgpy +++ b/scripts/kgpy @@ -368,15 +368,16 @@ if __name__ == "__main__": def gather_io_tasks(io_loop): done_event = threading.Event() - tasks = asyncio.all_tasks(loop=io_loop) - async def main_task(): - gathered_task = asyncio.gather(*tasks) - gathered_task.add_done_callback(lambda: done_event.set()) - await gathered_task - if len(tasks) > 1: - io_loop.call_soon_threadsafe(asyncio.create_task, main_task()) - else: - done_event.set() + # tasks = asyncio.all_tasks(loop=io_loop) + # print(tasks) + # async def main_task(): + # gathered_task = asyncio.gather(*tasks) + # gathered_task.add_done_callback(lambda x: done_event.set()) + # await gathered_task + # if len(tasks) > 1: + # io_loop.call_soon_threadsafe(asyncio.create_task, main_task()) + # else: + # done_event.set() return done_event gather_io_tasks(io_loop).wait()