138 changes: 138 additions & 0 deletions dataloader/dataloader.md
# `dataloader.q` – Loading delimited data and creating databases for kdb+


This package provides automated, customisable data loading and database creation, and is a generalisation of http://code.kx.com/wiki/Cookbook/LoadingFromLargeFiles.
The package employs a chunk-based loading strategy to minimise memory usage when processing large datasets. Rather than loading an entire dataset into memory before writing to disk, data is processed incrementally in manageable chunks.

The memory footprint is determined by the maximum of:
- Memory required to load and save one data chunk
- Memory required to sort the final resultant table

This approach enables processing of large data volumes with a relatively small memory footprint. Example use cases include:
- Large File Processing: Loading very large files by processing them in small, sequential chunks
- Cross-Partition Loading: Efficiently handling distributed data across multiple small files (e.g., separate monthly files for AAPL and MSFT data)

The chunked architecture keeps the loading footprint small regardless of total dataset size, making it suitable for processing datasets whose raw volume would otherwise exceed available system memory.
When all the data is written, the on-disk data is re-sorted and the attributes are applied.
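As a rough illustration of the idea (not the package's internal code), q's built-in `.Q.fsn` can drive this kind of chunked load; the file name, schema and database path below are assumptions:

```q
/ sketch: load a large headerless CSV in ~50 MB chunks, appending each
/ chunk to a splayed table so only one chunk is in memory at a time
appendchunk:{[lines]
  / parse one chunk of complete lines into a table (schema assumed)
  data:flip`sym`price`size!("SFJ";",")0:lines;
  / enumerate symbols against `:db/sym and append to the splayed table
  `:db/trade/ upsert .Q.en[`:db;data];
  };
/ .Q.fsn cuts the file on line boundaries, so every chunk parses cleanly
.Q.fsn[appendchunk;`:trades.csv;50000000]
```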

---

## :sparkles: Features

- Load delimited data from disk directory in customisable chunks
- Persist data to disk in partitioned format
- Dynamically sort and apply attributes to tables in the resulting database
- Configure compression of database

---

## :gear: Initialisation

After loading the package into the session, the `loadallfiles` function is ready to run. By default, tables will be written with the `p` attribute applied to the `sym` column and sorted.

### :mag_right: Custom sorting parameters

By default, the sorting parameters for all tables are:
```
tabname att column sort
-----------------------
default p sym 1
```

That is, for every table the `p` attribute will be applied to the `sym` column and the table will be sorted by it. If a table requires different attributes applied on different columns, custom sorting parameters can be added using the `addsortparams` function. This takes a dictionary with 4 keys: `tabname`, `att`, `column` and `sort`. These determine how each table in the resulting database is sorted and where attributes are applied when it is persisted. Calling the function adds new parameters for the specified table, or updates any existing ones.

You may apply default sorting and attributes to all tables loaded by the package by passing a `tabname` value of `default` along with your chosen sorting and attribute parameters. Passing `default` overwrites the current default parameters.

If no sorting or attributes are required, pass in a dictionary with `tabname` set to `default`, empty symbols (backticks) for `att` and `column`, and `0b` for `sort`. Examples are shown below:
```q
dataloader:use`dataloader
dataloader.addsortparams[`tabname`att`column`sort!(`default;`;`;0b)] / Overwrite default to apply no sorting or attributes
dataloader.addsortparams[`tabname`att`column`sort!(`default;`p;`sym;1b)] / Overwrite default to sort all tables loaded in by the sym column and apply the parted attribute
dataloader.addsortparams[`tabname`att`column`sort!(`default`trade`quote;`p`s`;`sym`time`;110b)] / Set the default to `p on sym for all tables; sort trade by time and apply `s; if quote is read in, do not sort or apply attributes
```
The dictionary arguments are outlined below.

| Input | Type | Description |
|-----------|-----------------------|----------------------------------------------------------------------------------|
| `tabname` | symbol/ symbol list | Name of table |
| `att` | symbol/ symbol list | Attributes corresponding to the table names |
| `column` | symbol/ symbol list | Columns to sort and apply attributes to |
| `sort` | boolean/ boolean list | Determines if the corresponding table will be sorted (`1b`: sorted; `0b`: not sorted) |
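
For example (the `quote` table here is hypothetical), to keep the existing default but load a quote table with a grouped attribute on its `sym` column and no sorting:

```q
/ add (or update) parameters for one table; other tables keep the default
dataloader.addsortparams[`tabname`att`column`sort!(`quote;`g;`sym;0b)]
/ confirm the parameters were registered
dataloader.sortparams[]
```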

---

### :rocket: Functions

`loadallfiles` is the primary function used to load all data and create the database. It takes two arguments: a dictionary of loading parameters and a directory containing the files to read. The function reads the specified delimited files from the chosen directory in chunks, applies any required processing, persists each table to disk in kdb+ partitioned format, compresses the files if directed, and finally sorts the on-disk tables and applies attributes.


## :mag_right: Params in depth
The parameter dictionary supports the following fields:

| Parameter | Required | Type | Description |
|-------------------|----------|-----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `headers` | Y | symbol list | Names of the header columns in the file |
| `types` | Y | char list | Data types to read from the file |
| `separator` | Y | char list | Delimiting character. Enlist it if the first line of the file is header data |
| `tablename` | Y | symbol | Name of table to write data to |
| `dbdir` | Y | symbol | Directory to write data to |
| `symdir` | N | symbol | Directory to enumerate against |
| `enumname` | N | symbol | Name of symfile to enumerate against. Default is `` `sym `` |
| `partitiontype` | N | symbol | Partitioning to use. Must be one of `date, month, year, int`. Default is `date` |
| `partitioncol` | N | symbol | Column used to extract partition information. Default is `time` |
| `dataprocessfunc` | N | function | Dyadic function to process data after it has been read in. First argument is the load parameters dictionary, second is the data just read in. Default is `{[x;y] y}` |
| `chunksize` | N | int | Data size in bytes to read in one chunk. Default is `100 MB` |
| `compression` | N | int list | Compression parameters to use e.g. `17 2 6`. Default is empty list for no compression |
| `gc` | N | boolean | Whether to run garbage collection at appropriate points. Default is `0b` (false) |
| `filepattern` | N | char list(s) | Pattern(s) used to load only matching files e.g. `enlist"*.csv"`, `("*.csv";"*.txt")` |

The second parameter is a directory handle, e.g.
```q
`:dir
```
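
The `dataprocessfunc` hook can filter or derive columns before anything is written. A sketch using the column names from the example below; the filter and the derived `notional` column are illustrative, not part of the package:

```q
/ dyadic processing function: x is the load parameter dictionary,
/ y is the chunk of data just read in
procfunc:{[x;y]
  y:delete from y where exclude;        / drop rows flagged for exclusion
  update notional:price*volume from y   / derive a new column
  };
dataloader.loadallfiles[`headers`types`separator`tablename`dbdir`dataprocessfunc!(
  `sym`time`price`volume`mktflag`cond`exclude;"SPFICHB";",";`trade;`:hdb;procfunc);`:TRADE/toload]
```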

---

### :test_tube: Example

```q
dataloader:use`dataloader

// If using custom sorting parameters, check they are as expected
dataloader.sortparams[]
tabname att column sort
-----------------------
default p sym 1

// Read in data and create db
dataloader.loadallfiles[`headers`types`separator`tablename`dbdir!(`sym`time`price`volume`mktflag`cond`exclude;"SPFICHB";",";`trade;`:hdb);`:TRADE/toload]

//load in db
\l hdb

//check table and sorting
select from trade

date sym time price volume mktflag cond exclude
-------------------------------------------------------------------------------
2025.07.15 AAPL 2025.07.15D01:17:08.000000000 266 3980 B 10 1
2025.07.15 AAPL 2025.07.15D01:44:42.000000000 278 31 B 12 1
2025.07.15 AAPL 2025.07.15D02:05:37.000000000 34 8699 S 21 0
2025.07.15 AAPL 2025.07.15D02:06:02.000000000 97 1769 B 29 1
2025.07.15 AAPL 2025.07.15D02:14:24.000000000 106 8138 B 2 1
2025.07.15 AAPL 2025.07.15D02:40:33.000000000 61 2611 B 36 1
2025.07.15 AAPL 2025.07.15D03:29:37.000000000 31 4240 B 15 1

// Ensure attributes are applied
meta trade
c | t f a
-------| -----
date | d
sym | s p
time | p
price | f
volume | i
mktflag| c
cond | h
exclude| b
```
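
The optional parameters combine in the same dictionary. For instance (paths hypothetical), restricting the load to CSV files, reading 50 MB at a time, compressing with parameters `17 2 6` and collecting garbage between chunks:

```q
params:`headers`types`separator`tablename`dbdir!(`sym`time`price`volume`mktflag`cond`exclude;"SPFICHB";",";`trade;`:hdb)
params,:`chunksize`compression`gc`filepattern!(50000000;17 2 6;1b;enlist"*.csv")
dataloader.loadallfiles[params;`:TRADE/toload]
```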
59 changes: 59 additions & 0 deletions dataloader/dataloader.q
/ generic dataloader library

/ loads data in from delimited file, applies processing function, enumerates and writes to db. NOTE: it is not trivial to check the user has entered headers correctly; assume they have
loaddata:{[loadparams;rawdata]
/ check if first row matches headers provided
data:$[(`$(first loadparams`separator)vs rawdata 0)~loadparams`headers;
(loadparams`types`separator)0:rawdata;
flip loadparams[`headers]!(loadparams`types`separator)0:rawdata
];
if[not loadparams[`filename]in filesread;filesread,:loadparams`filename];
data:0!loadparams[`dataprocessfunc].(loadparams;data);
/ if enumname provided, use it, otherwise default to `sym
domain:(`sym;loadparams`enumname)`enumname in key loadparams;
data:.Q.ens[loadparams(`dbdir`symdir)`symdir in key loadparams;data;domain];
wd:writedatapartition[data]. loadparams`dbdir`partitiontype`partitioncol`tablename;
/ run the writedatapartition function for each partition
wd each distinct loadparams[`partitiontype]$data loadparams`partitioncol;
if[loadparams`gc;.Q.gc[]];
};

/ write data for provided database and partition
writedatapartition:{[data;dbdir;partitiontype;partitioncol;tablename;partition]
towrite:data where partition=partitiontype$data partitioncol;
writepath:` sv .Q.par[dbdir;partition;tablename],`;
.[upsert;(writepath;towrite);{'"failed to save table: ",x}];
.z.m.partitions[writepath]:(tablename;partition);
};

/ adds compression, sorting and attributes selected
finish:{[loadparams]
/ temporarily set compression defaults
if[count loadparams`compression;.z.zd:loadparams`compression];
{.z.m.util.sorttab[.z.m.sp](x;where .z.m.partitions[;0]=x)}each distinct value .z.m.partitions[;0];
system"x .z.zd";
if[loadparams`gc;.Q.gc[]];
};

/ load all the files from a specified directory
loadallfiles:{[loadparams:.z.m.util.paramfilter;dir]
.z.m.partitions:()!();
.z.m.filesread:();
/ get the contents of the directory based on optional filepattern
filelist:$[`filepattern in key loadparams;
key[dir:hsym dir]where any key[dir]like/:loadparams`filepattern;
key dir:hsym dir];
filelist:` sv'dir,'filelist;
{[loadparams;file].Q.fsn[loaddata loadparams,(enlist`filename)!enlist file;file;loadparams`chunksize]}[loadparams]each filelist;
.z.m.finish loadparams;
};

/ set default sorting parameters
sp:flip`tabname`att`column`sort!(1#`default;`p;`sym;1b);
sortparams:{[].z.m.sp};

/ add custom sorting parameters to the sortparams table
addsortparams:{[tabname;att;column;sort]
x:flip(flip sortparams[]),'(tabname;att;column;sort);
.z.m.sp:select from x where i=(last;i)fby tabname;
};
6 changes: 6 additions & 0 deletions dataloader/init.q
/ load core dataloader functions
\l ::dataloader.q
/ load util submodule
util:use`dataloader.util
/ expose public function
export:([loadallfiles:loadallfiles;addsortparams:addsortparams;sortparams:sortparams])