Skip to content

Commit 4d048cc

Browse files
[Improve][Connector-V2] Improve orc write strategy to support all data types (#2860)
* [Improve][Connector-V2] Improve orc write strategy to support all data types

Co-authored-by: tyrantlucifer <tyrantlucifer@gmail.com>
1 parent 3aee11f commit 4d048cc

File tree

4 files changed

+318
-33
lines changed

4 files changed

+318
-33
lines changed

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java

Lines changed: 160 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
1919

20+
import org.apache.seatunnel.api.table.type.ArrayType;
2021
import org.apache.seatunnel.api.table.type.BasicType;
22+
import org.apache.seatunnel.api.table.type.DecimalType;
23+
import org.apache.seatunnel.api.table.type.MapType;
2124
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2225
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
26+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2327
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
2428

2529
import lombok.NonNull;
@@ -28,16 +32,29 @@
2832
import org.apache.orc.OrcFile;
2933
import org.apache.orc.TypeDescription;
3034
import org.apache.orc.Writer;
35+
import org.apache.orc.storage.common.type.HiveDecimal;
3136
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
3237
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
38+
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
3339
import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
40+
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
3441
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
42+
import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
43+
import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
44+
import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
3545
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
3646

3747
import java.io.IOException;
48+
import java.math.BigDecimal;
3849
import java.math.BigInteger;
3950
import java.nio.charset.StandardCharsets;
51+
import java.sql.Timestamp;
52+
import java.time.LocalDate;
53+
import java.time.LocalDateTime;
54+
import java.time.LocalTime;
55+
import java.time.temporal.ChronoField;
4056
import java.util.HashMap;
57+
import java.util.List;
4158
import java.util.Map;
4259

4360
public class OrcWriteStrategy extends AbstractWriteStrategy {
@@ -109,37 +126,53 @@ private Writer getOrCreateWriter(@NonNull String filePath) {
109126
}
110127

111128
private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
112-
if (BasicType.BOOLEAN_TYPE.equals(type)) {
113-
return TypeDescription.createBoolean();
129+
switch (type.getSqlType()) {
130+
case ARRAY:
131+
BasicType<?> elementType = ((ArrayType<?, ?>) type).getElementType();
132+
return TypeDescription.createList(buildFieldWithRowType(elementType));
133+
case MAP:
134+
SeaTunnelDataType<?> keyType = ((MapType<?, ?>) type).getKeyType();
135+
SeaTunnelDataType<?> valueType = ((MapType<?, ?>) type).getValueType();
136+
return TypeDescription.createMap(buildFieldWithRowType(keyType), buildFieldWithRowType(valueType));
137+
case STRING:
138+
return TypeDescription.createString();
139+
case BOOLEAN:
140+
return TypeDescription.createBoolean();
141+
case TINYINT:
142+
return TypeDescription.createByte();
143+
case SMALLINT:
144+
return TypeDescription.createShort();
145+
case INT:
146+
return TypeDescription.createInt();
147+
case BIGINT:
148+
return TypeDescription.createLong();
149+
case FLOAT:
150+
return TypeDescription.createFloat();
151+
case DOUBLE:
152+
return TypeDescription.createDouble();
153+
case DECIMAL:
154+
int precision = ((DecimalType) type).getPrecision();
155+
int scale = ((DecimalType) type).getScale();
156+
return TypeDescription.createDecimal().withScale(scale).withPrecision(precision);
157+
case BYTES:
158+
return TypeDescription.createBinary();
159+
case DATE:
160+
return TypeDescription.createDate();
161+
case TIME:
162+
case TIMESTAMP:
163+
return TypeDescription.createTimestamp();
164+
case ROW:
165+
TypeDescription struct = TypeDescription.createStruct();
166+
SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) type).getFieldTypes();
167+
for (int i = 0; i < fieldTypes.length; i++) {
168+
struct.addField(((SeaTunnelRowType) type).getFieldName(i), buildFieldWithRowType(fieldTypes[i]));
169+
}
170+
return struct;
171+
case NULL:
172+
default:
173+
String errorMsg = String.format("Orc file not support this type [%s]", type.getSqlType());
174+
throw new UnsupportedOperationException(errorMsg);
114175
}
115-
if (BasicType.SHORT_TYPE.equals(type)) {
116-
return TypeDescription.createShort();
117-
}
118-
if (BasicType.INT_TYPE.equals(type)) {
119-
return TypeDescription.createInt();
120-
}
121-
if (BasicType.LONG_TYPE.equals(type)) {
122-
return TypeDescription.createLong();
123-
}
124-
if (BasicType.FLOAT_TYPE.equals(type)) {
125-
return TypeDescription.createFloat();
126-
}
127-
if (BasicType.DOUBLE_TYPE.equals(type)) {
128-
return TypeDescription.createDouble();
129-
}
130-
if (BasicType.BYTE_TYPE.equals(type)) {
131-
return TypeDescription.createByte();
132-
}
133-
if (BasicType.STRING_TYPE.equals(type)) {
134-
return TypeDescription.createString();
135-
}
136-
if (BasicType.VOID_TYPE.equals(type)) {
137-
return TypeDescription.createString();
138-
}
139-
140-
// TODO map struct array
141-
142-
return TypeDescription.createString();
143176
}
144177

