- Eventual Consistency: make sure data reaches the destination
- Order Problem: make sure data arrives in the same order as it was produced
  - `update item(id1) set field1 to 1`; then, `update item(id1) set field1 to 2`: `update t set name='a' where id = 1; update t set name='b' where id = 1;`
  - `insert item(id1) with field1 as 1`; then, `delete item(id1)`: `insert into t values (1); delete from t where id = 1;`
- Master-slave replication protocol: if the network has problems, the MySQL master will re-send the lost packets
- `WAL`: the consumer module uses a write-ahead log: it first writes what it receives, then tries to process & send it
- Checkpoint: the consumer module remembers where it left off, so data is not missed if syncer shuts down unexpectedly
- Retry: if the output channel fails to send to the output target, retry until success or write the item to the failure log
- Failure Log: if retries exceed the configured number, write the item to the failure log for manual recheck
- Event Scheduler: solves the Order Problem between events whose primary keys are unchanged
  - `mod`: `mod` the integral primary key so that changes to the same row are always handled in order
  - `hash`: hash the primary key of the data row, then `mod` the hash value to schedule -- the current default
  - `direct`:
    - If your data source only has insert operations, you can choose this scheduler, which is faster;
    - No order promise for data sources with insert/update/delete; higher output rate if you can tolerate some inconsistency;
If you are changing the `id` of an event, it usually means you are doing a join (like I do), which
- may break the consistency promise, because the order between events may not be scheduled as it should be;
- may cause duplicate items, because Syncer can only ensure the `exactly once` semantic;
Query requests against the business database are delayed as little as possible.
- Supports listening to MySQL, MongoDB, and DRDS of Aliyun (https://www.aliyun.com/product/drds)
- If it fails to connect to the input data source, it will abort
- MySQL master source filter:
  - Schema filter (named `repos`), supports regex
  - Table name filter
  - Interested column filter
  - In an `UPDATE`, all interested columns are received even if unchanged (different from `MongoDB`)
  - Automatic primary key detection, set into `id`
  - If a table matches multiple schema & table filters (because of regex usage), an error message is logged and syncer uses any one that matches the column filter
  - If an event goes through the column filter and only the primary key is left:
    - If the event type is `UPDATE`, discard the event -- because updating the id is not supported now;
    - For other event types, keep it.
  - Support reading from a binlog file to recover data in case of data loss (`input.masters[x].file`)
  - Support specifying the binlog file/position to start reading from (`input.masters[x].connection.syncMeta[]`)
- MongoDB master source filter:
  - Version: 3.x
  - Database filter (named `repos`), supports regex
  - Collection name filter
  - In an `UPDATE`, only changed columns are received (different from `MySQL`)
  - Automatic `_id` detection, set into `id`
  - If an event matches multiple schema & table filters, the first specific match is used to filter/output, i.e. a specific `repo` config overrides a regex `repo` config
  - If an event goes through the column filter and only the primary key is left:
    - If the event type is `UPDATE`, discard the event -- because updating the id is not supported now;
    - For other event types, keep it.
  - If a user/password is configured for auth, it needs the `[listDatabases, find]` permissions
  - Only first-level fields can be listened to (MongoDB stores JSON, which may have multiple levels)
- DRDS:
  - Same config as MySQL, but connect directly to the underlying RDS MySQL because DRDS does not support binlog dump
  - Remember to fetch the partition key in `fields`
- Remembers where it left off last time by writing the file/position of the binlog/oplog, and resumes from there so as to avoid any data loss
- More than once (at-least-once): we ensure at-least-once semantics now, so you need to make sure your output channel (the `consumer` of syncer output) is idempotent and your destination can handle it without duplicates. Counterexample: a table without a primary key definitely can't handle it and will cause duplicate data sooner or later.
- Multiple consumers can share a common connection to the same data source (MySQL/MongoDB) to reduce the burden on the remote master
- Automatically skips already-synced items for consumers according to their registered info
After data items come out of the `Input` module, they are converted to `SyncData`(s) -- the abstraction of a single data change. In other words, a single binlog item may contain multiple row changes and be converted to multiple `SyncData`s.
Manipulate `SyncData` via the following (for more details, see the input part of Consumer Pipeline Config):

- `method`: write a Java method to handle `SyncData`
  - Global variable: `logger` for logging
  - Already imported (may add more in the future): `java.util.*`, `org.slf4j.Logger`, `com.github.zzt93.syncer.data.*`, `com.github.zzt93.syncer.data.util.*`
  - Use the full class name if you need another class, like `java.util.function.Function`
- `if`
- `switcher`
- `foreach`
- All public methods in `SyncData`: `addField(String key, Object value)`, `renameField(String oldKey, String newKey)`, `removeField(String key)`, `removeFields(String... keys)`, `containField(String key)`, `updateField(String key, Object value)`, ..., `syncByQuery()`, `extraQuery(String schemaName, String tableName)` (a short usage sketch follows after this list)
- All data fields in `SyncData`: `repo`, `entity`, `id`, `fields`, `extra`
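
For example, a minimal `filter` section of a consumer config could chain a few of these calls via `statement` (a sketch only; the field names `superid`/`internal_note` are illustrative, and the `statement`/`method` syntax is documented in the Consumer Pipeline Config section below):

```yaml
filter:
  # rename a column and drop one that the destination does not need
  - statement: ["renameField('superid', 'superId')", "removeField('internal_note')"]
```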
- If the output channel meets too many failures/errors (exceeding `countLimit`), it will abort and change health to `red`
- If it fails to connect to the output channel, it will retry every 2**n seconds
- Elasticsearch
  - Version: 5.x
  - Bulk operation
  - Update/delete documents by `UpdateByQuery` or `DeleteByQuery`
  - Join/merge documents from different sources when pushing to ES [1]
    - ExtraQuery: do an extra query to fetch extra needed info
      - Supports multiple dependent extra queries via the special mark `$var$`
    - One-to-many relationship (parent-child relationship in ES) for documents in different indices
    - Self-referential relationship handling
  - `upsert` support, fixing `DocumentMissingException` by using `upsert`; it can be used in the following two scenarios (see the sketch below):
    - Initial data load, by creating the index manually and updating synced fields to ES (only supports `MySQL` input)
    - Fixing unexpected config/sync errors
  - No code needed for search data preparation, only config
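
  As a sketch of the scenarios above: `upsert` is switched on under the Elasticsearch `requestMapping` (the full example is in the configuration section below), so an update request creates the document instead of raising `DocumentMissingException`:

  ```yaml
  elasticsearch:
    requestMapping:
      upsert: true   # update requests create the document if it does not exist yet
  ```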
- Http Endpoint (deprecated, only for debug or test)
  - Invokes a RESTful interface according to event type: insert=`PUT`, update=`POST`, delete=`DELETE`
  - Connects to the remote endpoint repeatedly; may change to websocket or rpc
- MySQL
  - Version: 5.5, 5.6, 5.7, 8.0
  - Bulk operation
  - Simple nested SQL: `insert into ... select`
  - Ignores `DuplicateKeyException`, not counted as a failure
  - Low latency
- Kafka
  - Version: 0.10.0 or later
  - Bulk operation
  - Uses the `id` of the data source as the `key` of the record, ensuring order between records
  - Uses `SyncResult` as the message data
  - JSON serializer/deserializer (see here for future optimization)
  - Notice: the Kafka message consumer has to handle events idempotently;
  - Notice: messages may be out of order if errors happen;
  - Easy to re-consume and rebuild without affecting the business db;
[1]: Be careful about this feature, it may affect your performance
- Http Endpoints
  - Port decision:
    - If no port is configured, `Syncer` will try ports in `[40000, 40010)`
    - If a port is configured via the command line, the env var `port`, or `port` in `config.yml`, syncer will use that port
    - If a port is configured in multiple locations (command line, env var and config file), the precedence is:
      - command line option
      - env var
      - file config
  - `http://ip:port/health`: reports `Syncer` status dynamically;
- JMX Endpoints
  - Use `jconsole` to connect to `Syncer`; you can change the logging level dynamically (or change the log level with the `--debug` option at startup)
- Shutdown process
  - Producer starter shutdown
    - Connector shutdown
    - Starter service shutdown
  - Consumer starter shutdown
    - Output starter shutdown
      - Output channel shutdown
      - Batch service shutdown
    - Filter-output service shutdown
- MySQL:
  - Supported version: depends on this binlog connector lib
  - Composite primary keys are not supported
  - Updating the primary key is not supported
  - Update/delete by query only matches exact values, i.e. no querying of analyzed fields (`text`) when updating
  - Data of numeric types (tinyint, etc.) is always returned signed, regardless of whether the column definition includes the "unsigned" keyword. You may need to convert to unsigned if necessary:
    `Byte.toUnsignedInt((byte)(int) fields['xx']) // or SyncUtil.unsignedByte(sync, "xx");`
  - Data of `*text`/`*blob` types is always returned as a byte array (for `var*` this will also be true in the future). You may need to convert it to a string if necessary:
    `new String(fields['xx']) // or SyncUtil.toStr(sync, "xx");`
- Mongo:
  - Fields are not deleted from ES when syncing to ES
  - Don't update/delete via `syncer` and another way (REST api or Java api) at the same time; it may cause a version conflict and fail the change
  - Update/delete by query is executed at once, i.e. it is not buffered and does not use batch
- Don't change the numeric-suffix naming of binlog files, or it will break the binlog voting
- MySQL config
  - binlog_format: row
  - binlog_row_image: full
- MongoDB config:
  - (optional) update `bind_ip` to allow listening for connections from applications on the configured addresses
  - start with a replica set enabled: `mongod --replSet myapp`
    - or use docker: `docker run -d --name mongodb -p 27017:27017 -v /root/mongodb-container/db:/data/db mongo:3.2 mongod --replSet chat`
  - init the replica set in the shell: `rs.initiate()`
- `input.masters[x]`
  - `type`: MySQL, Mongo
  - `connection`: `ip`, `address`, `port`, `user`, `password`, `passwordFile`
  - `file`: absolute path to the binlog file
```yaml
input:
  masters:
    - connection:
        address: ${HOST_ADDRESS}
        port: 3306
        user: xxx
        password: yyy
    - connection:
        address: ${HOST_ADDRESS}
        port: 27018
      type: mongo
```
- `input.master[x]`:
  - `type`: same as producer
  - `connection`: same as producer
  - `syncMeta` (see the sketch below):
    - `binlogFilename`: name of the remote master's binlog file
    - `binlogPosition`: position you want to start listening from
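
  For example, a consumer that resumes from a known binlog position can add `syncMeta` under the master's `connection` (a sketch; the file name and position are placeholders, following `input.masters[x].connection.syncMeta[]` from the feature list above):

  ```yaml
  connection:
    address: ${HOST_ADDRESS}
    port: 3306
    user: xxx
    password: yyy
    syncMeta:
      - binlogFilename: mysql-bin.000001   # placeholder binlog file name
        binlogPosition: 4                  # placeholder position to start from
  ```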
- `repos[x]` (see the sketch below):
  - `name`: repo name, regex allowed
  - `entities[x]`:
    - `name`: entity name
    - `fields`: entity field list
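
  A sketch of a `repos` filter (the repo/entity/field names are illustrative):

  ```yaml
  repos:
    - name: "test_db_[0-9]*"     # regex allowed
      entities:
        - name: user             # entity (table/collection) name
          fields: [name, email]  # interested columns; the primary key is auto-detected into id
  ```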
- `scheduler` (see the sketch below):
  - `mod`: `mod` the integral primary key so that changes to the same row are always handled in order
  - `hash`: hash the primary key of the data row, then `mod` the hash value to schedule -- the current default
  - `direct`:
    - If your data source only has insert operations, you can choose this scheduler, which is faster;
    - No order promise for data sources with insert/update/delete; higher output rate if you can tolerate some inconsistency;
- `onlyUpdated`: whether to sync not-`updated` events (only for `MySQL`)
  - `updated` definition: `Objects.deepEquals` == true
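
  A sketch of these two options (values are illustrative; placement is assumed to be alongside the other consumer input options such as `repos`):

  ```yaml
  scheduler: hash      # mod | hash (current default) | direct
  onlyUpdated: true    # MySQL only; see the `updated` definition above
  ```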
- `method` (preferred: more powerful and easier to write): write a Java class implementing `MethodFilter` to handle `SyncData`
  - Import the dependency:

    ```xml
    <dependency>
      <groupId>com.github.zzt93</groupId>
      <artifactId>syncer-data</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>
    ```
  - Write a class implementing `MethodFilter`:

    ```java
    public class MenkorFilterImpl implements MethodFilter {
      @Override
      public void filter(List<SyncData> list) {
        SyncData data = list.get(0);
        if (data.getField("location") != null) {
          Map location = SyncUtil.fromJson((String) data.getField("location"));
          if (!location.isEmpty()) {
            data.addField("geom", SQLFunction.geomfromtext(
                "point(" + location.get("longitude") + "," + location.get("latitude") + ")"));
          }
        }
      }
    }
    ```
  - Copy the method filter into the config file:

    ```yaml
    filter:
      - method: '
        public void filter(List<SyncData> list) {
          SyncData data = list.get(0);
          if (data.getField("location") != null) {
            Map location = SyncUtil.fromJson((String) data.getField("location"));
            if (!location.isEmpty()) {
              data.addField("geom", SQLFunction.geomfromtext("point(" + location.get("longitude") + "," + location.get("latitude") + ")"));
            }
          }
        }'
    ```
  - Limitation:
    - Single-line (slash-slash) comments are not supported
The following part is implemented with Spring EL, i.e. you can use any syntax Spring EL supports, even if it is not listed here.
- `statement`: a list of String code to be executed
  - e.g.

    ```yaml
    - statement: ["#type=entity", "isWrite()"]
    ```
- `switcher`
  - supports a `default` case
  - only one case is executed
  - e.g.

    ```yaml
    - switcher:
        switch: "entity"
        case:
          "file": ["#docType='plain'", "renameField('uploader_id', 'uploaderId').renameField('parent_id', 'parentId')"]
          "user": ["#suffix='' ", "renameField('superid', 'superId')"]
    ```
- `foreach`: in most cases, you can use Spring EL's collection projection rather than this feature
- `if`
  - `create`: create a new event (or a bunch), copy values & execute post-creation statements
  - `drop`: drop this event
  - `statement`: same as the outer `statement`
  - `switcher`: same as above
  - `foreach`
  - e.g.

    ```yaml
    - if:
        condition: "entity == 'user' && isUpdate()"
        ifBody:
          - create:
              copy: ["id", "entity", "#suffix", "#title", "#docType"]
              postCreation: ["addField('ownerTitle', #title)", "syncByQuery().filter('ownerId', id)", "id = null"]
        elseBody:
          - drop: {}
    ```
- All public methods in `SyncData`:
  - `isWrite()`, `isUpdate()`, `isDelete()`
  - `toWrite()`, `toUpdate()`, `toDelete()`
  - `getField(String key)`, `addExtra(String key, Object value)`, `addField(String key, Object value)`, `renameField(String oldKey, String newKey)`, `removeField(String key)`, `removeFields(String... keys)`, `containField(String key)`, `updateField(String key, Object value)`
  - `syncByQuery()`: update/delete by query, supported by the ES/MySQL output channels (`SyncByQueryES`)
  - `extraQuery(String schemaName, String tableName)`: usually works with `create` to convert one event into multiple events (`ExtraQuery`: enhances syncer with multiple dependent queries)
- All data fields in `SyncData`:
  - `repo`: repo/db/index
  - `entity`: entity or collection
  - `id`: data primary key or similar
  - `fields`: data content of this sync event, converted from the log content according to your `repo` config. Notice:
    - if your interested column config (`fields`) includes the name of the `primary key`, records will contain it; otherwise it will only be in the `id` field;
  - `extra`: an extra map to store extra info
- Special expressions for output mapping (see the sketch below):
  - `"fields.*"`: `map.put('your_key', fields)`
  - `"fields.*.flatten"`: `map.putAll(fields)`
  - `"extra.*"`: `map.put('your_key', extra)`
  - `"extra.*.flatten"`: `map.putAll(extra)`
- `batch`: supports sending output changes in batches (see the sketch below)
  - `size`: flush when the buffer reaches this size (if `size` <= 0, the buffer is as large as possible)
  - `delay`: flush at this interval, in `MILLISECONDS`
  - `maxRetry`: max retries on error
- `failureLog`: failure log config
  - `countLimit`: failure count limit
  - `timeLimit`: time range in which failure log items are counted
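
  Put together (illustrative values; the same keys appear in the `elasticsearch` example below):

  ```yaml
  batch:
    size: 100     # flush once 100 changes are buffered
    delay: 1000   # ... or after 1000 ms, whichever comes first
    maxRetry: 5
  failureLog:
    countLimit: 1000
  ```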
- `requestMapping`, `rowMapping`: how to convert `SyncData` into a suitable format and where to send it
- `elasticsearch`
  - When using this channel, you may prefer not to include `id`-like fields in the interested column config (`fields`): there is no need to include them in the data fields for ES, and syncer auto-detects and sets them for you.
  - e.g.

    ```yaml
    elasticsearch:
      connection:
        clusterName: test-${ACTIVE_PROFILE}
        clusterNodes: ["${HOST_ADDRESS}:9300"]
      requestMapping: # mapping from input data to es request
        enableExtraQuery: true
        retryOnUpdateConflict: 3
        upsert: false
        index: "entity + #suffix" # default: repo
        type: "#docType" # default: entity
        documentId: "id" # default: id
        fieldsMapping: # default: fields.*.flatten
          "fields": "fields.*.flatten"
      batch:
        size: 100
        delay: 1000
        maxRetry: 5
      refreshInMillis: 1000
      failureLog:
        countLimit: 1000
    ```
- `mysql`
  - e.g.:

    ```yaml
    mysql:
      connection:
        address: ${HOST_ADDRESS}
        port: 3306
        user: xxx
        password: xxx
      rowMapping:
        schema: " 'test' "
        table: " 'someTable' "
        id: "id"
        rows:
          "fields": "fields.*.flatten"
      batch:
        size: 100
        delay: 100
        maxRetry: 5
      failureLog:
    ```
Full and usable samples can be found under test/config/
```yaml
port: 12345

ack:
  flushPeriod: 100

input:
  input-meta:
    last-run-metadata-dir: /data/syncer/input/last_position/

filter:
  worker: 3
  filter-meta:
    src: /data/syncer/filter/src

output:
  worker: 2
  batch:
    worker: 2
  output-meta:
    failure-log-dir: /data/syncer/output/failure/
```
```bash
git clone https://github.com/zzt93/syncer
cd syncer/ && mvn package

# /path/to/config/: producer.yml, consumer.yml, password-file
# use `-XX:+UseParallelOldGC` if you have less memory and lower input pressure
# use `-XX:+UseG1GC` if you have at least 4g memory and an event input rate larger than 2*10^4/s
java -server -XX:+UseG1GC -jar ./syncer-core/target/syncer-core-1.0-SNAPSHOT.jar [--debug] [--port=40000] [--config=/absolute/path/to/syncerConfig.yml] --producerConfig=/absolute/path/to/producer.yml --consumerConfig=/absolute/path/to/consumer1.yml,/absolute/path/to/consumer2.yml
```
- Docker
  - vm.max_map_count may need to be set on some OSes for the ES docker image to run
- Docker compose
- size: 7M
- machines: 3
- databases: 3 logical; 24 after horizontal splitting
- tables: 90+ per database; listening to 5 per database
- types: bigint, varchar, text, tinyint, timestamp, smallint, int, unsigned, longtext
- Insert/load data, count in mysql & es and compare numbers;
- Delete data, count in mysql & es and compare numbers;
- 10G & 10^8 rows
  - load 10^5 rows at a time via `mysqlimport`
  - no pause between imports
- Throughput
  - MySQL output: 1300+ inserts/s

    ```
    time: 20190407-022652 src=800000 dst=9302
    time: 20190407-022654 src=800000 dst=12070
    time: 20190407-022656 src=800000 dst=14863
    time: 20190407-022658 src=800000 dst=17536
    ```
  - ES output: 10000+ inserts/s

    ```
    time: 20190406-083524 src=800000 dst=79441
    time: 20190406-083527 src=800000 dst=130193
    time: 20190406-083530 src=800000 dst=134752
    time: 20190406-083533 src=800000 dst=190517
    ```
- CPU: 80-90
- Memory: 4g
- Increasing batch size & flush period increases performance at the cost of higher memory usage (ES only)
- IO
- Network
- Disk
- JVM
- Thread
- Lock contention
- Search system: search data sync
- Micro-service: auth/recommend/chat data sync
- Sync Requirement: low latency, high availability
- Join table: avoid joins in the production env; trade space for speed by pre-joining tables
- Sync Requirement: low latency, high availability
- Kafka: sync data to Kafka for other heterogeneous systems to use
- For data recovery: in case an entity is dropped by mistake, or you know where to start & end
- For alter table sync:
- Elasticsearch & MySQL Sync Challenge(1)
- Elasticsearch & MySQL Sync Challenge(2): Event Driven
- Elasticsearch & MySQL Sync Challenge(3): Implementation
- Elasticsearch & MySQL Sync Challenge(4): Quality Attributes
- Elasticsearch & MySQL Sync Challenge(5): Redesign
- Load yml file into environment property source
- Bind property source into config model class
- For collection fields, both the getter and setter will be invoked -- this behavior depends on Spring Boot
  - What if only the getter is invoked?
  - List and map fields behave differently:
    - list: `list = getList(); list.clear(); list.add(xxx); setList(list);`
    - map: `map = getMap(); map.put(xxx, yyy); setMap(map);`
- For now, document values are mapped using `toString()`: {@link XContentBuilder#unknownValue}
  - java.sql.Timestamp format: 'yyyy-MM-dd HH:mm:ss.SSS'. For now, if you need another format, you have to format it to a string yourself
  - Maybe change to Jackson
- Replace (case-sensitive):
- "schemas" -> "repos"
- "tables" -> "entities"
- "rowName" -> "fields"
- "Record" -> "Field"
- "records" -> "fields"
If you have any problems using Syncer, or find a bug, please open an issue. I will handle it as soon as I can.
- Q: "Got error produce response in correlation id xxx on topic-partition xxx.xxPartition-0, splitting and retrying (5 attempts left). Error: MESSAGE_TOO_LARGE"?
- A: Adjust the message `batch.size` to a smaller number, or configure `kafka` to accept larger messages