Adding StringFormat to available formatters #283
Conversation
It looks like @alexnu hasn't signed our Contributor License Agreement yet.
You can read and sign our full Contributor License Agreement here. Once you've signed, reply with [clabot:check]. Appreciation of efforts, clabot
@confluentinc It looks like @alexnu just signed our Contributor License Agreement. 👍 Always at your service, clabot
Could this be used for CSV data, or just single lines?
Just single lines, and we've tested it together with StringConverter. But maybe it could work for CSV too, I'm not sure. All it does is write the whole value of the message as a string.
I was mostly curious whether CSV could be done with respect to Hive tables stored as text.
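The behavior described above is small enough to sketch. The following is illustrative only, not the PR's actual code: a `StringWriter` stands in for the connector's HDFS output stream, plain `Object` values stand in for `SinkRecord.value()`, and all names are made up for the example.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;

// Illustrative sketch only: the real formatter writes SinkRecord values to an
// HDFS stream; here a StringWriter stands in so the example is self-contained.
public class StringWriteSketch {

    private static final String LINE_SEPARATOR = System.lineSeparator();

    // Cast each record value to String and write it as one line of output.
    static String writeValues(List<Object> values) {
        try {
            StringWriter sink = new StringWriter();
            BufferedWriter writer = new BufferedWriter(sink);
            for (Object recordValue : values) {
                String value = (String) recordValue; // assumes StringConverter-style values
                writer.write(value);
                writer.write(LINE_SEPARATOR);
            }
            writer.flush();
            return sink.toString();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(writeValues(List.of("line one", "line two")));
    }
}
```

With CSV input the cast would still succeed as long as each message value is a single String line, which matches the "maybe, untested" answer above.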
Unfortunately, Hive is not supported by this PR. I followed the example of the JSON formatter, which throws an UnsupportedOperationException.
Yes, I noticed that, and noticed that your Unsupported Exception string also says "JSON" instead of "Text". Also, wanted to ask if
You are right about the Unsupported Exception string, I already changed it. About the
So I replaced
Not really sure why the tests are failing. I tried running them locally and they all pass. |
Thanks @alexnu!
The addition of this format seems straightforward, and thus I believe we can add it soon to target 4.1. It's also low risk. I've left just a few comments that should be easy to address.
@@ -0,0 +1,55 @@
/**
 * Copyright 2017 Confluent Inc.
2018
@@ -0,0 +1,59 @@
/**
 * Copyright 2017 Confluent Inc.
2018
/**
 * A storage format implementation that exports JSON records to text files with a '.json'
Javadoc is off. Remnant from copying JsonFormat.
@@ -0,0 +1,93 @@
/**
 * Copyright 2017 Confluent Inc.
2018
@@ -0,0 +1,96 @@
/**
 * Copyright 2017 Confluent Inc.
2018
log.trace("Sink record: {}", record.toString());
try {
  String value = (String) record.value();

Let's keep things tighter here. Extra blank line
writer.write(value);
writer.write(LINE_SEPARATOR);
Extra blank line here too
List<SinkRecord> sinkRecords = new ArrayList<>();
for (long offset = 0, total = 0; total < size; ++offset) {
  for (TopicPartition tp : partitions) {
    String record = "Some random text...";
Let's include the offset in the text message to produce different content in every record.
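The suggestion above can be sketched as follows, with plain strings in place of the test's SinkRecord construction; the method name and shape are illustrative, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: embedding the offset in each record's text makes every
// generated test record distinct, so assertions can detect misplaced content.
public class DistinctTestRecords {

    static List<String> createRecords(int size) {
        List<String> records = new ArrayList<>();
        for (long offset = 0; offset < size; ++offset) {
            records.add("Some random text at offset " + offset);
        }
        return records;
    }

    public static void main(String[] args) {
        createRecords(3).forEach(System.out::println);
    }
}
```

With identical payloads, a test could pass even if records were written to the wrong file or offset; distinct payloads close that gap.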
return new RecordWriter() {
  final Path path = new Path(filename);
  final OutputStream out = path.getFileSystem(conf.getHadoopConfiguration()).create(path);
  final OutputStreamWriter writer = new OutputStreamWriter(out);
Pretty sure we need to wrap this with a BufferedWriter. Let's use one with 128*1024 bytes as size.
Also, similar to what we do elsewhere, any constant should be a private static final variable. Same for the WRITER_BUFFER_SIZE here.
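Taken together, the two suggestions look roughly like this. A ByteArrayOutputStream stands in for the HDFS stream so the sketch is self-contained, and the explicit UTF-8 charset is only there to keep the example deterministic; it is not part of this review comment.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of the review suggestions: a BufferedWriter wrapping the
// OutputStreamWriter, with the buffer size in a private static final constant.
public class BufferedWriterSketch {

    private static final int WRITER_BUFFER_SIZE = 128 * 1024; // 128 KiB, per the review

    static String writeBuffered(String value) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(out, StandardCharsets.UTF_8), WRITER_BUFFER_SIZE);
            writer.write(value);
            writer.flush(); // buffered content only reaches the stream on flush/close
            return out.toString(StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeBuffered("value"));
    }
}
```

Buffering matters here because the writer is fed one small record at a time; without it, every record would be a separate write against the underlying filesystem stream.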
String value = (String) record.value();

writer.write(value);
writer.write(LINE_SEPARATOR);
By using a BufferedWriter you can use writer.newLine() here instead and skip declaration of LINE_SEPARATOR altogether in this class.
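A small demonstration of that point, with a StringWriter standing in for the real output stream: `BufferedWriter.newLine()` emits the platform line separator, so the hand-declared constant becomes redundant.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;

// Illustrative sketch: newLine() writes System.lineSeparator(), so no
// LINE_SEPARATOR constant needs to be declared in the formatter class.
public class NewLineSketch {

    static String writeRecord(String value) {
        try {
            StringWriter sink = new StringWriter();
            BufferedWriter writer = new BufferedWriter(sink);
            writer.write(value);
            writer.newLine(); // replaces writer.write(LINE_SEPARATOR)
            writer.flush();
            return sink.toString();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(writeRecord("value"));
    }
}
```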
To unblock the build, you'll need to rebase on top of the recent changes in master.
Thanks for the review @kkonstantine. I believe all of your comments are now resolved. Also, I rebased from master but I'm getting errors at the following tests:
Build fails due to a findbugs error re: the charset used by the writer.
Pushing this change myself as well as a javadoc improvement so we can merge this promptly. Thanks for addressing the comments @alexnu!
/**
 * A storage format implementation that exports records to text files with a '.txt'
 * extension. In these files, records are separated by the BufferedWriter's new line
This is a bit too specific w.r.t. implementation details for this javadoc. I'll change it to "by the system's line separator".
return new RecordWriter() {
  final Path path = new Path(filename);
  final OutputStream out = path.getFileSystem(conf.getHadoopConfiguration()).create(path);
  final OutputStreamWriter streamWriter = new OutputStreamWriter(out);
Here's where findbugs is complaining about not specifying a charset. Still, I'll use the default charset, which can be overridden by the file.encoding property and defaults to UTF-8 if not specified, instead of introducing yet another connector config property besides format.class.
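That fix can be sketched against a stand-in ByteArrayOutputStream. Note one caveat not in the comment above: on JVMs before Java 18, Charset.defaultCharset() falls back to the platform charset rather than UTF-8 when -Dfile.encoding is unset.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;

// Illustrative sketch of the findbugs fix: pass an explicit Charset to the
// OutputStreamWriter constructor instead of relying on the no-arg overload,
// which findbugs flags as depending on an unspecified default encoding.
public class ExplicitCharsetWriter {

    static byte[] encode(String value) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            OutputStreamWriter writer =
                new OutputStreamWriter(out, Charset.defaultCharset()); // explicit charset
            writer.write(value);
            writer.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(encode("value").length);
    }
}
```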