Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#RI-4380 BE Upload data in bulk base implementation #1930

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ import { BulkActionsService } from 'src/modules/bulk-actions/bulk-actions.servic
import { BulkActionsProvider } from 'src/modules/bulk-actions/providers/bulk-actions.provider';
import { BulkActionsGateway } from 'src/modules/bulk-actions/bulk-actions.gateway';
import { BulkActionsAnalyticsService } from 'src/modules/bulk-actions/bulk-actions-analytics.service';
import { BulkImportController } from 'src/modules/bulk-actions/bulk-import.controller';
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';

@Module({
controllers: [BulkImportController],
providers: [
BulkActionsGateway,
BulkActionsService,
BulkActionsProvider,
BulkActionsAnalyticsService,
BulkImportService,
],
})
export class BulkActionsModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import {
Body,
ClassSerializerInterceptor,
Controller, Post,
UseInterceptors, UsePipes, ValidationPipe,
} from '@nestjs/common';
import {
ApiConsumes, ApiTags,
} from '@nestjs/swagger';
import { ApiEndpoint } from 'src/decorators/api-endpoint.decorator';
import { FormDataRequest } from 'nestjs-form-data';
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';
import { UploadImportFileDto } from 'src/modules/bulk-actions/dto/upload-import-file.dto';
import { ClientMetadataParam } from 'src/common/decorators';
import { ClientMetadata } from 'src/common/models';
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';

@UsePipes(new ValidationPipe({ transform: true }))
@UseInterceptors(ClassSerializerInterceptor)
@ApiTags('Bulk Actions')
@Controller('/bulk-actions/:id')
export class BulkImportController {
constructor(private readonly service: BulkImportService) {}

@Post('import')
@ApiConsumes('multipart/form-data')
@FormDataRequest()
@ApiEndpoint({
description: 'Import data from file',
responses: [
{
type: Object,
},
],
})
async import(
@Body() dto: UploadImportFileDto,
@ClientMetadataParam({
databaseIdParam: 'id',
}) clientMetadata: ClientMetadata,
): Promise<IBulkActionOverview> {
return this.service.import(clientMetadata, dto);
}
}
119 changes: 119 additions & 0 deletions redisinsight/api/src/modules/bulk-actions/bulk-import.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { Injectable, Logger } from '@nestjs/common';
import { Readable } from 'stream';
import * as readline from 'readline';
import { wrapHttpError } from 'src/common/utils';
import { UploadImportFileDto } from 'src/modules/bulk-actions/dto/upload-import-file.dto';
import { DatabaseConnectionService } from 'src/modules/database/database-connection.service';
import { ClientMetadata } from 'src/common/models';
import { splitCliCommandLine } from 'src/utils/cli-helper';
import { BulkActionSummary } from 'src/modules/bulk-actions/models/bulk-action-summary';
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';
import { BulkActionStatus, BulkActionType } from 'src/modules/bulk-actions/constants';

const BATCH_LIMIT = 10_000;

@Injectable()
export class BulkImportService {
private logger = new Logger('BulkImportService');

constructor(
private readonly databaseConnectionService: DatabaseConnectionService,
) {}

private async executeBatch(client, batch: any[]): Promise<BulkActionSummary> {
const result = new BulkActionSummary();
result.addProcessed(batch.length);

try {
if (client?.isCluster) {
await Promise.all(batch.map(async ([command, args]) => {
try {
await client.call(command, args);
result.addSuccess(1);
} catch (e) {
result.addFailed(1);
}
}));
} else {
(await client.pipeline(batch).exec()).forEach(([err]) => {
if (err) {
result.addFailed(1);
} else {
result.addSuccess(1);
}
});
}
} catch (e) {
this.logger.error('Unable to execute batch of commands', e);
result.addFailed(batch.length);
}

return result;
}

/**
* @param clientMetadata
* @param dto
*/
public async import(clientMetadata: ClientMetadata, dto: UploadImportFileDto): Promise<IBulkActionOverview> {
const result: IBulkActionOverview = {
id: 'empty',
databaseId: clientMetadata.databaseId,
type: BulkActionType.Import,
summary: {
processed: 0,
succeed: 0,
failed: 0,
errors: [],
},
progress: null,
filter: null,
status: BulkActionStatus.Completed,
duration: Date.now(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe will be better to create another constant startBulkActionTime or something else, it will be more clear i guess. And in the end result.duration = Date.now() - startBulkActionTime. But it is up to you.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. if this will be more easy to understand

};

try {
const client = await this.databaseConnectionService.createClient(clientMetadata);

const stream = Readable.from(dto.file.buffer);
let batch = [];

const batchResults: Promise<BulkActionSummary>[] = [];

await new Promise((res) => {
const rl = readline.createInterface(stream);
rl.on('line', (line) => {
const [command, ...args] = splitCliCommandLine((line));
if (batch.length >= BATCH_LIMIT) {
batchResults.push(this.executeBatch(client, batch));
batch = [];
} else {
batch.push([command.toLowerCase(), args]);
}
});
rl.on('error', (error) => {
result.summary.errors.push(error);
result.status = BulkActionStatus.Failed;
res(null);
});
rl.on('close', () => {
batchResults.push(this.executeBatch(client, batch));
res(null);
});
});

(await Promise.all(batchResults)).forEach((batchResult) => {
result.summary.processed += batchResult.getOverview().processed;
result.summary.succeed += batchResult.getOverview().succeed;
result.summary.failed += batchResult.getOverview().failed;
});

result.duration = Date.now() - result.duration;

return result;
} catch (e) {
this.logger.error('Unable to process an import file', e);
throw wrapHttpError(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export enum BulkActionsServerEvents {

export enum BulkActionType {
Delete = 'delete',
Import = 'import',
}

export enum BulkActionStatus {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { ApiPropertyOptional } from '@nestjs/swagger';
import { IsNotEmpty } from 'class-validator';
import {
IsFile, MaxFileSize, MemoryStoredFile,
} from 'nestjs-form-data';

export class UploadImportFileDto {
@ApiPropertyOptional({
type: 'string',
format: 'binary',
description: 'Import file (with list of commands to execute',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you lost close round bracket.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks

})
@IsNotEmpty()
@IsFile()
@MaxFileSize(100 * 1024 * 1024)
file?: MemoryStoredFile;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ export class BulkActionSummary {
this.succeed += count;
}

addFailed(count: number) {
this.failed += count;
}

addErrors(err: Array<Record<string, string>>) {
if (err.length) {
this.failed += err.length;
Expand Down