# Pipeline Design Pattern in Python


## Use case

Perform a number of processing steps on multiple uniform data objects.

In data science, this pattern is typically used for data preprocessing or feature extraction.

Even though it's usually called 'pipeline', 'conveyor belt' would often be a more fitting name.

## Key elements
- **Data objects**
- **Processor classes** with uniform interface to perform the processing steps
- A **pipeline class**, which handles the processing

This mirrors `sklearn`, where a `Pipeline` chains processing steps that share a uniform interface.
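Before the real classes are defined, the three elements can be sketched with the standard library only. The names `Item`, `Upper`, `Length` and `MiniPipeline` are hypothetical and exist only for this illustration; they are not the classes used in the rest of this notebook.

```python
from dataclasses import dataclass, field


@dataclass
class Item:
    """Hypothetical data object: a raw value plus results added by processors."""
    value: str
    results: dict = field(default_factory=dict)


class Upper:
    """Hypothetical processor: stores an upper-cased copy of the value."""
    def process(self, item):
        item.results["upper"] = item.value.upper()


class Length:
    """Hypothetical processor: stores the length of the value."""
    def process(self, item):
        item.results["length"] = len(item.value)


class MiniPipeline:
    """Applies every processor, in order, to every item."""
    def __init__(self, processors):
        self.processors = processors

    def run(self, items):
        for item in items:
            for processor in self.processors:
                processor.process(item)


items = [Item("ab"), Item("cde")]
MiniPipeline([Upper(), Length()]).run(items)
print(items[1].results)  # {'upper': 'CDE', 'length': 3}
```

The pipeline only relies on each processor having a `process` method, so steps can be reordered or swapped without touching `MiniPipeline`.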

## Define classes

### Data objects

In [1]:
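from dataclasses import dataclass  # in the standard library since Python 3.7
from typing import Optional

@dataclass
class TextObject:
    text: str
    language: Optional[str] = None     # set by LanguageDetector
    sentiment: Optional[float] = None  # polarity, set by SentimentDetector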

In [2]:
my_text = TextObject('This is an example for a text!')
my_text

TextObject(text='This is an example for a text!', language=None, sentiment=None)

### Processor classes

#### Base class

In [3]:
class Processor:
    def __init__(self):
        print(f"Initializing {self.__class__.__name__}")

    def process(self, text_object):  # uniform interface; subclasses override this
        raise NotImplementedError

#### Actual processors

In [4]:
import langdetect

class LanguageDetector(Processor):

    def process(self, text_object):
        detected_languages = langdetect.detect_langs(text_object.text)
        if detected_languages:
            # keep the language code with the highest probability
            text_object.language = max(detected_languages, key=lambda x: x.prob).lang

In [5]:
import textblob

class SentimentDetector(Processor):
        
    def process(self, text_object):
        if text_object.language == 'en':
            text_object.sentiment = textblob.TextBlob(text_object.text).sentiment.polarity

### Pipeline class

In [6]:
@dataclass
class Pipeline:
    processors: list

    def run(self, texts):
        """Iterate over all texts and apply each processor in order."""
        for text in texts:
            # use the instance's processors, not a global variable
            for processor in self.processors:
                processor.process(text)

## Execute

### Set up Pipeline

In [7]:
processors = [
    LanguageDetector(),
    SentimentDetector(),
]

pipeline = Pipeline(processors)

Initializing LanguageDetector
Initializing SentimentDetector


### Add Domains and run

In [8]:
texts = [
    TextObject("Hello World. I am happy. Today is a good day."),
    TextObject('Dieser Satz ist auf Deutsch.'),
]

pipeline.run(texts)

In [9]:
texts

[TextObject(text='Hello World. I am happy. Today is a good day.', language='en', sentiment=0.75),
 TextObject(text='Dieser Satz ist auf Deutsch.', language='de', sentiment=None)]

### Possible refinements
- Let the `process` method return a status code so that a processor can stop further processing of an object (filter)
- Record which processors have already run on each object
- Separate raw data from added data
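The three refinements above could look roughly like the following sketch. It is one possible design, not the notebook's implementation: `Record`, `NonEmptyFilter`, `WordCounter` and `FilteringPipeline` are hypothetical names, and the convention that `process` returns `False` to stop processing is an assumption made for this example.

```python
from dataclasses import dataclass, field


@dataclass
class Record:
    """Hypothetical data object that keeps raw and added data apart."""
    raw: str                                          # input, never modified
    added: dict = field(default_factory=dict)         # results from processors
    processed_by: list = field(default_factory=list)  # names of processors run


class NonEmptyFilter:
    def process(self, record):
        # Returning False asks the pipeline to stop processing this record.
        return bool(record.raw.strip())


class WordCounter:
    def process(self, record):
        record.added["n_words"] = len(record.raw.split())
        return True


class FilteringPipeline:
    def __init__(self, processors):
        self.processors = processors

    def run(self, records):
        for record in records:
            for processor in self.processors:
                keep_going = processor.process(record)
                # remember which processors have seen this record
                record.processed_by.append(type(processor).__name__)
                if not keep_going:
                    break


records = [Record("two words"), Record("   ")]
FilteringPipeline([NonEmptyFilter(), WordCounter()]).run(records)
```

After the run, the whitespace-only record has been seen by `NonEmptyFilter` only and has no added data, while the first record carries a word count and both processor names.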

## Advantages
- Easily remove/add processing steps or features
- Same steps/processors for training and model application
- Modularity
- Readability
- Testing
- Single responsibility
- High-level structure
- Easily parallelizable
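As an illustration of the last point: each object passes through the processors independently of the others, so the loop over objects can be handed to a pool of workers. The sketch below uses threads from the standard library and assumes that the processors hold no shared mutable state. `Doc`, `CharCounter`, `run_all` and `run_parallel` are hypothetical names introduced for this example.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Optional


@dataclass
class Doc:
    """Hypothetical stand-in for a data object such as TextObject."""
    text: str
    n_chars: Optional[int] = None


class CharCounter:
    """Hypothetical processor with the same process() interface."""
    def process(self, doc):
        doc.n_chars = len(doc.text)


def run_all(processors, doc):
    """Apply all processors, in order, to a single object."""
    for processor in processors:
        processor.process(doc)
    return doc


def run_parallel(processors, docs, max_workers=4):
    """Process the objects concurrently; the order of docs is preserved."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(lambda doc: run_all(processors, doc), docs))


docs = run_parallel([CharCounter()], [Doc("abc"), Doc("hello")])
```

Threads mainly help when the processing steps wait on I/O. For CPU-bound steps a process pool is the usual alternative, in which case the processed objects have to be returned to the parent process, because in-place changes made in a worker process are not visible there.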