Skip to content

Commit

Permalink
fix(core): socket pushes not working (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
kkopanidis committed Mar 28, 2022
1 parent a6f5d20 commit d4232ef
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 39 deletions.
1 change: 1 addition & 0 deletions libraries/grpc-sdk/src/classes/Routing/RoutingManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export class RoutingManager {
for (let option in routeObject.options) {
if (!routeObject.options.hasOwnProperty(option)) continue;
if (option === 'middlewares') continue;
if (option === 'path') continue;
routeObject.options[option] = JSON.stringify(routeObject.options[option]);
}
let primary: string;
Expand Down
34 changes: 3 additions & 31 deletions libraries/grpc-sdk/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ export class GrpcError extends Error {
}
}

export type RouterRequest = GrpcRequest<{
params: string;
path: string;
headers: string;
context: string;
}>;
export type ParsedRouterRequest = GrpcRequest<{
params: { [key: string]: any };
path: string;
Expand All @@ -38,45 +32,23 @@ export type UnparsedRouterResponse =
| { [key: string]: any }
| string;

type Response = { result: string; redirect: string };
type ResponseWithResult = { result: string };
type ResponseWithRedirect = { redirect: string };

export type RouterResponse = GrpcResponse<Response | ResponseWithResult | ResponseWithRedirect>;

export type SetConfigRequest = GrpcRequest<{ newConfig: string }>;
export type SetConfigResponse = GrpcResponse<{ updatedConfig: string }>;

export type SocketRequest = GrpcRequest<{
event: string;
socketId: string;
params: string;
context: string;
}>;
export type ParsedSocketRequest = GrpcRequest<{
event: string;
socketId: string;
params: { [key: string]: any };
params: any[];
context: { [key: string]: any };
}>;
export type UnparsedSocketResponse =
| {
event: string;
data: { [key: string]: any };
receivers?: string[];
}
| {
rooms: string[];
}

type EventResponse = {
event: string;
data: string;
data: { [key: string]: any };
receivers?: string[];
};

type JoinRoomResponse = {
rooms: string[];
};

export type SocketResponse = GrpcResponse<EventResponse | JoinRoomResponse>;
export type UnparsedSocketResponse = EventResponse | JoinRoomResponse;
11 changes: 6 additions & 5 deletions modules/chat/src/routes/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import ConduitGrpcSdk, {
ParsedRouterRequest,
RoutingManager,
TYPE,
ParsedSocketRequest,
UnparsedRouterResponse,
UnparsedSocketResponse,
} from '@conduitplatform/grpc-sdk';
Expand Down Expand Up @@ -289,16 +290,16 @@ export class ChatRoutes {
return 'Message updated successfully';
}

async connect(call: ParsedRouterRequest): Promise<UnparsedSocketResponse> {
async connect(call: ParsedSocketRequest): Promise<UnparsedSocketResponse> {
const { user } = call.request.context;
const rooms = await ChatRoom.getInstance()
.findMany({ participants: user._id });
return { rooms: (rooms as ChatRoom[]).map((room: any) => room._id) };
}

async onMessage(call: ParsedRouterRequest): Promise<UnparsedSocketResponse> {
async onMessage(call: ParsedSocketRequest): Promise<UnparsedSocketResponse> {
const { user } = call.request.context;
const { roomId, message } = call.request.params;
const [ roomId, message ] = call.request.params;
const room = await ChatRoom.getInstance().findOne({ _id: roomId });

if (isNil(room) || !(room as ChatRoom).participants.includes(user._id)) {
Expand All @@ -320,9 +321,9 @@ export class ChatRoutes {
};
}

async onMessagesRead(call: ParsedRouterRequest): Promise<UnparsedSocketResponse> {
async onMessagesRead(call: ParsedSocketRequest): Promise<UnparsedSocketResponse> {
const { user } = call.request.context;
const { roomId } = call.request.params;
const [ roomId ] = call.request.params;
const room = await ChatRoom.getInstance()
.findOne({ _id: roomId });
if (isNil(room) || !(room as ChatRoom).participants.includes(user._id)) {
Expand Down
10 changes: 8 additions & 2 deletions packages/router/src/controllers/Socket/Socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,15 @@ export class SocketController extends ConduitRouter {
handleSocketPush(push: SocketPush) {
if (isInstanceOfEventResponse(push)) {
if (isNil(push.receivers) || push.receivers!.length === 0) {
this.io.emit(push.event, push.data);
this.io.of(push.namespace).emit(push.event, push.data);
} else {
this.io.to(push.receivers).emit(push.event, push.data);
this.io.of(push.namespace).adapter.fetchSockets({
rooms: new Set(push.receivers)
}).then(sockets=>{
sockets.forEach(r=>{
r.to(push.receivers).emit(push.event, push.data)
})
})
}
} else {
throw new Error('Cannot join room in this context');
Expand Down
3 changes: 2 additions & 1 deletion packages/router/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ export class ConduitDefaultRouter implements IConduitRouter {
);
this.registerRouteMiddleware(r);
} else if (r instanceof ConduitSocket) {
console.log('New socker registered: ' + r.input.path + ' handler url: ' + url);
console.log('New socket registered: ' + r.input.path + ' handler url: ' + url);
this.registerSocket(r);
} else {
console.log(
Expand All @@ -209,6 +209,7 @@ export class ConduitDefaultRouter implements IConduitRouter {
data: JSON.parse(call.request.data),
receivers: call.request.receivers,
rooms: call.request.rooms,
namespace: `/${(call as any).metadata.get('module-name')[0]}/`
};
await this._internalRouter.socketPush(socketData);
} catch (err) {
Expand Down
1 change: 1 addition & 0 deletions packages/router/src/models/SocketPush.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export interface SocketPush {
data: any;
receivers: string[];
rooms: string[];
namespace: string;
}

0 comments on commit d4232ef

Please sign in to comment.