Skip to content

Commit

Permalink
Merge pull request #7 from WeBankFinTech/dev-0.8.0
Browse files Browse the repository at this point in the history
Dev 0.8.0
  • Loading branch information
liuyou2 committed Jun 22, 2020
2 parents 53db7a9 + ea4a482 commit c8f78db
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 30 deletions.
2 changes: 1 addition & 1 deletion bin/checkServices.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ MICRO_SERVICE_PORT=$3

local_host="`hostname --fqdn`"

ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')

function isLocal(){
if [ "$1" == "127.0.0.1" ];then
Expand Down
2 changes: 1 addition & 1 deletion bin/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ source ${DISTRIBUTION}
isSuccess "load config"

local_host="`hostname --fqdn`"
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')

function isLocal(){
if [ "$1" == "127.0.0.1" ];then
Expand Down
2 changes: 1 addition & 1 deletion bin/start-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fi
}
local_host="`hostname --fqdn`"

ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')

function isLocal(){
if [ "$1" == "127.0.0.1" ];then
Expand Down
2 changes: 1 addition & 1 deletion bin/stop-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export DISTRIBUTION=${DISTRIBUTION:-"${CONF_DIR}/config.sh"}
source ${DISTRIBUTION}

local_host="`hostname --fqdn`"
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')

function isSuccess(){
if [ $? -ne 0 ]; then
Expand Down
4 changes: 2 additions & 2 deletions docs/en_US/ch2/DSS Quick Installation Guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ dss_port="8088"
linkis_url="http://127.0.0.1:9001"
# dss ip address
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
```

The environment is ready, click me to enter ****[4. Installation and use](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/en_US/ch2/DSS%20Quick%20Installation%20Guide.md#four-installation-and-use)**
Expand Down Expand Up @@ -219,7 +219,7 @@ dss_port="8088"
linkis_url="http://127.0.0.1:9001"
# dss ip address
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
```

The environment is ready, click me to enter **[Four Installation and use](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/en_US/ch2/DSS%20Quick%20Installation%20Guide.md#four-installation-and-use)**
Expand Down
6 changes: 3 additions & 3 deletions docs/zh_CN/ch2/DSS快速安装使用文档.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ dss_port="8088"
linkis_url="http://127.0.0.1:9001"
# dss ip address
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
```

环境准备完毕,点我进入 [五、安装和使用](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/zh_CN/ch2/DSS%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8%E6%96%87%E6%A1%A3.md#%E4%BA%94%E5%AE%89%E8%A3%85%E5%92%8C%E4%BD%BF%E7%94%A8)
Expand Down Expand Up @@ -243,7 +243,7 @@ dss_port="8088"
linkis_url="http://127.0.0.1:9001"
# dss ip address
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
```

环境准备完毕,点我进入 [五、安装和使用](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/zh_CN/ch2/DSS%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8%E6%96%87%E6%A1%A3.md#%E4%BA%94%E5%AE%89%E8%A3%85%E5%92%8C%E4%BD%BF%E7%94%A8)
Expand Down Expand Up @@ -365,7 +365,7 @@ dss_port="8088"
linkis_url="http://127.0.0.1:9001"
# dss ip address
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')
```

环境准备完毕,点我进入 [五、安装和使用](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/zh_CN/ch2/DSS%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8%E6%96%87%E6%A1%A3.md#%E4%BA%94%E5%AE%89%E8%A3%85%E5%92%8C%E4%BD%BF%E7%94%A8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ public Project createProject(Project project, Session session) throws AppJointEr
params.add(new BasicNameValuePair("name", project.getName()));
params.add(new BasicNameValuePair("description", project.getDescription()));
HttpPost httpPost = new HttpPost(projectUrl);
httpPost.addHeader(HTTP.CONTENT_ENCODING, "UTF-8");
httpPost.addHeader(HTTP.CONTENT_ENCODING, HTTP.IDENTITY_CODING);
CookieStore cookieStore = new BasicCookieStore();
cookieStore.addCookie(session.getCookies()[0]);
HttpEntity entity = EntityBuilder.create().setContentEncoding("UTF-8").
setContentType(ContentType.create("application/x-www-form-urlencoded", Consts.UTF_8))
HttpEntity entity = EntityBuilder.create()
.setContentType(ContentType.create("application/x-www-form-urlencoded", Consts.UTF_8))
.setParameters(params).build();
httpPost.setEntity(entity);
CloseableHttpClient httpClient = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,39 +80,62 @@ boolean updateMsgOffset(int jobId, Properties props, Logger log, String[] consum
boolean result = false;
String vNewMsgID = "-1";
PreparedStatement updatePstmt = null;
PreparedStatement pstmtForGetID = null;
Connection msgConn = null;
vNewMsgID = setConsumedMsg(props,log,consumedMsgInfo);
try {
if(StringUtils.isNotEmpty(vNewMsgID) && StringUtils.isNotBlank(vNewMsgID) && !"-1".equals(vNewMsgID)){
msgConn = getEventCheckerConnection(props,log);
if(msgConn == null) return false;
int vProcessID = jobId;
String vReceiveTime = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");;
String sqlForUpdateMsg = "INSERT INTO event_status(receiver,topic,msg_name,receive_time,msg_id) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time),msg_id= CASE WHEN msg_id= " + lastMsgId + " THEN VALUES(msg_id) ELSE msg_id END";
log.info("last message offset {} is:" + lastMsgId);
updatePstmt = msgConn.prepareCall(sqlForUpdateMsg);
updatePstmt.setString(1, receiver);
updatePstmt.setString(2, topic);
updatePstmt.setString(3, msgName);
updatePstmt.setString(4, vReceiveTime);
updatePstmt.setString(5, vNewMsgID);
int updaters = updatePstmt.executeUpdate();
log.info("updateMsgOffset successful {} update result is:" + updaters);
if(updaters != 0){
log.info("Received message successfully , update message status succeeded, consumed flow execution ID: " + vProcessID);
//return true after update success
result = true;
msgConn.setAutoCommit(false);
String sqlForReadMsgID = "SELECT msg_id FROM event_status WHERE receiver=? AND topic=? AND msg_name=? for update";
pstmtForGetID = msgConn.prepareCall(sqlForReadMsgID);
pstmtForGetID.setString(1, receiver);
pstmtForGetID.setString(2, topic);
pstmtForGetID.setString(3, msgName);
ResultSet rs = pstmtForGetID.executeQuery();
String nowLastMsgId = rs.last()==true ? rs.getString("msg_id"):"0";
log.info("receive message successfully , Now check to see if the latest offset has changed ,nowLastMsgId is {} " + nowLastMsgId);
if("0".equals(nowLastMsgId) || nowLastMsgId.equals(lastMsgId)){

int vProcessID = jobId;
String vReceiveTime = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");;
String sqlForUpdateMsg = "INSERT INTO event_status(receiver,topic,msg_name,receive_time,msg_id) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time),msg_id= CASE WHEN msg_id= " + lastMsgId + " THEN VALUES(msg_id) ELSE msg_id END";
log.info("last message offset {} is:" + lastMsgId);
updatePstmt = msgConn.prepareCall(sqlForUpdateMsg);
updatePstmt.setString(1, receiver);
updatePstmt.setString(2, topic);
updatePstmt.setString(3, msgName);
updatePstmt.setString(4, vReceiveTime);
updatePstmt.setString(5, vNewMsgID);
int updaters = updatePstmt.executeUpdate();
log.info("updateMsgOffset successful {} update result is:" + updaters);
if(updaters != 0){
log.info("Received message successfully , update message status succeeded, consumed flow execution ID: " + vProcessID);
//return true after update success
result = true;
}else{
log.info("Received message successfully , update message status failed, consumed flow execution ID: " + vProcessID);
result = false;
}
}else{
log.info("Received message successfully , update message status failed, consumed flow execution ID: " + vProcessID);
log.info("the latest offset has changed , Keep waiting for the signal");
result = false;
}
msgConn.commit();
}else{
result = false;
}
}catch (SQLException e){
log.error("Error update Msg Offset" + e);
try {
msgConn.rollback();
} catch (SQLException ex) {
log.error("transaction rollback failed " + e);
}
return false;
}finally {
closeQueryStmt(pstmtForGetID, log);
closeQueryStmt(updatePstmt, log);
closeConnection(msgConn, log);
}
Expand Down
2 changes: 1 addition & 1 deletion web/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ dss_web_port="8088"
linkis_gateway_url="http://localhost:9001"

#dss nginx ip
dss_nginx_ip=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}')
dss_nginx_ip=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1')

0 comments on commit c8f78db

Please sign in to comment.