diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml index 1332f4f07..d89ea2012 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml +++ b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml @@ -70,6 +70,13 @@ provided + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + provided + + org.apache.flink flink-connector-kafka diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiHiveJob.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiHiveJob.java index 44807206c..e02251f04 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiHiveJob.java +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiHiveJob.java @@ -6,7 +6,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.FormatDescriptor; -import org.apache.flink.table.api.SqlDialect; +import org.apache.flink.table.api.TableDescriptor; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.hive.HiveCatalog; @@ -26,11 +26,6 @@ protected StreamExecutionEnvironment jobReturnValue() { return null; } - @Override - protected void setConnectorDialect(StreamTableEnvironment tableEnv) { - tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); - } - @Override protected String getTableConnector() { return "hive"; @@ -38,7 +33,7 @@ protected String getTableConnector() { @Override protected FormatDescriptor getFormatDescriptor() { - return FormatDescriptor.forFormat("parquet").build(); + return FormatDescriptor.forFormat("orc").build(); } @Override @@ -52,5 +47,10 @@ protected void registerCatalog(StreamTableEnvironment tableEnv) { tableEnv.useCatalog(name); } + @Override + protected TableDescriptor.Builder fillTableOptions(TableDescriptor.Builder builder) { + return super.fillTableOptions(builder) + .option("hive.storage.file-format", "orc"); + } }