Skip to content

Commit b592014

Browse files
authored
[Feature][HbaseSink]support array data. (#6100)
1 parent dd64ed5 commit b592014

File tree

3 files changed

+92
-0
lines changed
  • seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink
  • seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test

3 files changed

+92
-0
lines changed

seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,10 @@ private byte[] convertColumnToBytes(SeaTunnelRow row, int index) {
179179
return Bytes.toBytes((Double) field);
180180
case BOOLEAN:
181181
return Bytes.toBytes((Boolean) field);
182+
case ARRAY:
183+
String arrayAsString = field.toString().replaceAll("\\[|\\]|\\s", "");
184+
return arrayAsString.getBytes(
185+
Charset.forName(hbaseParameters.getEnCoding().toString()));
182186
case STRING:
183187
return field.toString()
184188
.getBytes(Charset.forName(hbaseParameters.getEnCoding().toString()));

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.seatunnel.e2e.common.container.TestContainer;
2323

2424
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.hbase.Cell;
26+
import org.apache.hadoop.hbase.CellUtil;
2527
import org.apache.hadoop.hbase.HBaseConfiguration;
2628
import org.apache.hadoop.hbase.TableName;
2729
import org.apache.hadoop.hbase.client.Admin;
@@ -36,6 +38,7 @@
3638
import org.apache.hadoop.hbase.client.TableDescriptor;
3739
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
3840
import org.apache.hadoop.hbase.io.compress.Compression;
41+
import org.apache.hadoop.hbase.util.Bytes;
3942

4043
import org.junit.jupiter.api.AfterAll;
4144
import org.junit.jupiter.api.Assertions;
@@ -143,4 +146,30 @@ public void testHbaseSink(TestContainer container) throws IOException, Interrupt
143146
}
144147
Assertions.assertEquals(results.size(), 5);
145148
}
149+
150+
@TestTemplate
151+
public void testHbaseSinkWithArray(TestContainer container)
152+
throws IOException, InterruptedException {
153+
Container.ExecResult execResult = container.executeJob("/fake-to-hbase-array.conf");
154+
Assertions.assertEquals(0, execResult.getExitCode());
155+
Table hbaseTable = hbaseConnection.getTable(table);
156+
Scan scan = new Scan();
157+
ArrayList<Result> results = new ArrayList<>();
158+
ResultScanner scanner = hbaseTable.getScanner(scan);
159+
for (Result result : scanner) {
160+
String rowKey = Bytes.toString(result.getRow());
161+
for (Cell cell : result.listCells()) {
162+
String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
163+
String value = Bytes.toString(CellUtil.cloneValue(cell));
164+
if ("A".equals(rowKey) && "info:c_array_string".equals(columnName)) {
165+
Assertions.assertEquals(value, "\"a\",\"b\",\"c\"");
166+
}
167+
if ("B".equals(rowKey) && "info:c_array_int".equals(columnName)) {
168+
Assertions.assertEquals(value, "4,5,6");
169+
}
170+
}
171+
results.add(result);
172+
}
173+
Assertions.assertEquals(results.size(), 3);
174+
}
146175
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
execution.parallelism = 1
20+
job.mode = "BATCH"
21+
}
22+
23+
source {
24+
FakeSource {
25+
schema = {
26+
fields {
27+
name = string
28+
score = int
29+
c_array_string = "array<string>"
30+
c_array_int = "array<int>"
31+
}
32+
}
33+
rows = [
34+
{
35+
kind = INSERT
36+
fields = ["A", 100,["a","b","c"],[1,2,3]]
37+
},
38+
{
39+
kind = INSERT
40+
fields = ["B", 200,["d","e","f"],[4,5,6]]
41+
},
42+
{
43+
kind = INSERT
44+
fields = ["C", 300,["g","h","k"],[7,8,9]]
45+
}
46+
]
47+
}
48+
}
49+
50+
sink {
51+
Hbase {
52+
zookeeper_quorum = "hbase-e2e:2181"
53+
table = "seatunnel_test"
54+
rowkey_column = ["name"]
55+
family_name {
56+
all_columns = info
57+
}
58+
}
59+
}

0 commit comments

Comments
 (0)