
Updated logging for exceptions
mservidio committed Nov 5, 2019
1 parent cc4e6e5 commit 463014e
Showing 2 changed files with 29 additions and 14 deletions.
2 changes: 1 addition & 1 deletion ingestion/bin/deploy.sh
@@ -103,6 +103,6 @@ if [ -z "$FUNCTION_REGION" ]; then
 exit 7
 else
 echo "Function region: ${FUNCTION_REGION}"
-gcloud functions deploy ${FUNCTION_NAME:-processUpload} --region=${FUNCTION_REGION} --memory=256MB --source=../function --runtime=nodejs8 --entry-point=processEvent --timeout=540s --trigger-bucket="${BUCKET_NAME}"
+gcloud functions deploy ${FUNCTION_NAME:-processUpload} --region=${FUNCTION_REGION} --memory=256MB --source=../function --runtime=nodejs8 --entry-point=processEvent --timeout=540s --trigger-bucket="${BUCKET_NAME}" --quiet
 exit 0
 fi
41 changes: 28 additions & 13 deletions ingestion/function/index.js
@@ -27,6 +27,7 @@ const defaultTransformQuery = "*";
 const acceptable = ['csv', 'gz', 'txt', 'avro', 'json'];
 const stagingTableExpiryDays = 2;
 const processPrefix = "bqds";
+const batchIdColumnName = `${processPrefix}_batch_id`;

 /**
  * @param {} event
@@ -49,7 +50,7 @@ exports.processEvent = async (event, context) => {
 await transform(config);
 await deleteTable(config.dataset, config.stagingTable);
 } catch (exception) {
-console.error(`Exception processing ${event.name}: ` + JSON.stringify(exception));
+console.error(`Exception processing ${event.name}: ${getExceptionString(exception)}`);
 return;
 }
 } else {
@@ -126,12 +127,12 @@ async function transform(config) {
 const transformQuery = await fromStorage(config.bucket,
 `${processPrefix}/${config.destinationTable}.${transformFileName}`) || defaultTransformQuery;
 const dataset = bigqueryClient.dataset(config.dataset);
-const exists = tableExists(config.dataset, config.destinationTable);
-if (!exists) {
-console.log(`creating table ${config.destinationTable} with ${config.metadata.fields}`);
-await dataset.createTable(config.destinationTable, { schema: config.metadata.fields });
-}
-const transform = `SELECT ${transformQuery}, '${batchId}' AS ${processPrefix}_batch_id FROM \`${config.dataset}.${config.stagingTable}\``;
+// const exists = tableExists(config.dataset, config.destinationTable);
+// if (!exists) {
+// console.log(`creating table ${config.destinationTable} with ${config.metadata.fields}`);
+// await dataset.createTable(config.destinationTable, { schema: config.metadata.fields });
+// }
+const transform = `SELECT ${transformQuery}, '${batchId}' AS ${batchIdColumnName} FROM \`${config.dataset}.${config.stagingTable}\``;
 console.log(`executing transform query: ${transform}`);
 const job = await runTransform(config, transform);
 console.log(`${job[0].metadata.id} ${job[0].metadata.statistics.query.statementType} ${job[0].metadata.configuration.jobType} ${job[0].metadata.status.state}`);
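
As an aside (not part of the commit), here is a minimal sketch of the SQL string the new template produces, assuming the default transform query of "*"; the dataset, staging-table, and batch ID values below are hypothetical.

// Illustrative sketch only; all concrete values here are made up.
const processPrefix = "bqds";
const batchIdColumnName = `${processPrefix}_batch_id`;   // "bqds_batch_id"
const transformQuery = "*";                               // defaultTransformQuery
const batchId = "4f9c2d1a";                               // hypothetical batch id
const config = { dataset: "mydataset", stagingTable: "upload_staging" };  // hypothetical
const transform = `SELECT ${transformQuery}, '${batchId}' AS ${batchIdColumnName} FROM \`${config.dataset}.${config.stagingTable}\``;
// => SELECT *, '4f9c2d1a' AS bqds_batch_id FROM `mydataset.upload_staging`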
@@ -197,10 +198,10 @@ async function fromStorage(bucket, file) {
 .bucket(bucket)
 .file(file)
 .download();
-console.log(`found gs://${bucket}/${file}: ${content}`);
+console.log(`Found gs://${bucket}/${file}: ${content}`);
 return content;
 } catch (error) {
-console.info(`file ${file} not found in bucket ${bucket}`);
+console.info(`File ${file} not found in bucket ${bucket}: ${getExceptionString(error)}`);
 return undefined;
 }
 }
@@ -213,8 +214,10 @@ async function fromStorage(bucket, file) {
 async function tableExists(datasetId, tableName) {
 const dataset = bigqueryClient.dataset(datasetId);
 const table = dataset.table(tableName);
-console.log(`${tableName}: ${table.exists}`);
-return await table.exists();
+const response = await table.exists();
+const exists = response[0];
+console.log(`Check if table exists: ${tableName}: ${exists}`);
+return exists;
 }

 /**
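
A note on the change above: in the @google-cloud/bigquery client, table.exists() resolves to an array whose first element is the boolean, which is why the new code reads response[0]; the old code logged the exists method itself and returned the wrapped array. A minimal standalone sketch, with hypothetical dataset and table names:

// Illustrative sketch only; dataset and table names are hypothetical.
const { BigQuery } = require('@google-cloud/bigquery');
const bigqueryClient = new BigQuery();

async function checkTableExists() {
    // exists() resolves to an array such as [true]; destructure the boolean.
    const [exists] = await bigqueryClient
        .dataset('my_dataset')
        .table('my_table')
        .exists();
    console.log(`Check if table exists: my_table: ${exists}`);
    return exists;
}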
@@ -265,7 +268,7 @@ async function runTransform(config, query) {
 try {
 return await bigqueryClient.createQueryJob(options);
 } catch (exception) {
-console.error("Exception encountered running transform: " + JSON.stringify(exception));
+console.error(`Exception encountered running transform: ${getExceptionString(exception)}`);
 logException(exception);
 throw (exception);
 }
@@ -278,7 +281,7 @@ function logException(exception) {
 console.error('ERROR ' + (i + 1) + ": " + JSON.stringify(errors[i].message));
 }
 } else {
-console.error("Exception thrown, but no error array was given: " + JSON.stringify(exception));
+console.error(`Exception thrown, but no error array was given: ${getExceptionString(exception)}`);
 }
 }

@@ -332,6 +335,18 @@ function getDestination(fileName) {
 return name;
 }

+/**
+ * @param {} exception
+ * Returns the exception message as a string: attempts JSON.stringify and, if that yields undefined, returns the exception itself.
+ */
+function getExceptionString(exception) {
+let str = JSON.stringify(exception);
+if (!str) {
+str = exception;
+}
+return str;
+}
+
 /**
  * @param {} message
  */
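
Usage sketch for the new helper (illustrative only, assuming the getExceptionString function added above is in scope; the error values are made up). Structured API errors with enumerable properties stringify cleanly, while JSON.stringify returns undefined for values such as undefined or a function, in which case the raw value is passed through.

// Illustrative only; the error objects below are made up.
getExceptionString({ code: 404, errors: [{ message: 'Not found: Table mydataset.upload_staging' }] });
// => '{"code":404,"errors":[{"message":"Not found: Table mydataset.upload_staging"}]}'
getExceptionString(undefined);
// => undefined (JSON.stringify(undefined) is undefined, so the value itself is returned)

One observable caveat: a bare Error instance stringifies to '{}' because its message and stack properties are non-enumerable, so those fields still would not appear in the log output.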
