update documentation
akariv committed May 3, 2021
1 parent 2ccef95 commit 0071187
Showing 1 changed file with 37 additions and 0 deletions.
37 changes: 37 additions & 0 deletions PROCESSORS.md
@@ -15,6 +15,7 @@ DataFlows comes with a few built-in processors which do most of the heavy liftin
- [**conditional**](#conditional) - Run parts of the flow based on the structure of the datapackage at the calling point
- [**finalizer**](#finalizer) - Call a function when all data has been processed
- [**checkpoint**](#checkpoint) - Cache results of a subflow in a datapackage and load it upon request
- [**parallelize**](#parallelize) - Run a row processor over multiple processes

### Manipulate row-by-row
- [**add_field**](#add_field) - Adds a column to the data
@@ -335,6 +336,42 @@ f.process()
f.process()
```

#### parallelize

Run a row processor over multiple processes, to make better use of multiple cores and compensate for long I/O waits.

```python
def parallelize(row_func, num_processors=None, predicate=None, resources=None):
    pass
```

- `row_func` - A function handling a single row in a resource, to be run in parallel in multiple processes
- `num_processors` - Number of processors to use. If not specified, will make an educated guess based on the current machine's architecture.
- `predicate` - A function which accepts a row and returns a boolean. If provided, only rows for which `predicate(row) is True` will be processed, others will be passed through unmodified.
- `resources` - Only apply the function on specific resources, same semantics as `load` processor `resources` argument

A few important notes regarding the `parallelize` processor -
- `row_func` runs in the context of a new process, so don't assume it has access to any global variables or state that were available in the main process.
- Due to its parallel nature, rows might change their order in the output of this processor.
- `predicate` is an important tool in optimizing performance. Handing a row to a worker process is comparatively time consuming, so we only want to pass rows to the worker processes if there's work to be done there. In fact, until the predicate returns `True` for the first time, no worker processes are even created.

Example:

```python
import requests
from dataflows import Flow, parallelize, dump_to_path

# `list_of_urls` is assumed to be defined elsewhere
data = [dict(url=url) for url in list_of_urls]

def fetch(row):
    row['data'] = requests.get(row['url']).content

Flow(
    data,
    parallelize(fetch),  # Will fetch all data in parallel
    dump_to_path('scraper')
).process()
```
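
The `predicate` and `num_processors` arguments described above can be combined along the following lines. This is only a minimal sketch: the helper names (`has_text`, `hash_text`, `run`), the `text` field, the `hashed` output path and the choice of 4 processes are all illustrative assumptions, not part of DataFlows.

```python
import hashlib


def has_text(row):
    # Predicate (hypothetical): rows with an empty `text` value are passed
    # through unmodified and never reach a worker process.
    return bool(row['text'])


def hash_text(row):
    # Row function (hypothetical): replaces `text` with its SHA-256 hex
    # digest, in place. It uses only the row itself and module-level
    # imports, not state built up in the main process.
    row['text'] = hashlib.sha256(row['text'].encode('utf-8')).hexdigest()


def run(rows, out_path='hashed'):
    # Imported here so the two helpers above can be tried out
    # without DataFlows installed.
    from dataflows import Flow, parallelize, dump_to_path

    Flow(
        rows,
        # Only rows for which has_text(row) is True are sent to the workers.
        parallelize(hash_text, num_processors=4, predicate=has_text),
        dump_to_path(out_path),
    ).process()
```

Calling `run([dict(text='abc'), dict(text='')])` would be expected to hash the first row in a worker process and pass the second one through untouched, though the rows may come out in a different order.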


### Manipulate row-by-row
#### add_field