Skip to content

Commit

Permalink
make logger/emitError optional
Browse files Browse the repository at this point in the history
  • Loading branch information
walterra committed May 22, 2024
1 parent 3df967f commit 81d8f64
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ export const fetchCategories = async (
esClient: ElasticsearchClient,
params: AiopsLogRateAnalysisSchema,
fieldNames: string[],
logger: Logger,
logger?: Logger,
// The default value of 1 means no sampling will be used
sampleProbability: number = 1,
emitError: (m: string) => void,
emitError?: (m: string) => void,
abortSignal?: AbortSignal
): Promise<FetchCategoriesResponse[]> => {
const randomSamplerWrapper = createRandomSamplerWrapper({
Expand All @@ -122,14 +122,19 @@ export const fetchCategories = async (

function reportError(fieldName: string, error: unknown) {
if (!isRequestAbortedError(error)) {
logger.error(
`Failed to fetch category aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
emitError(`Failed to fetch category aggregation for fieldName "${fieldName}".`);
if (logger) {
logger.error(
`Failed to fetch category aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
}

if (emitError) {
emitError(`Failed to fetch category aggregation for fieldName "${fieldName}".`);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ export const fetchCategoryCounts = async (
categories: FetchCategoriesResponse,
from: number | undefined,
to: number | undefined,
logger: Logger,
emitError: (m: string) => void,
logger?: Logger,
emitError?: (m: string) => void,
abortSignal?: AbortSignal
): Promise<FetchCategoriesResponse> => {
const updatedCategories = cloneDeep(categories);
Expand All @@ -101,14 +101,19 @@ export const fetchCategoryCounts = async (
);
} catch (error) {
if (!isRequestAbortedError(error)) {
logger.error(
`Failed to fetch category counts for field name "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
emitError(`Failed to fetch category counts for field name "${fieldName}".`);
if (logger) {
logger.error(
`Failed to fetch category counts for field name "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
}

if (emitError) {
emitError(`Failed to fetch category counts for field name "${fieldName}".`);
}
}
return updatedCategories;
}
Expand All @@ -118,14 +123,19 @@ export const fetchCategoryCounts = async (
updatedCategories.categories[index].count =
(resp.hits.total as estypes.SearchTotalHits).value ?? 0;
} else {
logger.error(
`Failed to fetch category count for category "${
updatedCategories.categories[index].key
}", got: \n${JSON.stringify(resp, null, 2)}`
);
emitError(
`Failed to fetch category count for category "${updatedCategories.categories[index].key}".`
);
if (logger) {
logger.error(
`Failed to fetch category count for category "${
updatedCategories.categories[index].key
}", got: \n${JSON.stringify(resp, null, 2)}`
);
}

if (emitError) {
emitError(
`Failed to fetch category count for category "${updatedCategories.categories[index].key}".`
);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ export const fetchSignificantCategories = async (
esClient: ElasticsearchClient,
params: AiopsLogRateAnalysisSchema,
fieldNames: string[],
logger: Logger,
logger?: Logger,
// The default value of 1 means no sampling will be used
sampleProbability: number = 1,
emitError: (m: string) => void,
emitError?: (m: string) => void,
abortSignal?: AbortSignal
) => {
const categoriesOverall = await fetchCategories(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ export const fetchSignificantTermPValues = async (
esClient: ElasticsearchClient,
params: AiopsLogRateAnalysisSchema,
fieldNames: string[],
logger: Logger,
logger?: Logger,
// The default value of 1 means no sampling will be used
sampleProbability: number = 1,
emitError: (m: string) => void,
emitError?: (m: string) => void,
abortSignal?: AbortSignal
): Promise<SignificantItem[]> => {
const randomSamplerWrapper = createRandomSamplerWrapper({
Expand All @@ -139,14 +139,19 @@ export const fetchSignificantTermPValues = async (

function reportError(fieldName: string, error: unknown) {
if (!isRequestAbortedError(error)) {
logger.error(
`Failed to fetch p-value aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
emitError(`Failed to fetch p-value aggregation for fieldName "${fieldName}".`);
if (logger) {
logger.error(
`Failed to fetch p-value aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
}

if (emitError) {
emitError(`Failed to fetch p-value aggregation for fieldName "${fieldName}".`);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ import { fetchSignificantTermPValues } from './fetch_significant_term_p_values';
// regarding a limit of abort signal listeners of more than 10.
const MAX_CONCURRENT_QUERIES = 5;

interface FieldCandidate {
fieldCandidate: string;
interface KeywordFieldCandidate {
keywordFieldCandidate: string;
}
const isFieldCandidate = (d: unknown): d is FieldCandidate =>
isPopulatedObject(d, ['fieldCandidate']);
const isKeywordFieldCandidate = (d: unknown): d is KeywordFieldCandidate =>
isPopulatedObject(d, ['keywordFieldCandidate']);

interface TextFieldCandidate {
textFieldCandidate: string;
}
const isTextFieldCandidate = (d: unknown): d is FieldCandidate =>
const isTextFieldCandidate = (d: unknown): d is TextFieldCandidate =>
isPopulatedObject(d, ['textFieldCandidate']);

type Candidate = FieldCandidate | TextFieldCandidate;
type QueueFieldCandidate = KeywordFieldCandidate | TextFieldCandidate;

export interface LogRateChange {
type: string;
Expand Down Expand Up @@ -118,7 +118,7 @@ export const fetchSimpleLogRateAnalysis = async (

// FIELD CANDIDATES

const fieldCandidates: string[] = [];
const keywordFieldCandidates: string[] = [];
const textFieldCandidates: string[] = [];

const indexInfoParams: AiopsLogRateAnalysisSchema = {
Expand Down Expand Up @@ -162,7 +162,7 @@ export const fetchSimpleLogRateAnalysis = async (
...analysisWindowParameters,
};

fieldCandidates.push(...indexInfo.fieldCandidates);
keywordFieldCandidates.push(...indexInfo.fieldCandidates);
textFieldCandidates.push(...indexInfo.textFieldCandidates);
const sampleProbability = getSampleProbability(
indexInfo.deviationTotalDocCount + indexInfo.baselineTotalDocCount
Expand All @@ -177,27 +177,20 @@ export const fetchSimpleLogRateAnalysis = async (
const significantTerms: SignificantItem[] = [];
const fieldsToSample = new Set<string>();

const pValuesQueue = queue(async function (payload: Candidate) {
if (isFieldCandidate(payload)) {
const { fieldCandidate } = payload;
const pValuesQueue = queue(async function (payload: QueueFieldCandidate) {
if (isKeywordFieldCandidate(payload)) {
const { keywordFieldCandidate } = payload;
let pValues: Awaited<ReturnType<typeof fetchSignificantTermPValues>> = [];

try {
pValues = await fetchSignificantTermPValues(
esClient,
params,
[fieldCandidate],
undefined,
sampleProbability,
(e) => {
console.log('fetchSignificantTermPValues ERROR', e);
},
abortSignal
);
} catch (e) {
console.log('catch fetchSignificantTermPValues ERROR', e);
return;
}
pValues = await fetchSignificantTermPValues(
esClient,
params,
[keywordFieldCandidate],
undefined,
sampleProbability,
undefined,
abortSignal
);

if (pValues.length > 0) {
pValues.forEach((d) => {
Expand All @@ -214,9 +207,7 @@ export const fetchSimpleLogRateAnalysis = async (
[textFieldCandidate],
undefined,
sampleProbability,
(e) => {
console.log('fetchSignificantCategories ERROR', e);
},
undefined,
abortSignal
);

Expand All @@ -229,11 +220,10 @@ export const fetchSimpleLogRateAnalysis = async (
pValuesQueue.push(
[
...textFieldCandidates.map((d) => ({ textFieldCandidate: d })),
...fieldCandidates.map((d) => ({ fieldCandidate: d })),
...keywordFieldCandidates.map((d) => ({ keywordFieldCandidate: d })),
],
(err) => {
if (err) {
console.error('queue push error', err);
pValuesQueue.kill();
}
}
Expand Down

0 comments on commit 81d8f64

Please sign in to comment.