Skip to content

Commit

Permalink
support create table with clickhouse
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaspeng committed Apr 29, 2022
1 parent a3095f7 commit f7268fe
Show file tree
Hide file tree
Showing 16 changed files with 892 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import org.apache.inlong.manager.common.pojo.query.ck.ClickHouseTableQueryBean;
import org.apache.inlong.manager.common.pojo.query.hive.HiveTableQueryBean;

/**
Expand All @@ -28,6 +29,7 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "tableType", defaultImpl = TableQueryBean.class, visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = HiveTableQueryBean.class, name = "hive"),
@JsonSubTypes.Type(value = ClickHouseTableQueryBean.class, name = "clickhouse"),
})
@Data
public class TableQueryBean {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.common.pojo.query.ck;

import org.apache.inlong.manager.common.pojo.query.ColumnInfoBean;

public class ClickHouseColumnInfoBean extends ColumnInfoBean {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.common.pojo.query.ck;

import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.inlong.manager.common.pojo.query.ColumnQueryBean;

@Data
@EqualsAndHashCode(callSuper = true)
public class ClickHouseColumnQueryBean extends ColumnQueryBean {

private String columnType;

private String columnCompressionCode;
private String columnCompressionCodeExpr;

private String columnTTL;
private String columnTTLExpr;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.common.pojo.query.ck;

import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.inlong.manager.common.pojo.query.TableQueryBean;

import java.util.List;

@Data
@EqualsAndHashCode(callSuper = true)
public class ClickHouseTableQueryBean extends TableQueryBean {

private String jdbcUrl;
private String username;
private String password;

private String tableEngine;
private String partition;
private String partitionExpr;
private String order;
private String orderExpr;
private String primary;
private String primaryExpr;

private List<ClickHouseColumnQueryBean> columns;
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class ClickHouseSinkDTO {
@ApiModelProperty("Key field names, separate with commas")
private String keyFieldNames;

@ApiModelProperty("Table engine, support MergeTree Mem and so on")
private String tableEngine;

@ApiModelProperty("Properties for clickhouse")
private Map<String, Object> properties;

Expand All @@ -98,6 +101,7 @@ public static ClickHouseSinkDTO getFromRequest(ClickHouseSinkRequest request) {
.partitionStrategy(request.getPartitionStrategy())
.partitionFields(request.getPartitionFields())
.keyFieldNames(request.getKeyFieldNames())
.tableEngine(request.getTableEngine())
.properties(request.getProperties())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,7 @@ public class ClickHouseSinkRequest extends SinkRequest {
@ApiModelProperty("Key field names, separate with commas")
private String keyFieldNames;

@ApiModelProperty("table engine, support MergeTree Mem and so on")
private String tableEngine;

}
4 changes: 4 additions & 0 deletions inlong-manager/manager-dao/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.dao;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;
import ru.yandex.clickhouse.ClickHouseDatabaseMetadata;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

@Repository
public class ClickHouseServerDao {

private static final String CLICKHOUSE_DRIVER_CLASS = "ru.yandex.clickhouse.ClickHouseDriver";
private static final Logger LOG = LoggerFactory.getLogger(HiveServerDao.class);

public void executeDDL(String ddl, String url, String user, String password) throws Exception {
try (Connection conn = this.getClickHouseConnection(url, user, password)) {
Statement stmt = conn.createStatement();
stmt.execute(ddl);
}
}

/**
* Get Hive tables
*/
public List<String> getTables(String url, String user, String password, String dbname) throws Exception {
List<String> tables = new ArrayList<>();

try (Connection conn = getClickHouseConnection(url, user, password)) {
// get DatabaseMetaData
ClickHouseDatabaseMetadata metaData = (ClickHouseDatabaseMetadata) conn.getMetaData();
// Get the table in the specified database
LOG.info("dbname is {}", dbname);
ResultSet rs = metaData.getTables(dbname, dbname, null, new String[]{"TABLE"});
while (rs.next()) {
String tableName = rs.getString("TABLE_NAME");
tables.add(tableName);
}
} catch (Exception e) {
LOG.error("a database access error occurs or this method is called on a closed connection", e);
throw e;
}
return tables;
}

