Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement mutex around file mutation http handlers #247

Merged
merged 9 commits into from Jul 30, 2019
12 changes: 8 additions & 4 deletions README.md
Expand Up @@ -248,8 +248,13 @@ can be configured via the [config.json](#configuration-files).
# Driver model

Gaia hub drivers are fairly simple. The biggest requirement is the ability
to fulfill the _write-to/read-from_ URL guarantee. As currently implemented
a gaia hub driver must implement the following two functions:
to fulfill the _write-to/read-from_ URL guarantee.

A driver can expect that two modification operations to the same path will be mutually exclusive.
No writes, renames, or deletes to the same path will be concurrent.

As currently implemented
a gaia hub driver must implement the following functions:

```javascript
/**
Expand Down Expand Up @@ -281,11 +286,10 @@ performDelete (options: { path, storageToplevel })
* @param { String } options.path - path of the original file
* @param { String } options.storageTopLevel - the top level directory for the original file
* @param { String } options.newPath - new path for the file
* @param { String } options.newStorageTopLevel - new top level directory for the file
* @returns {Promise}
*/
performRename (options: { path, storageTopLevel,
newPath, newStorageTopLevel })
newPath })

/**
* Returns an object with a NodeJS stream.Readable for the file content
Expand Down
1 change: 0 additions & 1 deletion admin/.eslintrc.js
Expand Up @@ -34,7 +34,6 @@ module.exports = {
"new-cap": 0,
"brace-style": 2,
"semi": [2, "never"],
"valid-jsdoc": ["error"],

"@typescript-eslint/indent": [2, 2, {
"FunctionDeclaration": { "parameters": "first" },
Expand Down
1 change: 0 additions & 1 deletion hub/.eslintrc.js
Expand Up @@ -34,7 +34,6 @@ module.exports = {
"new-cap": 0,
"brace-style": 2,
"semi": [2, "never"],
"valid-jsdoc": ["error"],

"@typescript-eslint/indent": [2, 2, {
"FunctionDeclaration": { "parameters": "first" },
Expand Down
1 change: 0 additions & 1 deletion hub/src/server/driverModel.ts
Expand Up @@ -46,7 +46,6 @@ export interface PerformRenameArgs {
path: string;
storageTopLevel: string;
newPath: string;
newStorageTopLevel: string;
}

export interface DriverModel {
Expand Down
2 changes: 1 addition & 1 deletion hub/src/server/drivers/AzDriver.ts
Expand Up @@ -293,7 +293,7 @@ class AzDriver implements DriverModel, DriverModelTestMethods {
const origBlobUrl = azure.BlobURL.fromContainerURL(this.container, origAzBlob)
const origBlockBlobURL = azure.BlockBlobURL.fromBlobURL(origBlobUrl)

const newAzBlob = `${args.newStorageTopLevel}/${args.newPath}`
const newAzBlob = `${args.storageTopLevel}/${args.newPath}`
const newBlobURL = azure.BlobURL.fromContainerURL(this.container, newAzBlob)
const newBlockBlobURL = azure.BlockBlobURL.fromBlobURL(newBlobURL)

Expand Down
6 changes: 3 additions & 3 deletions hub/src/server/drivers/GcDriver.ts
Expand Up @@ -3,7 +3,7 @@ import { Storage, File } from '@google-cloud/storage'
import { BadPathError, InvalidInputError, DoesNotExist } from '../errors'
import { ListFilesResult, PerformWriteArgs, PerformDeleteArgs, PerformRenameArgs, StatResult, PerformStatArgs, PerformReadArgs, ReadResult } from '../driverModel'
import { DriverStatics, DriverModel, DriverModelTestMethods } from '../driverModel'
import { pipeline, logger } from '../utils'
import { pipelineAsync, logger } from '../utils'

export interface GC_CONFIG_TYPE {
gcCredentials?: {
Expand Down Expand Up @@ -189,7 +189,7 @@ class GcDriver implements DriverModel, DriverModelTestMethods {
})

try {
await pipeline(args.stream, fileWriteStream)
await pipelineAsync(args.stream, fileWriteStream)
logger.debug(`storing ${filename} in bucket ${this.bucket}`)
} catch (error) {
logger.error(`failed to store ${filename} in bucket ${this.bucket}`)
Expand Down Expand Up @@ -302,7 +302,7 @@ class GcDriver implements DriverModel, DriverModelTestMethods {
.bucket(this.bucket)
.file(filename)

const newFilename = `${args.newStorageTopLevel}/${args.newPath}`
const newFilename = `${args.storageTopLevel}/${args.newPath}`
const newBucketFile = this.storage
.bucket(this.bucket)
.file(newFilename)
Expand Down
2 changes: 1 addition & 1 deletion hub/src/server/drivers/S3Driver.ts
Expand Up @@ -301,7 +301,7 @@ class S3Driver implements DriverModel, DriverModelTestMethods {
}

const s3KeyOrig = `${args.storageTopLevel}/${args.path}`
const s3keyNew = `${args.newStorageTopLevel}/${args.newPath}`
const s3keyNew = `${args.storageTopLevel}/${args.newPath}`

const s3RenameParams: S3.Types.CopyObjectRequest = {
Bucket: this.bucket,
Expand Down
6 changes: 3 additions & 3 deletions hub/src/server/drivers/diskDriver.ts
Expand Up @@ -3,7 +3,7 @@ import { BadPathError, InvalidInputError, DoesNotExist } from '../errors'
import Path from 'path'
import { ListFilesResult, PerformWriteArgs, PerformDeleteArgs, PerformRenameArgs, PerformStatArgs, StatResult, PerformReadArgs, ReadResult } from '../driverModel'
import { DriverStatics, DriverModel } from '../driverModel'
import { pipeline, logger } from '../utils'
import { pipelineAsync, logger } from '../utils'

export interface DISK_CONFIG_TYPE {
diskSettings: { storageRootDirectory?: string },
Expand Down Expand Up @@ -199,7 +199,7 @@ class DiskDriver implements DriverModel {
await this.mkdirs(absdirname)

const writePipe = fs.createWriteStream(absoluteFilePath, { mode: 0o600, flags: 'w' })
await pipeline(args.stream, writePipe)
await pipelineAsync(args.stream, writePipe)


const contentTypeDirPath = Path.dirname(contentTypeFilePath)
Expand Down Expand Up @@ -308,7 +308,7 @@ class DiskDriver implements DriverModel {
async performRename(args: PerformRenameArgs): Promise<void> {
const pathsOrig = this.getFullFilePathInfo(args)
const pathsNew = this.getFullFilePathInfo({
storageTopLevel: args.newStorageTopLevel,
storageTopLevel: args.storageTopLevel,
path: args.newPath
})

Expand Down
62 changes: 49 additions & 13 deletions hub/src/server/http.ts
Expand Up @@ -5,7 +5,7 @@ import cors from 'cors'
import { ProofChecker } from './ProofChecker'
import { getChallengeText, LATEST_AUTH_VERSION } from './authentication'
import { HubServer } from './server'
import { getDriverClass, logger } from './utils'
import { getDriverClass, logger, AsyncMutexScope } from './utils'
import { DriverModel } from './driverModel'
import * as errors from './errors'
import { HubConfigInterface } from './config'
Expand All @@ -16,10 +16,12 @@ function writeResponse(res: express.Response, data: any, statusCode: number) {
res.end()
}

export function makeHttpServer(config: HubConfigInterface): { app: express.Application, server: HubServer, driver: DriverModel } {
export function makeHttpServer(config: HubConfigInterface): { app: express.Application, server: HubServer, driver: DriverModel, asyncMutex: AsyncMutexScope } {

const app: express.Application = express()

const asyncMutex = new AsyncMutexScope()

// Handle driver configuration
let driver: DriverModel

Expand Down Expand Up @@ -54,12 +56,13 @@ export function makeHttpServer(config: HubConfigInterface): { app: express.Appli
filename = filename.substring(0, filename.length - 1)
}
const address = req.params[0]
const endpoint = `${address}/${filename}`

server.handleRequest(address, filename, req.headers, req)
.then((publicURL) => {
const handleRequest = async () => {
try {
const publicURL = await server.handleRequest(address, filename, req.headers, req)
writeResponse(res, { publicURL }, 202)
})
.catch((err: any) => {
} catch (err) {
logger.error(err)
if (err instanceof errors.ValidationError) {
writeResponse(res, { message: err.message, error: err.name }, 401)
Expand All @@ -74,7 +77,23 @@ export function makeHttpServer(config: HubConfigInterface): { app: express.Appli
} else {
writeResponse(res, { message: 'Server Error' }, 500)
}
})
}
}

try {
if (!asyncMutex.tryAcquire(endpoint, handleRequest)) {
const errMsg = `Concurrent operation (store) attempted on ${endpoint}`
logger.error(errMsg)
writeResponse(res, {
message: errMsg,
error: errors.ConflictError.name
}, 409)
}
} catch (err) {
logger.error(err)
writeResponse(res, { message: 'Server Error' }, 500)
}

})

app.delete(/^\/delete\/([a-zA-Z0-9]+)\/(.+)/, (
Expand All @@ -86,13 +105,14 @@ export function makeHttpServer(config: HubConfigInterface): { app: express.Appli
filename = filename.substring(0, filename.length - 1)
}
const address = req.params[0]
const endpoint = `${address}/${filename}`

server.handleDelete(address, filename, req.headers)
.then(() => {
const handleRequest = async () => {
try {
await server.handleDelete(address, filename, req.headers)
res.writeHead(202)
res.end()
})
.catch((err: any) => {
} catch (err) {
logger.error(err)
if (err instanceof errors.ValidationError) {
writeResponse(res, { message: err.message, error: err.name }, 401)
Expand All @@ -107,7 +127,23 @@ export function makeHttpServer(config: HubConfigInterface): { app: express.Appli
} else {
writeResponse(res, { message: 'Server Error' }, 500)
}
})
}
}

try {
if (!asyncMutex.tryAcquire(endpoint, handleRequest)) {
const errMsg = `Concurrent operation (delete) attempted on ${endpoint}`
logger.error(errMsg)
writeResponse(res, {
message: errMsg,
error: errors.ConflictError.name
}, 409)
}
} catch (err) {
logger.error(err)
writeResponse(res, { message: 'Server Error' }, 500)
}

})

app.post(
Expand Down Expand Up @@ -191,5 +227,5 @@ export function makeHttpServer(config: HubConfigInterface): { app: express.Appli
})

// Instantiate express application
return { app, server, driver }
return { app, server, driver, asyncMutex }
}
38 changes: 34 additions & 4 deletions hub/src/server/utils.ts
Expand Up @@ -9,11 +9,10 @@ import DiskDriver from './drivers/diskDriver'
import { promisify } from 'util'
import winston from 'winston'

//$FlowFixMe - Flow is unaware of the stream.pipeline Node API
import { pipeline as _pipline } from 'stream'
import { pipeline } from 'stream'
import { DriverName } from './config'

export const pipeline = promisify(_pipline)
export const pipelineAsync = promisify(pipeline)

export const logger = winston.createLogger()

Expand Down Expand Up @@ -52,7 +51,7 @@ class MemoryStream extends stream.Writable {

export async function readStream(stream: stream.Readable): Promise<Buffer> {
const memStream = new MemoryStream()
await pipeline(stream, memStream)
await pipelineAsync(stream, memStream)
return memStream.getData()
}

Expand All @@ -63,3 +62,34 @@ export function timeout(milliseconds: number): Promise<void> {
}, milliseconds)
})
}


export class AsyncMutexScope {

private readonly _opened = new Set<string>()

public get openedCount() {
return this._opened.size
}

/**
* If no mutex of the given `id` is already taken, then a mutex is created and the
* given promise is invoked. The mutex is released once the promise resolves -- either by
* success or error.
* @param id A unique mutex name used in a Map.
* @param spawnOwner A function that creates a Promise if the mutex is acquired.
* @returns `true` if the mutex was acquired, otherwise returns `false`
*/
public tryAcquire(id: string, spawnOwner: () => Promise<void>): boolean {
if (this._opened.has(id)) {
jcnelson marked this conversation as resolved.
Show resolved Hide resolved
return false
}
const owner = spawnOwner()
this._opened.add(id)
jcnelson marked this conversation as resolved.
Show resolved Hide resolved
owner.finally(() => {
this._opened.delete(id)
})
return true
}

}
10 changes: 10 additions & 0 deletions hub/test/src/testDrivers/InMemoryDriver.ts
Expand Up @@ -19,6 +19,8 @@ export class InMemoryDriver implements DriverModel {
lastWrite: PerformWriteArgs
initPromise: Promise<void>

onWriteMiddleware: Set<((PerformWriteArgs) => Promise<void>)> = new Set()

constructor(config: any) {
this.pageSize = (config && config.pageSize) ? config.pageSize : 100
this.files = new Map<string, { content: Buffer, contentType: string, lastModified: Date }>()
Expand Down Expand Up @@ -56,6 +58,7 @@ export class InMemoryDriver implements DriverModel {
await driver.start()
return driver
}

start() {
return new Promise<void>((resolve) => {
this.server = this.app.listen(0, 'localhost', () => {
Expand All @@ -65,10 +68,17 @@ export class InMemoryDriver implements DriverModel {
})
})
}

getReadURLPrefix() {
return this.readUrl
}

async performWrite(args: PerformWriteArgs) {

for (const middleware of this.onWriteMiddleware) {
await middleware(args)
}

// cancel write and return 402 if path is invalid
if (!InMemoryDriver.isPathValid(args.path)) {
throw new BadPathError('Invalid Path')
Expand Down