Skip to content

Commit d10fe61

Browse files
feat: storage cleanup job (#29)
1 parent 53719b9 commit d10fe61

File tree

16 files changed

+196
-46
lines changed

16 files changed

+196
-46
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ data/
1010
.data
1111
.idea
1212
.DS_Store
13-
.test-data/
13+
.test-data/

docs/content/1.getting-started/1.index.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ The storage driver to use for storing cache data. For more information, see [Sto
4949

5050
The database driver to use for storing cache metadata. For more information, see [Database Drivers](/getting-started/database-drivers).
5151

52+
#### `CLEANUP_OLDER_THAN_DAYS`
53+
54+
- Default: `90`
55+
56+
The number of days to keep stale cache data before deleting it. Set to `0` to disable cache cleanup.
57+
5258
#### `NITRO_PORT`
5359

5460
- Default: `3000`

lib/db/index.ts

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import { migrations } from '~/lib/db/migrations'
66
import { ENV } from '~/lib/env'
77
import { logger } from '~/lib/logger'
88

9+
import type { Selectable } from 'kysely'
10+
911
export interface Database {
1012
cache_keys: CacheKeysTable
1113
}
@@ -14,6 +16,7 @@ export interface CacheKeysTable {
1416
key: string
1517
version: string
1618
updated_at: string
19+
accessed_at: string
1720
}
1821

1922
async function initializeDatabase() {
@@ -106,31 +109,65 @@ export async function findKeyMatch(opts: { key: string; version: string; restore
106109
}
107110
}
108111

109-
export async function touchKey(key: string, version: string) {
110-
const now = new Date()
112+
export async function updateOrCreateKey(key: string, version: string, date?: Date) {
113+
const now = date ?? new Date()
111114
const updateResult = await db
112115
.updateTable('cache_keys')
113116
.set('updated_at', now.toISOString())
117+
.set('accessed_at', now.toISOString())
114118
.where('key', '=', key)
115119
.where('version', '=', version)
116120
.executeTakeFirst()
117121
if (Number(updateResult.numUpdatedRows) === 0) {
118-
await createKey(key, version)
122+
await createKey(key, version, date)
119123
}
120124
}
121125

122-
export async function createKey(key: string, version: string) {
123-
const now = new Date()
126+
export async function touchKey(key: string, version: string, date?: Date) {
127+
const now = date ?? new Date()
128+
await db
129+
.updateTable('cache_keys')
130+
.set('accessed_at', now.toISOString())
131+
.where('key', '=', key)
132+
.where('version', '=', version)
133+
.execute()
134+
}
135+
136+
export async function findStaleKeys(olderThanDays: number | undefined, date?: Date) {
137+
if (olderThanDays === undefined) return db.selectFrom('cache_keys').selectAll().execute()
138+
139+
const now = date ?? new Date()
140+
const threshold = new Date(now.getTime() - olderThanDays * 24 * 60 * 60 * 1000)
141+
return db
142+
.selectFrom('cache_keys')
143+
.where('accessed_at', '<', threshold.toISOString())
144+
.selectAll()
145+
.execute()
146+
}
147+
148+
export async function createKey(key: string, version: string, date?: Date) {
149+
const now = date ?? new Date()
124150
await db
125151
.insertInto('cache_keys')
126152
.values({
127153
key,
128154
version,
129155
updated_at: now.toISOString(),
156+
accessed_at: now.toISOString(),
130157
})
131158
.execute()
132159
}
133160

134-
export async function pruneKeys() {
135-
await db.deleteFrom('cache_keys').execute()
161+
export async function pruneKeys(keys?: Selectable<CacheKeysTable>[]) {
162+
if (!keys) await db.deleteFrom('cache_keys').execute()
163+
else
164+
await db.transaction().execute(async (tx) => {
165+
for (const { key, version } of keys) {
166+
await tx
167+
.deleteFrom('cache_keys')
168+
.where('key', '=', key)
169+
.where('version', '=', version)
170+
.execute()
171+
}
172+
})
136173
}

lib/db/migrations.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export const migrations = (dbType: DatabaseDriverName) =>
1313
col.notNull(),
1414
)
1515
.addColumn('updated_at', 'text', (col) => col.notNull())
16+
.addColumn('accessed_at', 'text', (col) => col.notNull())
1617
.addPrimaryKeyConstraint('pk', ['key', 'version'])
1718

1819
if (dbType === 'mysql') query = query.modifyEnd(sql`engine=InnoDB CHARSET=latin1`)

lib/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { logger } from '~/lib/logger'
44

55
const envSchema = z.object({
66
URL_ACCESS_TOKEN: z.string().min(1),
7+
CLEANUP_OLDER_THAN_DAYS: z.coerce.number().int().min(1).default(90),
78
API_BASE_URL: z.string().min(1),
89
STORAGE_DRIVER: z.string().toLowerCase().default('filesystem'),
910
DB_DRIVER: z.string().toLowerCase().default('sqlite'),

lib/storage/index.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { createHash, randomBytes, randomInt } from 'node:crypto'
33

44
import consola from 'consola'
55

6-
import { findKeyMatch, touchKey } from '~/lib/db'
6+
import { findKeyMatch, findStaleKeys, pruneKeys, touchKey, updateOrCreateKey } from '~/lib/db'
77
import { ENV } from '~/lib/env'
88
import { logger } from '~/lib/logger'
99
import { encodeCacheKey } from '~/lib/storage/driver'
@@ -30,7 +30,7 @@ async function initializeStorageDriver() {
3030

3131
return <StorageAdapter>{
3232
async reserveCache(key, version, cacheSize) {
33-
logger.debug('Reserve: Trying to reserve cache for', key, version, cacheSize)
33+
logger.debug('Reserve: Reserving cache for', key, version, cacheSize)
3434
const bufferKey = `${key}:${version}`
3535
const existingBuffer = uploadBuffers.get(bufferKey)
3636
if (existingBuffer) {
@@ -59,7 +59,7 @@ async function initializeStorageDriver() {
5959
}
6060
},
6161
async getCacheEntry(keys, version) {
62-
logger.debug('Get: Trying to get cache entry for', keys, version)
62+
logger.debug('Get: Getting cache entry for', keys, version)
6363
const primaryKey = keys[0]
6464
const restoreKeys = keys.length > 1 ? keys.slice(1) : undefined
6565

@@ -70,6 +70,8 @@ async function initializeStorageDriver() {
7070
return null
7171
}
7272

73+
await touchKey(cacheKey.key, cacheKey.version)
74+
7375
const cacheFileName = encodeCacheKey(cacheKey.key, cacheKey.version)
7476
const hashedKey = createHash('sha256')
7577
.update(cacheFileName + DOWNLOAD_SECRET_KEY)
@@ -83,7 +85,7 @@ async function initializeStorageDriver() {
8385
}
8486
},
8587
async commitCache(uploadId) {
86-
logger.debug('Commit: Trying to commit cache for upload', uploadId)
88+
logger.debug('Commit: Committing cache for upload', uploadId)
8789

8890
if (commitLocks.has(uploadId)) {
8991
logger.debug(`Commit: Commit for upload ${uploadId} already in progress. Ignoring...`)
@@ -109,7 +111,7 @@ async function initializeStorageDriver() {
109111
try {
110112
logger.debug('Commit: Committing cache for id', uploadId)
111113
await driver.upload(buffer, cacheFileName)
112-
await touchKey(cacheKey.key, cacheKey.version)
114+
await updateOrCreateKey(cacheKey.key, cacheKey.version)
113115
logger.debug('Commit: Cache committed for id', uploadId)
114116
} finally {
115117
cacheKeyByUploadId.delete(uploadId)
@@ -118,11 +120,11 @@ async function initializeStorageDriver() {
118120
}
119121
},
120122
async download(objectName) {
121-
logger.debug('Download: Trying to download', objectName)
123+
logger.debug('Download: Downloading', objectName)
122124
return driver.download(objectName)
123125
},
124126
async uploadChunk(uploadId, chunkStream, chunkStart) {
125-
logger.debug('Upload: Trying to upload chunk for upload', uploadId)
127+
logger.debug('Upload: Uploading chunk for upload', uploadId)
126128
const cacheKey = cacheKeyByUploadId.get(uploadId)
127129
if (!cacheKey) {
128130
logger.debug(`Upload: No cache key found for upload ${uploadId}. Ignoring...`)
@@ -148,9 +150,13 @@ async function initializeStorageDriver() {
148150
await chunkStream.pipeTo(bufferWriteStream)
149151
logger.debug('Upload: Chunks uploaded for id', uploadId)
150152
},
151-
async pruneCaches() {
152-
logger.debug('Prune: Trying to prune caches')
153-
await driver.prune()
153+
async pruneCaches(olderThanDays) {
154+
logger.debug('Prune: Pruning caches')
155+
156+
const keys = await findStaleKeys(olderThanDays)
157+
await driver.delete(keys.map((key) => encodeCacheKey(key.key, key.version)))
158+
await pruneKeys(keys)
159+
154160
logger.debug('Prune: Caches pruned')
155161
},
156162
}

lib/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type { Readable } from 'node:stream'
44
export interface StorageDriver {
55
upload: (buffer: Buffer, objectName: string) => Promise<void> | void
66
download: (objectName: string) => Promise<Readable> | Readable
7-
prune: () => Promise<void> | void
7+
delete: (objectNames: string[]) => Promise<void> | void
88
}
99

1010
export interface StorageAdapter {
@@ -18,7 +18,7 @@ export interface StorageAdapter {
1818
) => Promise<void>
1919
commitCache: (uploadId: number, size: number) => Promise<void>
2020
reserveCache: (key: string, version: string, cacheSize: number) => Promise<ReserveCacheResponse>
21-
pruneCaches: () => Promise<void>
21+
pruneCaches: (olderThanDays?: number) => Promise<void>
2222
}
2323

2424
export interface ArtifactCacheEntry {

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"type": "module",
44
"version": "1.0.0",
55
"private": true,
6+
"packageManager": "pnpm@8.15.7",
67
"scripts": {
78
"prepare": "nitropack prepare && husky",
89
"dev": "nitropack dev",
@@ -24,6 +25,7 @@
2425
"@types/pg": "^8.11.5",
2526
"better-sqlite3": "^9.5.0",
2627
"consola": "^3.2.3",
28+
"croner": "^8.0.2",
2729
"kysely": "^0.27.3",
2830
"minio": "^7.1.3",
2931
"mysql2": "^3.9.7",

plugins/cleanup.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { colorize } from 'consola/utils'
2+
import { Cron } from 'croner'
3+
4+
import { ENV } from '@/lib/env'
5+
import { logger } from '@/lib/logger'
6+
import { storageAdapter } from '@/lib/storage'
7+
8+
export default defineNitroPlugin(() => {
9+
// daily
10+
const job = Cron('0 0 * * *')
11+
const nextRun = job.nextRun()
12+
logger.info(
13+
`Cleaning up cache entries older than ${colorize('blue', `${ENV.CLEANUP_OLDER_THAN_DAYS}d`)} with schedule ${colorize('blue', job.getPattern() ?? '')}${nextRun ? ` (next run: ${nextRun.toLocaleString()})` : ''}`,
14+
)
15+
job.schedule(async () => {
16+
await storageAdapter.pruneCaches(ENV.CLEANUP_OLDER_THAN_DAYS)
17+
})
18+
})

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)