Skip to content
This repository has been archived by the owner on Dec 23, 2021. It is now read-only.

Commit

Permalink
feat: #25 basic support batch request in json format
Browse files Browse the repository at this point in the history
  • Loading branch information
Soontao committed Aug 9, 2020
1 parent f882af5 commit 4c70ecb
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 125 deletions.
8 changes: 7 additions & 1 deletion example/typed_simple_server.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as express from "express";
import 'reflect-metadata';
import { createTypedODataServer } from '../src';
import { SchoolEntities } from "../test/typeorm/school_model";
Expand All @@ -14,7 +15,12 @@ const run = async () => {
entities: SchoolEntities
}, ...SchoolEntities);

const s = server.create(50000);

const app = express()

app.use(server.create())

const s = app.listen(50000)

s.once('listening', () => console.log(`server started at ${s.address()['port']}`));

Expand Down
6 changes: 6 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,9 @@ export class UnsupportedMediaTypeError extends HttpRequestError {
super(415, message);
}
}

export class BadRequestError extends HttpRequestError {
constructor(message = 'Bad Request') {
super(400, message);
}
}
207 changes: 205 additions & 2 deletions src/middlewares.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import { flatten } from '@newdash/newdash/flatten';
import { groupBy } from '@newdash/newdash/groupBy';
import { map } from '@newdash/newdash/map';
import { ServiceMetadata } from '@odata/metadata';
import { JsonBatchBundle, JsonBatchRequest } from '@odata/parser';
import { NextFunction, Request, Response } from 'express';
import { convert, parse } from 'odata2openapi';
import { dirname } from 'path';
import { isArray } from 'util';
import { ODataHttpContext, ODataServer } from '.';
import { HttpRequestError, UnsupportedMediaTypeError } from './error';
import { ODataMetadataType } from './processor';
import { createTransactionContext } from './typeorm/transaction';
import { createLogger } from './logger';
import { ODataMetadataType, ODataProcessorOptions } from './processor';
import { commitTransaction, createTransactionContext, rollbackTransaction } from './typeorm/transaction';

const logger = createLogger('middlewares');

/**
* with request id in `res.locals[tx_ctx]`
Expand Down Expand Up @@ -135,3 +142,199 @@ export function withSwaggerDocument(sm: ServiceMetadata) {
}
};
}

/**
* create $batch requests handler
*
* @param server
*/
export function withODataBatchRequestHandler(server: typeof ODataServer) {
const logger = createLogger('request:batch');
return async (req: Request, res: Response, next: NextFunction) => {
try {
const body: JsonBatchBundle = req.body;
// TO DO validation here
const groups: Record<string, JsonBatchRequest[]> = groupBy(body.requests, (bRequest) => bRequest.atomicityGroup || 'default');

const collectedResults = await Promise.all(map(groups, async (groupRequests, groupName) => {
// each atom group will run in SINGLE transaction
const groupResults = [];
const txContext = createTransactionContext();

// if any item process failed, this value will be true
let anyThingWrong = false;

for (let idx = 0; idx < groupRequests.length; idx++) {
const batchRequest = groupRequests[idx];

try {

const ctx: ODataHttpContext = {
url: batchRequest.url,
method: batchRequest.method,
protocol: req.secure ? 'https' : 'http',
host: req.headers.host,
base: req.baseUrl,
request: req,
response: res,
tx: txContext
};

const processor = server.createProcessor(ctx, { metadata: res['metadata'] });

const result = await processor.execute(batchRequest.body);

groupResults.push({
id: batchRequest.id,
status: result.statusCode || 200,
body: result.body
});

} catch (err) {

anyThingWrong = true;
const statusCode = err.statusCode || 500;
groupResults.push({
id: batchRequest.id,
status: statusCode,
body: {
error: {
code: statusCode,
message: err.message
}
}
});

}
}

if (anyThingWrong) {
await rollbackTransaction(txContext);
} else {
await commitTransaction(txContext);
}

return groupResults;

}));

res.json(flatten(collectedResults));

} catch (error) {
next(error);
}
};
}

