
Commit 49d397c

[Improve][hbase] The specified column is written to the specified column family (#5234)
1 parent 05717ef commit 49d397c

File tree

6 files changed: +133 −3 lines changed

docs/en/connector-v2/sink/Hbase.md

Lines changed: 13 additions & 0 deletions

````diff
@@ -116,7 +116,20 @@ Hbase {
     all_columns = seatunnel
   }
 }
+```
+
+## Writes To The Specified Column Family

+```hocon
+Hbase {
+  zookeeper_quorum = "hbase_e2e:2181"
+  table = "assign_cf_table"
+  rowkey_column = ["id"]
+  family_name {
+    c_double = "cf1"
+    c_bigint = "cf2"
+  }
+}
 ```

 ## Changelog
````

docs/zh/connector-v2/sink/Hbase.md

Lines changed: 14 additions & 0 deletions

````diff
@@ -119,6 +119,20 @@ Hbase {

 ```

+## 写入指定列族
+
+```hocon
+Hbase {
+  zookeeper_quorum = "hbase_e2e:2181"
+  table = "assign_cf_table"
+  rowkey_column = ["id"]
+  family_name {
+    c_double = "cf1"
+    c_bigint = "cf2"
+  }
+}
+```
+
 ## 更改日志

 ### 下一个版本
````

seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java

Lines changed: 5 additions & 0 deletions

````diff
@@ -80,6 +80,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                             getPluginName(), PluginType.SINK, result.getMsg()));
         }
         this.hbaseParameters = HbaseParameters.buildWithConfig(pluginConfig);
+        if (hbaseParameters.getFamilyNames().size() == 0) {
+            throw new HbaseConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "The corresponding field options should be configured and should not be empty Refer to the hbase sink document");
+        }
    }

    @Override
````
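In effect, prepare() now fails fast when the job defines no family_name mapping at all. A minimal, equivalent sketch of that guard (editorial wording and isEmpty(); not the committed code verbatim):

```java
// Sketch: reject a sink config whose family_name map is empty, so a misconfigured
// job fails at prepare() time instead of writing columns to an undefined family.
if (hbaseParameters.getFamilyNames().isEmpty()) {
    throw new HbaseConnectorException(
            SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
            "family_name must be configured and must not be empty; see the HBase sink docs");
}
```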

seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java

Lines changed: 10 additions & 3 deletions

````diff
@@ -41,6 +41,7 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

@@ -62,7 +63,7 @@ public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {

     private final int versionColumnIndex;

-    private String defaultFamilyName = "value";
+    private String writeAllColumnFamily;

     public HbaseSinkWriter(
             SeaTunnelRowType seaTunnelRowType,
@@ -76,7 +77,7 @@ public HbaseSinkWriter(
         this.versionColumnIndex = versionColumnIndex;

         if (hbaseParameters.getFamilyNames().size() == 1) {
-            defaultFamilyName = hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, "value");
+            this.writeAllColumnFamily = hbaseParameters.getFamilyNames().get(ALL_COLUMNS);
         }

         // initialize hbase configuration
@@ -131,8 +132,14 @@ private Put convertRowToPut(SeaTunnelRow row) {
                         .collect(Collectors.toList());
         for (Integer writeColumnIndex : writeColumnIndexes) {
             String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
+            // This is the family of columns that we define to be written through the.conf file
+            Map<String, String> configurationFamilyNames = hbaseParameters.getFamilyNames();
             String familyName =
-                    hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName);
+                    configurationFamilyNames.getOrDefault(fieldName, writeAllColumnFamily);
+            if (!configurationFamilyNames.containsKey(ALL_COLUMNS)
+                    && !configurationFamilyNames.containsKey(fieldName)) {
+                continue;
+            }
             byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
             if (bytes != null) {
                 put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(fieldName), bytes);
````
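Read together with the constructor change above, convertRowToPut now resolves each column's family from the configured family_name map, uses the all_columns entry (when present) as the catch-all family, and skips any column that matches neither. A consolidated sketch of the resulting loop, merging the added and context lines of the hunk (not a verbatim copy of the file):

```java
// Per-column family resolution after this change (sketch consolidated from the hunk above).
for (Integer writeColumnIndex : writeColumnIndexes) {
    String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
    // Column-to-family mapping taken from the job's family_name option.
    Map<String, String> configurationFamilyNames = hbaseParameters.getFamilyNames();
    // An explicit mapping wins; otherwise fall back to the all_columns family.
    String familyName =
            configurationFamilyNames.getOrDefault(fieldName, writeAllColumnFamily);
    // No all_columns entry and no mapping for this field: the column is not written at all.
    if (!configurationFamilyNames.containsKey(ALL_COLUMNS)
            && !configurationFamilyNames.containsKey(fieldName)) {
        continue;
    }
    byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
    if (bytes != null) {
        put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(fieldName), bytes);
    }
}
```

With the e2e job below, family_name maps c_double to cf1 and c_bigint to cf2 and defines no all_columns entry, so id serves only as the row key and every written cell lands in cf1 or cf2, which is exactly what the new HbaseIT assertions count.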

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java

Lines changed: 44 additions & 0 deletions

````diff
@@ -56,6 +56,7 @@
 public class HbaseIT extends TestSuiteBase implements TestResource {

     private static final String TABLE_NAME = "seatunnel_test";
+    private static final String ASSIGN_CF_TABLE_NAME = "assign_cf_table";

     private static final String FAMILY_NAME = "info";

@@ -64,6 +65,7 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
     private Admin admin;

     private TableName table;
+    private TableName tableAssign;

     private HbaseCluster hbaseCluster;

@@ -75,7 +77,9 @@ public void startUp() throws Exception {
         // Create table for hbase sink test
         log.info("initial");
         hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME));
+        hbaseCluster.createTable(ASSIGN_CF_TABLE_NAME, Arrays.asList("cf1", "cf2"));
         table = TableName.valueOf(TABLE_NAME);
+        tableAssign = TableName.valueOf(ASSIGN_CF_TABLE_NAME);
     }

     @AfterAll
@@ -133,6 +137,46 @@ public void testHbaseSinkWithArray(TestContainer container)
         scanner.close();
     }

+    @TestTemplate
+    public void testHbaseSinkAssignCfSink(TestContainer container)
+            throws IOException, InterruptedException {
+        deleteData(tableAssign);
+
+        Container.ExecResult sinkExecResult = container.executeJob("/fake-to-assign-cf-hbase.conf");
+        Assertions.assertEquals(0, sinkExecResult.getExitCode());
+
+        Table hbaseTable = hbaseConnection.getTable(tableAssign);
+        Scan scan = new Scan();
+        ResultScanner scanner = hbaseTable.getScanner(scan);
+        ArrayList<Result> results = new ArrayList<>();
+        for (Result result : scanner) {
+            results.add(result);
+        }
+
+        Assertions.assertEquals(results.size(), 5);
+
+        if (scanner != null) {
+            scanner.close();
+        }
+        int cf1Count = 0;
+        int cf2Count = 0;
+
+        for (Result result : results) {
+            for (Cell cell : result.listCells()) {
+                String family = Bytes.toString(CellUtil.cloneFamily(cell));
+                if ("cf1".equals(family)) {
+                    cf1Count++;
+                }
+                if ("cf2".equals(family)) {
+                    cf2Count++;
+                }
+            }
+        }
+        // check cf1 and cf2
+        Assertions.assertEquals(cf1Count, 5);
+        Assertions.assertEquals(cf2Count, 5);
+    }
+
     private void deleteData(TableName table) throws IOException {
         Table hbaseTable = hbaseConnection.getTable(table);
         Scan scan = new Scan();
````
fake-to-assign-cf-hbase.conf (new file)

Lines changed: 47 additions & 0 deletions

````diff
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+  FakeSource {
+    schema = {
+      fields {
+        id = int
+        c_double = double
+        c_bigint = bigint
+      }
+    }
+  }
+}
+
+sink {
+  Hbase {
+    zookeeper_quorum = "hbase_e2e:2181"
+    table = "assign_cf_table"
+    rowkey_column = ["id"]
+    family_name {
+      c_double = "cf1"
+      c_bigint = "cf2"
+    }
+  }
+}
````
