Skip to content

Commit

Permalink
only pick kafka input format by default when needed (#16180)
Browse files Browse the repository at this point in the history
  • Loading branch information
vogievetsky committed Apr 1, 2024
1 parent a818b8a commit 06268bf
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
31 changes: 31 additions & 0 deletions web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
cleanSpec,
guessColumnTypeFromInput,
guessColumnTypeFromSampleResponse,
guessKafkaInputFormat,
guessSimpleInputFormat,
updateSchemaWithSample,
upgradeSpec,
Expand Down Expand Up @@ -669,6 +670,36 @@ describe('ingestion-spec', () => {
});
});
});

describe('guessKafkaInputFormat', () => {
const sample = [
{
'kafka.timestamp': 1710962988515,
'kafka.topic': 'kttm2',
'raw':
'{"timestamp":"2019-08-25T00:00:00.031Z","session":"S56194838","number":"16","event":{"type":"PercentClear","percentage":55},"agent":{"type":"Browser","category":"Personal computer","browser":"Chrome","browser_version":"76.0.3809.100","os":"Windows 7","platform":"Windows"},"client_ip":"181.13.41.82","geo_ip":{"continent":"South America","country":"Argentina","region":"Santa Fe","city":"Rosario"},"language":["es","es-419"],"adblock_list":"NoAdblock","app_version":"1.9.6","path":"http://www.koalastothemax.com/","loaded_image":"http://www.koalastothemax.com/img/koalas2.jpg","referrer":"Direct","referrer_host":"Direct","server_ip":"172.31.57.89","screen":"1680x1050","window":"1680x939","session_length":76261,"timezone":"N/A","timezone_offset":"180"}',
},
{
'kafka.timestamp': 1710962988518,
'kafka.topic': 'kttm2',
'raw':
'{"timestamp":"2019-08-25T00:00:00.059Z","session":"S46093731","number":"24","event":{"type":"PercentClear","percentage":85},"agent":{"type":"Mobile Browser","category":"Smartphone","browser":"Chrome Mobile","browser_version":"50.0.2661.89","os":"Android","platform":"Android"},"client_ip":"177.242.100.0","geo_ip":{"continent":"North America","country":"Mexico","region":"Chihuahua","city":"Nuevo Casas Grandes"},"language":["en","es","es-419","es-MX"],"adblock_list":"NoAdblock","app_version":"1.9.6","path":"https://koalastothemax.com/","loaded_image":"https://koalastothemax.com/img/koalas1.jpg","referrer":"https://www.google.com/","referrer_host":"www.google.com","server_ip":"172.31.11.5","screen":"320x570","window":"540x743","session_length":252689,"timezone":"CDT","timezone_offset":"300"}',
},
];

it('works when single topic', () => {
expect(guessKafkaInputFormat(sample, false)).toEqual({ type: 'json' });
});

it('works when multi-topic', () => {
expect(guessKafkaInputFormat(sample, true)).toEqual({
type: 'kafka',
valueFormat: {
type: 'json',
},
});
});
});
});

describe('spec utils', () => {
Expand Down
23 changes: 19 additions & 4 deletions web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2418,7 +2418,10 @@ export function fillInputFormatIfNeeded(
spec,
'spec.ioConfig.inputFormat',
getSpecType(spec) === 'kafka'
? guessKafkaInputFormat(filterMap(sampleResponse.data, l => l.input))
? guessKafkaInputFormat(
filterMap(sampleResponse.data, l => l.input),
typeof deepGet(spec, 'spec.ioConfig.topicPattern') === 'string',
)
: guessSimpleInputFormat(
filterMap(sampleResponse.data, l => l.input?.raw),
isStreamingSpec(spec),
Expand All @@ -2430,15 +2433,27 @@ function noNumbers(xs: string[]): boolean {
return xs.every(x => isNaN(Number(x)));
}

export function guessKafkaInputFormat(sampleRaw: Record<string, any>[]): InputFormat {
export function guessKafkaInputFormat(
sampleRaw: Record<string, any>[],
multiTopic: boolean,
): InputFormat {
const hasHeader = sampleRaw.some(x => Object.keys(x).some(k => k.startsWith('kafka.header.')));
const keys = filterMap(sampleRaw, x => x['kafka.key']);
const payloads = filterMap(sampleRaw, x => x.raw);
const valueFormat = guessSimpleInputFormat(
filterMap(sampleRaw, x => x.raw),
true,
);

if (!hasHeader && !keys.length && !multiTopic) {
// No headers or keys and just a single topic means do not pick the 'kafka' format by default as it is less performant
return valueFormat;
}

return {
type: 'kafka',
headerFormat: hasHeader ? { type: 'string' } : undefined,
keyFormat: keys.length ? guessSimpleInputFormat(keys, true) : undefined,
valueFormat: guessSimpleInputFormat(payloads, true),
valueFormat,
};
}

Expand Down

0 comments on commit 06268bf

Please sign in to comment.