Skip to content

Commit

Permalink
add alpaca feed/feed_consumer stock examples
Browse files Browse the repository at this point in the history
  • Loading branch information
briangu committed Dec 14, 2023
1 parent c93b927 commit dc0c450
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 16 deletions.
File renamed without changes.
76 changes: 76 additions & 0 deletions examples/stocks/alpaca/ws/feed.kg
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
.py("klongpy.ws")
.pyf("iso8601";"parse_date")

.comment("****")

feed type:
t trades (includes corrections and cancelErrors)
q quotes
b bars

<feed type>.<symbol or *>

e.g.

b.MSFT minute bar for MSFT

or

b.* for all minute bars

****

auth:::{["action" "auth"]}
auth,"key",,.os.env?"ALPACA_API_KEY"
auth,"secret",,.os.env?"ALPACA_SECRET_KEY"

:" subscription handler "
mkbars::{{2_x}'x@&{#x?"b."}'x}
mktrades::{[]}
mkquotes::{[]}
psmsg::{[d];d:::{["action" "subscribe"]};d,"bars",,mkbars(x);d,"trades",,mktrades(x);d,"quotes",,mkquotes(x)}
unionValues::{[q];q:::{};{{q,x,1}'x@1}'x;{x@0}'q}
updSubscription::{[v q];.d("subscribing: ");v::unionValues(x);v::psmsg(v);.p(v);wsc(v)}

:" Map of clients handles to their subscribed tickers "
clients:::{}
rmclient::{x_clients}

:" Handle server auth request "
sendAuth::{.p("sending auth");.p(auth);x(auth)}
handleUnknown::{.d("unhandled");.p(y)}
handleLogin::{.p("logged in")}
handleControl::{[msg];msg::y?"msg";:[msg="connected";sendAuth(x);:[msg="authenticated";handleLogin(x;y);handleAuthFailure(x;y)]]}
handleSubscribed::{.p(y)}

handlers:::{}
handlers,"success",handleControl
handlers,"subscription",handleSubscribed
handleStatus::{[h];.p(y);h::handlers?(y?"T");:[h;h(x;y);handleUnknown(x;y)]}

:" convert the RFC-3339 formatted timestamp "
mkts::{[d ts];d::.pyc("parse_date";,x;:{});.pyc(d,,"timestamp";[];:{})}

:" send the message to all subscribed clients "
send::{.d("sending ");.d(k);.d(" to client ");.p(y);y(:update,,z)}
match::{[ev k subscription];ev::x;k::y;subscription::z;all::ev,".*";:[subscription?all;1;:[subscription?k;1;0]]}
broadcast::{[ev k data];ev::x;k::y;data::z;{:[match(ev;k;x@1);send(k;x@0;data);0]}'clients}
handleData::{[ev sym k];.d("data: ");.p(y);ev::(y?"T");sym::(y?"S");y,"t",mkts(y?"t");k::ev,".",sym;broadcast(ev;k;y)}

handleMsg::{:[(y?"T")="b";handleData(x;y);handleStatus(x;y)]}

.ws.m::{[c];c::x;{handleMsg(c;x)}'y}

wsuri:::[(#.os.argv);.os.argv@0;"wss://stream.data.alpaca.markets/v2/sip"]
.d("connecting to ");.p(wsuri)
wsc::.ws(wsuri)

:" Called by clients to subscribe to ticker updates "
updClientSub::{clients,x,,(clients?x),,y;.d("clients: ");.p(clients)}
subscribe::{.d("subscribing client: ");.p(.cli.h,,x);updClientSub(.cli.h;x);updSubscription(clients);x}

:" Setup the IPC server and callbacks "
.srv(8888)
.srv.o::{.d("client connected: ");.p(x);clients,x,,[]}
.srv.c::{.d("client disconnected: ");.p(x);rmclient(x);updSubscription(clients);.d("clients left: ");.p(#clients)}
.srv.e::{.d("error: ");.p(x);.p(y)}
75 changes: 75 additions & 0 deletions examples/stocks/alpaca/ws/feed_consumer.kg
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
.py("klongpy.db")

.comment("****")

Feed consumer for listening to streaming stock bars from Alpaca.

Data is received from the feed server (that's connected to Alpaca) and stored in the database.

Sample bars data:

{
"T": "b",
"S": "SPY",
"o": 388.985,
"h": 389.13,
"l": 388.975,
"c": 389.12,
"v": 49378,
"t": "2021-02-22T19:15:00Z" (converted to timestamp on server)
}

TODO: add delta sync against feed server

****

time::{[t0];t0::.pc();x();.pc()-t0}
time1::{[t0];t0::.pc();x(y);.pc()-t0}

tbs::.tables("/tmp/tables/consumer")

:" Create a new table with appropriate columns for the Alpaca data "
cols::["S" "o" "h" "l" "c" "v" "t"]
colsFromData::{{(x@0),,[]}'x}
colsFromNames::{{x,,[]}'x}
newt::{.table(colsFromNames(cols))}

:" Attempt to load the prices table from disk "
prices::tbs?"prices"
prices:::[:_prices;newt();prices]

:" Create a database so we can inspect the data "
q:::{}
q,"prices",,prices
db::.db(q)

stats::{[q];q::db("select count(*) from prices");.d("rows: ");.p(q);q}
lastCnt::stats()

:" Flush the prices table every minute "
store::{[q];.p("");q::stats();:[q>lastCnt;tbs,"prices",prices;.p("no change")];lastCnt::q;1}
timeStore::{[r];r::time(store);.d("store ms: ");.p(r)}
.timer("store";60;timeStore)

:"Connect to the broadcast server"
.p("connecting to server on port 8888")
cli::.cli(8888)

cli(:subscribe,,["b.MSFT" "b.AAPL" "b.GOOG"])

:" A basic bollinger band strategy "
mean::{(+/x)%#x}
std::{((+/((x-y)^2))%#x)^0.5}
bollinger::{[m s];m::mean(x);s::std(x;mean(x));(m+2*s),(m-2*s)}
signal::{[upper lower p];upper::x@0;lower::x@1;p::y@0;:[p<lower;"buy";:[p>upper;"sell";"hold"]]}

analyze::{[b c];c::db("select c from prices order by S desc limit 20");b::bollinger(c);.d("signal: ");.p(signal(b;c))}
timeAnalyze::{time(analyze)}
analyzeComplete::{.d("analyze ms: ");.p(x)}
asyncAnalyze::.async(timeAnalyze;analyzeComplete)

updateDb::{[u d];u::x;d::{u?x}'cols;.d("insert: ");.d(d);.insert(prices;d)}
timeUpdateDb::{[ms];ms::time1(updateDb;x);.d(" ");.d(ms);.p(" ms")}

:" Called by server when there is a subscription update."
update::{timeUpdateDb(x);asyncAnalyze()}
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
.py("klongpy.ws")

.comment("****")

Simple example for connecting to Alpaca stock data websocket and subscribing to AAPL Trades.

Auth Volley:

receive: [{"T":"success","msg":"connected"}]
send: {"action": "auth", "key": "{KEY_ID}", "secret": "{SECRET}"}
receive: [{"T":"success","msg":"authenticated"}]

send: {"action":"subscribe","trades":["AAPL"],"quotes":["AMD","CLDR"],"bars":["*"]}
confirmation: [{"T":"subscription","trades":["AAPL"],"quotes":["AMD","CLDR"],"bars":["*"],"updatedBars":[],"dailyBars":["VOO"],"statuses":["*"],"lulds":[],"corrections":["AAPL"],"cancelErrors":["AAPL"]}]

{"action": "unsubscribe", "bars": ["*"]}
receive: [{"T":"subscription","trades":["AAPL"],"quotes":["AMD","CLDR"],"bars":["*"],"updatedBars":[],"dailyBars":["VOO"],"statuses":["*"],"lulds":[],"corrections":["AAPL"],"cancelErrors":["AAPL"]}]

Trade sample:
{
"T": "t",
"i": 96921,
Expand All @@ -23,6 +25,9 @@ confirmation: [{"T":"subscription","trades":["AAPL"],"quotes":["AMD","CLDR"],"ba
"c": ["@", "I"],
"z": "C"
}

to unsubcribe: {"action": "unsubscribe", "bars": ["*"]}

****

auth:::{["action" "auth"]}
Expand All @@ -31,7 +36,6 @@ auth,"secret",,.os.env?"ALPACA_SECRET_KEY"

subscription:::{["action" "subscribe"]}
subscription,"trades",,["AAPL"]
.p(subscription)
subscribe::{x(subscription)}

handleData::{.p(y)}
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
.py("klongpy.ws")

.comment("****")

Simple Stream example for connecting to Polygon stock data websocket.

****

auth:::{["action" "auth"]}
KEY::.os.env?"POLYGON_API_KEY"
auth::auth,"params",,KEY
Expand Down
21 changes: 18 additions & 3 deletions klongpy/sys_fn.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,23 @@ def eval_sys_python_call(klong, x, y, z):
Example:
.pyc(obj;[1 2 3];:{"a":1,"b":2,"c":3})
.pyc(obj,"method";[1 2 3];:{"a":1,"b":2,"c":3})
Python objects may be directly called:
.pyc(obj;[1 2 3];:{"a":1,"b":2,"c":3})
or a method on the object may be called:
.pyc(obj,"method";[1 2 3];:{"a":1,"b":2,"c":3})
if the python object name is snake case, then the quote is not needed:
Here, parse_date is a function in the iso8601 module:
.pyf("iso8601";"parse_date")
and it can be called via:
.pyc("parse_date";,"2020-01-01";:{})
"""
if is_list(x):
Expand All @@ -467,7 +482,7 @@ def eval_sys_python_call(klong, x, y, z):
if not callable(f):
return f
else:
f = x
f = klong[KGSym(x)] if isinstance(x,str) else x
if not callable(f):
return f
if not is_list(y):
Expand Down
19 changes: 10 additions & 9 deletions scripts/kgpy
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,16 @@ if __name__ == "__main__":

def gather_io_tasks(io_loop):
done_event = threading.Event()
tasks = asyncio.all_tasks(loop=io_loop)
async def main_task():
gathered_task = asyncio.gather(*tasks)
gathered_task.add_done_callback(lambda: done_event.set())
await gathered_task
if len(tasks) > 1:
io_loop.call_soon_threadsafe(asyncio.create_task, main_task())
else:
done_event.set()
# tasks = asyncio.all_tasks(loop=io_loop)
# print(tasks)
# async def main_task():
# gathered_task = asyncio.gather(*tasks)
# gathered_task.add_done_callback(lambda x: done_event.set())
# await gathered_task
# if len(tasks) > 1:
# io_loop.call_soon_threadsafe(asyncio.create_task, main_task())
# else:
# done_event.set()
return done_event

gather_io_tasks(io_loop).wait()
Expand Down

0 comments on commit dc0c450

Please sign in to comment.