Skip to content

Commit

Permalink
Add logging to the content stream
Browse files Browse the repository at this point in the history
  • Loading branch information
dokmic committed Aug 16, 2021
1 parent 1a4c310 commit 9f3e9c2
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
5 changes: 4 additions & 1 deletion x-pack/plugins/reporting/server/lib/content_stream.test.ts
Expand Up @@ -7,20 +7,23 @@

import { set } from 'lodash';
import { elasticsearchServiceMock } from 'src/core/server/mocks';
import { createMockLevelLogger } from '../test_helpers';
import { ContentStream } from './content_stream';
import { ExportTypesRegistry } from './export_types_registry';

describe('ContentStream', () => {
let client: ReturnType<typeof elasticsearchServiceMock.createClusterClient>['asInternalUser'];
let exportTypesRegistry: jest.Mocked<ExportTypesRegistry>;
let logger: ReturnType<typeof createMockLevelLogger>;
let stream: ContentStream;

beforeEach(() => {
client = elasticsearchServiceMock.createClusterClient().asInternalUser;
exportTypesRegistry = ({
get: jest.fn(() => ({})),
} as unknown) as typeof exportTypesRegistry;
stream = new ContentStream(client, exportTypesRegistry, {
logger = createMockLevelLogger();
stream = new ContentStream(client, exportTypesRegistry, logger, {
id: 'something',
index: 'somewhere',
});
Expand Down
24 changes: 22 additions & 2 deletions x-pack/plugins/reporting/server/lib/content_stream.ts
Expand Up @@ -13,6 +13,7 @@ import type { ElasticsearchClient } from 'src/core/server';
import { ReportingCore } from '..';
import { ReportSource } from '../../common/types';
import { ExportTypesRegistry } from './export_types_registry';
import { LevelLogger } from './level_logger';

/**
* @note The Elasticsearch `http.max_content_length` is including the whole POST body.
Expand Down Expand Up @@ -78,6 +79,7 @@ export class ContentStream extends Duplex {
constructor(
private client: ElasticsearchClient,
private exportTypesRegistry: ExportTypesRegistry,
private logger: LevelLogger,
private document: ContentStreamDocument
) {
super();
Expand Down Expand Up @@ -150,6 +152,8 @@ export class ContentStream extends Duplex {
jobContentEncoding === 'base64'
? ContentStream.getMaxBase64EncodedSize(maxContentSize)
: ContentStream.getMaxJsonEscapedSize(maxContentSize);

this.logger.debug(`Chunk size is ${this.maxChunkSize} bytes.`);
}

return this.maxChunkSize;
Expand All @@ -171,6 +175,8 @@ export class ContentStream extends Duplex {
size: 1,
};

this.logger.debug(`Reading report contents.`);

const response = await this.client.search<ReportSource>({ body, index });
const hits = response?.body.hits?.hits?.[0];

Expand All @@ -196,6 +202,8 @@ export class ContentStream extends Duplex {
size: 1,
};

this.logger.debug(`Reading chunk #${this.chunksRead}.`);

const response = await this.client.search<ChunkSource>({ body, index });
const hits = response?.body.hits?.hits?.[0];

Expand All @@ -210,6 +218,7 @@ export class ContentStream extends Duplex {
try {
const content = this.chunksRead ? await this.readChunk() : await this.readHead();
if (!content) {
this.logger.debug(`Chunk is empty.`);
this.push(null);
return;
}
Expand All @@ -221,6 +230,7 @@ export class ContentStream extends Duplex {
this.bytesRead += buffer.byteLength;

if (this.isRead()) {
this.logger.debug(`Read ${this.bytesRead} of ${this.jobSize} bytes.`);
this.push(null);
}
} catch (error) {
Expand All @@ -242,6 +252,8 @@ export class ContentStream extends Duplex {
}

private async writeHead(content: string) {
this.logger.debug(`Updating report contents.`);

const { body } = await this.client.update<ReportSource>({
...this.document,
body: {
Expand All @@ -258,6 +270,8 @@ export class ContentStream extends Duplex {
const { id: parentId, index } = this.document;
const id = this.puid.generate();

this.logger.debug(`Writing chunk #${this.chunksWritten} (${id}).`);

await this.client.index<ChunkSource>({
id,
index,
Expand Down Expand Up @@ -330,6 +344,12 @@ export class ContentStream extends Duplex {
export async function getContentStream(reporting: ReportingCore, document: ContentStreamDocument) {
const { asInternalUser: client } = await reporting.getEsClient();
const exportTypesRegistry = reporting.getExportTypesRegistry();

return new ContentStream(client, exportTypesRegistry, document);
const { logger } = reporting.getPluginSetupDeps();

return new ContentStream(
client,
exportTypesRegistry,
logger.clone(['content_stream', document.id]),
document
);
}

0 comments on commit 9f3e9c2

Please sign in to comment.