Skip to content

Commit

Permalink
[FLINK-30093] Fix outerProtoPrefix to be generated from descriptor
Browse files Browse the repository at this point in the history
  • Loading branch information
laughingman7743 committed Jan 7, 2023
1 parent 8fa7fa5 commit f06c17e
Show file tree
Hide file tree
Showing 16 changed files with 341 additions and 34 deletions.
Expand Up @@ -26,4 +26,5 @@ public class PbConstant {
public static final String GENERATED_ENCODE_METHOD = "encode";
public static final String PB_MAP_KEY_NAME = "key";
public static final String PB_MAP_VALUE_NAME = "value";
public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";
}
Expand Up @@ -54,8 +54,7 @@ public String codegen(String resultVar, String pbObjectCode, int indent)
String flinkRowDataVar = "rowData" + uid;

int fieldSize = rowType.getFieldNames().size();
String pbMessageTypeStr =
PbFormatUtils.getFullJavaName(descriptor, formatContext.getOuterPrefix());
String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor);
appender.appendLine(pbMessageTypeStr + " " + pbMessageVar + " = " + pbObjectCode);
appender.appendLine(
"GenericRowData " + flinkRowDataVar + " = new GenericRowData(" + fieldSize + ")");
Expand Down
Expand Up @@ -58,16 +58,15 @@ public class ProtoToRowConverter {
public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
throws PbCodegenException {
try {
String outerPrefix =
PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName());
Descriptors.Descriptor descriptor =
PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
String outerPrefix = PbFormatUtils.getOuterProtoPrefix(descriptor);
Class<?> messageClass =
Class.forName(
formatConfig.getMessageClassName(),
true,
Thread.currentThread().getContextClassLoader());
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor, outerPrefix);
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
if (descriptor.getFile().getSyntax() == Syntax.PROTO3) {
// pb3 always read default values
formatConfig =
Expand Down
Expand Up @@ -52,8 +52,7 @@ public String codegen(String resultVar, String flinkObjectCode, int indent)
int uid = varUid.getAndIncrement();
PbCodegenAppender appender = new PbCodegenAppender(indent);
String flinkRowDataVar = "rowData" + uid;
String pbMessageTypeStr =
PbFormatUtils.getFullJavaName(descriptor, formatContext.getOuterPrefix());
String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor);
String messageBuilderVar = "messageBuilder" + uid;
appender.appendLine("RowData " + flinkRowDataVar + " = " + flinkObjectCode);
appender.appendLine(
Expand Down
Expand Up @@ -60,9 +60,7 @@ public String codegen(String resultVar, String flinkObjectCode, int indent)
case SMALLINT:
case TINYINT:
if (fd.getJavaType() == JavaType.ENUM) {
String enumTypeStr =
PbFormatUtils.getFullJavaName(
fd.getEnumType(), formatContext.getOuterPrefix());
String enumTypeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
appender.appendLine(
resultVar
+ " = "
Expand All @@ -86,9 +84,7 @@ public String codegen(String resultVar, String flinkObjectCode, int indent)
appender.appendLine(fromVar + " = " + flinkObjectCode + ".toString()");
if (fd.getJavaType() == JavaType.ENUM) {
String enumValueDescVar = "enumValueDesc" + uid;
String enumTypeStr =
PbFormatUtils.getFullJavaName(
fd.getEnumType(), formatContext.getOuterPrefix());
String enumTypeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
appender.appendLine(
"Descriptors.EnumValueDescriptor "
+ enumValueDescVar
Expand Down
Expand Up @@ -55,11 +55,10 @@ public class RowToProtoConverter {
public RowToProtoConverter(RowType rowType, PbFormatConfig formatConfig)
throws PbCodegenException {
try {
String outerPrefix =
PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName());
PbFormatContext formatContext = new PbFormatContext(outerPrefix, formatConfig);
Descriptors.Descriptor descriptor =
PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
String outerPrefix = PbFormatUtils.getOuterProtoPrefix(descriptor);
PbFormatContext formatContext = new PbFormatContext(outerPrefix, formatConfig);

PbCodegenAppender codegenAppender = new PbCodegenAppender(0);
String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
Expand Down
Expand Up @@ -95,7 +95,7 @@ public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList, Str
typeStr = "Map<" + keyTypeStr + "," + valueTypeStr + ">";
} else {
// simple message
typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType(), outerPrefix);
typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType());
}
break;
case INT:
Expand All @@ -108,7 +108,7 @@ public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList, Str
typeStr = "String";
break;
case ENUM:
typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType(), outerPrefix);
typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
break;
case FLOAT:
typeStr = "Float";
Expand Down Expand Up @@ -178,7 +178,7 @@ public static String pbDefaultValueCode(
String nullLiteral = pbFormatContext.getPbFormatConfig().getWriteNullStringLiterals();
switch (fieldDescriptor.getJavaType()) {
case MESSAGE:
return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType(), outerPrefix)
return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType())
+ ".getDefaultInstance()";
case INT:
return "0";
Expand All @@ -187,7 +187,7 @@ public static String pbDefaultValueCode(
case STRING:
return "\"" + nullLiteral + "\"";
case ENUM:
return PbFormatUtils.getFullJavaName(fieldDescriptor.getEnumType(), outerPrefix)
return PbFormatUtils.getFullJavaName(fieldDescriptor.getEnumType())
+ ".values()[0]";
case FLOAT:
return "0.0f";
Expand Down
Expand Up @@ -28,26 +28,25 @@

/** Protobuf function util. */
public class PbFormatUtils {
public static String getFullJavaName(Descriptors.Descriptor descriptor, String outerProtoName) {
public static String getFullJavaName(Descriptors.Descriptor descriptor) {
if (null != descriptor.getContainingType()) {
// nested type
String parentJavaFullName =
getFullJavaName(descriptor.getContainingType(), outerProtoName);
String parentJavaFullName = getFullJavaName(descriptor.getContainingType());
return parentJavaFullName + "." + descriptor.getName();
} else {
// top level message
String outerProtoName = getOuterProtoPrefix(descriptor);
return outerProtoName + descriptor.getName();
}
}

public static String getFullJavaName(
Descriptors.EnumDescriptor enumDescriptor, String outerProtoName) {
public static String getFullJavaName(Descriptors.EnumDescriptor enumDescriptor) {
if (null != enumDescriptor.getContainingType()) {
return getFullJavaName(enumDescriptor.getContainingType(), outerProtoName)
return getFullJavaName(enumDescriptor.getContainingType())
+ "."
+ enumDescriptor.getName();
} else {
return outerProtoName + enumDescriptor.getName();
return enumDescriptor.getFullName();
}
}

Expand All @@ -72,14 +71,33 @@ public static String getStrongCamelCaseJsonName(String name) {
return ProtobufInternalUtils.underScoreToCamelCase(name, true);
}

public static String getOuterProtoPrefix(String name) {
name = name.replace('$', '.');
int index = name.lastIndexOf('.');
if (index != -1) {
// include dot
return name.substring(0, index + 1);
public static String getOuterProtoPrefix(Descriptors.Descriptor descriptor) {
String javaPackageName = descriptor.getFile().getOptions().getJavaPackage();
if (javaPackageName.isEmpty()) {
javaPackageName = descriptor.getFile().getPackage();
}

if (descriptor.getFile().getOptions().getJavaMultipleFiles()) {
return javaPackageName + ".";
}
if (descriptor.getFile().getOptions().hasJavaOuterClassname()) {
String outerName = descriptor.getFile().getOptions().getJavaOuterClassname();
return javaPackageName + "." + outerName + ".";
} else {
return "";
String[] fileNames = descriptor.getFile().getName().split("/");
String fileName = fileNames[fileNames.length - 1];
String outerName = getStrongCamelCaseJsonName(fileName.split("\\.")[0]);
// https://developers.google.com/protocol-buffers/docs/reference/java-generated#invocation
// The name of the wrapper class is determined by converting the base name of the .proto
// file to camel case if the java_outer_classname option is not specified.
// For example, foo_bar.proto produces the class name FooBar. If there is a service,
// enum, or message (including nested types) in the file with the same name,
// "OuterClass" will be appended to the wrapper class's name.
if (outerName.equals(descriptor.getName())) {
return javaPackageName + "." + outerName + PbConstant.PB_OUTER_CLASS_SUFFIX + ".";
} else {
return javaPackageName + "." + outerName + ".";
}
}
}

Expand Down
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

import org.apache.flink.formats.protobuf.testproto.TestSameOuterClassNameOuterClass;
import org.apache.flink.table.data.RowData;

import org.junit.Test;

import static org.junit.Assert.assertEquals;

/** Test conversion of proto same outer class name data to flink internal data. */
public class SameOuterClassNameTest {

@Test
public void testSimple() throws Exception {
TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
TestSameOuterClassNameOuterClass.TestSameOuterClassName.newBuilder()
.setA(1)
.build();
RowData row =
ProtobufTestHelper.pbBytesToRow(
TestSameOuterClassNameOuterClass.TestSameOuterClassName.class,
testSameOuterClassName.toByteArray());

assertEquals(1, row.getInt(0));
}
}
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

import org.apache.flink.formats.protobuf.testproto.TestTimestamp.TimestampTest;
import org.apache.flink.formats.protobuf.testproto.TimestampTestMulti;
import org.apache.flink.table.data.RowData;

import com.google.protobuf.Timestamp;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/** Test conversion of proto timestamp data with multiple_files options to flink internal data. */
public class TimestampMultiToRowTest {

@Test
public void testSimple() throws Exception {
TimestampTestMulti timestampTestMulti =
TimestampTestMulti.newBuilder()
.setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
.build();
RowData row =
ProtobufTestHelper.pbBytesToRow(
TimestampTest.class, timestampTestMulti.toByteArray());

RowData rowData = row.getRow(0, 2);
assertEquals(1672498800, rowData.getLong(0));
assertEquals(123, rowData.getInt(1));
}
}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

import org.apache.flink.formats.protobuf.testproto.TestTimestamp.TimestampTest;
import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterMulti;
import org.apache.flink.table.data.RowData;

import com.google.protobuf.Timestamp;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
* Test conversion of proto timestamp data with multiple_files and outer_classname options
* to flink internal data.
*/
public class TimestampOuterMultiToRowTest {

@Test
public void testSimple() throws Exception {
TimestampTestOuterMulti timestampTestOuterMulti =
TimestampTestOuterMulti.newBuilder()
.setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
.build();
RowData row =
ProtobufTestHelper.pbBytesToRow(
TimestampTest.class, timestampTestOuterMulti.toByteArray());

RowData rowData = row.getRow(0, 2);
assertEquals(1672498800, rowData.getLong(0));
assertEquals(123, rowData.getInt(1));
}
}
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

import org.apache.flink.formats.protobuf.testproto.TestTimestamp.TimestampTest;
import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterNomultiProto;
import org.apache.flink.table.data.RowData;

import com.google.protobuf.Timestamp;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/** Test conversion of proto timestamp data with outer_classname options to flink internal data. */
public class TimestampOuterNomultiToRowTest {

@Test
public void testSimple() throws Exception {
TimestampTestOuterNomultiProto.TimestampTestOuterNomulti timestampTestOuterNomulti =
TimestampTestOuterNomultiProto.TimestampTestOuterNomulti.newBuilder()
.setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
.build();
RowData row =
ProtobufTestHelper.pbBytesToRow(
TimestampTest.class, timestampTestOuterNomulti.toByteArray());

RowData rowData = row.getRow(0, 2);
assertEquals(1672498800, rowData.getLong(0));
assertEquals(123, rowData.getInt(1));
}
}

0 comments on commit f06c17e

Please sign in to comment.