Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/dev/types_serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ Internally, Flink makes the following distinctions between types:

* Composite types

* Flink Java Tuples (part of the Flink Java API)
* Flink Java Tuples (part of the Flink Java API): max 25 fields, null fields not supported

* Scala *case classes* (including Scala tuples)
* Scala *case classes* (including Scala tuples): max 22 fields, null fields not supported

* Row: tuples with arbitrary number of fields and support for null fields

* POJOs: classes that follow a certain bean-like pattern

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.api.table.sinks.StreamTableSink;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;


Expand Down Expand Up @@ -49,15 +49,15 @@ public JsonRowSerializationSchema(String[] fieldNames) {

@Override
public byte[] serialize(Row row) {
if (row.productArity() != fieldNames.length) {
if (row.getArity() != fieldNames.length) {
throw new IllegalStateException(String.format(
"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
}

ObjectNode objectNode = mapper.createObjectNode();

for (int i = 0; i < row.productArity(); i++) {
JsonNode node = mapper.valueToTree(row.productElement(i));
for (int i = 0; i < row.getArity(); i++) {
JsonNode node = mapper.valueToTree(row.getField(i));
objectNode.set(fieldNames[i], node);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.junit.Test;

Expand Down Expand Up @@ -61,10 +61,10 @@ public void testDeserialization() throws Exception {

Row deserialized = deserializationSchema.deserialize(serializedJson);

assertEquals(3, deserialized.productArity());
assertEquals(id, deserialized.productElement(0));
assertEquals(name, deserialized.productElement(1));
assertArrayEquals(bytes, (byte[]) deserialized.productElement(2));
assertEquals(3, deserialized.getArity());
assertEquals(id, deserialized.getField(0));
assertEquals(name, deserialized.getField(1));
assertArrayEquals(bytes, (byte[]) deserialized.getField(2));
}

/**
Expand All @@ -85,8 +85,8 @@ public void testMissingNode() throws Exception {

Row row = deserializationSchema.deserialize(serializedJson);

assertEquals(1, row.productArity());
assertNull("Missing field not null", row.productElement(0));
assertEquals(1, row.getArity());
assertNull("Missing field not null", row.getField(0));

deserializationSchema.setFailOnMissingField(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.junit.Test;
Expand Down Expand Up @@ -88,10 +88,10 @@ private Row serializeAndDeserialize(String[] fieldNames, Class[] fieldTypes, Row

private void assertEqualRows(Row expectedRow, Row resultRow) {
assertEquals("Deserialized row should have expected number of fields",
expectedRow.productArity(), resultRow.productArity());
for (int i = 0; i < expectedRow.productArity(); i++) {
expectedRow.getArity(), resultRow.getArity());
for (int i = 0; i < expectedRow.getArity(); i++) {
assertEquals(String.format("Field number %d should be as in the original row", i),
expectedRow.productElement(i), resultRow.productElement(i));
expectedRow.getField(i), resultRow.getField(i));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Properties;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
Expand Down Expand Up @@ -276,7 +276,7 @@ public Row nextRecord(Row row) throws IOException {
if (!hasNext) {
return null;
}
for (int pos = 0; pos < row.productArity(); pos++) {
for (int pos = 0; pos < row.getArity(); pos++) {
row.setField(pos, resultSet.getObject(pos + 1));
}
//update hasNext after we've read the record
Expand Down
Loading