release v3.0.0 with disk storage ability
bytadaniel committed Oct 15, 2023
1 parent 42d5465 commit 6c06118
Showing 23 changed files with 838 additions and 768 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,3 +1,5 @@
lib
chunks
node_modules
sandbox.ts
.DS_Store
2 changes: 2 additions & 0 deletions .npmignore
@@ -1,3 +1,5 @@
src
chunks
tsconfig.json
sandbox.ts
.DS_Store
78 changes: 51 additions & 27 deletions README.md
@@ -1,6 +1,5 @@
# clickcache by @bytadaniel

<a href="https://www.npmjs.com/package/clickcache" alt="NPM latest version"><img src="https://img.shields.io/npm/v/clickcache.svg"></a>
<a href="https://www.npmjs.com/package/clickcache" alt="NPM total downloads"><img src="https://img.shields.io/npm/dt/clickcache.svg"></a>
@@ -10,62 +9,87 @@

![GitHub followers](https://img.shields.io/github/followers/bytadaniel?style=social)


## Why

In my data work, I encountered the need for a straightforward way to asynchronously gather small pieces of data into larger batches and efficiently transmit them to Clickhouse.
To address this requirement, I developed the `clickcache` package.

`clickcache` excels at working not only with the [official clickhouse client](https://github.com/ClickHouse/clickhouse-js) but also with third-party clients.
It does so by delegating the read/write work to them while focusing on data aggregation in one central location and preparing it for insertion.

## Roadmap

This module was tested in production for about a year as part of [clickhouse-ts](https://www.npmjs.com/package/clickhouse-ts), my Clickhouse client.
To clean up the code, I decided to move this caching module into its own repository.
This cache collector already supports, or is planned to support, caching data

- ✅ in the process memory
- ✅ on the hard disk
- 🏗 in the cloud

## Usage

```bash
npm install clickcache
```

```js
const config = {
// choose the way to store data
dataWatcher: 'process', // 'process' | 'disk'
// set the time to live limit for batches
ttlMs: 60_000,
// set the max size limit for batches
maxSize: 1_000,
// set the check interval
// it is normal to check batches state 5-10 times per TTL
checkIntervalMs: 10_000
}

// define the singleton resolver instance
const resolver = new ChunkResolver(config)

// set as many handlers as you need

// sync handler to log chunk output
resolver.onResolved(chunk => {
  console.log(chunk.id)
  console.log(chunk.size)
})

// async handler to pass data to clickhouse storage
resolver.onResolved(async chunk => {
  const myRows = await chunk.loadRows()
  await clickhouseClient
    .insertFunction(myTable, myRows)
    .then(() => console.log('Hurrah! My data is saved!'))
    .catch(e => resolver.cache(myTable, myRows))
})

// use this method to cache a few rows or a single row
// it will be stored and collected into a huge batch of data
const chunk = await resolver.cache(myTable, rows)
```

## How it works

This package contains a few core entities:

- `ChunkResolver`
- `ChunkRegistry`
- `DataWatcher`
- `Chunk`

The package collects many single rows through `ChunkResolver`, which arranges those rows into chunks. When a chunk is ready, `ChunkResolver` passes it to your handlers, where you decide whether and how to insert it into the database.

`Chunk` is related to both `ChunkRegistry` and `DataWatcher`.

`ChunkRegistry` is an in-memory storage shared by all parts of the core functionality. It contains chunk metadata such as chunk state (blocked or not, consistent or not, expired or not, etc.) and references to the chunks themselves.

`Chunk` is connected to its stored data through `DataWatcher` and can `load` it on demand.

`DataWatcher` is an abstract entity that interacts with the stored data. Data can live in `process memory`, in `disk storage`, or in the `cloud`. A data watcher can both store and restore your data.
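
The concrete watcher API is not shown in this commit, but conceptually it might look like the following sketch; the interface name and method signatures are assumptions for illustration only:

```ts
// Hypothetical sketch of the watcher contract; the real
// DataWatcher in clickcache may differ.
interface DataWatcherSketch {
  // persist rows that belong to a chunk
  store(chunkId: string, rows: object[]): Promise<void>
  // load the rows back when the chunk is resolved
  load(chunkId: string): Promise<object[]>
  // restore previously persisted chunks after a restart
  restore(): Promise<void>
}
```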

For example, suppose you are using the `disk storage` watcher. You are caching your data and something goes wrong with the main process. It restarts, restores the last persisted state of the data, and consistently resolves it.

Restoring data this way is not possible with the `process memory` data watcher.
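
For instance, switching to the disk watcher is a one-line config change. A minimal sketch, reusing the config shape from the Usage section above:

```ts
import { ChunkResolver } from 'clickcache'

// same config shape as in the Usage section, but persisted on disk
const resolver = new ChunkResolver({
  dataWatcher: 'disk', // chunks survive a process restart
  ttlMs: 60_000,
  maxSize: 1_000,
  checkIntervalMs: 10_000
})

// if the process crashes and restarts, previously cached chunks
// can be restored from disk and resolved, unlike with 'process'
```
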
8 changes: 4 additions & 4 deletions index.ts
@@ -1,6 +1,6 @@
import { ChunkResolver } from './src/chunk-resolver'

export * from './src/errors'

export * from './src/interface'
export { ChunkResolver }
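
After this change the entry point exposes `ChunkResolver` as a named export; assuming the published package name, a consumer import might look like:

```ts
// assuming the package is installed as 'clickcache'
import { ChunkResolver } from 'clickcache'
```
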
92 changes: 92 additions & 0 deletions src/chunk-registry.ts
@@ -0,0 +1,92 @@
import { Chunk } from './chunk/chunk'
import { ChunkId, Table } from './interface'

interface ChunkState {
  chunkRef: Chunk,
  size: number,
  expiresAt: number
}

interface RegistryState {
  chunkTable: Record<string, string>,
  tableChunk: Record<string, string>,
  chunks: Record<ChunkId, ChunkState>
}

/**
 * ChunkRegistry is an in-memory index of all registered chunks.
 * It keeps chunk references together with their metadata (size, expiration).
 *
 * @warning storing data in process memory can lead you to LOSS OF CONSISTENCY,
 * but in exchange for this disadvantage you gain the cheapest and simplest way
 * to work with clickhouse
 *
 * Data loss can only occur in two cases:
 * 1. When the OS sends the process a SIGKILL signal, killing it without grace
 * 2. When some piece of data contains anomalies such as `undefined` etc.
 */
export class ChunkRegistry {
  #registry: RegistryState

  /**
   * Create a ChunkRegistry instance with an empty state
   */
  constructor () {
    this.#registry = {
      tableChunk: {},
      chunkTable: {},
      chunks: {}
    }
  }

  // account for rows added to a chunk, both in the registry and on the chunk itself
  public increaseSize (id: ChunkId, delta: number) {
    this.#registry.chunks[id].size += delta
    this.#registry.chunks[id].chunkRef.size += delta
  }

  // account for rows removed from a chunk
  public decreaseSize (id: ChunkId, delta: number) {
    this.#registry.chunks[id].size -= delta
    this.#registry.chunks[id].chunkRef.size -= delta
  }

  // index a chunk by id and table and snapshot its metadata
  public register (chunk: Chunk) {
    this.#registry.chunkTable[chunk.id] = chunk.table
    this.#registry.tableChunk[chunk.table] = chunk.id

    this.#registry.chunks[chunk.id] = {
      chunkRef: chunk,
      expiresAt: chunk.expiresAt,
      size: chunk.size
    }
  }

  // drop all traces of a chunk once it has been resolved
  public unregister (id: ChunkId) {
    if (this.#registry.chunks[id]) {
      const table = this.#registry.chunkTable[id]

      delete this.#registry.tableChunk[table]
      delete this.#registry.chunkTable[id]
      delete this.#registry.chunks[id]
    }
  }

  public getOne (id: ChunkId): ChunkState {
    return this.#registry.chunks[id]
  }

  public getAll (): ChunkState[] {
    return Object.values(this.#registry.chunks)
  }

  public isEmpty (): boolean {
    return Object.keys(this.#registry.chunks).length === 0
  }

  public getTables (): Table[] {
    return Object.keys(this.#registry.tableChunk)
  }

  public getChunks (): ChunkId[] {
    return Object.keys(this.#registry.chunks)
  }
}
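
A quick usage sketch of the class above; the chunk literal is a hypothetical stand-in for a real `Chunk` instance, which is normally created by the resolver:

```ts
import { Chunk } from './chunk/chunk'
import { ChunkRegistry } from './chunk-registry'

const registry = new ChunkRegistry()

// hypothetical stand-in for a real Chunk, for illustration only
const chunk = {
  id: 'chunk-1',
  table: 'events',
  size: 0,
  expiresAt: Date.now() + 60_000
} as unknown as Chunk

registry.register(chunk)                    // index the chunk by id and table
registry.increaseSize(chunk.id, 100)        // account for 100 buffered rows
console.log(registry.getOne(chunk.id).size) // 100
registry.unregister(chunk.id)               // drop metadata once resolved
console.log(registry.isEmpty())             // true
```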
