@@ -15,12 +15,14 @@ const process = require('process');
1515
1616const argparse = require ( 'argparse' ) ;
1717const pg = require ( 'pg' ) ;
18+ const pgcopy = require ( 'pg-copy-streams' ) . from ;
19+ const csvwriter = require ( 'csv-write-stream' )
1820
1921
2022function _connect ( driverName , args , callback ) {
2123 let driver = null ;
2224
23- if ( driverName == 'pg' ) {
25+ if ( driverName == 'pg-js ' ) {
2426 driver = pg ;
2527 } else if ( driverName == 'pg-native' ) {
2628 driver = pg . native ;
@@ -111,84 +113,195 @@ function runner(args, querydata) {
111113 var run_start = _now ( ) ;
112114 var complete = 0 ;
113115 var stmt = { text : query , values : query_args } ;
116+ var copy = null ;
117+
118+ if ( query . startsWith ( 'COPY ' ) ) {
119+ var m = / C O P Y ( \w + ) \s * \( \s * ( (?: \w + ) (?: , \s * \w + ) * ) \s * \) / . exec ( query )
120+ if ( m == null ) {
121+ throw "Could not parse COPY query" ;
122+ }
123+
124+ var rowcount = query_args [ 0 ] [ "count" ] ;
125+ var row = query_args [ 0 ] [ "row" ] ;
126+ var rows = Array ( rowcount ) ;
127+ for ( var i = 0 ; i < rowcount ; i += 1 ) {
128+ rows [ i ] = row ;
129+ }
130+
131+ copy = {
132+ "table" : m [ 1 ] ,
133+ "columns" : m [ 2 ] . split ( "," ) . map ( v => v . trim ( ) ) ,
134+ "rows" : rows
135+ } ;
136+ }
137+
138+ if ( copy != null && driver == 'pg-native' ) {
139+ cb ( { code : 3 , msg : "pg-native does not support COPY" } ) ;
140+ }
114141
115142 if ( use_prepared_stmt ) {
116143 stmt . name = '_pgbench_query' ;
117144 }
118145
119- for ( var i = 0 ; i < concurrency ; i += 1 ) {
120- _connect ( driver , args , function ( err , conn ) {
146+ var query_runner = function ( err , conn ) {
147+ if ( err ) {
148+ throw err ;
149+ }
150+
151+ var queries = 0 ;
152+ var rows = 0 ;
153+ var latency_stats = new Float64Array ( timeout_in_us / 10 ) ;
154+ var min_latency = Infinity ;
155+ var max_latency = 0.0 ;
156+ var duration_in_us = run_duration * 1000000 ;
157+ var req_start ;
158+ var req_time ;
159+
160+ var _cb = function ( err , result ) {
121161 if ( err ) {
122162 throw err ;
123163 }
124164
125- var queries = 0 ;
126- var rows = 0 ;
127- var latency_stats = new Float64Array ( timeout_in_us / 10 ) ;
128- var min_latency = Infinity ;
129- var max_latency = 0.0 ;
130- var duration_in_us = run_duration * 1000000 ;
131- var req_start ;
132- var req_time ;
133-
134- var _cb = function ( err , result ) {
135- if ( err ) {
136- throw err ;
137- }
165+ // Request time in tens of microseconds
166+ req_time = Math . round ( ( _now ( ) - req_start ) / 10 ) ;
138167
139- // Request time in tens of microseconds
140- req_time = Math . round ( ( _now ( ) - req_start ) / 10 ) ;
168+ if ( req_time > max_latency ) {
169+ max_latency = req_time ;
170+ }
141171
142- if ( req_time > max_latency ) {
143- max_latency = req_time ;
144- }
172+ if ( req_time < min_latency ) {
173+ min_latency = req_time ;
174+ }
175+
176+ latency_stats [ req_time ] += 1 ;
177+ queries += 1 ;
178+ if ( Array . isArray ( result ) ) {
179+ result = result [ result . length - 1 ] ;
180+ }
145181
146- if ( req_time < min_latency ) {
147- min_latency = req_time ;
182+ rows += result . rows . length ;
183+
184+ if ( _now ( ) - run_start < duration_in_us ) {
185+ req_start = _now ( ) ;
186+ conn . query ( stmt , _cb ) ;
187+ } else {
188+ conn . end ( ) ;
189+ if ( report ) {
190+ _report_results ( queries , rows , latency_stats ,
191+ min_latency , max_latency ,
192+ run_start ) ;
148193 }
149194
150- latency_stats [ req_time ] += 1 ;
151- queries += 1 ;
152- rows += result . rows . length ;
153-
154- if ( _now ( ) - run_start < duration_in_us ) {
155- req_start = _now ( ) ;
156- conn . query ( stmt , _cb ) ;
157- } else {
158- conn . end ( ) ;
159- if ( report ) {
160- _report_results ( queries , rows , latency_stats ,
161- min_latency , max_latency ,
162- run_start ) ;
163- }
164-
165- complete += 1 ;
166- if ( complete == concurrency && cb ) {
167- cb ( ) ;
168- }
195+ complete += 1 ;
196+ if ( complete == concurrency && cb ) {
197+ cb ( ) ;
169198 }
170- } ;
199+ }
200+ } ;
201+
202+ req_start = _now ( ) ;
203+ conn . query ( stmt , _cb ) ;
204+ } ;
205+
206+ var copy_runner = function ( err , conn ) {
207+ if ( err ) {
208+ throw err ;
209+ }
210+
211+ var queries = 0 ;
212+ var rows = 0 ;
213+ var latency_stats = new Float64Array ( timeout_in_us / 10 ) ;
214+ var min_latency = Infinity ;
215+ var max_latency = 0.0 ;
216+ var duration_in_us = run_duration * 1000000 ;
217+ var req_start ;
218+ var req_time ;
171219
220+ var _start_copy = function ( _cb ) {
172221 req_start = _now ( ) ;
173- conn . query ( stmt , _cb ) ;
174- } ) ;
222+ var csvstream = csvwriter ( {
223+ sendHeaders : false ,
224+ separator : '\t' ,
225+ headers : copy . columns
226+ } ) ;
227+ var copystream = conn . query ( pgcopy ( stmt . text ) ) ;
228+ csvstream . pipe ( copystream ) ;
229+ copystream . on ( 'end' , _cb ) ;
230+ copystream . on ( 'error' , _cb ) ;
231+ for ( var row of copy . rows ) {
232+ csvstream . write ( row ) ;
233+ }
234+ csvstream . end ( ) ;
235+ }
236+
237+ var _cb = function ( err , result ) {
238+ if ( err ) {
239+ throw err ;
240+ }
241+
242+ // Request time in tens of microseconds
243+ req_time = Math . round ( ( _now ( ) - req_start ) / 10 ) ;
244+
245+ if ( req_time > max_latency ) {
246+ max_latency = req_time ;
247+ }
248+
249+ if ( req_time < min_latency ) {
250+ min_latency = req_time ;
251+ }
252+
253+ latency_stats [ req_time ] += 1 ;
254+ queries += 1 ;
255+ rows += copy . rows . length ;
256+
257+ if ( _now ( ) - run_start < duration_in_us ) {
258+ _start_copy ( _cb ) ;
259+ } else {
260+ conn . end ( ) ;
261+ if ( report ) {
262+ _report_results ( queries , rows , latency_stats ,
263+ min_latency , max_latency ,
264+ run_start ) ;
265+ }
266+
267+ complete += 1 ;
268+ if ( complete == concurrency && cb ) {
269+ cb ( ) ;
270+ }
271+ }
272+ } ;
273+
274+ _start_copy ( _cb ) ;
275+ } ;
276+
277+ var runner = copy != null ? copy_runner : query_runner ;
278+
279+ for ( var i = 0 ; i < concurrency ; i += 1 ) {
280+ _connect ( driver , args , runner ) ;
175281 }
176282 }
177283
178284 function _setup ( cb ) {
179285 if ( setup_query ) {
180286 // pg-native does not like multiple statements in queries
181- _do_run ( 'pg' , setup_query , [ ] , 1 , 0 , false , false , cb ) ;
287+ _do_run ( 'pg-js ' , setup_query , [ ] , 1 , 0 , false , false , cb ) ;
182288 } else {
183289 if ( cb ) {
184290 cb ( ) ;
185291 }
186292 }
187293 }
188294
295+ function _exit ( err ) {
296+ if ( err ) {
297+ console . error ( err . msg ) ;
298+ process . exit ( err . code ) ;
299+ }
300+ }
301+
189302 function _teardown ( cb ) {
190303 if ( teardown_query ) {
191- _do_run ( 'pg' , teardown_query , [ ] , 1 , 0 , false , false , cb ) ;
304+ _do_run ( 'pg-js ' , teardown_query , [ ] , 1 , 0 , false , false , cb ) ;
192305 } else {
193306 if ( cb ) {
194307 cb ( ) ;
@@ -198,7 +311,7 @@ function runner(args, querydata) {
198311
199312 function _run ( ) {
200313 _do_run ( args . driver , query , query_args , args . concurrency , duration ,
201- true , true , _teardown ) ;
314+ true , true , ( err ) => _teardown ( ( ) => _exit ( err ) ) ) ;
202315 }
203316
204317 function _warmup_and_run ( ) {
@@ -258,7 +371,7 @@ function main() {
258371 parser . addArgument (
259372 'driver' ,
260373 { type : String , help : 'driver implementation to use' ,
261- choices : [ 'pg' , 'pg-native' ] } )
374+ choices : [ 'pg-js ' , 'pg-native' ] } )
262375 parser . addArgument (
263376 'queryfile' ,
264377 { type : String ,
0 commit comments