Merge pull request #1930 from RedisInsight/be/feature/RI-4290-upload_data_in_bulk

#RI-4380 BE Upload data in bulk base implementation
arthosofteq committed Apr 12, 2023
2 parents 31a1c8e + 0f40f21 commit 621bc95
Showing 10 changed files with 527 additions and 2 deletions.
7 changes: 5 additions & 2 deletions redisinsight/api/src/__mocks__/redis.ts
@@ -1,6 +1,7 @@
import IORedis from 'ioredis';

const getRedisCommanderMockFunctions = () => ({
export const mockIORedisClientExec = jest.fn();
const getRedisCommanderMockFunctions = jest.fn(() => ({
sendCommand: jest.fn(),
info: jest.fn(),
monitor: jest.fn(),
@@ -12,9 +13,11 @@ const getRedisCommanderMockFunctions = () => ({
unsubscribe: jest.fn(),
punsubscribe: jest.fn(),
publish: jest.fn(),
pipeline: jest.fn().mockReturnThis(),
exec: mockIORedisClientExec,
cluster: jest.fn(),
quit: jest.fn(),
});
}));

export const mockIORedisClient = {
...Object.create(IORedis.prototype),
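The new shared mockIORedisClientExec handle lets individual specs script what a pipelined exec() resolves to. A hypothetical usage, not part of this commit, assuming mockIORedisClient spreads these commander mocks as the truncated fixture above suggests:

import { mockIORedisClient, mockIORedisClientExec } from 'src/__mocks__/redis';

it('scripts pipeline results through the shared exec mock', async () => {
  // Each reply mimics ioredis' [error, result] tuple shape.
  mockIORedisClientExec.mockResolvedValueOnce([[null, 'OK'], [null, 'OK']]);

  // pipeline() is mocked with mockReturnThis(), so exec() below is the shared jest.fn.
  const replies = await mockIORedisClient
    .pipeline([['set', ['foo', 'bar']], ['set', ['baz', 'qux']]])
    .exec();

  expect(replies).toEqual([[null, 'OK'], [null, 'OK']]);
});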
5 changes: 5 additions & 0 deletions redisinsight/api/src/app.routes.ts
@@ -6,6 +6,7 @@ import { SlowLogModule } from 'src/modules/slow-log/slow-log.module';
import { PubSubModule } from 'src/modules/pub-sub/pub-sub.module';
import { ClusterMonitorModule } from 'src/modules/cluster-monitor/cluster-monitor.module';
import { DatabaseAnalysisModule } from 'src/modules/database-analysis/database-analysis.module';
import { BulkActionsModule } from 'src/modules/bulk-actions/bulk-actions.module';

export const routes: Routes = [
{
@@ -39,6 +40,10 @@ export const routes: Routes = [
path: '/:dbInstance',
module: DatabaseAnalysisModule,
},
{
path: '/:dbInstance',
module: BulkActionsModule,
},
],
},
];
4 changes: 4 additions & 0 deletions redisinsight/api/src/modules/bulk-actions/bulk-actions.module.ts
@@ -3,13 +3,17 @@ import { BulkActionsService } from 'src/modules/bulk-actions/bulk-actions.servic
import { BulkActionsProvider } from 'src/modules/bulk-actions/providers/bulk-actions.provider';
import { BulkActionsGateway } from 'src/modules/bulk-actions/bulk-actions.gateway';
import { BulkActionsAnalyticsService } from 'src/modules/bulk-actions/bulk-actions-analytics.service';
import { BulkImportController } from 'src/modules/bulk-actions/bulk-import.controller';
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';

@Module({
controllers: [BulkImportController],
providers: [
BulkActionsGateway,
BulkActionsService,
BulkActionsProvider,
BulkActionsAnalyticsService,
BulkImportService,
],
})
export class BulkActionsModule {}
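The BulkImportService implementation itself is not among the files shown here, but the spec further down pins its behaviour: commands are executed in 10,000-command batches, a standalone client gets one pipeline per batch, a cluster client issues commands one by one, and a global error marks the whole batch as failed. A minimal sketch consistent with that spec (the class name suffix, BATCH_SIZE constant, and cluster check are assumptions, not the committed code):

import { Injectable } from '@nestjs/common';

// Inferred from the spec: importing 100_000 commands results in 10 batches of 10_000.
const BATCH_SIZE = 10_000;

interface BatchSummary {
  processed: number;
  succeed: number;
  failed: number;
  errors: Error[];
}

@Injectable()
export class BulkImportServiceSketch {
  // Pipelines the batch on a standalone client; on a cluster, commands are issued
  // individually because a single pipeline cannot span hash slots.
  private async executeBatch(client: any, commands: [string, string[]][]): Promise<BatchSummary> {
    const summary: BatchSummary = {
      processed: commands.length,
      succeed: 0,
      failed: 0,
      errors: [],
    };

    try {
      if (client.isCluster) { // illustrative cluster detection only
        await Promise.all(
          commands.map(async ([command, args]) => {
            try {
              await client.call(command, ...args);
              summary.succeed += 1;
            } catch (e) {
              summary.failed += 1;
            }
          }),
        );
      } else {
        const replies: [Error | null, unknown][] = await client.pipeline(commands).exec();
        replies.forEach(([err]) => {
          if (err) {
            summary.failed += 1;
          } else {
            summary.succeed += 1;
          }
        });
      }
    } catch (e) {
      // A global failure (e.g. dropped connection) counts the whole batch as failed.
      summary.succeed = 0;
      summary.failed = summary.processed;
    }

    return summary;
  }
}

import() would then split the uploaded file buffer into lines, map each line to a [command, args] tuple, slice the list into BATCH_SIZE chunks, and merge the per-batch summaries into the IBulkActionOverview returned to the controller.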
43 changes: 43 additions & 0 deletions redisinsight/api/src/modules/bulk-actions/bulk-import.controller.ts
@@ -0,0 +1,43 @@
import {
Body,
ClassSerializerInterceptor,
Controller, HttpCode, Post,
UseInterceptors, UsePipes, ValidationPipe,
} from '@nestjs/common';
import {
ApiConsumes, ApiTags,
} from '@nestjs/swagger';
import { ApiEndpoint } from 'src/decorators/api-endpoint.decorator';
import { FormDataRequest } from 'nestjs-form-data';
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';
import { UploadImportFileDto } from 'src/modules/bulk-actions/dto/upload-import-file.dto';
import { ClientMetadataParam } from 'src/common/decorators';
import { ClientMetadata } from 'src/common/models';
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';

@UsePipes(new ValidationPipe({ transform: true }))
@UseInterceptors(ClassSerializerInterceptor)
@ApiTags('Bulk Actions')
@Controller('/bulk-actions')
export class BulkImportController {
constructor(private readonly service: BulkImportService) {}

@Post('import')
@ApiConsumes('multipart/form-data')
@HttpCode(200)
@FormDataRequest()
@ApiEndpoint({
description: 'Import data from file',
responses: [
{
type: Object,
},
],
})
async import(
@Body() dto: UploadImportFileDto,
@ClientMetadataParam() clientMetadata: ClientMetadata,
): Promise<IBulkActionOverview> {
return this.service.import(clientMetadata, dto);
}
}
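With the module mounted under /:dbInstance in app.routes.ts, the endpoint accepts a multipart upload whose file field carries plain-text Redis commands, one per line. A hypothetical caller; the base URL and the route segment above /:dbInstance are assumptions, since that parent path is outside this diff:

// Hypothetical client; adjust API_BASE to wherever the RedisInsight API is served.
const API_BASE = 'http://localhost:5540/api/databases';

async function uploadCommands(databaseId: string, commands: string): Promise<unknown> {
  const form = new FormData();
  form.append('file', new Blob([commands]), 'commands.txt');

  const res = await fetch(`${API_BASE}/${databaseId}/bulk-actions/import`, {
    method: 'POST',
    body: form,
  });

  // Responds 200 with an IBulkActionOverview (summary, status, duration, ...).
  return res.json();
}

// e.g. await uploadCommands('my-database-id', 'SET foo bar\nSET baz qux\n');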
175 changes: 175 additions & 0 deletions redisinsight/api/src/modules/bulk-actions/bulk-import.service.spec.ts
@@ -0,0 +1,175 @@
import { Test, TestingModule } from '@nestjs/testing';
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';
import { DatabaseConnectionService } from 'src/modules/database/database-connection.service';
import {
mockClientMetadata,
mockDatabaseConnectionService,
mockIORedisClient,
mockIORedisCluster, MockType
} from 'src/__mocks__';
import { MemoryStoredFile } from 'nestjs-form-data';
import { BulkActionSummary } from 'src/modules/bulk-actions/models/bulk-action-summary';
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';
import { BulkActionStatus, BulkActionType } from 'src/modules/bulk-actions/constants';
import { NotFoundException } from '@nestjs/common';

const generateNCommandsBuffer = (n: number) => Buffer.from(
(new Array(n)).fill(1).map(() => ['set', ['foo', 'bar']]).join('\n'),
);
const generateNBatchCommands = (n: number) => (new Array(n)).fill(1).map(() => ['set', ['foo', 'bar']]);
const generateNBatchCommandsResults = (n: number) => (new Array(n)).fill(1).map(() => [null, 'OK']);
const mockBatchCommands = generateNBatchCommands(100);
const mockBatchCommandsResult = generateNBatchCommandsResults(100);
const mockBatchCommandsResultWithErrors = [...(new Array(99)).fill(1).map(() => [null, 'OK']), ['ReplyError']];
const mockSummary: BulkActionSummary = Object.assign(new BulkActionSummary(), {
processed: 100,
succeed: 100,
failed: 0,
errors: [],
});

const mockSummaryWithErrors = Object.assign(new BulkActionSummary(), {
processed: 100,
succeed: 99,
failed: 1,
errors: [],
});

const mockImportResult: IBulkActionOverview = {
id: 'empty',
databaseId: mockClientMetadata.databaseId,
type: BulkActionType.Import,
summary: mockSummary.getOverview(),
progress: null,
filter: null,
status: BulkActionStatus.Completed,
duration: 100,
};

const mockUploadImportFileDto = {
file: {
originalname: 'filename',
size: 1,
buffer: Buffer.from('SET foo bar'),
} as unknown as MemoryStoredFile,
};

describe('BulkImportService', () => {
let service: BulkImportService;
let databaseConnectionService: MockType<DatabaseConnectionService>;

beforeEach(async () => {
jest.clearAllMocks();

const module: TestingModule = await Test.createTestingModule({
providers: [
BulkImportService,
{
provide: DatabaseConnectionService,
useFactory: mockDatabaseConnectionService,
},
],
}).compile();

service = module.get(BulkImportService);
databaseConnectionService = module.get(DatabaseConnectionService);
});

describe('executeBatch', () => {
it('should execute batch in pipeline for standalone', async () => {
mockIORedisClient.exec.mockResolvedValueOnce(mockBatchCommandsResult);
expect(await service['executeBatch'](mockIORedisClient, mockBatchCommands)).toEqual(mockSummary);
});
it('should execute batch in pipeline for standalone with errors', async () => {
mockIORedisClient.exec.mockResolvedValueOnce(mockBatchCommandsResultWithErrors);
expect(await service['executeBatch'](mockIORedisClient, mockBatchCommands)).toEqual(mockSummaryWithErrors);
});
it('should return all failed in case of global error', async () => {
mockIORedisClient.exec.mockRejectedValueOnce(new Error());
expect(await service['executeBatch'](mockIORedisClient, mockBatchCommands)).toEqual({
...mockSummary.getOverview(),
succeed: 0,
failed: mockSummary.getOverview().processed,
});
});
it('should execute batch of commands without pipeline for cluster', async () => {
mockIORedisCluster.call.mockRejectedValueOnce(new Error());
mockIORedisCluster.call.mockResolvedValue('OK');
expect(await service['executeBatch'](mockIORedisCluster, mockBatchCommands)).toEqual(mockSummaryWithErrors);
});
});

describe('import', () => {
let spy;

beforeEach(() => {
spy = jest.spyOn(service as any, 'executeBatch');
});

it('should import data', async () => {
spy.mockResolvedValue(mockSummary);
expect(await service.import(mockClientMetadata, mockUploadImportFileDto)).toEqual({
...mockImportResult,
duration: jasmine.anything(),
});
});

it('should import data (100K) from file in batches 10K each', async () => {
spy.mockResolvedValue(Object.assign(new BulkActionSummary(), {
processed: 10_000,
succeed: 10_000,
failed: 0,
}));
expect(await service.import(mockClientMetadata, {
file: {
...mockUploadImportFileDto.file,
buffer: generateNCommandsBuffer(100_000),
} as unknown as MemoryStoredFile,
})).toEqual({
...mockImportResult,
summary: {
processed: 100_000,
succeed: 100_000,
failed: 0,
errors: [],
},
duration: jasmine.anything(),
});
});

it('should not import any data due to parse error', async () => {
spy.mockResolvedValue(Object.assign(new BulkActionSummary(), {
processed: 0,
succeed: 0,
failed: 0,
}));
expect(await service.import(mockClientMetadata, {
file: {
...mockUploadImportFileDto.file,
buffer: Buffer.from('{"incorrectdata"}\n{"incorrectdata"}'),
} as unknown as MemoryStoredFile,
})).toEqual({
...mockImportResult,
summary: {
processed: 2,
succeed: 0,
failed: 2,
errors: [],
},
duration: jasmine.anything(),
});
});

it('should throw an error in case of global error', async () => {
try {
databaseConnectionService.createClient.mockRejectedValueOnce(new NotFoundException());

await service.import(mockClientMetadata, mockUploadImportFileDto);

fail();
} catch (e) {
expect(e).toBeInstanceOf(NotFoundException);
}
});
});
});
(Diffs for the remaining 5 changed files were not loaded in this view.)
