
Dask Bag Error: missing 4 required positional arguments #3677

Closed

rclough opened this issue Jun 27, 2018 · 2 comments

Labels
needs info Needs further information from the user

Comments

@rclough
rclough commented Jun 27, 2018

Python Version: 3.5.2
OS: Ubuntu 16.04
Dask version: 0.18.1
Running in Jupyter Lab 0.32.1

Description: I have some Dask code I wrote a while ago that used to work but no longer does. I re-ran each step, found the problematic one, and tried to pare the example down; the code does involve a 3rd-party client, but from what I can tell the reported error comes entirely from Dask. The odd part is that the code runs fine on a small subset of my dataset, but when I run it on all entries it errors out within about 10-20 seconds.

I've run it with all 3 schedulers (multiprocess, threaded, and single-threaded) and hit the same issue. Running the function manually (single-threaded, without Dask) seems to work fine.
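
For reference, scheduler switching was done roughly like this (a sketch assuming dask 0.18's scheduler keyword, not the exact notebook code):

import dask.bag as db

# Toy bag standing in for the real URL bag; on the real dataset each
# scheduler fails the same way.
bag = db.from_sequence(range(10)).map(lambda x: x * 2)

for scheduler in ('processes', 'threads', 'single-threaded'):
    result = bag.compute(scheduler=scheduler)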

Googling the error was pretty fruitless; I couldn't find a Dask example, and most hits are generic Python errors that seem mostly unrelated.

Error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-9-7612796cccf7> in <module>()
      1 with ProgressBar():
----> 2     output = thirdparty_jsons.compute()

/usr/local/lib/python3.5/dist-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/usr/local/lib/python3.5/dist-packages/dask/base.py in compute(*args, **kwargs)
    400     keys = [x.__dask_keys__() for x in collections]
    401     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 402     results = schedule(dsk, keys, **kwargs)
    403     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    404 

/usr/local/lib/python3.5/dist-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
    175                            get_id=_process_get_id, dumps=dumps, loads=loads,
    176                            pack_exception=pack_exception,
--> 177                            raise_exception=reraise, **kwargs)
    178     finally:
    179         if cleanup:

/usr/local/lib/python3.5/dist-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    512                 key, res_info, failed = queue_get(queue)
    513                 if failed:
--> 514                     exc, tb = loads(res_info)
    515                     if rerun_exceptions_locally:
    516                         data = dict((dep, state['cache'][dep])

TypeError: __init__() missing 4 required positional arguments: 'params', 'method', 'response', and 'api'
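
For what it's worth, the loads(res_info) call in the last frame is Dask's multiprocessing scheduler unpickling an exception that was raised on a worker, and an exception class that can't be rebuilt from pickle produces exactly this kind of message. The snippet below is a plain-Python sketch of that mechanism; ApiError and its argument names are hypothetical stand-ins taken from the error text, not the actual 3rd-party code.

import pickle

# Hypothetical stand-in for a 3rd-party exception whose __init__ requires
# positional arguments (names borrowed from the error message above).
class ApiError(Exception):
    def __init__(self, params, method, response, api):
        super().__init__()  # leaves self.args empty
        self.params = params
        self.method = method
        self.response = response
        self.api = api

err = ApiError({}, 'POST', None, 'predict')
try:
    pickle.loads(pickle.dumps(err))  # pickling succeeds, unpickling does not
except TypeError as exc:
    # Prints a message mirroring the one above:
    # __init__() missing 4 required positional arguments: 'params', 'method',
    # 'response', and 'api'
    print(exc)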

A brief text description of the code:

  1. Start with a Dask bag of about 80k image URLs (these came from other dask tasks, but the problem can be replicated with just this list of URLs)
  2. Run a map function on the bag that takes a URL, uses a 3rd party client to make a request with that URL, and returns the JSON response from the client

Main Code:

import pickle
import wrapper # utility wrapper for 3rd party client

import dask.bag as db
from dask.cache import Cache
from dask.diagnostics import ProgressBar
cache = Cache(2e9)
cache.register()

# Load image list
with open('./data/hurst-image-links.pickle', 'rb') as f:
    image_links_list = pickle.load(f)
image_links = db.from_sequence(image_links_list)

# Set up computation
thirdparty_jsons = image_links.map(wrapper.get_json)

with ProgressBar():
    output = thirdparty_jsons.compute()

wrapper.py:

import thirdparty_lib

def get_json(file_url):
    '''
    Get the JSON response from the 3rd-party API given a link to an image.
    file_url - link to image to process
    returns - response JSON from 3rd party
    '''
    if file_url is None:
        return None
    
    image = thirdparty_lib.Image(url=file_url)
    
    client = get_3rdparty_client() # Handles globals/initializes the client if not already in globals
    prediction = client.predict([image])
    
    return prediction['results']

It's worth noting again that running the same code without Dask in the mix works fine (no exceptions), albeit much slower due to single-threading. This code also worked fine at some point on a different dataset of about 7k URLs. Any insight is appreciated. Also, if there's a better way to do what I'm doing, let me know; I realize it's a bit of a bastardization of the strictly scientific computing use case, but we're using a 3rd-party API to get predictions and combining those with more "usual" Dask-type tasks, and I figure internet I/O should benefit from parallelization.
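
In case it clarifies what I'm after, here's a rough sketch of the intended usage, assuming the threaded scheduler and a guessed worker count since the work is network-bound rather than CPU-bound (the partition and worker numbers are not benchmarked):

import pickle
import dask.bag as db
import wrapper  # same utility wrapper as above

# Load the ~80k image URLs, as in the main code above
with open('./data/hurst-image-links.pickle', 'rb') as f:
    image_links_list = pickle.load(f)

# Network-bound requests, so threads (rather than processes) are assumed here
image_links = db.from_sequence(image_links_list, npartitions=100)
thirdparty_jsons = image_links.map(wrapper.get_json)
output = thirdparty_jsons.compute(scheduler='threads', num_workers=16)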

@TomAugspurger
Member

@rclough sorry for the delay, are you able to provide a minimal example that demonstrates the issue? http://matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports

@TomAugspurger added the needs info label on Apr 30, 2019
@jrbourbeau
Member

Closing due to the current lack of a reproducible example. @rclough, please feel free to re-open this issue if you're able to provide a minimal example.
