Skip to content

Commit ff4b32d

Browse files
authored
[feature][connector][mysql-cdc] add MySQL CDC enumerator (#3481)
* [feature][connector][mysql-cdc] add mysql-cdc enumerator classes * [chore] cdc-base package rename to org.apache.seatunnel * [chore] reference static method * [feature][connector][mysql-cdc] support earliest startup mode * [feature][connector][mysql-cdc] add MySqlIncrementalSource * fix error
1 parent b247ff0 commit ff4b32d

File tree

82 files changed

+1148
-280
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+1148
-280
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.seatunnel.engine.common.utils;
18+
package org.apache.seatunnel.common.utils.function;
1919

2020
@FunctionalInterface
2121
public interface ConsumerWithException<T> {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.common.utils.function;
19+
20+
/**
21+
* A functional interface for a {@link java.util.function.Function} that may throw exceptions.
22+
*
23+
* @param <T> The type of the argument to the function.
24+
* @param <R> The type of the result of the supplier.
25+
* @param <E> The type of Exceptions thrown by this function.
26+
*/
27+
@FunctionalInterface
28+
public interface FunctionWithException<T, R, E extends Throwable> {
29+
/**
30+
* Applies this function to the given argument.
31+
*
32+
* @param value The argument to the function.
33+
* @return The result of thus supplier.
34+
* @throws E This function may throw an exception.
35+
*/
36+
R apply(T value) throws E;
37+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.seatunnel.engine.common.utils;
19+
package org.apache.seatunnel.common.utils.function;
2020

2121
/**
2222
* Similar to a {@link Runnable}, this interface is used to capture a block of code to be executed.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.seatunnel.engine.common.utils;
19+
package org.apache.seatunnel.common.utils.function;
2020

2121
/**
2222
* A functional interface for a {@link java.util.function.Supplier} that may throw exceptions.
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.seatunnel.connectors.cdc.base.config;
18+
package org.apache.seatunnel.connectors.cdc.base.config;
19+
20+
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
1921

2022
import io.debezium.config.Configuration;
2123
import lombok.Getter;
22-
import org.seatunnel.connectors.cdc.base.source.IncrementalSource;
2324

2425
import java.util.Properties;
2526

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.seatunnel.connectors.cdc.base.config;
18+
package org.apache.seatunnel.connectors.cdc.base.config;
19+
20+
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
1921

2022
import io.debezium.relational.RelationalDatabaseConnectorConfig;
21-
import org.seatunnel.connectors.cdc.base.source.IncrementalSource;
2223

2324
import java.time.Duration;
2425
import java.util.List;
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.seatunnel.connectors.cdc.base.config;
18+
package org.apache.seatunnel.connectors.cdc.base.config;
1919

20-
import org.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
21-
import org.seatunnel.connectors.cdc.base.option.SourceOptions;
20+
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
21+
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
2222

2323
import java.time.Duration;
2424
import java.util.Arrays;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.seatunnel.connectors.cdc.base.config;
18+
package org.apache.seatunnel.connectors.cdc.base.config;
1919

2020
import java.io.Serializable;
2121

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.seatunnel.connectors.cdc.base.config;
18+
package org.apache.seatunnel.connectors.cdc.base.config;
19+
20+
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
21+
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
22+
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
1923

2024
import lombok.AllArgsConstructor;
2125
import lombok.EqualsAndHashCode;
2226
import lombok.Getter;
23-
import org.seatunnel.connectors.cdc.base.option.StartupMode;
24-
import org.seatunnel.connectors.cdc.base.source.offset.Offset;
25-
import org.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
2627

2728
import java.io.Serializable;
2829

@@ -45,7 +46,7 @@ public Offset getStartupOffset(OffsetFactory offsetFactory) {
4546
case INITIAL:
4647
return null;
4748
case TIMESTAMP:
48-
return offsetFactory.timstamp(timestamp);
49+
return offsetFactory.timestamp(timestamp);
4950
default:
5051
throw new IllegalArgumentException(String.format("The %s mode is not supported.", startupMode));
5152
}
Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.seatunnel.connectors.cdc.base.config;
18+
package org.apache.seatunnel.connectors.cdc.base.config;
19+
20+
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
21+
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
22+
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
1923

2024
import lombok.AllArgsConstructor;
2125
import lombok.EqualsAndHashCode;
2226
import lombok.Getter;
23-
import org.seatunnel.connectors.cdc.base.option.StopMode;
24-
import org.seatunnel.connectors.cdc.base.source.offset.Offset;
25-
import org.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
2627

2728
import java.io.Serializable;
2829

@@ -46,7 +47,7 @@ public Offset getStopOffset(OffsetFactory offsetFactory) {
4647
case SPECIFIC:
4748
return offsetFactory.specific(specificOffsetFile, specificOffsetPos);
4849
case TIMESTAMP:
49-
return offsetFactory.timstamp(timestamp);
50+
return offsetFactory.timestamp(timestamp);
5051
default:
5152
throw new IllegalArgumentException(String.format("The %s mode is not supported.", stopMode));
5253
}

0 commit comments

Comments
 (0)