Skip to content

Commit

Permalink
---迁移坑爹版 V0.0.5 基本上暂时可用
Browse files Browse the repository at this point in the history
  • Loading branch information
fireflyhoo committed Apr 3, 2016
1 parent 0154a4b commit 027abd9
Show file tree
Hide file tree
Showing 9 changed files with 505 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ public void setPassword(String password) {
*/
private String schema;

private volatile BackendConnectionState state = BackendConnectionState.connecting;

public int getServerSecretKey() {
return serverSecretKey;
}

public void setServerSecretKey(int serverSecretKey) {
this.serverSecretKey = serverSecretKey;
}

/**
* 数据源配置
*/
Expand Down Expand Up @@ -102,7 +112,7 @@ public void setPool(PostgreSQLDataSource pool) {
private volatile int txIsolation;
private volatile boolean modifiedSQLExecuted = false;
private long lastTime;
private AtomicBoolean isQuit;
private AtomicBoolean isQuit = new AtomicBoolean(false);

// PostgreSQL服务端密码
private int serverSecretKey;
Expand Down Expand Up @@ -222,6 +232,10 @@ public boolean setResponseHandler(ResponseHandler commandHandler) {
return true;
}

public ResponseHandler getResponseHandler() {
return responseHandler;
}

@Override
public void commit() {
ByteBuffer buf = this.allocate();
Expand All @@ -240,10 +254,16 @@ public void query(String query) throws UnsupportedEncodingException {
}

@Override
public void execute(RouteResultsetNode node, ServerConnection source,
public void execute(RouteResultsetNode rrn, ServerConnection sc,
boolean autocommit) throws IOException {
// TODO Auto-generated method stub

LOGGER.warn("{}查询任务。。。。", id);
if (!modifiedSQLExecuted && rrn.isModifySQL()) {
modifiedSQLExecuted = true;
}
String xaTXID = sc.getSession2().getXaTXID();
synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),
autocommit);
}

/*******
Expand Down Expand Up @@ -464,29 +484,52 @@ private void updateConnectionInfo(PostgreSQLBackendConnection conn)
}

}

@Override
public void onConnectfinish() {
LOGGER.debug("连接后台真正完成");
try {
SocketChannel chan = (SocketChannel)this.channel;
SocketChannel chan = (SocketChannel) this.channel;
ByteBuffer buf = PacketUtils.makeStartUpPacket(user, schema);
buf.flip();
chan.write(buf);
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 读取可能的Socket字节流
*/
public void onReadData(int got) throws IOException {
int offset = readBufferOffset;
readBufferOffset = readBufferOffset +got;
readBuffer.position(offset);
int offset = readBufferOffset + 0;
readBufferOffset = readBufferOffset + got;
readBuffer.position(offset);
byte[] data = new byte[got];
readBuffer.get(data, 0, got);
readBuffer.clear();
readBufferOffset = 0;
handle(data);
}

public static enum BackendConnectionState {
connecting, connected, closed
}

public BackendConnectionState getState() {
return state;
}

public void setState(BackendConnectionState state) {
this.state = state;
}

public boolean isInTransaction() {
return inTransaction;
}

public void setInTransaction(boolean inTransaction) {
this.inTransaction = inTransaction;
}

}
Loading

0 comments on commit 027abd9

Please sign in to comment.