Skip to content

Commit

Permalink
[BugFix][Spark-translation] map type cast error (#6552)
Browse files Browse the repository at this point in the history
  • Loading branch information
Carl-Zhou-CN committed Mar 21, 2024
1 parent 216efb7 commit b1dcd4a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,17 @@ source {
}
}

transform {
Sql {
source_table_name = "fake"
result_table_name = "tmp1"
query = """select * from fake"""
}
}

sink {
Assert {
source_table_name = "tmp1"
rules {
row_rules = [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.spark.unsafe.types.UTF8String;

import scala.Tuple2;
import scala.collection.immutable.HashMap.HashTrieMap;
import scala.collection.immutable.AbstractMap;
import scala.collection.mutable.WrappedArray;

import java.io.IOException;
Expand Down Expand Up @@ -179,7 +179,7 @@ private Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
case STRING:
return field.toString();
case MAP:
return reconvertMap((HashTrieMap<?, ?>) field, (MapType<?, ?>) dataType);
return reconvertMap((AbstractMap<?, ?>) field, (MapType<?, ?>) dataType);
case ARRAY:
return reconvertArray((WrappedArray.ofRef<?>) field, (ArrayType<?, ?>) dataType);
default:
Expand All @@ -206,23 +206,23 @@ private SeaTunnelRow reconvert(SeaTunnelRow engineRow, SeaTunnelRowType rowType)
}

/**
* Convert HashTrieMap to LinkedHashMap
* Convert AbstractMap to LinkedHashMap
*
* @param hashTrieMap HashTrieMap data
* @param abstractMap AbstractMap data
* @param mapType fields type map
* @return java.util.LinkedHashMap
* @see HashTrieMap
* @see AbstractMap
*/
private Map<Object, Object> reconvertMap(HashTrieMap<?, ?> hashTrieMap, MapType<?, ?> mapType) {
if (hashTrieMap == null || hashTrieMap.size() == 0) {
private Map<Object, Object> reconvertMap(AbstractMap<?, ?> abstractMap, MapType<?, ?> mapType) {
if (abstractMap == null || abstractMap.size() == 0) {
return Collections.emptyMap();
}
int num = hashTrieMap.size();
int num = abstractMap.size();
Map<Object, Object> newMap = new LinkedHashMap<>(num);
SeaTunnelDataType<?> keyType = mapType.getKeyType();
SeaTunnelDataType<?> valueType = mapType.getValueType();
scala.collection.immutable.List<?> keyList = hashTrieMap.keySet().toList();
scala.collection.immutable.List<?> valueList = hashTrieMap.values().toList();
scala.collection.immutable.List<?> keyList = abstractMap.keySet().toList();
scala.collection.immutable.List<?> valueList = abstractMap.values().toList();
for (int i = 0; i < num; i++) {
Object key = keyList.apply(i);
Object value = valueList.apply(i);
Expand Down

0 comments on commit b1dcd4a

Please sign in to comment.