Skip to content
This repository was archived by the owner on May 25, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './redisClient'
export * from './redisKeyValueDB'
export * from './redisHashKeyValueDB'
132 changes: 132 additions & 0 deletions src/redisClient.manual.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { localTime } from '@naturalcycles/js-lib'
import { RedisClient } from './redisClient'

let client: RedisClient

beforeAll(() => {
client = new RedisClient()
})

beforeEach(async () => {
await client.dropTable('test')
})

afterAll(async () => {
await client.dropTable('test')
await client.disconnect()
})

describe('hashmap functions', () => {
test('hset should save a map', async () => {
await client.hset('test:key', { foo: 'bar' })

const result = await client.hgetall('test:key')

expect(result).toEqual({ foo: 'bar' })
})

test('should store/fetch numbers as strings', async () => {
await client.hset('test:key', { one: 1 })

const result = await client.hgetall('test:key')

expect(result).toEqual({ one: '1' })
})

test('hgetall should not fetch nested objects', async () => {
await client.hset('test:key', { nested: { one: 1 } })

const result = await client.hgetall('test:key')

expect(result).toEqual({ nested: '[object Object]' })
})

test('hget should fetch map property', async () => {
await client.hset('test:key', { foo: 'bar' })

const result = await client.hget('test:key', 'foo')

expect(result).toBe('bar')
})

test('hget should fetch value as string', async () => {
await client.hset('test:key', { one: 1 })

const result = await client.hget('test:key', 'one')

expect(result).toBe('1')
})

test('hmgetBuffer should get the values of the fields as strings', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

const result = await client.hmget('test:key', ['one', 'three'])

expect(result).toEqual(['1', '3'])
})

test('hmgetBuffer should get the values of the fields as buffers', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

const result = await client.hmgetBuffer('test:key', ['one', 'three'])

expect(result).toEqual([Buffer.from('1'), Buffer.from('3')])
})

test('hincr should change the value and return with a numeric result', async () => {
await client.hset('test:key', { one: 1 })

const result = await client.hincr('test:key', 'one', -2)

expect(result).toBe(-1)
})

test('hincr should increase the value by 1 by default', async () => {
await client.hset('test:key', { one: 1 })

const result = await client.hincr('test:key', 'one')

expect(result).toBe(2)
})

test('hincr should set the value to 1 for a non-existing field', async () => {
const result = await client.hincr('test:key', 'one')

expect(result).toBe(1)
})

test('hscanCount should return the number of keys in the hash', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

const result = await client.hscanCount('test:key', {})

expect(result).toBe(3)
})

test('hscanCount with a match pattern should return the number of matching keys in the hash', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

const result = await client.hscanCount('test:key', { match: 't*' })

expect(result).toBe(2)
})

test('hdel should delete a fields from the hash', async () => {
await client.hset('test:key', { one: 1, two: 2, three: 3 })

await client.hdel('test:key', ['two', 'three'])

const result = await client.hgetall('test:key')
expect(result).toEqual({ one: '1' })
})

test('hsetWithTTL should set the fields with expiry', async () => {
const now = localTime.now().unix

await client.hsetWithTTL('test:key', { foo1: 'bar' }, now + 1000)
await client.hsetWithTTL('test:key', { foo2: 'bar' }, now - 1)

const result = await client.hgetall('test:key')
expect(result).toEqual({ foo1: 'bar' })
})
})
63 changes: 61 additions & 2 deletions src/redisClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
AnyObject,
CommonLogger,
NullableBuffer,
NullableString,
Expand Down Expand Up @@ -124,6 +125,38 @@ export class RedisClient implements CommonClient {
await this.redis().set(key, value)
}

async hgetall<T extends Record<string, string> = Record<string, string>>(
key: string,
): Promise<T | null> {
const result = await this.redis().hgetall(key)
if (Object.keys(result).length === 0) return null
return result as T
}

async hget(key: string, field: string): Promise<NullableString> {
return await this.redis().hget(key, field)
}

async hset(key: string, value: AnyObject): Promise<void> {
await this.redis().hset(key, value)
}

async hdel(key: string, fields: string[]): Promise<void> {
await this.redis().hdel(key, ...fields)
}

async hmget(key: string, fields: string[]): Promise<NullableString[]> {
return await this.redis().hmget(key, ...fields)
}

async hmgetBuffer(key: string, fields: string[]): Promise<NullableBuffer[]> {
return await this.redis().hmgetBuffer(key, ...fields)
}

async hincr(key: string, field: string, increment: number = 1): Promise<number> {
return await this.redis().hincrby(key, field, increment)
}

