Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): socket pushes not working #82

Merged
merged 13 commits into from
Mar 28, 2022
1 change: 1 addition & 0 deletions libraries/grpc-sdk/src/classes/Routing/RoutingManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export class RoutingManager {
for (let option in routeObject.options) {
if (!routeObject.options.hasOwnProperty(option)) continue;
if (option === 'middlewares') continue;
if (option === 'path') continue;
routeObject.options[option] = JSON.stringify(routeObject.options[option]);
}
let primary: string;
Expand Down
34 changes: 3 additions & 31 deletions libraries/grpc-sdk/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ export class GrpcError extends Error {
}
}

export type RouterRequest = GrpcRequest<{
params: string;
path: string;
headers: string;
context: string;
}>;
export type ParsedRouterRequest = GrpcRequest<{
params: { [key: string]: any };
path: string;
Expand All @@ -38,45 +32,23 @@ export type UnparsedRouterResponse =
| { [key: string]: any }
| string;

type Response = { result: string; redirect: string };
type ResponseWithResult = { result: string };
type ResponseWithRedirect = { redirect: string };

export type RouterResponse = GrpcResponse<Response | ResponseWithResult | ResponseWithRedirect>;

export type SetConfigRequest = GrpcRequest<{ newConfig: string }>;
export type SetConfigResponse = GrpcResponse<{ updatedConfig: string }>;

export type SocketRequest = GrpcRequest<{
event: string;
socketId: string;
params: string;
context: string;
}>;
export type ParsedSocketRequest = GrpcRequest<{
event: string;
socketId: string;
params: { [key: string]: any };
params: any[];
context: { [key: string]: any };
}>;
export type UnparsedSocketResponse =
| {
event: string;
data: { [key: string]: any };
receivers?: string[];
}
| {
rooms: string[];
}

type EventResponse = {
event: string;
data: string;
data: { [key: string]: any };
receivers?: string[];
};

type JoinRoomResponse = {
rooms: string[];
};

export type SocketResponse = GrpcResponse<EventResponse | JoinRoomResponse>;
export type UnparsedSocketResponse = EventResponse | JoinRoomResponse;
11 changes: 6 additions & 5 deletions modules/chat/src/routes/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import ConduitGrpcSdk, {
ParsedRouterRequest,
RoutingManager,
TYPE,
ParsedSocketRequest,
UnparsedRouterResponse,
UnparsedSocketResponse,
} from '@conduitplatform/grpc-sdk';
Expand Down Expand Up @@ -289,16 +290,16 @@ export class ChatRoutes {
return 'Message updated successfully';
}

async connect(call: ParsedRouterRequest): Promise<UnparsedSocketResponse> {
async connect(call: ParsedSocketRequest): Promise<UnparsedSocketResponse> {
const { user } = call.request.context;
const rooms = await ChatRoom.getInstance()
.findMany({ participants: user._id });
return { rooms: (rooms as ChatRoom[]).map((room: any) => room._id) };
}

async onMessage(call: ParsedRouterRequest): Promise<UnparsedSocketResponse> {
async onMessage(call: ParsedSocketRequest): Promise<UnparsedSocketResponse> {
const { user } = call.request.context;
const { roomId, message } = call.request.params;
const [ roomId, message ] = call.request.params;
const room = await ChatRoom.getInstance().findOne({ _id: roomId });

if (isNil(room) || !(room as ChatRoom).participants.includes(user._id)) {
Expand All @@ -320,9 +321,9 @@ export class ChatRoutes {
};
}

async onMessagesRead(call: ParsedRouterRequest): Promise<UnparsedSocketResponse> {
async onMessagesRead(call: ParsedSocketRequest): Promise<UnparsedSocketResponse> {
const { user } = call.request.context;
const { roomId } = call.request.params;
const [ roomId ] = call.request.params;
const room = await ChatRoom.getInstance()
.findOne({ _id: roomId });
if (isNil(room) || !(room as ChatRoom).participants.includes(user._id)) {
Expand Down
10 changes: 8 additions & 2 deletions packages/router/src/controllers/Socket/Socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,15 @@ export class SocketController extends ConduitRouter {
handleSocketPush(push: SocketPush) {
if (isInstanceOfEventResponse(push)) {
if (isNil(push.receivers) || push.receivers!.length === 0) {
this.io.emit(push.event, push.data);
this.io.of(push.namespace).emit(push.event, push.data);
} else {
this.io.to(push.receivers).emit(push.event, push.data);
this.io.of(push.namespace).adapter.fetchSockets({
rooms: new Set(push.receivers)
}).then(sockets=>{
sockets.forEach(r=>{
r.to(push.receivers).emit(push.event, push.data)
})
})
}
} else {
throw new Error('Cannot join room in this context');
Expand Down
3 changes: 2 additions & 1 deletion packages/router/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ export class ConduitDefaultRouter implements IConduitRouter {
);
this.registerRouteMiddleware(r);
} else if (r instanceof ConduitSocket) {
console.log('New socker registered: ' + r.input.path + ' handler url: ' + url);
console.log('New socket registered: ' + r.input.path + ' handler url: ' + url);
this.registerSocket(r);
} else {
console.log(
Expand All @@ -209,6 +209,7 @@ export class ConduitDefaultRouter implements IConduitRouter {
data: JSON.parse(call.request.data),
receivers: call.request.receivers,
rooms: call.request.rooms,
namespace: `/${(call as any).metadata.get('module-name')[0]}/`
};
await this._internalRouter.socketPush(socketData);
} catch (err) {
Expand Down
1 change: 1 addition & 0 deletions packages/router/src/models/SocketPush.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export interface SocketPush {
data: any;
receivers: string[];
rooms: string[];
namespace: string;
}