From dfbb103cacb0c53b283b5fb9e1b9520440561018 Mon Sep 17 00:00:00 2001 From: max-ostapenko <1611259+max-ostapenko@users.noreply.github.com> Date: Thu, 26 Mar 2026 09:10:38 +0000 Subject: [PATCH] perf: Stream BigQuery results to Cloud Storage to prevent OOM errors Refactored the BigQuery-to-Google Cloud Storage export process to use streams instead of loading the entire result set into a single in-memory array. This resolves potential out-of-memory (OOM) errors in Cloud Run and significantly improves overall memory efficiency for large exports. - Updated `infra/dataform-service/src/index.js` to call `bigquery.queryResultsStream()`. - Refactored `StorageUpload.exportToJson` in `infra/dataform-service/src/storage.js` to accept a stream. - Implemented a custom `Transform` stream that formats object chunks into a valid JSON array, serializing rows in batches of 1000 to reduce per-chunk overhead. - Removed the now-unused in-memory `Readable` initialization from the `StorageUpload` constructor. 
--- infra/dataform-service/src/index.js | 4 +- infra/dataform-service/src/storage.js | 65 ++++++++++++++++++++++----- 2 files changed, 57 insertions(+), 12 deletions(-) diff --git a/infra/dataform-service/src/index.js b/infra/dataform-service/src/index.js index ae4e6754..566b860f 100644 --- a/infra/dataform-service/src/index.js +++ b/infra/dataform-service/src/index.js @@ -95,9 +95,9 @@ async function handleExport (req, res) { console.info('Cloud Storage export') console.log(query, config) - const data = await bigquery.queryResults(query) + const stream = await bigquery.queryResultsStream(query) const storage = new StorageUpload(config.bucket) - await storage.exportToJson(data, config.name) + await storage.exportToJson(stream, config.name) } else if (destination === 'firestore') { console.info('Firestore export') const jobName = `projects/${projectId}/locations/${location}/jobs/${jobId}` diff --git a/infra/dataform-service/src/storage.js b/infra/dataform-service/src/storage.js index a9d1e8ff..116a04ee 100644 --- a/infra/dataform-service/src/storage.js +++ b/infra/dataform-service/src/storage.js @@ -1,5 +1,5 @@ import { Storage } from '@google-cloud/storage' -import { Readable } from 'stream' +import { Transform } from 'stream' import zlib from 'zlib' const storage = new Storage() @@ -7,24 +7,69 @@ const storage = new Storage() export class StorageUpload { constructor (bucket) { this.bucket = bucket - this.stream = new Readable({ - objectMode: true, - read () {} - }) } - async exportToJson (data, fileName) { + async exportToJson (stream, fileName) { const bucket = storage.bucket(this.bucket) const file = bucket.file(fileName) - const jsonData = JSON.stringify(data) - this.stream.push(jsonData) - this.stream.push(null) + let first = true + let batch = [] + const BATCH_SIZE = 1000 + + const jsonTransform = new Transform({ + writableObjectMode: true, + transform (chunk, encoding, callback) { + batch.push(chunk) + if (batch.length >= BATCH_SIZE) { + let str = '' + if 
(first) { + str = '[\n ' + JSON.stringify(batch[0]) + for (let i = 1; i < batch.length; i++) { + str += ',\n ' + JSON.stringify(batch[i]) + } + first = false + } else { + for (let i = 0; i < batch.length; i++) { + str += ',\n ' + JSON.stringify(batch[i]) + } + } + batch = [] + callback(null, str) + } else { + callback() + } + }, + flush (callback) { + let str = '' + if (batch.length > 0) { + if (first) { + str = '[\n ' + JSON.stringify(batch[0]) + for (let i = 1; i < batch.length; i++) { + str += ',\n ' + JSON.stringify(batch[i]) + } + first = false + } else { + for (let i = 0; i < batch.length; i++) { + str += ',\n ' + JSON.stringify(batch[i]) + } + } + } + + if (first) { + str += '[]' + } else { + str += '\n]' + } + callback(null, str) + } + }) const gzip = zlib.createGzip() await new Promise((resolve, reject) => { - this.stream + stream + .pipe(jsonTransform) .pipe(gzip) .pipe(file.createWriteStream({ metadata: {