From 8b1f12f878b58e54cc038e4c978c62e6f0cffc30 Mon Sep 17 00:00:00 2001 From: Scott Quigley Date: Mon, 12 Dec 2016 15:08:35 +0000 Subject: [PATCH 1/9] change tickerlogreplay to support writing out data and merging it. Compression variable added --- code/processes/tickerlogreplay.q | 203 ++++++++++++++++++++++++++++++- 1 file changed, 200 insertions(+), 3 deletions(-) diff --git a/code/processes/tickerlogreplay.q b/code/processes/tickerlogreplay.q index 14ea03ebe..9539f8e49 100644 --- a/code/processes/tickerlogreplay.q +++ b/code/processes/tickerlogreplay.q @@ -22,6 +22,13 @@ upd:@[value;`upd;{{[t;x] insert[t;x]}}] // default upd function used for repla sortcsv:@[value;`sortcsv;`:config/sort.csv] //location of sort csv file +compression:@[value;`compression;()] //specify the compress level, empty list if no required +partandmerge:@[value;`partandmerge;0b] //setting to do a replay where the data is partitioned and then merged on disk +tempdir:@[value;`savedir;hdbdir] //location to save data for partandmerge replay +mergenumrows:@[value;`mergenumrows;10000000]; //default number of rows for merge process +mergenumtab:@[value;`mergenumtab;`quote`trade!10000 50000]; //specify number of rows per table for merge process + + / - settings for the common save code (see code/common/save.q) .save.savedownmanipulation:@[value;`savedownmanipulation;()!()] // a dict of table!function used to manipuate tables at EOD save .save.postreplay:@[value;`postreplay;{{[d;p] }}] // post replay function, invoked after all the tables have been written down for a given log file @@ -50,6 +57,9 @@ sortcsv:@[value;`sortcsv;`:config/sort.csv] //location of sort csv file [-.replay.basicmode [0|1]]\t\t\tDo a basic replay, which reads the table into memory then saves down with .Q.hdpf. Is probably faster for basic replays (in-memory sort rather than on-disk). Default is 0 [-.replay.exitwhencomplete [0|1]]\t\tProcess exits when complete. Default is 1 [-.replay.checklogfiles [0|1]\t\t\tCheck log files for corruption, if corrupt then write a good log and replay this. Default is 0 + [-.replay.partandmerge [0|1]\t\t\tDo a replay where the data is partitioned to a specified temp directory and then merged on disk. Default is 0 + [-.replay.compression x]\t\t\tSet the compression settings for .z.zd. Default is empty list (no compression) + [-.replay.tempdir x]\t\t\tThe directory to save data to before moving it to the hdb. Default is the same as the hdb \n There are some other functions/variables which can be modified to change the behaviour of the replay, but shouldn't be done from the config file Instead, load the script in a wrapper script which sets up the definition @@ -77,8 +87,10 @@ if[not partitiontype in `date`month`year; .err.ex[`replayinit;"partitiontype mus if[messagechunks=0;.err.ex[`replayinit;"messagechunks value cannot be 0";2]]; trackonly:messagechunks < 0 -if[trackonly;.lg.o[`replayinit;"messagechunks value is negative - log replay progress will be tracked"]] -messagechunks:abs messagechunks +if[trackonly;.lg.o[`replayinit;"messagechunks value is negative - log replay progress will be tracked"]]; +messagechunks:abs messagechunks; + +if[partandmerge and hdbdir = tempdir;.err.ex[`replayinit;"if using partandmerge replay, tempdir must be set to a different directory than the hdb";1]]; // load the schema \d . @@ -118,7 +130,11 @@ savetabdata:{[h;p;t;data;UPSERT] path:pathtotable[h;p;t]; .lg.o[`replay;"saving table ",(string t)," to ",string path]; .replay.pathlist[t],:path; - $[UPSERT;upsert;set] . (path;.Q.en[h;0!.save.manipulate[t;data]])} +/******************* + $[partandmerge;savetablesbypart[tempdir;p;t];$[UPSERT;upsert;set] . (path;.Q.en[h;0!.save.manipulate[t;data]])] +/****************** + } + savetabdatatrapped:{[h;p;t;data;UPSERT] .[savetabdata;(h;p;t;data;UPSERT);{.lg.e[`replay;"failed to save table : ",x]}]} // this function should be invoked for saving tables @@ -163,6 +179,9 @@ finishreplay:{[h;p] savetab[h;p] each tabsincountorder[.replay.tablestoreplay]; // sort data and apply the attributes if[sortafterreplay;applysortandattr[.replay.pathlist]]; +/********** + if[partandmerge;postreplaymerge[h;p]]; +/********** // invoke any user defined post replay function .save.postreplay[h;p]; } @@ -185,6 +204,12 @@ replaylog:{[logfile] .lg.o[`replay;"replayed data into tables with the following counts: ","; " sv {" = " sv string x}@'flip(key .replay.tablecounts;value .replay.tablecounts)]; if[count .replay.errorcounts; .lg.e[`replay;"errors were hit when replaying the following tables: ","; " sv {" = " sv string x}@'flip(key .replay.errorcounts;value .replay.errorcounts)]]; +/****************** + if[(`$(string .replay.replaydate)) in key hdbdir; + .lg.o[`replay;"HDB directory already contains ",(string .replay.replaydate)," partition. Deleting from the HDB directory"]; + .os.deldir .os.pth[string .Q.par[hdbdir;.replay.replaydate;`]]; / delete the the current dates HDB directory before performing replay + ]; +/***************** $[basicmode; [.lg.o[`replay;"basicmode set to true, saving down tables with .Q.hdpf"]; .Q.hdpf[`::;hdbdir;partitiontype$.replay.replaydate;`sym]]; @@ -213,6 +238,176 @@ initialupd:{[t;x] // Once we reach the correct message, reset the upd function @[`.;`upd;:;.replay.realupd]]} +/************* + +/- extract user defined row counts +mergemaxrows:{[tabname] mergenumrows^mergenumtab[tabname]}; + +/ post replay function for merge replay, invoked after all the tables have been written down for a given log file +postreplaymerge:{[d;p] + /- set compression level + if[ 3= count compression; + .lg.o[`compression;"setting compression level to (",(";" sv string compression),")"]; + .z.zd:compression; + .lg.o[`compression;".z.zd has been set to (",(";" sv string .z.zd),")"]]; + buildsummarytables[p;tempdir;hdbdir;tables[`.]]; + mergelimits:(tabsincountorder[.replay.tablestoreplay],())!({[x] mergenumrows^mergemaxrows[x]}tabsincountorder[.replay.tablestoreplay]),(); + /merge the tables from each partition in the tempdir together + merge[tempdir;p;;mergelimits] each tabsincountorder[.replay.tablestoreplay]; + .os.deldir .os.pth[string .Q.par[tempdir;p;`]]; / delete the contents of tempdir after merge completion + } + + +buildsummarytables:{[pt;tempdir;hdbdir;tablist] + if[`testquote in tablist; + writesummary[`tquote_summary;(get_tquote_summary_temp;tempdir;pt);hdbdir;pt] ]; + if[`pp_prices in tablist; + writesummary[`pp_price_update_summary;(get_pp_price_update_summary_temp;tempdir;pt);hdbdir;pt] ]; + if[`ecn_marketdata_ladder in tablist; + writesummary[`ecn_vwap_avg;(get_ecn_vwap_avg_temp;tempdir;pt);hdbdir;pt] ]; + } + + + +get_tquote_summary_temp:{[tempdir;pt] + // build list of partitions + p:partitionlist[tempdir;pt;`testquote]; + raze get_tquote_summary[;0Np;0Np] each p} + + +get_pp_price_update_summary_temp:{[tempdir;pt] + // build list of partitions + p:partitionlist[tempdir;pt;`pp_prices]; + raze get_pp_price_update_summary[;0Np;0Np] each p} + + +get_ecn_vwap_avg_temp:{[tempdir;pt] + // build list of partitions + p:partitionlist[tempdir;pt;`ecn_marketdata_ladder]; + raze get_ecn_vwap_avg[;0Np;0Np] each p} + + +writesummary:{[summaryname;summaryfunc;hdb;pt] + .lg.o[`summarytables;"loading sym file"]; + load ` sv (hsym hdb),`sym; + .lg.o[`summarytables;"building ",(string summaryname)," lookup table"]; + t:@[value;summaryfunc;{.lg.e[`summarytables;"failed to execute summary function for ",(string x),": ",y];-1}[summaryname]]; + if[not t~-1; + .lg.o[`summarytables;"writing summary table"]; + .[set;(` sv .Q.par[hdb;pt;summaryname],`;.Q.en[hdb;delete date from t:0!t]);{.lg.e[`summarytables;"failed to write summary table for ",(string x),": ",y]}[summaryname]]; + .lg.o[`summarytables;"summary table successfully written"]]; + } + + +/- function to get additional partition(s) defined by parted attribute in sort.csv +getextrapartitiontype:{[tablename] + /- check that that each table is defined or the default attributes are defined in sort.csv + /- exits with error if a table cannot find parted attributes in tablename or default + /- only checks tables that have sort enabled + tabparts:$[count tabparts:distinct exec column from .sort.params where tabname=tablename,sort=1,att=`p; + [.lg.o[`getextraparttype;"parted attribute p found in sort.csv for ",(string tablename)," table"]; + tabparts]; + count defaultparts:distinct exec column from .sort.params where tabname=`default,sort=1,att=`p; + [.lg.o[`getextraparttype;"parted attribute p not found in sort.csv for ",(string tablename)," table, using default instead"]; + defaultparts]; + [.lg.e[`getextraparttype;"parted attribute p not found in sort.csv for ", (string tablename)," table and default not defined"]] + ]; + tabparts + }; + +/- function to check each partiton type specified in sort.csv is actually present in specified table +checkpartitiontype:{[tablename;extrapartitiontype] + $[count colsnotintab:extrapartitiontype where not extrapartitiontype in cols get tablename; + .lg.e[`checkpart;"parted columns ",(", " sv string colsnotintab)," are defined in sort.csv but not present in ",(string tablename)," table"]; + .lg.o[`checkpart;"all parted columns defined in sort.csv are present in ",(string tablename)," table"]]; + }; + +/- function to get list of distinct combiniations for partition directories +/- functional select equivalent to: select distinct [ extrapartitiontype ] from [ tablename ] +getextrapartitions:{[tablename;extrapartitiontype] + value each ?[tablename;();1b;extrapartitiontype!extrapartitiontype] + }; + +/- function to upsert to specified directory +upserttopartition:{[dir;tablename;tabdata;pt;expttype;expt] + dirpar:.Q.par[dir;pt;`$string first expt]; + directory:` sv dirpar,tablename,`; + /- make directories for tables if they don't exist + if[count tabpar:tabsincountorder[.replay.tablestoreplay] except key dirpar; + .lg.o[`dir;"creating directories under ",1_string dirpar]; + .[{(` sv x,y,`) set .Q.en[hdbdir;0#value y]};] each dirpar,'tabpar]; + + .lg.o[`save;"saving ",(string tablename)," data to partition ",string directory]; + .[ + upsert; + (directory;r:update `sym!sym from ?[tabdata;{(x;y;(),z)}[in;;]'[expttype;expt];0b;()]); + {[e] .lg.e[`savetablesbypart;"Failed to save table to disk : ",e];'e} + ]; + }; + +savetablesbypart:{[dir;pt;tablename] + arows: count value tablename; + .lg.o[`rowcheck;"the ",(string tablename)," table consists of ", (string arows), " rows"]; + /- get additional partition(s) defined by parted attribute in sort.csv + extrapartitiontype:getextrapartitiontype[tablename]; + + /- check each partition type actually is a column in the selected table + checkpartitiontype[tablename;extrapartitiontype]; + /- enumerate data to be upserted + enumdata:update (`. `sym)?sym from .Q.en[hdbdir;value tablename]; + /- get list of distinct combiniations for partition directories + extrapartitions:(`. `sym)?getextrapartitions[tablename;extrapartitiontype]; + + .lg.o[`save;"enumerated ",(string tablename)," table"]; + /- upsert data to specific partition directory + upserttopartition[dir;tablename;enumdata;pt;extrapartitiontype] each extrapartitions; + /- empty the table + .lg.o[`delete;"deleting ",(string tablename)," data from in-memory table"]; + @[`.;tablename;0#]; + /- run a garbage collection (if enabled) + if[gc;.gc.run[]]; + }; + + +merge:{[dir;pt;tablename;mergelimits] + /- get int partitions + intpars:asc key ` sv dir,`$string pt; / list of enumerated partitions 0 1 2 3... + k:key each intdir:.Q.par[hsym dir;pt] each intpars; / list of table names + if[0=count raze k inter\: tablename; :()]; + /- get list of partition directories containing specified table + partdirs:` sv' (intdir,'parts) where not ()~/:parts:k inter\: tablename; / get each of the directories that hold the table + + /- exit function if no subdirectories are found + if[0=count partdirs; :()]; + /- merge the data in chunks depending on max rows for table + /- destination for data to be userted to [backslashes corrected for windows] + + dest:` sv .Q.par[hdbdir;pt;tablename],`; / provides path to where to move data to + {[tablename;dest;mergemaxrows;curr;segment;islast] + .lg.o[`merge;"reading partition ", string segment]; + curr[0]:curr[0],select from get segment; + curr[1]:curr[1],segment; + $[islast or mergemaxrows < count curr[0]; + [.lg.o[`merge;"upserting ",(string count curr[0])," rows to ",string dest]; + dest upsert curr[0]; + .lg.o[`merge;"removing segments", (", " sv string curr[1])]; + .os.deldir each string curr[1]; + (();())]; + curr] + }[tablename;dest;(mergelimits[tablename])]/[(();());partdirs; 1 _ ((count partdirs)#0b),1b]; + /- set the attributes + .lg.o[`merge;"setting attributes"]; + + @[dest;;`p#] each getextrapartitiontype[tablename]; + .lg.o[`merge;"merge complete"]; + /- run a garbage collection (if enabled) + if[gc;.gc.run[]]; + }; + +/************* + + + \d . /-load the sort csv .sort.getsortcsv[.replay.sortcsv] @@ -220,3 +415,5 @@ initialupd:{[t;x] .replay.replaylog each .replay.logstoreplay; .lg.o[`replay;"replay complete"] if[.replay.exitwhencomplete; exit 0] + + From 127788c9c6b02d76891c213ac97e2aea88af3966 Mon Sep 17 00:00:00 2001 From: Scott Quigley Date: Wed, 14 Dec 2016 15:37:03 +0000 Subject: [PATCH 2/9] tidy up of merge code in tickerlogreplay.q file --- code/processes/tickerlogreplay.q | 202 ++++++++++++------------------- 1 file changed, 78 insertions(+), 124 deletions(-) diff --git a/code/processes/tickerlogreplay.q b/code/processes/tickerlogreplay.q index 9539f8e49..f9d30a774 100644 --- a/code/processes/tickerlogreplay.q +++ b/code/processes/tickerlogreplay.q @@ -24,7 +24,7 @@ sortcsv:@[value;`sortcsv;`:config/sort.csv] //location of sort csv file compression:@[value;`compression;()] //specify the compress level, empty list if no required partandmerge:@[value;`partandmerge;0b] //setting to do a replay where the data is partitioned and then merged on disk -tempdir:@[value;`savedir;hdbdir] //location to save data for partandmerge replay +tempdir:@[value;`tempdir;hdbdir] //location to save data for partandmerge replay mergenumrows:@[value;`mergenumrows;10000000]; //default number of rows for merge process mergenumtab:@[value;`mergenumtab;`quote`trade!10000 50000]; //specify number of rows per table for merge process @@ -91,6 +91,7 @@ if[trackonly;.lg.o[`replayinit;"messagechunks value is negative - log replay pro messagechunks:abs messagechunks; if[partandmerge and hdbdir = tempdir;.err.ex[`replayinit;"if using partandmerge replay, tempdir must be set to a different directory than the hdb";1]]; +if[partandmerge and sortafterreplay;.err.ex[`replayinit;"if using partandmerge replay, sortafterreplay must be set to 0b";1]]; // load the schema \d . @@ -121,18 +122,20 @@ pathtotable:{[h;p;t] `$(string .Q.par[h;partitiontype$p;t]),"/"} // create empty tables - we need to make sure we only create them once emptytabs:`symbol$() createemptytable:{[h;p;t] + temppath:pathtotable[tempdir;p;t]; if[(not (path:pathtotable[h;p;t]) in .replay.emptytabs) and .replay.emptytables; - .lg.o[`replay;"creating empty table ",(string t)," at ",string path]; + $[partandmerge;.lg.o[`replay;"creating empty table ",(string t)," at ",string temppath]; + .lg.o[`replay;"creating empty table ",(string t)," at ",string path]]; .replay.emptytabs,:path; savetabdatatrapped[h;p;t;0#value t;0b]]} savetabdata:{[h;p;t;data;UPSERT] path:pathtotable[h;p;t]; - .lg.o[`replay;"saving table ",(string t)," to ",string path]; + temppath:pathtotable[tempdir;p;t]; + $[partandmerge;.lg.o[`replay;"saving partitioned table ",(string t)," to ",string temppath]; + .lg.o[`replay;"saving table ",(string t)," to ",string path]]; .replay.pathlist[t],:path; -/******************* $[partandmerge;savetablesbypart[tempdir;p;t];$[UPSERT;upsert;set] . (path;.Q.en[h;0!.save.manipulate[t;data]])] -/****************** } savetabdatatrapped:{[h;p;t;data;UPSERT] .[savetabdata;(h;p;t;data;UPSERT);{.lg.e[`replay;"failed to save table : ",x]}]} @@ -179,9 +182,7 @@ finishreplay:{[h;p] savetab[h;p] each tabsincountorder[.replay.tablestoreplay]; // sort data and apply the attributes if[sortafterreplay;applysortandattr[.replay.pathlist]]; -/********** if[partandmerge;postreplaymerge[h;p]]; -/********** // invoke any user defined post replay function .save.postreplay[h;p]; } @@ -204,12 +205,10 @@ replaylog:{[logfile] .lg.o[`replay;"replayed data into tables with the following counts: ","; " sv {" = " sv string x}@'flip(key .replay.tablecounts;value .replay.tablecounts)]; if[count .replay.errorcounts; .lg.e[`replay;"errors were hit when replaying the following tables: ","; " sv {" = " sv string x}@'flip(key .replay.errorcounts;value .replay.errorcounts)]]; -/****************** if[(`$(string .replay.replaydate)) in key hdbdir; .lg.o[`replay;"HDB directory already contains ",(string .replay.replaydate)," partition. Deleting from the HDB directory"]; .os.deldir .os.pth[string .Q.par[hdbdir;.replay.replaydate;`]]; / delete the the current dates HDB directory before performing replay ]; -/***************** $[basicmode; [.lg.o[`replay;"basicmode set to true, saving down tables with .Q.hdpf"]; .Q.hdpf[`::;hdbdir;partitiontype$.replay.replaydate;`sym]]; @@ -219,11 +218,11 @@ replaylog:{[logfile] // upd functions down here realupd:{[f;t;x] - // increment the tablecounts - tablecounts[t]+::count first x; - // run the supplied function in the error trap - .[f;(t;x);{[t;x;e] errorcounts[t]+::count first x}[t;x]]; - }[.replay.upd] + // increment the tablecounts + tablecounts[t]+::count first x; + // run the supplied function in the error trap + .[f;(t;x);{[t;x;e] errorcounts[t]+::count first x}[t;x]]; + }[.replay.upd] // amend the upd function to filter based on the table list if[not tablelist~enlist `all; realupd:{[f;t;x] if[t in .replay.tablestoreplay; f[t;x]]}[realupd]] @@ -232,78 +231,37 @@ if[not tablelist~enlist `all; realupd:{[f;t;x] if[t in .replay.tablestoreplay; f if[messagechunks < 0W; realupd:{[f;t;x] f[t;x]; checkcount[hdbdir;replaydate;1]}[realupd]] initialupd:{[t;x] - // spin through the first X messages - $[msgcount < (firstmessage - 1); - msgcount+::1; - // Once we reach the correct message, reset the upd function - @[`.;`upd;:;.replay.realupd]]} + // spin through the first X messages + $[msgcount < (firstmessage - 1); + msgcount+::1; + // Once we reach the correct message, reset the upd function + @[`.;`upd;:;.replay.realupd]] + } -/************* -/- extract user defined row counts +// extract user defined row counts mergemaxrows:{[tabname] mergenumrows^mergenumtab[tabname]}; -/ post replay function for merge replay, invoked after all the tables have been written down for a given log file +// post replay function for merge replay, invoked after all the tables have been written down for a given log file postreplaymerge:{[d;p] - /- set compression level - if[ 3= count compression; - .lg.o[`compression;"setting compression level to (",(";" sv string compression),")"]; - .z.zd:compression; - .lg.o[`compression;".z.zd has been set to (",(";" sv string .z.zd),")"]]; - buildsummarytables[p;tempdir;hdbdir;tables[`.]]; - mergelimits:(tabsincountorder[.replay.tablestoreplay],())!({[x] mergenumrows^mergemaxrows[x]}tabsincountorder[.replay.tablestoreplay]),(); - /merge the tables from each partition in the tempdir together - merge[tempdir;p;;mergelimits] each tabsincountorder[.replay.tablestoreplay]; - .os.deldir .os.pth[string .Q.par[tempdir;p;`]]; / delete the contents of tempdir after merge completion - } - - -buildsummarytables:{[pt;tempdir;hdbdir;tablist] - if[`testquote in tablist; - writesummary[`tquote_summary;(get_tquote_summary_temp;tempdir;pt);hdbdir;pt] ]; - if[`pp_prices in tablist; - writesummary[`pp_price_update_summary;(get_pp_price_update_summary_temp;tempdir;pt);hdbdir;pt] ]; - if[`ecn_marketdata_ladder in tablist; - writesummary[`ecn_vwap_avg;(get_ecn_vwap_avg_temp;tempdir;pt);hdbdir;pt] ]; - } - - - -get_tquote_summary_temp:{[tempdir;pt] - // build list of partitions - p:partitionlist[tempdir;pt;`testquote]; - raze get_tquote_summary[;0Np;0Np] each p} - - -get_pp_price_update_summary_temp:{[tempdir;pt] - // build list of partitions - p:partitionlist[tempdir;pt;`pp_prices]; - raze get_pp_price_update_summary[;0Np;0Np] each p} - - -get_ecn_vwap_avg_temp:{[tempdir;pt] - // build list of partitions - p:partitionlist[tempdir;pt;`ecn_marketdata_ladder]; - raze get_ecn_vwap_avg[;0Np;0Np] each p} - - -writesummary:{[summaryname;summaryfunc;hdb;pt] - .lg.o[`summarytables;"loading sym file"]; - load ` sv (hsym hdb),`sym; - .lg.o[`summarytables;"building ",(string summaryname)," lookup table"]; - t:@[value;summaryfunc;{.lg.e[`summarytables;"failed to execute summary function for ",(string x),": ",y];-1}[summaryname]]; - if[not t~-1; - .lg.o[`summarytables;"writing summary table"]; - .[set;(` sv .Q.par[hdb;pt;summaryname],`;.Q.en[hdb;delete date from t:0!t]);{.lg.e[`summarytables;"failed to write summary table for ",(string x),": ",y]}[summaryname]]; - .lg.o[`summarytables;"summary table successfully written"]]; - } + // set compression level + if[ 3= count compression; + .lg.o[`compression;"setting compression level to (",(";" sv string compression),")"]; + .z.zd:compression; + .lg.o[`compression;".z.zd has been set to (",(";" sv string .z.zd),")"]]; + + mergelimits:(tabsincountorder[.replay.tablestoreplay],())!({[x] mergenumrows^mergemaxrows[x]}tabsincountorder[.replay.tablestoreplay]),(); + // merge the tables from each partition in the tempdir together + merge[tempdir;p;;mergelimits] each tabsincountorder[.replay.tablestoreplay]; + .os.deldir .os.pth[string .Q.par[tempdir;p;`]]; // delete the contents of tempdir after merge completion + } -/- function to get additional partition(s) defined by parted attribute in sort.csv +// function to get additional partition(s) defined by parted attribute in sort.csv getextrapartitiontype:{[tablename] - /- check that that each table is defined or the default attributes are defined in sort.csv - /- exits with error if a table cannot find parted attributes in tablename or default - /- only checks tables that have sort enabled + // check that each table is defined or the default attributes are defined in sort.csv + // exits with error if a table cannot find parted attributes in tablename or default + // only checks tables that have sort enabled tabparts:$[count tabparts:distinct exec column from .sort.params where tabname=tablename,sort=1,att=`p; [.lg.o[`getextraparttype;"parted attribute p found in sort.csv for ",(string tablename)," table"]; tabparts]; @@ -315,24 +273,24 @@ getextrapartitiontype:{[tablename] tabparts }; -/- function to check each partiton type specified in sort.csv is actually present in specified table +// function to check each partiton type specified in sort.csv is actually present in specified table checkpartitiontype:{[tablename;extrapartitiontype] $[count colsnotintab:extrapartitiontype where not extrapartitiontype in cols get tablename; .lg.e[`checkpart;"parted columns ",(", " sv string colsnotintab)," are defined in sort.csv but not present in ",(string tablename)," table"]; .lg.o[`checkpart;"all parted columns defined in sort.csv are present in ",(string tablename)," table"]]; }; -/- function to get list of distinct combiniations for partition directories -/- functional select equivalent to: select distinct [ extrapartitiontype ] from [ tablename ] -getextrapartitions:{[tablename;extrapartitiontype] + // function to get list of distinct combiniations for partition directories + // functional select equivalent to: select distinct [ extrapartitiontype ] from [ tablename ] + getextrapartitions:{[tablename;extrapartitiontype] value each ?[tablename;();1b;extrapartitiontype!extrapartitiontype] }; -/- function to upsert to specified directory +// function to upsert to specified directory upserttopartition:{[dir;tablename;tabdata;pt;expttype;expt] dirpar:.Q.par[dir;pt;`$string first expt]; directory:` sv dirpar,tablename,`; - /- make directories for tables if they don't exist + // make directories for tables if they don't exist if[count tabpar:tabsincountorder[.replay.tablestoreplay] except key dirpar; .lg.o[`dir;"creating directories under ",1_string dirpar]; .[{(` sv x,y,`) set .Q.en[hdbdir;0#value y]};] each dirpar,'tabpar]; @@ -341,49 +299,48 @@ upserttopartition:{[dir;tablename;tabdata;pt;expttype;expt] .[ upsert; (directory;r:update `sym!sym from ?[tabdata;{(x;y;(),z)}[in;;]'[expttype;expt];0b;()]); - {[e] .lg.e[`savetablesbypart;"Failed to save table to disk : ",e];'e} - ]; + {[e] .lg.e[`savetablesbypart;"Failed to save table to disk : ",e];'e}]; }; savetablesbypart:{[dir;pt;tablename] - arows: count value tablename; - .lg.o[`rowcheck;"the ",(string tablename)," table consists of ", (string arows), " rows"]; - /- get additional partition(s) defined by parted attribute in sort.csv - extrapartitiontype:getextrapartitiontype[tablename]; - - /- check each partition type actually is a column in the selected table - checkpartitiontype[tablename;extrapartitiontype]; - /- enumerate data to be upserted - enumdata:update (`. `sym)?sym from .Q.en[hdbdir;value tablename]; - /- get list of distinct combiniations for partition directories - extrapartitions:(`. `sym)?getextrapartitions[tablename;extrapartitiontype]; - - .lg.o[`save;"enumerated ",(string tablename)," table"]; - /- upsert data to specific partition directory - upserttopartition[dir;tablename;enumdata;pt;extrapartitiontype] each extrapartitions; - /- empty the table - .lg.o[`delete;"deleting ",(string tablename)," data from in-memory table"]; - @[`.;tablename;0#]; - /- run a garbage collection (if enabled) - if[gc;.gc.run[]]; + arows: count value tablename; + .lg.o[`rowcheck;"the ",(string tablename)," table consists of ", (string arows), " rows"]; + // get additional partition(s) defined by parted attribute in sort.csv + extrapartitiontype:getextrapartitiontype[tablename]; + + // check each partition type actually is a column in the selected table + checkpartitiontype[tablename;extrapartitiontype]; + // enumerate data to be upserted + enumdata:update (`. `sym)?sym from .Q.en[hdbdir;value tablename]; + // get list of distinct combiniations for partition directories + extrapartitions:(`. `sym)?getextrapartitions[tablename;extrapartitiontype]; + + .lg.o[`save;"enumerated ",(string tablename)," table"]; + // upsert data to specific partition directory + upserttopartition[dir;tablename;enumdata;pt;extrapartitiontype] each extrapartitions; + // empty the table + .lg.o[`delete;"deleting ",(string tablename)," data from in-memory table"]; + @[`.;tablename;0#]; + // run a garbage collection (if enabled) + if[gc;.gc.run[]]; }; merge:{[dir;pt;tablename;mergelimits] - /- get int partitions - intpars:asc key ` sv dir,`$string pt; / list of enumerated partitions 0 1 2 3... - k:key each intdir:.Q.par[hsym dir;pt] each intpars; / list of table names - if[0=count raze k inter\: tablename; :()]; - /- get list of partition directories containing specified table - partdirs:` sv' (intdir,'parts) where not ()~/:parts:k inter\: tablename; / get each of the directories that hold the table + // get int partitions + intpars:asc key ` sv dir,`$string pt; // list of enumerated partitions 0 1 2 3... + k:key each intdir:.Q.par[hsym dir;pt] each intpars; // list of table names + if[0=count raze k inter\: tablename; :()]; + // get list of partition directories containing specified table + partdirs:` sv' (intdir,'parts) where not ()~/:parts:k inter\: tablename; // get each of the directories that hold the table - /- exit function if no subdirectories are found - if[0=count partdirs; :()]; - /- merge the data in chunks depending on max rows for table - /- destination for data to be userted to [backslashes corrected for windows] + // exit function if no subdirectories are found + if[0=count partdirs; :()]; + // merge the data in chunks depending on max rows for table + // destination for data to be userted to [backslashes corrected for windows] - dest:` sv .Q.par[hdbdir;pt;tablename],`; / provides path to where to move data to - {[tablename;dest;mergemaxrows;curr;segment;islast] + dest:` sv .Q.par[hdbdir;pt;tablename],`; // provides path to where to move data to + {[tablename;dest;mergemaxrows;curr;segment;islast] .lg.o[`merge;"reading partition ", string segment]; curr[0]:curr[0],select from get segment; curr[1]:curr[1],segment; @@ -395,21 +352,18 @@ merge:{[dir;pt;tablename;mergelimits] (();())]; curr] }[tablename;dest;(mergelimits[tablename])]/[(();());partdirs; 1 _ ((count partdirs)#0b),1b]; - /- set the attributes + // set the attributes .lg.o[`merge;"setting attributes"]; @[dest;;`p#] each getextrapartitiontype[tablename]; .lg.o[`merge;"merge complete"]; - /- run a garbage collection (if enabled) + // run a garbage collection (if enabled) if[gc;.gc.run[]]; }; -/************* - - \d . -/-load the sort csv +//load the sort csv .sort.getsortcsv[.replay.sortcsv] .replay.replaylog each .replay.logstoreplay; From 2fc2fd24b022d0cbec62b293c6770c55905b18a2 Mon Sep 17 00:00:00 2001 From: Scott Quigley Date: Thu, 15 Dec 2016 13:44:03 +0000 Subject: [PATCH 3/9] further tidying and use of local variables instead of globals --- code/processes/tickerlogreplay.q | 258 +++++++++++++++---------------- 1 file changed, 127 insertions(+), 131 deletions(-) diff --git a/code/processes/tickerlogreplay.q b/code/processes/tickerlogreplay.q index f9d30a774..b12f7dc0e 100644 --- a/code/processes/tickerlogreplay.q +++ b/code/processes/tickerlogreplay.q @@ -13,7 +13,7 @@ tplogfile:@[value;`tplogfile;`] // the tp log file to replay. Only this or t tplogdir:@[value;`tplogdir;`] // the tp log directory to read the log files from. Only this or tplogfile should be used (not both) partitiontype:@[value;`partitiontype;`date] // the partitioning of the database. Can be date, month or year (int would have to be handled bespokely) emptytables:@[value;`emptytables;1b] // whether to overwrite any tables at start up -sortafterreplay:@[value;`sortafterreplay;1b] // whether to re-sort the data and apply attributes at the end of the replay. Sort order is determined by the sortcsv (:config/sort.csv) +sortafterreplay:@[value;`sortafterreplay;0b] // whether to re-sort the data and apply attributes at the end of the replay. Sort order is determined by the sortcsv (:config/sort.csv) basicmode:@[value;`basicmode;0b] // do a basic replay, which replays everything in, then saves it down with .Q.hdpf[`::;d;p;`sym] exitwhencomplete:@[value;`exitwhencomplete;1b] // exit when the replay is complete checklogfiles:@[value;`checklogfiles;0b] // check if the log file is corrupt, if it is then write a new "good" file and replay it instead @@ -22,9 +22,9 @@ upd:@[value;`upd;{{[t;x] insert[t;x]}}] // default upd function used for repla sortcsv:@[value;`sortcsv;`:config/sort.csv] //location of sort csv file -compression:@[value;`compression;()] //specify the compress level, empty list if no required -partandmerge:@[value;`partandmerge;0b] //setting to do a replay where the data is partitioned and then merged on disk -tempdir:@[value;`tempdir;hdbdir] //location to save data for partandmerge replay +compression:@[value;`compression;(17 2 4)] //specify the compress level, empty list if no required +partandmerge:@[value;`partandmerge;1b] //setting to do a replay where the data is partitioned and then merged on disk +tempdir:@[value;`tempdir;`:tempdir] //location to save data for partandmerge replay mergenumrows:@[value;`mergenumrows;10000000]; //default number of rows for merge process mergenumtab:@[value;`mergenumtab;`quote`trade!10000 50000]; //specify number of rows per table for merge process @@ -121,31 +121,28 @@ pathtotable:{[h;p;t] `$(string .Q.par[h;partitiontype$p;t]),"/"} // create empty tables - we need to make sure we only create them once emptytabs:`symbol$() -createemptytable:{[h;p;t] - temppath:pathtotable[tempdir;p;t]; - if[(not (path:pathtotable[h;p;t]) in .replay.emptytabs) and .replay.emptytables; - $[partandmerge;.lg.o[`replay;"creating empty table ",(string t)," at ",string temppath]; - .lg.o[`replay;"creating empty table ",(string t)," at ",string path]]; +createemptytable:{[h;p;t;td] + $[partandmerge;dest:td;dest:h]; + if[(not (path:pathtotable[dest;p;t]) in .replay.emptytabs) and .replay.emptytables; + .lg.o[`replay;"creating empty table ",(string t)," at ",string path]; .replay.emptytabs,:path; savetabdatatrapped[h;p;t;0#value t;0b]]} -savetabdata:{[h;p;t;data;UPSERT] - path:pathtotable[h;p;t]; - temppath:pathtotable[tempdir;p;t]; - $[partandmerge;.lg.o[`replay;"saving partitioned table ",(string t)," to ",string temppath]; - .lg.o[`replay;"saving table ",(string t)," to ",string path]]; +savetabdata:{[h;p;t;data;UPSERT;td] + $[partandmerge;path:pathtotable[td;p;t];path:pathtotable[h;p;t]]; + .lg.o[`replay;"saving table ",(string t)," to ",string path]; .replay.pathlist[t],:path; - $[partandmerge;savetablesbypart[tempdir;p;t];$[UPSERT;upsert;set] . (path;.Q.en[h;0!.save.manipulate[t;data]])] + $[partandmerge;savetablesbypart[td;p;t;h];$[UPSERT;upsert;set] . (path;.Q.en[h;0!.save.manipulate[t;data]])] } -savetabdatatrapped:{[h;p;t;data;UPSERT] .[savetabdata;(h;p;t;data;UPSERT);{.lg.e[`replay;"failed to save table : ",x]}]} +savetabdatatrapped:{[h;p;t;data;UPSERT;td] .[savetabdata;(h;p;t;data;UPSERT;td);{.lg.e[`replay;"failed to save table : ",x]}]} // this function should be invoked for saving tables -savetab:{[h;p;t] - createemptytable[h;p;t]; +savetab:{[td;h;p;t] + createemptytable[h;p;t;td]; if[count value t; .lg.o[`replay;"saving ",(string t)," which has row count ",string count value t]; - savetabdatatrapped[h;p;t;value t;1b]; + savetabdatatrapped[h;p;t;value t;1b;td]; delete from t; if[gc;.gc.run[]]]} @@ -164,25 +161,25 @@ tabsincountorder:{x iasc count each value each x} // check if the count has been exceeded, and save down if it has currentcount:0 totalcount:0 -checkcount:{[h;p;counter] +checkcount:{[h;p;counter;td] currentcount+::counter; if[.replay.currentcount >= .replay.messagechunks; $[.replay.trackonly; [.replay.totalcount +: .replay.currentcount; .lg.o[`replay;"replayed a chunk of ",(string .replay.messagechunks)," messages. Total message count so far is ",string .replay.totalcount]]; [.lg.o[`replay;"number of messages to replay at once (",(string .replay.messagechunks),") has been exceeded. Saving down"]; - savetab[h;p] each tabsincountorder[.replay.tablestoreplay]; + savetab[td;h;p] each tabsincountorder[.replay.tablestoreplay]; .lg.o[`replay;"save complete- replaying next chunk of data"]]]; .replay.currentcount:0]} // function used to finish off the replay // generally this will be to re-sort the table, and set an attribute -finishreplay:{[h;p] +finishreplay:{[h;p;td] // save down any tables which haven't been saved - savetab[h;p] each tabsincountorder[.replay.tablestoreplay]; + savetab[td;h;p] each tabsincountorder[.replay.tablestoreplay]; // sort data and apply the attributes if[sortafterreplay;applysortandattr[.replay.pathlist]]; - if[partandmerge;postreplaymerge[h;p]]; + if[partandmerge;postreplaymerge[td;p;h]]; // invoke any user defined post replay function .save.postreplay[h;p]; } @@ -195,7 +192,7 @@ replaylog:{[logfile] $[firstmessage>0; [.lg.o[`replay;"skipping first ",(string firstmessage)," messages"]; @[`.;`upd;:;.replay.initialupd]]; - @[`.;`upd;:;.replay.realupd]]; + @[`.;`upd;:;.replay.realupd]]; .replay.tablecounts:.replay.errorcounts:.replay.pathlist:()!(); .replay.replaydate:"D"$-10#string logfile; if[lastmessage Date: Mon, 19 Dec 2016 12:22:39 +0000 Subject: [PATCH 4/9] don't create empty tables or replay heartbeat and logmsg when doing merge replay --- code/processes/tickerlogreplay.q | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/code/processes/tickerlogreplay.q b/code/processes/tickerlogreplay.q index b12f7dc0e..85d3750cf 100644 --- a/code/processes/tickerlogreplay.q +++ b/code/processes/tickerlogreplay.q @@ -130,7 +130,7 @@ createemptytable:{[h;p;t;td] savetabdata:{[h;p;t;data;UPSERT;td] $[partandmerge;path:pathtotable[td;p;t];path:pathtotable[h;p;t]]; - .lg.o[`replay;"saving table ",(string t)," to ",string path]; + if[not partandmerge;.lg.o[`replay;"saving table ",(string t)," to ",string path]]; .replay.pathlist[t],:path; $[partandmerge;savetablesbypart[td;p;t;h];$[UPSERT;upsert;set] . (path;.Q.en[h;0!.save.manipulate[t;data]])] } @@ -139,7 +139,7 @@ savetabdatatrapped:{[h;p;t;data;UPSERT;td] .[savetabdata;(h;p;t;data;UPSERT;td); // this function should be invoked for saving tables savetab:{[td;h;p;t] - createemptytable[h;p;t;td]; + if[not partandmerge;createemptytable[h;p;t;td]]; if[count value t; .lg.o[`replay;"saving ",(string t)," which has row count ",string count value t]; savetabdatatrapped[h;p;t;value t;1b;td]; @@ -150,7 +150,7 @@ savetab:{[td;h;p;t] // input is a dictionary of tablename!(list of paths) // should be the same as .replay.pathlist applysortandattr:{[pathlist] - / - convert pathlist dictionary into a keys and values then transpose before passing into .sort.sorttab + // convert pathlist dictionary into a keys and values then transpose before passing into .sort.sorttab .sort.sorttab each flip (key;value) @\: distinct each pathlist }; @@ -204,7 +204,7 @@ replaylog:{[logfile] .lg.e[`replay;"errors were hit when replaying the following tables: ","; " sv {" = " sv string x}@'flip(key .replay.errorcounts;value .replay.errorcounts)]]; if[(`$(string .replay.replaydate)) in key hdbdir; .lg.o[`replay;"HDB directory already contains ",(string .replay.replaydate)," partition. Deleting from the HDB directory"]; - .os.deldir .os.pth[string .Q.par[hdbdir;.replay.replaydate;`]]; / delete the the current dates HDB directory before performing replay + .os.deldir .os.pth[string .Q.par[hdbdir;.replay.replaydate;`]]; // delete the current dates HDB directory before performing replay ]; $[basicmode; [.lg.o[`replay;"basicmode set to true, saving down tables with .Q.hdpf"]; @@ -290,6 +290,7 @@ upserttopartition:{[h;dir;tablename;tabdata;pt;expttype;expt] // make directories for tables if they don't exist if[count tabpar:tabsincountorder[.replay.tablestoreplay] except key dirpar; .lg.o[`dir;"creating directories under ",1_string dirpar]; + tabpar:tabpar except `heartbeat`logmsg; .[{[d;h;t](` sv d,t,`) set .Q.en[h;0#value t]};] each dirpar,'h,'tabpar]; .lg.o[`save;"saving ",(string tablename)," data to partition ",string directory]; .[ From 3365aea0bfbe7849d43152c5211d071080d9b65d Mon Sep 17 00:00:00 2001 From: Scott Quigley Date: Wed, 21 Dec 2016 15:15:35 +0000 Subject: [PATCH 5/9] create common code file(merge.q) for merge write down in wdb and tickerlogreplay --- code/common/merge.q | 32 +++++++++++++++++++++++ code/processes/tickerlogreplay.q | 45 +++++--------------------------- code/processes/wdb.q | 38 ++++----------------------- 3 files changed, 44 insertions(+), 71 deletions(-) create mode 100644 code/common/merge.q diff --git a/code/common/merge.q b/code/common/merge.q new file mode 100644 index 000000000..5e61d9ec2 --- /dev/null +++ b/code/common/merge.q @@ -0,0 +1,32 @@ +\d .merge + +/- function to get additional partition(s) defined by parted attribute in sort.csv +getextrapartitiontype:{[tablename] + /- check that that each table is defined or the default attributes are defined in sort.csv + /- exits with error if a table cannot find parted attributes in tablename or default + /- only checks tables that have sort enabled + tabparts:$[count tabparts:distinct exec column from .sort.params where tabname=tablename,sort=1,att=`p; + [.lg.o[`getextraparttype;"parted attribute p found in sort.csv for ",(string tablename)," table"]; + tabparts]; + count defaultparts:distinct exec column from .sort.params where tabname=`default,sort=1,att=`p; + [.lg.o[`getextraparttype;"parted attribute p not found in sort.csv for ",(string tablename)," table, using default instead"]; + defaultparts]; + [.lg.e[`getextraparttype;"parted attribute p not found in sort.csv for ", (string tablename)," table and default not defined"]] + ]; + tabparts + }; + +/- function to check each partiton type specified in sort.csv is actually present in specified table +checkpartitiontype:{[tablename;extrapartitiontype] + $[count colsnotintab:extrapartitiontype where not extrapartitiontype in cols get tablename; + .lg.e[`checkpart;"parted columns ",(", " sv string colsnotintab)," are defined in sort.csv but not present in ",(string tablename)," table"]; + .lg.o[`checkpart;"all parted columns defined in sort.csv are present in ",(string tablename)," table"]]; + }; + + + +/- function to get list of distinct combiniations for partition directories +/- functional select equivalent to: select distinct [ extrapartitiontype ] from [ tablenme ] +getextrapartitions:{[tablename;extrapartitiontype] + value each ?[tablename;();1b;extrapartitiontype!extrapartitiontype] + }; diff --git a/code/processes/tickerlogreplay.q b/code/processes/tickerlogreplay.q index 85d3750cf..977ad22c2 100644 --- a/code/processes/tickerlogreplay.q +++ b/code/processes/tickerlogreplay.q @@ -235,8 +235,7 @@ initialupd:{[t;x] @[`.;`upd;:;.replay.realupd]] } - -// extract user defined row counts +// extract user defined row counts mergemaxrows:{[tabname] mergenumrows^mergenumtab[tabname]}; // post replay function for merge replay, invoked after all the tables have been written down for a given log file @@ -246,43 +245,13 @@ postreplaymerge:{[td;p;h] .lg.o[`compression;"setting compression level to (",(";" sv string compression),")"]; .z.zd:compression; .lg.o[`compression;".z.zd has been set to (",(";" sv string .z.zd),")"]]; - - mergelimits:(tabsincountorder[.replay.tablestoreplay],())!({[x] mergenumrows^mergemaxrows[x]}tabsincountorder[.replay.tablestoreplay]),(); + + mergelimits:(tabsincountorder[.replay.tablestoreplay],())!({[x] mergenumrows^mergemaxrows[x]}tabsincountorder[.replay.tablestoreplay]),(); // merge the tables from each partition in the tempdir together merge[td;p;;mergelimits;h] each tabsincountorder[.replay.tablestoreplay]; .os.deldir .os.pth[string .Q.par[td;p;`]]; // delete the contents of tempdir after merge completion } - -// function to get additional partition(s) defined by parted attribute in sort.csv -getextrapartitiontype:{[tablename] - // check that each table is defined or the default attributes are defined in sort.csv - // exits with error if a table cannot find parted attributes in tablename or default - // only checks tables that have sort enabled - tabparts:$[count tabparts:distinct exec column from .sort.params where tabname=tablename,sort=1,att=`p; - [.lg.o[`getextraparttype;"parted attribute p found in sort.csv for ",(string tablename)," table"]; - tabparts]; - count defaultparts:distinct exec column from .sort.params where tabname=`default,sort=1,att=`p; - [.lg.o[`getextraparttype;"parted attribute p not found in sort.csv for ",(string tablename)," table, using default instead"]; - defaultparts]; - [.lg.e[`getextraparttype;"parted attribute p not found in sort.csv for ", (string tablename)," table and default not defined"]] - ]; - tabparts - }; - -// function to check each partiton type specified in sort.csv is actually present in specified table -checkpartitiontype:{[tablename;extrapartitiontype] - $[count colsnotintab:extrapartitiontype where not extrapartitiontype in cols get tablename; - .lg.e[`checkpart;"parted columns ",(", " sv string colsnotintab)," are defined in sort.csv but not present in ",(string tablename)," table"]; - .lg.o[`checkpart;"all parted columns defined in sort.csv are present in ",(string tablename)," table"]]; - }; - - // function to get list of distinct combiniations for partition directories - // functional select equivalent to: select distinct [ extrapartitiontype ] from [ tablename ] - getextrapartitions:{[tablename;extrapartitiontype] - value each ?[tablename;();1b;extrapartitiontype!extrapartitiontype] - }; - // function to upsert to specified directory upserttopartition:{[h;dir;tablename;tabdata;pt;expttype;expt] dirpar:.Q.par[dir;pt;`$string first expt]; @@ -303,14 +272,14 @@ savetablesbypart:{[dir;pt;tablename;h] arows: count value tablename; .lg.o[`rowcheck;"the ",(string tablename)," table consists of ", (string arows), " rows"]; // get additional partition(s) defined by parted attribute in sort.csv - extrapartitiontype:getextrapartitiontype[tablename]; + extrapartitiontype:.merge.getextrapartitiontype[tablename]; // check each partition type actually is a column in the selected table - checkpartitiontype[tablename;extrapartitiontype]; + .merge.checkpartitiontype[tablename;extrapartitiontype]; // enumerate data to be upserted enumdata:update (`. `sym)?sym from .Q.en[h;value tablename]; // get list of distinct combiniations for partition directories - extrapartitions:(`. `sym)?getextrapartitions[tablename;extrapartitiontype]; + extrapartitions:(`. `sym)?.merge.getextrapartitions[tablename;extrapartitiontype]; .lg.o[`save;"enumerated ",(string tablename)," table"]; // upsert data to specific partition directory @@ -352,7 +321,7 @@ merge:{[dir;pt;tablename;mergelimits;h] // set the attributes .lg.o[`merge;"setting attributes"]; - @[dest;;`p#] each getextrapartitiontype[tablename]; + @[dest;;`p#] each .merge.getextrapartitiontype[tablename]; .lg.o[`merge;"merge complete"]; // run a garbage collection (if enabled) if[gc;.gc.run[]]; diff --git a/code/processes/wdb.q b/code/processes/wdb.q index 8cef9b805..c97caa89b 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -20,7 +20,7 @@ mode:@[value;`mode;`saveandsort]; /- the wdb process can operate in three modes /- data on disk, apply attributes and the trigger a reload on the /- rdb and hdb processes -writedownmode:@[value;`writedownmode;`default]; /- the wdb process can periodically write data to disc and sort at EOD in two ways: +writedownmode:@[value;`writedownmode;`partbyattr]; /- the wdb process can periodically write data to disc and sort at EOD in two ways: /- 1. default - the data is partitioned by [ partitiontype ] /- at EOD the data will be sorted and given attributes according to sort.csv before being moved to hdb /- 2. partbyattr - the data is partitioned by [ partitiontype ] and the column(s) assigned the parted attributed in sort.csv @@ -129,34 +129,6 @@ savetables:{[dir;pt;forcesave;tabname] if[gc;.gc.run[]]; ]}; -/- function to get additional partition(s) defined by parted attribute in sort.csv -getextrapartitiontype:{[tablename] - /- check that that each table is defined or the default attributes are defined in sort.csv - /- exits with error if a table cannot find parted attributes in tablename or default - /- only checks tables that have sort enabled - tabparts:$[count tabparts:distinct exec column from .sort.params where tabname=tablename,sort=1,att=`p; - [.lg.o[`getextraparttype;"parted attribute p found in sort.csv for ",(string tablename)," table"]; - tabparts]; - count defaultparts:distinct exec column from .sort.params where tabname=`default,sort=1,att=`p; - [.lg.o[`getextraparttype;"parted attribute p not found in sort.csv for ",(string tablename)," table, using default instead"]; - defaultparts]; - [.lg.e[`getextraparttype;"parted attribute p not found in sort.csv for ", (string tablename)," table and default not defined"]] - ]; - tabparts - }; - -/- function to check each partiton type specified in sort.csv is actually present in specified table -checkpartitiontype:{[tablename;extrapartitiontype] - $[count colsnotintab:extrapartitiontype where not extrapartitiontype in cols get tablename; - .lg.e[`checkpart;"parted columns ",(", " sv string colsnotintab)," are defined in sort.csv but not present in ",(string tablename)," table"]; - .lg.o[`checkpart;"all parted columns defined in sort.csv are present in ",(string tablename)," table"]]; - }; - -/- function to get list of distinct combiniations for partition directories -/- functional select equivalent to: select distinct [ extrapartitiontype ] from [ tablenme ] -getextrapartitions:{[tablename;extrapartitiontype] - value each ?[tablename;();1b;extrapartitiontype!extrapartitiontype] - }; /- function to upsert to specified directory upserttopartition:{[dir;tablename;tabdata;pt;expttype;expt] @@ -181,11 +153,11 @@ savetablesbypart:{[dir;pt;forcesave;tablename] if[forcesave or maxrows[tablename] < arows: count value tablename; .lg.o[`rowcheck;"the ",(string tablename)," table consists of ", (string arows), " rows"]; /- get additional partition(s) defined by parted attribute in sort.csv - extrapartitiontype:getextrapartitiontype[tablename]; + extrapartitiontype:.merge.getextrapartitiontype[tablename]; /- check each partition type actually is a column in the selected table - checkpartitiontype[tablename;extrapartitiontype]; + .merge.checkpartitiontype[tablename;extrapartitiontype]; /- get list of distinct combiniations for partition directories - extrapartitions:getextrapartitions[tablename;extrapartitiontype]; + extrapartitions:.merge.getextrapartitions[tablename;extrapartitiontype]; /- enumerate data to be upserted enumdata:.Q.en[hdbdir;value tablename]; .lg.o[`save;"enumerated ",(string tablename)," table"]; @@ -302,7 +274,7 @@ merge:{[dir;pt;tablename;mergelimits] }[tablename;dest;(mergelimits[tablename])]/[(();());partdirs; 1 _ ((count partdirs)#0b),1b]; /- set the attributes .lg.o[`merge;"setting attributes"]; - @[dest;;`p#] each getextrapartitiontype[tablename]; + @[dest;;`p#] each .merge.getextrapartitiontype[tablename]; .lg.o[`merge;"merge complete"]; /- run a garbage collection (if enabled) if[gc;.gc.run[]]; From 7e13538f0cf0760154796a82585bb92ee00d32bc Mon Sep 17 00:00:00 2001 From: Scott Quigley Date: Wed, 21 Dec 2016 16:11:16 +0000 Subject: [PATCH 6/9] resetting default variables --- code/processes/tickerlogreplay.q | 8 ++++---- code/processes/wdb.q | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/code/processes/tickerlogreplay.q b/code/processes/tickerlogreplay.q index 977ad22c2..a4df5c3cc 100644 --- a/code/processes/tickerlogreplay.q +++ b/code/processes/tickerlogreplay.q @@ -13,7 +13,7 @@ tplogfile:@[value;`tplogfile;`] // the tp log file to replay. Only this or t tplogdir:@[value;`tplogdir;`] // the tp log directory to read the log files from. Only this or tplogfile should be used (not both) partitiontype:@[value;`partitiontype;`date] // the partitioning of the database. Can be date, month or year (int would have to be handled bespokely) emptytables:@[value;`emptytables;1b] // whether to overwrite any tables at start up -sortafterreplay:@[value;`sortafterreplay;0b] // whether to re-sort the data and apply attributes at the end of the replay. Sort order is determined by the sortcsv (:config/sort.csv) +sortafterreplay:@[value;`sortafterreplay;1b] // whether to re-sort the data and apply attributes at the end of the replay. Sort order is determined by the sortcsv (:config/sort.csv) basicmode:@[value;`basicmode;0b] // do a basic replay, which replays everything in, then saves it down with .Q.hdpf[`::;d;p;`sym] exitwhencomplete:@[value;`exitwhencomplete;1b] // exit when the replay is complete checklogfiles:@[value;`checklogfiles;0b] // check if the log file is corrupt, if it is then write a new "good" file and replay it instead @@ -22,9 +22,9 @@ upd:@[value;`upd;{{[t;x] insert[t;x]}}] // default upd function used for repla sortcsv:@[value;`sortcsv;`:config/sort.csv] //location of sort csv file -compression:@[value;`compression;(17 2 4)] //specify the compress level, empty list if no required -partandmerge:@[value;`partandmerge;1b] //setting to do a replay where the data is partitioned and then merged on disk -tempdir:@[value;`tempdir;`:tempdir] //location to save data for partandmerge replay +compression:@[value;`compression;()] //specify the compress level, empty list if no required +partandmerge:@[value;`partandmerge;0b] //setting to do a replay where the data is partitioned and then merged on disk +tempdir:@[value;`tempdir;hdbdir] //location to save data for partandmerge replay mergenumrows:@[value;`mergenumrows;10000000]; //default number of rows for merge process mergenumtab:@[value;`mergenumtab;`quote`trade!10000 50000]; //specify number of rows per table for merge process diff --git a/code/processes/wdb.q b/code/processes/wdb.q index c97caa89b..067886de6 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -20,7 +20,7 @@ mode:@[value;`mode;`saveandsort]; /- the wdb process can operate in three modes /- data on disk, apply attributes and the trigger a reload on the /- rdb and hdb processes -writedownmode:@[value;`writedownmode;`partbyattr]; /- the wdb process can periodically write data to disc and sort at EOD in two ways: +writedownmode:@[value;`writedownmode;`default]; /- the wdb process can periodically write data to disc and sort at EOD in two ways: /- 1. default - the data is partitioned by [ partitiontype ] /- at EOD the data will be sorted and given attributes according to sort.csv before being moved to hdb /- 2. partbyattr - the data is partitioned by [ partitiontype ] and the column(s) assigned the parted attributed in sort.csv From 25ad66fce6284bc29066b166c18920a103bbb2d2 Mon Sep 17 00:00:00 2001 From: Scott Quigley Date: Wed, 4 Jan 2017 16:00:06 +0000 Subject: [PATCH 7/9] Change default for tempdir. set sortafterreplay to 0b if using merge --- code/processes/tickerlogreplay.q | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/code/processes/tickerlogreplay.q b/code/processes/tickerlogreplay.q index a4df5c3cc..8c171767a 100644 --- a/code/processes/tickerlogreplay.q +++ b/code/processes/tickerlogreplay.q @@ -24,7 +24,7 @@ sortcsv:@[value;`sortcsv;`:config/sort.csv] //location of sort csv file compression:@[value;`compression;()] //specify the compress level, empty list if no required partandmerge:@[value;`partandmerge;0b] //setting to do a replay where the data is partitioned and then merged on disk -tempdir:@[value;`tempdir;hdbdir] //location to save data for partandmerge replay +tempdir:@[value;`tempdir;`:tempmergedir] //location to save data for partandmerge replay mergenumrows:@[value;`mergenumrows;10000000]; //default number of rows for merge process mergenumtab:@[value;`mergenumtab;`quote`trade!10000 50000]; //specify number of rows per table for merge process @@ -91,7 +91,7 @@ if[trackonly;.lg.o[`replayinit;"messagechunks value is negative - log replay pro messagechunks:abs messagechunks; if[partandmerge and hdbdir = tempdir;.err.ex[`replayinit;"if using partandmerge replay, tempdir must be set to a different directory than the hdb";1]]; -if[partandmerge and sortafterreplay;.err.ex[`replayinit;"if using partandmerge replay, sortafterreplay must be set to 0b";1]]; +if[partandmerge and sortafterreplay;(sortafterreplay:0b; .lg.o[`replayinit;"Setting sortafterreplay to 0b"])]; // load the schema \d . From 52cc0f9e26d343b50ee7d17f5d0d435c08c81897 Mon Sep 17 00:00:00 2001 From: Scott Quigley Date: Tue, 10 Jan 2017 09:11:59 +0000 Subject: [PATCH 8/9] make compression work with all write downs in tickerlogreplay --- code/processes/tickerlogreplay.q | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/code/processes/tickerlogreplay.q b/code/processes/tickerlogreplay.q index 8c171767a..49260acd2 100644 --- a/code/processes/tickerlogreplay.q +++ b/code/processes/tickerlogreplay.q @@ -14,7 +14,7 @@ tplogdir:@[value;`tplogdir;`] // the tp log directory to read the log files f partitiontype:@[value;`partitiontype;`date] // the partitioning of the database. Can be date, month or year (int would have to be handled bespokely) emptytables:@[value;`emptytables;1b] // whether to overwrite any tables at start up sortafterreplay:@[value;`sortafterreplay;1b] // whether to re-sort the data and apply attributes at the end of the replay. Sort order is determined by the sortcsv (:config/sort.csv) -basicmode:@[value;`basicmode;0b] // do a basic replay, which replays everything in, then saves it down with .Q.hdpf[`::;d;p;`sym] +basicmode:@[value;`basicmode;1b] // do a basic replay, which replays everything in, then saves it down with .Q.hdpf[`::;d;p;`sym] exitwhencomplete:@[value;`exitwhencomplete;1b] // exit when the replay is complete checklogfiles:@[value;`checklogfiles;0b] // check if the log file is corrupt, if it is then write a new "good" file and replay it instead gc:@[value;`gc;1b] // garbage collect at appropriate points (after each table save and after the full log replay) @@ -22,7 +22,7 @@ upd:@[value;`upd;{{[t;x] insert[t;x]}}] // default upd function used for repla sortcsv:@[value;`sortcsv;`:config/sort.csv] //location of sort csv file -compression:@[value;`compression;()] //specify the compress level, empty list if no required +compression:@[value;`compression;(17 2 4)] //specify the compress level, empty list if no required partandmerge:@[value;`partandmerge;0b] //setting to do a replay where the data is partitioned and then merged on disk tempdir:@[value;`tempdir;`:tempmergedir] //location to save data for partandmerge replay mergenumrows:@[value;`mergenumrows;10000000]; //default number of rows for merge process @@ -179,7 +179,9 @@ finishreplay:{[h;p;td] savetab[td;h;p] each tabsincountorder[.replay.tablestoreplay]; // sort data and apply the attributes if[sortafterreplay;applysortandattr[.replay.pathlist]]; + if[partandmerge;postreplaymerge[td;p;h]]; + // invoke any user defined post replay function .save.postreplay[h;p]; } @@ -206,6 +208,13 @@ replaylog:{[logfile] .lg.o[`replay;"HDB directory already contains ",(string .replay.replaydate)," partition. Deleting from the HDB directory"]; .os.deldir .os.pth[string .Q.par[hdbdir;.replay.replaydate;`]]; // delete the current dates HDB directory before performing replay ]; + + // set compression level + if[ 3= count compression; + .lg.o[`compression;"setting compression level to (",(";" sv string compression),")"]; + .z.zd:compression; + .lg.o[`compression;".z.zd has been set to (",(";" sv string .z.zd),")"]]; + $[basicmode; [.lg.o[`replay;"basicmode set to true, saving down tables with .Q.hdpf"]; .Q.hdpf[`::;hdbdir;partitiontype$.replay.replaydate;`sym]]; @@ -240,11 +249,6 @@ mergemaxrows:{[tabname] mergenumrows^mergenumtab[tabname]}; // post replay function for merge replay, invoked after all the tables have been written down for a given log file postreplaymerge:{[td;p;h] - // set compression level - if[ 3= count compression; - .lg.o[`compression;"setting compression level to (",(";" sv string compression),")"]; - .z.zd:compression; - .lg.o[`compression;".z.zd has been set to (",(";" sv string .z.zd),")"]]; mergelimits:(tabsincountorder[.replay.tablestoreplay],())!({[x] mergenumrows^mergemaxrows[x]}tabsincountorder[.replay.tablestoreplay]),(); // merge the tables from each partition in the tempdir together From cde9058884fad8539622c1afea2e60a81b928347 Mon Sep 17 00:00:00 2001 From: Scott Quigley Date: Wed, 11 Jan 2017 14:41:52 +0000 Subject: [PATCH 9/9] reset default values --- code/processes/tickerlogreplay.q | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/code/processes/tickerlogreplay.q b/code/processes/tickerlogreplay.q index 49260acd2..14f49e452 100644 --- a/code/processes/tickerlogreplay.q +++ b/code/processes/tickerlogreplay.q @@ -14,7 +14,7 @@ tplogdir:@[value;`tplogdir;`] // the tp log directory to read the log files f partitiontype:@[value;`partitiontype;`date] // the partitioning of the database. Can be date, month or year (int would have to be handled bespokely) emptytables:@[value;`emptytables;1b] // whether to overwrite any tables at start up sortafterreplay:@[value;`sortafterreplay;1b] // whether to re-sort the data and apply attributes at the end of the replay. Sort order is determined by the sortcsv (:config/sort.csv) -basicmode:@[value;`basicmode;1b] // do a basic replay, which replays everything in, then saves it down with .Q.hdpf[`::;d;p;`sym] +basicmode:@[value;`basicmode;0b] // do a basic replay, which replays everything in, then saves it down with .Q.hdpf[`::;d;p;`sym] exitwhencomplete:@[value;`exitwhencomplete;1b] // exit when the replay is complete checklogfiles:@[value;`checklogfiles;0b] // check if the log file is corrupt, if it is then write a new "good" file and replay it instead gc:@[value;`gc;1b] // garbage collect at appropriate points (after each table save and after the full log replay) @@ -22,7 +22,7 @@ upd:@[value;`upd;{{[t;x] insert[t;x]}}] // default upd function used for repla sortcsv:@[value;`sortcsv;`:config/sort.csv] //location of sort csv file -compression:@[value;`compression;(17 2 4)] //specify the compress level, empty list if no required +compression:@[value;`compression;()] //specify the compress level, empty list if no required partandmerge:@[value;`partandmerge;0b] //setting to do a replay where the data is partitioned and then merged on disk tempdir:@[value;`tempdir;`:tempmergedir] //location to save data for partandmerge replay mergenumrows:@[value;`mergenumrows;10000000]; //default number of rows for merge process