Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 63 additions & 2 deletions be/src/exec/text_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,18 @@ bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc,
break;
}

case TYPE_DATEV2: {
vectorized::DateV2Value<vectorized::DateV2ValueType> ts_slot;
if (!ts_slot.from_date_str(data, len)) {
parse_result = StringParser::PARSE_FAILURE;
break;
}
uint32_t int_val = ts_slot.to_date_int_val();
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt32>*>(col_ptr)
->get_data()
.resize_fill(origin_size + rows, int_val);
break;
}
case TYPE_DATETIME: {
vectorized::VecDateTimeValue ts_slot;
if (!ts_slot.from_date_str(data, len)) {
Expand All @@ -155,7 +167,18 @@ bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc,
.resize_fill(origin_size + rows, *reinterpret_cast<int64_t*>(&ts_slot));
break;
}

case TYPE_DATETIMEV2: {
vectorized::DateV2Value<vectorized::DateTimeV2ValueType> ts_slot;
if (!ts_slot.from_date_str(data, len)) {
parse_result = StringParser::PARSE_FAILURE;
break;
}
uint64_t int_val = ts_slot.to_date_int_val();
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr)
->get_data()
.resize_fill(origin_size + rows, int_val);
break;
}
case TYPE_DECIMALV2: {
DecimalV2Value decimal_slot;
if (decimal_slot.parse_from_str(data, len)) {
Expand All @@ -167,7 +190,45 @@ bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc,
.resize_fill(origin_size + rows, decimal_slot.value());
break;
}

case TYPE_DECIMAL32: {
StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
int32_t value = StringParser::string_to_decimal<int32_t>(
data, len, slot_desc->type().precision, slot_desc->type().scale, &result);
if (result != StringParser::PARSE_SUCCESS) {
parse_result = StringParser::PARSE_FAILURE;
break;
}
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)
->get_data()
.resize_fill(origin_size + rows, value);
break;
}
case TYPE_DECIMAL64: {
StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
int64_t value = StringParser::string_to_decimal<int64_t>(
data, len, slot_desc->type().precision, slot_desc->type().scale, &result);
if (result != StringParser::PARSE_SUCCESS) {
parse_result = StringParser::PARSE_FAILURE;
break;
}
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
->get_data()
.resize_fill(origin_size + rows, value);
break;
}
case TYPE_DECIMAL128I: {
StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
vectorized::Int128 value = StringParser::string_to_decimal<vectorized::Int128>(
data, len, slot_desc->type().precision, slot_desc->type().scale, &result);
if (result != StringParser::PARSE_SUCCESS) {
parse_result = StringParser::PARSE_FAILURE;
break;
}
reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)
->get_data()
.resize_fill(origin_size + rows, value);
break;
}
default:
DCHECK(false) << "bad slot type: " << slot_desc->type();
break;
Expand Down
37 changes: 36 additions & 1 deletion be/src/exec/text_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,42 @@ inline bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc,
decimal_slot.value());
break;
}

case TYPE_DECIMAL32: {
StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
int32_t value = StringParser::string_to_decimal<int32_t>(
data, len, slot_desc->type().precision, slot_desc->type().scale, &result);
if (result != StringParser::PARSE_SUCCESS) {
parse_result = StringParser::PARSE_FAILURE;
break;
}
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(
value);
break;
}
case TYPE_DECIMAL64: {
StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
int64_t value = StringParser::string_to_decimal<int64_t>(
data, len, slot_desc->type().precision, slot_desc->type().scale, &result);
if (result != StringParser::PARSE_SUCCESS) {
parse_result = StringParser::PARSE_FAILURE;
break;
}
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
value);
break;
}
case TYPE_DECIMAL128I: {
StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
vectorized::Int128 value = StringParser::string_to_decimal<vectorized::Int128>(
data, len, slot_desc->type().precision, slot_desc->type().scale, &result);
if (result != StringParser::PARSE_SUCCESS) {
parse_result = StringParser::PARSE_FAILURE;
break;
}
reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value(
value);
break;
}
default:
DCHECK(false) << "bad slot type: " << slot_desc->type();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,30 @@ TBLPROPERTIES (

msck repair table table_with_vertical_line;

CREATE external TABLE `table_with_pars`(
`id` int COMMENT 'id',
`data` string COMMENT 'data')
PARTITIONED BY (
`dt_par` date,
`time_par` timestamp,
`decimal_par1` decimal(8, 4),
`decimal_par2` decimal(18, 6),
`decimal_par3` decimal(38, 12))
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='|',
'serialization.format'='|')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'/user/doris/preinstalled_data/csv_partition_table/table_with_pars/';

set hive.msck.path.validation=ignore;
msck repair table table_with_pars;

