Skip to content
Permalink
Browse files

[SPARK-21402][SQL][BACKPORT-2.3] Fix java array of structs deserializ…

…ation

This PR is to backport #22708 to branch 2.3.

## What changes were proposed in this pull request?

MapObjects expression is used to map array elements to java beans. Struct type of elements is inferred from java bean structure and ends up with mixed up field order.
I used UnresolvedMapObjects instead of MapObjects, which allows to provide element type for MapObjects during analysis based on the resolved input data, not on the java bean.

## How was this patch tested?

Added a test case.
Built complete project on travis.

dongjoon-hyun cloud-fan

Closes #22767 from vofque/SPARK-21402-2.3.

Authored-by: Vladimir Kuriatkov <Vladimir_Kuriatkov@epam.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information...
Vladimir Kuriatkov authored and dongjoon-hyun committed Oct 18, 2018
1 parent 0726bc5 commit 61b301cc7bf3fce4c034be3171291d5212c386e1
@@ -271,10 +271,9 @@ object JavaTypeInference {

case c if listType.isAssignableFrom(typeToken) =>
val et = elementType(typeToken)
MapObjects(
UnresolvedMapObjects(
p => deserializerFor(et, Some(p)),
getPath,
inferDataType(et)._1,
customCollectionCls = Some(c))

case _ if mapType.isAssignableFrom(typeToken) =>
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package test.org.apache.spark.sql;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.test.TestSparkSession;

public class JavaBeanWithArraySuite {

private static final List<Record> RECORDS = new ArrayList<>();

static {
RECORDS.add(new Record(1, Arrays.asList(new Interval(111, 211), new Interval(121, 221))));
RECORDS.add(new Record(2, Arrays.asList(new Interval(112, 212), new Interval(122, 222))));
RECORDS.add(new Record(3, Arrays.asList(new Interval(113, 213), new Interval(123, 223))));
}

private TestSparkSession spark;

@Before
public void setUp() {
spark = new TestSparkSession();
}

@After
public void tearDown() {
spark.stop();
spark = null;
}

@Test
public void testBeanWithArrayFieldDeserialization() {

Encoder<Record> encoder = Encoders.bean(Record.class);

Dataset<Record> dataset = spark
.read()
.format("json")
.schema("id int, intervals array<struct<startTime: bigint, endTime: bigint>>")
.load("src/test/resources/test-data/with-array-fields.json")
.as(encoder);

List<Record> records = dataset.collectAsList();
Assert.assertEquals(records, RECORDS);
}

public static class Record {

private int id;
private List<Interval> intervals;

public Record() { }

Record(int id, List<Interval> intervals) {
this.id = id;
this.intervals = intervals;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public List<Interval> getIntervals() {
return intervals;
}

public void setIntervals(List<Interval> intervals) {
this.intervals = intervals;
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof Record)) return false;
Record other = (Record) obj;
return (other.id == this.id) && other.intervals.equals(this.intervals);
}

@Override
public String toString() {
return String.format("{ id: %d, intervals: %s }", id, intervals);
}
}

public static class Interval {

private long startTime;
private long endTime;

public Interval() { }

Interval(long startTime, long endTime) {
this.startTime = startTime;
this.endTime = endTime;
}

public long getStartTime() {
return startTime;
}

public void setStartTime(long startTime) {
this.startTime = startTime;
}

public long getEndTime() {
return endTime;
}

public void setEndTime(long endTime) {
this.endTime = endTime;
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof Interval)) return false;
Interval other = (Interval) obj;
return (other.startTime == this.startTime) && (other.endTime == this.endTime);
}

@Override
public String toString() {
return String.format("[%d,%d]", startTime, endTime);
}
}
}
@@ -0,0 +1,3 @@
{ "id": 1, "intervals": [{ "startTime": 111, "endTime": 211 }, { "startTime": 121, "endTime": 221 }]}
{ "id": 2, "intervals": [{ "startTime": 112, "endTime": 212 }, { "startTime": 122, "endTime": 222 }]}
{ "id": 3, "intervals": [{ "startTime": 113, "endTime": 213 }, { "startTime": 123, "endTime": 223 }]}

0 comments on commit 61b301c

Please sign in to comment.
You can’t perform that action at this time.