Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ import com.codingapi.dbstream.interceptor.SQLExecuteState;
import com.codingapi.dbstream.listener.SQLExecuteListener;

public class MySQLListener implements SQLExecuteListener {

@Override
public int order() {
return 0;
}

@Override
public void before(SQLExecuteState executeState) {
System.out.println("执行前 - SQL: " + executeState.getSql());
Expand Down Expand Up @@ -258,8 +264,9 @@ mvn clean test -P travis
- 手动事务模式下,事件会在 `commit()` 时批量推送
- 事务回滚时,相关事件会被丢弃

3. **数据表限制**:
- 执行数据拦截事件的分析,要求表必须存在主键的定义
3. **使用场景限制**:
- 数据库表必须有主键的定义,在DELETE事件需要明确主键信息,主键物理表不存在时可通过外部key文件配置的方式添加。
- 若INSERT INTO SELECT 语句中,采用主键自增模式,受限于JDBC的支持将无法解析到自增ID,建议修改单条保存或修改ID为手动传递。

4. **元数据缓存**:
- 数据库元数据会在首次连接时自动扫描并缓存
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.codingapi.dbstream</groupId>
<artifactId>dbstream-driver</artifactId>
<version>1.0.6</version>
<version>1.0.7</version>

<url>https://github.com/codingapi/dbstream-driver</url>
<name>dbstream-driver</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import lombok.Getter;

import java.sql.SQLException;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SQLRunningContext implements SQLExecuteListener {
public class SQLRunningContext {

@Getter
private final static SQLRunningContext instance = new SQLRunningContext();
Expand All @@ -23,10 +24,11 @@ public void addListener(SQLExecuteListener listener) {
if (listener != null) {
listeners.add(listener);
}

listeners.sort(Comparator.comparingInt(SQLExecuteListener::order));
}


@Override
public void after(SQLExecuteState executeState, Object result) throws SQLException {
executeState.setResult(result);
executeState.after();
Expand All @@ -36,7 +38,6 @@ public void after(SQLExecuteState executeState, Object result) throws SQLExcepti
}


@Override
public void before(SQLExecuteState executeState) throws SQLException {
executeState.begin();
for (SQLExecuteListener listener : listeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public class SQLDeleteExecuteListener implements SQLExecuteListener {

private final static ThreadLocal<DeleteDBEventParser> threadLocal = new ThreadLocal<>();

@Override
public int order() {
return 100;
}

@Override
public void before(SQLExecuteState executeState) throws SQLException {
String sql = executeState.getSql();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@

public interface SQLExecuteListener {

/**
* 执行顺序,越小越靠前
* @return index
*/
int order();

/**
* before sql execute
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public class SQLInsertExecuteListener implements SQLExecuteListener {

private final static ThreadLocal<InsertDBEventParser> threadLocal = new ThreadLocal<>();

@Override
public int order() {
return 100;
}

@Override
public void before(SQLExecuteState executeState) throws SQLException {
String sql = executeState.getSql();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public class SQLUpdateExecuteListener implements SQLExecuteListener {

private final static ThreadLocal<UpdateDBEventParser> threadLocal = new ThreadLocal<>();

@Override
public int order() {
return 100;
}

@Override
public void before(SQLExecuteState executeState) throws SQLException {
String sql = executeState.getSql();
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/com/codingapi/dbstream/stream/DBEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,14 @@ void setTransactionKey(String transactionKey) {
}

public void addPrimaryKey(String primaryKey) {
this.primaryKeys.add(primaryKey);
if(!this.primaryKeys.contains(primaryKey)) {
this.primaryKeys.add(primaryKey);
}
}


public boolean hasPrimaryKeys(){
return this.primaryKeys!=null && !this.primaryKeys.isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ public class DefaultDBEventPusher implements DBEventPusher {

@Override
public void push(List<DBEvent> events) {
System.out.println(events);
System.out.println("<=== DBStream DBEvent Total "+events.size()+" ===> ");
for(DBEvent event:events){
System.out.println(event);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
@Slf4j
public class MySQLListener implements SQLExecuteListener {

@Override
public int order() {
return 0;
}

@Override
public void after(SQLExecuteState executeState, Object result) throws SQLException {
log.info("after sql:{},params:{},execute timestamp:{}", executeState.getSql(), executeState.getListParams(), executeState.getExecuteTimestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ public interface User1Repository extends JpaRepository<User1, Long> {
@Query("select count(u) from User1 as u")
int counts();

@Modifying
@Query("delete from User1 where id > ?1")
int clearData(long id);

@Modifying
@Query("insert into User1(email,username,password,nickname) select u.email,u.username,u.password,u.nickname from User1 u")
int insertIntoFromSelect();
void insertIntoFromSelect();
}

Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
import org.springframework.test.annotation.Rollback;

import javax.transaction.Transactional;
import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertThrows;


@SpringBootTest
Expand All @@ -25,6 +26,7 @@ class User1RepositoryTest {
@Autowired
private User1Repository userRepository;


@BeforeEach
void setUp() {
DBStreamContext.getInstance().addEventPusher(new DBEventPusher() {
Expand Down Expand Up @@ -164,4 +166,38 @@ void test7() {
});
}

/**
* 异常回滚测试
*/
@Test
@Transactional
@Rollback(false)
void test8() {

DBStreamContext.getInstance().addListener(new MySQLListener());

List<User1> list = new ArrayList<>();


for (int i=0;i<10;i++){
User1 user1 = new User1();
user1.setUsername("admin1");
user1.setPassword("admin1");
user1.setEmail("admin1@example.com");
user1.setNickname("admin1");
list.add(user1);

User1 user2 = new User1();
user2.setUsername("admin2");
user2.setPassword("admin2");
user2.setEmail("admin2@example.com");
user2.setNickname("admin2");
list.add(user2);
}

userRepository.saveAll(list);


}

}