Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-2608] add json to avro converter #7463

Open
wants to merge 5 commits into
base: release-0.11.0
Choose a base branch
from

Conversation

abhishekkh
Copy link

@abhishekkh abhishekkh commented Dec 15, 2022

Change Logs

Handle Jsonschema serialized kafka messages as a Hudi source. Refer issue https://issues.apache.org/jira/browse/HUDI-2608

Impact

Describe any public API or user-facing feature change or any performance impact.

Risk level (write none, low medium or high below)

If medium or high, explain what verification was done to mitigate the risks.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

Copy link
Member

@codope codope left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abhishekkh Thanks for your contribution! May I know why the destination branch is release-0.11.0 and not master?

String jsonType = property.get("type").getAsString();

switch (jsonType) {
case "integer":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't there some static constants / type enums defined in gson for these? If not, it would be better to define static constants in our code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codope yes, we could. This PR is not ready for review yet. I am still working on using the KafkaJsonSchemaDeserializer when deserializing events read from the kafka source. Looking at the JsonKafkaSource class, everything seems to be converted into a String first and then avro later. That becomes a problem when reading decimal numbers as the string conversion leaves the value as AA== for a decimal value of 0.00. Now when the value is converted back to AVRO it throws a wrong datatype exception as it was expecting a double and not a string.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am planning to introduce a JsonSchemaKafka source that reads the consumer records and keeps the type information instead of converting them to string, before conversion to Avro. Let me know if you see a better way to do this

return "boolean";
case "object":
return jsonPropertiesToAvro(property.get(PROPERTIES).getAsJsonObject());
// TODO: handle json array
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please create a ticket for this todo

Schema parse = new Schema.Parser().parse(avroSchema);

assert !parse.isError();
assert parse.getName().equals("tranlog");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use Assertions.assertEquals and similar in our test cases. Also, let's cover more scenarios to test.

* @return Object avro datatype
* @throws Exception
*/
private static Object jsonTypeToAvroType(JsonObject property) throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it also cover nested fields?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it does

@codope codope added schema-and-data-types priority:major degraded perf; unable to move forward; potential bugs hudistreamer issues related to Hudi streamer (Formely deltastreamer) labels Dec 19, 2022
@codope codope self-assigned this Dec 19, 2022
@abhishekkh
Copy link
Author

@abhishekkh Thanks for your contribution! May I know why the destination branch is release-0.11.0 and not master?

Wasn't sure which was the stable release as this is my first contribution, I can move it against master

@hudi-bot
Copy link

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@xushiyan xushiyan changed the title add jsontoavro converter [HUDI-2608] add json to avro converter Jan 2, 2023
@nsivabalan nsivabalan assigned nsivabalan and unassigned codope Jan 19, 2023
public static final String PROPERTIES = "properties";
public static final String FIELDS = "fields";
public static final String RECORD = "record";
public static final String MARQETA_JCARD_NAMESPACE = "com.marqeta.jcard";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems specific to your company or project, can we make this more generic?

@@ -1044,8 +1044,7 @@
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3.1</version>
<scope>test</scope>
<version>2.9.0</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to be careful with this change since it is changing the scope for all modules that are not currently overriding the scope

* @return table name
*/
private static String getTableName(String title) {
Pattern pattern = Pattern.compile("cdc_marqeta_jcard_(.*).Envelope");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly this seems specific to a use case

avroFields.add(avroObject);
}
} catch (Exception e) {
System.out.println("exception: " + e.getCause().toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use logging instead of printing


import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use jackson instead of gson since we're already using jackson for json deserialization

@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Feb 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
hudistreamer issues related to Hudi streamer (Formely deltastreamer) priority:major degraded perf; unable to move forward; potential bugs schema-and-data-types size:L PR with lines of changes in (300, 1000]
Projects
Status: 🏗 Under discussion
Development

Successfully merging this pull request may close these issues.

None yet

5 participants