145178
private TypeDescription buildSchemaWithRowType() {
@@ -169,9 +202,101 @@ private void setColumn(Object value, ColumnVector vector, int row) {
169202
BytesColumnVector bytesColumnVector = (BytesColumnVector) vector;
170203
setByteColumnVector(value, bytesColumnVector, row);
171204
break;
205+
case DECIMAL:
206+
DecimalColumnVector decimalColumnVector = (DecimalColumnVector) vector;
207+
setDecimalColumnVector(value, decimalColumnVector, row);
208+
break;
209+
case TIMESTAMP:
210+
TimestampColumnVector timestampColumnVector = (TimestampColumnVector) vector;
211+
setTimestampColumnVector(value, timestampColumnVector, row);
212+
break;
213+
case LIST:
214+
ListColumnVector listColumnVector = (ListColumnVector) vector;
215+
setListColumnVector(value, listColumnVector, row);
216+
break;
217+
case MAP:
218+
MapColumnVector mapColumnVector = (MapColumnVector) vector;
219+
setMapColumnVector(value, mapColumnVector, row);
220+
break;
221+
case STRUCT:
222+
StructColumnVector structColumnVector = (StructColumnVector) vector;
223+
setStructColumnVector(value, structColumnVector, row);
224+
break;
172225
default:
173-
throw new RuntimeException("Unexpected ColumnVector subtype");
226+
throw new RuntimeException("Unexpected ColumnVector subtype " + vector.type);
227+
}
228+
}
229+
}
230+
231+
private void setStructColumnVector(Object value, StructColumnVector structColumnVector, int row) {
232+
if (value instanceof SeaTunnelRow) {
233+
SeaTunnelRow seaTunnelRow = (SeaTunnelRow) value;
234+
Object[] fields = seaTunnelRow.getFields();
235+
for (int i = 0; i < fields.length; i++) {
236+
setColumn(fields[i], structColumnVector.fields[i], row);
237+
}
238+
} else {
239+
throw new RuntimeException("SeaTunnelRow type expected for field");
240+
}
241+
242+
}
243+
244+
private void setMapColumnVector(Object value, MapColumnVector mapColumnVector, int row) {
245+
if (value instanceof Map) {
246+
Map<?, ?> map = (Map<?, ?>) value;
247+
248+
mapColumnVector.offsets[row] = mapColumnVector.childCount;
249+
mapColumnVector.lengths[row] = map.size();
250+
mapColumnVector.childCount += map.size();
251+
252+
int i = 0;
253+
for (Map.Entry<?, ?> entry : map.entrySet()) {
254+
int mapElem = (int) mapColumnVector.offsets[row] + i;
255+
setColumn(entry.getKey(), mapColumnVector.keys, mapElem);
256+
setColumn(entry.getValue(), mapColumnVector.values, mapElem);
257+
++i;
174258
}
259+
} else {
260+
throw new RuntimeException("Map type expected for field");
261+
}
262+
}
263+
264+
private void setListColumnVector(Object value, ListColumnVector listColumnVector, int row) {
265+
Object[] valueArray;
266+
if (value instanceof Object[]) {
267+
valueArray = (Object[]) value;
268+
} else if (value instanceof List) {
269+
valueArray = ((List<?>) value).toArray();
270+
} else {
271+
throw new RuntimeException("List and Array type expected for field");
272+
}
273+
listColumnVector.offsets[row] = listColumnVector.childCount;
274+
listColumnVector.lengths[row] = valueArray.length;
275+
listColumnVector.childCount += valueArray.length;
276+
277+
for (int i = 0; i < valueArray.length; i++) {
278+
int listElem = (int) listColumnVector.offsets[row] + i;
279+
setColumn(valueArray[i], listColumnVector.child, listElem);
280+
}
281+
}
282+
283+
private void setDecimalColumnVector(Object value, DecimalColumnVector decimalColumnVector, int row) {
284+
if (value instanceof BigDecimal) {
285+
decimalColumnVector.set(row, HiveDecimal.create((BigDecimal) value));
286+
} else {
287+
throw new RuntimeException("BigDecimal type expected for field");
288+
}
289+
}
290+
291+
private void setTimestampColumnVector(Object value, TimestampColumnVector timestampColumnVector, int row) {
292+
if (value instanceof Timestamp) {
293+
timestampColumnVector.set(row, (Timestamp) value);
294+
} else if (value instanceof LocalDateTime) {
295+
timestampColumnVector.set(row, Timestamp.valueOf((LocalDateTime) value));
296+
} else if (value instanceof LocalTime) {
297+
timestampColumnVector.set(row, Timestamp.valueOf(((LocalTime) value).atDate(LocalDate.ofEpochDay(0))));
298+
} else {
299+
throw new RuntimeException("Time series type expected for field");
175300
}
176301
}
177302

