Skip to content

Commit

Permalink
Fixed #298
Browse files Browse the repository at this point in the history
  • Loading branch information
kantlin authored and peacewong committed May 20, 2020
1 parent 2151639 commit ec79dfe
Show file tree
Hide file tree
Showing 9 changed files with 550 additions and 0 deletions.
83 changes: 83 additions & 0 deletions datasource/metadatamanager/mysql/service/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
~ Copyright 2019 WeBank
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~ http://www.apache.org/licenses/LICENSE-2.0
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>linkis</artifactId>
<groupId>com.webank.wedatasphere.linkis</groupId>
<version>0.9.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>linkis-metadatamanager-service-mysql</artifactId>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mysql.version>5.1.34</mysql.version>
</properties>

<dependencies>
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-metadatamanager-common</artifactId>
<version>${linkis.version}</version>
</dependency>
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-module</artifactId>
<version>${linkis.version}</version>
<exclusions>
<exclusion>
<artifactId>asm</artifactId>
<groupId>org.ow2.asm</groupId>
</exclusion>
<exclusion>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
</plugin>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<finalName>${project.artifactId}-${project.version}</finalName>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.webank.wedatasphere.linkis.metadatamanager.service;

import com.webank.wedatasphere.linkis.common.conf.CommonVars;
import com.webank.wedatasphere.linkis.metadatamanager.common.domain.MetaColumnInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* @author davidhua
* 2020/02/14
*/
public class SqlConnection implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(SqlConnection.class);

private static final CommonVars<String> SQL_DRIVER_CLASS =
CommonVars.apply("wds.linkis.server.mdm.service.sql.driver", "com.mysql.jdbc.Driver");

private static final CommonVars<String> SQL_CONNECT_URL =
CommonVars.apply("wds.linkis.server.mdm.service.sql.url", "jdbc:mysql://%s:%s/%s");

private Connection conn;

private ConnectMessage connectMessage;

public SqlConnection(String host, Integer port,
String username, String password,
Map<String, Object> extraParams ) throws ClassNotFoundException, SQLException {
connectMessage = new ConnectMessage(host, port, username, password, extraParams);
conn = getDBConnection(connectMessage, "");
//Try to create statement
Statement statement = conn.createStatement();
statement.close();
}

public List<String> getAllDatabases() throws SQLException {
java.util.List<java.lang.String> dataBaseName = new ArrayList<>();
Statement stmt = null;
ResultSet rs = null;
try{
stmt = conn.createStatement();
rs = stmt.executeQuery("SHOW DATABASES");
while (rs.next()){
dataBaseName.add(rs.getString(1));
}
} finally {
closeResource(null, stmt, rs);
}
return dataBaseName;
}

public List<String> getAllTables(String database) throws SQLException {
List<String> tableNames = new ArrayList<>();
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
rs = stmt.executeQuery("SHOW TABLES FROM `" + database + "`");
while (rs.next()) {
tableNames.add(rs.getString(1));
}
return tableNames;
} finally{
closeResource(null, stmt, rs);
}
}

public List<MetaColumnInfo> getColumns(String database, String table) throws SQLException, ClassNotFoundException {
List<MetaColumnInfo> columns = new ArrayList<>();
String columnSql = "SELECT * FROM `" + database +"`.`" + table + "` WHERE 1 = 2";
PreparedStatement ps = null;
ResultSet rs = null;
ResultSetMetaData meta = null;
try {
List<String> primaryKeys = getPrimaryKeys(getDBConnection(connectMessage, database), table);
ps = conn.prepareStatement(columnSql);
rs = ps.executeQuery();
meta = rs.getMetaData();
int columnCount = meta.getColumnCount();
for (int i = 1; i < columnCount + 1; i++) {
MetaColumnInfo info = new MetaColumnInfo();
info.setIndex(i);
info.setName(meta.getColumnName(i));
info.setType(meta.getColumnTypeName(i));
if(primaryKeys.contains(meta.getColumnName(i))){
info.setPrimaryKey(true);
}
columns.add(info);
}
}finally {
closeResource(null, ps, rs);
}
return columns;
}

/**
* Get primary keys
* @param connection connection
* @param table table name
* @return
* @throws SQLException
*/
private List<String> getPrimaryKeys(Connection connection, String table) throws SQLException {
ResultSet rs = null;
List<String> primaryKeys = new ArrayList<>();
try {
DatabaseMetaData dbMeta = connection.getMetaData();
rs = dbMeta.getPrimaryKeys(null, null, table);
while(rs.next()){
primaryKeys.add(rs.getString("column_name"));
}
return primaryKeys;
}finally{
if(null != rs){
closeResource(connection, null, rs);
}
}
}

