Skip to content

Commit

Permalink
add alpaca examples
Browse files Browse the repository at this point in the history
  • Loading branch information
briangu committed Dec 14, 2023
1 parent d5a6cb7 commit 2064ba6
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 12 deletions.
48 changes: 48 additions & 0 deletions examples/alpaca/stream.kg
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
.py("klongpy.ws")

.comment("****")
Auth Volley:

receive: [{"T":"success","msg":"connected"}]
send: {"action": "auth", "key": "{KEY_ID}", "secret": "{SECRET}"}
receive: [{"T":"success","msg":"authenticated"}]

send: {"action":"subscribe","trades":["AAPL"],"quotes":["AMD","CLDR"],"bars":["*"]}
confirmation: [{"T":"subscription","trades":["AAPL"],"quotes":["AMD","CLDR"],"bars":["*"],"updatedBars":[],"dailyBars":["VOO"],"statuses":["*"],"lulds":[],"corrections":["AAPL"],"cancelErrors":["AAPL"]}]

{"action": "unsubscribe", "bars": ["*"]}

{
"T": "t",
"i": 96921,
"S": "AAPL",
"x": "D",
"p": 126.55,
"s": 1,
"t": "2021-02-22T15:51:44.208Z",
"c": ["@", "I"],
"z": "C"
}
****

auth:::{["action" "auth"]}
auth,"key",,.os.env?"ALPACA_API_KEY"
auth,"secret",,.os.env?"ALPACA_SECRET_KEY"

subscription:::{["action" "subscribe"]}
subscription,"trades",,["AAPL"]
.p(subscription)
subscribe::{x(subscription)}

handleData::{.p(y)}

handleError::{.d("error: ");.p(y)}

sendAuth::{.p("sending auth");x(auth)}
handleAuthFailure::{.p("failed to auth")}
handleLogin::{.p("logged in");subscribe(x)}
handleControl::{[msg];msg::y?"msg";:[msg="connected";sendAuth(x);:[msg="authenticated";handleLogin(x;y);handleAuthFailure(x;y)]]}
handleMsg::{[q T];.p(y);T::y?"T";:[T="success";handleControl(x;y);:[T="error";handleError(x;y);handleData(x;y)]]}
.ws.m::{[c];c::x;{handleMsg(c;x)}'y}

c::.ws("wss://stream.data.alpaca.markets/v2/sip")
30 changes: 30 additions & 0 deletions examples/alpaca/update_data.kg
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
.comment("****")

Get a list of all tickers from Alpaca.
For each ticker, get the latest stored time from the dfs and use that as the start time for the next request
map the new ticker data to the columns of the dfs
append the new data to the dfs
write the data back to the dfs

****

.py("klongpy.db")
.pyf("alpaca.trading.client";"TradingClient")
.pyf("alpaca.data.historical";"StockHistoricalDataClient")
.pyf("alpaca.data.requests";"StockLatestTradeRequest")

keys:::{}
keys,"api_key",,.os.env?"ALPACA_API_KEY"
keys,"secret_key",,.os.env?"ALPACA_SECRET_KEY"

args:::{};{args,x,,y}'keys

tc::.pyc(TradingClient;[];args)
account::.pyc(tc,"get_account";[];:{})

.d("cash: ");.p(.pyc(account,"cash";[];:{}))

dc::.pyc(StockHistoricalDataClient;[];keys)
tr::.pyc(StockLatestTradeRequest;[];:{["symbol_or_symbols" ["MSFT"]]})
prices::.pyc(dc,"get_stock_latest_trade";,tr;:{})
.d("prices: ");.p(prices)
5 changes: 5 additions & 0 deletions klongpy/sys_fn.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,11 @@ def eval_sys_python_call(klong, x, y, z):
y is either a list of or a single positional arguments to pass to the function.
z is a dictionary of keyword arguments to pass to the function.
Example:
.pyc(obj;[1 2 3];:{"a":1,"b":2,"c":3})
.pyc(obj,"method";[1 2 3];:{"a":1,"b":2,"c":3})
"""
if is_list(x):
if not isinstance(x[0],object):
Expand Down
31 changes: 19 additions & 12 deletions klongpy/ws/sys_fn_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,15 @@ class KGRemoteCloseConnectionException(KlongException):
pass


class NumpyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
return json.JSONEncoder.default(self, obj)


def encode_message(msg):
return json.dumps(msg)
return json.dumps(msg, cls=NumpyEncoder)


def decode_message(data):
Expand Down Expand Up @@ -159,7 +166,7 @@ def __init__(self, ioloop, klongloop, klong, conn_provider, shutdown_event=None,
self.shutdown_event.subscribe(self.close)

def run_client(self):
"""
"""
"""
self.running = True
connect_event = threading.Event()
Expand All @@ -181,7 +188,7 @@ def run_server(self):
"""
self.running = True
return self._run(self.on_connect, self.on_close, self.on_error, self.on_message)

async def _run(self, on_connect, on_close, on_error, on_message):
while self.running:
try:
Expand Down Expand Up @@ -237,7 +244,7 @@ async def _listen(self, on_message):
except websockets.exceptions.ConnectionClosed:
logging.info("Connection error")
raise KlongWSConnectionFailureException()

def call(self, msg):
"""
"""
Expand Down Expand Up @@ -268,7 +275,7 @@ def _stop(self):
Stop the network client.
First send the KGRemoteCloseConnection message to the server to tell it to close the connection.
"""
self.running = False
self._run_exit_event.wait()
Expand Down Expand Up @@ -316,16 +323,16 @@ def is_open(self):

def get_arity(self):
return 1

def __str__(self):
return f"{str(self.conn_provider)}:fn"

@staticmethod
def create_from_conn_provider(ioloop, klongloop, klong, conn_provider, shutdown_event=None, on_connect=None, on_close=None, on_error=None, on_message=None):
"""
Create a network client to connect to a remote server.
:param ioloop: the asyncio ioloop
:param klongloop: the klong loop
:param klong: the klong interpreter
Expand All @@ -339,7 +346,7 @@ def create_from_conn_provider(ioloop, klongloop, klong, conn_provider, shutdown_
@staticmethod
def create_from_uri(ioloop, klongloop, klong, uri, shutdown_event=None, on_connect=None, on_close=None, on_error=None, on_message=None):
"""
Create a network client to connect to a remote server.
:param ioloop: the asyncio ioloop
Expand All @@ -348,7 +355,7 @@ def create_from_uri(ioloop, klongloop, klong, uri, shutdown_event=None, on_conne
:param addr: the address to connect to. If the address is an integer, it is interpreted as a port in "localhost:<port>".
:return: a network client
"""
conn_provider = ClientConnectionProvider(uri)
return NetworkClient.create_from_conn_provider(ioloop, klongloop, klong, conn_provider, shutdown_event=shutdown_event, on_connect=on_connect, on_close=on_close, on_error=on_error, on_message=on_message)
Expand Down Expand Up @@ -389,9 +396,9 @@ def create_from_uri(ioloop, klongloop, klong, uri, shutdown_event=None, on_conne

# async def handle_client(self, websocket):
# """

# Handle a client connection. Messages are read from the client and executed on the klong loop.

# """
# # host, port, _, _ = writer.get_extra_info('peername')
# # if host == "::1":
Expand Down

0 comments on commit 2064ba6

Please sign in to comment.