## [[Discourse] How do you pipe multiple arguments in a dask bag pipeline? #177](https://github.com/coiled/dask-community/issues/177)

In [1]:
import dask.bag as db

In [2]:
def download(x):
    """Stand-in for a real download: return ``x`` with ``" data "`` appended."""
    suffix = " data "
    return x + suffix

def process_data(x):
    """Stand-in for a processing step.

    Returns ``None`` for the sentinel input ``"test_none data "`` (used to
    simulate a record that should be filtered out); otherwise returns ``x``
    with ``"processed data "`` appended.
    """
    sentinel = "test_none data "
    return None if x == sentinel else x + "processed data "

def save_for_url(x, y):
    """Stand-in for a save step: return ``(x, y + "saved")``.

    ``x`` is passed through untouched; ``y`` gets ``"saved"`` appended.
    """
    saved = y + "saved"
    return x, saved

In [3]:
# from functools import wraps

# def star(f):
#     @wraps(f)
#     def wrapper(*args, **kwargs):
#         return f(*args[0], *args[1:], **kwargs)
#     return wrapper

In [3]:
# Sample inputs; 'test_none' is the entry that process_data() maps to None.
urls = [
    'test_none',
    'dask.org',
    'docs.dask.org',
    'exampels.dask.org',
]

In [None]:
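# Fails with TypeError: <lambda>() missing 1 required positional argument: 'data'
# (`Bag.map` passes each (url, data) tuple as a single argument instead of unpacking it.)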

# (db
#  .from_sequence(urls)
#  .map(lambda url: (url, download(url)))
#  .map(lambda url, data: (url, process_data(data)))
#  .filter(lambda url, result: result is not None)
#  .map(lambda url, result: save_for_url(url, result))
#  ).compute()

In [None]:
# Using `Bag.starmap()`

# Each element is a (url, payload) tuple; `starmap` unpacks it into positional
# arguments, while `filter` still receives the tuple as a single value.
pipeline = (
    db.from_sequence(urls)
    .map(lambda url: (url, download(url)))
    .starmap(lambda url, data: (url, process_data(data)))
    .filter(lambda pair: pair[1] is not None)
    .starmap(save_for_url)
)
pipeline.compute()

In [None]:
# With star() -- works (requires the `star` helper from the commented-out cell above to be uncommented)

# (db
#  .from_sequence(urls)
#  .map(lambda url: (url, download(url)))
#  .map(star(lambda url, data: (url, process_data(data))))
#  .filter(star(lambda url, result: result is not None))
#  .map(star(lambda url, result: save_for_url(url, result)))
#  ).compute()

In [4]:
# Using `Bag.zip()`

# Keep the original URLs in their own bag so they can be zipped with the
# processed results further down.
urls_bag = db.from_sequence(urls)

In [5]:
# Download and process every URL, then drop entries whose processing returned None.
processed_urls_bag = (
    urls_bag
    .map(download)
    .map(process_data)
    .filter(lambda result: result is not None)
)

In [6]:
# Only three results come back: the 'test_none' entry was processed to None
# and removed by the filter.
processed_urls_bag.compute()

['dask.org data processed data ',
 'docs.dask.org data processed data ',
 'exampels.dask.org data processed data ']

In [7]:
# The source bag still holds all four URLs, including 'test_none'.
urls_bag.compute()

['test_none', 'dask.org', 'docs.dask.org', 'exampels.dask.org']

In [26]:
# `db.zip` is documented as a partition-wise zip, so each surviving result is
# paired with the URL it came from even though 'test_none' was filtered out.
(
    db.zip(urls_bag, processed_urls_bag)
    .starmap(save_for_url)
    .compute()
)

[('dask.org', 'dask.org data processed data saved'),
 ('docs.dask.org', 'docs.dask.org data processed data saved'),
 ('exampels.dask.org', 'exampels.dask.org data processed data saved')]

In [9]:
# For comparison: Python's built-in zip pairs the computed lists purely by
# position, so after filtering the URLs and results are shifted by one and
# no longer match.
list(zip(urls_bag.compute(), processed_urls_bag.compute()))

[('test_none', 'dask.org data processed data '),
 ('dask.org', 'docs.dask.org data processed data '),
 ('docs.dask.org', 'exampels.dask.org data processed data ')]

In [4]:
# Super-minimal reproducer

def star(f):
    """Adapt ``f`` so that its first positional argument is unpacked.

    ``Bag.map`` hands each element to the mapped function as one argument.
    The returned wrapper spreads that first argument (e.g. a tuple) over
    ``f``'s positional parameters; any further positional or keyword
    arguments are forwarded unchanged.
    """
    from functools import wraps  # local import keeps this cell self-contained

    @wraps(f)
    def wrapper(*args, **kwargs):
        return f(*args[0], *args[1:], **kwargs)

    return wrapper


l = [("a", "b"), ("c", "d")]

l_bag = db.from_sequence(l)

# `Bag.map` calls the function with each tuple as a single argument, so a
# two-parameter lambda fails. The error is the demonstration: catch and show
# it so the rest of the cell still runs.
try:
    l_bag.map(lambda x, y: x + y).compute()
except TypeError as err:
    # Expected: <lambda>() missing 1 required positional argument: 'y'
    print(f"TypeError: {err}")

# With star() -- works!
l_bag.map(star(lambda x, y: x + y)).compute()

In [5]:
# `Bag.starmap` unpacks each tuple into positional arguments, so the
# two-parameter lambda works without any wrapper.
l_bag.starmap(lambda x, y: x + y).compute()

['ab', 'cd']

In [None]:
# create minimal example for zip/filter

In [5]:
import dask.bag as db

l1 = ['a', 'b', 'c', 'd']
l2 = ['e', 'f', 'g']

l1_bag = db.from_sequence(l1)
l2_bag = db.from_sequence(l2)

# Built-in zip truncates to the shorter input.
list(zip(l1, l2)) # Output: [('a', 'e'), ('b', 'f'), ('c', 'g')]

# `db.zip` on these two bags raises AssertionError -- presumably because the
# bags end up with different numbers of partitions, which the dask docs say
# must match (TODO confirm). The error is the demonstration: catch and show it
# so the rest of the cell still runs.
try:
    db.zip(l1_bag, l2_bag).compute()
except AssertionError as err:
    print(f"AssertionError: {err}")

# A filtered bag derived from l1_bag can be zipped with it; only the elements
# that survived the filter appear in the result.
l3_bag = l1_bag.filter(lambda x: x == 'c' or x == 'd')
l3_bag.compute()

db.zip(l1_bag, l3_bag).compute() # Output: [('c', 'c'), ('d', 'd')]

In [25]:
# Only the elements that passed the filter are returned.
l3_bag.compute()

['c', 'd']