Skip to content
Permalink
Browse files
[feature](mysql-table) support utf8mb4 for mysql external table (#9402)
This patch supports utf8mb4 for mysql external table.

Some users need a MySQL external table with the utf8mb4 charset, but only the utf8 charset is supported right now.

When creating a MySQL external table, you can add an optional property "charset", which sets the character set of the MySQL connection.
The default value is "utf8". You can set it to "utf8mb4" instead of "utf8" when you need to.
  • Loading branch information
nextdreamblue committed May 11, 2022
1 parent 092a12e commit 375c1bf5c0fbe742c6a16f92dbce847c756acef0
Showing 18 changed files with 51 additions and 12 deletions.
@@ -75,6 +75,7 @@ Status MysqlScanNode::prepare(RuntimeState* state) {
_my_param.user = mysql_table->user();
_my_param.passwd = mysql_table->passwd();
_my_param.db = mysql_table->mysql_db();
_my_param.charset = mysql_table->charset();
// new one scanner
_mysql_scanner.reset(new (std::nothrow) MysqlScanner(_my_param));

@@ -73,7 +73,7 @@ Status MysqlScanner::open() {
return _error_status("mysql real connect failed.");
}

if (mysql_set_character_set(_my_conn, "utf8")) {
if (mysql_set_character_set(_my_conn, _my_param.charset.c_str())) {
return Status::InternalError("mysql set character set failed.");
}

@@ -40,6 +40,7 @@ struct MysqlScannerParam {
std::string user;
std::string passwd;
std::string db;
std::string charset;
unsigned long client_flag;
MysqlScannerParam() : client_flag(0) {}
};
@@ -186,13 +186,14 @@ MySQLTableDescriptor::MySQLTableDescriptor(const TTableDescriptor& tdesc)
_host(tdesc.mysqlTable.host),
_port(tdesc.mysqlTable.port),
_user(tdesc.mysqlTable.user),
_passwd(tdesc.mysqlTable.passwd) {}
_passwd(tdesc.mysqlTable.passwd),
_charset(tdesc.mysqlTable.charset) {}

std::string MySQLTableDescriptor::debug_string() const {
std::stringstream out;
out << "MySQLTable(" << TableDescriptor::debug_string() << " _db" << _mysql_db
<< " table=" << _mysql_table << " host=" << _host << " port=" << _port << " user=" << _user
<< " passwd=" << _passwd;
<< " passwd=" << _passwd << " charset=" << _charset;
return out.str();
}

@@ -233,6 +233,7 @@ class MySQLTableDescriptor : public TableDescriptor {
const std::string port() const { return _port; }
const std::string user() const { return _user; }
const std::string passwd() const { return _passwd; }
const std::string charset() const { return _charset; }

private:
std::string _mysql_db;
@@ -241,6 +242,7 @@ class MySQLTableDescriptor : public TableDescriptor {
std::string _port;
std::string _user;
std::string _passwd;
std::string _charset;
};

class ODBCTableDescriptor : public TableDescriptor {
@@ -48,6 +48,7 @@ Status MysqlTableSink::init(const TDataSink& t_sink) {
_conn_info.passwd = t_mysql_sink.passwd;
_conn_info.db = t_mysql_sink.db;
_mysql_tbl = t_mysql_sink.table;
_conn_info.charset = t_mysql_sink.charset;

// From the thrift expressions create the real exprs.
RETURN_IF_ERROR(Expr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs));
@@ -32,7 +32,7 @@ std::string MysqlConnInfo::debug_string() const {
std::stringstream ss;

ss << "(host=" << host << ",port=" << port << ",user=" << user << ",db=" << db
<< ",passwd=" << passwd << ")";
<< ",passwd=" << passwd << ",charset=" << charset << ")";
return ss.str();
}

@@ -62,7 +62,7 @@ Status MysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string&
}

// set character
if (mysql_set_character_set(_mysql_conn, "utf8")) {
if (mysql_set_character_set(_mysql_conn, conn_info.charset.c_str())) {
std::stringstream ss;
ss << "mysql_set_character_set failed because " << mysql_error(_mysql_conn);
return Status::InternalError(ss.str());
@@ -32,6 +32,7 @@ struct MysqlConnInfo {
std::string passwd;
std::string db;
int port;
std::string charset;

std::string debug_string() const;
};
@@ -48,6 +48,7 @@ Status VMysqlTableSink::init(const TDataSink& t_sink) {
_conn_info.passwd = t_mysql_sink.passwd;
_conn_info.db = t_mysql_sink.db;
_mysql_tbl = t_mysql_sink.table;
_conn_info.charset = t_mysql_sink.charset;

// From the thrift expressions create the real exprs.
RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs));
@@ -59,7 +59,7 @@ Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string
}

// set character
if (mysql_set_character_set(_mysql_conn, "utf8")) {
if (mysql_set_character_set(_mysql_conn, conn_info.charset.c_str())) {
fmt::memory_buffer err_ss;
fmt::format_to(err_ss, "mysql_set_character_set failed because : {}.",
mysql_error(_mysql_conn));
@@ -48,6 +48,7 @@ Which type of external table is mainly identified by the ENGINE type, currently
"table" = "table_name"
)
````
There is also an optional property "charset", which sets the character set of the MySQL connection; its default value is "utf8". You can set it to "utf8mb4" instead of "utf8" when you need to.

Notice:

@@ -133,7 +134,8 @@ Which type of external table is mainly identified by the ENGINE type, currently
"user" = "mysql_user",
"password" = "mysql_passwd",
"database" = "mysql_db_test",
"table" = "mysql_table_test"
"table" = "mysql_table_test",
"charset" = "utf8mb4"
)
````

@@ -48,6 +48,7 @@ CREATE EXTERNAL TABLE
"table" = "table_name"
)
```
以及一个可选属性"charset",可以用来设置mysql连接的字符集, 默认值是"utf8"。如有需要,你可以设置为另外一个字符集"utf8mb4"。

注意:

@@ -133,7 +134,8 @@ CREATE EXTERNAL TABLE
"user" = "mysql_user",
"password" = "mysql_passwd",
"database" = "mysql_db_test",
"table" = "mysql_table_test"
"table" = "mysql_table_test",
"charset" = "utf8mb4"
)
```

@@ -207,7 +207,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
mysqlTable.getUserName(),
mysqlTable.getPasswd(),
mysqlTable.getMysqlDatabaseName(),
mysqlTable.getMysqlTableName());
mysqlTable.getMysqlTableName(),
mysqlTable.getCharset());
totalRows.add(row);
} else {
ErrorReport.reportAnalysisException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, table.getType());
@@ -4290,6 +4290,7 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, Table table, List<
sb.append("\"port\" = \"").append(mysqlTable.getPort()).append("\",\n");
sb.append("\"user\" = \"").append(mysqlTable.getUserName()).append("\",\n");
sb.append("\"password\" = \"").append(hidePassword ? "" : mysqlTable.getPasswd()).append("\",\n");
sb.append("\"charset\" = \"").append(mysqlTable.getCharset()).append("\",\n");
} else {
sb.append("\"odbc_catalog_resource\" = \"").append(mysqlTable.getOdbcCatalogResourceName()).append("\",\n");
}
@@ -48,6 +48,7 @@ public class MysqlTable extends Table {
private static final String MYSQL_PASSWORD = "password";
private static final String MYSQL_DATABASE = "database";
private static final String MYSQL_TABLE = "table";
private static final String MYSQL_CHARSET = "charset";

private String odbcCatalogResourceName;
private String host;
@@ -56,6 +57,7 @@ public class MysqlTable extends Table {
private String passwd;
private String mysqlDatabaseName;
private String mysqlTableName;
private String charset;

public MysqlTable() {
super(TableType.MYSQL);
@@ -124,6 +126,15 @@ private void validate(Map<String, String> properties) throws DdlException {
throw new DdlException("Password of MySQL table is null. "
+ "Please set proper resource or add properties('password'='xxxx') when create table");
}

charset = properties.get(MYSQL_CHARSET);
if (charset == null) {
charset = "utf8";
}
if (!charset.equalsIgnoreCase("utf8") && !charset.equalsIgnoreCase("utf8mb4")) {
throw new DdlException("Unknown character set of MySQL table. "
+ "Please set charset 'utf8' or 'utf8mb4', other charsets not be unsupported now.");
}
}

mysqlDatabaseName = properties.get(MYSQL_DATABASE);
@@ -193,9 +204,16 @@ public String getMysqlTableName() {
return mysqlTableName;
}

/**
 * Returns the character set configured for this MySQL table.
 *
 * @return the stored charset, or "utf8" when the charset field is null
 */
public String getCharset() {
    return charset == null ? "utf8" : charset;
}

public TTableDescriptor toThrift() {
TMySQLTable tMySQLTable =
new TMySQLTable(getHost(), getPort(), getUserName(), getPasswd(), mysqlDatabaseName, mysqlTableName);
TMySQLTable tMySQLTable = new TMySQLTable(getHost(), getPort(), getUserName(), getPasswd(),
mysqlDatabaseName, mysqlTableName, getCharset());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.MYSQL_TABLE,
fullSchema.size(), 0, getName(), "");
tTableDescriptor.setMysqlTable(tMySQLTable);
@@ -213,6 +231,7 @@ public String getSignature(int signatureVersion) {
sb.append(getPasswd());
sb.append(mysqlDatabaseName);
sb.append(mysqlTableName);
sb.append(getCharset());
String md5 = DigestUtils.md5Hex(sb.toString());
LOG.debug("get signature of mysql table {}: {}. signature string: {}", name, md5, sb.toString());
return md5;
@@ -230,6 +249,7 @@ public void write(DataOutput out) throws IOException {
serializeMap.put(MYSQL_PASSWORD, passwd);
serializeMap.put(MYSQL_DATABASE, mysqlDatabaseName);
serializeMap.put(MYSQL_TABLE, mysqlTableName);
serializeMap.put(MYSQL_CHARSET, charset);

int size = (int) serializeMap.values().stream().filter(v -> {
return v != null;
@@ -262,5 +282,6 @@ public void readFields(DataInput in) throws IOException {
passwd = serializeMap.get(MYSQL_PASSWORD);
mysqlDatabaseName = serializeMap.get(MYSQL_DATABASE);
mysqlTableName = serializeMap.get(MYSQL_TABLE);
charset = serializeMap.get(MYSQL_CHARSET);
}
}
@@ -30,6 +30,7 @@ public class MysqlTableSink extends DataSink {
private final String passwd;
private final String db;
private final String tbl;
private final String charset;

public MysqlTableSink(MysqlTable mysqlTable) {
host = mysqlTable.getHost();
@@ -38,6 +39,7 @@ public MysqlTableSink(MysqlTable mysqlTable) {
passwd = mysqlTable.getPasswd();
db = mysqlTable.getMysqlDatabaseName();
tbl = mysqlTable.getMysqlTableName();
charset = mysqlTable.getCharset();
}

@Override
@@ -52,7 +54,7 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) {
protected TDataSink toThrift() {
TDataSink tDataSink = new TDataSink(TDataSinkType.MYSQL_TABLE_SINK);

tDataSink.setMysqlTableSink(new TMysqlTableSink(host, port, user, passwd, db, tbl));
tDataSink.setMysqlTableSink(new TMysqlTableSink(host, port, user, passwd, db, tbl, charset));
return tDataSink;
}

@@ -94,6 +94,7 @@ struct TMysqlTableSink {
4: required string passwd
5: required string db
6: required string table
7: required string charset
}

struct TOdbcTableSink {
@@ -214,6 +214,7 @@ struct TMySQLTable {
4: required string passwd
5: required string db
6: required string table
7: required string charset
}

struct TOdbcTable {

0 comments on commit 375c1bf

Please sign in to comment.