Skip to content
Permalink
Browse files
[fix](httpv2) make http v2 and v1 interface compatible (#7848)
http v2 TableSchemaAction adds the return value of aggregation_type,
and modifies the corresponding code of Flink/Spark Connector
  • Loading branch information
hf200012 committed Jan 31, 2022
1 parent b8ccc3c commit 8558180b347702d54c946e06b2a7522aba2bc058
Showing 6 changed files with 24 additions and 13 deletions.
@@ -26,14 +26,25 @@ public class Field {
private int precision;
private int scale;

private String aggregation_type;

/** No-args constructor, required by JSON (de)serialization frameworks. */
public Field() { }

/**
 * Full constructor introduced for the HTTP v2 schema API.
 *
 * @param name             column name
 * @param type             column type name (e.g. "TINYINT", "DECIMALV2")
 * @param comment          column comment, "" when absent
 * @param precision        numeric precision, 0 when not applicable
 * @param scale            numeric scale, 0 when not applicable
 * @param aggregation_type aggregation type of the column, "" when none
 */
public Field(String name, String type, String comment, int precision, int scale, String aggregation_type) {
    this.name = name;
    this.type = type;
    this.comment = comment;
    this.precision = precision;
    this.scale = scale;
    this.aggregation_type = aggregation_type;
}

/**
 * Backward-compatible constructor for callers that predate the
 * aggregation_type field; delegates with an empty aggregation type so
 * existing 5-argument call sites keep compiling.
 */
public Field(String name, String type, String comment, int precision, int scale) {
    this(name, type, comment, precision, scale, "");
}

// NOTE(review): snake_case accessor names are kept deliberately — the field
// "aggregation_type" must match the JSON key emitted by the FE schema API,
// and presumably the JSON mapper derives the key from the field/accessor
// names. Confirm the mapper's naming strategy before renaming to camelCase.
public String getAggregation_type() {
return aggregation_type;
}

// Sets the aggregation type parsed from the HTTP v2 schema response.
public void setAggregation_type(String aggregation_type) {
this.aggregation_type = aggregation_type;
}

public String getName() {
@@ -49,8 +49,8 @@ public void setProperties(List<Field> properties) {
this.properties = properties;
}

public void put(String name, String type, String comment, int scale, int precision) {
properties.add(new Field(name, type, comment, scale, precision));
public void put(String name, String type, String comment, int scale, int precision, String aggregation_type) {
properties.add(new Field(name, type, comment, scale, precision, aggregation_type));
}

public void put(Field f) {
@@ -103,7 +103,7 @@ private[spark] object SchemaUtils {
*/
def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc]): Schema = {
  val schema = new Schema(tscanColumnDescs.length)
  // TScanColumnDesc carries only name and type: comment, precision, scale
  // and aggregation type are not available here, so they default to
  // ""/0/0/"" to satisfy the six-argument Field constructor.
  tscanColumnDescs.foreach { desc =>
    schema.put(new Field(desc.getName, desc.getType.name, "", 0, 0, ""))
  }
  schema
}
}
@@ -120,12 +120,12 @@ public void testGetUriStr() throws Exception {

@Test
public void testFeResponseToSchema() throws Exception {
    // HTTP v2 schema payload: k1 is a plain TINYINT (no precision/scale),
    // k5 is DECIMALV2(9, 0); aggregation_type is always present in v2,
    // "" meaning "no aggregation".
    String res = "{\"properties\":[{\"type\":\"TINYINT\",\"name\":\"k1\",\"comment\":\"\",\"aggregation_type\":\"\"},{\"name\":\"k5\","
            + "\"scale\":\"0\",\"comment\":\"\",\"type\":\"DECIMALV2\",\"precision\":\"9\",\"aggregation_type\":\"\"}],\"status\":200}";
    Schema expected = new Schema();
    expected.setStatus(200);
    Field k1 = new Field("k1", "TINYINT", "", 0, 0, "");
    Field k5 = new Field("k5", "DECIMALV2", "", 9, 0, "");
    expected.put(k1);
    expected.put(k5);
    Assert.assertEquals(expected, RestService.parseSchema(res, logger));
@@ -235,7 +235,7 @@ public void testRowBatch() throws Exception {
+ "{\"type\":\"DOUBLE\",\"name\":\"k8\",\"comment\":\"\"},{\"type\":\"DATE\",\"name\":\"k10\","
+ "\"comment\":\"\"},{\"type\":\"DATETIME\",\"name\":\"k11\",\"comment\":\"\"},"
+ "{\"name\":\"k5\",\"scale\":\"0\",\"comment\":\"\","
// k5 carries an empty aggregation_type; k6 exercises a real aggregation
// (REPLACE_IF_NOT_NULL) so the parser's handling of both cases is covered.
+ "\"type\":\"DECIMAL\",\"precision\":\"9\",\"aggregation_type\":\"\"},{\"type\":\"CHAR\",\"name\":\"k6\",\"comment\":\"\",\"aggregation_type\":\"REPLACE_IF_NOT_NULL\"}],"
+ "\"status\":200}";

Schema schema = RestService.parseSchema(schemaStr, logger);
@@ -31,8 +31,8 @@ class TestSchemaUtils extends ExpectedExceptionTest {
def testConvertToStruct(): Unit = {
  val schema = new Schema
  schema.setStatus(200)
  // Plain (non-aggregated) columns: aggregation_type defaults to "" under
  // the six-argument Field constructor added for HTTP v2 compatibility.
  val k1 = new Field("k1", "TINYINT", "", 0, 0, "")
  val k5 = new Field("k5", "BIGINT", "", 0, 0, "")
  schema.put(k1)
  schema.put(k5)

@@ -84,8 +84,8 @@ class TestSchemaUtils extends ExpectedExceptionTest {

val expected = new Schema
expected.setStatus(0)
// Expected fixture updated to the six-argument Field constructor;
// empty aggregation_type marks both columns as non-aggregated.
val ek1 = new Field("k1", "BOOLEAN", "", 0, 0, "")
val ek2 = new Field("k2", "DOUBLE", "", 0, 0, "")
expected.put(ek1)
expected.put(ek2)

0 comments on commit 8558180

Please sign in to comment.