async setWithTTL(
key: string,
value: string | number | Buffer,
Expand All @@ -132,6 +165,16 @@ export class RedisClient implements CommonClient {
await this.redis().set(key, value, 'EXAT', expireAt)
}

async hsetWithTTL(key: string, value: AnyObject, expireAt: UnixTimestampNumber): Promise<void> {
const valueKeys = Object.keys(value)
const numberOfKeys = valueKeys.length
const keyList = valueKeys.join(' ')
const commandString = `HEXPIREAT ${key} ${expireAt} FIELDS ${numberOfKeys} ${keyList}`
const [command, ...args] = commandString.split(' ')
await this.redis().hset(key, value)
await this.redis().call(command!, args)
}

async mset(obj: Record<string, string | number>): Promise<void> {
await this.redis().mset(obj)
}
Expand All @@ -140,8 +183,8 @@ export class RedisClient implements CommonClient {
await this.redis().mset(obj)
}

async incr(key: string): Promise<number> {
return await this.redis().incr(key)
async incr(key: string, by: number = 1): Promise<number> {
return await this.redis().incrby(key, by)
}

async ttl(key: string): Promise<number> {
Expand Down Expand Up @@ -205,6 +248,22 @@ export class RedisClient implements CommonClient {
return count
}

hscanStream(key: string, opt: ScanStreamOptions): ReadableTyped<string[]> {
return this.redis().hscanStream(key, opt)
}

async hscanCount(key: string, opt: ScanStreamOptions): Promise<number> {
let count = 0

const stream = this.redis().hscanStream(key, opt)

await stream.forEach((keyValueList: string[]) => {
count += keyValueList.length / 2
})

return count
}

async withPipeline(fn: (pipeline: ChainableCommander) => Promisable<void>): Promise<void> {
const pipeline = this.redis().pipeline()
await fn(pipeline)
Expand Down
144 changes: 144 additions & 0 deletions src/redisHashKeyValueDB.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import {
CommonKeyValueDBSaveBatchOptions,
CommonDBCreateOptions,
CommonKeyValueDB,
KeyValueDBTuple,
} from '@naturalcycles/db-lib'
import { _chunk, StringMap } from '@naturalcycles/js-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { RedisClient } from './redisClient'
import { RedisKeyValueDBCfg } from './redisKeyValueDB'

export interface RedisHashKeyValueDBCfg extends RedisKeyValueDBCfg {
hashKey: string
}

export class RedisHashKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
client: RedisClient
keyOfHashField: string

constructor(cfg: RedisHashKeyValueDBCfg) {
this.client = cfg.client
this.keyOfHashField = cfg.hashKey
}

async ping(): Promise<void> {
await this.client.ping()
}

async [Symbol.asyncDispose](): Promise<void> {
await this.client.disconnect()
}

async getByIds(table: string, ids: string[]): Promise<KeyValueDBTuple[]> {
if (!ids.length) return []
// we assume that the order of returned values is the same as order of input ids
const bufs = await this.client.hmgetBuffer(this.keyOfHashField, this.idsToKeys(table, ids))
return bufs.map((buf, i) => [ids[i], buf] as KeyValueDBTuple).filter(([_k, v]) => v !== null)
}

async deleteByIds(table: string, ids: string[]): Promise<void> {
if (!ids.length) return
await this.client.hdel(this.keyOfHashField, this.idsToKeys(table, ids))
}

async saveBatch(
table: string,
entries: KeyValueDBTuple[],
opt?: CommonKeyValueDBSaveBatchOptions,
): Promise<void> {
if (!entries.length) return

const entriesWithKey = entries.map(([k, v]) => [this.idToKey(table, k), v])
const map: StringMap<any> = Object.fromEntries(entriesWithKey)

if (opt?.expireAt) {
await this.client.hsetWithTTL(this.keyOfHashField, map, opt.expireAt)
} else {
await this.client.hset(this.keyOfHashField, map)
}
}

streamIds(table: string, limit?: number): ReadableTyped<string> {
let stream = this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
.flatMap(keyValueList => {
const keys: string[] = []
keyValueList.forEach((keyOrValue, index) => {
if (index % 2 !== 0) return
keys.push(keyOrValue)
})
return this.keysToIds(table, keys)
})

if (limit) {
stream = stream.take(limit)
}

return stream
}

streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
return this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
.flatMap(keyValueList => {
const values: string[] = []
keyValueList.forEach((keyOrValue, index) => {
if (index % 2 !== 1) return
values.push(keyOrValue)
})
return values.map(v => Buffer.from(v))
})
.take(limit || Infinity)
}

streamEntries(table: string, limit?: number | undefined): ReadableTyped<KeyValueDBTuple> {
return this.client
.hscanStream(this.keyOfHashField, {
match: `${table}:*`,
})
.flatMap(keyValueList => {
const entries = _chunk(keyValueList, 2)
return entries.map(([k, v]) => {
return [this.keyToId(table, String(k)), Buffer.from(String(v))] satisfies KeyValueDBTuple
})
})
.take(limit || Infinity)
}

async count(table: string): Promise<number> {
return await this.client.hscanCount(this.keyOfHashField, {
match: `${table}:*`,
})
}

async increment(table: string, id: string, by: number = 1): Promise<number> {
return await this.client.hincr(this.keyOfHashField, this.idToKey(table, id), by)
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
if (!opt?.dropIfExists) return

await this.client.dropTable(table)
}

private idsToKeys(table: string, ids: string[]): string[] {
return ids.map(id => this.idToKey(table, id))
}

private idToKey(table: string, id: string): string {
return `${table}:${id}`
}

private keysToIds(table: string, keys: string[]): string[] {
return keys.map(key => this.keyToId(table, key))
}

private keyToId(table: string, key: string): string {
return key.slice(table.length + 1)
}
}
4 changes: 4 additions & 0 deletions src/redisKeyValueDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ export class RedisKeyValueDB implements CommonKeyValueDB, AsyncDisposable {
})
}

async increment(table: string, id: string, by: number = 1): Promise<number> {
return await this.client.incr(this.idToKey(table, id), by)
}

async createTable(table: string, opt?: CommonDBCreateOptions): Promise<void> {
if (!opt?.dropIfExists) return

Expand Down
Loading