/**
* create simple simple request handler
*
* @param server
*/
export function withODataRequestHandler(server: typeof ODataServer) {

const logger = createLogger('request:simple');

return async (req: Request, res: Response, next: NextFunction) => {

// new transaction for request
const txContext = createTransactionContext();

const ctx: ODataHttpContext = {
url: req.url,
method: req.method,
protocol: req.secure ? 'https' : 'http',
host: req.headers.host,
base: req.baseUrl,
request: req,
response: res,
tx: txContext
};

let hasError = false;

try {

ensureODataHeaders(req, res);

const processor = server.createProcessor(ctx, <ODataProcessorOptions>{
metadata: res['metadata']
});

processor.on('header', (headers) => {
for (const prop in headers) {
if (prop.toLowerCase() == 'content-type') {
ensureODataContentType(req, res, headers[prop]);
} else {
res.setHeader(prop, headers[prop]);
}
}
});

processor.on('data', (chunk, encoding, done) => {
if (!hasError) { res.write(chunk, encoding, done); }
});

let body = req.body;

// if chunked upload, will use request stream as body
if (req.headers['transfer-encoding'] == 'chunked') {
body = req;
}

const origStatus = res.statusCode;

const result = await processor.execute(body);

if (result) {
res.status((origStatus != res.statusCode && res.statusCode) || result.statusCode || 200);
if (!res.headersSent) {
ensureODataContentType(req, res, result.contentType || 'text/plain');
}
if (typeof result.body != 'undefined') {
if (typeof result.body != 'object') {
res.send(`${result.body}`);
} else if (!res.headersSent) {
res.send(result.body);
}
}
}

await commitTransaction(txContext);
res.end();

} catch (err) {

await rollbackTransaction(txContext);
hasError = true;
next(err);

}
};
};


/** Create Express middleware for OData error handling */
export function withODataErrorHandler(err, _, res, next) {
if (err) {
if (res.headersSent) {
return next(err);
}
const statusCode = err.statusCode || err.status || (res.statusCode < 400 ? 500 : res.statusCode);
if (!res.statusCode || res.statusCode < 400) {
res.status(statusCode);
}

logger(err.stack);

res.send({
error: {
code: statusCode,
message: err.message
// stack: process.env.ODATA_V4_ENABLE_STACKTRACE ? undefined : err.stack
}
});
} else {
next();
}
}
12 changes: 5 additions & 7 deletions src/processor/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ import { BaseODataModel } from '../typeorm';
import { isIterator, isPromise, isStream } from '../utils';
import { NavigationPart, ODATA_TYPE, ResourcePathVisitor } from '../visitor';
import { fnCaller } from './fnCaller';
import { getODataRoot } from './getODataRoot';

const getODataRoot = function (context: ODataHttpContext) {
return `${context.protocol || 'http'}://${context.host || 'localhost'}${context.base || ''}`;
};

const createODataContext = function (context: ODataHttpContext, entitySets, server: typeof ODataServer, resourcePath, processor) {
const odataContextBase = `${getODataRoot(context)}/$metadata#`;
Expand Down Expand Up @@ -466,9 +464,9 @@ export enum ODataMetadataType {
}

export interface ODataProcessorOptions {
disableEntityConversion: boolean
metadata: ODataMetadataType
objectMode: boolean
disableEntityConversion?: boolean
metadata?: ODataMetadataType
objectMode?: boolean
}

export class ODataProcessor extends Transform {
Expand Down Expand Up @@ -1878,7 +1876,7 @@ export class ODataProcessor extends Transform {
}

if (txContextParam) {
params[txContextParam] = this.context?.response?.locals['tx_ctx'];
params[txContextParam] = this.context?.tx;
}

if (streamParam) {
Expand Down

0 comments on commit 4c70ecb

Please sign in to comment.