Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement mutex around file mutation http handlers #247

Merged
merged 9 commits into from Jul 30, 2019
9 changes: 7 additions & 2 deletions README.md
Expand Up @@ -248,8 +248,13 @@ can be configured via the [config.json](#configuration-files).
# Driver model

Gaia hub drivers are fairly simple. The biggest requirement is the ability
to fulfill the _write-to/read-from_ URL guarantee. As currently implemented
a gaia hub driver must implement the following two functions:
to fulfill the _write-to/read-from_ URL guarantee.

A driver can expect that each of its functions will be mutexed by a path.
jcnelson marked this conversation as resolved.
Show resolved Hide resolved
As in, no function will be invoked concurrently with a given unique path.

As currently implemented
a gaia hub driver must implement the following functions:

```javascript
/**
Expand Down
1 change: 0 additions & 1 deletion admin/.eslintrc.js
Expand Up @@ -34,7 +34,6 @@ module.exports = {
"new-cap": 0,
"brace-style": 2,
"semi": [2, "never"],
"valid-jsdoc": ["error"],

"@typescript-eslint/indent": [2, 2, {
"FunctionDeclaration": { "parameters": "first" },
Expand Down
1 change: 0 additions & 1 deletion hub/.eslintrc.js
Expand Up @@ -34,7 +34,6 @@ module.exports = {
"new-cap": 0,
"brace-style": 2,
"semi": [2, "never"],
"valid-jsdoc": ["error"],

"@typescript-eslint/indent": [2, 2, {
"FunctionDeclaration": { "parameters": "first" },
Expand Down
4 changes: 2 additions & 2 deletions hub/src/server/drivers/GcDriver.ts
Expand Up @@ -3,7 +3,7 @@ import { Storage, File } from '@google-cloud/storage'
import { BadPathError, InvalidInputError, DoesNotExist } from '../errors'
import { ListFilesResult, PerformWriteArgs, PerformDeleteArgs, PerformRenameArgs, StatResult, PerformStatArgs, PerformReadArgs, ReadResult } from '../driverModel'
import { DriverStatics, DriverModel, DriverModelTestMethods } from '../driverModel'
import { pipeline, logger } from '../utils'
import { pipelineAsync, logger } from '../utils'

export interface GC_CONFIG_TYPE {
gcCredentials?: {
Expand Down Expand Up @@ -189,7 +189,7 @@ class GcDriver implements DriverModel, DriverModelTestMethods {
})

try {
await pipeline(args.stream, fileWriteStream)
await pipelineAsync(args.stream, fileWriteStream)
logger.debug(`storing ${filename} in bucket ${this.bucket}`)
} catch (error) {
logger.error(`failed to store ${filename} in bucket ${this.bucket}`)
Expand Down
4 changes: 2 additions & 2 deletions hub/src/server/drivers/diskDriver.ts
Expand Up @@ -3,7 +3,7 @@ import { BadPathError, InvalidInputError, DoesNotExist } from '../errors'
import Path from 'path'
import { ListFilesResult, PerformWriteArgs, PerformDeleteArgs, PerformRenameArgs, PerformStatArgs, StatResult, PerformReadArgs, ReadResult } from '../driverModel'
import { DriverStatics, DriverModel } from '../driverModel'
import { pipeline, logger } from '../utils'
import { pipelineAsync, logger } from '../utils'

export interface DISK_CONFIG_TYPE {
diskSettings: { storageRootDirectory?: string },
Expand Down Expand Up @@ -199,7 +199,7 @@ class DiskDriver implements DriverModel {
await this.mkdirs(absdirname)

const writePipe = fs.createWriteStream(absoluteFilePath, { mode: 0o600, flags: 'w' })
await pipeline(args.stream, writePipe)
await pipelineAsync(args.stream, writePipe)


const contentTypeDirPath = Path.dirname(contentTypeFilePath)
Expand Down
51 changes: 38 additions & 13 deletions hub/src/server/http.ts
Expand Up @@ -5,7 +5,7 @@ import cors from 'cors'
import { ProofChecker } from './ProofChecker'
import { getChallengeText, LATEST_AUTH_VERSION } from './authentication'
import { HubServer } from './server'
import { getDriverClass, logger } from './utils'
import { getDriverClass, logger, AsyncMutexScope } from './utils'
import { DriverModel } from './driverModel'
import * as errors from './errors'
import { HubConfigInterface } from './config'
Expand All @@ -16,10 +16,12 @@ function writeResponse(res: express.Response, data: any, statusCode: number) {
res.end()
}

export function makeHttpServer(config: HubConfigInterface): { app: express.Application, server: HubServer, driver: DriverModel } {
export function makeHttpServer(config: HubConfigInterface): { app: express.Application, server: HubServer, driver: DriverModel, asyncMutex: AsyncMutexScope } {

const app: express.Application = express()

const asyncMutex = new AsyncMutexScope()

// Handle driver configuration
let driver: DriverModel

Expand Down Expand Up @@ -54,12 +56,13 @@ export function makeHttpServer(config: HubConfigInterface): { app: express.Appli
filename = filename.substring(0, filename.length - 1)
}
const address = req.params[0]
const endpoint = `${address}/${filename}`

server.handleRequest(address, filename, req.headers, req)
.then((publicURL) => {
const handleRequest = async () => {
try {
const publicURL = await server.handleRequest(address, filename, req.headers, req)
writeResponse(res, { publicURL }, 202)
})
.catch((err: any) => {
} catch (err) {
logger.error(err)
if (err instanceof errors.ValidationError) {
writeResponse(res, { message: err.message, error: err.name }, 401)
Expand All @@ -74,7 +77,18 @@ export function makeHttpServer(config: HubConfigInterface): { app: express.Appli
} else {
writeResponse(res, { message: 'Server Error' }, 500)
}
})
}
}

if (!asyncMutex.tryAcquire(endpoint, handleRequest)) {
jcnelson marked this conversation as resolved.
Show resolved Hide resolved
const errMsg = `Concurrent operation (store) attempted on ${endpoint}`
logger.error(errMsg)
writeResponse(res, {
message: errMsg,
error: errors.ConflictError.name
}, 409)
jcnelson marked this conversation as resolved.
Show resolved Hide resolved
}

})

app.delete(/^\/delete\/([a-zA-Z0-9]+)\/(.+)/, (
Expand All @@ -86,13 +100,14 @@ export function makeHttpServer(config: HubConfigInterface): { app: express.Appli
filename = filename.substring(0, filename.length - 1)
}
const address = req.params[0]
const endpoint = `${address}/${filename}`

server.handleDelete(address, filename, req.headers)
.then(() => {
const handleRequest = async () => {
try {
await server.handleDelete(address, filename, req.headers)
res.writeHead(202)
res.end()
})
.catch((err: any) => {
} catch (err) {
logger.error(err)
if (err instanceof errors.ValidationError) {
writeResponse(res, { message: err.message, error: err.name }, 401)
Expand All @@ -107,7 +122,17 @@ export function makeHttpServer(config: HubConfigInterface): { app: express.Appli
} else {
writeResponse(res, { message: 'Server Error' }, 500)
}
})
}
}

if (!asyncMutex.tryAcquire(endpoint, handleRequest)) {
const errMsg = `Concurrent operation (delete) attempted on ${endpoint}`
logger.error(errMsg)
writeResponse(res, {
message: errMsg,
error: errors.ConflictError.name
}, 409)
}
})

app.post(
Expand Down Expand Up @@ -191,5 +216,5 @@ export function makeHttpServer(config: HubConfigInterface): { app: express.Appli
})

// Instantiate express application
return { app, server, driver }
return { app, server, driver, asyncMutex }
}
40 changes: 36 additions & 4 deletions hub/src/server/utils.ts
Expand Up @@ -9,11 +9,10 @@ import DiskDriver from './drivers/diskDriver'
import { promisify } from 'util'
import winston from 'winston'

//$FlowFixMe - Flow is unaware of the stream.pipeline Node API
import { pipeline as _pipline } from 'stream'
import { pipeline } from 'stream'
import { DriverName } from './config'

export const pipeline = promisify(_pipline)
export const pipelineAsync = promisify(pipeline)

export const logger = winston.createLogger()

Expand Down Expand Up @@ -52,7 +51,7 @@ class MemoryStream extends stream.Writable {

export async function readStream(stream: stream.Readable): Promise<Buffer> {
const memStream = new MemoryStream()
await pipeline(stream, memStream)
await pipelineAsync(stream, memStream)
return memStream.getData()
}

Expand All @@ -63,3 +62,36 @@ export function timeout(milliseconds: number): Promise<void> {
}, milliseconds)
})
}


export class AsyncMutexScope {

private readonly _opened = new Set<string>()

public get openedCount() {
return this._opened.size
}

/**
* If no mutex of the given `id` is already taken, then a mutex is created and the
* given promise is invoked. The mutex is released once the promise resolves -- either by
* success or error.
* @param id A unique mutex name used in a Map.
* @param spawnOwner A function that creates a Promise if the mutex is acquired.
* @returns `true` if the mutex was acquired, otherwise returns `false`
*/
public tryAcquire(id: string, spawnOwner: () => Promise<void>): boolean {
if (this._opened.has(id)) {
jcnelson marked this conversation as resolved.
Show resolved Hide resolved
return false
}

// Wrap in Promise.resolve to ensure potential synchronous errors are not throw within this function.
const owner = Promise.resolve().then(() => spawnOwner())
this._opened.add(id)
jcnelson marked this conversation as resolved.
Show resolved Hide resolved
owner.finally(() => {
this._opened.delete(id)
})
return true
}

}
10 changes: 10 additions & 0 deletions hub/test/src/testDrivers/InMemoryDriver.ts
Expand Up @@ -19,6 +19,8 @@ export class InMemoryDriver implements DriverModel {
lastWrite: PerformWriteArgs
initPromise: Promise<void>

onWriteMiddleware: Set<((PerformWriteArgs) => Promise<void>)> = new Set()

constructor(config: any) {
this.pageSize = (config && config.pageSize) ? config.pageSize : 100
this.files = new Map<string, { content: Buffer, contentType: string, lastModified: Date }>()
Expand Down Expand Up @@ -56,6 +58,7 @@ export class InMemoryDriver implements DriverModel {
await driver.start()
return driver
}

start() {
return new Promise<void>((resolve) => {
this.server = this.app.listen(0, 'localhost', () => {
Expand All @@ -65,10 +68,17 @@ export class InMemoryDriver implements DriverModel {
})
})
}

getReadURLPrefix() {
return this.readUrl
}

async performWrite(args: PerformWriteArgs) {

for (const middleware of this.onWriteMiddleware) {
await middleware(args)
}

// cancel write and return 402 if path is invalid
if (!InMemoryDriver.isPathValid(args.path)) {
throw new BadPathError('Invalid Path')
Expand Down
98 changes: 98 additions & 0 deletions hub/test/src/testHttp.ts
Expand Up @@ -18,12 +18,110 @@ import { testPairs, testAddrs } from './common'
import InMemoryDriver from './testDrivers/InMemoryDriver'
import { MockAuthTimestampCache } from './MockAuthTimestampCache'
import { HubConfigInterface } from '../../src/server/config'
import { PassThrough } from 'stream';

const TEST_SERVER_NAME = 'test-server'
const TEST_AUTH_CACHE_SIZE = 10


export function testHttpWithInMemoryDriver() {

test('reject concurrent requests to same resource (InMemory driver)', async (t) => {
const inMemoryDriver = await InMemoryDriver.spawn()
try {
const makeResult = makeHttpServer({ driverInstance: inMemoryDriver, serverName: TEST_SERVER_NAME, authTimestampCacheSize: TEST_AUTH_CACHE_SIZE })
const app = makeResult.app
const server = makeResult.server
const asyncMutexScope = makeResult.asyncMutex
server.authTimestampCache = new MockAuthTimestampCache()

const sk = testPairs[1]
const fileContents = sk.toWIF()
const blob = Buffer.from(fileContents)

const address = ecPairToAddress(sk)

let response = await request(app)
.get('/hub_info/')
.expect(200)

const challenge = JSON.parse(response.text).challenge_text
const authPart = auth.V1Authentication.makeAuthPart(sk, challenge)
const authorization = `bearer ${authPart}`

const passThrough1 = new PassThrough()
passThrough1.write("stuff", "utf8")

const resolves: Set<() => void> = new Set()
inMemoryDriver.onWriteMiddleware.add((args) => {
return new Promise(resolve => {
resolves.add(resolve)
})
})

const reqPromise1 = request(app).post(`/store/${address}/helloWorld`)
.set('Content-Type', 'application/octet-stream')
.set('Authorization', authorization)
.send(blob)
.expect(202)

const reqPromise2 = request(app).post(`/store/${address}/helloWorld`)
.set('Content-Type', 'application/octet-stream')
.set('Authorization', authorization)
.send(blob)
.expect(409)

const reqPromise3 = request(app).delete(`/delete/${address}/helloWorld`)
.set('Content-Type', 'application/octet-stream')
.set('Authorization', authorization)
.send(blob)
.expect(409)

const releaseRequests = new Promise(resolve => {
setTimeout(() => {
for (const release of resolves) {
release()
}
resolve()
}, 50)
})

await Promise.all([reqPromise2, reqPromise1, reqPromise3, releaseRequests])

await reqPromise1
t.ok('First request (store) passes with no concurrent conflict')
await reqPromise2
t.ok('Second request (store) fails with concurrent conflict')
await reqPromise3
t.ok('Third request (delete) fails with concurrent conflict')

inMemoryDriver.onWriteMiddleware.clear()

await request(app).post(`/store/${address}/helloWorld`)
.set('Content-Type', 'application/octet-stream')
.set('Authorization', authorization)
.send(blob)
.expect(202)

t.ok('Fourth request (store) passes with no concurrent conflict')

await request(app).delete(`/delete/${address}/helloWorld`)
.set('Content-Type', 'application/octet-stream')
.set('Authorization', authorization)
.send(blob)
.expect(202)

t.ok('Fifth request (delete) passes with no concurrent conflict')

t.equals(asyncMutexScope.openedCount, 0, 'Should have no open mutexes when no requests are open')

} finally {
inMemoryDriver.dispose()
}
});



test('handle request (InMemory driver)', async (t) => {
const fetch = NodeFetch
const inMemoryDriver = await InMemoryDriver.spawn()
Expand Down
1 change: 0 additions & 1 deletion reader/.eslintrc.js
Expand Up @@ -34,7 +34,6 @@ module.exports = {
"new-cap": 0,
"brace-style": 2,
"semi": [2, "never"],
"valid-jsdoc": ["error"],

"@typescript-eslint/indent": [2, 2, {
"FunctionDeclaration": { "parameters": "first" },
Expand Down