-
Notifications
You must be signed in to change notification settings - Fork 69
/
pubsub.q
181 lines (151 loc) · 5.94 KB
/
pubsub.q
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// Pub/sub utilities initially used in Segmented TP process
// Functionality for clients to subscribe to all tables or a subset
// Includes option for subscribe to apply filters to received data
// Replaces u.q functionality
\d .stpps
// List of pub/sub tables, populated on startup
t:`
// Handles to publish all data
subrequestall:enlist[`]!enlist ()
// Handles and conditions to publish filtered data
subrequestfiltered:([]tbl:`$();handle:`int$();filts:();columns:())
// Function to send end of period messages to subscribers - endofperiod defined in client side
endp:{
(neg allsubhandles[])@\:(`endofperiod;x;y;z);
};
// Function to send end of day messages to subscribers - endofday defined on client side
end:{
(neg allsubhandles[])@\:(`endofday;x;y);
};
// Get all distinct sub handles from subrequestall and subrequestfiltered
allsubhandles:{
distinct raze union/[value subrequestall;exec handle from .stpps.subrequestfiltered]
};
// Subscribe to everything
suball:{
delhandle[x;.z.w];
add[x];
:(x;schemas[x]);
};
// Make a filtered subscription
subfiltered:{[x;y]
delhandlef[x;.z.w];
// Different handling for requests passed in as a sym list or a keyed table
val:![11 99h;(selfiltered;addfiltered)][type y] . (x;y);
$[all raze null val;(x;schemas[x]);val]
};
// Add handle to subscriber in sub all mode
add:{
if[not (count subrequestall x)>i:subrequestall[x]?.z.w;
subrequestall[x],:.z.w];
};
// Error trap function for parsing string filters
errparse:{.lg.e[`addfiltered;m:y," error: ",x];'m};
// Parse columns and where clause from keyed table, spit out errors if any stage fails
addfiltered:{[x;y]
// Use dummy queries to produce where and column clauses
filters:$[all null f:y[x;`filters];();@[parse;"select from t where ",f;.stpps.errparse[;"Filter"]] 2];
columns:last $[all null c:y[x;`columns];();@[parse;"select ",c," from t";.stpps.errparse[;"Column"]]];
// Run these clauses in a test query, add to table if successful, throw error if not
@[eval;(?;.stpps.schemas[x];filters;0b;columns);.stpps.errparse[;"Query"]];
`.stpps.subrequestfiltered upsert (x;.z.w;filters;columns);
};
// Add handle for subscriber using old API (filter is list of syms)
selfiltered:{[x;y]
filts:enlist enlist (in;`sym;enlist y);
@[eval;(?;.stpps.schemas[x];filts;0b;());.stpps.errparse[;"Query"]];
`.stpps.subrequestfiltered upsert (x;.z.w;filts;());
};
// Publish table data
pub:{[t;x]
if[not count x;:()];
if[count h:subrequestall[t];-25!(h;(`upd;t;x))];
if[t in .stpps.subrequestfiltered`tbl;
{[t;x;sels] data:eval(?;x;sels`filts;0b;sels`columns);
if[count data;neg[sels`handle](`upd;t;data)]}[t;x;]
each select handle,filts,columns from .stpps.subrequestfiltered where tbl=t
];
};
// publish and clear tables
pubclear:{
.stpps.pub'[x;value each x,:()];
@[`.;x;:;.stpps.schemasnoattributes[x]];
}
// Remove handle from subrequestall
delhandle:{[t;h]
@[`.stpps.subrequestall;t;except;h];
};
// Remove handle from subrequestfiltered
delhandlef:{[t;h]
delete from `.stpps.subrequestfiltered where tbl=t,handle=h;
};
// Remove all handles when connection closed
closesub:{[h]
delhandle[;h]each t;
delhandlef[;h]each t;
};
// Strip attributes and remove keying from tables and store in separate dictionary (for use on STP and SCTP)
attrstrip:{[t]
{@[x;cols x;`#]} each .stpps.t:t;
.stpps.schemasnoattributes:.stpps.t!value each .stpps.t;
};
// Set up table and schema information
init:{[t]
if[count b:t where not t in tables[];{.lg.e[`psinit;m:"Table ",string[x]," does not exist"];'m} each b];
.stpps.t:t except b;
.stpps.schemas:.stpps.t!value each .stpps.t;
.stpps.tabcols:.stpps.t!cols each .stpps.t;
};
\d .
// Call closesub function after initial .z.pc call on disconnect
.dotz.set[`.z.pc;{[f;x] @[f;x;()];.stpps.closesub x} @[value;.dotz.getcommand[`.z.pc];{{}}]];
// Function called on subscription
// Subscriber will call with null y parameter in sub all mode
// In sub filtered mode, y will contain tables to subscribe to and filters to apply
.u.sub:{[x;y]
if[x~`;:.z.s[;y] each .stpps.t];
if[not x in .stpps.t;
.lg.e[`sub;m:"Table ",string[x]," not in list of stp pub/sub tables"];
:(x;m)
];
$[y~`;.stpps.suball[x];.stpps.subfiltered[x;y]]
};
// Default definition for .u.pub incase process does publishes via .u.pub
.u.pub:.stpps.pub
// Define .ps wrapper functions
.ps.loaded:1b;
.ps.publish:.stpps.pub;
.ps.subscribe:.u.sub;
.ps.init:.stpps.init;
.ps.initialise:{.ps.init[tables[]];.ps.initialised:1b};
// Allow a non-kdb+ subscriber to subscribe with strings for simple conditions - return table name and schema to subscriber
.ps.subtable:{[tab;syms]
.lg.o[`subtable;"Received a subscription to ",$[count tab;tab;"all tables"]," for ",$[count syms;syms;"all syms"]];
val:.u.sub[`$tab;$[count syms;::;first] `$csv vs syms];
$[10h~type last val;'last val;val]
};
// Allow a non-kdb+ subscriber to subscribe with strings for complex conditions - return table name and schema to subscriber
.ps.subtablefiltered:{[tab;filters;columns]
.lg.o[`subtablefiltered;"Received a subscription to ",$[count tab;tab;"all tables"]," for filters: ",filters," and columns: ",columns];
val:.u.sub[`$tab;1!enlist `tabname`filters`columns!(`$tab;filters;columns)];
$[10h~type last val;'last val;val]
};
// Striping data in a TorQ Installation
// use mod to stripe into number of segments
.ds.map:{[numseg;sym] sym!(sum each string sym)mod numseg};
// Initialise subscription request on startup
.ds.subreq:(`u#`$())!`int$();
// Striping function which stores the mappings for any symbols that it has already computed and
// for subsequent requests for that symbol, it looks them up
.ds.stripe:{[input;skey]
// If no updates, return
if[0=count input;:`boolean$()];
// Check for new sym(s)
if[0N in val:.ds.subreq input;
// Append to .ds.subreq - unique attr is maintained
.ds.subreq,:.ds.map[.ds.numseg;distinct input where null val];
// Reassign val
val:.ds.subreq input;
];
skey=val
};