
Update README.md
ebonnal committed Aug 3, 2023
1 parent 3b45315 commit d01c6b6
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions README.md
@@ -3,6 +3,8 @@

[![Actions Status](https://github.com/bonnal-enzo/kioss/workflows/test/badge.svg)](https://github.com/bonnal-enzo/kioss/actions) [![Actions Status](https://github.com/bonnal-enzo/kioss/workflows/PyPI/badge.svg)](https://github.com/bonnal-enzo/kioss/actions)

+ Write concise and expressive definitions of ETL and Reverse ETL pipelines. This library is specifically designed for convenient data integration from and to APIs, with features such as multithreading, rate limiting, batching, and exception handling.

## Install

`pip install kioss`
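
For a first taste of the API exercised in the diff below, here is a minimal sketch; the `from kioss import Pipe` import path is an assumption, and only operations appearing in this README are used:

```python
from kioss import Pipe

pipe = (
    Pipe(range(10))
    # square each integer
    .map(lambda x: x ** 2)
    # keep only the even squares
    .filter(lambda x: x % 2 == 0)
)
# a pipe is an Iterator, so plain iteration works
print(list(pipe))  # [0, 4, 16, 36, 64]
```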
@@ -31,14 +33,14 @@ with open("/path/to/file.text", "r") as text_file:
# flatten the pipe to make it yield individual words
.flatten()
# log the advancement of this step
.log("parsed words")
.log(what="parsed words")

# parse the word to extract its email domain, if any
.map(lambda word: re.search(r"@([a-zA-Z0-9.-]+)", word).group(1))
# catch exceptions produced by non-email words and ignore them
.catch(AttributeError, ignore=True)
# log the advancement of this step
.log("parsed email domains")
.log(what="parsed email domains")

# batch the email domains into chunks of 500 at most, not spanning more than 1 minute
.batch(size=500, secs=60)
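
A small sketch of these `size`/`secs` batching caps in isolation (same assumed `from kioss import Pipe`; the exact batch boundaries depend on timing):

```python
import time

from kioss import Pipe

def slow_integers():
    for i in range(5):
        time.sleep(0.3)
        yield i

# a batch closes once it holds `size` elements or has been open `secs` seconds
for batch in Pipe(slow_integers()).batch(size=2, secs=1):
    print(batch)  # e.g. [0, 1], [2, 3], [4]
```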
@@ -48,7 +50,7 @@ with open("/path/to/file.text", "r") as text_file:
# flatten back to yield individual domains from a batch
.flatten()
# log the advancement of this step
.log("parsed email domains deduplicated by batch")
.log(what="parsed email domains deduplicated by batch")

# construct url from email domain
.map(lambda email_domain: f"https://{email_domain}")
@@ -57,11 +59,11 @@ with open("/path/to/file.text", "r") as text_file:
# limit to roughly 20 requests sent per second to avoid spamming
.slow(freq=20)
# catch request errors without ignoring them this time:
- # it means that the pipeline will yield the exception object encountered instead of raising it
- .catch(requests.RequestException, ignore=False)
# log the advancement of this step
- .log("domain responses")
+ .log(what="requests to domain")

+ # it means that the pipeline will yield the exception object encountered instead of raising it
+ .catch(requests.RequestException, ignore=False)
# get only errors, i.e. non-200 status codes or request exceptions (yielded by upstream because ignore=False)
.filter(lambda response: isinstance(response, requests.RequestException) or response.status_code != 200)
# iterate over the entire pipe but only store the first 32 errors
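
The catch-then-filter idiom above can be sketched in isolation (same assumed import): with `ignore=False`, the exception object travels downstream instead of being raised, so a plain `isinstance` check picks the failures out.

```python
from kioss import Pipe

def risky(x: int) -> int:
    if x % 3 == 0:
        raise ValueError(f"multiple of 3: {x}")
    return x * 10

errors = [
    elem
    for elem in Pipe(range(6)).map(risky).catch(ValueError, ignore=False)
    if isinstance(elem, ValueError)
]
print(errors)  # the two ValueError instances raised for 0 and 3
```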
@@ -72,7 +74,7 @@ with open("/path/to/file.text", "r") as text_file:

## Features
- define:
- - `Pipe`'s constructor takes an `Iterator[T]` or `Iterable[T]` object as data source, and the constructed pipe object is itself an `Iterator[T]` on which you can call any function working with iterators: `set`, `functools.reduce`, etc...
+ - `Pipe`'s constructor takes an `Iterator[T]` or `Iterable[T]` object as data source (the `Pipe` class itself extends `Iterator[T]`, so you can call on a pipe object any function that works with iterators: `set(pipe)`, `functools.reduce(func, pipe, initial)`, `itertools.islice(pipe, n_samples)`, etc.; see the sketch after this list)
- `.map` over pipe's elements and yield the results as they arrive, optionally using multiple threads.
- `.flatten` a pipe, whose elements are assumed to be iterators, creating a new pipe with individual elements.
- `.filter` a pipe.
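
Since a pipe is itself an `Iterator[T]`, standard iterator tooling applies to it directly; a quick sketch (import path assumed, as above):

```python
import functools
import itertools

from kioss import Pipe

print(set(Pipe(iter("abracadabra"))))                               # {'a', 'b', 'c', 'd', 'r'}
print(functools.reduce(lambda acc, x: acc + x, Pipe(range(5)), 0))  # 0+1+2+3+4 = 10
print(list(itertools.islice(Pipe(itertools.count()), 3)))           # [0, 1, 2]
```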
