-
Notifications
You must be signed in to change notification settings - Fork 2.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NIFI-2156: Add ListDatabaseTables processor #642
Conversation
@WritesAttribute(attribute = "db.table.remarks", description = "Contains the name of a database table from the connection"), | ||
@WritesAttribute(attribute = "db.table.count", description = "Contains the number of rows in the table") | ||
}) | ||
@Stateful(scopes = {Scope.LOCAL}, description = "After performing a listing of tables, the timestamp of the query is stored. " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be "cluster"? That way when primary node changes it will keep the same listing of tables
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wasn't sure about that but makes sense to me :) will change to cluster.
Alright, after giving it some time and a cup of coffee I realize how off I was at first, lol. This processor reaches out to the DB asking for the tables. Then for each table that isn't already stored in state it creates a flowfile. If the processor is configured to give the count, it needs to send a SQL query asking for it. If that query fails it will remove the flowfile it created and continue onto the next table. If successful, the FQN of the table will then be added to state (after queuing it to transfer). That realization makes my comment about data loss void (was afraid it would get stored in state after un-successfully getting the count). One new comment, would a user want to set an expiration for tables in state? That way they could get updates on the count of a table every X seconds/minutes. In it's current form it will get the table once but never again. You're already storing the timestamp as the value so it should be an easy addition. |
Yes, at one point I had a "Refresh Interval" property but I think that was in another branch, will restore it. Also, currently any change to properties will reset the state (since the tables fetched may have changed), I'm thinking of taking that part out. The Refresh Interval would cause all tables to be re-fetched, and/or the user could always manually clear state. What do you think? |
} | ||
if (lastRefreshed > 0 && refreshInterval > 0 && currentTime >= (lastRefreshed + refreshInterval)) { | ||
stateManager.clear(Scope.CLUSTER); | ||
stateMapProperties.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why clear all the properties at once and not just have an expiration time that applies to each table? Where if you have it set to 5 minutes, the processor will end up reporting the count of the table every minutes (while it still exists). You already store the timestamp under the FQN.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, will add.
refreshTable = false; | ||
} | ||
} catch (final NumberFormatException nfe) { | ||
getLogger().error("Failed to retrieve observed last table fetches from the State Manager. Will not perform " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This exception really shouldn't happen since we are setting the long ourselves but if it does it will fail entirely until state is cleared by the user. In addition to returning and yielding, it should probably clear the offending state entry (and log in the error message that this is hapenning and the ramifications). This will at least give the processor a chance to continue working if it every reaches this state.
// Update the last time the processor finished successfully | ||
stateManager.replace(stateMap, stateMapProperties, Scope.CLUSTER); | ||
// Update the timestamps for listed tables | ||
stateManager.setState(stateMapProperties, Scope.CLUSTER); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would not change this to setState. It will overwrite anything that is in State. If someone does end up running clustered and not primary Node it will blow it away without warnings.
Instead just check if the prior map version was -1 and do a set instead of replace.
+1 Visually verified code and any comments were addressed. Ran a contrib check build and verified functionality in a standalone cluster hitting a MySQL DB. Thanks @mattyb149, I will squash and merge |
No description provided.