Skip to content

Commit

Permalink
add stats logging example
Browse files Browse the repository at this point in the history
  • Loading branch information
briangu committed Jan 25, 2024
1 parent a157c0b commit 65d3c23
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 8 deletions.
19 changes: 19 additions & 0 deletions examples/stats_logging/stats_client.kg
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
.comment("****")


****

.py("util.py")

cli::.cli(8888)

:" create a dictionary mapped sensor udpate "
dsensor::{[d];d:::{};d,"t",now();d,"n",,"temp";d,"v",(.rn()*100)}

:" create a raw sensor update "
sensor::{now(),1,(.rn()*100)}

:" send a batch of sensor updates "
send::{cli(:rupdate,,({sensor()}'!100000));1}

.timer("update";0;send)
51 changes: 51 additions & 0 deletions examples/stats_logging/stats_server.kg
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
.comment("****")

A simple stats server that collects stats from clients.



The database is flushed to disk periodically.

****

.py("klongpy.db")

:" function timing utility "
time0::{[t0];t0::.pc();x@[];.pc()-t0}

:" open the tables key-value store "
tbs::.tables("/tmp/tables/stats")

:" Load or create a new table with stats columns "
cols::["t" "n" "v"]
colsFromData::{{(x@0),,[]}'x}
colsFromNames::{{x,,[]}'x}
statsT:::[:_(tbs?"stats");.table(colsFromNames(cols));tbs?"stats"]

:" Create a database so we can inspect the data "
db::.db(:{},"stats",,statsT)

syncDb::{db("select 1 from stats")}
tableSize::{[q];q::db("select count(*) from stats");.d("rows: ");.p(q);q}
tableSize()

lastTime::.pc()
lastSize::0
batchSize::0
writeStats::{[n];n::.pc();.d("writes/sec: ");.p((batchSize-lastSize)%(n-lastTime));lastTime::n;lastSize::batchSize}
.timer("write stats";60;writeStats)

flushTable::{tbs,"stats",statsT}
store::{:[batchSize;flushTable();0];batchSize::0;1}
timeStore::{[r];r::time0(store());.d("store ms: ");.p(r)}
flush::{tableSize();timeStore();1}
.timer("flush";300;flush)

:" Called by clients to add new data to the server "
update::{{[u];u::x;.insert(statsT;{u?x}'cols)}'x}

:" Raw bulk update (without column mapping) "
rupdate::{.insert(statsT;x);batchSize::batchSize+#x}

:" Start the IPC server so clients can connect and add data (via update) "
.srv(8888)
8 changes: 8 additions & 0 deletions examples/stats_logging/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from datetime import datetime

import pytz


def now():
return datetime.now().astimezone(pytz.utc).timestamp()

25 changes: 17 additions & 8 deletions klongpy/db/sys_fn_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ def get_dataframe(self):
def insert(self, y):
self.buffer.append(y)

def insertb(self, y):
self.buffer.extend(y)

def commit(self):
if not self.buffer:
return
Expand Down Expand Up @@ -254,19 +257,25 @@ def eval_sys_fn_insert_table(x, y):
Examples:
t::.table(d)
.insert(t, [1;2;3])
.insert(t, [1 2 3])
for batch inserts supply an array of arrays:
.insert(t, [[1 2 3] [4 5 6]])
"""
if not isinstance(x,Table):
raise KlongDbException(x, "Inserts must be applied to a table")
if len(x.columns) > 1:
if not np.isarray(y):
raise KlongDbException(x, "Values to insert must be a list")
elif not np.isarray(y):
y = np.array([y])
if len(y) != len(x.columns):
if not np.isarray(y):
raise KlongDbException(x, "Values to insert must be a list")
batch = len(y.shape) > 1
y_cols = len(y[0]) if batch else len(y)
if y_cols != len(x.columns):
raise KlongDbException(x, f"Expected {len(x.columns)} values, received {len(y)}")
x.insert(y)
if batch:
x.insertb(y)
else:
x.insert(y)
return x


Expand Down
19 changes: 19 additions & 0 deletions tests/kgtests/db/test_multi_bulk_insert.kg
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
.py("klongpy.db")

a::[1 2 3]
b::[2 3 4]
c::[3 4 5]

e::[]
e::e,,"a",,a
e::e,,"b",,b
e::e,,"c",,c
T::.table(e)

db::.db(:{},"T",,T)

t("db(""select * from T"")"; db("select * from T"); [[1 2 3] [2 3 4] [3 4 5]])

.insert(T; [[4 5 6] [7 8 9]])

t("db(""select * from T"")"; db("select * from T"); [[1 2 3] [2 3 4] [3 4 5] [4 5 6] [7 8 9]])

0 comments on commit 65d3c23

Please sign in to comment.