Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): socket pushes not working #82

Merged
merged 13 commits into from
Mar 28, 2022
1 change: 1 addition & 0 deletions libraries/grpc-sdk/src/classes/Routing/RoutingManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export class RoutingManager {
for (let option in routeObject.options) {
if (!routeObject.options.hasOwnProperty(option)) continue;
if (option === 'middlewares') continue;
if (option === 'path') continue;
routeObject.options[option] = JSON.stringify(routeObject.options[option]);
}
let primary: string;
Expand Down
2 changes: 1 addition & 1 deletion libraries/grpc-sdk/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export type SocketRequest = GrpcRequest<{
export type ParsedSocketRequest = GrpcRequest<{
event: string;
socketId: string;
params: { [key: string]: any };
params: string[];
context: { [key: string]: any };
}>;
export type UnparsedSocketResponse =
Expand Down
7 changes: 4 additions & 3 deletions modules/chat/src/routes/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import ConduitGrpcSdk, {
ParsedRouterRequest,
RoutingManager,
TYPE,
ParsedSocketRequest,
UnparsedRouterResponse,
UnparsedSocketResponse,
} from '@conduitplatform/grpc-sdk';
Expand Down Expand Up @@ -289,14 +290,14 @@ export class ChatRoutes {
return 'Message updated successfully';
}

async connect(call: ParsedRouterRequest): Promise<UnparsedSocketResponse> {
async connect(call: ParsedSocketRequest): Promise<UnparsedSocketResponse> {
const { user } = call.request.context;
const rooms = await ChatRoom.getInstance()
.findMany({ participants: user._id });
return { rooms: (rooms as ChatRoom[]).map((room: any) => room._id) };
}

async onMessage(call: ParsedRouterRequest): Promise<UnparsedSocketResponse> {
async onMessage(call: ParsedSocketRequest): Promise<UnparsedSocketResponse> {
const { user } = call.request.context;
const { roomId, message } = call.request.params;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be [ roomId, message ]. Socket params are an array.

const room = await ChatRoom.getInstance().findOne({ _id: roomId });
Expand All @@ -320,7 +321,7 @@ export class ChatRoutes {
};
}

async onMessagesRead(call: ParsedRouterRequest): Promise<UnparsedSocketResponse> {
async onMessagesRead(call: ParsedSocketRequest): Promise<UnparsedSocketResponse> {
const { user } = call.request.context;
const { roomId } = call.request.params;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same ^

const room = await ChatRoom.getInstance()
Expand Down
10 changes: 8 additions & 2 deletions packages/router/src/controllers/Socket/Socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,15 @@ export class SocketController extends ConduitRouter {
handleSocketPush(push: SocketPush) {
if (isInstanceOfEventResponse(push)) {
if (isNil(push.receivers) || push.receivers!.length === 0) {
this.io.emit(push.event, push.data);
this.io.of(push.namespace).emit(push.event, push.data);
} else {
this.io.to(push.receivers).emit(push.event, push.data);
this.io.of(push.namespace).adapter.fetchSockets({
rooms: new Set(push.receivers)
}).then(sockets=>{
sockets.forEach(r=>{
r.to(push.receivers).emit(push.event, push.data)
})
})
}
} else {
throw new Error('Cannot join room in this context');
Expand Down
3 changes: 2 additions & 1 deletion packages/router/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ export class ConduitDefaultRouter implements IConduitRouter {
);
this.registerRouteMiddleware(r);
} else if (r instanceof ConduitSocket) {
console.log('New socker registered: ' + r.input.path + ' handler url: ' + url);
console.log('New socket registered: ' + r.input.path + ' handler url: ' + url);
this.registerSocket(r);
} else {
console.log(
Expand All @@ -209,6 +209,7 @@ export class ConduitDefaultRouter implements IConduitRouter {
data: JSON.parse(call.request.data),
receivers: call.request.receivers,
rooms: call.request.rooms,
namespace: `/${(call as any).metadata.get('module-name')[0]}/`
};
await this._internalRouter.socketPush(socketData);
} catch (err) {
Expand Down
1 change: 1 addition & 0 deletions packages/router/src/models/SocketPush.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export interface SocketPush {
data: any;
receivers: string[];
rooms: string[];
namespace: string;
}