[FLINK-13610][hive] Refactor HiveTableSourceTest to use SQL queries and remove HiveInputFormatTest
zjuwangg committed Aug 7, 2019
1 parent 78d1e68 commit 5ca9937
Showing 4 changed files with 48 additions and 209 deletions.
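The common thread of the refactor: each test now registers the HiveCatalog on the TableEnvironment and reads through a fully qualified SQL query, instead of constructing a HiveTableSource by hand from a JobConf and CatalogTable. A minimal sketch of that pattern, reusing the fixture and helpers visible in the diff below (the class name CatalogSqlReadSketch and the readAll wrapper are illustrative, not part of the commit):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.planner.runtime.utils.TableUtil;
import org.apache.flink.types.Row;

import java.util.List;

import scala.collection.JavaConverters;

class CatalogSqlReadSketch {

    // Read every row of hive.<db>.<table> through the catalog + SQL path.
    static List<Row> readAll(HiveCatalog hiveCatalog, String db, String table) {
        TableEnvironment tEnv = HiveTestUtils.createTableEnv();
        // Expose the catalog under the name used in the qualified query.
        tEnv.registerCatalog("hive", hiveCatalog);
        Table src = tEnv.sqlQuery("select * from hive." + db + "." + table);
        // Same Scala-to-Java collection helper the tests use for assertions.
        return JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
    }
}

Because the read now goes through the SQL planner, plan-level behavior such as partition pruning can be asserted from the same entry point, which is presumably what made the separate HiveInputFormatTest redundant.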

This file was deleted.

@@ -21,8 +21,6 @@
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableImpl;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.planner.runtime.utils.TableUtil;
@@ -31,7 +29,6 @@
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.mapred.JobConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -40,7 +37,9 @@
import org.junit.runner.RunWith;

import java.io.IOException;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;

import scala.collection.JavaConverters;

@@ -81,6 +80,7 @@ public void setupSourceDatabaseAndData() {

@Test
public void testReadNonPartitionedTable() throws Exception {
+final String catalogName = "hive";
final String dbName = "source_db";
final String tblName = "test";
hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
@@ -93,17 +93,41 @@ public void testReadNonPartitionedTable() throws Exception {
.commit();

TableEnvironment tEnv = HiveTestUtils.createTableEnv();
-ObjectPath tablePath = new ObjectPath(dbName, tblName);
-CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
-HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
-Table src = tEnv.fromTableSource(hiveTableSource);
+tEnv.registerCatalog(catalogName, hiveCatalog);
+Table src = tEnv.sqlQuery("select * from hive.source_db.test");
List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();

Assert.assertEquals(4, rows.size());
-Assert.assertEquals(1, rows.get(0).getField(0));
-Assert.assertEquals(2, rows.get(1).getField(0));
-Assert.assertEquals(3, rows.get(2).getField(0));
-Assert.assertEquals(4, rows.get(3).getField(0));
+Assert.assertEquals("1,1,a,1000,1.11", rows.get(0).toString());
+Assert.assertEquals("2,2,b,2000,2.22", rows.get(1).toString());
+Assert.assertEquals("3,3,c,3000,3.33", rows.get(2).toString());
+Assert.assertEquals("4,4,d,4000,4.44", rows.get(3).toString());
}

+@Test
+public void testReadComplexDataType() throws Exception {
+final String catalogName = "hive";
+final String dbName = "source_db";
+final String tblName = "complex_test";
+hiveShell.execute("create table source_db.complex_test(" +
+"a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>)");
+Integer[] array = new Integer[]{1, 2, 3};
+Map<Integer, String> map = new LinkedHashMap<>();
+map.put(1, "a");
+map.put(2, "b");
+Object[] struct = new Object[]{3, 3L};
+hiveShell.insertInto(dbName, tblName)
+.withAllColumns()
+.addRow(array, map, struct)
+.commit();
+TableEnvironment tEnv = HiveTestUtils.createTableEnv();
+tEnv.registerCatalog(catalogName, hiveCatalog);
+Table src = tEnv.sqlQuery("select * from hive.source_db.complex_test");
+List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
+Assert.assertEquals(1, rows.size());
+assertArrayEquals(array, (Integer[]) rows.get(0).getField(0));
+assertEquals(map, rows.get(0).getField(1));
+assertEquals(Row.of(struct[0], struct[1]), rows.get(0).getField(2));
+}
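A side note on the complex types above: Flink SQL can also address the nested fields individually, using 1-based array indexing, map lookup by key, and dot notation for struct fields. A hypothetical follow-up query, not part of this commit:

// Illustrative only: select one array element, one map value, and one struct field.
Table fields = tEnv.sqlQuery("select a[1], m[1], s.f1 from hive.source_db.complex_test");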

/**
@@ -112,55 +136,56 @@
*/
@Test
public void testReadPartitionTable() throws Exception {
+final String catalogName = "hive";
final String dbName = "source_db";
final String tblName = "test_table_pt";
hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
"(year STRING, value INT) partitioned by (pt int);");
hiveShell.insertInto("source_db", "test_table_pt")
hiveShell.insertInto(dbName, tblName)
.withColumns("year", "value", "pt")
.addRow("2014", 3, 0)
.addRow("2014", 4, 0)
.addRow("2015", 2, 1)
.addRow("2015", 5, 1)
.commit();
TableEnvironment tEnv = HiveTestUtils.createTableEnv();
-ObjectPath tablePath = new ObjectPath(dbName, tblName);
-CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
-HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
-Table src = tEnv.fromTableSource(hiveTableSource);
+tEnv.registerCatalog(catalogName, hiveCatalog);
+Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt");
List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();

assertEquals(4, rows.size());
Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
}

@Test
public void testPartitionPrunning() throws Exception {
+final String catalogName = "hive";
final String dbName = "source_db";
final String tblName = "test_table_pt_1";
hiveShell.execute("CREATE TABLE source_db.test_table_pt_1 " +
"(year STRING, value INT) partitioned by (pt int);");
hiveShell.insertInto("source_db", "test_table_pt_1")
hiveShell.insertInto(dbName, tblName)
.withColumns("year", "value", "pt")
.addRow("2014", 3, 0)
.addRow("2014", 4, 0)
.addRow("2015", 2, 1)
.addRow("2015", 5, 1)
.commit();
TableEnvironment tEnv = HiveTestUtils.createTableEnv();
-ObjectPath tablePath = new ObjectPath(dbName, tblName);
-CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
-tEnv.registerTableSource("src", new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable));
-Table table = tEnv.sqlQuery("select * from src where pt = 0");
-String[] explain = tEnv.explain(table).split("==.*==\n");
+tEnv.registerCatalog(catalogName, hiveCatalog);
+Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt_1 where pt = 0");
+// first, check the execution plan to ensure partition pruning works
+String[] explain = tEnv.explain(src).split("==.*==\n");
assertEquals(4, explain.length);
String abstractSyntaxTree = explain[1];
String optimizedLogicalPlan = explain[2];
String physicalExecutionPlan = explain[3];
assertTrue(abstractSyntaxTree.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: false, PartitionNums: 2]"));
assertTrue(optimizedLogicalPlan.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: true, PartitionNums: 1]"));
assertTrue(physicalExecutionPlan.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: true, PartitionNums: 1]"));
List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) table)).asJava();
// second check execute results
List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
assertEquals(2, rows.size());
Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
assertArrayEquals(new String[]{"2014,3,0", "2014,4,0"}, rowStrings);

This file was deleted.

This file was deleted.
