
avoid calling nbytes multiple times #3349

Merged (1 commit merged into dask:master on Jan 1, 2020)

Conversation

mmohrhard (Contributor)

Tornado 5.0+ has been required for quite some time, so _iostream_allows_memoryview is always True.

At least in one of our cases, with a custom pickle5 serializer, we managed to spend up to 20% of the time in the nbytes call for collections with hundreds of thousands of small numpy arrays, and therefore many frames.
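
The actual diff is not included in this page; as a rough, hypothetical sketch of the pattern the title describes (compute each frame's size once and reuse it, rather than calling nbytes repeatedly in the write path):

    # Hypothetical sketch, not the actual diff: cache each frame's size
    # in a single pass instead of recomputing it for every check.
    def nbytes(frame):
        # simplified stand-in for distributed.utils.nbytes
        return frame.nbytes if hasattr(frame, "nbytes") else len(frame)

    def frame_sizes(frames):
        # one nbytes call per frame; callers reuse this list when
        # summing, splitting, or writing frames
        return [nbytes(frame) for frame in frames]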

mrocklin (Member) commented Jan 1, 2020

Thanks @mmohrhard. This looks good to me, and I appreciate that you identified an issue and resolved it so smoothly. I also notice that this is your first code contribution to this repository. Welcome! Merging.

I'm curious, how were you profiling this? Were you using the cProfile module, or Dask's internal statistical profiler?

I'm also curious to learn more about your experiences with the pickle 5 protocol so far.

mrocklin merged commit fd45121 into dask:master on Jan 1, 2020
mmohrhard (Contributor, Author)

We automatically generate profiling information with cProfile when the IOLoop is unresponsive for more than a few seconds. This has helped us identify quite a number of performance issues in our modifications to the scheduler. At least for us, the penalty of constantly running profiling on the IOLoop thread has easily paid for itself by letting us identify performance problems in production jobs.

pickle5 has helped us replace all custom serializations, and we have seen huge benefits, especially from numpy's PickleBuffer support. For numpy arrays nested in several layers of collections, which is quite common in our code, we see significantly better performance.
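
For context, protocol 5 lets the pickler hand raw array payloads to a buffer_callback as zero-copy PickleBuffer objects, however deeply the arrays are nested. A minimal sketch, assuming Python 3.8's pickle (or the pickle5 backport on older versions) and numpy's built-in protocol-5 support:

    import pickle
    import numpy as np

    # an arbitrarily nested collection of arrays
    data = {"a": [np.arange(3), {"b": np.arange(5)}]}

    buffers = []
    # numpy hands each array's payload out-of-band via buffer_callback,
    # so the pickle stream itself stays small and copies are avoided
    stream = pickle.dumps(data, protocol=5, buffer_callback=buffers.append)

    restored = pickle.loads(stream, buffers=buffers)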

mrocklin (Member) commented Jan 5, 2020

Thanks for the additional information @mmohrhard

> We automatically generate profiling information with cProfile when the IOLoop is unresponsive for more than a few seconds

I'm very interested in this. Is this something that you can share more about? Do you think that it would make sense to upstream? Also, are you familiar with Dask's statistical profiler?

> pickle5 has helped us replace all custom serializations, and we have seen huge benefits, especially from numpy's PickleBuffer support

I'm a little surprised by this. My guess was that our special handling of Numpy would be at least as good as pickle5. Again, do you think that this is something that would make sense to contribute upstream?

mmohrhard (Contributor, Author)

> Thanks for the additional information @mmohrhard
>
> > We automatically generate profiling information with cProfile when the IOLoop is unresponsive for more than a few seconds
>
> I'm very interested in this. Is this something that you can share more about? Do you think that it would make sense to upstream? Also, are you familiar with Dask's statistical profiler?

Nothing fancy here. The writing of the profiles should obviously not happen on the IOLoop (especially as, in our case, disk access can hang for an arbitrary amount of time):

    def _measure_tick(self):
        # requires: time, cProfile, datetime, logging, and the
        # ThreadPoolExecutor set up in __init__ (see below)
        now = time()
        diff = now - self._last_tick
        self._last_tick = now
        if self.profile:
            # stop collecting before inspecting the results
            self.profile.disable()

        if diff > self.tick_maximum_delay:
            logger.info(
                "Event loop was unresponsive in %s for %.2fs.  "
                "This is often caused by long-running GIL-holding "
                "functions or moving large chunks of data. "
                "This can cause timeouts and instability.",
                type(self).__name__,
                diff,
            )
            if self.profile:
                self.profile.create_stats()
                # hand the potentially slow disk write to a separate
                # thread so it cannot block the IOLoop itself
                self.profile_thread_pool.submit(
                    write_profile,
                    file_name=self.profile_path
                    % (
                        str(datetime.datetime.now().strftime("%Y%m%d_%H%M%S")),
                        diff * 1000,
                    ),
                    profile=self.profile,
                )

        if self.digests is not None:
            self.digests["tick-duration"].add(diff)

        if self.profile_path:
            # start a fresh profile for the next tick
            self.profile = cProfile.Profile()
            self.profile.enable()

and in __init__ something like:

        self.profile = None
        if debug_dir:
            assert debug_id
            # pattern filled in later with a timestamp and the blocked duration
            self.profile_path = os.path.join(
                debug_dir, debug_id + "-profile-%s-blocked-%dms.pstats"
            )
            # dedicated worker thread for writing profile dumps
            self.profile_thread_pool = ThreadPoolExecutor(max_workers=1)
        else:
            self.profile_path = None
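
The write_profile helper is not shown here; a minimal sketch of what it could look like (hypothetical, assuming it simply dumps the collected stats to the given path):

    # Hypothetical sketch of the write_profile helper referenced above.
    def write_profile(file_name, profile):
        # runs on the ThreadPoolExecutor, so a slow or hanging disk write
        # cannot stall the IOLoop
        profile.dump_stats(file_name)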

> > pickle5 has helped us replace all custom serializations, and we have seen huge benefits, especially from numpy's PickleBuffer support
>
> I'm a little surprised by this. My guess was that our special handling of Numpy would be at least as good as pickle5. Again, do you think that this is something that would make sense to contribute upstream?

I don't think it would be too helpful in its current form. We basically replaced all serializations, since we did not have to worry about anything except numpy.

As an example of why pickle5 is significantly more advanced:

In [1]: import numpy as np

In [2]: import distributed.protocol.numpy

In [3]: from distributed.protocol.serialize import serialize

In [10]: a = [np.arange(i) for i in range(6)]

In [11]: serialize(a, serializers=("dask", "pickle", "numpy"))
Out[11]: 
({'serializer': 'pickle'},
 [b'\x80\x04\x95\x9c\x01\x00\x00\x00\x00\x00\x00]\x94(\x8c\x15numpy.core.multiarray\x94\x8c\x0c_reconstruct\x94\x93\x94\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94K\x00\x85\x94C\x01b\x94\x87\x94R\x94(K\x01K\x00\x85\x94h\x04\x8c\x05dtype\x94\x93\x94\x8c\x02i8\x94K\x00K\x01\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x89C\x00\x94t\x94bh\x03h\x06K\x00\x85\x94h\x08\x87\x94R\x94(K\x01K\x01\x85\x94h\x10\x89C\x08\x00\x00\x00\x00\x00\x00\x00\x00\x94t\x94bh\x03h\x06K\x00\x85\x94h\x08\x87\x94R\x94(K\x01K\x02\x85\x94h\x10\x89C\x10\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x94t\x94bh\x03h\x06K\x00\x85\x94h\x08\x87\x94R\x94(K\x01K\x03\x85\x94h\x10\x89C\x18\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x94t\x94bh\x03h\x06K\x00\x85\x94h\x08\x87\x94R\x94(K\x01K\x04\x85\x94h\x10\x89C \x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x94t\x94bh\x03h\x06K\x00\x85\x94h\x08\x87\x94R\x94(K\x01K\x05\x85\x94h\x10\x89C(\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x94t\x94be.'])

In [12]: a = [np.arange(i) for i in range(5)]

In [13]: serialize(a, serializers=("dask", "pickle", "numpy"))
Out[13]: 
({'sub-headers': [{'dtype': (0, '<i8'),
    'shape': (0,),
    'strides': (8,),
    'lengths': [0],
    'type': 'numpy.ndarray',
    'type-serialized': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.',
    'serializer': 'dask'},
   {'dtype': (0, '<i8'),
    'shape': (1,),
    'strides': (8,),
    'lengths': [8],
    'type': 'numpy.ndarray',
    'type-serialized': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.',
    'serializer': 'dask'},
   {'dtype': (0, '<i8'),
    'shape': (2,),
    'strides': (8,),
    'lengths': [16],
    'type': 'numpy.ndarray',
    'type-serialized': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.',
    'serializer': 'dask'},
   {'dtype': (0, '<i8'),
    'shape': (3,),
    'strides': (8,),
    'lengths': [24],
    'type': 'numpy.ndarray',
    'type-serialized': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.',
    'serializer': 'dask'},
   {'dtype': (0, '<i8'),
    'shape': (4,),
    'strides': (8,),
    'lengths': [32],
    'type': 'numpy.ndarray',
    'type-serialized': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.',
    'serializer': 'dask'}],
  'is-collection': True,
  'frame-lengths': [1, 1, 1, 1, 1],
  'type-serialized': 'list'},
 [<memory at 0x7f4be7bd0460>,
  <memory at 0x7f4be7bd07a0>,
  <memory at 0x7f4be7bd0870>,
  <memory at 0x7f4be7bd0940>,
  <memory at 0x7f4be7bd0a10>])

Note that the six-element list falls back to plain pickle (a single opaque frame), while the five-element list is traversed and yields one out-of-band memoryview per array. pickle5, as part of normal pickling, will automatically create something quite similar to the five-element version for any shape and size of the pickled collection. The cutoff is a direct consequence of distributed/protocol/serialize.py:147 and a limitation of having to traverse the serialized object outside of the normal pickling infrastructure.
I suppose that as soon as Python 3.6 can be used as a baseline, distributed could consider replacing at least some parts of the pickling infrastructure with pickle5 (we have been using https://github.com/pitrou/pickle5-backport on Python 3.7, and in the past on 3.6, quite successfully). Without dropping custom serialization it will never be as efficient as a pure pickle5 solution, but it would at least cover more cases.
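
To make the contrast concrete, here is a sketch of what protocol-5 pickling does with the same six-element list that serialize() above falls back to plain pickle for (again assuming Python 3.8's pickle or the pickle5 backport):

    import pickle
    import numpy as np

    a = [np.arange(i) for i in range(6)]

    buffers = []
    # unlike the plain-pickle fallback above, protocol 5 keeps the metadata
    # frame small and emits the array payloads out-of-band, regardless of
    # the collection's size or nesting
    stream = pickle.dumps(a, protocol=5, buffer_callback=buffers.append)
    print(len(stream), "bytes of metadata,", len(buffers), "out-of-band buffers")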

mrocklin (Member) commented Jan 6, 2020

Yes, ok, that makes sense to me. Thank you for taking the time to explain your motivation and context. That was very interesting!

bnaul pushed a commit to replicahq/distributed that referenced this pull request Feb 10, 2020