Skip to content

Commit

Permalink
fix(critical problems)
Browse files Browse the repository at this point in the history
  • Loading branch information
bytadaniel committed Oct 17, 2023
1 parent ec58a64 commit 67a3064
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 52 deletions.
3 changes: 2 additions & 1 deletion jest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import type { Config } from 'jest'

const config: Config = {
preset: 'ts-jest',
testEnvironment: 'node'
testEnvironment: 'node',
testTimeout: 60_000
}

export default config
55 changes: 27 additions & 28 deletions src/chunk-resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/* eslint-disable no-async-promise-executor */
import cuid from 'cuid'
import { Chunk } from './chunk/chunk'
import { ResolverOptions, InsertRow } from './interface'
import { ResolverOptions, InsertRow, Table } from './interface'
import { E_CODES } from './constants'
import { ChunkRegistry } from './chunk-registry'
import { DataWatcher } from './watchers/abstract'
Expand All @@ -12,7 +12,7 @@ import { DiskWatcher } from './watchers/disk.watcher'
import { ProcessWatcher } from './watchers/process.watcher'
import { ChunkFacade } from './chunk/chunk-facade'
import { CacheError } from './errors'
import { sleep } from './utils'
import { chunkList, sleep } from './utils'
import { ConfigError } from './errors/config.error'

