Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For #3617, Add delay time information for real-time sync task progress. #3675

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,12 +19,11 @@

import lombok.Getter;
import lombok.RequiredArgsConstructor;

import org.apache.shardingsphere.shardingscaling.core.config.SyncConfiguration;

import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Sharding scaling out job.
Expand All @@ -35,7 +34,9 @@
@RequiredArgsConstructor
public final class ShardingScalingJob {

private final String jobId = UUID.randomUUID().toString();
private static final AtomicInteger ID_AUTO_INCREASE_GENERATOR = new AtomicInteger();

private final int jobId = ID_AUTO_INCREASE_GENERATOR.incrementAndGet();

private final List<SyncConfiguration> syncConfigurations = new LinkedList<>();

Expand Down
Expand Up @@ -34,9 +34,9 @@
*/
public class ScalingJobController {

private final ConcurrentMap<String, ShardingScalingJob> scalingJobMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, ShardingScalingJob> scalingJobMap = new ConcurrentHashMap<>();

private final ConcurrentMap<String, List<SyncTaskController>> syncTaskControllerMaps = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, List<SyncTaskController>> syncTaskControllerMaps = new ConcurrentHashMap<>();

/**
* Start data nodes migrate.
Expand All @@ -59,7 +59,7 @@ public void start(final ShardingScalingJob shardingScalingJob) {
*
* @param shardingScalingJobId sharding scaling job id
*/
public void stop(final String shardingScalingJobId) {
public void stop(final int shardingScalingJobId) {
if (!scalingJobMap.containsKey(shardingScalingJobId)) {
return;
}
Expand All @@ -74,7 +74,7 @@ public void stop(final String shardingScalingJobId) {
* @param shardingScalingJobId sharding scaling job id
* @return data nodes migrate progress
*/
public SyncProgress getProgresses(final String shardingScalingJobId) {
public SyncProgress getProgresses(final int shardingScalingJobId) {
if (!scalingJobMap.containsKey(shardingScalingJobId)) {
throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", shardingScalingJobId));
}
Expand Down
Expand Up @@ -32,7 +32,7 @@
@RequiredArgsConstructor
public final class ScalingJobProgress implements SyncProgress {

private final String id;
private final int id;

private final String jobName;

Expand Down
Expand Up @@ -27,7 +27,7 @@
*/
@Data
@AllArgsConstructor
public class Column {
public final class Column {

private Object value;

Expand Down
Expand Up @@ -29,7 +29,7 @@
* @author avalon566
*/
@Data
public class DataRecord extends Record {
public final class DataRecord extends Record {

private String type;

Expand Down
Expand Up @@ -20,15 +20,19 @@
import org.apache.shardingsphere.shardingscaling.core.execute.executor.position.LogPosition;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;

/**
 * Base class for a record produced by a reader and pushed through a sync channel.
 *
 * <p>Carries the log position the record was read from (fixed at construction via
 * the Lombok-generated constructor) and the commit time of the source event,
 * which downstream code compares against {@code System.currentTimeMillis()} to
 * compute real-time sync delay.</p>
 *
 * @author avalon566
 */
@Getter
@Setter
@RequiredArgsConstructor
public abstract class Record {

// Position in the source log this record was read from; immutable per record.
private final LogPosition logPosition;

// Commit timestamp of the source event, set by the reader; compared with
// System.currentTimeMillis() to derive delayMillisecond, so epoch milliseconds.
private long commitTime;
}
Expand Up @@ -34,5 +34,7 @@ public final class RealTimeDataSyncTaskProgress implements SyncProgress {

private final String id;

private final long delayMillisecond;

private final LogPosition logPosition;
}
Expand Up @@ -55,6 +55,8 @@ public final class RealtimeDataSyncTask implements SyncTask {

private Reader reader;

private long delayMillisecond;

public RealtimeDataSyncTask(final SyncConfiguration syncConfiguration) {
this.syncConfiguration = syncConfiguration;
DataSourceMetaData dataSourceMetaData = syncConfiguration.getReaderConfiguration().getDataSourceConfiguration().getDataSourceMetaData();
Expand Down Expand Up @@ -87,7 +89,9 @@ private RealtimeSyncChannel instanceChannel(final int channelSize) {
return new RealtimeSyncChannel(channelSize, Collections.<AckCallback>singletonList(new AckCallback() {
@Override
public void onAck(final List<Record> records) {
logPositionManager.updateCurrentPosition(records.get(records.size() - 1).getLogPosition());
Record lastHandledRecord = records.get(records.size() - 1);
logPositionManager.updateCurrentPosition(lastHandledRecord.getLogPosition());
delayMillisecond = System.currentTimeMillis() - lastHandledRecord.getCommitTime();
}
}));
}
Expand All @@ -101,6 +105,6 @@ public void stop() {

@Override
public SyncProgress getProgress() {
return new RealTimeDataSyncTaskProgress(syncTaskId, logPositionManager.getCurrentPosition());
return new RealTimeDataSyncTaskProgress(syncTaskId, delayMillisecond, logPositionManager.getCurrentPosition());
}
}
Expand Up @@ -18,8 +18,10 @@
package org.apache.shardingsphere.shardingscaling.mysql;

import org.apache.shardingsphere.shardingscaling.core.execute.executor.position.LogPosition;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;

/**
* Binlog Position.
Expand All @@ -28,15 +30,16 @@
* @author yangyi
*/
@Data
@RequiredArgsConstructor
@AllArgsConstructor
public class BinlogPosition implements LogPosition<BinlogPosition> {

private String serverId;

private String filename;

private long position;

private final String filename;
private final long position;
private long serverId;
@Override
public final int compareTo(final BinlogPosition binlogPosition) {
if (null == binlogPosition) {
Expand Down
Expand Up @@ -32,13 +32,13 @@
import org.apache.shardingsphere.shardingscaling.core.metadata.JdbcUri;
import org.apache.shardingsphere.shardingscaling.mysql.binlog.MySQLConnector;
import org.apache.shardingsphere.shardingscaling.mysql.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.shardingscaling.mysql.binlog.event.AbstractRowsEvent;
import org.apache.shardingsphere.shardingscaling.mysql.binlog.event.DeleteRowsEvent;
import org.apache.shardingsphere.shardingscaling.mysql.binlog.event.PlaceholderEvent;
import org.apache.shardingsphere.shardingscaling.mysql.binlog.event.UpdateRowsEvent;
import org.apache.shardingsphere.shardingscaling.mysql.binlog.event.WriteRowsEvent;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;

/**
Expand Down Expand Up @@ -101,59 +101,60 @@ public void read(final Channel channel) {
}

private void handleWriteRowsEvent(final Channel channel, final JdbcUri uri, final WriteRowsEvent event) {
WriteRowsEvent wred = event;
for (Serializable[] each : wred.getAfterColumns()) {
if (filter(uri.getDatabase(), wred.getTableName())) {
continue;
}
DataRecord record = new DataRecord(new BinlogPosition("1", wred.getFileName(), wred.getPosition()), each.length);
record.setFullTableName(wred.getTableName());
if (filter(uri.getDatabase(), event.getTableName())) {
return;
}
for (Serializable[] each : event.getAfterRows()) {
DataRecord record = createDataRecord(event, each.length);
record.setType("insert");
for (int i = 0; i < each.length; i++) {
record.addColumn(new Column(getColumnValue(record.getTableName(), i, each[i]), true));
for (Serializable eachColumnValue : each) {
record.addColumn(new Column(eachColumnValue, true));
}
pushRecord(channel, record);
}
}

private void handleUpdateRowsEvent(final Channel channel, final JdbcUri uri, final UpdateRowsEvent event) {
UpdateRowsEvent ured = event;
for (int i = 0; i < ured.getBeforeColumns().size(); i++) {
if (filter(uri.getDatabase(), event.getTableName())) {
continue;
}
Serializable[] beforeValues = ured.getBeforeColumns().get(i);
Serializable[] afterValues = ured.getAfterColumns().get(i);
DataRecord record = new DataRecord(new BinlogPosition("1", ured.getFileName(), ured.getPosition()), beforeValues.length);
record.setFullTableName(event.getTableName());
if (filter(uri.getDatabase(), event.getTableName())) {
return;
}
for (int i = 0; i < event.getBeforeRows().size(); i++) {
Serializable[] beforeValues = event.getBeforeRows().get(i);
Serializable[] afterValues = event.getAfterRows().get(i);
DataRecord record = createDataRecord(event, beforeValues.length);
record.setType("update");
for (int j = 0; j < beforeValues.length; j++) {
Object oldValue = getColumnValue(record.getTableName(), j, beforeValues[j]);
Object newValue = getColumnValue(record.getTableName(), j, afterValues[j]);
Object oldValue = beforeValues[j];
Object newValue = afterValues[j];
record.addColumn(new Column(newValue, !newValue.equals(oldValue)));
}
pushRecord(channel, record);
}
}

private void handleDeleteRowsEvent(final Channel channel, final JdbcUri uri, final DeleteRowsEvent event) {
DeleteRowsEvent dred = event;
for (Serializable[] each : dred.getBeforeColumns()) {
if (filter(uri.getDatabase(), dred.getTableName())) {
continue;
}
DataRecord record = new DataRecord(new BinlogPosition("1", dred.getFileName(), dred.getPosition()), each.length);
record.setFullTableName(dred.getTableName());
if (filter(uri.getDatabase(), event.getTableName())) {
return;
}
for (Serializable[] each : event.getBeforeRows()) {
DataRecord record = createDataRecord(event, each.length);
record.setType("delete");
for (int i = 0; i < each.length; i++) {
record.addColumn(new Column(getColumnValue(record.getTableName(), i, each[i]), true));
for (Serializable eachColumnValue : each) {
record.addColumn(new Column(eachColumnValue, true));
}
pushRecord(channel, record);
}
}

/**
 * Builds a {@code DataRecord} pre-filled with the positional metadata of the
 * given rows event: binlog position, full table name, and commit time.
 *
 * @param rowsEvent rows event the record is derived from
 * @param columnCount number of columns the record will hold
 * @return a new data record carrying the event's metadata
 */
private DataRecord createDataRecord(final AbstractRowsEvent rowsEvent, final int columnCount) {
    BinlogPosition binlogPosition = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
    DataRecord dataRecord = new DataRecord(binlogPosition, columnCount);
    dataRecord.setFullTableName(rowsEvent.getTableName());
    dataRecord.setCommitTime(rowsEvent.getTimestamp());
    return dataRecord;
}

/**
 * Wraps a placeholder (non-data) binlog event in a {@code PlaceholderRecord}
 * carrying its binlog position, so position tracking still advances.
 *
 * @param channel channel to push the record to
 * @param event placeholder event decoded from the binlog
 */
private void handlePlaceholderEvent(final Channel channel, final PlaceholderEvent event) {
    PlaceholderRecord record = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
    pushRecord(channel, record);
}

Expand All @@ -167,9 +168,4 @@ private void pushRecord(final Channel channel, final Record record) {
/**
 * Decides whether a table should be skipped: any fully-qualified table name
 * that is not prefixed with {@code database + "."} is filtered out.
 *
 * @param database database name records must belong to
 * @param fullTableName fully-qualified table name ({@code db.table})
 * @return true when the table belongs to a different database and must be skipped
 */
private boolean filter(final String database, final String fullTableName) {
    String expectedPrefix = database + ".";
    return !fullTableName.startsWith(expectedPrefix);
}

/**
 * Returns the raw column value unchanged.
 *
 * <p>{@code tableName} and {@code index} are currently unused; the method is a
 * pass-through kept as an extension point for per-column value conversion.
 * (Removed previously commented-out dead code.)</p>
 *
 * @param tableName table the column belongs to (unused)
 * @param index column ordinal within the row (unused)
 * @param data raw value read from the binlog row
 * @return the value as-is
 */
private Object getColumnValue(final String tableName, final int index, final Serializable data) {
    return data;
}
}
Expand Up @@ -56,11 +56,11 @@ private void getCurrentPositionFromSource() {
PreparedStatement ps = connection.prepareStatement("show master status");
ResultSet rs = ps.executeQuery();
rs.next();
currentPosition = new BinlogPosition(null, rs.getString(1), rs.getLong(2));
currentPosition = new BinlogPosition(rs.getString(1), rs.getLong(2));
ps = connection.prepareStatement("show variables like 'server_id'");
rs = ps.executeQuery();
rs.next();
currentPosition.setServerId(rs.getString(2));
currentPosition.setServerId(rs.getLong(2));
} catch (SQLException e) {
throw new RuntimeException("markPosition error", e);
}
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.shardingscaling.mysql.binlog.codec;

import org.apache.shardingsphere.shardingscaling.mysql.binlog.BinlogContext;
import org.apache.shardingsphere.shardingscaling.mysql.binlog.event.AbstractRowsEvent;
import org.apache.shardingsphere.shardingscaling.mysql.binlog.event.DeleteRowsEvent;
import org.apache.shardingsphere.shardingscaling.mysql.binlog.event.PlaceholderEvent;
import org.apache.shardingsphere.shardingscaling.mysql.binlog.event.UpdateRowsEvent;
Expand Down Expand Up @@ -119,10 +120,8 @@ private DeleteRowsEvent decodeDeleteRowsEventV2(final BinlogEventHeader binlogEv
rowsEvent.parsePostHeader(in);
rowsEvent.parsePayload(binlogContext, in);
DeleteRowsEvent result = new DeleteRowsEvent();
result.setTableName(binlogContext.getFullTableName(rowsEvent.getTableId()));
result.setBeforeColumns(rowsEvent.getColumnValues1());
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getEndLogPos());
initRowsEvent(result, binlogEventHeader, rowsEvent.getTableId());
result.setBeforeRows(rowsEvent.getRows1());
return result;
}

Expand All @@ -131,11 +130,9 @@ private UpdateRowsEvent decodeUpdateRowsEventV2(final BinlogEventHeader binlogEv
rowsEvent.parsePostHeader(in);
rowsEvent.parsePayload(binlogContext, in);
UpdateRowsEvent result = new UpdateRowsEvent();
result.setTableName(binlogContext.getFullTableName(rowsEvent.getTableId()));
result.setBeforeColumns(rowsEvent.getColumnValues1());
result.setAfterColumns(rowsEvent.getColumnValues2());
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getEndLogPos());
initRowsEvent(result, binlogEventHeader, rowsEvent.getTableId());
result.setBeforeRows(rowsEvent.getRows1());
result.setAfterRows(rowsEvent.getRows2());
return result;
}

Expand All @@ -144,13 +141,19 @@ private WriteRowsEvent decodeWriteRowsEventV2(final BinlogEventHeader binlogEven
rowsEvent.parsePostHeader(in);
rowsEvent.parsePayload(binlogContext, in);
WriteRowsEvent result = new WriteRowsEvent();
result.setTableName(binlogContext.getFullTableName(rowsEvent.getTableId()));
result.setAfterColumns(rowsEvent.getColumnValues1());
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getEndLogPos());
initRowsEvent(result, binlogEventHeader, rowsEvent.getTableId());
result.setAfterRows(rowsEvent.getRows1());
return result;
}

/**
 * Copies common metadata into a freshly decoded rows event: table name and
 * file name from the binlog context, plus end position, timestamp, and server
 * id from the event header.
 *
 * @param rowsEvent rows event to initialize
 * @param binlogEventHeader header of the decoded binlog event
 * @param tableId table id used to resolve the full table name
 */
private void initRowsEvent(final AbstractRowsEvent rowsEvent, final BinlogEventHeader binlogEventHeader, final long tableId) {
    rowsEvent.setServerId(binlogEventHeader.getServerId());
    rowsEvent.setTimestamp(binlogEventHeader.getTimeStamp());
    rowsEvent.setPosition(binlogEventHeader.getEndLogPos());
    rowsEvent.setFileName(binlogContext.getFileName());
    rowsEvent.setTableName(binlogContext.getFullTableName(tableId));
}

private PlaceholderEvent createPlaceholderEvent(final BinlogEventHeader binlogEventHeader) {
PlaceholderEvent result = new PlaceholderEvent();
result.setFileName(binlogContext.getFileName());
Expand Down
Expand Up @@ -26,9 +26,13 @@
*/
@Data
public abstract class AbstractBinlogEvent {

// Id of the MySQL server that produced the event, taken from the binlog event header.
private long serverId;

// Name of the binlog file the event was read from.
private String fileName;

// End log position of the event within the binlog file.
private long position;

// Timestamp from the binlog event header; used as the record commit time for
// delay calculation. Unit not shown here — presumably needs conversion to
// millis to match System.currentTimeMillis(); TODO confirm.
private long timestamp;

}