From 6d4613eb379e22ad53e05a7aadf98c7fdffaa7dc Mon Sep 17 00:00:00 2001 From: dhaneesh Date: Wed, 24 Oct 2018 19:42:32 +0530 Subject: [PATCH 1/4] streaming support for CSV and geojsonl streaming support for geojsonl and csv upload. Upload supported with 10 simultaneous upload chunks --- bin/here-xyz.ts | 277 +++++++++++++++++++++++++++---------------- bin/transformutil.ts | 46 +++++++ package-lock.json | 95 +++++++++++++++ package.json | 2 + 4 files changed, 315 insertions(+), 105 deletions(-) diff --git a/bin/here-xyz.ts b/bin/here-xyz.ts index 2e8bd53..ba59cef 100644 --- a/bin/here-xyz.ts +++ b/bin/here-xyz.ts @@ -34,6 +34,7 @@ import * as transform from "./transformutil"; import * as fs from "fs"; import * as tmp from "tmp"; import * as summary from "./summary"; +let cq = require("block-queue"); const request = require("request"); let choiceList: { name: string, value: string}[] = []; @@ -566,10 +567,39 @@ program "option to enforce uniqueness to the id by creating a hash of feature and use that as id" ) .option("-o, --override", "override the data even if it share same id") + .option("-s, --stream", "streaming data support for large file uploads") .action(function (id, options) { uploadToXyzSpace(id, options); }); +function collate(result:Array){ + return result.reduce((features: any, feature: any) => { + if (feature.type === "Feature") { + features.push(feature); + } else if (feature.type === "FeatureCollection") { + features = features.concat(feature.features); + } else { + console.log("Unknown type" + feature.type); + } + return features + }, []); +} + +function streamingQueue(){ + let queue = cq(10,function (task:any,done:Function) { + uploadData(task.id, task.options, task.tags, task.fc, + true, task.options.ptag, task.options.file, task.options.id) + .then(x=>{ + queue.uploadCount += task.fc.features.length; + console.log("uploaded feature count :"+queue.uploadCount); + done(); + }); + }); + queue.uploadCount=0; + return queue; +} + + function 
uploadToXyzSpace(id: string, options: any){ (async () => { let tags = ""; @@ -590,22 +620,27 @@ function uploadToXyzSpace(id: string, options: any){ options.unique = true; } + if(options.assign && options.stream){ + console.log( + "conflicting options together. You cannot choose assign mode while selecting streaming option" + ); + process.exit(1); + } + if (options.file) { const fs = require("fs"); if (options.file.indexOf(".geojsonl") != -1) { - transform.readLineFromFile(options.file, 100).then((result: any) => { - const totalFeatures = result.reduce((features: any, feature: any) => { - if (feature.type === "Feature") { - features.push(feature); - } else if (feature.type === "FeatureCollection") { - features = features.concat(feature.features); - } else { - console.log("Unknown type" + feature.type); - } - return features - }, []); - uploadData(id, options, tags, { type: "FeatureCollection", features: totalFeatures }, true, options.ptag, options.file, options.id); - }); + if(!options.stream){ + transform.readLineFromFile(options.file, 100).then((result: any) => { + uploadData(id, options, tags, { type: "FeatureCollection", features: collate(result) }, true, options.ptag, options.file, options.id); + }); + }else{ + let queue = streamingQueue(); + transform.readLineAsChunks(options.file, options.chunk?options.chunk:1000,function(result:any){ + if(result.length>0) + queue.push({id:id,options:options,tags:tags,fc:{ type: "FeatureCollection", features: collate(result) }}); + }); + } } else if (options.file.indexOf(".shp") != -1) { let result = await transform.readShapeFile( options.file, @@ -621,29 +656,48 @@ function uploadToXyzSpace(id: string, options: any){ options.id ); } else if (options.file.indexOf(".csv") != -1) { - let result = await transform.read( - options.file, - true - ); - const object = { - features: transform.transform( - result, - options.lat, - options.lon, - options.alt - ), - type: "FeatureCollection" - }; - uploadData( - id, - options, - tags, - 
object, - true, - options.ptag, + if(!options.stream){ + let result = await transform.read( options.file, - options.id - ); + true + ); + const object = { + features: transform.transform( + result, + options.lat, + options.lon, + options.alt + ), + type: "FeatureCollection" + }; + uploadData( + id, + options, + tags, + object, + true, + options.ptag, + options.file, + options.id + ); + }else{ + let queue = streamingQueue(); + transform.readCSVAsChunks(options.file, options.chunk?options.chunk:1000,function(result:any){ + if(result.length>0){ + const fc = { + features: transform.transform( + result, + options.lat, + options.lon, + options.alt + ), + type: "FeatureCollection" + }; + + queue.push({id:id,options:options,tags:tags,fc:fc}); + } + }); + } } else { let result = await transform.read( options.file, @@ -718,24 +772,39 @@ function uploadData( fileName: string | null, uid: string ) { - if (object.type == "Feature") { - object = { features: [object], type: "FeatureCollection" }; - } - if (options.assign) { - //console.log("assign mode on"); - const questions = createQuestionsList(object); - inquirer.prompt(questions).then((answers: any) => { - if (options.ptag === undefined) { - options.ptag = ""; - } - options.ptag = options.ptag + answers.tagChoices; - if (options.id === undefined) { - options.id = ""; - } - options.id = options.id + answers.idChoice; - //console.log(options.ptag); - //console.log("unique key - " + options.id); - //Need to be inside if, else this will be executed before user choice is inserted as its async + return new Promise((resolve, reject) => { + + if (object.type == "Feature") { + object = { features: [object], type: "FeatureCollection" }; + } + if (options.assign) { + //console.log("assign mode on"); + const questions = createQuestionsList(object); + inquirer.prompt(questions).then((answers: any) => { + if (options.ptag === undefined) { + options.ptag = ""; + } + options.ptag = options.ptag + answers.tagChoices; + if (options.id === 
undefined) { + options.id = ""; + } + options.id = options.id + answers.idChoice; + //console.log(options.ptag); + //console.log("unique key - " + options.id); + //Need to be inside if, else this will be executed before user choice is inserted as its async + uploadDataToSpaceWithTags( + id, + options, + tags, + object, + false, + options.ptag, + fileName, + options.id + ).then(x=>resolve(x)); + + }); + } else { uploadDataToSpaceWithTags( id, options, @@ -745,23 +814,14 @@ function uploadData( options.ptag, fileName, options.id - ); - }); - } else { - uploadDataToSpaceWithTags( - id, - options, - tags, - object, - false, - options.ptag, - fileName, - options.id - ); - } + ).then(x=>resolve(x)); + } + + }); + } -function uploadDataToSpaceWithTags( +async function uploadDataToSpaceWithTags( id: string, options: any, tags: any, @@ -771,46 +831,53 @@ function uploadDataToSpaceWithTags( fileName: string | null, uid: string ) { - const gsv = require("geojson-validation"); - gsv.valid(object, async function (valid: boolean, errs: any) { - if (!valid) { - console.log(errs); - return; - } - const featureOut = await mergeAllTags( - object.features, - tags, - tagProperties, - fileName, - uid, - options - ); - - const chunks = options.chunk - ? 
chunkify(featureOut, parseInt(options.chunk)) - : [featureOut]; - const chunkSize = chunks.length; - const index = 0; - await iterateChunks( - chunks, - "/hub/spaces/" + id + "/features", - index, - chunkSize, - ); - if (isFile) - console.log( - "'" + - options.file + - "' uploaded to xyzspace '" + - id + - "' successfully" + return new Promise((resolve, reject) => { + const gsv = require("geojson-validation"); + gsv.valid(object, async function (valid: boolean, errs: any) { + if (!valid) { + console.log(errs); + reject(errs); + return; + } + const featureOut = await mergeAllTags( + object.features, + tags, + tagProperties, + fileName, + uid, + options ); - else - console.log( - "data upload to xyzspace '" + id + "' completed successfully" + + const chunks = options.chunk + ? chunkify(featureOut, parseInt(options.chunk)) + : [featureOut]; + const chunkSize = chunks.length; + const index = 0; + await iterateChunks( + chunks, + "/hub/spaces/" + id + "/features", + index, + chunkSize, ); + if(!options.stream){ + if (isFile) + console.log( + "'" + + options.file + + "' uploaded to xyzspace '" + + id + + "' successfully" + ); + else + console.log( + "data upload to xyzspace '" + id + "' completed successfully" + ); - summary.summarize(featureOut, id, true); + summary.summarize(featureOut, id, true); + + } + resolve(true); + }); }); } diff --git a/bin/transformutil.ts b/bin/transformutil.ts index 1ceb86e..67e3fbe 100644 --- a/bin/transformutil.ts +++ b/bin/transformutil.ts @@ -30,6 +30,7 @@ import * as tmp from "tmp"; import * as request from "request"; import * as readline from "readline"; import { requestAsync } from "./requestAsync"; +import { deprecate } from "util"; const latArray = ["y", "ycoord", "ycoordinate", "coordy", "coordinatey", "latitude", "lat"]; const lonArray = ["x", "xcoord", "xcoordinate", "coordx", "coordinatex", "longitude", "lon"]; @@ -227,3 +228,48 @@ export function readLineFromFile(incomingPath: string, chunckSize = 100) { }); } + +export 
function readLineAsChunks(incomingPath: string, chunckSize:number,streamFuntion:Function) { + + return readData(incomingPath, 'geojsonl').then(path => { + return new Promise((resolve, reject) => { + let dataArray = new Array(); + const instream = fs.createReadStream(path); + const outstream = new (require('stream'))(); + const rl = readline.createInterface(instream, outstream); + + rl.on('line', (line: string) => { + dataArray.push(JSON.parse(line)); + if(dataArray.length>=chunckSize){ + streamFuntion(dataArray) + dataArray=new Array(); + } + }); + rl.on("error", err => console.log(err)); + rl.on('close', () => streamFuntion(dataArray)); + }); + }); +} + + +export function readCSVAsChunks(incomingPath: string, chunckSize:number,streamFuntion:Function) { + return readData(incomingPath, 'geojsonl').then(path => { + return new Promise((resolve, reject) => { + let dataArray = new Array(); + var csv = require("fast-csv"); + var stream = fs.createReadStream(incomingPath); + csv.fromStream(stream, {headers : true}).on("data", function(data:any){ + dataArray.push(data); + if(dataArray.length>=chunckSize){ + streamFuntion(dataArray) + dataArray=new Array(); + } + }).on("end", function(){ + streamFuntion(dataArray) + }); + }); + }); +} + + + diff --git a/package-lock.json b/package-lock.json index 8f85c76..6acd494 100644 --- a/package-lock.json +++ b/package-lock.json @@ -325,6 +325,25 @@ "sprintf-js": "~1.0.2" } }, + "arguments-extended": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/arguments-extended/-/arguments-extended-0.0.3.tgz", + "integrity": "sha1-YQfkkX0OtvCk3WYyD8Fa/HLvSUY=", + "requires": { + "extended": "~0.0.3", + "is-extended": "~0.0.8" + } + }, + "array-extended": { + "version": "0.0.11", + "resolved": "https://registry.npmjs.org/array-extended/-/array-extended-0.0.11.tgz", + "integrity": "sha1-1xRK50jek8pybxIQCdv/FibRZL0=", + "requires": { + "arguments-extended": "~0.0.3", + "extended": "~0.0.3", + "is-extended": "~0.0.3" + } + }, 
"array-find-index": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/array-find-index/-/array-find-index-1.0.2.tgz", @@ -586,6 +605,11 @@ "tweetnacl": "^0.14.3" } }, + "block-queue": { + "version": "0.0.2", + "resolved": "https://registry.npmjs.org/block-queue/-/block-queue-0.0.2.tgz", + "integrity": "sha1-HTjGN2yxmAGHOdnTFAw+BNxKzG8=" + }, "bluebird": { "version": "3.5.2", "resolved": "https://registry.npmjs.org/bluebird/-/bluebird-3.5.2.tgz", @@ -1013,6 +1037,16 @@ "assert-plus": "^1.0.0" } }, + "date-extended": { + "version": "0.0.6", + "resolved": "https://registry.npmjs.org/date-extended/-/date-extended-0.0.6.tgz", + "integrity": "sha1-I4AtV90b94GIE/4MMuhRqG2iZ8k=", + "requires": { + "array-extended": "~0.0.3", + "extended": "~0.0.3", + "is-extended": "~0.0.3" + } + }, "debug": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz", @@ -1026,6 +1060,11 @@ "resolved": "https://registry.npmjs.org/decamelize/-/decamelize-1.2.0.tgz", "integrity": "sha1-9lNNFRSCabIDUue+4m9QH5oZEpA=" }, + "declare.js": { + "version": "0.0.8", + "resolved": "https://registry.npmjs.org/declare.js/-/declare.js-0.0.8.tgz", + "integrity": "sha1-BHit/5VkwAT1Hfc9i8E0AZ0o3N4=" + }, "decode-uri-component": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/decode-uri-component/-/decode-uri-component-0.2.0.tgz", @@ -1232,6 +1271,22 @@ "resolved": "https://registry.npmjs.org/extend/-/extend-3.0.2.tgz", "integrity": "sha512-fjquC59cD7CyW6urNXK0FBufkZcoiGG80wTuPujX590cB5Ttln20E2UB4S/WARVqhXffZl2LNgS+gQdPIIim/g==" }, + "extended": { + "version": "0.0.6", + "resolved": "https://registry.npmjs.org/extended/-/extended-0.0.6.tgz", + "integrity": "sha1-f7i/e52uOXWG5IVwrP1kLHjlBmk=", + "requires": { + "extender": "~0.0.5" + } + }, + "extender": { + "version": "0.0.10", + "resolved": "https://registry.npmjs.org/extender/-/extender-0.0.10.tgz", + "integrity": "sha1-WJwHSCvmGhRgttgfnCSqZ+jzJM0=", + "requires": { + "declare.js": "~0.0.4" + } + }, 
"external-editor": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/external-editor/-/external-editor-3.0.3.tgz", @@ -1262,6 +1317,17 @@ "resolved": "https://registry.npmjs.org/eyes/-/eyes-0.1.8.tgz", "integrity": "sha1-Ys8SAjTGg3hdkCNIqADvPgzCC8A=" }, + "fast-csv": { + "version": "2.4.1", + "resolved": "https://registry.npmjs.org/fast-csv/-/fast-csv-2.4.1.tgz", + "integrity": "sha1-vX3SaDkfcpNntZRFuN0K0CaIGyY=", + "requires": { + "extended": "0.0.6", + "is-extended": "0.0.10", + "object-extended": "0.0.7", + "string-extended": "0.0.8" + } + }, "fast-deep-equal": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-1.1.0.tgz", @@ -1805,6 +1871,14 @@ "resolved": "https://registry.npmjs.org/is-es2016-keyword/-/is-es2016-keyword-1.0.0.tgz", "integrity": "sha1-9uVOEQxeT40mXmnS7Q6vjPX0dxg=" }, + "is-extended": { + "version": "0.0.10", + "resolved": "https://registry.npmjs.org/is-extended/-/is-extended-0.0.10.tgz", + "integrity": "sha1-JE4UDfdbscmjEG9BL/GC+1NKbWI=", + "requires": { + "extended": "~0.0.3" + } + }, "is-finite": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/is-finite/-/is-finite-1.0.2.tgz", @@ -2700,6 +2774,16 @@ "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", "integrity": "sha1-IQmtx5ZYh8/AXLvUQsrIv7s2CGM=" }, + "object-extended": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/object-extended/-/object-extended-0.0.7.tgz", + "integrity": "sha1-hP0j9WsVWCrrPoiwXLVdJDLWijM=", + "requires": { + "array-extended": "~0.0.4", + "extended": "~0.0.3", + "is-extended": "~0.0.3" + } + }, "once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -3540,6 +3624,17 @@ "resolved": "https://registry.npmjs.org/strict-uri-encode/-/strict-uri-encode-1.1.0.tgz", "integrity": "sha1-J5siXfHVgrH1TmWt3UNS4Y+qBxM=" }, + "string-extended": { + "version": "0.0.8", + "resolved": 
"https://registry.npmjs.org/string-extended/-/string-extended-0.0.8.tgz", + "integrity": "sha1-dBlX3/SHsCcqee7FpE8jnubxfM0=", + "requires": { + "array-extended": "~0.0.5", + "date-extended": "~0.0.3", + "extended": "~0.0.3", + "is-extended": "~0.0.3" + } + }, "string-width": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/string-width/-/string-width-1.0.2.tgz", diff --git a/package.json b/package.json index 244998b..8856668 100644 --- a/package.json +++ b/package.json @@ -13,11 +13,13 @@ "license": "MIT", "dependencies": { "available-versions": "^0.13.3", + "block-queue": "0.0.2", "colors": "1.3.2", "commander": "^2.14.1", "console.table": "0.10.0", "crypto-js": "3.1.9-1", "csvjson": "5.1.0", + "fast-csv": "^2.4.1", "geojson-validation": "^0.2.1", "get-stdin": "6.0.0", "getmac": "1.4.3", From 86599a5037121041f95df755be012d079c15cfe6 Mon Sep 17 00:00:00 2001 From: dhaneesh Date: Wed, 24 Oct 2018 20:39:48 +0530 Subject: [PATCH 2/4] changed type from geojsonl to csv --- bin/transformutil.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/transformutil.ts b/bin/transformutil.ts index 67e3fbe..6596615 100644 --- a/bin/transformutil.ts +++ b/bin/transformutil.ts @@ -253,7 +253,7 @@ export function readLineAsChunks(incomingPath: string, chunckSize:number,streamF export function readCSVAsChunks(incomingPath: string, chunckSize:number,streamFuntion:Function) { - return readData(incomingPath, 'geojsonl').then(path => { + return readData(incomingPath, 'csv').then(path => { return new Promise((resolve, reject) => { let dataArray = new Array(); var csv = require("fast-csv"); From 59ed432709fee9b5534e908346f06e3b3bb18af7 Mon Sep 17 00:00:00 2001 From: dhaneesh Date: Sat, 27 Oct 2018 04:41:44 +0530 Subject: [PATCH 3/4] Backpressure implementation while streaming to XYZ --- bin/here-xyz.ts | 51 ++++++++++++++++++++++++++++++-------------- bin/transformutil.ts | 24 ++++++++++++--------- package-lock.json | 5 +++++ package.json | 1 + 4 files 
changed, 55 insertions(+), 26 deletions(-) diff --git a/bin/here-xyz.ts b/bin/here-xyz.ts index ba59cef..5c1c122 100644 --- a/bin/here-xyz.ts +++ b/bin/here-xyz.ts @@ -592,10 +592,19 @@ function streamingQueue(){ .then(x=>{ queue.uploadCount += task.fc.features.length; console.log("uploaded feature count :"+queue.uploadCount); + queue.chunksize--; done(); }); }); queue.uploadCount=0; + queue.chunksize=0; + queue.send= async function(obj:any){ + while(this.chunksize>25){ + await new Promise(done => setTimeout(done, 1000)); + } + this.push(obj); + this.chunksize++; + } return queue; } @@ -637,8 +646,14 @@ function uploadToXyzSpace(id: string, options: any){ }else{ let queue = streamingQueue(); transform.readLineAsChunks(options.file, options.chunk?options.chunk:1000,function(result:any){ - if(result.length>0) - queue.push({id:id,options:options,tags:tags,fc:{ type: "FeatureCollection", features: collate(result) }}); + return new Promise((res,rej)=>{ + ( async()=>{ + if(result.length>0){ + await queue.send({id:id,options:options,tags:tags,fc:{ type: "FeatureCollection", features: collate(result) }}); + } + res(); + })(); + }); }); } } else if (options.file.indexOf(".shp") != -1) { @@ -683,19 +698,24 @@ function uploadToXyzSpace(id: string, options: any){ }else{ let queue = streamingQueue(); transform.readCSVAsChunks(options.file, options.chunk?options.chunk:1000,function(result:any){ - if(result.length>0){ - const fc = { - features: transform.transform( - result, - options.lat, - options.lon, - options.alt - ), - type: "FeatureCollection" - }; - - queue.push({id:id,options:options,tags:tags,fc:fc}); - } + return new Promise((res,rej)=>{ + ( async()=>{ + if(result.length>0){ + const fc = { + features: transform.transform( + result, + options.lat, + options.lon, + options.alt + ), + type: "FeatureCollection" + }; + await queue.send({id:id,options:options,tags:tags,fc:fc}); + res(); + } + })(); + }); + }); } } else { @@ -832,7 +852,6 @@ async function 
uploadDataToSpaceWithTags( uid: string ) { return new Promise((resolve, reject) => { - const gsv = require("geojson-validation"); gsv.valid(object, async function (valid: boolean, errs: any) { if (!valid) { console.log(errs); diff --git a/bin/transformutil.ts b/bin/transformutil.ts index 6596615..fd85f3a 100644 --- a/bin/transformutil.ts +++ b/bin/transformutil.ts @@ -230,23 +230,27 @@ export function readLineFromFile(incomingPath: string, chunckSize = 100) { export function readLineAsChunks(incomingPath: string, chunckSize:number,streamFuntion:Function) { - return readData(incomingPath, 'geojsonl').then(path => { return new Promise((resolve, reject) => { let dataArray = new Array(); - const instream = fs.createReadStream(path); - const outstream = new (require('stream'))(); - const rl = readline.createInterface(instream, outstream); - - rl.on('line', (line: string) => { + var LineByLineReader = require('line-by-line'), + lr = new LineByLineReader(path); + lr.on('error', function (err:any) { + console.log(err); + throw err; + }); + lr.on('line', async function (line:any) { dataArray.push(JSON.parse(line)); if(dataArray.length>=chunckSize){ - streamFuntion(dataArray) + lr.pause(); + await streamFuntion(dataArray); + lr.resume(); dataArray=new Array(); } }); - rl.on("error", err => console.log(err)); - rl.on('close', () => streamFuntion(dataArray)); + lr.on('end', function () { + streamFuntion(dataArray) + }); }); }); } @@ -258,7 +262,7 @@ export function readCSVAsChunks(incomingPath: string, chunckSize:number,streamFu let dataArray = new Array(); var csv = require("fast-csv"); var stream = fs.createReadStream(incomingPath); - csv.fromStream(stream, {headers : true}).on("data", function(data:any){ + let csvstream = csv.fromStream(stream, {headers : true}).on("data", function(data:any){ dataArray.push(data); if(dataArray.length>=chunckSize){ streamFuntion(dataArray) diff --git a/package-lock.json b/package-lock.json index 6acd494..5947280 100644 --- 
a/package-lock.json +++ b/package-lock.json @@ -2139,6 +2139,11 @@ "invert-kv": "^1.0.0" } }, + "line-by-line": { + "version": "0.1.6", + "resolved": "https://registry.npmjs.org/line-by-line/-/line-by-line-0.1.6.tgz", + "integrity": "sha512-MmwVPfOyp0lWnEZ3fBA8Ah4pMFvxO6WgWovqZNu7Y4J0TNnGcsV4S1LzECHbdgqk1hoHc2mFP1Axc37YUqwafg==" + }, "load-json-file": { "version": "2.0.0", "resolved": "http://registry.npmjs.org/load-json-file/-/load-json-file-2.0.0.tgz", diff --git a/package.json b/package.json index 8856668..c334f2f 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "getmac": "1.4.3", "inquirer": "6.0.0", "latest-version": "4.0.0", + "line-by-line": "^0.1.6", "npm-check": "^5.7.1", "open": "0.0.5", "project-version": "1.2.0", From c78186d7d902577ca00989d784972a25ebc6a083 Mon Sep 17 00:00:00 2001 From: dhaneesh Date: Sat, 27 Oct 2018 04:55:40 +0530 Subject: [PATCH 4/4] minor updates --- bin/here-xyz.ts | 1 + bin/transformutil.ts | 11 ++++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/bin/here-xyz.ts b/bin/here-xyz.ts index 5c1c122..e9ec48c 100644 --- a/bin/here-xyz.ts +++ b/bin/here-xyz.ts @@ -35,6 +35,7 @@ import * as fs from "fs"; import * as tmp from "tmp"; import * as summary from "./summary"; let cq = require("block-queue"); +const gsv = require("geojson-validation"); const request = require("request"); let choiceList: { name: string, value: string}[] = []; diff --git a/bin/transformutil.ts b/bin/transformutil.ts index fd85f3a..4e38bf1 100644 --- a/bin/transformutil.ts +++ b/bin/transformutil.ts @@ -264,9 +264,14 @@ export function readCSVAsChunks(incomingPath: string, chunckSize:number,streamFu var stream = fs.createReadStream(incomingPath); let csvstream = csv.fromStream(stream, {headers : true}).on("data", function(data:any){ dataArray.push(data); - if(dataArray.length>=chunckSize){ - streamFuntion(dataArray) - dataArray=new Array(); + if(dataArray.length >=chunckSize){ + //console.log('dataArray '+chunckSize); + 
csvstream.pause(); + (async()=>{ + await streamFuntion(dataArray); + csvstream.resume(); + dataArray=new Array(); + })(); } }).on("end", function(){ streamFuntion(dataArray)