Skip to content
32 changes: 32 additions & 0 deletions code/common/merge.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
\d .merge

/- function to get additional partition(s) defined by parted attribute in sort.csv
getextrapartitiontype:{[tablename]
/- return the distinct columns flagged with the parted (`p) attribute in sort.csv
/- for tablename, falling back to the `default entry when the table has no row of
/- its own; logs an error via .lg.e if neither is defined
/- only rows with sort enabled (sort=1) are considered
tabparts:$[count tabparts:distinct exec column from .sort.params where tabname=tablename,sort=1,att=`p;
/- table-specific parted columns found - use them
[.lg.o[`getextraparttype;"parted attribute p found in sort.csv for ",(string tablename)," table"];
tabparts];
/- no table-specific entry - fall back to the `default rows in sort.csv
count defaultparts:distinct exec column from .sort.params where tabname=`default,sort=1,att=`p;
[.lg.o[`getextraparttype;"parted attribute p not found in sort.csv for ",(string tablename)," table, using default instead"];
defaultparts];
/- neither table-specific nor default parted columns defined - raise an error
[.lg.e[`getextraparttype;"parted attribute p not found in sort.csv for ", (string tablename)," table and default not defined"]]
];
tabparts
};

/- function to check each partition type specified in sort.csv is actually present in specified table
checkpartitiontype:{[tablename;extrapartitiontype]
/- columns flagged as parted in sort.csv but absent from the table itself
missingcols:extrapartitiontype where not extrapartitiontype in cols get tablename;
/- error if any parted column is missing, otherwise log success
$[count missingcols;
.lg.e[`checkpart;"parted columns ",(", " sv string missingcols)," are defined in sort.csv but not present in ",(string tablename)," table"];
.lg.o[`checkpart;"all parted columns defined in sort.csv are present in ",(string tablename)," table"]];
};



/- function to get list of distinct combinations for partition directories
/- functional select equivalent to: select distinct [ extrapartitiontype ] from [ tablename ]
getextrapartitions:{[tablename;extrapartitiontype]
/- build the select-distinct column map once, then run the functional query
colmap:extrapartitiontype!extrapartitiontype;
/- ?[t;();1b;c] is "select distinct c from t"; value each strips the dict keys
value each ?[tablename;();1b;colmap]
};
185 changes: 153 additions & 32 deletions code/processes/tickerlogreplay.q
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ upd:@[value;`upd;{{[t;x] insert[t;x]}}] // default upd function used for repla

sortcsv:@[value;`sortcsv;`:config/sort.csv] //location of sort csv file

compression:@[value;`compression;()] //specify the compression level, empty list if not required
partandmerge:@[value;`partandmerge;0b] //setting to do a replay where the data is partitioned and then merged on disk
tempdir:@[value;`tempdir;`:tempmergedir] //location to save data for partandmerge replay
mergenumrows:@[value;`mergenumrows;10000000]; //default number of rows for merge process
mergenumtab:@[value;`mergenumtab;`quote`trade!10000 50000]; //specify number of rows per table for merge process


/ - settings for the common save code (see code/common/save.q)
.save.savedownmanipulation:@[value;`savedownmanipulation;()!()] // a dict of table!function used to manipuate tables at EOD save
.save.postreplay:@[value;`postreplay;{{[d;p] }}] // post replay function, invoked after all the tables have been written down for a given log file
Expand Down Expand Up @@ -50,6 +57,9 @@ sortcsv:@[value;`sortcsv;`:config/sort.csv] //location of sort csv file
[-.replay.basicmode [0|1]]\t\t\tDo a basic replay, which reads the table into memory then saves down with .Q.hdpf. Is probably faster for basic replays (in-memory sort rather than on-disk). Default is 0
[-.replay.exitwhencomplete [0|1]]\t\tProcess exits when complete. Default is 1
[-.replay.checklogfiles [0|1]\t\t\tCheck log files for corruption, if corrupt then write a good log and replay this. Default is 0
[-.replay.partandmerge [0|1]\t\t\tDo a replay where the data is partitioned to a specified temp directory and then merged on disk. Default is 0
[-.replay.compression x]\t\t\tSet the compression settings for .z.zd. Default is empty list (no compression)
[-.replay.tempdir x]\t\t\tThe directory to save data to before moving it to the hdb. Default is the same as the hdb
\n
There are some other functions/variables which can be modified to change the behaviour of the replay, but shouldn't be done from the config file
Instead, load the script in a wrapper script which sets up the definition
Expand Down Expand Up @@ -77,8 +87,11 @@ if[not partitiontype in `date`month`year; .err.ex[`replayinit;"partitiontype mus
if[messagechunks=0;.err.ex[`replayinit;"messagechunks value cannot be 0";2]];

trackonly:messagechunks < 0
if[trackonly;.lg.o[`replayinit;"messagechunks value is negative - log replay progress will be tracked"]]
messagechunks:abs messagechunks
if[trackonly;.lg.o[`replayinit;"messagechunks value is negative - log replay progress will be tracked"]];
messagechunks:abs messagechunks;

if[partandmerge and hdbdir = tempdir;.err.ex[`replayinit;"if using partandmerge replay, tempdir must be set to a different directory than the hdb";1]];
if[partandmerge and sortafterreplay;(sortafterreplay:0b; .lg.o[`replayinit;"Setting sortafterreplay to 0b"])];

// load the schema
\d .
Expand Down Expand Up @@ -108,33 +121,36 @@ pathtotable:{[h;p;t] `$(string .Q.par[h;partitiontype$p;t]),"/"}

// create empty tables - we need to make sure we only create them once
emptytabs:`symbol$()
createemptytable:{[h;p;t]
if[(not (path:pathtotable[h;p;t]) in .replay.emptytabs) and .replay.emptytables;
createemptytable:{[h;p;t;td]
$[partandmerge;dest:td;dest:h];
if[(not (path:pathtotable[dest;p;t]) in .replay.emptytabs) and .replay.emptytables;
.lg.o[`replay;"creating empty table ",(string t)," at ",string path];
.replay.emptytabs,:path;
savetabdatatrapped[h;p;t;0#value t;0b]]}

savetabdata:{[h;p;t;data;UPSERT]
path:pathtotable[h;p;t];
.lg.o[`replay;"saving table ",(string t)," to ",string path];
savetabdata:{[h;p;t;data;UPSERT;td]
$[partandmerge;path:pathtotable[td;p;t];path:pathtotable[h;p;t]];
if[not partandmerge;.lg.o[`replay;"saving table ",(string t)," to ",string path]];
.replay.pathlist[t],:path;
$[UPSERT;upsert;set] . (path;.Q.en[h;0!.save.manipulate[t;data]])}
savetabdatatrapped:{[h;p;t;data;UPSERT] .[savetabdata;(h;p;t;data;UPSERT);{.lg.e[`replay;"failed to save table : ",x]}]}
$[partandmerge;savetablesbypart[td;p;t;h];$[UPSERT;upsert;set] . (path;.Q.en[h;0!.save.manipulate[t;data]])]
}

savetabdatatrapped:{[h;p;t;data;UPSERT;td] .[savetabdata;(h;p;t;data;UPSERT;td);{.lg.e[`replay;"failed to save table : ",x]}]}

// this function should be invoked for saving tables
savetab:{[h;p;t]
createemptytable[h;p;t];
savetab:{[td;h;p;t]
if[not partandmerge;createemptytable[h;p;t;td]];
if[count value t;
.lg.o[`replay;"saving ",(string t)," which has row count ",string count value t];
savetabdatatrapped[h;p;t;value t;1b];
savetabdatatrapped[h;p;t;value t;1b;td];
delete from t;
if[gc;.gc.run[]]]}

// function to apply the sorting and attributes at the end of the replay
// input is a dictionary of tablename!(list of paths)
// should be the same as .replay.pathlist
applysortandattr:{[pathlist]
/ - convert pathlist dictionary into a keys and values then transpose before passing into .sort.sorttab
// convert pathlist dictionary into a keys and values then transpose before passing into .sort.sorttab
.sort.sorttab each flip (key;value) @\: distinct each pathlist
};

Expand All @@ -145,24 +161,27 @@ tabsincountorder:{x iasc count each value each x}
// check if the count has been exceeded, and save down if it has
currentcount:0
totalcount:0
checkcount:{[h;p;counter]
checkcount:{[h;p;counter;td]
currentcount+::counter;
if[.replay.currentcount >= .replay.messagechunks;
$[.replay.trackonly;
[.replay.totalcount +: .replay.currentcount;
.lg.o[`replay;"replayed a chunk of ",(string .replay.messagechunks)," messages. Total message count so far is ",string .replay.totalcount]];
[.lg.o[`replay;"number of messages to replay at once (",(string .replay.messagechunks),") has been exceeded. Saving down"];
savetab[h;p] each tabsincountorder[.replay.tablestoreplay];
savetab[td;h;p] each tabsincountorder[.replay.tablestoreplay];
.lg.o[`replay;"save complete- replaying next chunk of data"]]];
.replay.currentcount:0]}

// function used to finish off the replay
// generally this will be to re-sort the table, and set an attribute
finishreplay:{[h;p]
finishreplay:{[h;p;td]
// save down any tables which haven't been saved
savetab[h;p] each tabsincountorder[.replay.tablestoreplay];
savetab[td;h;p] each tabsincountorder[.replay.tablestoreplay];
// sort data and apply the attributes
if[sortafterreplay;applysortandattr[.replay.pathlist]];

if[partandmerge;postreplaymerge[td;p;h]];

// invoke any user defined post replay function
.save.postreplay[h;p];
}
Expand All @@ -175,7 +194,7 @@ replaylog:{[logfile]
$[firstmessage>0;
[.lg.o[`replay;"skipping first ",(string firstmessage)," messages"];
@[`.;`upd;:;.replay.initialupd]];
@[`.;`upd;:;.replay.realupd]];
@[`.;`upd;:;.replay.realupd]];
.replay.tablecounts:.replay.errorcounts:.replay.pathlist:()!();
.replay.replaydate:"D"$-10#string logfile;
if[lastmessage<firstmessage; .lg.o[`replay;"lastmessage (",(string lastmessage),") is less than firstmessage (",(string firstmessage),"). Not replaying log file"]; :()];
Expand All @@ -185,38 +204,140 @@ replaylog:{[logfile]
.lg.o[`replay;"replayed data into tables with the following counts: ","; " sv {" = " sv string x}@'flip(key .replay.tablecounts;value .replay.tablecounts)];
if[count .replay.errorcounts;
.lg.e[`replay;"errors were hit when replaying the following tables: ","; " sv {" = " sv string x}@'flip(key .replay.errorcounts;value .replay.errorcounts)]];
if[(`$(string .replay.replaydate)) in key hdbdir;
.lg.o[`replay;"HDB directory already contains ",(string .replay.replaydate)," partition. Deleting from the HDB directory"];
.os.deldir .os.pth[string .Q.par[hdbdir;.replay.replaydate;`]]; // delete the current dates HDB directory before performing replay
];

// set compression level
if[ 3= count compression;
.lg.o[`compression;"setting compression level to (",(";" sv string compression),")"];
.z.zd:compression;
.lg.o[`compression;".z.zd has been set to (",(";" sv string .z.zd),")"]];

$[basicmode;
[.lg.o[`replay;"basicmode set to true, saving down tables with .Q.hdpf"];
.Q.hdpf[`::;hdbdir;partitiontype$.replay.replaydate;`sym]];
// if not in basic mode, then we need to finish off the replay
finishreplay[hdbdir;.replay.replaydate]];
// if not in basic mode, then we need to finish off the replay
finishreplay[hdbdir;.replay.replaydate;tempdir]];
if[gc;.gc.run[]];}

// upd functions down here
realupd:{[f;t;x]
// increment the tablecounts
tablecounts[t]+::count first x;
// run the supplied function in the error trap
.[f;(t;x);{[t;x;e] errorcounts[t]+::count first x}[t;x]];
}[.replay.upd]
// increment the tablecounts
tablecounts[t]+::count first x;
// run the supplied function in the error trap
.[f;(t;x);{[t;x;e] errorcounts[t]+::count first x}[t;x]];
}[.replay.upd]

// amend the upd function to filter based on the table list
if[not tablelist~enlist `all; realupd:{[f;t;x] if[t in .replay.tablestoreplay; f[t;x]]}[realupd]]

// amend to do chunked saves
if[messagechunks < 0W; realupd:{[f;t;x] f[t;x]; checkcount[hdbdir;replaydate;1]}[realupd]]
if[messagechunks < 0W; realupd:{[f;t;x] f[t;x]; checkcount[hdbdir;replaydate;1;tempdir]}[realupd]]

initialupd:{[t;x]
// spin through the first X messages
$[msgcount < (firstmessage - 1);
msgcount+::1;
// Once we reach the correct message, reset the upd function
@[`.;`upd;:;.replay.realupd]]}
// spin through the first X messages
$[msgcount < (firstmessage - 1);
msgcount+::1;
// Once we reach the correct message, reset the upd function
@[`.;`upd;:;.replay.realupd]]
}

// per-table merge chunk size: look up tabname in the user-defined mergenumtab
// dict, falling back to the global mergenumrows default when the table has no
// entry (^ fills nulls from the left operand)
mergemaxrows:{[tabname] mergenumrows^mergenumtab[tabname]};

// post replay function for merge replay, invoked after all the tables have been
// written down for a given log file; td is the temp directory the partitioned
// data was saved to, p is the partition value, h is the hdb root
postreplaymerge:{[td;p;h]
// build the table!maxrows dict used to size merge chunks
// mergemaxrows already falls back to mergenumrows for tables absent from
// mergenumtab, so the previous extra mergenumrows^ fill here was redundant
mergelimits:(tabs,())!(mergemaxrows each tabs:tabsincountorder[.replay.tablestoreplay]),();
// merge the tables from each partition in the tempdir together
merge[td;p;;mergelimits;h] each tabs;
.os.deldir .os.pth[string .Q.par[td;p;`]]; // delete the contents of tempdir after merge completion
}

// function to upsert one extra-partition slice of a table to the specified directory
// h: hdb root used when enumerating empty schemas; dir: destination root (tempdir)
// pt: the date/month/year partition value; expttype: extra parted column names from
// sort.csv; expt: one combination of values for those columns
upserttopartition:{[h;dir;tablename;tabdata;pt;expttype;expt]
// NOTE(review): only the first value of expt names the subdirectory - confirm
// multi-column extra partitions are intended to collapse onto the first column
dirpar:.Q.par[dir;pt;`$string first expt];
directory:` sv dirpar,tablename,`;
// make directories for tables if they don't exist
if[count tabpar:tabsincountorder[.replay.tablestoreplay] except key dirpar;
.lg.o[`dir;"creating directories under ",1_string dirpar];
tabpar:tabpar except `heartbeat`logmsg; // presumably transient tables that are never saved - verify
// splay an empty enumerated copy of each missing table so the partition is complete
.[{[d;h;t](` sv d,t,`) set .Q.en[h;0#value t]};] each dirpar,'h,'tabpar];
.lg.o[`save;"saving ",(string tablename)," data to partition ",string directory];
.[
upsert;
// filter tabdata to rows matching this extra-partition combination and re-apply
// the `sym enumeration before upserting to the splayed directory
(directory;r:update `sym!sym from ?[tabdata;{(x;y;(),z)}[in;;]'[expttype;expt];0b;()]);
{[e] .lg.e[`savetablesbypart;"Failed to save table to disk : ",e];'e}];
};

// save tablename to dir, splitting the data into sub-partitions by the extra
// parted column(s) defined in sort.csv; h is the hdb root used for enumeration
// empties the in-memory table once written
savetablesbypart:{[dir;pt;tablename;h]
arows: count value tablename;
.lg.o[`rowcheck;"the ",(string tablename)," table consists of ", (string arows), " rows"];
// get additional partition(s) defined by parted attribute in sort.csv
extrapartitiontype:.merge.getextrapartitiontype[tablename];

// check each partition type actually is a column in the selected table
.merge.checkpartitiontype[tablename;extrapartitiontype];
// enumerate data to be upserted; (`. `sym)? resolves values against the global sym list
enumdata:update (`. `sym)?sym from .Q.en[h;value tablename];
// get list of distinct combinations for partition directories
extrapartitions:(`. `sym)?.merge.getextrapartitions[tablename;extrapartitiontype];

.lg.o[`save;"enumerated ",(string tablename)," table"];
// upsert data to specific partition directory, one call per combination
upserttopartition[h;dir;tablename;enumdata;pt;extrapartitiontype] each extrapartitions;
// empty the table by replacing it with its zero-row prefix
.lg.o[`delete;"deleting ",(string tablename)," data from in-memory table"];
@[`.;tablename;0#];
// run a garbage collection (if enabled)
if[gc;.gc.run[]];
};


// merge tablename's chunks from every sub-partition under dir/pt into the hdb at h,
// accumulating rows until the per-table limit in mergelimits is exceeded (or the
// last chunk is reached) before each upsert; deletes the source chunks as it goes
merge:{[dir;pt;tablename;mergelimits;h]
// get int partitions
intpars:asc key ` sv dir,`$string pt; // list of enumerated partitions 0 1 2 3...
k:key each intdir:.Q.par[hsym dir;pt] each intpars; // list of table names
if[0=count raze k inter\: tablename; :()];
// get list of partition directories containing specified table
partdirs:` sv' (intdir,'parts) where not ()~/:parts:k inter\: tablename; // get each of the directories that hold the table

// exit function if no subdirectories are found
if[0=count partdirs; :()];
// merge the data in chunks depending on max rows for table
// destination for data to be upserted to [backslashes corrected for windows]

dest:` sv .Q.par[h;pt;tablename],`; // provides path to where to move data to
// fold over partdirs carrying (buffered rows;buffered segment paths); the final
// flag list marks the last segment so any remaining buffer is always flushed
{[tablename;dest;mergemaxrows;curr;segment;islast]
.lg.o[`merge;"reading partition ", string segment];
curr[0]:curr[0],select from get segment;
curr[1]:curr[1],segment;
$[islast or mergemaxrows < count curr[0];
// limit reached (or last chunk): flush the buffer to dest and delete the sources
[.lg.o[`merge;"upserting ",(string count curr[0])," rows to ",string dest];
dest upsert curr[0];
.lg.o[`merge;"removing segments", (", " sv string curr[1])];
.os.deldir each string curr[1];
(();())];
// otherwise keep accumulating
curr]
}[tablename;dest;(mergelimits[tablename])]/[(();());partdirs; 1 _ ((count partdirs)#0b),1b];
// set the parted attribute on each extra-partition column of the merged table
.lg.o[`merge;"setting attributes"];

@[dest;;`p#] each .merge.getextrapartitiontype[tablename];
.lg.o[`merge;"merge complete"];
// run a garbage collection (if enabled)
if[gc;.gc.run[]];
};


\d .
/-load the sort csv
//load the sort csv
.sort.getsortcsv[.replay.sortcsv]

.replay.replaylog each .replay.logstoreplay;
.lg.o[`replay;"replay complete"]
if[.replay.exitwhencomplete; exit 0]


36 changes: 4 additions & 32 deletions code/processes/wdb.q
Original file line number Diff line number Diff line change
Expand Up @@ -129,34 +129,6 @@ savetables:{[dir;pt;forcesave;tabname]
if[gc;.gc.run[]];
]};

/- function to get additional partition(s) defined by parted attribute in sort.csv
getextrapartitiontype:{[tablename]
/- check that that each table is defined or the default attributes are defined in sort.csv
/- exits with error if a table cannot find parted attributes in tablename or default
/- only checks tables that have sort enabled
tabparts:$[count tabparts:distinct exec column from .sort.params where tabname=tablename,sort=1,att=`p;
[.lg.o[`getextraparttype;"parted attribute p found in sort.csv for ",(string tablename)," table"];
tabparts];
count defaultparts:distinct exec column from .sort.params where tabname=`default,sort=1,att=`p;
[.lg.o[`getextraparttype;"parted attribute p not found in sort.csv for ",(string tablename)," table, using default instead"];
defaultparts];
[.lg.e[`getextraparttype;"parted attribute p not found in sort.csv for ", (string tablename)," table and default not defined"]]
];
tabparts
};

/- function to check each partiton type specified in sort.csv is actually present in specified table
checkpartitiontype:{[tablename;extrapartitiontype]
$[count colsnotintab:extrapartitiontype where not extrapartitiontype in cols get tablename;
.lg.e[`checkpart;"parted columns ",(", " sv string colsnotintab)," are defined in sort.csv but not present in ",(string tablename)," table"];
.lg.o[`checkpart;"all parted columns defined in sort.csv are present in ",(string tablename)," table"]];
};

/- function to get list of distinct combiniations for partition directories
/- functional select equivalent to: select distinct [ extrapartitiontype ] from [ tablenme ]
getextrapartitions:{[tablename;extrapartitiontype]
value each ?[tablename;();1b;extrapartitiontype!extrapartitiontype]
};

/- function to upsert to specified directory
upserttopartition:{[dir;tablename;tabdata;pt;expttype;expt]
Expand All @@ -181,11 +153,11 @@ savetablesbypart:{[dir;pt;forcesave;tablename]
if[forcesave or maxrows[tablename] < arows: count value tablename;
.lg.o[`rowcheck;"the ",(string tablename)," table consists of ", (string arows), " rows"];
/- get additional partition(s) defined by parted attribute in sort.csv
extrapartitiontype:getextrapartitiontype[tablename];
extrapartitiontype:.merge.getextrapartitiontype[tablename];
/- check each partition type actually is a column in the selected table
checkpartitiontype[tablename;extrapartitiontype];
.merge.checkpartitiontype[tablename;extrapartitiontype];
/- get list of distinct combiniations for partition directories
extrapartitions:getextrapartitions[tablename;extrapartitiontype];
extrapartitions:.merge.getextrapartitions[tablename;extrapartitiontype];
/- enumerate data to be upserted
enumdata:.Q.en[hdbdir;value tablename];
.lg.o[`save;"enumerated ",(string tablename)," table"];
Expand Down Expand Up @@ -302,7 +274,7 @@ merge:{[dir;pt;tablename;mergelimits]
}[tablename;dest;(mergelimits[tablename])]/[(();());partdirs; 1 _ ((count partdirs)#0b),1b];
/- set the attributes
.lg.o[`merge;"setting attributes"];
@[dest;;`p#] each getextrapartitiontype[tablename];
@[dest;;`p#] each .merge.getextrapartitiontype[tablename];
.lg.o[`merge;"merge complete"];
/- run a garbage collection (if enabled)
if[gc;.gc.run[]];
Expand Down