Define cache layer loader to load parquet files to in-memory duckDB #154

Closed
Tracked by #150
kokokuo opened this issue Apr 6, 2023 · 0 comments · Fixed by #162
kokokuo commented Apr 6, 2023

What’s the problem you're trying to solve

In #150, we want to enhance query performance when users send API requests to our data endpoints to get results from the data source. To do so, we need to provide a caching (pre-loading) layer backed by duckDB.

Describe the solution you’d like

After users define the cache config in the schema YAML, we need a method that takes the query result from the data source, keeps it in the cache layer, and passes it to duckDB by creating a table.

Export to parquet format

The method keeps the query result in Parquet files. The reason is that this makes the result portable: it can be put in a local or remote place and shared with other users.

The directory format is [templateName]/[profileName]/[cachedTableName], and each data source creates its own files (possibly many partition files) named [folderPartitionName].parquet inside that directory.
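For illustration, the cache file path could be composed like this; the helper name and the example values are assumptions, not part of the design:

import * as path from 'path';

// Hypothetical helper: build the file path for one exported partition
// under the cache folder configured in vulcan.yaml.
const buildParquetPath = (
  folderPath: string,
  templateName: string,
  profileName: string,
  cachedTableName: string,
  folderPartitionName: string
): string =>
  path.join(
    folderPath,
    templateName,
    profileName,
    cachedTableName,
    `${folderPartitionName}.parquet`
  );

// e.g. buildParquetPath('cache-files', 'orders', 'pg', 'cached_orders', 'part-0')
// => 'cache-files/orders/pg/cached_orders/part-0.parquet'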

When to export results to parquet and load them into duckDB

We decided to export the query result from the data source to parquet, and to read the parquet files and load them into duckDB when users type vulcan serve to start the vulcan server. The reason is to prevent this workload from happening when an API request is received (a rough sketch of this flow follows the list below):

  • If no parquet files are found in the [templateName]/[profileName]/[cachedTableName] directory:
    1. Read the data source's data and export it to parquet.
    2. Then read the parquet files and load the data into duckDB.
  • If parquet files exist in the [templateName]/[profileName]/[cachedTableName] directory, check whether refreshExpression or addRefreshTime is set and triggered, and if so do the refresh steps:
    1. Read the data source's data and export it to parquet.
    2. Then read the parquet files and load the data into duckDB.
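A rough sketch of this decision flow in TypeScript; the hook names are hypothetical and only stand in for the real export/import implementations:

// The hooks below are hypothetical placeholders for the real implementations.
interface PreloadHooks {
  parquetFilesExist(directory: string): Promise<boolean>;
  shouldRefresh(directory: string): Promise<boolean>; // refreshExpression / addRefreshTime check
  exportToParquet(directory: string): Promise<void>; // read data source data and export to parquet
  loadParquetToDuckDB(directory: string): Promise<void>; // read parquet files and load into duckDB
}

async function preloadCachedTable(directory: string, hooks: PreloadHooks): Promise<void> {
  const hasFiles = await hooks.parquetFilesExist(directory);
  if (!hasFiles) {
    // No parquet files yet: export from the data source, then load into duckDB.
    await hooks.exportToParquet(directory);
    await hooks.loadParquetToDuckDB(directory);
  } else if (await hooks.shouldRefresh(directory)) {
    // Parquet files exist and a refresh is triggered: re-export, then re-load.
    await hooks.exportToParquet(directory);
    await hooks.loadParquetToDuckDB(directory);
  }
}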

Define CacheLayerLoader to control the flow

Here we define a CacheLayerLoader class that delegates to the data sources to export query results to parquet files according to the artifact, then takes the parquet files and loads them into duckDB (a simplified sketch follows the list below).

  • The CacheLayerLoader injects all DataSources when the container is loading.
  • Define a method named preload:
    • preload calls export on each DataSource and keeps a key mapper from cachedTableName to its [templateName]/[profileName]/[cachedTableName] directory, so that all parquet files in that directory can later be read and loaded into duckDB.
  • After all parquet files are exported, preload of CacheLayerLoader loads all parquet file data in the [templateName]/[profileName]/[cachedTableName] directory and creates cachedTableName in duckDB.
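A simplified sketch of these responsibilities; the constructor arguments, the duckdb loader lookup, and the schema value are assumptions for illustration, not the final API:

// Minimal stand-in for the export/import contract defined later in this issue.
interface CacheableSource {
  export(options: { sql: string; directory: string; profileName: string; type: string }): Promise<void>;
  import(options: { tableName: string; directory: string; profileName: string; schema: string; type: string }): Promise<void>;
}

class CacheLayerLoaderSketch {
  // cachedTableName -> [templateName]/[profileName]/[cachedTableName] directory
  private readonly tableDirectories = new Map<string, string>();

  constructor(
    private readonly dataSources: Map<string, CacheableSource>, // injected DataSources, keyed by profile name
    private readonly duckdbDataSource: CacheableSource, // the duckdb data source acting as the cache layer
    private readonly folderPath: string // "folderPath" from the cache config
  ) {}

  public async preload(
    templateName: string,
    profileName: string,
    cachedTableName: string,
    sql: string
  ): Promise<void> {
    const directory = `${this.folderPath}/${templateName}/${profileName}/${cachedTableName}`;
    this.tableDirectories.set(cachedTableName, directory);
    // 1. delegate the owning data source to export the query result to parquet files
    await this.dataSources.get(profileName)?.export({ sql, directory, profileName, type: 'parquet' });
    // 2. load every parquet file in the directory into duckDB, creating cachedTableName
    await this.duckdbDataSource.import({
      tableName: cachedTableName,
      directory,
      profileName,
      schema: 'public', // assumed default schema
      type: 'parquet',
    });
  }
}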

Additional Context

Based on the solution above, here are some notes on what needs to be implemented:

How do we know where the parquet files are saved and read from?

We could set parameters in vulcan.yaml, similar to template, to decide which loader serves as our cache layer storage, which format the SQL results are exported to, and how they are loaded into the cache layer:

cache:
  type: parquet
  folderPath: cache-files
  loader: duckdb

So we need to add a cache key in ICoreOptions with an ICacheLayerOptions interface, and a CacheLayerOptions class to validate all fields of the cache key in vulcan.yaml.
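One possible shape for these options is sketched below; the enum values mirror the YAML keys above, while the defaults and validation details are assumptions for illustration:

export enum CacheLayerStoreFormatType {
  parquet = 'parquet',
}

export enum CacheLayerStoreLoaderType {
  duckdb = 'duckdb',
}

export interface ICacheLayerOptions {
  type?: CacheLayerStoreFormatType | string;
  folderPath?: string;
  loader?: CacheLayerStoreLoaderType | string;
}

export class CacheLayerOptions implements ICacheLayerOptions {
  public type: string = CacheLayerStoreFormatType.parquet;
  public folderPath = 'cache-files';
  public loader: string = CacheLayerStoreLoaderType.duckdb;

  constructor(options: ICacheLayerOptions = {}) {
    Object.assign(this, options);
  }

  // Validate all fields of the cache key in vulcan.yaml.
  public validate(): void {
    if (this.type !== CacheLayerStoreFormatType.parquet) {
      throw new Error(`Unsupported cache file format: ${this.type}`);
    }
    if (this.loader !== CacheLayerStoreLoaderType.duckdb) {
      throw new Error(`Unsupported cache layer loader: ${this.loader}`);
    }
  }
}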

Define a method contract for exporting results to parquet in DataSource

Define an export method contract so that each data source can implement it in other issues (an illustrative implementation sketch follows the contract below):

export interface ExportOptions {
  // The sql query result to export
  sql: string;
  // The directory to export result to file
  directory: string;
  // The profile name to select to export data
  profileName: string;
  // export file format type
  type: CacheLayerStoreFormatType | string;
}
export interface ImportOptions {
  // The table name to create from imported file data
  tableName: string;
  // The directory to import cache files
  directory: string;
  // The profile name to select to import data
  profileName: string;
  // default schema
  schema: string;
  // import file format type
  type: CacheLayerStoreFormatType | string;
}

@VulcanExtension(TYPES.Extension_DataSource, { enforcedId: true })
export abstract class DataSource<
  C = any,
  PROFILE = Record<string, any>
> extends ExtensionBase<C> {
  private profiles: Map<string, Profile<PROFILE>>;

  constructor(
    @inject(TYPES.ExtensionConfig) config: C,
    @inject(TYPES.ExtensionName) moduleName: string,
    @multiInject(TYPES.Profile) @optional() profiles: Profile[] = []
  ) {
    super(config, moduleName);
    this.profiles = profiles.reduce(
      (prev, curr) => prev.set(curr.name, curr),
      new Map()
    );
  }
  ....

  /**
   * Export query result data to cache file for cache layer loader used
   */
  public export(options: ExportOptions): Promise<void> {
    throw new Error(`Export method not implemented`);
  }

  /**
   * Import data to create table from cache file for cache layer loader used
   */
  public import(options: ImportOptions): Promise<void> {
    throw new Error(`import method not implemented`);
  }
}
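For example, a data source that already runs on duckDB could satisfy the export contract with duckDB's COPY ... TO ... (FORMAT 'parquet') statement. This is only an illustrative sketch under that assumption (the output file name is arbitrary), not the implementation planned in the follow-up issues:

import * as duckdb from 'duckdb';

// Illustrative only: one way a duckdb-backed data source might fulfil the export contract.
class DuckDBExportExample {
  private db = new duckdb.Database(':memory:');

  public export(options: ExportOptions): Promise<void> {
    const { sql, directory } = options;
    // duckDB can write a query result straight to a parquet file.
    const copySql = `COPY (${sql}) TO '${directory}/result.parquet' (FORMAT 'parquet')`;
    return new Promise((resolve, reject) =>
      this.db.run(copySql, (err: Error | null) => (err ? reject(err) : resolve()))
    );
  }
}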

How to load parquet into duckDB:

Here is a simple example showing how to load a parquet file into duckDB:

import * as duckdb from 'duckdb';

const db = new duckdb.Database(':memory:');
// create a table from a parquet file in memory.
db.run(
  'CREATE TABLE people AS SELECT * FROM read_parquet(?)',
  'candidate.parquet',
);
// select all rows from the table
const statement = db.prepare('SELECT * FROM people');
const queryResult = await statement.stream();
const firstChunk = await queryResult.nextChunk();

// get the columns and data stream. "getColumns" and "getData" defined in vulcan
const columns = getColumns(firstChunk);
const dataStream = getData(queryResult, firstChunk);
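Since a cached table may be exported as multiple partition files, read_parquet can also take a glob pattern to build the table from every file in the cached directory; the path below is only an example:

// Load every partition file under the cached table directory in one statement.
db.run(
  'CREATE TABLE cached_orders AS SELECT * FROM read_parquet(?)',
  'cache-files/orders/pg/cached_orders/*.parquet',
);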