
[HUDI-251] JDBC incremental load to HUDI with DeltaStreamer #917

Closed
wants to merge 2 commits

Conversation

taherk77
Contributor

No description provided.

@taherk77
Contributor Author

https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html#_incremental_imports mentions --last_value. I think we should be computing that within the source. My vision is that once the user specifies the incremental column and an interval, we keep pulling the data every interval.

So in short, DeltaStreamer will never get shut down unless it is shut down manually, and will keep pulling new data from the JDBC source.

The other option is that when we set is_incremental to false, we pull all the data at once and then write it.

@taherk77
Contributor Author

@vinothchandar Travis tests failed for modules that I haven't touched. What should we do?

@vinothchandar
Member

vinothchandar commented Sep 24, 2019

The other option is that when we set is_incremental to false, we pull all the data at once and then write it.

This can be implemented by passing --checkpoint null or a --full-load flag? This is actually a general issue for all sources; it would be good to open a new JIRA for it and tackle it separately. For example, even if you have files on DFS, you probably want an option to do this. For this PR, can we just focus on incremental pulling, where the first run without a checkpoint pulls the entire table?

the interval we should be pulling the data every interval.

On the interval, maybe I was vague, apologies. What I meant was that the frequency at which we run DeltaStreamer is controlled by the user in non-continuous mode, and #921 just added a flag to control this in continuous mode. I don't think we need to worry about it in this PR?

@vinothchandar
Member

That seems like a flaky test?

Failed tests: 
  TestMergeOnReadTable.testRollbackWithDeltaAndCompactionCommit:421 expected:<1> but was:<0>

Hmmm. For now, you can restart the build on Travis and it should go away; I have not seen Travis be flaky in a while. So if it persists, we can take it up separately.

@@ -0,0 +1 @@
hudi
Member

please make sure this is not any real password :)

Contributor Author

I think we can get rid of this file. It is not even being used in the JUnits.

Contributor Author

:) We don't have such strong passwords... haha

@taherk77
Contributor Author

This can be implemented by passing --checkpoint null or a --full-load flag? This is actually a general issue for all sources; it would be good to open a new JIRA for it and tackle it separately. For example, even if you have files on DFS, you probably want an option to do this. For this PR, can we just focus on incremental pulling, where the first run without a checkpoint pulls the entire table?

So here is the kind of algorithm I am thinking of implementing,

with load params (incremental_column and interval x):

  1. On the first run, do an entire pull of the table and write max(incremental_column) as last_val to the checkpoint.
  2. On the second run, scheduled after interval x, use Spark JDBC predicate pushdown to do "select * from table where incremental_column > last_val", write that data, then write max(incremental_column) as the checkpoint again and keep going (see the sketch after this list).
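A minimal sketch of how steps 1 and 2 could look inside the source's fetch path, reusing the sparkSession, props, and Config names from this patch; the method shape and query string are illustrative, not the final API:

// Illustrative sketch: full pull on the first run, predicate-pushdown query afterwards.
private Dataset<Row> fetch(Option<String> lastCheckpoint) {
  DataFrameReader reader = sparkSession.read().format("jdbc")
      .option("url", props.getString(Config.URL))
      .option("driver", props.getString(Config.DRIVER_CLASS))
      .option("user", props.getString(Config.USER));
  if (lastCheckpoint.isPresent()) {
    // Step 2: incremental pull, pushing the filter down to the RDBMS.
    String query = String.format("(select * from %s where %s > '%s') rdbms_table",
        props.getString(Config.RDBMS_TABLE_NAME),
        props.getString(Config.INCREMENTAL_COLUMN),
        lastCheckpoint.get());
    reader = reader.option("dbtable", query);
  } else {
    // Step 1: first run without a checkpoint, pull the whole table.
    reader = reader.option("dbtable", props.getString(Config.RDBMS_TABLE_NAME));
  }
  return reader.load();
}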

the interval we should be pulling the data every interval.

On the interval, maybe I was vague, apologies. What I meant was that the frequency at which we run DeltaStreamer is controlled by the user in non-continuous mode, and #921 just added a flag to control this in continuous mode. I don't think we need to worry about it in this PR?

By interval I meant a thread that keeps running until killed and keeps scheduling the JDBC job after the mentioned interval. I had some concerns here: let's say a JDBC job takes 10 mins to complete but the user specifies a 5 min interval; that means jobs will keep piling up, right?

Last concern: whenever we write JDBC jobs in Spark, standard practice is

spark.read().jdbc("url", "table", "someTableColumn", 1, 10, connectionProps)

where 1 is the lowerBound and 10 is the upperBound; with this, each Spark executor does a range query and pulls data independently. Without the lowerBound and upperBound, all data will be pulled into one executor, which will make this process really slow. So how do we incorporate this in the code?

Even if we call repartition on spark.read.jdbc, all the data will still come to one executor and only then get repartitioned.
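For reference, the partitioned form of that read looks roughly like the sketch below (table name, column, and bounds are made up). Spark splits [lowerBound, upperBound] into numPartitions ranges and each executor issues its own range query, so nothing funnels through a single executor:

import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative only: 10 executor-side range queries over contract_id between 1 and 1,000,000.
static Dataset<Row> readPartitioned(SparkSession spark, String jdbcUrl, Properties connProps) {
  return spark.read().jdbc(
      jdbcUrl,
      "contract",        // hypothetical table
      "contract_id",     // numeric partition column
      1L,                // lowerBound
      1000000L,          // upperBound
      10,                // numPartitions
      connProps);
}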

@vinothchandar
Member

So here is the kind of algorithm that I think of implementing

steps 1 & 2 sound good to me.

but user mentions 5 mins interval that means jobs will keep on piling right?
The next batch won't be scheduled until the first one completes, so there is backpressure there already to prevent pile-up.

spark.read().jdbc("url,"table","someTableColumn",1,10,connectionProps)
Doesn't Spark already parallelize the pull? i.e. allocate 1-2 to one executor, 2-3 to another, and so on?

@taherk77
Contributor Author

Hi @vinothchandar, I have created what we discussed. If a checkpoint is found we reference the checkpoint, or else we do a whole pull. I have also added a few other options and tested through JUnit locally.

I am really unsure how to mock the MySQL database in the JUnit test! Can you give me some pointers on that please? It would really be helpful!

@taherk77
Contributor Author

Also, can we write the incremental column along with the last_val to the checkpoint? I am saying this because, let's say for run 1 we use "contract_id" as the incremental column and the max value written was 8, and then for run 2 I change the params to use "contract_created_at" as the incremental column; then the ppd query will be "select * from contract where contract_created_at > '8'", which is incorrect and will fail. So if we write the column, we first verify the column and only then run, or else we can straight away throw an exception and exit.
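One possible way to do that, sketched with an illustrative "column:value" checkpoint format and hypothetical helper names:

// Hypothetical checkpoint layout "<incremental_column>:<last_value>".
static String buildCheckpoint(String incrementalColumn, String lastValue) {
  return incrementalColumn + ":" + lastValue;
}

// Fails fast if the configured column no longer matches the checkpointed one.
static String extractLastValue(String checkpoint, String configuredColumn) {
  String[] parts = checkpoint.split(":", 2);
  if (parts.length != 2 || !parts[0].equals(configuredColumn)) {
    throw new IllegalArgumentException(String.format(
        "Checkpoint %s was not written for incremental column %s", checkpoint, configuredColumn));
  }
  return parts[1];
}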

Member

@vinothchandar left a comment

Left a few comments. It's taking shape!

On mocking MySQL: you can probably just mock at the JDBC level, e.g. https://dzone.com/articles/mockito-basic-example-using-jdbc, or use something like Derby (embeddable SQL) and write a test against the real thing.
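A minimal sketch of the Derby route, assuming the Derby embedded driver is available as a test dependency (the table and class names are made up):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.junit.Test;

public class TestJDBCSourceWithDerby {

  @Test
  public void testPullAgainstEmbeddedDerby() throws Exception {
    // In-memory Derby database, created on first connect; no external MySQL needed.
    String url = "jdbc:derby:memory:testdb;create=true";
    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement()) {
      stmt.execute("CREATE TABLE contract (contract_id INT, contract_created_at TIMESTAMP)");
      stmt.execute("INSERT INTO contract VALUES (1, CURRENT_TIMESTAMP)");
    }
    // Then point JDBCSource at the same url with driver org.apache.derby.jdbc.EmbeddedDriver
    // and assert on the rows in the returned Dataset.
  }
}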


private static Logger LOG = LoggerFactory.getLogger(JDBCSource.class);

private final String ppdQuery = "(select * from %s where %s >= \" %s \") rdbms_table";
Member

nit: static?

dataFrameReader = dataFrameReader
.option(Config.RDBMS_TABLE_PROP, properties.getString(Config.RDBMS_TABLE_NAME));

if (!properties.containsKey(Config.PASSWORD)) {
Member

2c: this could be a lot simpler if we just checked one by one and then, at the end, threw the error based on whether a password was obtainable:

String password = null;
if (properties.containsKey(Config.PASSWORD_FILE)) {
  // password = value read from the password file, if readable
}
if (properties.containsKey(Config.PASSWORD)) {
  password = properties.getString(Config.PASSWORD);
}
if (password == null) {
  // throw the error
}

} else {
throw new IllegalArgumentException(String.format("%s cannot be null or empty. ", Config.PASSWORD));
}
if (properties.containsKey(Config.EXTRA_OPTIONS)) {
Member

Instead of splitting/encoding using "," like this, can you just take all properties that begin with the prefix hoodie.datasource.jdbc?

Contributor Author

@taherk77 Oct 1, 2019

The extra.options is for adding some properties for JDBC pulls. So let's say the user writes the Spark code below:

      val df1 = spark.read.format("jdbc")
        .option("url", jdbcUrl)
        .option("dbtable", "datetimePartitionTest")
        .option("partitionColumn", "d")
        .option("lowerBound", "2018-07-06")
        .option("upperBound", "2018-07-20")
        .option("numPartitions", 3)
        .option("timestampFormat", "MM/dd/yyyy h:mm:ss a")
        .option("oracle.jdbc.mapDateToTimestamp", "false")
        .option("sessionInitStatement", "ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'")
        .load()

Now how do we let the user express the same with a properties file? We use extra.options:

hoodie.datasource.jdbc.extra.options=fetchsize=1000, timestampFormat=MM/dd/yyyy h:mm:ss a, oracle.jdbc.mapDateToTimestamp=false, sessionInitStatement="ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'"

Contributor Author

I'm seeing users using "=" in some of the options they pass, hence I think we should reconsider what the delimiter should be. I think || should do it, since hardly anyone ever uses a double pipe.
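A rough sketch of that parsing, splitting options on "||" and each option on the first "=" only so values may themselves contain "="; it reuses the properties, Config.EXTRA_OPTIONS, and dataFrameReader names from the patch:

// Illustrative parsing of hoodie.datasource.jdbc.extra.options with "||" between options.
if (properties.containsKey(Config.EXTRA_OPTIONS)) {
  for (String token : properties.getString(Config.EXTRA_OPTIONS).split("\\|\\|")) {
    String[] kv = token.split("=", 2);
    if (kv.length == 2) {
      dataFrameReader = dataFrameReader.option(kv[0].trim(), kv[1].trim());
    }
  }
}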

Member

Or simply do

hoodie.deltastreamer.source.jdbc.fetchsize=1000
hoodie.deltastreamer.source.jdbc.timestampFormat=...

and so on? In your code, just look for properties with the prefix hoodie.deltastreamer.source. I think the Kafka source works like this; you can check it out.
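A sketch of that prefix approach, assuming a plain java.util.Properties view of the config and a hypothetical prefix constant; every property under the prefix is forwarded straight to the JDBC reader:

import java.util.Properties;
import org.apache.spark.sql.DataFrameReader;

// Hypothetical prefix: hoodie.deltastreamer.source.jdbc.fetchsize=1000 becomes .option("fetchsize", "1000").
static final String JDBC_OPTION_PREFIX = "hoodie.deltastreamer.source.jdbc.";

static DataFrameReader forwardPrefixedOptions(Properties props, DataFrameReader reader) {
  for (String key : props.stringPropertyNames()) {
    if (key.startsWith(JDBC_OPTION_PREFIX)) {
      reader = reader.option(key.substring(JDBC_OPTION_PREFIX.length()), props.getProperty(key));
    }
  }
  return reader;
}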

*/
private static final String URL = "hoodie.datasource.jdbc.url";

private static final String URL_PROP = "url";
Member

rename defaults to DEFAULT_URL_PROP and keys to URL_PROP?

/**
* {@value #TIME_FORMAT_PROP} used internally to build TIME_FORMAT for jdbc
*/
private static final String TIME_FORMAT_PROP = "timestampFormat";
Member

Some of these don't have defaults?

Contributor Author

This had to be removed; I forgot that. A better way to handle all this is through extra.options in the jdbc.properties file:
hoodie.datasource.jdbc.extra.options="fetchsize=1000,timestampFormat=yyyy-mm-dd hh:mm:ss"

DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.URL, Config.DRIVER_CLASS, Config.USER,
Config.RDBMS_TABLE_NAME, Config.IS_INCREMENTAL));

Option<String> beginInstant =
Member

It's not an instant, right? Can you rework the variable names based on the JDBC source?

Contributor Author

Renamed to lastCheckpoint

private Pair<Option<Dataset<Row>>, String> sendDfAndCheckpoint(Dataset<Row> rowDataset, boolean isIncremental) {
if (isIncremental) {
Column incrementalColumn = rowDataset.col(props.getString(Config.INCREMENTAL_COLUMN));
final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
Member

This might fetch the data out twice: once to compute the aggregation and once to actually read the values out? Can we either use an accumulator (if possible) or just persist the dataset before you call .agg?

Contributor Author

As far as I know, it should not pull the RDD twice. However, I will still get an explain plan and check whether I can see that.

Member

It will, unless you cache it; I am pretty certain :) Let me know what you find.
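A sketch of the caching being suggested here, reusing the variable names from the diff; persisting before the aggregation keeps Spark from issuing the JDBC read twice (once for max(), once for the downstream write):

// Cache the pulled rows so max() and the later write share one JDBC scan.
rowDataset.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK());
Column incrementalColumn = rowDataset.col(props.getString(Config.INCREMENTAL_COLUMN));
final String max = rowDataset
    .agg(functions.max(incrementalColumn).cast(DataTypes.StringType))
    .first().getString(0);
// Unpersist rowDataset once the downstream write has completed.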

.getString(0);
LOG.info("Sending {} with checkpoint val {} ", incrementalColumn, max);
return Pair.of(Option.of(rowDataset), max);
} else {
Member

Okay, so the use of isIncremental is actually to reset the checkpoints once in a while and force a bulk load?

Contributor Author

Let me put it this way for you:
case 1: a JDBC job can be continuous but not incremental
case 2: a JDBC job can be continuous and incremental
case 3: a JDBC job is not continuous but is incremental
case 4: a JDBC job is not continuous and not incremental

In cases 1 and 4 we never reference anything to/from the checkpoint, and we always pull the entire table.

In cases 2 and 3 we reference the checkpoint to see the last checkpointed value. If a value is present, we build an incremental query; otherwise we do the whole table pull. Either way we write the max of the incremental column and keep doing the same thing again and again. This is what I think will be best (sketched below).
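As a sketch of that decision logic (incrementalFetch, fullFetch, and maxOfIncrementalColumn are hypothetical helpers; Pair, Option, and Dataset are the types already used in the diff):

// Only the incremental modes ever read or advance the checkpoint.
private Pair<Option<Dataset<Row>>, String> fetch(boolean isIncremental, Option<String> lastCheckpoint) {
  Dataset<Row> df = isIncremental && lastCheckpoint.isPresent()
      ? incrementalFetch(lastCheckpoint.get())   // cases 2 and 3 with a prior checkpoint
      : fullFetch();                             // cases 1 and 4, or the first incremental run
  String newCheckpoint = isIncremental ? maxOfIncrementalColumn(df) : "";
  return Pair.of(Option.of(df), newCheckpoint);
}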

Member

Makes sense. Once you have this, we should probably also think about generalizing it across sources; I like the fact that you can do both modes in the same way.

Please file a JIRA to track this, if you agree.

@@ -0,0 +1,13 @@
----------------------------------------------------------------
Member

remove this file?


@vinothchandar
Member

@taherk77 Just a bump to make sure you got the last messages :)

@taherk77
Contributor Author

taherk77 commented Oct 8, 2019

Hey, really sorry Vinoth. I am moving to a new company soon and hence things are a little slow here. As for the commit, it was just to move things along quickly on the git repo. Apologies for replying late; I've been really caught up in all this moving. I will pick up where I left off, hopefully by 1st November at the latest.

@vinothchandar
Member

Oh, good luck! And no need to apologize, I was just following up :) Take your time.

@taherk77 taherk77 closed this Oct 17, 2019