-
Notifications
You must be signed in to change notification settings - Fork 465
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ENG-10385 formatter fix for importers #3586
Conversation
*/ | ||
public abstract Formatter<?> create(); | ||
public Formatter<?> create() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should take importer (for that matter) a name identifier. Dont have to pick from properties list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will remove import of ImportDataProcessor from formatter and formatter can be used anywhere else as well.
@@ -378,8 +379,8 @@ synchronized void commit(long offset, BigInteger v) { | |||
} | |||
|
|||
synchronized BigInteger getSafeCommitPoint() { | |||
if(checkpoints != null && validateOffset((int) c)){ | |||
return checkpoints[(int) c]; | |||
if (checkpoints != null && validateOffset((int) c)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dont make changes to lines you have not modified....makes it hard to look at diff.
@@ -263,7 +269,7 @@ private void log() { | |||
} | |||
KafkaStreamImporterConfig config = new KafkaStreamImporterConfig(uri, brokerList, topic, | |||
part.partitionId(), new HostAndPort(leader.host(), leader.port()), | |||
groupId, fetchSize, soTimeout, procedure, formatterFactory); | |||
groupId, fetchSize, soTimeout, procedure, formatterFactory, formatProps, formatName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can the formatter name be an attribute of fomatterFactory? let's try to weave the smallest number of new parameters possible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The importer configurations of the same import module share the same instance of the formatter factory. thus formatter name is passed as a separate arg.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can use the factory configureFormatterFactory method to pass both the formatter name and properties. You can can add accessors to both of those and use them to make discernments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer very much that you would not weave additional parameters to a lot of methods when you don't have to
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrap formate name, properties and factory into FormatterBuilder. Every configuration of importers has its own FormatterBuilder which will delegate formatter creation to formatter factory. Importer calls FormatterBuilder.create to instantiate its own formatter instance per URI.
You should wire in 2 formatter configuration in TestImportSuite or something similar to tests that 2 formatters for single importer can work. If you want you can create 2 simple formatter e.g number formatter and date formatter and have socket importer configure both and push diff data to diff sockets and validate. |
you have merge conflicts please merge with origin/master. |
@@ -148,7 +148,7 @@ public StreamConsumer() { | |||
public void initialize(InitializationInput initInput) { | |||
|
|||
m_shardId = initInput.getShardId(); | |||
m_formatter = ((Formatter<String>) m_config.getFormatterFactory().create()); | |||
m_formatter = (Formatter<String>) m_config.getFormatterBuilder().create(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any chance you can make formatter builder templated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do
TestCSVFormatterSuite now tests two socket importer configurations which use their own csv formatters. |
No description provided.