New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-5188] Create analog of RowCsvInputFormat in java and adjust all the imports of Row and RowTypeInfo #3003
Conversation
3317c2b
to
0edfd48
Compare
…w and RowTypeInfo imports
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the doing this refactoring @tonycox!
I had a few comments but can fix them myself before merging.
Thanks, Fabian
import org.apache.flink.types.Row; | ||
import org.apache.flink.types.parser.FieldParser; | ||
|
||
@Internal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I we move it here, we can make this@PublicEvolving
.
@@ -168,8 +168,10 @@ abstract class BatchTableEnvironment( | |||
private[flink] def explain(table: Table, extended: Boolean): String = { | |||
val ast = table.getRelNode | |||
val optimizedPlan = optimize(ast) | |||
val dataSet = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row])) | |||
dataSet.output(new DiscardingOutputFormat[Row]) | |||
val dataSet = translate[org.apache.flink.types.Row](optimizedPlan) ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add an import for Row
instead of using the full class name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I see why you did it this way. I'll move this PR behind #3004.
@@ -327,7 +327,9 @@ abstract class StreamTableEnvironment( | |||
def explain(table: Table): String = { | |||
val ast = table.getRelNode | |||
val optimizedPlan = optimize(ast) | |||
val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row])) | |||
val dataStream = translate[org.apache.flink.types.Row](optimizedPlan)( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add import for Row
@@ -28,6 +28,7 @@ import org.apache.flink.api.table.typeutils.RowComparator.{createAuxiliaryFields | |||
import org.apache.flink.core.memory.{DataInputView, DataOutputView, MemorySegment} | |||
import org.apache.flink.types.KeyFieldOutOfBoundsException | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary change
|
||
import scala.collection.JavaConversions._ | ||
|
||
object TypeConverter { | ||
|
||
val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq()).asInstanceOf[TypeInformation[Any]] | ||
val DEFAULT_ROW_TYPE = new org.apache.flink.api.java.typeutils.RowTypeInfo( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import RowTypeInfo
@@ -166,5 +166,5 @@ class SqlExpressionTest extends ExpressionTestBase { | |||
override def testData: Any = new Row(0) | |||
|
|||
override def typeInfo: TypeInformation[Any] = | |||
new RowTypeInfo(Seq()).asInstanceOf[TypeInformation[Any]] | |||
new RowTypeInfo(Seq.empty[TypeInformation[Any]]: _*).asInstanceOf[TypeInformation[Any]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be changed to new RowTypeInfo()
?
@@ -117,14 +117,14 @@ abstract class ExpressionTestBase { | |||
// compile and evaluate | |||
val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc) | |||
val mapper = clazz.newInstance() | |||
val result = mapper.map(testData).asInstanceOf[Row] | |||
val result = mapper.map(testData).asInstanceOf[org.apache.flink.types.Row] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import Row
import org.apache.flink.api.table.typeutils.RowComparatorTest.MyPojo | ||
import org.apache.flink.api.java.typeutils.{RowTypeInfo => RowTypeInfoNew} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to rename here?
@@ -21,7 +21,8 @@ package org.apache.flink.api.table.typeutils | |||
import org.apache.flink.api.common.ExecutionConfig | |||
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} | |||
import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer} | |||
import org.apache.flink.api.table.Row | |||
import org.apache.flink.types.Row | |||
import org.apache.flink.api.java.typeutils.{RowTypeInfo => RowTypeInfoNew} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why renaming here?
@@ -50,8 +50,8 @@ class RowSerializerTest { | |||
|
|||
@Test | |||
def testRowSerializer(): Unit = { | |||
val rowInfo: TypeInformation[Row] = new RowTypeInfo( | |||
Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)) | |||
val rowInfo = new org.apache.flink.api.java.typeutils.RowTypeInfo( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import RowTypeInfo
Hi @fhueske okay, as you wish |
…lls for new Row type. - Port RowCsvInputFormat to Java and move it to flink-core. This closes apache#3003.
Merging |
…lls to new Row type. - Port RowCsvInputFormat to Java and move it to flink-core. This closes apache#3003.
…lls to new Row type. - Port RowCsvInputFormat to Java and move it to flink-core. This closes apache#3003.
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.
General
Documentation
Tests & Build
mvn clean verify
has been executed successfully locally or a Travis build has passed