In the previous file, we built a sequence of tasks that transformed a raw log file into a summarized CSV file.

The problem with that sequence of tasks is that it was written **statically**. That is, the tasks were written for one specific purpose, and we had to declare a variable for each function call. This process is difficult to extend: we have to rewrite the pipeline each time we want to add or change a task.

Instead, we want to use a general purpose pipeline that makes it easy to build tasks and dependencies. Rather than calling a function, assigning the return value to an output variable, and then passing that variable to another function, we can use a general pipeline that works for all cases.

By the end of this file, we will learn to do all the above using our own `Pipeline` class:

![image.png](attachment:image.png)

This file will focus on introducing the concepts behind the `@pipeline.task()` syntax, how to combine object-oriented design with the functional paradigm, and how to recreate the previous file's pipeline of tasks. We will start by introducing the last functional concept we'll need for our pipeline.

In the previous file, we wrote a `build_csv()` function that took in a Python file object and wrote the given lines to it in CSV format. A better design accepts either a file object or a string filename: if given a filename, the function opens the file itself; otherwise, it uses the given file object.

We can write this behavior using an **inner function**. An inner function is, unsurprisingly, a function within a function. Here's what this would look like:

In [3]:
import csv
import itertools

lines = [[1,2,3], [4,5,6]]

def build_csv(lines, header=None, file=None):
    def open_file(f):
        # If it's a string, treat it as a filename: open the file
        # and return it. Otherwise, assume it is already a file
        # object and return it unchanged.
        if isinstance(f, str):
            # 'w+' lets the returned file be read back after seek(0);
            # newline='' is what the csv module recommends.
            f = open(f, 'w+', newline='')
        return f

    file = open_file(file)  # Call the inner function.
    if header:
        lines = itertools.chain([header], lines)
    writer = csv.writer(file, delimiter=',')
    writer.writerows(lines)
    file.seek(0)
    return file

csv_file = build_csv(lines, file='example.csv')

The benefit of these inner functions is that they are **encapsulated** in the scope of the parent function. We cannot call `open_file()` outside of the `build_csv()` function! Since we will only use `open_file()` within `build_csv()`, this keeps the function isolated from the global scope of the program.
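To see the encapsulation concretely, here is a minimal sketch using a hypothetical `outer()`/`helper()` pair (these names are for illustration only and are not part of the lesson). Calling the inner function from the global scope raises a `NameError`, because the name only exists inside the parent function.

```python
def outer(value):
    # `helper` is defined each time `outer` runs and is only
    # visible inside the body of `outer`.
    def helper(v):
        return v * 2
    return helper(value)

result = outer(21)
print(result)

try:
    helper(21)  # Not visible outside of outer().
    reachable = True
except NameError:
    reachable = False
print(reachable)
```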

![image.png](attachment:image.png)

![image.png](attachment:image.png)

![image.png](attachment:image.png)

![image.png](attachment:image.png)

**Task**

![image.png](attachment:image.png)

**Answer**

In [5]:
def add(a, b):
    return a + b

def partial(func, *args):
    parent_args = args
    def inner(*inner_args):
        return func(*(parent_args + inner_args))
    return inner

add_two = partial(add, 2)
print(add_two(7))

9
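For comparison, the standard library ships a ready-made version of this idea as `functools.partial`. The sketch below reuses the `add()` function from the answer. The name `add_to_ten` is made up for illustration. It suggests that the library version behaves the same way for positional arguments and additionally accepts keyword arguments.

```python
from functools import partial

def add(a, b):
    return a + b

# Pre-fill the first positional argument, as in the hand-written version.
add_two = partial(add, 2)
total = add_two(7)
print(total)

# functools.partial can also pre-fill keyword arguments.
add_to_ten = partial(add, b=10)
total_kw = add_to_ten(5)
print(total_kw)
```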


![image.png](attachment:image.png)

![image.png](attachment:image.png)

![image.png](attachment:image.png)

**Task**

![image.png](attachment:image.png)

**Answer**

In [6]:
def catch_error(func):
    def inner(*args):
        try:
            return func(*args)
        except Exception as e:
            return e
    return inner

@catch_error
def throws_error():
    raise Exception('Throws Error')
    
print(throws_error())

Throws Error
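The `@catch_error` line is shorthand for passing the function to `catch_error()` and rebinding the result. The following sketch spells that out with a hypothetical `divide()` function, which is not part of the lesson. It also uses `functools.wraps`, an optional addition, so that the wrapped function keeps its original name.

```python
import functools

def catch_error(func):
    @functools.wraps(func)  # Copy func's name and docstring onto inner.
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            return e
    return inner

def divide(a, b):
    return a / b

# Equivalent to placing @catch_error above the definition of divide().
safe_divide = catch_error(divide)

ok = safe_divide(6, 3)
failed = safe_divide(1, 0)
print(ok)
print(repr(failed))
print(safe_divide.__name__)
```

Returning the exception instead of raising it follows the answer above. In real code, callers would then need to check whether the return value is an exception.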


![image.png](attachment:image.png)

![image.png](attachment:image.png)

**Task**

![image.png](attachment:image.png)

**Answer**

In [7]:
class Pipeline:
    def __init__(self):
        self.tasks = []
        
    def task(self):
        def inner(f):
            self.tasks.append(f)
            return f
        return inner

pipeline = Pipeline()
    
@pipeline.task()
def first_task(x):
    return x + 1

print(pipeline.tasks)

[<function first_task at 0x0000011CB04D7A68>]
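The parentheses in `@pipeline.task()` matter: `task()` is called first, and the function it returns is the actual decorator. As a rough sketch, the two steps can be written out by hand. The variable names `decorator` and `registered` are only for illustration.

```python
class Pipeline:
    def __init__(self):
        self.tasks = []

    def task(self):
        def inner(f):
            self.tasks.append(f)
            return f
        return inner

pipeline = Pipeline()

def first_task(x):
    return x + 1

# Step 1: calling task() returns the decorator (the inner function).
decorator = pipeline.task()
# Step 2: the decorator registers the function and hands it back unchanged.
registered = decorator(first_task)

print(registered is first_task)
print(pipeline.tasks == [first_task])
print(first_task(1))
```

Because `inner()` returns `f` itself rather than a wrapper, the decorated function can still be called directly as before.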


![image.png](attachment:image.png)

![image.png](attachment:image.png)

**Task**

![image.png](attachment:image.png)

**Answer**

In [8]:
class Pipeline:
    def __init__(self):
        self.tasks = []
        
    def task(self, depends_on=None):
        idx = 0
        if depends_on:
            idx = self.tasks.index(depends_on) + 1
        def inner(f):
            self.tasks.insert(idx, f)
            return f
        return inner

pipeline = Pipeline()
    
@pipeline.task()
def first_task(x):
    return x + 1

@pipeline.task(depends_on=first_task)
def second_task(x):
    return x * 2

@pipeline.task(depends_on=second_task)
def last_task(x):
    return x - 4

print(pipeline.tasks)

[<function first_task at 0x0000011CB04D7E58>, <function second_task at 0x0000011CB04D7558>, <function last_task at 0x0000011CB04D7F78>]


![image.png](attachment:image.png)

Notice that each task takes a single argument as input and returns a single value as output.

**Task**

![image.png](attachment:image.png)

**Answer**

In [9]:
class Pipeline:
    def __init__(self):
        self.tasks = []
        
    def task(self, depends_on=None):
        idx = 0
        if depends_on:
            idx = self.tasks.index(depends_on) + 1
        def inner(f):
            self.tasks.insert(idx, f)
            return f
        return inner
    
    def run(self, input_):
        output = input_
        for task in self.tasks:
            output = task(output)
        return output
    
pipeline = Pipeline()
    
@pipeline.task()
def first_task(x):
    return x + 1

@pipeline.task(depends_on=first_task)
def second_task(x):
    return x * 2

@pipeline.task(depends_on=second_task)
def last_task(x):
    return x - 4

print(pipeline.run(20))

38


With the `pipeline.run()` method, we now have a fully functioning general pipeline! We can use this `Pipeline` class with any tasks that require a dependency ordering. The previous file's task pipeline fits this use case.

Using the functions we wrote in the previous file, we will rebuild the pipeline using the `Pipeline` class we wrote in this file. Here are the functions we wrote in the previous file, and the sequence of executing the tasks:
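One way to read `run()` is as a fold over the task list, where each task's output becomes the next task's input. As an illustrative sketch (not the lesson's implementation), the same result can be produced with `functools.reduce` over the three example tasks:

```python
from functools import reduce

def first_task(x):
    return x + 1

def second_task(x):
    return x * 2

def last_task(x):
    return x - 4

tasks = [first_task, second_task, last_task]

# Feed each task's output into the next task, starting from 20.
result = reduce(lambda output, task: task(output), tasks, 20)
print(result)
```

The explicit `for` loop in `run()` is arguably easier to read and to extend, which is likely why it is the better fit for a class we intend to grow.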

In [14]:
from datetime import datetime
def parse_time(time_str):
    """
    Parses time in the format [30/Nov/2017:11:59:54 +0000]
    to a datetime object.
    """
    time_obj = datetime.strptime(time_str, '[%d/%b/%Y:%H:%M:%S %z]')
    return time_obj

def strip_quotes(s):
    return s.replace('"', '')

In [15]:
import itertools

def parse_log(log):
    for line in log:
        split_line = line.split()
        remote_addr = split_line[0]
        time_local = parse_time(split_line[3] + " " + split_line[4])
        request_type = strip_quotes(split_line[5])
        request_path = split_line[6]
        status = split_line[8]
        body_bytes_sent = split_line[9]
        http_referrer = strip_quotes(split_line[10])
        http_user_agent = strip_quotes(" ".join(split_line[11:]))
        yield (
            remote_addr, time_local, request_type, request_path,
            status, body_bytes_sent, http_referrer, http_user_agent
        )

def build_csv(lines, header=None, file=None):
    if header:
        lines = itertools.chain([header], lines)
    writer = csv.writer(file, delimiter=',')
    writer.writerows(lines)
    file.seek(0)
    return file

def count_unique_request(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    idx = header.index('request_type')

    uniques = {}
    for line in reader:

        if not uniques.get(line[idx]):
            uniques[line[idx]] = 0
        uniques[line[idx]] += 1
    return ((k, v) for k,v in uniques.items())

# Run the static tasks.
log = open('example_log.txt')
parsed = parse_log(log)
# 'w+' creates the file if it is missing and clears any stale content;
# newline='' is what the csv module recommends for files it reads and writes.
file = open('temporary.csv', 'w+', newline='')
csv_file = build_csv(
    parsed,
    header=[
        'ip', 'time_local', 'request_type',
        'request_path', 'status', 'bytes_sent',
        'http_referrer', 'http_user_agent'
    ],
    file=file
)
uniques = count_unique_request(csv_file)
summarized_file = open('summarized.csv', 'w+', newline='')
summarized_csv = build_csv(uniques, header=['request_type', 'count'], file=summarized_file)

Before we conclude, there's a helpful class that we can optionally use for file reading and writing. Instead of opening a file on disk, we can use the `StringIO` object [from the `io` module](https://docs.python.org/3/library/io.html). A `StringIO` object mimics a file, but instead of writing out to disk, it keeps its contents in memory.

In [17]:
import io

log = open('example_log.txt')
parsed = parse_log(log)
csv_file = build_csv(
    parsed,
    header=[
        'ip', 'time_local', 'request_type',
        'request_path', 'status', 'bytes_sent',
        'http_referrer', 'http_user_agent'
    ],
    # Using file-like object instead of `temporary.csv`.
    file=io.StringIO()
)
uniques = count_unique_request(csv_file)
summarized_csv = build_csv(
    uniques,
    header=['request_type', 'count'],
    # Using file-like object instead of `summarized.csv`.
    file=io.StringIO()
)
print(summarized_csv.readlines())

['request_type,count\r\n', 'PUT,3367\r\n', 'POST,3299\r\n', 'GET,3334\r\n']
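A `StringIO` object keeps track of a cursor position just like a file on disk, which is why `build_csv()` calls `file.seek(0)` before returning. The small sketch below uses made-up rows to show what happens when reading before and after rewinding:

```python
import csv
import io

buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerows([["request_type", "count"], ["GET", 2]])

# The cursor now sits at the end of the buffer, so reading returns nothing.
at_end = buffer.read()

# Rewind to the start, as build_csv() does, before reading.
buffer.seek(0)
rows = list(csv.reader(buffer))

print(repr(at_end))
print(rows)
```

Note that the `csv` reader returns every field as a string, so the count comes back as `'2'` rather than `2`.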


![image.png](attachment:image.png)

**Task**

* Rebuild the previous file's pipeline using the `Pipeline` class.
    * The functions from the example code are provided.
    * The pipeline doesn't have to be identical. We can choose to omit, combine, or add functions.
    * Use `io.StringIO()` as the file object in the `build_csv()` keyword argument.
* Call `pipeline.run()` on the `log` file.
* Assign the return value to the variable `summarized_csv`, and `print()` the results of `summarized_csv.readlines()`.

**Answer**

In [21]:
import io

class Pipeline:
    def __init__(self):
        self.tasks = []
        
    def task(self, depends_on=None):
        idx = 0
        if depends_on:
            idx = self.tasks.index(depends_on) + 1
        def inner(f):
            self.tasks.insert(idx, f)
            return f
        return inner
    
    def run(self, input_):
        output = input_
        for task in self.tasks:
            output = task(output)
        return output
    
pipeline = Pipeline()

@pipeline.task()
def parse_logs(logs):
    return parse_log(logs)

@pipeline.task(depends_on=parse_logs)
def build_raw_csv(lines):
    return build_csv(lines, header=[
        'ip', 'time_local', 'request_type',
        'request_path', 'status', 'bytes_sent',
        'http_referrer', 'http_user_agent'
    ],
    file=io.StringIO())

@pipeline.task(depends_on=build_raw_csv)
def count_uniques(csv_file):
    return count_unique_request(csv_file)

@pipeline.task(depends_on=count_uniques)
def summarize_csv(lines):
    return build_csv(lines, header=['request_type', 'count'], file=io.StringIO())

log = open('example_log.txt')
# The task asks for the result to be assigned to `summarized_csv`.
summarized_csv = pipeline.run(log)
print(summarized_csv.readlines())

['request_type,count\r\n', 'PUT,3367\r\n', 'POST,3299\r\n', 'GET,3334\r\n']


In this file, we created a general purpose pipeline with single-argument tasks. We learned about closures and function decorators, which gave us a clean pipeline syntax. Finally, we rebuilt the previous file's pipeline using the `Pipeline` class we wrote.

There are a couple of drawbacks to our `Pipeline` that we did not discuss. First, each task can only take one argument, and second, the dependencies between tasks can only form a linear chain.
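The second drawback can be illustrated with a short sketch. The task names `load`, `branch_a`, and `branch_b` are hypothetical. Both branches declare a dependency on `load` only, but because tasks are stored in a flat list, they end up chained one after the other.

```python
class Pipeline:
    def __init__(self):
        self.tasks = []

    def task(self, depends_on=None):
        idx = 0
        if depends_on:
            idx = self.tasks.index(depends_on) + 1
        def inner(f):
            self.tasks.insert(idx, f)
            return f
        return inner

pipeline = Pipeline()

@pipeline.task()
def load(x):
    return x

@pipeline.task(depends_on=load)
def branch_a(x):
    return x + 1

@pipeline.task(depends_on=load)
def branch_b(x):
    return x * 10

# Both branches depend only on load(), yet they are flattened into one
# chain: branch_b is inserted directly after load, ahead of branch_a.
order = [t.__name__ for t in pipeline.tasks]
print(order)
```

With a `run()` method like the one above, `branch_a` would receive the output of `branch_b` rather than the output of `load`, which is probably not what the declared dependencies intend.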

In the next file, we will expand on this `Pipeline` class and enhance it with task scheduling, a proper dependency tree, and other options.