Skip to content

Commit b978792

Browse files
[Feature][Connector-V2][Hbase] implement hbase catalog (#7516)
1 parent d1c75f7 commit b978792

26 files changed

+1339
-169
lines changed

seatunnel-connectors-v2/connector-hbase/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@
4747
<version>${hbase.version}</version>
4848
</dependency>
4949

50+
<dependency>
51+
<groupId>org.apache.seatunnel</groupId>
52+
<artifactId>seatunnel-format-json</artifactId>
53+
<version>${project.version}</version>
54+
</dependency>
55+
5056
</dependencies>
5157

5258
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.hbase.catalog;
19+
20+
import org.apache.seatunnel.api.configuration.util.ConfigUtil;
21+
import org.apache.seatunnel.api.table.catalog.Catalog;
22+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
23+
import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
24+
import org.apache.seatunnel.api.table.catalog.PreviewResult;
25+
import org.apache.seatunnel.api.table.catalog.TablePath;
26+
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
27+
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
28+
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
29+
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
30+
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
31+
import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
32+
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
33+
34+
import lombok.extern.slf4j.Slf4j;
35+
36+
import java.util.HashMap;
37+
import java.util.List;
38+
import java.util.Map;
39+
import java.util.Optional;
40+
import java.util.stream.Collectors;
41+
42+
import static com.google.common.base.Preconditions.checkNotNull;
43+
44+
/** Hbase catalog implementation. */
45+
@Slf4j
46+
public class HbaseCatalog implements Catalog {
47+
48+
private final String catalogName;
49+
private final String defaultDatabase;
50+
private final HbaseParameters hbaseParameters;
51+
52+
private HbaseClient hbaseClient;
53+
54+
public HbaseCatalog(
55+
String catalogName, String defaultDatabase, HbaseParameters hbaseParameters) {
56+
this.catalogName = checkNotNull(catalogName, "catalogName cannot be null");
57+
this.defaultDatabase = defaultDatabase;
58+
this.hbaseParameters = checkNotNull(hbaseParameters, "Hbase Config cannot be null");
59+
}
60+
61+
@Override
62+
public void open() throws CatalogException {
63+
try {
64+
hbaseClient = HbaseClient.createInstance(hbaseParameters);
65+
} catch (Exception e) {
66+
throw new CatalogException(String.format("Failed to open catalog %s", catalogName), e);
67+
}
68+
}
69+
70+
@Override
71+
public void close() throws CatalogException {
72+
hbaseClient.close();
73+
}
74+
75+
@Override
76+
public String name() {
77+
return catalogName;
78+
}
79+
80+
@Override
81+
public String getDefaultDatabase() throws CatalogException {
82+
return defaultDatabase;
83+
}
84+
85+
@Override
86+
public boolean databaseExists(String databaseName) throws CatalogException {
87+
return hbaseClient.databaseExists(databaseName);
88+
}
89+
90+
@Override
91+
public List<String> listDatabases() throws CatalogException {
92+
return hbaseClient.listDatabases();
93+
}
94+
95+
@Override
96+
public List<String> listTables(String databaseName)
97+
throws CatalogException, DatabaseNotExistException {
98+
if (!databaseExists(databaseName)) {
99+
throw new DatabaseNotExistException(catalogName, databaseName);
100+
}
101+
return hbaseClient.listTables(databaseName);
102+
}
103+
104+
@Override
105+
public boolean tableExists(TablePath tablePath) throws CatalogException {
106+
checkNotNull(tablePath);
107+
return hbaseClient.tableExists(tablePath.getTableName());
108+
}
109+
110+
@Override
111+
public CatalogTable getTable(TablePath tablePath)
112+
throws CatalogException, TableNotExistException {
113+
throw new UnsupportedOperationException("Not implement");
114+
}
115+
116+
@Override
117+
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
118+
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
119+
checkNotNull(tablePath, "tablePath cannot be null");
120+
hbaseClient.createTable(
121+
tablePath.getDatabaseName(),
122+
tablePath.getTableName(),
123+
hbaseParameters.getFamilyNames().values().stream()
124+
.filter(value -> !"all_columns".equals(value))
125+
.collect(Collectors.toList()),
126+
ignoreIfExists);
127+
}
128+
129+
@Override
130+
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
131+
throws TableNotExistException, CatalogException {
132+
checkNotNull(tablePath);
133+
if (!tableExists(tablePath) && !ignoreIfNotExists) {
134+
throw new TableNotExistException(catalogName, tablePath);
135+
}
136+
hbaseClient.dropTable(tablePath.getDatabaseName(), tablePath.getTableName());
137+
}
138+
139+
@Override
140+
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
141+
throws DatabaseAlreadyExistException, CatalogException {
142+
if (databaseExists(tablePath.getDatabaseName()) && !ignoreIfExists) {
143+
throw new DatabaseAlreadyExistException(catalogName, tablePath.getDatabaseName());
144+
}
145+
hbaseClient.createNamespace(tablePath.getDatabaseName());
146+
}
147+
148+
@Override
149+
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
150+
throws DatabaseNotExistException, CatalogException {
151+
if (!databaseExists(tablePath.getDatabaseName()) && !ignoreIfNotExists) {
152+
throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
153+
}
154+
hbaseClient.deleteNamespace(tablePath.getDatabaseName());
155+
}
156+
157+
@Override
158+
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
159+
if (!tableExists(tablePath) && !ignoreIfNotExists) {
160+
throw new TableNotExistException(catalogName, tablePath);
161+
}
162+
hbaseClient.truncateTable(tablePath.getDatabaseName(), tablePath.getTableName());
163+
}
164+
165+
@Override
166+
public boolean isExistsData(TablePath tablePath) {
167+
return hbaseClient.isExistsData(tablePath.getDatabaseName(), tablePath.getTableName());
168+
}
169+
170+
private Map<String, String> buildTableOptions(TablePath tablePath) {
171+
Map<String, String> options = new HashMap<>();
172+
options.put("connector", "hbase");
173+
options.put("config", ConfigUtil.convertToJsonString(tablePath));
174+
return options;
175+
}
176+
177+
@Override
178+
public PreviewResult previewAction(
179+
ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
180+
if (actionType == ActionType.CREATE_TABLE) {
181+
return new InfoPreviewResult("create index " + tablePath.getTableName());
182+
} else if (actionType == ActionType.DROP_TABLE) {
183+
return new InfoPreviewResult("delete index " + tablePath.getTableName());
184+
} else if (actionType == ActionType.TRUNCATE_TABLE) {
185+
return new InfoPreviewResult("delete and create index " + tablePath.getTableName());
186+
} else if (actionType == ActionType.CREATE_DATABASE) {
187+
return new InfoPreviewResult("create index " + tablePath.getTableName());
188+
} else if (actionType == ActionType.DROP_DATABASE) {
189+
return new InfoPreviewResult("delete index " + tablePath.getTableName());
190+
} else {
191+
throw new UnsupportedOperationException("Unsupported action type: " + actionType);
192+
}
193+
}
194+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.hbase.catalog;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.table.catalog.Catalog;
23+
import org.apache.seatunnel.api.table.factory.CatalogFactory;
24+
import org.apache.seatunnel.api.table.factory.Factory;
25+
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
26+
import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
27+
28+
import com.google.auto.service.AutoService;
29+
30+
@AutoService(Factory.class)
31+
public class HbaseCatalogFactory implements CatalogFactory {
32+
33+
@Override
34+
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
35+
// Create an instance of HbaseCatalog, passing in the catalog name, namespace, and Hbase
36+
// parameters
37+
HbaseParameters hbaseParameters = HbaseParameters.buildWithConfig(options);
38+
return new HbaseCatalog(catalogName, hbaseParameters.getNamespace(), hbaseParameters);
39+
}
40+
41+
@Override
42+
public String factoryIdentifier() {
43+
return HbaseIdentifier.IDENTIFIER_NAME;
44+
}
45+
46+
@Override
47+
public OptionRule optionRule() {
48+
return OptionRule.builder().build();
49+
}
50+
}

0 commit comments

Comments
 (0)