Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Be/feature/CR-28 #3314

Merged
merged 11 commits into from
May 30, 2024
3 changes: 3 additions & 0 deletions redisinsight/api/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { AppModule } from './app.module';
import SWAGGER_CONFIG from '../config/swagger';
import LOGGER_CONFIG from '../config/logger';
import { createHttpOptions } from './utils/createHttpOptions';
import { SessionMetadataAdapter } from './modules/auth/session-metadata/adapters/session-metadata.adapter';

const serverConfig = get('server') as Config['server'];

Expand Down Expand Up @@ -68,6 +69,8 @@ export default async function bootstrap(apiPort?: number): Promise<IApp> {
app.useWebSocketAdapter(new WindowsAuthAdapter(app));
}

app.useWebSocketAdapter(new SessionMetadataAdapter(app));

const logFileProvider = app.get(LogFileProvider);

await app.listen(apiPort || port, host);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/* eslint-disable no-param-reassign */
import { INestApplication, Injectable, Logger } from '@nestjs/common';
import { IoAdapter } from '@nestjs/platform-socket.io';
import { MessageMappingProperties } from '@nestjs/websockets';
import { Observable } from 'rxjs';
import { Socket } from 'socket.io';

@Injectable()
export class SessionMetadataAdapter extends IoAdapter {
private logger = new Logger('SessionMetadataAdapter');

constructor(private app: INestApplication) {
super(app);
}

async bindMessageHandlers(
socket: Socket,
handlers: MessageMappingProperties[],
transform: (data: any) => Observable<any>,
) {
// TODO: [USER_CONTEXT]
// Assuming that we'll figure out session data from a JWT token
// but that is up to discovery later on
// eslint-disable-next-line no-param-reassign
const jwt = socket.handshake.headers.authorization?.split(' ')[1];
socket.data['jwt'] = jwt || null;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We must calculate (basically hardcode) SessionMetadata here and put into socket.data
For other builds we will have own Adapter with own custom logic (like validation or whatever is needed for any input) but as result of this adapter we must put SessionMetadata into socket.data field

super.bindMessageHandlers(socket, handlers, transform);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { ExecutionContext, createParamDecorator } from '@nestjs/common';
import { SessionMetadata } from 'src/common/models';
import { DEFAULT_SESSION_ID, DEFAULT_USER_ID } from 'src/common/constants';

export const WSSessionMetadata = createParamDecorator(
(data: unknown, ctx: ExecutionContext): SessionMetadata => {
const socket = ctx.switchToWs().getClient();
// TODO: [USER_CONTEXT] - decode the JWT and return a session metadata for the user
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { jwt } = socket.data;
return {
userId: DEFAULT_SESSION_ID,
sessionId: DEFAULT_USER_ID,
};
},
);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's get SessionMetadata from socket.data. I don't think we need extra logic here in decorator

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { INestApplication, Logger } from '@nestjs/common';
import { IoAdapter } from '@nestjs/platform-socket.io';
import { BaseWsInstance, MessageMappingProperties } from '@nestjs/websockets';
import { MessageMappingProperties } from '@nestjs/websockets';
import { get } from 'lodash';
import { Observable } from 'rxjs';
import { Socket } from 'socket.io';
Expand All @@ -10,6 +10,7 @@ import { WindowAuthService } from '../window-auth.service';

export class WindowsAuthAdapter extends IoAdapter {
private windowAuthService: WindowAuthService;

private logger = new Logger('WindowsAuthAdapter');

constructor(private app: INestApplication) {
Expand All @@ -30,6 +31,6 @@ export class WindowsAuthAdapter extends IoAdapter {
return;
}

return super.bindMessageHandlers(socket, handlers, transform);
super.bindMessageHandlers(socket, handlers, transform);
}
}
21 changes: 16 additions & 5 deletions redisinsight/api/src/modules/bulk-actions/bulk-actions.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@ import { CreateBulkActionDto } from 'src/modules/bulk-actions/dto/create-bulk-ac
import { BulkActionsService } from 'src/modules/bulk-actions/bulk-actions.service';
import { AckWsExceptionFilter } from 'src/modules/pub-sub/filters/ack-ws-exception.filter';
import { BulkActionIdDto } from 'src/modules/bulk-actions/dto/bulk-action-id.dto';
import { SessionMetadata } from 'src/common/models';
import { WSSessionMetadata } from 'src/modules/auth/session-metadata/decorators/ws-session-metadata.decorator';

const SOCKETS_CONFIG = config.get('sockets');

@UsePipes(new ValidationPipe({ transform: true }))
@UseFilters(AckWsExceptionFilter)
@WebSocketGateway({ path: SOCKETS_CONFIG.path, namespace: 'bulk-actions', cors: SOCKETS_CONFIG.cors, serveClient: SOCKETS_CONFIG.serveClient })
@WebSocketGateway({
path: SOCKETS_CONFIG.path,
namespace: 'bulk-actions',
cors: SOCKETS_CONFIG.cors,
serveClient: SOCKETS_CONFIG.serveClient,
})
export class BulkActionsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer() wss: Server;

Expand All @@ -33,19 +40,23 @@ export class BulkActionsGateway implements OnGatewayConnection, OnGatewayDisconn
) {}

@SubscribeMessage(BulkActionsServerEvents.Create)
create(@ConnectedSocket() socket: Socket, @Body() dto: CreateBulkActionDto) {
create(
@WSSessionMetadata() sessionMetadata: SessionMetadata,
@ConnectedSocket() socket: Socket,
@Body() dto: CreateBulkActionDto,
) {
this.logger.log('Creating new bulk action.');
return this.service.create(dto, socket);
return this.service.create(sessionMetadata, dto, socket);
}

@SubscribeMessage(BulkActionsServerEvents.Get)
get(@Body() dto: BulkActionIdDto) {
get(@WSSessionMetadata() sessionMetadata: SessionMetadata, @Body() dto: BulkActionIdDto) {
this.logger.log('Subscribing to bulk action.');
return this.service.get(dto);
}

@SubscribeMessage(BulkActionsServerEvents.Abort)
abort(@Body() dto: BulkActionIdDto) {
abort(@WSSessionMetadata() sessionMetadata: SessionMetadata, @Body() dto: BulkActionIdDto) {
this.logger.log('Aborting bulk action.');
return this.service.abort(dto);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import {
MockType,
mockBulkActionsAnalytics,
mockSessionMetadata,
} from 'src/__mocks__';
import { BulkActionsProvider } from 'src/modules/bulk-actions/providers/bulk-actions.provider';
import { RedisDataType } from 'src/modules/browser/keys/dto';
Expand Down Expand Up @@ -76,7 +77,7 @@ describe('BulkActionsService', () => {

describe('create', () => {
it('should create and return overview', async () => {
expect(await service.create(mockCreateBulkActionDto, mockSocket1)).toEqual(mockOverview);
expect(await service.create(mockSessionMetadata, mockCreateBulkActionDto, mockSocket1)).toEqual(mockOverview);
expect(bulkActionProvider.create).toHaveBeenCalledTimes(1);
expect(analyticsService.sendActionStarted).toHaveBeenCalledTimes(1);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { BulkActionsProvider } from 'src/modules/bulk-actions/providers/bulk-act
import { CreateBulkActionDto } from 'src/modules/bulk-actions/dto/create-bulk-action.dto';
import { BulkActionIdDto } from 'src/modules/bulk-actions/dto/bulk-action-id.dto';
import { BulkActionsAnalytics } from 'src/modules/bulk-actions/bulk-actions.analytics';
import { SessionMetadata } from 'src/common/models';

@Injectable()
export class BulkActionsService {
Expand All @@ -12,8 +13,8 @@ export class BulkActionsService {
private readonly analytics: BulkActionsAnalytics,
) {}

async create(dto: CreateBulkActionDto, socket: Socket) {
const bulkAction = await this.bulkActionsProvider.create(dto, socket);
async create(sessionMetadata: SessionMetadata, dto: CreateBulkActionDto, socket: Socket) {
const bulkAction = await this.bulkActionsProvider.create(sessionMetadata, dto, socket);
const overview = bulkAction.getOverview();

this.analytics.sendActionStarted(overview);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import {
mockBulkActionsAnalytics,
mockDatabaseClientFactory,
mockSessionMetadata,
} from 'src/__mocks__';
import { BulkActionsProvider } from 'src/modules/bulk-actions/providers/bulk-actions.provider';
import { RedisDataType } from 'src/modules/browser/keys/dto';
Expand Down Expand Up @@ -60,27 +61,27 @@ describe('BulkActionsProvider', () => {
it('should create only once with the same id', async () => {
expect(service['bulkActions'].size).toEqual(0);

const bulkAction = await service.create(mockCreateBulkActionDto, mockSocket1);
const bulkAction = await service.create(mockSessionMetadata, mockCreateBulkActionDto, mockSocket1);

expect(bulkAction).toBeInstanceOf(BulkAction);
expect(service['bulkActions'].size).toEqual(1);

try {
await service.create(mockCreateBulkActionDto, mockSocket1);
await service.create(mockSessionMetadata, mockCreateBulkActionDto, mockSocket1);
fail();
} catch (e) {
expect(e.message).toEqual('You already have bulk action with such id');
}

expect(service['bulkActions'].size).toEqual(1);

await service.create({ ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);
await service.create(mockSessionMetadata, { ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);

expect(service['bulkActions'].size).toEqual(2);
});
it('should fail when unsupported runner class', async () => {
try {
await service.create({
await service.create(mockSessionMetadata, {
...mockCreateBulkActionDto,
type: undefined,
}, mockSocket1);
Expand All @@ -92,8 +93,8 @@ describe('BulkActionsProvider', () => {
});
describe('get', () => {
it('should get by id', async () => {
const bulkAction = await service.create(mockCreateBulkActionDto, mockSocket1);
await service.create({ ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);
const bulkAction = await service.create(mockSessionMetadata, mockCreateBulkActionDto, mockSocket1);
await service.create(mockSessionMetadata, { ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);

expect(service['bulkActions'].size).toEqual(2);

Expand All @@ -113,8 +114,8 @@ describe('BulkActionsProvider', () => {
});
describe('abort', () => {
it('should abort by id and remove', async () => {
const bulkAction = await service.create(mockCreateBulkActionDto, mockSocket1);
await service.create({ ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);
const bulkAction = await service.create(mockSessionMetadata, mockCreateBulkActionDto, mockSocket1);
await service.create(mockSessionMetadata, { ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);

expect(service['bulkActions'].size).toEqual(2);

Expand All @@ -136,9 +137,9 @@ describe('BulkActionsProvider', () => {
});
describe('abortUsersBulkActions', () => {
it('should abort all users bulk actions', async () => {
await service.create(mockCreateBulkActionDto, mockSocket1);
await service.create({ ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);
await service.create({ ...mockCreateBulkActionDto, id: 'new one 2' }, mockSocket2);
await service.create(mockSessionMetadata, mockCreateBulkActionDto, mockSocket1);
await service.create(mockSessionMetadata, { ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);
await service.create(mockSessionMetadata, { ...mockCreateBulkActionDto, id: 'new one 2' }, mockSocket2);

expect(service['bulkActions'].size).toEqual(3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
DeleteBulkActionSimpleRunner,
} from 'src/modules/bulk-actions/models/runners/simple/delete.bulk-action.simple.runner';
import { BulkActionsAnalytics } from 'src/modules/bulk-actions/bulk-actions.analytics';
import { ClientContext } from 'src/common/models';
import { ClientContext, SessionMetadata } from 'src/common/models';
import { DatabaseClientFactory } from 'src/modules/database/providers/database.client.factory';

@Injectable()
Expand All @@ -28,7 +28,7 @@ export class BulkActionsProvider {
* @param dto
* @param socket
*/
async create(dto: CreateBulkActionDto, socket: Socket): Promise<BulkAction> {
async create(sessionMetadata: SessionMetadata, dto: CreateBulkActionDto, socket: Socket): Promise<BulkAction> {
if (this.bulkActions.get(dto.id)) {
throw new Error('You already have bulk action with such id');
}
Expand All @@ -40,10 +40,7 @@ export class BulkActionsProvider {
// todo: add multi user support
// todo: use own client and close it after
const client = await this.databaseClientFactory.getOrCreateClient({
sessionMetadata: {
userId: '1',
sessionId: '1',
},
sessionMetadata,
databaseId: dto.databaseId,
context: ClientContext.Common,
db: dto.db,
Expand Down
17 changes: 9 additions & 8 deletions redisinsight/api/src/modules/cloud/job/cloud-job.gateway.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { Socket, Server } from 'socket.io';
import {
ConnectedSocket,
SubscribeMessage,
WebSocketGateway,
WebSocketServer, WsException,
} from '@nestjs/websockets';
import {
Body,
Logger, ValidationPipe,
} from '@nestjs/common';
import config from 'src/utils/config';
Expand All @@ -13,8 +15,9 @@ import { CloudJobService } from 'src/modules/cloud/job/cloud-job.service';
import { MonitorCloudJobDto } from 'src/modules/cloud/job/dto/monitor.cloud-job.dto';
import { Validator } from 'class-validator';
import { plainToClass } from 'class-transformer';
import { DEFAULT_SESSION_ID, DEFAULT_USER_ID } from 'src/common/constants';
import { CloudJobInfo } from 'src/modules/cloud/job/models';
import { SessionMetadata } from 'src/common/models';
import { WSSessionMetadata } from 'src/modules/auth/session-metadata/decorators/ws-session-metadata.decorator';

const SOCKETS_CONFIG = config.get('sockets');

Expand All @@ -33,7 +36,11 @@ export class CloudJobGateway {
) {}

@SubscribeMessage(CloudJobEvents.Monitor)
async monitor(client: Socket, data: MonitorCloudJobDto): Promise<CloudJobInfo> {
async monitor(
@WSSessionMetadata() sessionMetadata: SessionMetadata,
@ConnectedSocket() client: Socket,
@Body() data: MonitorCloudJobDto,
): Promise<CloudJobInfo> {
try {
const dto = plainToClass(MonitorCloudJobDto, data);

Expand All @@ -46,12 +53,6 @@ export class CloudJobGateway {
throw this.exceptionFactory(errors);
}

// todo: implement session handling for entire app
const sessionMetadata = {
userId: DEFAULT_USER_ID,
sessionId: DEFAULT_SESSION_ID,
};

return await this.cloudJobService.monitorJob(sessionMetadata, dto, client);
} catch (error) {
this.logger.error('Unable to add listener', error);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Socket, Server } from 'socket.io';
import {
ConnectedSocket,
OnGatewayConnection,
OnGatewayDisconnect,
WebSocketGateway,
Expand Down
Loading