/**
* Get Hive connection from hive url and user
*/
public Connection getClickHouseConnection(String url, String user, String password) throws Exception {
if (StringUtils.isBlank(url) || !url.startsWith("jdbc:clickhouse")) {
throw new Exception("ClickHouse server URL was invalid, it should start with jdbc:clickhouse");
}
Connection conn;
try {
Class.forName(CLICKHOUSE_DRIVER_CLASS);
conn = DriverManager.getConnection(url, user, password);
} catch (Exception e) {
LOG.error("get clickhouse connection error, please check hive jdbc url, username or password", e);
throw new Exception("get clickhouse connection error, please check jdbc url, username or password. "
+ "other error msg: " + e.getMessage());
}

if (conn == null) {
throw new Exception("get hive connection failed, please contact administrator");
}

LOG.info("get hive connection success, url={}", url);
return conn;
}

public boolean isExistTable(String jdbcUrl, String user, String password, String dbName, String tableName)
throws Exception {
List<String> tables = this.getTables(jdbcUrl, user, password, dbName);
return tables.contains(tableName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.service.core.impl;

import org.apache.inlong.manager.common.pojo.query.ColumnInfoBean;
import org.apache.inlong.manager.common.pojo.query.ConnectionInfo;
import org.apache.inlong.manager.common.pojo.query.DatabaseDetail;
import org.apache.inlong.manager.common.pojo.query.DatabaseQueryBean;
import org.apache.inlong.manager.common.pojo.query.ck.ClickHouseTableQueryBean;
import org.apache.inlong.manager.dao.ClickHouseServerDao;
import org.apache.inlong.manager.service.core.DataSourceService;
import org.apache.inlong.manager.service.resource.ck.builder.ClickHouseCreateDbSqlBuilder;
import org.apache.inlong.manager.service.resource.ck.builder.ClickHouseCreateTableSqlBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class ClickHouseSourceServiceImpl implements DataSourceService<DatabaseQueryBean, ClickHouseTableQueryBean> {

private static final Logger LOGGER = LoggerFactory.getLogger(HiveSourceServiceImpl.class);

@Autowired
ClickHouseServerDao clickHouseServerDao;

@Override
public boolean testConnection(ConnectionInfo connectionInfo) {
return false;
}

@Override
public void createDb(ClickHouseTableQueryBean bean) throws Exception {
ClickHouseCreateDbSqlBuilder builder = new ClickHouseCreateDbSqlBuilder();
String createDbSql = builder.buildDDL(bean);
LOGGER.info("create database sql={}", createDbSql);
clickHouseServerDao.executeDDL(createDbSql, bean.getJdbcUrl(), bean.getUsername(), bean.getPassword());
}

@Override
public void dropDb(DatabaseQueryBean queryBean) throws Exception {

}

@Override
public void createTable(ClickHouseTableQueryBean bean) throws Exception {
ClickHouseCreateTableSqlBuilder builder = new ClickHouseCreateTableSqlBuilder();
String createTableSql = builder.buildDDL(bean);
LOGGER.info("create table sql={}", createTableSql);
clickHouseServerDao.executeDDL(createTableSql, bean.getJdbcUrl(), bean.getUsername(), bean.getPassword());
}

@Override
public void dropTable(ClickHouseTableQueryBean queryBean) throws Exception {

}

@Override
public void createColumn(ClickHouseTableQueryBean queryBean) throws Exception {

}

@Override
public void updateColumn(ClickHouseTableQueryBean queryBean) throws Exception {

}

@Override
public void dropColumn(ClickHouseTableQueryBean queryBean) throws Exception {

}

@Override
public List<ColumnInfoBean> queryColumns(ClickHouseTableQueryBean queryBean) throws Exception {
return null;
}

@Override
public DatabaseDetail queryDbDetail(ClickHouseTableQueryBean queryBean) throws Exception {
return null;
}
}
Loading

0 comments on commit f7268fe

Please sign in to comment.