type OnResolved = (chunk: ChunkFacade) => void
Expand Down Expand Up @@ -118,10 +118,7 @@ export class ChunkResolver {
* the system removes it from registry and puts to a resolve queue
*/
async #watchRegistry () {
const hasElements = !this.#toResolveQueue.isEmpty()
const hasStopCommand = this.#commandStop

while (hasElements && !hasStopCommand) {
while (!this.#registry.isEmpty() && !this.#commandStop) {
const snapshot = this.#registry.getAll()

for (const state of snapshot) {
Expand Down Expand Up @@ -164,10 +161,7 @@ export class ChunkResolver {
* Resolve is a required process to clenup chunk data from storage (process memory, disk space or cloud)
*/
async #watchToResolveQueue () {
const hasElements = !this.#toResolveQueue.isEmpty()
const hasStopCommand = this.#commandStop

while (hasElements && !hasStopCommand) {
while (!this.#toResolveQueue.isEmpty() && !this.#commandStop) {
const chunk = this.#toResolveQueue.dequeue()
if (!chunk) {
continue
Expand Down Expand Up @@ -233,25 +227,30 @@ export class ChunkResolver {
* @returns
*/
public async cache (table: string, rows: InsertRow[]) {
while (rows.length > 0) {
let chunk = this.#registry.getAll().find(state => state.chunkRef.isUnblocked())?.chunkRef
if (!chunk) {
chunk = new ScratchChunk(this.#dataWatcher, {
table,
liveAtLeastMs: this.#options.chunkLifeMs
})
this.#registry.register(chunk)
}

const rowCountToOverfill = Math.abs(this.#options.chunkSize - chunk.size)
const chunkedRowsList = chunkList(rows, this.#options.chunkSize)

for (const chunkedRows of chunkedRowsList) {
while (chunkedRows.length) {
let chunk: Chunk | undefined = (this.#registry
.getAll()
.find(state => state.chunkRef.size < this.#options.chunkSize)
)?.chunkRef
if (!chunk) {
chunk = new ScratchChunk(this.#dataWatcher, {
table,
liveAtLeastMs: this.#options.chunkLifeMs
})
this.#registry.register(chunk)
}

const chunkRows = rows.splice(0, rowCountToOverfill)
const sizeToSave = Math.abs(this.#options.chunkSize - chunk.size)
const rowsToSave = chunkedRows.splice(0, sizeToSave)

await this.#dataWatcher.save({
chunkRef: chunk,
insertRows: chunkRows
})

await this.#dataWatcher.save({
chunkRef: chunk,
insertRows: rowsToSave
})
}
}

this.#startWatching()
Expand All @@ -275,7 +274,7 @@ export class ChunkResolver {
this.#commandStop = true

// Syncronously backup runtime data
this.#dataWatcher.backup()
this.#dataWatcher.backupRuntimeCache()

process.exit(0)
}
Expand Down
7 changes: 4 additions & 3 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
export const enum E_CODES {
E_CACHE_FORBIDDEN = 'E_CACHE_FORBIDDEN',
E_NO_HANDLER = 'E_NO_HANDLER',
E_CONFIG_PARAM_REQUIRED = "E_CONFIG_PARAM_REQUIRED"
E_CACHE_FORBIDDEN = 'E_CACHE_FORBIDDEN',
E_NO_HANDLER = 'E_NO_HANDLER',
E_CONFIG_PARAM_REQUIRED = "E_CONFIG_PARAM_REQUIRED",
E_EMPTY_SAVE = "E_EMPTY_SAVE"
}


Expand Down
2 changes: 1 addition & 1 deletion src/watchers/abstract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export abstract class DataWatcher<
public abstract save (storeContract: SC): Promise<void>
public abstract load(chunkId: ChunkId): Promise<LC>

public abstract backup(): void
public abstract backupRuntimeCache(): void
public abstract restore(): Promise<void>

public abstract cleanup(chunkId: ChunkId): Promise<void>
Expand Down
51 changes: 33 additions & 18 deletions src/watchers/disk.watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import { promisify } from 'util';
import { ExistingChunk } from '../chunk/existing-chunk';
import { ChunkRegistry } from '../chunk-registry';
import { Chunk } from '../chunk/chunk';
import { ChunkId } from '../interface';
import { ChunkId, Table } from '../interface';
import { Row } from '../row';
import cuid from 'cuid';
import { CacheError } from '../errors';
import { E_CODES } from '../constants';

export interface FsWatcherOptions extends DataWatcherOptions {
disk: {
Expand Down Expand Up @@ -37,7 +39,10 @@ type OperationId = string
export class DiskWatcher extends DataWatcher<SaveContract, DiskLoadContract, FsWatcherOptions> {
readonly #registry: ChunkRegistry;
readonly #options: FsWatcherOptions;
readonly #chunkRows: Record<ChunkId, Record<OperationId, Row[]>>
readonly #chunks: Record<ChunkId, {
ref: Chunk,
operations: Record<OperationId, Row[]>
}>

constructor(registry: ChunkRegistry, options: FsWatcherOptions) {
super(options)
Expand All @@ -46,23 +51,31 @@ export class DiskWatcher extends DataWatcher<SaveContract, DiskLoadContract, FsW
this.#options = options

/**
* Temporal cache storage using to save data on inconsistent period
* Runtime cache storage
*/
this.#chunkRows = {}
this.#chunks = {}
}

public async save(saveContract: SaveContract): Promise<void> {
if (!saveContract.insertRows.length) {
throw new CacheError(E_CODES.E_EMPTY_SAVE)
return
}

if (!this.$isWriteable()) {
await this.$toBeUnblocked()
}

const operationId = cuid()

saveContract.chunkRef.setConsistency(false)
if (!this.#chunkRows[saveContract.chunkRef.id]) {
this.#chunkRows[saveContract.chunkRef.id] = {}
if (!this.#chunks[saveContract.chunkRef.id]) {
this.#chunks[saveContract.chunkRef.id] = {
ref: saveContract.chunkRef,
operations: {}
}
}
this.#chunkRows[saveContract.chunkRef.id][operationId] = saveContract.insertRows
this.#chunks[saveContract.chunkRef.id].operations[operationId] = saveContract.insertRows

const chunkDirectoryExists = await fsExistsAsync(this.#options.disk.outputDirectory)
if (!chunkDirectoryExists) {
Expand Down Expand Up @@ -96,7 +109,7 @@ export class DiskWatcher extends DataWatcher<SaveContract, DiskLoadContract, FsW
saveContract.insertRows.length
)
saveContract.chunkRef.setConsistency(true)
delete this.#chunkRows[saveContract.chunkRef.id][operationId]
delete this.#chunks[saveContract.chunkRef.id].operations[operationId]
})

}
Expand All @@ -123,8 +136,8 @@ export class DiskWatcher extends DataWatcher<SaveContract, DiskLoadContract, FsW
}
}

public backup(): void {
for (const chunkId in this.#chunkRows) {
public backupRuntimeCache(): void {
for (const chunkId in this.#chunks) {
const chunkDirectoryExists = fs.existsSync(this.#options.disk.outputDirectory)
if (!chunkDirectoryExists) {
fs.mkdirSync(this.#options.disk.outputDirectory)
Expand All @@ -133,19 +146,19 @@ export class DiskWatcher extends DataWatcher<SaveContract, DiskLoadContract, FsW
const chunkFilename = `${chunkId}.txt`
const chunkPathname = path.resolve(this.#options.disk.outputDirectory, chunkFilename)

const state = this.#registry.getOne(chunkId)
const chunk = this.#chunks[chunkId].ref

const chunkExists = fs.existsSync(chunkPathname)
if (!chunkExists) {
const metadata = {
table: state.chunkRef.table,
expiresAt: state.chunkRef.expiresAt
table: chunk.table,
expiresAt: chunk.expiresAt
}
fs.writeFileSync(chunkPathname, `${JSON.stringify(metadata)}\n`)
}

for (const operationId in this.#chunkRows[chunkId]) {
const runtimeRows = this.#chunkRows[chunkId][operationId]
for (const operationId in this.#chunks[chunkId].operations) {
const runtimeRows = this.#chunks[chunkId].operations[operationId]

/**
* Need some kind of schema to optimize
Expand All @@ -156,9 +169,11 @@ export class DiskWatcher extends DataWatcher<SaveContract, DiskLoadContract, FsW
.concat('\n')

fs.appendFileSync(chunkPathname, storeData)
this.#registry.increaseSize(chunkId, runtimeRows.length)
state.chunkRef.setConsistency(true)
delete this.#chunkRows[chunkId][operationId]

chunk.setConsistency(true)
delete this.#chunks[chunkId].operations[operationId]

console.log(`Succesfully backed up operation ${operationId} of chunk ${chunkId} with ${runtimeRows.length} rows`)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/watchers/process.watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export class ProcessWatcher extends DataWatcher {
public async cleanup(): Promise<void> {}

// eslint-disable-next-line @typescript-eslint/no-empty-function
public backup(): void {}
public backupRuntimeCache(): void {}

// eslint-disable-next-line @typescript-eslint/no-empty-function
public async restore() {}
Expand Down

0 comments on commit 67a3064

Please sign in to comment.