integrate ABRiS with spark avro api #48
Great job, thanks and congrats.
max_line_length = 120

[*.md]
trim_trailing_whitespace = false
Nice.
@@ -118,7 +108,7 @@ object SchemaLoader {

   private def getSchemaId(paramId: String, subject: String): Int = {
     if (paramId == SchemaManager.PARAM_SCHEMA_ID_LATEST_NAME) {
-      val latest = SchemaManager.getLatestVersion(subject)
+      val latest = SchemaManager.getLatestVersionsId(subject)
Did you mean SchemaManager.getLatestVersionId?
    child: Expression,
    jsonFormatSchema: Option[String],
    schemaRegistryConf: Option[Map[String,String]],
    removeSchemaId: Boolean)
Maybe confluentCompliant or something similar? This way, if Confluent comes up with new stuff, we might be able to accommodate it without changing the API. It is very minor though.
    child: Expression,
    schemaProvider: SchemaProvider,
    schemaRegistryConf: Option[Map[String,String]],
    prependSchemaId: Boolean)
Same as in AvroDataToCatalyst, maybe renaming to confluentCompliant would be more extensible and also more informative of why this is here.
src/main/scala/za/co/absa/abris/examples/sql/NewConfluentKafkaAvroWriter.scala (resolved)
src/main/scala/za/co/absa/abris/examples/sql/NewConfluentKafkaAvroWriterWithKey.scala (resolved)
src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala (resolved)
README.md (outdated)

- Scala 2.11

- Spark 2.2.0
Should we change it to 2.4?
As we agreed, this part of the README will be removed; pom.xml documents the dependencies much better.
    val avroBytes = dataframe
      .select(to_avro('bytes, schemaString) as 'avroBytes)

    avroBytes.show() // force evaluation
Maybe .collect() instead of show() to avoid polluting the test results? The same is valid for all other methods that use show() to materialise the results.
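A rough sketch of the suggestion, assuming a local SparkSession and a stand-in DataFrame in place of the one built with to_avro in the test (the object and method names here are hypothetical). Both collect() and show() are actions, so either forces evaluation, but collect() returns the rows to the driver instead of printing a table into the test output.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper, for illustration only.
object ForceEvaluationSketch {
  // collect() triggers the job and hands back the rows without writing to stdout.
  def materialise(df: DataFrame): Int = df.collect().length

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("force-evaluation-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the DataFrame produced by to_avro in the test.
    val avroBytes = Seq(Array[Byte](1, 2), Array[Byte](3)).toDF("avroBytes")

    val rowCount = materialise(avroBytes) // evaluated, nothing printed
    assert(rowCount == 2)

    spark.stop()
  }
}
```

This is only appropriate for small test data, since collect() brings every row to the driver.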
Co-Authored-By: Felipe Melo <felipesmmelo@gmail.com>
fb65100 to b657647
Awesome, tks.
This change should make the ABRiS API very similar to the Spark Avro API, which should make it easier for users to move to and from ABRiS.