@@ -186,10 +311,12 @@ private void setLongColumnVector(Object value, LongColumnVector longVector, int
186311
} else if (value instanceof BigInteger) {
187312
BigInteger bigInt = (BigInteger) value;
188313
longVector.vector[row] = bigInt.longValue();
189-
} else if (value instanceof Short) {
190-
longVector.vector[row] = (Short) value;
191314
} else if (value instanceof Byte) {
192315
longVector.vector[row] = (Byte) value;
316+
} else if (value instanceof Short) {
317+
longVector.vector[row] = (Short) value;
318+
} else if (value instanceof LocalDate) {
319+
longVector.vector[row] = ((LocalDate) value).getLong(ChronoField.EPOCH_DAY);
193320
} else {
194321
throw new RuntimeException("Long or Integer type expected for field");
195322
}

seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,12 @@ public void testFakeSourceToLocalFileJson() throws IOException, InterruptedExcep
5757
Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_local_json.conf");
5858
Assertions.assertEquals(0, execResult.getExitCode());
5959
}
60+
61+
@Test
62+
public void testFakeSourceToLocalFileORCAndReadToConsole() throws IOException, InterruptedException {
63+
Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_local_orc.conf");
64+
Assertions.assertEquals(0, execResult.getExitCode());
65+
Container.ExecResult execResult2 = executeSeaTunnelSparkJob("/file/local_orc_source_to_console.conf");
66+
Assertions.assertEquals(0, execResult2.getExitCode());
67+
}
6068
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
# You can set spark configuration here
20+
spark.app.name = "SeaTunnel"
21+
spark.executor.instances = 2
22+
spark.executor.cores = 1
23+
spark.executor.memory = "1g"
24+
spark.master = local
25+
job.mode = "BATCH"
26+
}
27+
28+
source {
29+
FakeSource {
30+
result_table_name = "fake"
31+
schema = {
32+
fields {
33+
c_map = "map<string, string>"
34+
c_array = "array<tinyint>"
35+
c_string = string
36+
c_boolean = boolean
37+
c_tinyint = tinyint
38+
c_smallint = smallint
39+
c_int = int
40+
c_bigint = bigint
41+
c_float = float
42+
c_double = double
43+
c_decimal = "decimal(30, 8)"
44+
c_bytes = bytes
45+
c_date = date
46+
c_timestamp = timestamp
47+
}
48+
}
49+
}
50+
51+
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
52+
# please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
53+
}
54+
55+
transform {
56+
sql {
57+
sql = "select * from fake"
58+
}
59+
60+
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
61+
# please go to https://seatunnel.apache.org/docs/category/transform
62+
}
63+
64+
sink {
65+
LocalFile {
66+
path="/tmp/test/orc/"
67+
partition_by=["c_boolean"]
68+
partition_dir_expression="${k0}=${v0}"
69+
is_partition_field_write_in_file=true
70+
file_name_expression="${transactionId}_${now}"
71+
file_format="orc"
72+
filename_time_format="yyyy.MM.dd"
73+
is_enable_transaction=true
74+
}
75+
76+
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
77+
# please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
78+
}

0 commit comments

Comments
 (0)