Skip to content

Commit

Permalink
Merge pull request #3314 from RedisInsight/be/feature/CR-28
Browse files Browse the repository at this point in the history
Be/feature/CR-28
  • Loading branch information
nelsonwellsredis committed May 30, 2024
2 parents f37e9ee + bc7bf6a commit c41bf39
Show file tree
Hide file tree
Showing 20 changed files with 173 additions and 83 deletions.
3 changes: 3 additions & 0 deletions redisinsight/api/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { AppModule } from './app.module';
import SWAGGER_CONFIG from '../config/swagger';
import LOGGER_CONFIG from '../config/logger';
import { createHttpOptions } from './utils/createHttpOptions';
import { SessionMetadataAdapter } from './modules/auth/session-metadata/adapters/session-metadata.adapter';

const serverConfig = get('server') as Config['server'];

Expand Down Expand Up @@ -68,6 +69,8 @@ export default async function bootstrap(apiPort?: number): Promise<IApp> {
app.useWebSocketAdapter(new WindowsAuthAdapter(app));
}

app.useWebSocketAdapter(new SessionMetadataAdapter(app));

const logFileProvider = app.get(LogFileProvider);

await app.listen(apiPort || port, host);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* eslint-disable no-param-reassign */
import { Injectable, Logger } from '@nestjs/common';
import { IoAdapter } from '@nestjs/platform-socket.io';
import { MessageMappingProperties } from '@nestjs/websockets';
import { Observable } from 'rxjs';
import { Socket } from 'socket.io';
import { DEFAULT_SESSION_ID, DEFAULT_USER_ID } from 'src/common/constants';
import { SessionMetadata } from 'src/common/models';

