SAMZA-1619: Samza-sql: Support serialization of nested samza sql relational message #464
LGTM overall, apart from some minor issues.
@@ -39,6 +39,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.samza.sql.data.SamzaSqlRelMessage.*;
nit: fix wildcard imports
Done!
record.put(fieldNames.get(index), values.get(index));
Object obj = values.get(index);
String fieldName = fieldNames.get(index);
if (obj instanceof SamzaSqlRelRecord) {
nit: do we need "obj != null && ..." ?
A null check does not seem to be required before instanceof, which evaluates to false for a null reference: https://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.20.2
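For readers following the nit, here is a minimal sketch of the behavior described in the linked JLS section. The class and method names (`InstanceofNullDemo`, `isString`) are hypothetical and chosen only for illustration; they are not part of the PR.

```java
// Hypothetical demo class; not part of the Samza code base.
class InstanceofNullDemo {
    // instanceof is false for a null reference, so no explicit
    // "obj != null &&" guard is needed before it.
    static boolean isString(Object obj) {
        return obj instanceof String;
    }

    public static void main(String[] args) {
        Object missing = null;
        System.out.println(isString(missing));  // prints "false"
        System.out.println(isString("field"));  // prints "true"
    }
}
```

The same reasoning applies to the `obj instanceof SamzaSqlRelRecord` check in the diff: a null value simply falls through to the non-record branch.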
return convertRecord(schema);
}

private RelDataType convertRecord(Schema schema) {
nit: shouldn't this be called convertSchema or fromSchema?
Renamed to convertRecordType(). Does this sound OK?
@Override
public SamzaSqlRelMessage fromBytes(byte[] bytes) {
try {
ObjectMapper mapper = new ObjectMapper();
I think ObjectMapper can be reused. Can you create a single instance and reuse it?
Serde instances are themselves serialized, and ObjectMapper is not a serializable object, so it cannot be made a member variable.
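As an aside, one common Java pattern for holding a non-serializable helper inside a serializable object is a `transient` field that is re-created lazily after deserialization. The sketch below only illustrates that pattern. It is not what this PR does, and whether it fits Samza's serde lifecycle is an assumption. `NonSerializableHelper` and `LazyHelperSerde` are hypothetical stand-ins (the helper plays the role of a JSON mapper), and the lazy initialization shown is not thread-safe.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a helper that does not implement Serializable.
class NonSerializableHelper {
    String describe(Object value) {
        return "value=" + value;
    }
}

// Hypothetical serde-like class; not the Samza implementation.
class LazyHelperSerde implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so it is null after deserialization.
    private transient NonSerializableHelper helper;

    // Re-creates the helper on first use (not thread-safe in this sketch).
    private NonSerializableHelper helper() {
        if (helper == null) {
            helper = new NonSerializableHelper();
        }
        return helper;
    }

    String encode(Object value) {
        return helper().describe(value);
    }

    // Serializes and deserializes the serde to mimic it being shipped elsewhere.
    static LazyHelperSerde roundTrip(LazyHelperSerde serde) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(serde);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyHelperSerde) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With this shape, the helper is created at most once per deserialized instance instead of once per `toBytes`/`fromBytes` call.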
@Override
public byte[] toBytes(SamzaSqlRelMessage p) {
try {
ObjectMapper mapper = new ObjectMapper();
same as above
import org.apache.samza.storage.kv.RocksDbTableDescriptor;
import org.apache.samza.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*;
import static org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.*;
nit: fix wildcard imports and also the one above
Done!
@@ -94,7 +96,8 @@ void translate(final LogicalJoin join, final TranslatorContext context) {
tableKeyIds);

JsonSerdeV2<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
JsonSerdeV2<SamzaSqlRelMessage> relMsgSerde = new JsonSerdeV2<>(SamzaSqlRelMessage.class);
SamzaSqlRelMessageSerde relMsgSerde =
(SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
Why are both args null?
Because they are not needed. SamzaSqlRelMessageSerdeFactory implements SerdeFactory and getSerde is an API that needs to be implemented. In this case, the args are not required.
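To illustrate the point about an interface method whose arguments a particular implementation ignores, here is a small self-contained sketch. `DemoSerde`, `DemoSerdeFactory`, and `StringSerdeFactory` are hypothetical stand-ins written for this example; they only mimic the general shape of a serde factory and are not Samza's actual interfaces.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in for a serde interface.
interface DemoSerde<T> {
    byte[] toBytes(T obj);
    T fromBytes(byte[] bytes);
}

// Hypothetical stand-in for a factory interface that passes a name and a config.
interface DemoSerdeFactory<T> {
    DemoSerde<T> getSerde(String name, Map<String, String> config);
}

// This implementation needs neither argument, so callers may pass null for both.
class StringSerdeFactory implements DemoSerdeFactory<String> {
    @Override
    public DemoSerde<String> getSerde(String name, Map<String, String> config) {
        // name and config are intentionally never read here.
        return new DemoSerde<String>() {
            @Override
            public byte[] toBytes(String obj) {
                return obj.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public String fromBytes(byte[] bytes) {
                return new String(bytes, StandardCharsets.UTF_8);
            }
        };
    }
}
```

A call such as `new StringSerdeFactory().getSerde(null, null)` is safe only because this particular implementation never dereferences its arguments; a short comment at the call site can make that intent explicit.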
import org.junit.Assert;
import org.junit.Test;

import static org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.*;
nit: fix wildcard import
Done!
* specific language governing permissions and limitations
* under the License.
*/
Add the license.
Oh, I missed this.
@@ -55,11 +55,13 @@ void translate(final LogicalFilter filter, final TranslatorContext context) {

return inputStream.filter(message -> {
Object[] result = new Object[1];
expr.execute(context.getExecutionContext(), context.getDataContext(), message.getFieldValues().toArray(), result);
expr.execute(context.getExecutionContext(), context.getDataContext(),
Quick comment since you are modifying the XxxTranslator classes: can we add one unit test class per translator class in this module? While merging the SamzaSql module with the serialization PR, my biggest difficulty was having to create an individual TestXxxTranslator class for each translator to do the whitebox testing. It would be great if you could add a unit test class per translator class here.
As discussed offline, I have opened a ticket (SAMZA-1641) to track adding UTs for Translator classes.
import org.apache.samza.storage.kv.RocksDbTableDescriptor;
import org.apache.samza.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*;
import static org.apache.samza.sql.data.SamzaSqlCompositeKey.createSamzaSqlCompositeKey;
import static org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde;
Same comment here. Please add unit test class for JoinTranslator.java.
}

private Object getRelField(RelDataType relType, Object avroObj) {
if (avroObj instanceof GenericData.Record) {
I am not entirely sure, but I don't think this will work if the schema has an array of records or a map of records. I think you might need to do this conversion in AvroToRelObjConverter, and for arrays and maps you might need to call the conversion recursively.
Also, can you add test cases for an array of records and a map of records if they are not already present?
Thanks for pointing this out. I have now added support for Map, Array and Union with one nullable type.
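The recursive conversion discussed in this thread can be sketched roughly as follows. This is only an illustration of the idea, not the code in the PR: `SourceRecord` and `RelRecord` are hypothetical stand-ins for the Avro record and the relational record, plain `List` and `Map` stand in for Avro arrays and maps, and a null value (as from a nullable union) is passed through unchanged.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a nested source record (plays the role of an Avro record).
class SourceRecord {
    final Map<String, Object> fields = new LinkedHashMap<>();

    SourceRecord with(String name, Object value) {
        fields.put(name, value);
        return this;
    }
}

// Hypothetical stand-in for the relational record produced by the conversion.
class RelRecord {
    final List<String> fieldNames = new ArrayList<>();
    final List<Object> fieldValues = new ArrayList<>();
}

class RecursiveConverter {
    // Applies the same conversion at every nesting level: records, lists, and maps
    // recurse into their contents; everything else (including null) is returned as-is.
    static Object convert(Object value) {
        if (value instanceof SourceRecord) {
            RelRecord rel = new RelRecord();
            for (Map.Entry<String, Object> e : ((SourceRecord) value).fields.entrySet()) {
                rel.fieldNames.add(e.getKey());
                rel.fieldValues.add(convert(e.getValue()));
            }
            return rel;
        }
        if (value instanceof List) {
            List<Object> out = new ArrayList<>();
            for (Object element : (List<?>) value) {
                out.add(convert(element));
            }
            return out;
        }
        if (value instanceof Map) {
            Map<Object, Object> out = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                out.put(e.getKey(), convert(e.getValue()));
            }
            return out;
        }
        return value;
    }
}
```

Because every branch calls back into `convert`, a record nested inside an array inside a map is handled by the same logic as a top-level record, which is the property the reviewer asked for.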
.collect(Collectors.toList()));
values.addAll(avroRecord.getSchema().getFields()
.stream()
.map(x -> {
I don't think this will work if the sub record has an Array. I think this conversion logic here should use AvroToRelObjConverter. Ideally this conversion should recursively use the same conversion logic that we use for the top level.
LGTM. Thanks a lot for doing this.
switch(schema.getType()) {
case RECORD:
if (avroObj == null) {
return null;
Can you open a bug and also add a TODO to validate the data types against the Avro schema here?
As discussed offline, I have added a comment saying that we are not adding validations here considering the cost per message.
Added support for serialization of nested Samza SQL rel messages and fixed the conversion of nested Avro records to rel messages accordingly. Please note that we still do not support SQL queries that reference fields in nested records (beyond the top-level record).