[WIP] [HUDI-251] JDBC incremental load to HUDI DeltaStreamer #969
Conversation
@vinothchandar @leesf Travis failed on modules which weren't touched by me. Any idea how to restart the Travis build?
Also, another thing we need to test and implement here is the continuous pull, where the user gives an interval and after every interval DeltaStreamer pulls from the RDBMS until it is terminated by the user.
.option(Config.RDBMS_TABLE_PROP, properties.getString(Config.RDBMS_TABLE_NAME));

if (properties.containsKey(Config.PASSWORD) && !StringUtils
    .isNullOrEmpty(properties.getString(Config.PASSWORD))) {
Maybe the value of Config.PASSWORD could be empty in some cases.
@vinothchandar Please advise: should we allow an empty password?
Maybe in some test setups? It might be good to allow that, actually.
String[] split = prop.split("\\.");
String key = split[split.length - 1];
String value = properties.getString(prop);
LOG.info(String.format("Adding %s -> %s to jdbc options", key, value));
dataFrameReader.option(key, value);
If extra options are configured as EXTRA_OPTIONS + "a.b" = value, won't this add b = value instead of a.b = value?
I don't think it will be a.b; it will always be a = value.
I find it will add b = value; correct me if I am wrong.
@leesf I researched the options more. All options I saw were lowerbound, upperbound, numPartitions, etc., which fit well with what we do above. However, I came across OracleIntegrationSuite.scala, which uses the oracle.jdbc.mapDateToTimestamp property, so I think we will have to change the code to support this.
@vinothchandar any further comments on this?
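For reference, a minimal sketch of the prefix handling being discussed, so that dotted option names like oracle.jdbc.mapDateToTimestamp survive intact. Plain java.util.Properties is used here instead of Hudi's TypedProperties, and the prefix constant and class name are illustrative, not the actual PR code.

import java.util.Properties;
import org.apache.spark.sql.DataFrameReader;

class ExtraJdbcOptions {
  // Assumed prefix; in the PR this is the Config.EXTRA_OPTIONS constant.
  private static final String EXTRA_OPTIONS_PREFIX = "hoodie.datasource.jdbc.extra.options.";

  // Strip only the known prefix instead of keeping the last dot-separated segment,
  // so a dotted option name such as "oracle.jdbc.mapDateToTimestamp" passes through intact.
  static void addExtraJdbcOptions(Properties properties, DataFrameReader dataFrameReader) {
    for (String prop : properties.stringPropertyNames()) {
      if (prop.startsWith(EXTRA_OPTIONS_PREFIX)) {
        String key = prop.substring(EXTRA_OPTIONS_PREFIX.length());
        dataFrameReader.option(key, properties.getProperty(prop));
      }
    }
  }
}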
addExtraJdbcOptions(properties, dataFrameReader);

if (properties.containsKey(Config.IS_INCREMENTAL) && StringUtils
    .isNullOrEmpty(properties.getString(Config.IS_INCREMENTAL))) {
Maybe check whether the value equals true?
I don't understand what you are trying to ask here
@leesf I understand what you're saying. I shall make that change.
}

@Override
protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
How about using sourceLimit to limit the number of records read from the RDBMS? That is, select xxx from xxx limit sourceLimit, or set fetchsize = sourceLimit on the dataFrameReader?
@leesf the problem with that would be that not all databases support the LIMIT clause. Some support LIMIT, some say TOP, and the lingo is different. So instead we just pull everything.
We could limit the DataFrame if needed, but that just adds complexity for the user, because between batches we would then have to track what we have already sent earlier and what we should send now. In either case Spark still reads everything from the RDBMS and only limits at the DataFrame level, so I don't think we should do anything here. @vinothchandar please advise further if I am wrong.
@leesf and @vinothchandar I'm sure we cannot limit through the SQL query, as I see that MySQL and Postgres use SELECT * FROM XXX LIMIT 1, whereas Oracle uses SELECT * FROM XXX FETCH NEXT 1 ROWS and Derby uses SELECT * FROM XXX FETCH FIRST 10 ROWS ONLY.
Good point. Given the different SQL syntax across RDBMSes, I think it is OK to ignore sourceLimit if there is no better way.
Limiting might be helpful to break the load into smaller chunks; DBMSes don't usually like large scans, so having some ability to limit would actually be good.
@taherk77 how about having the ability to add a LIMIT clause depending on the JDBC endpoint? It should tell you if it's MySQL or Postgres (the two are very popular anyway, so having this working even for those two initially would be awesome).
So if the driver class the user provides contains the MySQL or Postgres keyword, we should start applying the limit?
Yes, usually the JDBC URL looks like jdbc:mysql: or jdbc:postgresql:, so you can just match on this. Also document this special handling.
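A rough sketch of that URL matching; the helper and its names are illustrative only and not part of the PR.

class LimitClauseHelper {
  // Apply a trailing LIMIT only for endpoints known to support it, detected from the JDBC URL.
  static String maybeAddLimit(String jdbcUrl, String baseQuery, long sourceLimit) {
    boolean supportsLimit = jdbcUrl.startsWith("jdbc:mysql:") || jdbcUrl.startsWith("jdbc:postgresql:");
    if (supportsLimit && sourceLimit > 0 && sourceLimit != Long.MAX_VALUE) {
      return baseQuery + " LIMIT " + sourceLimit;
    }
    return baseQuery; // other databases: ignore sourceLimit for now
  }
}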
Hi @vinothchandar, so do you mean that if the user sets the limit to 10, for Postgres and MySQL we should do select * from table limit 10?
I don't think that would work with the semantics we have here. In continuous mode with a full scan, JDBC scans the whole table every interval.
In incremental mode we first do a full scan and write checkpoints; we then assume the column given for incremental is either a long, int, or timestamp. If the incremental query fails, we fall back to full scans. How would limit work here? It would always keep fetching the same records.
Further, regarding the job interval: this has not been implemented yet, as I do not have clarity on it. I want to know how we should do it; it will require further brainstorming on how to schedule the jobs.
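For context, a rough sketch of the incremental flow described above. All names are illustrative, the caller is assumed to have already configured the reader with the JDBC options, and the filter runs at the DataFrame level, so Spark still scans the table underneath.

import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class IncrementalFetchSketch {
  // Pull rows whose incremental column (assumed long/int/timestamp) is greater than the
  // last checkpoint; fall back to a full scan if the incremental query fails.
  static Dataset<Row> fetch(DataFrameReader reader, String incrementalColumn, String lastCheckpoint) {
    try {
      return reader.load().filter(String.format("%s > '%s'", incrementalColumn, lastCheckpoint));
    } catch (Exception e) {
      return reader.load();
    }
  }
}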
Assert.assertEquals(10, rowDataset.where("commit_time=000").count());
Assert.assertEquals(10, rowDataset.where("commit_time=001").count());
rowDataset1?
Will correct that.
@leesf @vinothchandar can we extract a DataFrameReader from a Dataset? It would be really helpful for testing properties if there is a way.
@leesf @vinothchandar Travis failed again on the same module as before.
As far as I know, we can easily get a Dataset from a DataFrameReader, but I didn't find a way to get a DataFrameReader from a Dataset; let me know if there is one.
@leesf @vinothchandar All changes are addressed and fixed. Travis builds are failing because of random VM crashes. What can we do further?
Is this good to go now?
@leesf @vinothchandar All tests passing guys!!!!!!!! 🥇
Few comments.. Looks almost ready!
.isNullOrEmpty(properties.getString(Config.PASSWORD_FILE))) {
  LOG.info(
      String.format("Reading JDBC password from password file %s", properties.getString(Config.PASSWORD_FILE)));
  FileSystem fileSystem = FileSystem.get(new Configuration());
Please use the configuration object from the Spark context; otherwise it may not pick up settings placed there.
FileSystem fileSystem = FileSystem.get(new Configuration());
passwordFileStream = fileSystem.open(new Path(properties.getString(Config.PASSWORD_FILE)));
byte[] bytes = new byte[passwordFileStream.available()];
passwordFileStream.read(bytes);
IIRC there is already a helper method in FileIOUtils for this? Can we reuse that?
Here I need to read from a filesystem like HDFS, GCS, or S3, which the Hadoop FileSystem is useful for. FileIOUtils doesn't use FileSystem for reads.
But you can pass the passwordFileStream to FileIOUtils.readAsByteArray, correct? It's just another InputStream?
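Something along these lines, assuming FileIOUtils.readAsByteArray accepts an InputStream and that the caller passes in the Hadoop Configuration obtained from the Spark context; the helper class and method names here are illustrative.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.FileIOUtils;

class PasswordFileReader {
  // Read the JDBC password from a file on HDFS/S3/GCS, draining the stream with the
  // existing FileIOUtils helper instead of available()/read().
  static String readPassword(Configuration hadoopConf, String passwordFilePath) throws IOException {
    FileSystem fileSystem = FileSystem.get(hadoopConf);
    try (FSDataInputStream in = fileSystem.open(new Path(passwordFilePath))) {
      return new String(FileIOUtils.readAsByteArray(in), StandardCharsets.UTF_8);
    }
  }
}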
String key = Arrays.asList(prop.split(Config.EXTRA_OPTIONS)).stream()
    .collect(Collectors.joining());
String value = properties.getString(prop);
if (!StringUtils.isNullOrEmpty(value)) {
same here.. should we allow empty?
Sure will allow empty options
try {
  if (isIncremental) {
    Column incrementalColumn = rowDataset.col(props.getString(Config.INCREMENTAL_COLUMN));
    final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
This would make a second pass over rowDataset, right? Should we cache it so that we don't read from the database twice? Line 126 above will do one fetch and compute the checkpoint; the rowDataset is then passed back to the DeltaSync class, and when we start to fetch rows to write to Hudi it will trigger another "recomputation". We should at least support an option to cache this dataset, IMO.
Also, is there a way to do this via Accumulators? (Could be tricky.) :D We can file a JIRA for later?
Sure. For now, should we cache in the fetch() method, get the max for checkpointing, uncache, and then return the DataFrame?
Yes. Let's cache it in fetch() and add a parameter to control the persistence level, like we do in BloomIndex, please.
hoodie.datasource.jdbc.storage.level="MEMORY_ONLY_SER" has been added to the props file to set the storage level. If no storage level is given by the user, we default to MEMORY_AND_DISK_SER.
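A small sketch of the caching being discussed, with the storage level taken from the new property; the helper class and method names are illustrative, and the surrounding names mirror the snippet under review.

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.storage.StorageLevel;

class CheckpointWithCache {
  // Persist the dataset before the max() pass so DeltaSync's later read of the same
  // Dataset does not trigger a second scan of the database.
  static String maxCheckpoint(Dataset<Row> rowDataset, Column incrementalColumn, String storageLevel) {
    rowDataset.persist(StorageLevel.fromString(storageLevel)); // e.g. "MEMORY_AND_DISK_SER"
    return rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType))
        .first().getString(0);
  }
}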
public static void cleanDerby() {
  try {
    Files.walk(Paths.get(DatabaseUtils.getDerbyDir()))
We already have a helper in FileIOUtils for this.
hoodie.datasource.jdbc.table.incremental.pull.interval=5000
#Extra options for jdbc
hoodie.datasource.jdbc.extra.options.fetchsize=1000
hoodie.datasource.jdbc.extra.options.timestampFormat="yyyy-mm-dd hh:mm:ss"
nit: newline
@vinothchandar @leesf Changes addressed.
@taherk77 if you could resolve comments after addressing them, that would be very helpful for reviewing incrementally. Small tip :)
@taherk77 I think this will make for a great blog post here. You can provide an end-to-end example of how to bulk load and keep incrementally ingesting, with code and command samples?
https://cwiki.apache.org/confluence/display/HUDI#ApacheHudi(Incubating)-How-toblogs
Let me know if you are interested. We can work out the perms.
Apologies. Will keep in mind.
Hi @vinothchandar and @taherk77
A sample MySQL query for incrementing timestamp columns as (
@pushpavanthar Great suggestion. Let me see if we can structure this solution more. Just supporting raw SQL as input for extracting the data, with the Hudi checkpoint simply being a list of string replaces in a template SQL, could provide a lot of flexibility. Taking the same example from above, the user specifies the following SQL. (We can blog about and document this well.)
The Hudi checkpoint is a list of string values, one for each of the incremental column names. All this said, I want to get a basic version working and checked in first :)
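A quick sketch of how that template substitution could look; the placeholder naming and helper class are purely illustrative.

import java.util.List;

class IncrementalSqlTemplate {
  // The user supplies raw SQL with placeholders; the Hudi checkpoint is a list of string
  // values, one per incremental column, substituted in order at the start of each batch.
  static String buildSql(String templateSql, List<String> checkpointValues) {
    String sql = templateSql;
    for (int i = 0; i < checkpointValues.size(); i++) {
      // e.g. "SELECT * FROM orders WHERE updated_at > ':checkpoint0'"
      sql = sql.replace(":checkpoint" + i, checkpointValues.get(i));
    }
    return sql;
  }
}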
@vinothchandar Thanks.
@pushpavanthar I was out for a conference, so picking this back up. I am also pretty interested in this feature; I think having some basic usable version (even if not perfect) will go a long way. Hudi will store checkpoints as part of the commit metadata and supply them at the start of every batch to the source to substitute into the SQL. There is no ticket open for you to add thoughts to, so feel free to open a new JIRA with your ideas. @taherk77 I unassigned the JIRA HUDI-251 since I was not sure whether you were still working on this. At this point, @pushpavanthar or anyone interested in taking this forward, please go ahead.
Closing due to inactivity.