Add iterrows() and itertuples() DataFrame APIs; usage is similar to pandas #369

Closed
wants to merge 30 commits

Conversation

@kxbin (Contributor) commented Aug 3, 2021

Related to this issue:
Can we get batch data using df.to_pandas() in the case of big data? Closes #345

This adds the to_pandas_in_batch() DataFrame API.

We can then use the code below to get batched DataFrames when working with big data:

pd_df_iterator = ed_df.to_pandas_in_batch(batch_size=1000)
for pd_df in pd_df_iterator:
    print(pd_df)

If something in this code is wrong, please give some suggestions. Thank you!

@elasticmachine

Since this is a community-submitted pull request, a Jenkins build has not been kicked off automatically. Can an Elastic organization member please verify the contents of this patch and then kick off a build manually?

@cla-checker-service bot commented Aug 3, 2021

💚 CLA has been signed

@V1NAY8 (Contributor) commented Aug 3, 2021

So, I am thinking that with #368 we will eliminate the scroll API.
The current logic is: if batch_size is less than 10k (hard-coded) we use search, otherwise we use scroll.

The search API accepts only a 10k size at once; beyond that we have to use pagination to fetch the next set of results.

I am thinking we can add a new parameter to to_pandas called batch_size which can take values up to 10k.
If given more than that, we throw a warning saying 10k is the maximum and fetch results with the max batch size.

We could also turn the existing method into an iterator by default.

  • So, is a different method required for this?
  • Also, we need to document how this iterator has to be used.

Once this is done, I am thinking Collector won't be required, since the logic will be straightforward.

@sethmlarson What do you think ?
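
For illustration only, a rough sketch of how the proposed batch_size parameter might have been used; to_pandas(batch_size=...) here is hypothetical and was never merged in this form:

import eland as ed

ed_df = ed.DataFrame('localhost:9200', 'flights')

# Hypothetical: batch_size capped at the Elasticsearch search limit of 10,000.
# Each pd_df would be a plain pandas DataFrame of at most batch_size rows.
for pd_df in ed_df.to_pandas(batch_size=10000):
    print(pd_df.shape)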

@kxbin (Contributor, Author) commented Aug 3, 2021

> So, I am thinking that with #368 we will eliminate the scroll API.
> The current logic is: if batch_size is less than 10k (hard-coded) we use search, otherwise we use scroll.
>
> The search API accepts only a 10k size at once; beyond that we have to use pagination to fetch the next set of results.
>
> I am thinking we can add a new parameter to to_pandas called batch_size which can take values up to 10k.
> If given more than that, we throw a warning saying 10k is the maximum and fetch results with the max batch size.
>
> We could also turn the existing method into an iterator by default.
>
>   • So, is a different method required for this?
>   • Also, we need to document how this iterator has to be used.
>
> Once this is done, I am thinking Collector won't be required, since the logic will be straightforward.
>
> @sethmlarson What do you think ?

@V1NAY8 Thanks for the reply

I agree with you.

We can throw a warning saying 10k is the maximum and fetch results with the max batch size. I will modify it now.

In addition, I also considered turning the existing method into an iterator by default,
but will some users want to get all the data at once? (Compatibility is also a consideration.)

Maybe we can determine whether to return an iterator or a DataFrame based on the batch_size parameter (whether batch_size is None or not None).
How about this?

@V1NAY8 (Contributor) commented Aug 3, 2021

@kxbin Don't throw a warning in this PR for now. The existing code can run for both < 10k and > 10k. I will add it if needed in my other changes.

@kxbin (Contributor, Author) commented Aug 3, 2021

@V1NAY8 Okay, I'll leave throwing the warning to you.

@V1NAY8 (Contributor) commented Aug 3, 2021

So, we can turn the internals into an iterator by default. The to_pandas() method, which dumps all the data at once, can be changed to iterate internally, construct a DataFrame, and return it.
We can expose another method just to expose the iterator.
That way compatibility will be preserved 😛
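
As a loose illustration of this idea (the class and method names below are made up for the sketch and are not eland's actual internals): the internal fetch becomes a generator, to_pandas() simply consumes it fully, and a second method exposes the iterator directly.

import pandas as pd

class BatchedFrame:
    # Hypothetical stand-in for an eland-like DataFrame; records represent
    # documents already fetched from Elasticsearch.
    def __init__(self, records, batch_size=1000):
        self._records = records
        self._batch_size = batch_size

    def _iter_batches(self):
        # Internal generator: one small pandas DataFrame per batch of records.
        for start in range(0, len(self._records), self._batch_size):
            yield pd.DataFrame(self._records[start:start + self._batch_size])

    def to_pandas(self):
        # Existing behaviour preserved: everything in a single DataFrame.
        return pd.concat(self._iter_batches(), ignore_index=True)

    def to_pandas_in_batches(self):
        # New method that just exposes the internal iterator.
        return self._iter_batches()

With this shape, to_pandas() keeps the old all-at-once behaviour, while the second (hypothetically named) method gives the per-batch view, so existing callers are unaffected.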

@kxbin (Contributor, Author) commented Aug 3, 2021

> So, we can turn the internals into an iterator by default. The to_pandas() method, which dumps all the data at once, can be changed to iterate internally, construct a DataFrame, and return it.
> We can expose another method just to expose the iterator.
> That way compatibility will be preserved 😛

This idea is great 😃

@sethmlarson (Contributor)

Thanks @kxbin and @V1NAY8 for your interest in this feature. I discussed this with the team, and we think it makes sense to implement iterrows(), which the user can transform into "chunking/batching".

The reason we'd like to do this instead of a DataFrame chunking API is that it provides compatibility with pandas and allows only two ways to view your data ("all" the data, or every row) instead of any number of views based on batch_size. Does this make sense?

@kxbin (Contributor, Author) commented Aug 5, 2021

> Thanks @kxbin and @V1NAY8 for your interest in this feature. I discussed this with the team, and we think it makes sense to implement iterrows(), which the user can transform into "chunking/batching".
>
> The reason we'd like to do this instead of a DataFrame chunking API is that it provides compatibility with pandas and allows only two ways to view your data ("all" the data, or every row) instead of any number of views based on batch_size. Does this make sense?

@sethmlarson Thanks for the reply

Yeah, if we can be compatible with pandas, it makes a lot of sense to do so,
because all users are very familiar with how pandas works.

We can implement the following methods:
ed.DataFrame.iterrows()
ed.DataFrame.itertuples()

and make their usage similar to pandas:
pandas.DataFrame.iterrows()
pandas.DataFrame.itertuples()

Please give me some time; I think I can finish it. 😛
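
Assuming these methods mirror the pandas semantics as proposed, usage would look roughly like this (field names assume the Kibana flights sample data set):

import eland as ed

ed_flights = ed.DataFrame('localhost:9200', 'flights')

# iterrows(): yields (index, row) pairs, each row being a pandas Series
for index, row in ed_flights.iterrows():
    print(index, row['Carrier'])
    break

# itertuples(): yields one namedtuple per row
for row in ed_flights.itertuples():
    print(row.Index, row.Carrier)
    break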

@sethmlarson (Contributor)

Awesome, sounds great! 💪 Let me know if you have questions.

@kxbin (Contributor, Author) commented Aug 5, 2021

> Awesome, sounds great! 💪 Let me know if you have questions.

Okay 😃

@kxbin kxbin marked this pull request as draft August 5, 2021 09:09
Review comments on eland/dataframe.py (×2, outdated, resolved)
@sethmlarson (Contributor)

Btw you may want to wait for #370 to land and base your work off of this function that's been added. It may help you out a lot!

@kxbin (Contributor, Author) commented Aug 6, 2021

> Btw you may want to wait for #370 to land and base your work off of this function that's been added. It may help you out a lot!

Thank you for your corrections and tips.

I will wait for it to land, otherwise _es_result() might be modified at the same time and there will be conflicts.

@sethmlarson (Contributor)

@kxbin I've merged #370, now it should be easier to implement this feature using eland.operations.search_after_with_pit()

@kxbin (Contributor, Author) commented Aug 9, 2021

> @kxbin I've merged #370, now it should be easier to implement this feature using eland.operations.search_after_with_pit()

Thanks for the tips.

@kxbin (Contributor, Author) commented Aug 17, 2021

I think I have succeeded in the optimization.
With the same 50,000-row data set, the test speeds are now as follows:

ed.iterrows(): the full iteration took a total of `21 seconds`
ed.itertuples(): the full iteration took a total of `25 seconds`
ed.to_pandas(): took `16 seconds`

This is a good idea!

I tried converting QueryCompiler._es_results_to_pandas() into a generator itself and did some speed tests.

I used a data set of 50,000 rows for the test, and the results are as follows:

Before conversion:

ed.iterrows(): the full iteration took a total of `2 minutes 30 seconds`
ed.itertuples(): the full iteration took a total of `3 minutes 53 seconds`
ed.to_pandas(): took `15 seconds`

After conversion:

ed.iterrows(): the full iteration took a total of `1 minute 52 seconds`
ed.itertuples(): the full iteration took a total of `2 minutes 11 seconds`
ed.to_pandas(): took `1 minute 54 seconds`
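
To make the idea concrete, here is a hedged, simplified illustration of what "converting the results conversion into a generator" means; this is not the real QueryCompiler._es_results_to_pandas() code, and the field flattening and dtype handling that eland performs are omitted:

import pandas as pd

def es_hits_to_pandas_batches(hits, chunk_size=1000):
    # Instead of accumulating every hit and building one big DataFrame at the
    # end, yield a small DataFrame for each chunk of hits.
    rows = []
    for hit in hits:
        rows.append(hit['_source'])
        if len(rows) >= chunk_size:
            yield pd.DataFrame(rows)
            rows = []
    if rows:
        yield pd.DataFrame(rows)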

@kxbin kxbin requested a review from sethmlarson August 17, 2021 06:39
@sethmlarson (Contributor)

jenkins test this please

@sethmlarson (Contributor) left a review comment

This is really great for a first contribution, thanks for working on this! I have some comments below for you; however, I also have a higher-level comment: I think our search_after_hits implementation should be yielding "batches" of documents instead of individual documents.

I think this makes more sense because the "post-processing" phase is much more efficient when called less frequently, and it means we'll end up creating fewer pd.DataFrame objects. We'll still be able to use the current setup in itertuples and iterrows, with yield from pd_df.itertuples(). I can accomplish this in a separate PR.

Review comments (outdated, resolved) on: eland/operations.py (×3), eland/query_compiler.py (×2), docs/sphinx/reference/api/eland.DataFrame.iterrows.rst, docs/sphinx/reference/api/eland.DataFrame.itertuples.rst
@sethmlarson (Contributor)

Looks like the lint and docs jobs are failing; make sure nox -rs format and nox -rs docs pass.

@sethmlarson (Contributor) commented Aug 17, 2021

@kxbin Take a look at the change in #379 and adapt your PR to do essentially:

def iterrows():
    ... (setup)
    for hits in _search_yield_hits(...):
        df = _es_results_to_pandas(hits)
        df = self._post_process_...(df)
        yield from df.iterrows()

Perhaps you can even encapsulate the logic within (setup) but maybe that can be a separate PR. Focus on getting iterrows() and itertuples() integrated first :)

@kxbin kxbin deleted the branch elastic:master August 18, 2021 07:16
@kxbin kxbin closed this Aug 18, 2021
@kxbin kxbin deleted the master branch August 18, 2021 07:16
@th0ger commented Aug 18, 2021

> I am thinking we can add a new parameter to to_pandas called batch_size which can take values up to 10k.
> If given more than that, we throw a warning saying 10k is the maximum and fetch results with the max batch size.

I think this would be useful, to allow raising the scroll size from the eland default of 1000 to the Elasticsearch maximum of 10k.

@kxbin (Contributor, Author) commented Aug 18, 2021

> I am thinking we can add a new parameter to to_pandas called batch_size which can take values up to 10k.
> If given more than that, we throw a warning saying 10k is the maximum and fetch results with the max batch size.
>
> I think this would be useful, to allow raising the scroll size from the eland default of 1000 to the Elasticsearch maximum of 10k.

Yeah, really useful.

We have now adopted a better solution: we don't use batch_size anymore, and instead expose the iterrows and itertuples methods.

Then the user can implement their own batch_size in the DataFrame iteration.

@th0ger commented Aug 18, 2021

> Then the user can implement their own batch_size in the DataFrame iteration.

Would you mind providing an example of how this should be called?

@kxbin (Contributor, Author) commented Aug 19, 2021

> Then the user can implement their own batch_size in the DataFrame iteration.
>
> Would you mind providing an example of how this should be called?

Something like this:

import eland as ed
import pandas as pd

ed_flights = ed.DataFrame('localhost:9200', 'flights')
batch_size = 10000

batch_series = []

# Count rows with enumerate, since the row index (the Elasticsearch document
# _id) is not guaranteed to be a sequential integer.
for i, (index, row) in enumerate(ed_flights.iterrows()):
    batch_series.append(row)

    if (i + 1) % batch_size == 0:
        batch_dataframe = pd.DataFrame(batch_series)
        batch_series = []
        # Then, we can use this batch_dataframe to do something we want

# Build a final DataFrame from any leftover rows
if batch_series:
    batch_dataframe = pd.DataFrame(batch_series)
