
Thread safety #91

Closed
FilippoBoido opened this issue Apr 12, 2023 · 9 comments

Comments

FilippoBoido commented Apr 12, 2023

I have several threads sending requests with the requests library and streaming the response.content into ijson.items
Should I thread lock every call to ijson.items(response.content, prefix)?
I'm having issues with a (0xC0000005) error and I'm trying to find out the cause.
Thanks!

rtobar commented Apr 13, 2023

@FilippoBoido could you please provide more context to your issue? Code example, environment, python/ijson versions, backend, stack traces, etc...

With the limited information you've provided I can only guess at this point. So my guess is that you are seeing a segmentation fault, that you are using the yajl2_c backend, and that there is indeed some thread safety issue with ijson and/or the underlying yajl library.

I don't remember off the top of my head, but I'd guess ijson itself is thread-safe: we don't release the GIL in our C extension, and I think we don't keep any global state, but I might be wrong. Again, more information would be very helpful. It could also be that the yajl library is not thread safe, in which case there's not much to do other than make your ijson calls exclusive.

Note however that there are other (probably better) alternatives for your use case. Since we don't release the GIL, even in our C extension, you can't expect multiple threads to run much faster than a single one (it depends on exactly how you wrote your code, so again we'd need to see some sample code). If you are issuing multiple requests concurrently you might be better off using an async HTTP client library together with the async support that ijson provides, so you run all your requests from a single thread. If you really are CPU bound rather than network or I/O bound, you can consider using multiprocessing to handle multiple requests in different processes.
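
For illustration, a minimal sketch of that single-threaded async approach could look like the following (this assumes aiohttp as the HTTP client; the URLs and function names are placeholders, not code from this issue):

import asyncio

import aiohttp
import ijson


async def count_items(session, url, prefix):
    count = 0
    async with session.get(url) as response:
        # response.content exposes an asynchronous read(), which ijson can
        # consume directly, yielding parsed items as an async generator.
        async for item in ijson.items(response.content, prefix):
            count += 1  # process each item here instead of accumulating them
    return count


async def main(urls):
    async with aiohttp.ClientSession() as session:
        # All requests run concurrently on a single thread / event loop.
        return await asyncio.gather(
            *(count_items(session, url, 'items.item') for url in urls))


# counts = asyncio.run(main(['https://example.com/api/items?page=1']))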

Edit: reinforced the fact that we don't release the GIL, so not much speedup can be expected from multiple threads.

FilippoBoido (Author)

@rtobar thanks for the fast reply and your willingness to discuss my problem.
I'm using Python 3.11, and this is a code example of how I use ijson:

def stream_items_from_tracker_single_page_with_query(self, page: int, tracker_id: int, project_id: int):
    endpoint = self.GET_TRACKER_ITEMS_WITH_QUERY
    queryString = f"project.id in ({project_id}) AND tracker.id IN ({tracker_id})"
    response = self.retry_request_with_auth_obj(
        'GET',
        self._server + endpoint,
        self._authenticator,
        params={
            'pageSize': 500,
            'page': page,
            'queryString': queryString},
        log_hash=hash(queryString),
        stream=True
    )
    return ijson.items(response.content, 'items.item')

This method gets called by many threads, which are managed by a ThreadPoolExecutor.

Here you can see a Windows stack trace, since the application crashes without a Python exception traceback:

[screenshot: crash error dialog]

After I wrote you yesterday I tried the python backend, which doesn't crash but consumes a huge amount of memory.
The high memory consumption of other JSON libraries was what urged me to switch to ijson in the first place, so unfortunately the ijson python backend is not an option for me.
Regarding async adoption, I was expecting you to suggest that; in my case it means refactoring the application's architecture, which is a lot of work.
It seems that switching from a ThreadPoolExecutor architecture to an async one is the way to go with this library, and I will probably implement it in order to cut the memory utilization of the application.

rtobar commented Apr 13, 2023

Thanks @FilippoBoido for the further details. The error window indicates that it's indeed the yajl2_c backend that produces the crash. That, together with your example code and explanation, confirms that the initial guess I ventured was pretty much spot on. Does the "Details" tab of that error window give any further information?

I tried reproducing your error with this small script. It reads data from a local file instead of the network, but it's otherwise a similar situation:

import ijson
import sys

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def do_stuff(_):
    # sys.argv[1] is the path to a JSON file, sys.argv[2] is the ijson prefix
    with open(sys.argv[1]) as f:
        for item in ijson.items(f, sys.argv[2]):
            pass

print(f"Using {ijson.backend} backend")
# sys.argv[3] selects a process pool ('proc') or a thread pool (anything else)
executor = ProcessPoolExecutor() if sys.argv[3] == 'proc' else ThreadPoolExecutor()
results = list(executor.map(do_stuff, range(100)))
executor.shutdown()

I then ran it with these configurations (Linux, AMD Ryzen 7 5825U with 8 cores, 2 threads per core):

$> du -hs ~/downloads/large-file.json 
25M     /home/rtobar/downloads/large-file.json
$> time python3.11 ~/parallel.py ~/downloads/large-file.json item thread
Using yajl2_c backend
38.59user 2.41system 0:38.37elapsed 106%CPU (0avgtext+0avgdata 161128maxresident)k
8inputs+0outputs (1major+375481minor)pagefaults 0swaps
$> time python3.11 ~/parallel.py ~/downloads/large-file.json item proc
Using yajl2_c backend
40.79user 1.23system 0:02.83elapsed 1484%CPU (0avgtext+0avgdata 19176maxresident)k
0inputs+0outputs (0major+472417minor)pagefaults 0swaps

Note that:

  • I can't really reproduce the error in the multi-threaded case. Maybe the issue only happens on Windows?
  • When using multiple threads you can see that there's no real speedup (38.59 user vs 0:38.37 elapsed). This is what I was mentioning earlier: even in the C extension we never release the GIL, so it will be impossible to speed ijson up this way. You might be getting some speedup because your downloads happen in parallel, though (reading data from a socket in Python does release the GIL).
  • Again as expected, the multi-process solution does speed things up (40.79 user vs 2.83 elapsed --> ~14x speedup on my system with 16 hardware threads).

It also just occurred to me that until now you've assumed it was the multi-threading aspect that was causing the crashes. Have you tried testing with a ThreadPoolExecutor(max_workers=1) to check whether the crash still happens? If it does, then there's something else at play here.

Also, just double-checking: you are using the latest version of ijson, right?

FilippoBoido (Author) commented Apr 13, 2023

Thanks @rtobar for your insightful testing.
Indeed, I've been testing the yajl2 backend with 1 thread since this morning and haven't experienced any crashes.
I now assume the reason may be that when using many threads to fetch data and stream it into ijson, the memory of my host is exhausted pretty quickly, so the C extension cannot allocate more memory and Windows kills the process.

I will report back if I find a solution or a way around this problem.

P.S.:
The details tab doesn't offer any more insight into the problem.
I'm using the latest ijson version.

rtobar commented Apr 13, 2023

Interesting... the whole idea of using ijson is to avoid exhausting memory in the first place. How are you using the results from your stream_items_from_tracker_single_page_with_query method? If you are doing any kind of accumulation of data then your memory will indeed grow, but if you're simply aggregating and reducing data then you shouldn't see your memory usage grow. Yes, more threads mean a higher memory requirement, but that shouldn't be a problem if you're correctly iterating over the stream instead of accumulating results in any way.

yajl2 backend with 1 thread

You mean the yajl2 or the yajl2_c backend? Just trying to figure out how many things are different between your program working and not working. If you're still using the yajl2_c backend then this could still be a thread safety issue.

I'll try more experiments around the idea of exhausting memory and see if I can reproduce the crash, but unfortunately until we get a proper stack trace we can only make (educated) guesses.

FilippoBoido (Author)

This is an intermediate recap of what I've found out:
I think the memory exhaustion problem is due to the ThreadPoolExecutor not releasing the memory until all operations associated with it are done. See the following link -> memory usage with ThreadPoolExecutor

You mean the yajl2 or the yajl2_c backend?

Yes, I'm using the yajl2_c backend.

How are you using the results from your stream_items_from_tracker_single_page_with_query method?

for generator in generator_list:
    for item in generator:
        try:
            if item['typeName'] in item_types:
                result.append(item)
                detail[item['typeName']] += 1
        except (KeyError, TypeError):
            continue

I have a list of objects called generator_list, and each of these objects is the return value of ijson.items.
The generator_list is the return value of the ThreadPoolExecutor operation, but this piece of code is itself called inside a thread of another ThreadPoolExecutor.
I then iterate through all the objects returned by ijson.items, and iterate again over all the items yielded by each object.
From my measurements, the line for item in generator: is the one that drives the memory bloat up to the gigabyte level.
The line result.append(item) is negligible; the result list is just a couple of megabytes.

rtobar commented Apr 13, 2023

@FilippoBoido thanks for another piece of the puzzle, I feel like all of this is making sense now.

There are two main issues I'm seeing here now.

The first one I should have seen earlier, but I'm not a big requests user myself, hence I hadn't spotted it. The problem is that you are creating a request with stream=True, but still reading the contents with response.content, which reads the entire response into memory. This means you are holding in memory the full JSON documents you are planning to iterate over. What you probably want to do instead is to use response.iter_content and wrap it in a file-like object to give to ijson (there's #44 to allow users to provide generators as sources of data, but it hasn't been implemented). See #57 (comment) for more reference.
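
For reference, a minimal sketch of such a wrapper could look like this (the IterStream name and the chunk size are illustrative assumptions; nothing like it ships with ijson or requests):

class IterStream:
    """Adapts a bytes generator (e.g. response.iter_content(...)) to the
    read() interface ijson expects, so the response never sits fully in memory."""

    def __init__(self, chunks):
        self._chunks = chunks
        self._buffer = b''

    def read(self, size=-1):
        # Pull chunks from the generator until we have enough bytes (or it ends).
        while size < 0 or len(self._buffer) < size:
            try:
                self._buffer += next(self._chunks)
            except StopIteration:
                break
        if size < 0:
            data, self._buffer = self._buffer, b''
        else:
            data, self._buffer = self._buffer[:size], self._buffer[size:]
        return data

# usage:
# f = IterStream(response.iter_content(chunk_size=64 * 1024))
# for item in ijson.items(f, 'items.item'):
#     ...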

The second one is that you mention there are two thread pool executors involved. In the first, say pool1, you do the requests and call ijson.items, and in another one, pool2, you go through this second loop of for generator in generator_list. The code then looks something like this (just paraphrasing, I'm sure it's wildly different):

all_generators = pool1.map(stream_items_from_tracker_single_page_with_query, urls)
...
pool2.map(iterate_over_generators, all_generators)

The issue with this is the following: by the time pool1 has returned, all_generators contains the ijson.items generators, and because of how you are using requests, we know that each of those generators contains the full JSON document that was downloaded in pool1. That alone could already be a lot of memory in those response.content objects. Then in pool2 you iterate over the generators and start the actual process of parsing items out of the JSON documents. That parsing needs more memory (hopefully not as much as the whole JSON document -- it depends on how big the individual items you are extracting are -- but still some). Finally, when you are done with a generator you are not releasing it either, meaning that the full JSON document you just parsed is still in memory.

I think in an ideal scenario you'd want to fuse the work you're doing in both pools into one so that you can fully stream the HTTP responses into ijson and then reduce your memory footprint for real. Something like:

def do_everything():
    result = []
    detail = {}  # in your real code this is presumably pre-populated with the item_types keys
    response = self.retry_request_with_auth_obj(...., stream=True)
    f = file_like_object_from_generator(response.iter_content)  # because #44 hasn't been implemented
    for item in ijson.items(f, 'items.item'):
        try:
            if item['typeName'] in item_types:
                result.append(item)
                detail[item['typeName']] += 1
        except (KeyError, TypeError):
            continue
    return result, detail

results_and_details = pool.map(do_everything, ...)
# now merge all results and details

You can then even use a process pool executor instead of a threaded one for improved parallelisation.

FilippoBoido (Author)

@rtobar Great insight, thank you!

What you probably want to do instead is to use response.iter_content and wrap it in a file-like object

That makes sense now. I've been busy implementing async methods that replicate the current operations done with the ThreadPoolExecutor, so that we can see whether the garbage collector is able to release the memory, unlike the current implementation with the thread pool. A couple of lines after the snippet I shared, I clear the generator_list containing the ijson.items generators and call the GC, which theoretically should free up the memory, but it doesn't.
The gigabytes of memory are only released after pool2 has finished all the operations it's tasked with, which entail more than just iterating through the generators.
I will make sure to create a file-like object from response.iter_content and report back to update the thread.

P.S.: You have a typo in the snippet you wrote -> for item in ijson.items(f, 'items.item'):

FilippoBoido (Author)

I rewrote the routines to use the async framework instead of the ThreadPoolExecutor and to yield the JSON payload from files with an async for item in ijson.items(f, "items.item") loop.
By doing so I reduced the memory consumption to 1/7 during peak utilization, and I haven't experienced any crashes since.
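
For anyone landing here later, the pattern described above looks roughly like this (a sketch assuming aiofiles for the asynchronous file access; the path and prefix are placeholders):

import asyncio

import aiofiles
import ijson


async def stream_items(path):
    async with aiofiles.open(path, 'rb') as f:
        # ijson detects the asynchronous read() and yields an async generator,
        # so only one item at a time needs to be held in memory.
        async for item in ijson.items(f, 'items.item'):
            yield item


async def main():
    async for item in stream_items('payload.json'):
        pass  # process each item here


# asyncio.run(main())
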
Thanks for maintaining this amazing library and for your support @rtobar!
