Implement multiple readers per table for MySQL.
Experiment with the idea of splitting the read work across several concurrent
threads, where each reader reads a portion of the target table, using a
WHERE id <= x and id > y clause in its SELECT query.

For this to kick in, a number of conditions need to be met, as described in
the documentation. The main benefit might not be fetching the same overall
data set faster, but better concurrency, with as many readers as writers
and each reader/writer pair having its own dedicated queue.
dimitri committed Jun 28, 2017
1 parent 6d66280 commit 0549e74
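
As a rough illustration of the idea (a minimal sketch, not pgloader's actual
implementation; the function name and signature are invented for exposition),
splitting a [min, max] key interval into reader ranges could look like this
in Common Lisp:

    ;; Hypothetical sketch: cover [MIN-ID, MAX-ID] with (start . end) pairs
    ;; of at most RANGE-SIZE rows each; a reader then issues a query like
    ;; SELECT ... WHERE id >= start AND id < end for each of its ranges.
    (defun compute-ranges (min-id max-id range-size)
      (loop :for start := min-id :then end
            :for end := (min (+ start range-size) (1+ max-id))
            :collect (cons start end)
            :until (> end max-id)))

    ;; (compute-ranges 1 100 50) => ((1 . 51) (51 . 101))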
Showing 15 changed files with 364 additions and 134 deletions.
31 changes: 28 additions & 3 deletions pgloader.1
@@ -1774,7 +1774,7 @@ This command instructs pgloader to load data from a database connection\. The on
A default set of casting rules is provided and might be overloaded and appended to by the command\.
.
.P
Here\'s an example:
Here\'s an example using as many options as possible, some of them even being defaults\. Chances are you don\'t need such a complex setup; don\'t copy and paste it, use it only as a reference!
.
.IP "" 4
.
@@ -1785,7 +1785,8 @@ LOAD DATABASE
INTO postgresql://localhost:54393/sakila

WITH include drop, create tables, create indexes, reset sequences,
workers = 8, concurrency = 1
workers = 8, concurrency = 1,
multiple readers per thread, rows per range = 50000

SET PostgreSQL PARAMETERS
maintenance_work_mem to \'128MB\',
@@ -1796,7 +1797,7 @@ LOAD DATABASE
net_read_timeout = \'120\',
net_write_timeout = \'120\'

CAST type datetime to timestamptz drop default drop not null using zero\-dates\-to\-null,
CAST type bigint when (= precision 20) to bigserial drop typemod,
type date drop not null drop default using zero\-dates\-to\-null,
\-\- type tinyint to boolean using tinyint\-to\-boolean,
type year to integer
@@ -1985,6 +1986,30 @@ When this option is listed pgloader refrains from migrating the data over\. Note
.IP
When this option is listed pgloader only issues the \fBCOPY\fR statements, without doing any other processing\.
.
.IP "\(bu" 4
\fIsingle reader per thread\fR, \fImultiple readers per thread\fR
.
.IP
The default is \fIsingle reader per thread\fR, meaning that each MySQL table is read as a whole by a single thread, with a single \fBSELECT\fR statement using no \fBWHERE\fR clause\.
.
.IP
When using \fImultiple readers per thread\fR pgloader may be able to divide the reading work among several threads, as many as the \fIconcurrency\fR setting, which needs to be greater than 1 for this option to be activated\.
.
.IP
For each source table, pgloader searches for a primary key over a single numeric column, or a multiple\-column primary key index whose first column is of a numeric data type (one of \fBinteger\fR or \fBbigint\fR)\. When such an index exists, pgloader runs a query to find the \fImin\fR and \fImax\fR values of this column, and then splits that range into several ranges, each containing at most \fIrows per range\fR rows\.
.
.IP
When the resulting list contains at least as many ranges as the \fIconcurrency\fR setting, those ranges are distributed among the reader threads\.
.
.IP
When all these conditions are met, pgloader starts as many reader threads as the \fIconcurrency\fR setting, and each reader thread issues several queries with a \fBWHERE id >= x AND id < y\fR clause, where \fBy \- x\fR is \fBrows per range\fR or less (for the last range, depending on the \fImax\fR value just obtained)\.
.
.IP "\(bu" 4
\fIrows per range\fR
.
.IP
How many rows are fetched per \fBSELECT\fR query when using \fImultiple readers per thread\fR; see above for details\.
.
.IP "" 0

.
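
The min/max probe described in the section above is not shown in this diff;
a hedged sketch of its likely shape, using the sakila rental table purely as
an example, would be:

    ;; Assumed query shape; pgloader's actual SQL may differ.
    (format nil "SELECT MIN(~a), MAX(~a) FROM ~a"
            "rental_id" "rental_id" "rental")
    ;; => "SELECT MIN(rental_id), MAX(rental_id) FROM rental"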
42 changes: 39 additions & 3 deletions pgloader.1.md
@@ -1508,14 +1508,17 @@ building.
A default set of casting rules is provided and might be overloaded and
appended to by the command.

Here's an example:
Here's an example using as many options as possible, some of them even being
defaults. Chances are you don't need such a complex setup; don't copy and
paste it, use it only as a reference!

LOAD DATABASE
FROM mysql://root@localhost/sakila
INTO postgresql://localhost:54393/sakila

WITH include drop, create tables, create indexes, reset sequences,
workers = 8, concurrency = 1
workers = 8, concurrency = 1,
multiple readers per thread, rows per range = 50000

SET PostgreSQL PARAMETERS
maintenance_work_mem to '128MB',
@@ -1526,7 +1529,7 @@ Here's an example:
net_read_timeout = '120',
net_write_timeout = '120'

CAST type datetime to timestamptz drop default drop not null using zero-dates-to-null,
CAST type bigint when (= precision 20) to bigserial drop typemod,
type date drop not null drop default using zero-dates-to-null,
-- type tinyint to boolean using tinyint-to-boolean,
type year to integer
@@ -1723,6 +1726,39 @@ The `database` command accepts the following clauses and options:
When this option is listed pgloader only issues the `COPY`
statements, without doing any other processing.

- *single reader per thread*, *multiple readers per thread*
The default is *single reader per thread*, meaning that each
MySQL table is read as a whole by a single thread, with a single
`SELECT` statement using no `WHERE` clause.
When using *multiple readers per thread* pgloader may be able to
divide the reading work among several threads, as many as the
*concurrency* setting, which needs to be greater than 1 for this
option to be activated.
For each source table, pgloader searches for a primary key over a
single numeric column, or a multiple-column primary key index whose
first column is of a numeric data type (one of `integer`
or `bigint`). When such an index exists, pgloader runs a query to
find the *min* and *max* values of this column, and then splits
that range into several ranges, each containing at most
*rows per range* rows.
When the resulting list contains at least as many ranges as the
*concurrency* setting, those ranges are distributed among the
reader threads.
When all these conditions are met, pgloader starts as many reader
threads as the *concurrency* setting, and each reader thread
issues several queries with a `WHERE id >= x AND id < y` clause,
where `y - x` is *rows per range* or less (for the last range,
depending on the *max* value just obtained).
- *rows per range*
How many rows are fetched per `SELECT` query when using *multiple
readers per thread*; see above for details.

- *SET MySQL PARAMETERS*

The *SET MySQL PARAMETERS* allows setting MySQL parameters using the
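
To make the WHERE clause described above concrete, here is a sketch under the
assumption of a hypothetical RANGE-SELECT helper (not part of pgloader) that
turns one (start . end) range into a reader's query:

    (defun range-select (table column range)
      "Build one reader's SELECT for RANGE, a (start . end) pair."
      (destructuring-bind (start . end) range
        (format nil "SELECT * FROM ~a WHERE ~a >= ~d AND ~a < ~d"
                table column start column end)))

    ;; (range-select "rental" "rental_id" '(1 . 50001))
    ;; => "SELECT * FROM rental WHERE rental_id >= 1 AND rental_id < 50001"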
3 changes: 3 additions & 0 deletions src/package.lisp
@@ -269,6 +269,8 @@
#:unquote
#:expand-user-homedir-pathname
#:pretty-print-bytes
#:split-range
#:distribute

;; threads
#:make-kernel
@@ -440,6 +442,7 @@
#:header

;; main protocol/api
#:concurrency-support
#:map-rows
#:copy-column-list
#:queue-raw-data
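
The newly exported split-range and distribute utilities are not part of this
excerpt; as a sketch of what a distribute-style helper might do (assumed
round-robin behaviour, illustrative name), consider:

    (defun distribute-ranges (ranges n)
      "Deal RANGES out to N reader buckets, round-robin."
      (let ((buckets (make-array n :initial-element nil)))
        (loop :for range :in ranges
              :for i :from 0
              :do (push range (aref buckets (mod i n))))
        (map 'list #'nreverse buckets)))

    ;; (distribute-ranges '((1 . 11) (11 . 21) (21 . 31)) 2)
    ;; => (((1 . 11) (21 . 31)) ((11 . 21)))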
4 changes: 4 additions & 0 deletions src/params.lisp
@@ -22,6 +22,7 @@
#:*preserve-index-names*
#:*copy-batch-rows*
#:*copy-batch-size*
#:*rows-per-range*
#:*prefetch-rows*
#:*pg-settings*
#:*mysql-settings*
@@ -136,6 +137,9 @@
(defparameter *prefetch-rows* 100000
"How many rows do read in advance in the reader queue.")

(defparameter *rows-per-range* 10000
"How many rows to read in each reader's thread, per SQL query.")

(defparameter *pg-settings* nil "An alist of GUC names and values.")
(defparameter *mysql-settings* nil "An alist of GUC names and values.")

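
With the new default of 10000 rows per range (and assuming ranges are derived
from the min/max probe as documented above), a key column spanning 1 through
1,000,000 yields:

    (ceiling 1000000 10000) ; => 100 ranges, i.e. ~25 per reader at concurrency 4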
7 changes: 7 additions & 0 deletions src/parsers/command-keywords.lisp
@@ -130,6 +130,13 @@
(def-keyword-rule "including")
(def-keyword-rule "excluding")
(def-keyword-rule "like")
(def-keyword-rule "multiple")
(def-keyword-rule "single")
(def-keyword-rule "reader")
(def-keyword-rule "readers")
(def-keyword-rule "per")
(def-keyword-rule "thread")
(def-keyword-rule "range")
;; option for loading from an archive
(def-keyword-rule "archive")
(def-keyword-rule "before")
3 changes: 3 additions & 0 deletions src/parsers/command-mysql.lisp
@@ -14,6 +14,9 @@
option-batch-size
option-prefetch-rows
option-max-parallel-create-index
option-single-reader
option-multiple-readers
option-rows-per-range
option-truncate
option-disable-triggers
option-data-only
22 changes: 19 additions & 3 deletions src/parsers/command-options.lisp
@@ -82,11 +82,18 @@
(bind (((_ _ nb) prefetch-rows))
(cons :prefetch-rows (parse-integer (text nb))))))

(defrule option-rows-per-range (and kw-rows kw-per kw-range
equal-sign
(+ (digit-char-p character)))
(:lambda (rows-per-range)
(cons :rows-per-range (parse-integer (text (fifth rows-per-range))))))

(defun batch-control-bindings (options)
"Generate the code needed to add batch-control"
`((*copy-batch-rows* (or ,(getf options :batch-rows) *copy-batch-rows*))
(*copy-batch-size* (or ,(getf options :batch-size) *copy-batch-size*))
(*prefetch-rows* (or ,(getf options :prefetch-rows) *prefetch-rows*))))
`((*copy-batch-rows* (or ,(getf options :batch-rows) *copy-batch-rows*))
(*copy-batch-size* (or ,(getf options :batch-size) *copy-batch-size*))
(*prefetch-rows* (or ,(getf options :prefetch-rows) *prefetch-rows*))
(*rows-per-range* (or ,(getf options :rows-per-range) *rows-per-range*))))

(defun identifier-case-binding (options)
"Generate the code needed to bind *identifer-case* to the proper value."
@@ -97,6 +104,7 @@
(option-list '(:batch-rows
:batch-size
:prefetch-rows
:rows-per-range
:identifier-case))
extras)
"Given a list of options, remove the generic ones that should already have
@@ -129,6 +137,14 @@
(make-option-rule reset-sequences (and kw-reset (? kw-no) kw-sequences))
(make-option-rule foreign-keys (and (? kw-no) kw-foreign kw-keys))

(defrule option-single-reader (and kw-single kw-reader kw-per kw-thread)
(:constant (cons :multiple-readers nil)))

(defrule option-multiple-readers (and kw-multiple
(or kw-readers kw-reader)
kw-per kw-thread)
(:constant (cons :multiple-readers t)))

(defrule option-schema-only (and kw-schema kw-only)
(:constant (cons :schema-only t)))

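
Assuming the usual esrap keyword rules behave as elsewhere in pgloader, the
new grammar rules should produce results of this shape (illustrative, derived
from the :constant and :lambda forms above):

    (esrap:parse 'option-multiple-readers "multiple readers per thread")
    ;; => (:MULTIPLE-READERS . T)
    (esrap:parse 'option-single-reader "single reader per thread")
    ;; => (:MULTIPLE-READERS)          ; a cons whose cdr is NIL
    (esrap:parse 'option-rows-per-range "rows per range = 50000")
    ;; => (:ROWS-PER-RANGE . 50000)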
5 changes: 5 additions & 0 deletions src/sources/common/api.lisp
@@ -34,6 +34,11 @@
(setf (slot-value source 'transforms)
(make-list (length (slot-value source 'columns))))))))

(defgeneric concurrency-support (copy concurrency)
(:documentation
"Returns nil when no concurrency is supported, or a list of copy ojbects
prepared to run concurrently."))

(defgeneric map-rows (source &key process-row-fn)
(:documentation
"Load data from SOURCE and funcall PROCESS-ROW-FUN for each row found in
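
A method implementing this generic function is not shown here; below is a
hypothetical sketch of its contract, where copy-mysql is pgloader's MySQL
copy class and both helper functions are invented placeholders:

    (defmethod concurrency-support ((copy copy-mysql) concurrency)
      ;; Placeholder logic: only split the read work when more than one
      ;; reader is requested and the table has a usable numeric key.
      (when (and (< 1 concurrency)
                 (numeric-pkey-column copy))       ; assumed helper
        (loop :repeat concurrency
              :collect (make-ranged-copy copy))))  ; assumed helper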
80 changes: 42 additions & 38 deletions src/sources/common/db-methods.lisp
@@ -212,6 +212,7 @@
(on-error-stop *on-error-stop*)
(worker-count 4)
(concurrency 1)
(multiple-readers nil)
max-parallel-create-index
(truncate nil)
(disable-triggers nil)
@@ -271,7 +272,9 @@
max-indexes))))
(idx-channel (when idx-kernel
(let ((lp:*kernel* idx-kernel))
(lp:make-channel)))))
(lp:make-channel))))

(task-count 0))

;; apply catalog level transformations to support the database migration
;; that's CAST rules, index WHERE clause rewriting and ALTER commands
@@ -327,12 +330,15 @@
;; copy-from, we have concurrency tasks writing.
(progn ; when copy-data
(setf (gethash table writers-count) concurrency)
(copy-from table-source
:concurrency concurrency
:kernel copy-kernel
:channel copy-channel
:on-error-stop on-error-stop
:disable-triggers disable-triggers)))))

(incf task-count
(copy-from table-source
:concurrency concurrency
:multiple-readers multiple-readers
:kernel copy-kernel
:channel copy-channel
:on-error-stop on-error-stop
:disable-triggers disable-triggers))))))

;; now end the kernels
;; and each time a table is done, launch its indexing
@@ -341,37 +347,35 @@
(with-stats-collection ("COPY Threads Completion" :section :post
:use-result-as-read t
:use-result-as-rows t)
(let ((worker-count (* (hash-table-count writers-count)
concurrency)))
(loop :for tasks :below worker-count
:do (destructuring-bind (task table seconds)
(lp:receive-result copy-channel)
(log-message :debug
"Finished processing ~a for ~s ~50T~6$s"
task (format-table-name table) seconds)
(when (eq :writer task)
;;
;; Start the CREATE INDEX parallel tasks only when
;; the data has been fully copied over to the
;; corresponding table, that's when the writers
;; count is down to zero.
;;
(decf (gethash table writers-count))
(log-message :debug "writers-counts[~a] = ~a"
(format-table-name table)
(gethash table writers-count))

(when (and create-indexes
(zerop (gethash table writers-count)))
(alexandria:appendf
pkeys
(create-indexes-in-kernel (target-db copy)
table
idx-kernel
idx-channel))))))
(prog1
worker-count
(lp:end-kernel :wait nil))))))
(loop :repeat task-count
:do (destructuring-bind (task table seconds)
(lp:receive-result copy-channel)
(log-message :debug
"Finished processing ~a for ~s ~50T~6$s"
task (format-table-name table) seconds)
(when (eq :writer task)
;;
;; Start the CREATE INDEX parallel tasks only when
;; the data has been fully copied over to the
;; corresponding table, that's when the writers
;; count is down to zero.
;;
(decf (gethash table writers-count))
(log-message :debug "writers-counts[~a] = ~a"
(format-table-name table)
(gethash table writers-count))

(when (and create-indexes
(zerop (gethash table writers-count)))
(alexandria:appendf
pkeys
(create-indexes-in-kernel (target-db copy)
table
idx-kernel
idx-channel)))))
:finally (progn
(lp:end-kernel :wait nil)
(return worker-count))))))

(log-message :info "Done with COPYing data, waiting for indexes")
