APEXMALHAR-2066 JdbcPolling,idempotent,partitionable #282
protected transient boolean isReplayed;
protected transient boolean isPollable;
protected int batchSize;
protected int fetchSize;
Add Javadocs for these properties.
1538b57 to 8ad2fae
} catch (SQLException e) {
LOG.error("Exception in initializing the range query for a given partition", e);
throw new RuntimeException();
Use throw new RuntimeException(e); there is no need to log the error separately.
This should also wrap the original exception; use Throwables.propagate.
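A minimal sketch of the wrapping both reviewers ask for, using only the JDK. The class name and the initRangeQuery() helper are hypothetical stand-ins for the operator code in the diff; the sketch uses new RuntimeException(e) rather than Guava's Throwables.propagate so that it stays dependency-free.

```java
import java.sql.SQLException;

public class WrapExceptionSketch {
  // Hypothetical stand-in for the range-query initialization in the diff.
  static void initRangeQuery() throws SQLException {
    throw new SQLException("simulated failure");
  }

  static void setup() {
    try {
      initRangeQuery();
    } catch (SQLException e) {
      // Pass the original exception as the cause so its message and
      // stack trace are not lost; no separate LOG.error call is needed.
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    try {
      setup();
    } catch (RuntimeException e) {
      // The SQLException is still reachable through getCause().
      System.out.println(e.getCause().getMessage());
    }
  }
}
```

With the no-argument constructor used in the diff, getCause() would return null and the SQLException details would only survive in the log line.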
What are the challenges in extending AbstractJdbcInputOperator to support the new features? Since AbstractJdbcInputOperator is marked Evolving, there is no backward-compatibility issue.
@sandeepdeshmukh I will let @devtagare comment, but I believe this operator makes assumptions about how data is organized in the database and expects implementations to provide that information, whereas the original operator makes no such assumptions.
Any updates?
I have updated the JIRA with the assumptions for the operator - https://issues.apache.org/jira/browse/APEXMALHAR-2066. Waiting for comments from more folks before making final changes.
protected String upper;
protected boolean recovered;
protected boolean isPolled;
protected String whereCondition = null;
Can you also clarify which subset of SQL will be supported here? How complex can the where clause be?
@devtagare: I went through the JIRA, but it still doesn't mention how this differs from the existing version or why the existing version can't be enhanced.
* Dont flush if its pollable*/
if (!hasNext) {
if ((isPollable && isPolled) || !isPollable) {
emitQueue.add(metaList);
This might throw an exception if the queue is full. Use offer() instead?
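A small sketch of the difference the comment points at, assuming emitQueue is a bounded queue (the diff does not show its type). The class name and the tryEnqueue helper are hypothetical; ArrayBlockingQueue is used only as an example of a bounded queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueOfferSketch {
  // Returns true if the item was enqueued, false if the queue was full.
  static boolean tryEnqueue(BlockingQueue<String> queue, String item) {
    return queue.offer(item);
  }

  public static void main(String[] args) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    System.out.println(tryEnqueue(queue, "first"));  // queue has room
    System.out.println(tryEnqueue(queue, "second")); // queue is full
    try {
      queue.add("third"); // add() throws on a full bounded queue
    } catch (IllegalStateException e) {
      System.out.println("add() threw IllegalStateException");
    }
  }
}
```

Switching to offer() only moves the problem: the caller still has to act on a false return (retry, block, or back off), otherwise the batch is silently dropped.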
ResultSet rs = preparedStatement.executeQuery();

while (rs.next()) {
rowCount = Long.parseLong(rs.getString("RowCount"));
I suggest using the column index instead of a column alias. Also, this seems to be a simple count utility; can you rename it to something like "getTupleCount()"?
@devtagare Apart from the existing comments, I am fine with the changes. My only major concern is that the way polling is done might depend on many things, such as the target database implementation and the organization of data in the database.
4ed207d to 6687bad
@bhupeshchawda - review comments are incorporated. Could you please check?
pollableInstance.beginWindow(1);
Thread.sleep(700);
pollableInstance.endWindow();
pollableInstance.emitTuples();
This is emitting outside the window. Needs to be between begin and end window.
@devtagare I can still see a lot of open comments. Can you please address them?
6687bad to 6d8f237
@bhupeshchawda - fixed the latest comments and pushed the changes. Mail sent to dev@.
long endTs = System.currentTimeMillis();
long ioTime = endTs - startTs;
long sleepTime = pollInterval - ioTime;
LOG.debug("pollInterval = " + pollInterval + " I/O time = " + ioTime + " sleepTime = " + sleepTime);
Use the {} placeholder syntax for the log message; with concatenation, the string gets built even if the log level is not enabled.
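The cost described here can be shown with the JDK alone. This sketch swaps in java.util.logging, whose placeholders are written {0} rather than the {} the comment refers to, purely so it runs without extra dependencies; the class and method names are hypothetical. It counts how often an argument's toString() is called while FINE logging is disabled.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class LazyLogSketch {
  private static final Logger LOG = Logger.getLogger("LazyLogSketch");
  static int toStringCalls = 0;

  // Argument whose toString() records each time it is rendered.
  static final class Counted {
    @Override
    public String toString() {
      toStringCalls++;
      return "42";
    }
  }

  // Concatenation builds the message before the level check happens.
  static int eagerCalls() {
    LOG.setLevel(Level.INFO); // FINE is disabled
    toStringCalls = 0;
    LOG.fine("sleepTime = " + new Counted());
    return toStringCalls;
  }

  // A placeholder defers rendering until the record is actually logged.
  static int deferredCalls() {
    LOG.setLevel(Level.INFO); // FINE is disabled
    toStringCalls = 0;
    LOG.log(Level.FINE, "sleepTime = {0}", new Counted());
    return toStringCalls;
  }

  public static void main(String[] args) {
    System.out.println("eager: " + eagerCalls());
    System.out.println("deferred: " + deferredCalls());
  }
}
```

The same reasoning applies to the LOG.debug call in the diff: passing pollInterval, ioTime, and sleepTime as placeholder arguments avoids building the string when debug logging is off.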
Why would this not be part of the javadoc?
Yes, this should be part of javadoc as well.
6d8f237 to 4c7d268