Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions infra/dataform-service/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ async function handleExport (req, res) {
console.info('Cloud Storage export')
console.log(query, config)

const data = await bigquery.queryResults(query)
const stream = await bigquery.queryResultsStream(query)
const storage = new StorageUpload(config.bucket)
await storage.exportToJson(data, config.name)
await storage.exportToJson(stream, config.name)
} else if (destination === 'firestore') {
console.info('Firestore export')
const jobName = `projects/${projectId}/locations/${location}/jobs/${jobId}`
Expand Down
65 changes: 55 additions & 10 deletions infra/dataform-service/src/storage.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,75 @@
import { Storage } from '@google-cloud/storage'
import { Readable } from 'stream'
import { Transform } from 'stream'
import zlib from 'zlib'

const storage = new Storage()

export class StorageUpload {
// Holds the target Cloud Storage bucket name for a later JSON export.
// NOTE(review): this is a scraped diff view that interleaves old and new
// lines — the Readable push-stream below looks like the PRE-change code
// (the updated exportToJson receives its stream from the caller instead);
// confirm against the applied version of the file.
constructor (bucket) {
// Bucket name only; resolved to a bucket handle in exportToJson.
this.bucket = bucket
// Push-mode object stream with a no-op read(): data is pushed by the
// producer rather than pulled on demand.
this.stream = new Readable({
objectMode: true,
read () {}
})
}

async exportToJson (data, fileName) {
async exportToJson (stream, fileName) {
const bucket = storage.bucket(this.bucket)
const file = bucket.file(fileName)

const jsonData = JSON.stringify(data)
this.stream.push(jsonData)
this.stream.push(null)
// Incremental JSON-array serializer: accepts row objects on the writable
// side (object mode) and emits text chunks that, concatenated, form one
// JSON array — "[\n {...},\n {...}\n]" — or "[]" when no rows arrive.
// Rows are buffered in batches of BATCH_SIZE so the full result set is
// never stringified (or even held) in memory at once.
//
// Fix: the original duplicated the batch-serialization loop in both
// transform() and flush(); the shared serializeBatch() helper below
// removes the copy-paste while emitting byte-identical output.
let first = true
let batch = []
const BATCH_SIZE = 1000

// Serialize and clear the pending batch. The very first row opens the
// array with '[\n '; every subsequent row is prefixed with ',\n '.
// Returns '' when the batch is empty.
const serializeBatch = () => {
  let str = ''
  for (const row of batch) {
    str += (first ? '[\n ' : ',\n ') + JSON.stringify(row)
    first = false
  }
  batch = []
  return str
}

const jsonTransform = new Transform({
  writableObjectMode: true,
  transform (chunk, encoding, callback) {
    batch.push(chunk)
    if (batch.length >= BATCH_SIZE) {
      // Emit a full batch as one text chunk.
      callback(null, serializeBatch())
    } else {
      // Keep buffering; produce no output yet.
      callback()
    }
  },
  flush (callback) {
    // Drain any partial batch, then close the array. If `first` is still
    // true no row was ever seen, so emit an empty JSON array instead.
    let str = serializeBatch()
    str += first ? '[]' : '\n]'
    callback(null, str)
  }
})

const gzip = zlib.createGzip()

await new Promise((resolve, reject) => {
this.stream
stream
.pipe(jsonTransform)
.pipe(gzip)
.pipe(file.createWriteStream({
metadata: {
Expand Down
Loading