Skip to content

Commit

Permalink
fix: Errors during web socket subscribe returned with status 200 code
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Dec 7, 2019
1 parent c42a971 commit 6df008e
Showing 1 changed file with 32 additions and 14 deletions.
46 changes: 32 additions & 14 deletions packages/cubejs-api-gateway/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,19 +257,25 @@ class ApiGateway {
return new SubscriptionServer(this, sendMessage, this.subscriptionStore);
}

duration(requestStarted) {
return new Date().getTime() - requestStarted.getTime();
}

async meta({ context, res }) {
const requestStarted = new Date();
try {
const metaConfig = await this.getCompilerApi(context).metaConfig();
const cubes = metaConfig.map(c => c.config);
res({ cubes });
} catch (e) {
this.handleError({
e, context, res
e, context, res, requestStarted
});
}
}

async sql({ query, context, res }) {
const requestStarted = new Date();
try {
query = this.parseQueryParam(query);
const normalizedQuery = await this.queryTransformer(normalizeQuery(query), context);
Expand All @@ -282,12 +288,13 @@ class ApiGateway {
});
} catch (e) {
this.handleError({
e, context, query, res
e, context, query, res, requestStarted
});
}
}

async load({ query, context, res }) {
const requestStarted = new Date();
try {
query = this.parseQueryParam(query);
this.log(context, {
Expand All @@ -300,6 +307,11 @@ class ApiGateway {
this.getCompilerApi(context).metaConfig()
]);
const sqlQuery = compilerSqlResult;
this.log(context, {
type: 'Load Request SQL',
query,
sqlQuery
});
const annotation = prepareAnnotation(metaConfigResult, normalizedQuery);
const aliasToMemberNameMap = sqlQuery.aliasNameToMember;
const toExecute = {
Expand All @@ -315,6 +327,7 @@ class ApiGateway {
this.log(context, {
type: 'Load Request Success',
query,
duration: this.duration(requestStarted)
});
const flattenAnnotation = {
...annotation.measures,
Expand All @@ -333,14 +346,15 @@ class ApiGateway {
});
} catch (e) {
this.handleError({
e, context, query, res
e, context, query, res, requestStarted
});
}
}

async subscribe({
query, context, res, subscribe, subscriptionState
}) {
const requestStarted = new Date();
try {
this.log(context, {
type: 'Subscribe',
Expand All @@ -358,24 +372,24 @@ class ApiGateway {
await this.load({
query,
context,
res: (message) => {
res: (message, opts) => {
if (message.error) {
error = message;
error = { message, opts };
} else {
result = message;
result = { message, opts };
}
}
});
const state = await subscriptionState();
if (result && (!state || JSON.stringify(state.result) !== JSON.stringify(result))) {
res(result);
res(result.message, result.opts);
} else if (error) {
res(error);
res(error.message, error.opts);
}
await subscribe({ error, result });
} catch (e) {
this.handleError({
e, context, query, res
e, context, query, res, requestStarted
});
}
}
Expand Down Expand Up @@ -413,34 +427,38 @@ class ApiGateway {
}

handleError({
e, context, query, res
e, context, query, res, requestStarted
}) {
if (e instanceof UserError) {
this.log(context, {
type: 'User Error',
query,
error: e.message
error: e.message,
duration: this.duration(requestStarted)
});
res({ error: e.message }, { status: 400 });
} else if (e.error === 'Continue wait') {
this.log(context, {
type: 'Continue wait',
query,
error: e.message
error: e.message,
duration: this.duration(requestStarted)
});
res(e, { status: 200 });
} else if (e.error) {
this.log(context, {
type: 'Orchestrator error',
query,
error: e.error
error: e.error,
duration: this.duration(requestStarted)
});
res(e, { status: 400 });
} else {
this.log(context, {
type: 'Internal Server Error',
query,
error: e.stack || e.toString()
error: e.stack || e.toString(),
duration: this.duration(requestStarted)
});
res({ error: e.toString() }, { status: 500 });
}
Expand Down

0 comments on commit 6df008e

Please sign in to comment.