@Injectable()
export class SessionMetadataAdapter extends IoAdapter {
private logger = new Logger('SessionMetadataAdapter');

async bindMessageHandlers(
socket: Socket,
handlers: MessageMappingProperties[],
transform: (data: any) => Observable<any>,
) {
const sessionMetadata: SessionMetadata = {
userId: DEFAULT_USER_ID,
sessionId: DEFAULT_SESSION_ID,
};

socket.data['sessionMetadata'] = sessionMetadata;

super.bindMessageHandlers(socket, handlers, transform);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { ExecutionContext, createParamDecorator } from '@nestjs/common';
import { SessionMetadata } from 'src/common/models';

export const WSSessionMetadata = createParamDecorator(
(data: unknown, ctx: ExecutionContext): SessionMetadata => {
const socket = ctx.switchToWs().getClient();
return socket.data.sessionMetadata;
},
);
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { INestApplication, Logger } from '@nestjs/common';
import { IoAdapter } from '@nestjs/platform-socket.io';
import { BaseWsInstance, MessageMappingProperties } from '@nestjs/websockets';
import { MessageMappingProperties } from '@nestjs/websockets';
import { get } from 'lodash';
import { Observable } from 'rxjs';
import { Socket } from 'socket.io';
Expand All @@ -10,6 +10,7 @@ import { WindowAuthService } from '../window-auth.service';

export class WindowsAuthAdapter extends IoAdapter {
private windowAuthService: WindowAuthService;

private logger = new Logger('WindowsAuthAdapter');

constructor(private app: INestApplication) {
Expand All @@ -30,6 +31,6 @@ export class WindowsAuthAdapter extends IoAdapter {
return;
}

return super.bindMessageHandlers(socket, handlers, transform);
super.bindMessageHandlers(socket, handlers, transform);
}
}
21 changes: 16 additions & 5 deletions redisinsight/api/src/modules/bulk-actions/bulk-actions.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@ import { CreateBulkActionDto } from 'src/modules/bulk-actions/dto/create-bulk-ac
import { BulkActionsService } from 'src/modules/bulk-actions/bulk-actions.service';
import { AckWsExceptionFilter } from 'src/modules/pub-sub/filters/ack-ws-exception.filter';
import { BulkActionIdDto } from 'src/modules/bulk-actions/dto/bulk-action-id.dto';
import { SessionMetadata } from 'src/common/models';
import { WSSessionMetadata } from 'src/modules/auth/session-metadata/decorators/ws-session-metadata.decorator';

const SOCKETS_CONFIG = config.get('sockets');

@UsePipes(new ValidationPipe({ transform: true }))
@UseFilters(AckWsExceptionFilter)
@WebSocketGateway({ path: SOCKETS_CONFIG.path, namespace: 'bulk-actions', cors: SOCKETS_CONFIG.cors, serveClient: SOCKETS_CONFIG.serveClient })
@WebSocketGateway({
path: SOCKETS_CONFIG.path,
namespace: 'bulk-actions',
cors: SOCKETS_CONFIG.cors,
serveClient: SOCKETS_CONFIG.serveClient,
})
export class BulkActionsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer() wss: Server;

Expand All @@ -33,19 +40,23 @@ export class BulkActionsGateway implements OnGatewayConnection, OnGatewayDisconn
) {}

@SubscribeMessage(BulkActionsServerEvents.Create)
create(@ConnectedSocket() socket: Socket, @Body() dto: CreateBulkActionDto) {
create(
@WSSessionMetadata() sessionMetadata: SessionMetadata,
@ConnectedSocket() socket: Socket,
@Body() dto: CreateBulkActionDto,
) {
this.logger.log('Creating new bulk action.');
return this.service.create(dto, socket);
return this.service.create(sessionMetadata, dto, socket);
}

@SubscribeMessage(BulkActionsServerEvents.Get)
get(@Body() dto: BulkActionIdDto) {
get(@WSSessionMetadata() sessionMetadata: SessionMetadata, @Body() dto: BulkActionIdDto) {
this.logger.log('Subscribing to bulk action.');
return this.service.get(dto);
}

@SubscribeMessage(BulkActionsServerEvents.Abort)
abort(@Body() dto: BulkActionIdDto) {
abort(@WSSessionMetadata() sessionMetadata: SessionMetadata, @Body() dto: BulkActionIdDto) {
this.logger.log('Aborting bulk action.');
return this.service.abort(dto);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import {
MockType,
mockBulkActionsAnalytics,
mockSessionMetadata,
} from 'src/__mocks__';
import { BulkActionsProvider } from 'src/modules/bulk-actions/providers/bulk-actions.provider';
import { RedisDataType } from 'src/modules/browser/keys/dto';
Expand Down Expand Up @@ -76,7 +77,7 @@ describe('BulkActionsService', () => {

describe('create', () => {
it('should create and return overview', async () => {
expect(await service.create(mockCreateBulkActionDto, mockSocket1)).toEqual(mockOverview);
expect(await service.create(mockSessionMetadata, mockCreateBulkActionDto, mockSocket1)).toEqual(mockOverview);
expect(bulkActionProvider.create).toHaveBeenCalledTimes(1);
expect(analyticsService.sendActionStarted).toHaveBeenCalledTimes(1);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { BulkActionsProvider } from 'src/modules/bulk-actions/providers/bulk-act
import { CreateBulkActionDto } from 'src/modules/bulk-actions/dto/create-bulk-action.dto';
import { BulkActionIdDto } from 'src/modules/bulk-actions/dto/bulk-action-id.dto';
import { BulkActionsAnalytics } from 'src/modules/bulk-actions/bulk-actions.analytics';
import { SessionMetadata } from 'src/common/models';

@Injectable()
export class BulkActionsService {
Expand All @@ -12,8 +13,8 @@ export class BulkActionsService {
private readonly analytics: BulkActionsAnalytics,
) {}

async create(dto: CreateBulkActionDto, socket: Socket) {
const bulkAction = await this.bulkActionsProvider.create(dto, socket);
async create(sessionMetadata: SessionMetadata, dto: CreateBulkActionDto, socket: Socket) {
const bulkAction = await this.bulkActionsProvider.create(sessionMetadata, dto, socket);
const overview = bulkAction.getOverview();

this.analytics.sendActionStarted(overview);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import {
mockBulkActionsAnalytics,
mockDatabaseClientFactory,
mockSessionMetadata,
} from 'src/__mocks__';
import { BulkActionsProvider } from 'src/modules/bulk-actions/providers/bulk-actions.provider';
import { RedisDataType } from 'src/modules/browser/keys/dto';
Expand Down Expand Up @@ -60,27 +61,27 @@ describe('BulkActionsProvider', () => {
it('should create only once with the same id', async () => {
expect(service['bulkActions'].size).toEqual(0);

const bulkAction = await service.create(mockCreateBulkActionDto, mockSocket1);
const bulkAction = await service.create(mockSessionMetadata, mockCreateBulkActionDto, mockSocket1);

expect(bulkAction).toBeInstanceOf(BulkAction);
expect(service['bulkActions'].size).toEqual(1);

try {
await service.create(mockCreateBulkActionDto, mockSocket1);
await service.create(mockSessionMetadata, mockCreateBulkActionDto, mockSocket1);
fail();
} catch (e) {
expect(e.message).toEqual('You already have bulk action with such id');
}

expect(service['bulkActions'].size).toEqual(1);

await service.create({ ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);
await service.create(mockSessionMetadata, { ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);

expect(service['bulkActions'].size).toEqual(2);
});
it('should fail when unsupported runner class', async () => {
try {
await service.create({
await service.create(mockSessionMetadata, {
...mockCreateBulkActionDto,
type: undefined,
}, mockSocket1);
Expand All @@ -92,8 +93,8 @@ describe('BulkActionsProvider', () => {
});
describe('get', () => {
it('should get by id', async () => {
const bulkAction = await service.create(mockCreateBulkActionDto, mockSocket1);
await service.create({ ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);
const bulkAction = await service.create(mockSessionMetadata, mockCreateBulkActionDto, mockSocket1);
await service.create(mockSessionMetadata, { ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);

expect(service['bulkActions'].size).toEqual(2);

Expand All @@ -113,8 +114,8 @@ describe('BulkActionsProvider', () => {
});
describe('abort', () => {
it('should abort by id and remove', async () => {
const bulkAction = await service.create(mockCreateBulkActionDto, mockSocket1);
await service.create({ ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);
const bulkAction = await service.create(mockSessionMetadata, mockCreateBulkActionDto, mockSocket1);
await service.create(mockSessionMetadata, { ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);

expect(service['bulkActions'].size).toEqual(2);

Expand All @@ -136,9 +137,9 @@ describe('BulkActionsProvider', () => {
});
describe('abortUsersBulkActions', () => {
it('should abort all users bulk actions', async () => {
await service.create(mockCreateBulkActionDto, mockSocket1);
await service.create({ ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);
await service.create({ ...mockCreateBulkActionDto, id: 'new one 2' }, mockSocket2);
await service.create(mockSessionMetadata, mockCreateBulkActionDto, mockSocket1);
await service.create(mockSessionMetadata, { ...mockCreateBulkActionDto, id: 'new one' }, mockSocket1);
await service.create(mockSessionMetadata, { ...mockCreateBulkActionDto, id: 'new one 2' }, mockSocket2);

expect(service['bulkActions'].size).toEqual(3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
DeleteBulkActionSimpleRunner,
} from 'src/modules/bulk-actions/models/runners/simple/delete.bulk-action.simple.runner';
import { BulkActionsAnalytics } from 'src/modules/bulk-actions/bulk-actions.analytics';
import { ClientContext } from 'src/common/models';
import { ClientContext, SessionMetadata } from 'src/common/models';
import { DatabaseClientFactory } from 'src/modules/database/providers/database.client.factory';

@Injectable()
Expand All @@ -28,7 +28,7 @@ export class BulkActionsProvider {
* @param dto
* @param socket
*/
async create(dto: CreateBulkActionDto, socket: Socket): Promise<BulkAction> {
async create(sessionMetadata: SessionMetadata, dto: CreateBulkActionDto, socket: Socket): Promise<BulkAction> {
if (this.bulkActions.get(dto.id)) {
throw new Error('You already have bulk action with such id');
}
Expand All @@ -40,10 +40,7 @@ export class BulkActionsProvider {
// todo: add multi user support
// todo: use own client and close it after
const client = await this.databaseClientFactory.getOrCreateClient({
sessionMetadata: {
userId: '1',
sessionId: '1',
},
sessionMetadata,
databaseId: dto.databaseId,
context: ClientContext.Common,
db: dto.db,
Expand Down
17 changes: 9 additions & 8 deletions redisinsight/api/src/modules/cloud/job/cloud-job.gateway.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { Socket, Server } from 'socket.io';
import {
ConnectedSocket,
SubscribeMessage,
WebSocketGateway,
WebSocketServer, WsException,
} from '@nestjs/websockets';
import {
Body,
Logger, ValidationPipe,
} from '@nestjs/common';
import config from 'src/utils/config';
Expand All @@ -13,8 +15,9 @@ import { CloudJobService } from 'src/modules/cloud/job/cloud-job.service';
import { MonitorCloudJobDto } from 'src/modules/cloud/job/dto/monitor.cloud-job.dto';
import { Validator } from 'class-validator';
import { plainToClass } from 'class-transformer';
import { DEFAULT_SESSION_ID, DEFAULT_USER_ID } from 'src/common/constants';
import { CloudJobInfo } from 'src/modules/cloud/job/models';
import { SessionMetadata } from 'src/common/models';
import { WSSessionMetadata } from 'src/modules/auth/session-metadata/decorators/ws-session-metadata.decorator';

const SOCKETS_CONFIG = config.get('sockets');

Expand All @@ -33,7 +36,11 @@ export class CloudJobGateway {
) {}

@SubscribeMessage(CloudJobEvents.Monitor)
async monitor(client: Socket, data: MonitorCloudJobDto): Promise<CloudJobInfo> {
async monitor(
@WSSessionMetadata() sessionMetadata: SessionMetadata,
@ConnectedSocket() client: Socket,
@Body() data: MonitorCloudJobDto,
): Promise<CloudJobInfo> {
try {
const dto = plainToClass(MonitorCloudJobDto, data);

Expand All @@ -46,12 +53,6 @@ export class CloudJobGateway {
throw this.exceptionFactory(errors);
}

// todo: implement session handling for entire app
const sessionMetadata = {
userId: DEFAULT_USER_ID,
sessionId: DEFAULT_SESSION_ID,
};

return await this.cloudJobService.monitorJob(sessionMetadata, dto, client);
} catch (error) {
this.logger.error('Unable to add listener', error);
Expand Down
32 changes: 21 additions & 11 deletions redisinsight/api/src/modules/profiler/profiler.gateway.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,46 @@
import { get } from 'lodash';
import { Socket, Server } from 'socket.io';
import {
ConnectedSocket,
OnGatewayConnection,
OnGatewayDisconnect,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
WsException,
} from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Body, Logger } from '@nestjs/common';
import { MonitorSettings } from 'src/modules/profiler/models/monitor-settings';
import { ProfilerClientEvents } from 'src/modules/profiler/constants';
import { ProfilerService } from 'src/modules/profiler/profiler.service';
import config from 'src/utils/config';
import { ConstantsProvider } from 'src/modules/constants/providers/constants.provider';
import { WSSessionMetadata } from 'src/modules/auth/session-metadata/decorators/ws-session-metadata.decorator';
import config, { Config } from 'src/utils/config';
import { SessionMetadata } from 'src/common/models';

const SOCKETS_CONFIG = config.get('sockets');
const SOCKETS_CONFIG = config.get('sockets') as Config['sockets'];

@WebSocketGateway({ path: SOCKETS_CONFIG.path, namespace: 'monitor', cors: SOCKETS_CONFIG.cors, serveClient: SOCKETS_CONFIG.serveClient })
@WebSocketGateway({
path: SOCKETS_CONFIG.path,
namespace: 'monitor',
cors: SOCKETS_CONFIG.cors,
serveClient: SOCKETS_CONFIG.serveClient,
})
export class ProfilerGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer() wss: Server;

private logger: Logger = new Logger('MonitorGateway');

constructor(
private service: ProfilerService,
private readonly constantsProvider: ConstantsProvider,
) {}

@SubscribeMessage(ProfilerClientEvents.Monitor)
async monitor(client: Socket, settings: MonitorSettings = null): Promise<any> {
async monitor(
@WSSessionMetadata() sessionMetadata: SessionMetadata,
@ConnectedSocket() client: Socket,
@Body() settings: MonitorSettings = null,
): Promise<any> {
try {
const sessionMetadata = this.constantsProvider.getSystemSessionMetadata(); // todo: [USER_CONTEXT]

await this.service.addListenerForInstance(
sessionMetadata,
ProfilerGateway.getInstanceId(client),
Expand All @@ -47,7 +55,7 @@ export class ProfilerGateway implements OnGatewayConnection, OnGatewayDisconnect
}

@SubscribeMessage(ProfilerClientEvents.Pause)
async pause(client: Socket): Promise<any> {
async pause(@WSSessionMetadata() sessionMetadata: SessionMetadata, @ConnectedSocket() client: Socket): Promise<any> {
try {
await this.service.removeListenerFromInstance(ProfilerGateway.getInstanceId(client), client.id);
return { status: 'ok' };
Expand All @@ -58,7 +66,9 @@ export class ProfilerGateway implements OnGatewayConnection, OnGatewayDisconnect
}

@SubscribeMessage(ProfilerClientEvents.FlushLogs)
async flushLogs(client: Socket): Promise<any> {
async flushLogs(
@WSSessionMetadata() sessionMetadata: SessionMetadata, @ConnectedSocket() client: Socket,
): Promise<any> {
try {
await this.service.flushLogs(client.id);
return { status: 'ok' };
Expand Down
Loading

0 comments on commit c41bf39

Please sign in to comment.