/**
* close database resource
* @param connection connection
* @param statement statement
* @param resultSet result set
*/
private void closeResource(Connection connection, Statement statement, ResultSet resultSet){
try {
if(null != resultSet && !resultSet.isClosed()) {
resultSet.close();
}
if(null != statement && !statement.isClosed()){
statement.close();
}
if(null != connection && !connection.isClosed()){
connection.close();
}
}catch (SQLException e){
LOG.warn("Fail to release resource [" + e.getMessage() +"]", e);
}
}

@Override
public void close() throws IOException {
closeResource(conn, null, null);
}

/**
* @param connectMessage
* @param database
* @return
* @throws ClassNotFoundException
*/
private Connection getDBConnection(ConnectMessage connectMessage, String database) throws ClassNotFoundException, SQLException {
String extraParamString = connectMessage.extraParams.entrySet()
.stream().map(e -> String.join("=", e.getKey(), String.valueOf(e.getValue())))
.collect(Collectors.joining("&"));
Class.forName(SQL_DRIVER_CLASS.getValue());
String url = String.format(SQL_CONNECT_URL.getValue(), connectMessage.host, connectMessage.port, database);
if(!connectMessage.extraParams.isEmpty()) {
url += "?" + extraParamString;
}
return DriverManager.getConnection(url, connectMessage.username, connectMessage.password);
}

/**
* Connect message
*/
private static class ConnectMessage{
private String host;

private Integer port;

private String username;

private String password;

private Map<String, Object> extraParams;

public ConnectMessage(String host, Integer port,
String username, String password,
Map<String, Object> extraParams){
this.host = host;
this.port = port;
this.username = username;
this.password = password;
this.extraParams = extraParams;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.webank.wedatasphere.linkis.metadatamanager.service;

import com.webank.wedatasphere.linkis.metadatamanager.common.Json;
import com.webank.wedatasphere.linkis.metadatamanager.common.domain.MetaColumnInfo;
import com.webank.wedatasphere.linkis.metadatamanager.common.service.AbstractMetaService;
import com.webank.wedatasphere.linkis.metadatamanager.common.service.MetadataConnection;
import org.springframework.stereotype.Component;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* @author davidhua
* 2020/02/14
*/
@Component
public class SqlMetaService extends AbstractMetaService<SqlConnection> {
@Override
public MetadataConnection<SqlConnection> getConnection(String operator, Map<String, Object> params) throws Exception {
String host = String.valueOf(params.getOrDefault(SqlParamsMapper.PARAM_SQL_HOST.getValue(), ""));
//After deserialize, Integer will be Double, Why?
Integer port = (Double.valueOf(String.valueOf(params.getOrDefault(SqlParamsMapper.PARAM_SQL_PORT.getValue(), 0)))).intValue();
String username = String.valueOf(params.getOrDefault(SqlParamsMapper.PARAM_SQL_USERNAME.getValue(), ""));
String password = String.valueOf(params.getOrDefault(SqlParamsMapper.PARAM_SQL_PASSWORD.getValue(), ""));
Map<String, Object> extraParams = new HashMap<>();
Object sqlParamObj = params.get(SqlParamsMapper.PARAM_SQL_EXTRA_PARAMS.getValue());
if(null != sqlParamObj){
if(!(sqlParamObj instanceof Map)){
extraParams = Json.fromJson(String.valueOf(sqlParamObj), Map.class, String.class, Object.class);
}else{
extraParams = (Map<String, Object>)sqlParamObj;
}
}
assert extraParams != null;
return new MetadataConnection<>(new SqlConnection(host, port, username, password, extraParams));
}

@Override
public List<String> queryDatabases(SqlConnection connection) {
try {
return connection.getAllDatabases();
} catch (SQLException e) {
throw new RuntimeException("Fail to get Sql databases(获取数据库列表失败)", e);
}
}

@Override
public List<String> queryTables(SqlConnection connection, String database) {
try {
return connection.getAllTables(database);
} catch (SQLException e) {
throw new RuntimeException("Fail to get Sql tables(获取表列表失败)", e);
}
}

@Override
public List<MetaColumnInfo> queryColumns(SqlConnection connection, String database, String table) {
try {
return connection.getColumns(database, table);
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException("Fail to get Sql columns(获取字段列表失败)", e);
}
}
}

0 comments on commit ec79dfe

Please sign in to comment.