CREATE TABLE `table_with_x01`(
`k1` string COMMENT 'k1',
`k2` string COMMENT 'k2',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
1|1.11abc
2|1.11ABC
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
3|1.12abc
4|1.12ABC
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
5|1.21abc
6|1.21ABC
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
7|1.22abc
8|1.22ABC
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
9|2.11abc
10|2.11ABC
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
11|2.12abc
12|2.12ABC
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
13|2.21abc
14|2.21ABC
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
15|2.22abc
16|2.22ABC
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
17|1.11cba
18|1.11CBA
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
19|1.12cba
20|1.12CBA
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
21|1.21cba
22|1.21CBA
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
23|1.22cba
24|1.22CBA
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
25|2.11cba
26|2.11CBA
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
27|2.12cba
28|2.12CBA
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
29|2.21cba
30|2.21CBA
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
31|2.22cba
32|2.22CBA
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
33|1.11xyz
34|1.11XYZ
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
35|1.12xyz
36|1.12XYZ
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
37|1.21xyz
38|1.21XYZ
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
39|1.22xyz
40|1.22XYZ
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
41|2.11xyz
42|2.11XYZ
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
43|2.12xyz
44|2.12XYZ
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
45|2.21xyz
46|2.21XYZ
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
47|2.22xyz
48|2.22XYZ
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
49|1.11zxy
50|1.11ZXY
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
51|1.12zxy
52|1.12ZXY
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
53|1.21zxy
54|1.21ZXY
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
55|1.22zxy
56|1.22ZXY
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
57|2.11zxy
58|2.11ZXY
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
59|2.12zxy
60|2.12ZXY
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
61|2.21zxy
62|2.21ZXY
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
63|2.22zxy
64|2.22ZXY
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ public FileSystem getFileSystem(String remotePath) throws UserException {
hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB));
}
if (username == null) {
dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf);
dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
} else {
dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf, username);
dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf, username);
}
} catch (Exception e) {
LOG.error("errors while connect to " + remotePath, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public FileSystem getFileSystem(String remotePath) throws UserException {
System.setProperty("com.amazonaws.services.s3.enableV4", "true");
S3Resource.getS3HadoopProperties(caseInsensitiveProperties).forEach(conf::set);
try {
dfsFileSystem = FileSystem.get(new URI(remotePath), conf);
dfsFileSystem = FileSystem.get(new org.apache.hadoop.fs.Path(remotePath).toUri(), conf);
} catch (Exception e) {
throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
import org.apache.logging.log4j.Logger;
import org.apache.parquet.Strings;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -177,6 +180,12 @@ private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key) {
Map<Long, List<UniqueId>> idToUniqueIdsMap = Maps.newHashMapWithExpectedSize(partitionNames.size());
long idx = 0;
for (String partitionName : partitionNames) {
try {
partitionName = URLDecoder.decode(partitionName, StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
// It should not be here
throw new RuntimeException(e);
}
long partitionId = idx++;
ListPartitionItem listPartitionItem = toListPartitionItem(partitionName, key.types);
idToPartitionItem.put(partitionId, listPartitionItem);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -198,7 +196,7 @@ private void writeRepartitionAndSortedRDDToParquet(JavaPairRDD<List<Object>, Obj
.foreachPartition((VoidFunction<Iterator<Tuple2<List<Object>, Object[]>>>) t -> {
// write the data to dst file
Configuration conf = new Configuration(serializableHadoopConf.value());
FileSystem fs = FileSystem.get(URI.create(etlJobConfig.outputPath), conf);
FileSystem fs = FileSystem.get(new Path(etlJobConfig.outputPath).toUri(), conf);
String lastBucketKey = null;
ParquetWriter<InternalRow> parquetWriter = null;
TaskContext taskContext = TaskContext.get();
Expand Down Expand Up @@ -859,12 +857,11 @@ private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
List<String> filePaths,
EtlJobConfig.EtlFileGroup fileGroup,
StructType dstTableSchema)
throws SparkDppException, IOException, URISyntaxException {
throws SparkDppException, IOException {
Dataset<Row> fileGroupDataframe = null;
for (String filePath : filePaths) {
try {
URI uri = new URI(filePath);
FileSystem fs = FileSystem.get(uri, serializableHadoopConf.value());
FileSystem fs = FileSystem.get(new Path(filePath).toUri(), serializableHadoopConf.value());
FileStatus[] fileStatuses = fs.globStatus(new Path(filePath));
if (fileStatuses == null) {
throw new SparkDppException("fs list status failed: " + filePath);
Expand Down Expand Up @@ -1130,8 +1127,7 @@ private void process() throws Exception {
private void writeDppResult(DppResult dppResult) throws Exception {
String outputPath = etlJobConfig.getOutputPath();
String resultFilePath = outputPath + "/" + DPP_RESULT_FILE;
URI uri = new URI(outputPath);
FileSystem fs = FileSystem.get(uri, serializableHadoopConf.value());
FileSystem fs = FileSystem.get(new Path(outputPath).toUri(), serializableHadoopConf.value());
Path filePath = new Path(resultFilePath);
FSDataOutputStream outputStream = fs.create(filePath);
Gson gson = new Gson();
Expand Down
Loading