diff --git a/code/common/merge.q b/code/common/merge.q new file mode 100644 index 000000000..5e61d9ec2 --- /dev/null +++ b/code/common/merge.q @@ -0,0 +1,32 @@ +\d .merge + +/- function to get additional partition(s) defined by parted attribute in sort.csv +getextrapartitiontype:{[tablename] + /- check that that each table is defined or the default attributes are defined in sort.csv + /- exits with error if a table cannot find parted attributes in tablename or default + /- only checks tables that have sort enabled + tabparts:$[count tabparts:distinct exec column from .sort.params where tabname=tablename,sort=1,att=`p; + [.lg.o[`getextraparttype;"parted attribute p found in sort.csv for ",(string tablename)," table"]; + tabparts]; + count defaultparts:distinct exec column from .sort.params where tabname=`default,sort=1,att=`p; + [.lg.o[`getextraparttype;"parted attribute p not found in sort.csv for ",(string tablename)," table, using default instead"]; + defaultparts]; + [.lg.e[`getextraparttype;"parted attribute p not found in sort.csv for ", (string tablename)," table and default not defined"]] + ]; + tabparts + }; + +/- function to check each partiton type specified in sort.csv is actually present in specified table +checkpartitiontype:{[tablename;extrapartitiontype] + $[count colsnotintab:extrapartitiontype where not extrapartitiontype in cols get tablename; + .lg.e[`checkpart;"parted columns ",(", " sv string colsnotintab)," are defined in sort.csv but not present in ",(string tablename)," table"]; + .lg.o[`checkpart;"all parted columns defined in sort.csv are present in ",(string tablename)," table"]]; + }; + + + +/- function to get list of distinct combiniations for partition directories +/- functional select equivalent to: select distinct [ extrapartitiontype ] from [ tablenme ] +getextrapartitions:{[tablename;extrapartitiontype] + value each ?[tablename;();1b;extrapartitiontype!extrapartitiontype] + }; diff --git a/code/processes/tickerlogreplay.q b/code/processes/tickerlogreplay.q index 14ea03ebe..14f49e452 100644 --- a/code/processes/tickerlogreplay.q +++ b/code/processes/tickerlogreplay.q @@ -22,6 +22,13 @@ upd:@[value;`upd;{{[t;x] insert[t;x]}}] // default upd function used for repla sortcsv:@[value;`sortcsv;`:config/sort.csv] //location of sort csv file +compression:@[value;`compression;()] //specify the compress level, empty list if no required +partandmerge:@[value;`partandmerge;0b] //setting to do a replay where the data is partitioned and then merged on disk +tempdir:@[value;`tempdir;`:tempmergedir] //location to save data for partandmerge replay +mergenumrows:@[value;`mergenumrows;10000000]; //default number of rows for merge process +mergenumtab:@[value;`mergenumtab;`quote`trade!10000 50000]; //specify number of rows per table for merge process + + / - settings for the common save code (see code/common/save.q) .save.savedownmanipulation:@[value;`savedownmanipulation;()!()] // a dict of table!function used to manipuate tables at EOD save .save.postreplay:@[value;`postreplay;{{[d;p] }}] // post replay function, invoked after all the tables have been written down for a given log file @@ -50,6 +57,9 @@ sortcsv:@[value;`sortcsv;`:config/sort.csv] //location of sort csv file [-.replay.basicmode [0|1]]\t\t\tDo a basic replay, which reads the table into memory then saves down with .Q.hdpf. Is probably faster for basic replays (in-memory sort rather than on-disk). Default is 0 [-.replay.exitwhencomplete [0|1]]\t\tProcess exits when complete. Default is 1 [-.replay.checklogfiles [0|1]\t\t\tCheck log files for corruption, if corrupt then write a good log and replay this. Default is 0 + [-.replay.partandmerge [0|1]\t\t\tDo a replay where the data is partitioned to a specified temp directory and then merged on disk. Default is 0 + [-.replay.compression x]\t\t\tSet the compression settings for .z.zd. Default is empty list (no compression) + [-.replay.tempdir x]\t\t\tThe directory to save data to before moving it to the hdb. Default is the same as the hdb \n There are some other functions/variables which can be modified to change the behaviour of the replay, but shouldn't be done from the config file Instead, load the script in a wrapper script which sets up the definition @@ -77,8 +87,11 @@ if[not partitiontype in `date`month`year; .err.ex[`replayinit;"partitiontype mus if[messagechunks=0;.err.ex[`replayinit;"messagechunks value cannot be 0";2]]; trackonly:messagechunks < 0 -if[trackonly;.lg.o[`replayinit;"messagechunks value is negative - log replay progress will be tracked"]] -messagechunks:abs messagechunks +if[trackonly;.lg.o[`replayinit;"messagechunks value is negative - log replay progress will be tracked"]]; +messagechunks:abs messagechunks; + +if[partandmerge and hdbdir = tempdir;.err.ex[`replayinit;"if using partandmerge replay, tempdir must be set to a different directory than the hdb";1]]; +if[partandmerge and sortafterreplay;(sortafterreplay:0b; .lg.o[`replayinit;"Setting sortafterreplay to 0b"])]; // load the schema \d . @@ -108,25 +121,28 @@ pathtotable:{[h;p;t] `$(string .Q.par[h;partitiontype$p;t]),"/"} // create empty tables - we need to make sure we only create them once emptytabs:`symbol$() -createemptytable:{[h;p;t] - if[(not (path:pathtotable[h;p;t]) in .replay.emptytabs) and .replay.emptytables; +createemptytable:{[h;p;t;td] + $[partandmerge;dest:td;dest:h]; + if[(not (path:pathtotable[dest;p;t]) in .replay.emptytabs) and .replay.emptytables; .lg.o[`replay;"creating empty table ",(string t)," at ",string path]; .replay.emptytabs,:path; savetabdatatrapped[h;p;t;0#value t;0b]]} -savetabdata:{[h;p;t;data;UPSERT] - path:pathtotable[h;p;t]; - .lg.o[`replay;"saving table ",(string t)," to ",string path]; +savetabdata:{[h;p;t;data;UPSERT;td] + $[partandmerge;path:pathtotable[td;p;t];path:pathtotable[h;p;t]]; + if[not partandmerge;.lg.o[`replay;"saving table ",(string t)," to ",string path]]; .replay.pathlist[t],:path; - $[UPSERT;upsert;set] . (path;.Q.en[h;0!.save.manipulate[t;data]])} -savetabdatatrapped:{[h;p;t;data;UPSERT] .[savetabdata;(h;p;t;data;UPSERT);{.lg.e[`replay;"failed to save table : ",x]}]} + $[partandmerge;savetablesbypart[td;p;t;h];$[UPSERT;upsert;set] . (path;.Q.en[h;0!.save.manipulate[t;data]])] + } + +savetabdatatrapped:{[h;p;t;data;UPSERT;td] .[savetabdata;(h;p;t;data;UPSERT;td);{.lg.e[`replay;"failed to save table : ",x]}]} // this function should be invoked for saving tables -savetab:{[h;p;t] - createemptytable[h;p;t]; +savetab:{[td;h;p;t] + if[not partandmerge;createemptytable[h;p;t;td]]; if[count value t; .lg.o[`replay;"saving ",(string t)," which has row count ",string count value t]; - savetabdatatrapped[h;p;t;value t;1b]; + savetabdatatrapped[h;p;t;value t;1b;td]; delete from t; if[gc;.gc.run[]]]} @@ -134,7 +150,7 @@ savetab:{[h;p;t] // input is a dictionary of tablename!(list of paths) // should be the same as .replay.pathlist applysortandattr:{[pathlist] - / - convert pathlist dictionary into a keys and values then transpose before passing into .sort.sorttab + // convert pathlist dictionary into a keys and values then transpose before passing into .sort.sorttab .sort.sorttab each flip (key;value) @\: distinct each pathlist }; @@ -145,24 +161,27 @@ tabsincountorder:{x iasc count each value each x} // check if the count has been exceeded, and save down if it has currentcount:0 totalcount:0 -checkcount:{[h;p;counter] +checkcount:{[h;p;counter;td] currentcount+::counter; if[.replay.currentcount >= .replay.messagechunks; $[.replay.trackonly; [.replay.totalcount +: .replay.currentcount; .lg.o[`replay;"replayed a chunk of ",(string .replay.messagechunks)," messages. Total message count so far is ",string .replay.totalcount]]; [.lg.o[`replay;"number of messages to replay at once (",(string .replay.messagechunks),") has been exceeded. Saving down"]; - savetab[h;p] each tabsincountorder[.replay.tablestoreplay]; + savetab[td;h;p] each tabsincountorder[.replay.tablestoreplay]; .lg.o[`replay;"save complete- replaying next chunk of data"]]]; .replay.currentcount:0]} // function used to finish off the replay // generally this will be to re-sort the table, and set an attribute -finishreplay:{[h;p] +finishreplay:{[h;p;td] // save down any tables which haven't been saved - savetab[h;p] each tabsincountorder[.replay.tablestoreplay]; + savetab[td;h;p] each tabsincountorder[.replay.tablestoreplay]; // sort data and apply the attributes if[sortafterreplay;applysortandattr[.replay.pathlist]]; + + if[partandmerge;postreplaymerge[td;p;h]]; + // invoke any user defined post replay function .save.postreplay[h;p]; } @@ -175,7 +194,7 @@ replaylog:{[logfile] $[firstmessage>0; [.lg.o[`replay;"skipping first ",(string firstmessage)," messages"]; @[`.;`upd;:;.replay.initialupd]]; - @[`.;`upd;:;.replay.realupd]]; + @[`.;`upd;:;.replay.realupd]]; .replay.tablecounts:.replay.errorcounts:.replay.pathlist:()!(); .replay.replaydate:"D"$-10#string logfile; if[lastmessage