Skip to content

Commit

Permalink
improve(rpc): promises abstraction for grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
flolu committed May 3, 2020
1 parent f93597d commit de890c9
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 104 deletions.
18 changes: 7 additions & 11 deletions packages/rpc/idea/idea-commands.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import * as grpc from '@grpc/grpc-js';

import { IIdeaState } from '@centsideas/models';

interface ICreateIdeaCommand {
Expand All @@ -20,14 +18,12 @@ interface IDeleteIdeaCommand {
ideaId: string;
}

export interface IIdeaCommands {
create: (payload: ICreateIdeaCommand, callback: grpc.requestCallback<IIdeaState>) => void;
update: (payload: IUpdateIdeaCommand, callback: grpc.requestCallback<IIdeaState>) => void;
delete: (payload: IDeleteIdeaCommand, callback: grpc.requestCallback<IIdeaState>) => void;
}
export type CreateIdea = (payload: ICreateIdeaCommand) => Promise<IIdeaState>;
export type UpdateIdea = (payload: IUpdateIdeaCommand) => Promise<IIdeaState>;
export type DeleteIdea = (payload: IDeleteIdeaCommand) => Promise<IIdeaState>;

export interface IIdeaCommandsImplementation {
create: grpc.handleUnaryCall<ICreateIdeaCommand, IIdeaState>;
update: grpc.handleUnaryCall<IUpdateIdeaCommand, IIdeaState>;
delete: grpc.handleUnaryCall<IDeleteIdeaCommand, IIdeaState>;
export interface IIdeaCommands {
create: CreateIdea;
update: UpdateIdea;
delete: DeleteIdea;
}
14 changes: 5 additions & 9 deletions packages/rpc/idea/idea-queries.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import * as grpc from '@grpc/grpc-js';

import { IIdeaViewModel } from '@centsideas/models';

interface IIdeaByIdQuery {
Expand All @@ -10,12 +8,10 @@ interface IIdeaViewList {
ideas: IIdeaViewModel[];
}

export interface IIdeaQueries {
getAll: (payload: undefined, callback: grpc.requestCallback<IIdeaViewList>) => void;
getById: (payload: IIdeaByIdQuery, callback: grpc.requestCallback<IIdeaViewModel>) => void;
}
export type GetAllIdeas = (payload: undefined) => Promise<IIdeaViewList>;
export type GetIdeaById = (payload: IIdeaByIdQuery) => Promise<IIdeaViewModel>;

export interface IIdeaQueriesImplementation {
getAll: grpc.handleUnaryCall<IIdeaByIdQuery, IIdeaViewList>;
getById: grpc.handleUnaryCall<IIdeaByIdQuery, IIdeaViewModel>;
export interface IIdeaQueries {
getAll: GetAllIdeas;
getById: GetIdeaById;
}
28 changes: 27 additions & 1 deletion packages/rpc/rpc.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { loadProtoPackage } from './util';

@injectable()
export class RpcClient<IClientService = any> {
private internalRpcClient!: grpc.Client;
client!: IClientService;

constructor(
Expand All @@ -14,9 +15,34 @@ export class RpcClient<IClientService = any> {
private serviceName: string,
) {
const protoPackage = loadProtoPackage(this.packageName);
this.client = new (protoPackage as any)[this.serviceName](
const serviceDefinition = (protoPackage as any)[this.serviceName];
this.internalRpcClient = new serviceDefinition(
`${this.host}:${this.port}`,
grpc.credentials.createInsecure(),
);

this.registerMethods(Object.keys(serviceDefinition.service));
}

/**
* Creates wrappers for grpc unary calls that allows to use
* promises instead of the default callback
*/
private registerMethods(methodNames: string[]) {
this.client = {} as any;
methodNames.forEach(mn => {
(this.client as any)[mn] = (payload: any) => {
return new Promise(resolve => {
const method: grpc.requestCallback<any> = (this.internalRpcClient as any)[mn](
payload,
(err: grpc.ServiceError, response: any) => {
if (err) throw err;
resolve(response);
},
);
(this.internalRpcClient as any)[mn].bind(method);
});
};
});
}
}
31 changes: 28 additions & 3 deletions packages/rpc/rpc.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ export class RpcServer {
this.startServer();
}

// FIXME better type experience for `implementation`
addService(service: grpc.ServiceDefinition, implementation: any) {
this.server.addService(service, implementation);
addService<IServiceImplementation>(
service: grpc.ServiceDefinition,
implementation: IServiceImplementation,
) {
const grpcImpl = this.convertToGrpcImplementation(implementation);
this.server.addService(service, grpcImpl);
}

loadService(packageName: string, serviceName: string): grpc.ServiceDefinition {
Expand All @@ -36,4 +39,26 @@ export class RpcServer {
},
);
}

/**
* Converts promise based implementation methods into the grpc callback
* based implementation methods
*/
private convertToGrpcImplementation(implementation: any): grpc.UntypedServiceImplementation {
const grpcImpl: grpc.UntypedServiceImplementation = {};
Object.keys(implementation).forEach(key => {
const handler = implementation[key];
(grpcImpl as any)[key] = this.convertPromiseToCallback(handler as any);
});
return grpcImpl;
}

private convertPromiseToCallback(
handler: (payload: any) => Promise<any>,
): grpc.handleUnaryCall<any, any> {
return async (call, callback) => {
const response = await handler(call.request).catch(err => callback(err, null));
callback(null, response);
};
}
}
28 changes: 14 additions & 14 deletions services/consumer/consumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { IdeasProjection } from './ideas.projection';
import { ReviewsProjection } from './reviews.projection';
import { ConsumerEnvironment } from './consumer.environment';
import { UsersProjection } from './users.projection';
import { IIdeaQueriesImplementation, RpcServer } from '@centsideas/rpc';
import { RpcServer, IIdeaQueries, GetAllIdeas, GetIdeaById } from '@centsideas/rpc';

@injectable()
export class ConsumerServer {
Expand All @@ -34,7 +34,10 @@ export class ConsumerServer {
this.messageBroker.events(EventTopics.Users).subscribe(this.usersProjection.handleEvent);

const ideaService = this.rpcServer.loadService('idea', 'IdeaQueries');
this.rpcServer.addService(ideaService, this.ideasImplementation);
this.rpcServer.addService<IIdeaQueries>(ideaService, {
getAll: this.getAll,
getById: this.getById,
});

this.app.use(bodyParser.json());

Expand All @@ -45,6 +48,15 @@ export class ConsumerServer {
this.app.listen(this.env.port);
}

getAll: GetAllIdeas = async () => {
const ideas = await this.queryService.getAllIdeas();
return { ideas };
};

getById: GetIdeaById = async ({ id }) => {
return this.queryService.getIdeaById(id);
};

private registerQueryRoutes() {
this.app.post(
`/${ApiEndpoints.Users}/${UsersApiRoutes.GetById}`,
Expand All @@ -55,16 +67,4 @@ export class ConsumerServer {
ExpressAdapters.json(this.queryService.getAllUsers),
);
}

private ideasImplementation: IIdeaQueriesImplementation = {
getAll: async (_call, callback) => {
const ideas = await this.queryService.getAllIdeas();
callback(null, { ideas });
},
getById: async (call, callback) => {
if (!call.request) return callback(Error('no payload sent'), null);
const idea = await this.queryService.getIdeaById(call.request?.id);
callback(null, idea);
},
};
}
42 changes: 12 additions & 30 deletions services/gateway/command.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
NotificationsApiRoutes,
AdminApiRoutes,
} from '@centsideas/enums';
import { IIdeaState } from '@centsideas/models';
import { IIdeaCommands, RpcClient } from '@centsideas/rpc';

import { ExpressAdapter } from './express-adapter';
Expand All @@ -33,43 +32,26 @@ export class CommandController implements interfaces.Controller {

// TODO error handling
@httpPost(`/${ApiEndpoints.Ideas}`, AuthMiddleware)
createIdea(req: express.Request, res: express.Response): Promise<IIdeaState> {
return new Promise(resolve => {
const { title, description } = req.body;
const { userId } = res.locals;

this.ideasRpc.client.create({ userId, title, description }, (err, response) => {
if (err) throw err;
resolve(response);
});
});
async createIdea(req: express.Request, res: express.Response) {
const { title, description } = req.body;
const { userId } = res.locals;
// TODO would be cool to just have `this.ideasRpc.create(...)` (maybe i can inject the client! or i add methods to rpc class at runtime?)
return this.ideasRpc.client.create({ userId, title, description });
}

@httpPut(`/${ApiEndpoints.Ideas}/:id`, AuthMiddleware)
updateIdea(req: express.Request, res: express.Response) {
return new Promise(resolve => {
const ideaId = req.params.id;
const { title, description } = req.body;
const { userId } = res.locals;

this.ideasRpc.client.update({ userId, title, description, ideaId }, (err, response) => {
if (err) throw err;
resolve(response);
});
});
const ideaId = req.params.id;
const { title, description } = req.body;
const { userId } = res.locals;
return this.ideasRpc.client.update({ userId, title, description, ideaId });
}

@httpDelete(`/${ApiEndpoints.Ideas}/:id`, AuthMiddleware)
deleteIdea(req: express.Request, res: express.Response) {
return new Promise(resolve => {
const ideaId = req.params.id;
const { userId } = res.locals;

this.ideasRpc.client.delete({ userId, ideaId }, (err, response) => {
if (err) throw err;
resolve(response);
});
});
const ideaId = req.params.id;
const { userId } = res.locals;
return this.ideasRpc.client.delete({ userId, ideaId });
}

@httpPut(`/${ApiEndpoints.Users}/:id`, AuthMiddleware)
Expand Down
18 changes: 4 additions & 14 deletions services/gateway/query.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,14 @@ export class QueryController implements interfaces.Controller {
) {}

@httpGet(`/${ApiEndpoints.Ideas}`)
getIdeas() {
return new Promise(resolve => {
this.ideasRpc.client.getAll(undefined, (err, response) => {
if (err) throw err;
if (!response) return resolve([]);
resolve(response.ideas || []);
});
});
async getIdeas() {
const { ideas } = await this.ideasRpc.client.getAll(undefined);
return ideas;
}

@httpGet(`/${ApiEndpoints.Ideas}/:id`)
getIdeaById(req: express.Request) {
return new Promise(resolve => {
this.ideasRpc.client.getById({ id: req.params.id }, (err, response) => {
if (err) throw err;
resolve(response);
});
});
return this.ideasRpc.client.getById({ id: req.params.id });
}

@httpGet(`/${ApiEndpoints.Users}`)
Expand Down
41 changes: 19 additions & 22 deletions services/ideas/ideas.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as http from 'http';
import { injectable } from 'inversify';

import { Logger } from '@centsideas/utils';
import { IIdeaCommandsImplementation, RpcServer } from '@centsideas/rpc';
import { IIdeaCommands, RpcServer, CreateIdea, DeleteIdea, UpdateIdea } from '@centsideas/rpc';

import { IdeasEnvironment } from './ideas.environment';
import { IdeasHandler } from './ideas.handler';
Expand All @@ -20,29 +20,26 @@ export class IdeasServer {
this.handleHealthchecks();

const commandsService = this.rpcServer.loadService('idea', 'IdeaCommands');
this.rpcServer.addService(commandsService, this.commandsImplementation);
this.rpcServer.addService<IIdeaCommands>(commandsService, {
create: this.create,
update: this.update,
delete: this.delete,
});
}

// TODO error handling
commandsImplementation: IIdeaCommandsImplementation = {
create: async (call, callback) => {
if (!call.request) return callback(Error('no payload sent'), null);
const { userId, title, description } = call.request;
const created = await this.handler.create(userId, title, description);
callback(null, created.persistedState);
},
update: async (call, callback) => {
if (!call.request) return callback(Error('no payload sent'), null);
const { userId, title, description, ideaId } = call.request;
const updated = await this.handler.update(userId, ideaId, title, description);
callback(null, updated.persistedState);
},
delete: async (call, callback) => {
if (!call.request) return callback(Error('no payload sent'), null);
const { userId, ideaId } = call.request;
const deleted = await this.handler.delete(userId, ideaId);
callback(null, deleted.persistedState);
},
create: CreateIdea = async ({ userId, title, description }) => {
const created = await this.handler.create(userId, title, description);
return created.persistedState;
};

update: UpdateIdea = async ({ userId, title, description, ideaId }) => {
const updated = await this.handler.update(userId, ideaId, title, description);
return updated.persistedState;
};

delete: DeleteIdea = async ({ userId, ideaId }) => {
const deleted = await this.handler.delete(userId, ideaId);
return deleted.persistedState;
};

private handleHealthchecks() {
Expand Down

0 comments on commit de890c9

Please sign in to comment.