Skip to content
This repository has been archived by the owner on May 16, 2019. It is now read-only.

Commit

Permalink
refactor: remake storage plugins to use streams (#113)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilya Atamas committed Mar 23, 2019
1 parent 4775c9e commit d8df81a
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 53 deletions.
18 changes: 17 additions & 1 deletion packages/emeralt-server/src/handlers/packages/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ import ssri from 'ssri'

import { extractPackageData, useIf } from '@/utils'
import { endpoints } from '@/constants'
import { PassThrough, Writable } from 'stream'

const writeBufferToWritable = (writable: Writable, buffer: Buffer) =>
new Promise((resolve, reject) => {
writable.on('finish', resolve).on('error', reject)

const rs = new PassThrough()

rs.pipe(writable)
rs.push(buffer)
rs.push(null)
rs.end()
})

export const publishPackageHandler = ({
config,
Expand Down Expand Up @@ -51,7 +64,10 @@ export const publishPackageHandler = ({
await database.putVersion(metadata.name, version.version, version)

// upload tarball
await storage.putTarball(metadata.name, version.version, tarball.data)
await writeBufferToWritable(
await storage.createWriteStream(metadata.name, version.version),
tarball.data,
)

return res.status(200).json({})
} catch (error) {
Expand Down
2 changes: 1 addition & 1 deletion packages/emeralt-server/src/handlers/packages/tarball.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export const getPackageTarballHandler = ({
Router().get(endpoints.package.getTarball, async (req, res) => {
const { package_name, version } = req.params

const rs = await storage.getTarball(package_name, version)
const rs = await storage.createReadStream(package_name, version)

if (rs) {
res.header('content-encoding', 'application/octet-stream')
Expand Down
21 changes: 6 additions & 15 deletions packages/emeralt-storage-gcs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { IEmeraltStorage, CEmeraltStorage } from '@emeralt/types'
import { join } from 'path'
import { Storage, StorageOptions, Bucket } from '@google-cloud/storage'
import { Readable } from 'stream'
import { WriteStream } from 'fs'

type Options = {
storage: StorageOptions
Expand All @@ -26,30 +27,20 @@ class CEmeraltStorageGCS implements CEmeraltStorage {
this.prefix = path.prefix
}

public async getTarball(name, version) {
public async createReadStream(name, version) {
const file = this.bucket.file(join(this.prefix, name, version))

if (await file.exists()) {
return new Readable().wrap(file.createReadStream())
return file.createReadStream()
} else {
return undefined
}
}

public async putTarball(name, version, tarball) {
return new Promise(async (rs, rj) => {
const file = this.bucket.file(join(this.prefix, name, version))

const ws = file
.createWriteStream()
.on('finish', rs)
.on('close', rs)
.on('error', rj)
public async createWriteStream(name, version) {
const file = this.bucket.file(join(this.prefix, name, version))

ws.write(tarball, 'base64', () => {
ws.end()
})
})
return file.createWriteStream()
}

public async dropData() {
Expand Down
24 changes: 19 additions & 5 deletions packages/emeralt-storage-inmemory/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { IEmeraltStorage } from '@emeralt/types'
import { ReadStream } from 'fs'
import { path, assocPath } from 'ramda'
import { Readable } from 'stream'
import { PassThrough } from 'stream'

// @ts-ignore
export const EmeraltStorageInMemory: IEmeraltStorage = () => () => {
let storage = {}

return {
getTarball: (name, version) => {
createReadStream: (name, version) => {
const buffer = path([name, version], storage)

if (buffer) {
const rs = new Readable()
const rs = new PassThrough()

rs.push(buffer)
rs.push(null)
Expand All @@ -21,8 +23,20 @@ export const EmeraltStorageInMemory: IEmeraltStorage = () => () => {
}
},

putTarball: (name, version, tarball) => {
storage = assocPath([name, version], tarball, storage)
createWriteStream: (name, version) => {
const bufs = []

const stream = new PassThrough()
.on('data', (data) => {
bufs.push(data)
})
.on('end', () => {
storage = assocPath([name, version], Buffer.concat(bufs), storage)

stream.end()
})

return stream
},

dropData() {
Expand Down
8 changes: 4 additions & 4 deletions packages/emeralt-storage-localfs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
remove,
createReadStream,
mkdirp,
writeFile,
createWriteStream,
} from 'fs-extra'
import { resolve } from 'path'

Expand All @@ -16,19 +16,19 @@ export const EmeraltStorageLocalFS: IEmeraltStorage<Options> = ({
path = 'node_modules/.data',
}) => () => {
return {
getTarball: async (name, version) => {
createReadStream: async (name, version) => {
const file = resolve(path, name, version)

if (await pathExists(file)) return createReadStream(file)
else return undefined
},

putTarball: async (name, version, tarball) => {
createWriteStream: async (name, version) => {
const dir = resolve(path, name)
const file = resolve(dir, version)

await mkdirp(dir)
await writeFile(file, tarball)
return createWriteStream(file)
},

dropData: () => {
Expand Down
12 changes: 4 additions & 8 deletions packages/emeralt-types/src/emeralt/storage.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import { OptionalPromise } from '../helpers'
import { TEmeraltServerConfig } from './server'
import { CEmeraltDatabase } from './database'
import { Readable } from 'stream'
import { CEmeraltPlugin } from './plugin'
import { OptionalPromise } from '../helpers'
import { Readable, Writable } from 'stream'

export interface CEmeraltStorage extends CEmeraltPlugin {
/* get raw data */
getTarball(name: string, version: string): OptionalPromise<Readable>
createReadStream(name: string, version: string): OptionalPromise<Readable>

/* put raw data */
putTarball(
name: string,
version: string,
tarball: Buffer,
): OptionalPromise<any>
createWriteStream(name: string, version: string): OptionalPromise<Writable>

/** drop all data (used for test purposes) */
dropData(): OptionalPromise<any>
Expand Down
2 changes: 2 additions & 0 deletions test/database.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ test<IEmeraltDatabase>('metadata', async (t, dbc) => {

t.log('put metadata')
await db.putMetadata(metadata.name, metadata)
await new Promise((r) => setTimeout(r, 1000)) // changes may take time to propogate in some dbs

t.log('with metadata')
t.deepEqual(await db.hasMetadata(metadata.name), true)
Expand All @@ -37,6 +38,7 @@ test<IEmeraltDatabase>('versions', async (t, dbc) => {

t.log('put version')
await db.putVersion(metadata.name, version.version, version)
await new Promise((r) => setTimeout(r, 1000)) // changes may take time to propogate in some dbs

t.log('with version')
t.deepEqual(await db.hasVersion(metadata.name, version.version), true)
Expand Down
48 changes: 29 additions & 19 deletions test/storage.test.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
import { IEmeraltStorage } from '@emeralt/types'

import { WriteStream, ReadStream } from 'fs'
import { PassThrough } from 'stream'

import { test } from './utils'
import { Readable } from 'stream'

const readableToBuffer = (readable: Readable) =>
new Promise<Buffer>((resolve, reject) => {
const bufs = []
const writeToStream = (data: string, stream: WriteStream) =>
new Promise((resolve, reject) => {
stream.on('finish', resolve).on('error', reject)

const buf = Buffer.from(data)

readable
.on('data', (d) => bufs.push(d))
.on('end', () => resolve(Buffer.concat(bufs)))
.on('error', reject)
stream.write(buf)
stream.end()
})

const readFromStream = (stream: ReadStream) =>
new Promise((resolve, reject) => {
let bufs = []

stream.pipe(
new PassThrough()
.on('data', (buf) => bufs.push(buf))
.on('end', () => resolve(Buffer.concat(bufs).toString()))
.on('error', reject),
)
})

test<IEmeraltStorage>('storage', async (t, createStorage) => {
Expand All @@ -21,16 +35,12 @@ test<IEmeraltStorage>('storage', async (t, createStorage) => {
t.log('healthz')
t.deepEqual(await storage.healthz(), { ok: true })

t.log('putTarball')
await storage.putTarball('test', '1.0.0', Buffer.from(data))

t.log('getTarball')
const readable = await storage.getTarball('test', '1.0.0')
t.true(readable instanceof Readable)

const newData = await readableToBuffer(
await storage.getTarball('test', '1.0.0'),
).then((t) => t.toString())
t.log('createWriteStream')
await writeToStream(data, await storage.createWriteStream('test', '1.0.0'))

t.is(newData, data)
t.log('createReadableStream')
t.is(
await readFromStream(await storage.createReadStream('test', '1.0.0')),
data,
)
})

0 comments on commit d8df81a

Please sign in to comment.