# Functional Programming: Map, Filter, Reduce

1. Write two functions:
    * read(): takes in a filename, reads the file, then returns a list of lines from the file.
    * count(): takes in a list, and returns its length.
2. Call read() on the example_log.txt file, and assign the return value to an example_lines variable.
3. Call count() on example_lines, and assign the return value to the lines_count variable.

In [2]:
def read(filename):
    with open(filename, 'r') as f:
        return [line for line in f]
    
def count(lst):
    return len(lst)

example_lines = read("example_log.txt")
lines_count = count(example_lines)

print(example_lines[0])
print(lines_count)

200.155.108.44 - - [30/Nov/2017:11:59:54 +0000] "PUT /categories/categories/categories HTTP/1.1" 401 963 "http://www.yates.com/list/tags/category/" "Mozilla/5.0 (Windows CE) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.864.0 Safari/5332"

10000


Instead of the def syntax for function declaration, we can use a lambda expression to write Python functions. The lambda syntax closely follows the def syntax, but it's not a 1-to-1 mapping. Here's an example of building a function that adds two numbers:

In [3]:
# Using a `def` statement.
def old_add(a, b):
    return a + b

# Using a `lambda` expression.
new_add = lambda a, b: a + b

print(old_add(10, 5) == new_add(10, 5))

True


A lambda expression takes a comma-separated list of parameters (like def). After the colon comes a single expression whose value is returned implicitly; a lambda cannot contain a return statement. A lambda expression evaluates to an ordinary Python function object, so once it's assigned to a variable, we can call it with the usual function call syntax: new_add().
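A quick way to confirm that a lambda builds an ordinary function object is to compare it with the def version. This sketch reuses the names from the cell above:

```python
def old_add(a, b):
    return a + b

new_add = lambda a, b: a + b

# Both are plain function objects of the same type.
print(type(old_add) is type(new_add))  # True

# The lambda's __name__ is the placeholder '<lambda>', not 'new_add'.
print(old_add.__name__, new_add.__name__)  # old_add <lambda>
```

The generic name `<lambda>` is also what shows up in tracebacks, which is one reason PEP 8 recommends a def statement whenever a function is bound to a name.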

Lambdas are most useful as anonymous functions: written inline and passed directly as input to another function, without ever being given a name. For example, the sorted() function takes an optional key argument (a function) that computes the value each item should be sorted by.

In [4]:
unsorted = [('b', 6), ('a', 10), ('d', 0), ('c', 4)]

# Sort on the second tuple value (the integer).
print(sorted(unsorted, key=lambda x: x[1]))

[('d', 0), ('c', 4), ('b', 6), ('a', 10)]
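For simple "pick one field" keys like this, the standard library's `operator.itemgetter` builds the same key function without a lambda, and `reverse=True` flips the order. A short sketch on the same data:

```python
from operator import itemgetter

unsorted = [('b', 6), ('a', 10), ('d', 0), ('c', 4)]

# itemgetter(1) returns a callable that fetches index 1, like lambda x: x[1].
by_count = sorted(unsorted, key=itemgetter(1))
print(by_count)  # [('d', 0), ('c', 4), ('b', 6), ('a', 10)]

# reverse=True sorts by the same key in descending order.
by_count_desc = sorted(unsorted, key=itemgetter(1), reverse=True)
print(by_count_desc)  # [('a', 10), ('b', 6), ('c', 4), ('d', 0)]
```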


1. Call sorted() on the lines variable, with a key function that does the following:
    * Split the line on spaces (' ')
    * Return the 6th element (index 5) of the split line
2. Assign the sorted return value to a sorted_lines variable.
3. Print the sorted_lines variable

In [6]:
def read(filename):
    with open(filename, 'r') as f:
        return [line for line in f]
    
lines = read('example_log.txt')

sorted_lines = sorted(lines, key=lambda x: x.split(' ')[5])
print(sorted_lines[0])

233.154.7.24 - - [30/Nov/2017:11:59:54 +0000] "GET /app HTTP/1.1" 404 526 "http://www.cherry.com/main.htm" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5360 (KHTML, like Gecko) Chrome/13.0.839.0 Safari/5360"



**First-class functions** - In Python, functions are first-class objects: they can be passed as arguments, returned from other functions, and assigned to variables, just like any other value. This ability isn't unique to Python.

A set of higher-order functions (functions that take other functions as arguments) is common within the functional paradigm. Like sorted(), these functions take in a function and a Python iterable, and apply the function to each element of the iterable.

The first function we'll work with is the map() function. map() takes in a function and an iterable (e.g., a list), and returns a new iterable object: a special map object. The map object is lazy; it applies the function to each element of the input only when that result is requested.

Here's how we could use map() to add 10 or 20 to every element in a list:

In [8]:
values = [1, 2, 3, 4, 5]

# Note: We convert the returned map object to
# a list data structure.
add_10 = list(map(lambda x: x + 10, values))
add_20 = list(map(lambda x: x + 20, values))

print(add_10)
print(add_20)

[11, 12, 13, 14, 15]
[21, 22, 23, 24, 25]


It's important to convert the return value from map() to a list. The map object is awkward to work with if you expect it to behave like a list: printing it doesn't show its items, it doesn't support indexing or len(), and you can only iterate over it once.
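A small sketch of that one-shot behavior (the values are arbitrary):

```python
values = [1, 2, 3]
mapped = map(lambda x: x * 10, values)

# Printing shows only the object, e.g. <map object at 0x...>.
print(mapped)

first_pass = list(mapped)   # consumes the iterator
second_pass = list(mapped)  # nothing is left to consume

print(first_pass)   # [10, 20, 30]
print(second_pass)  # []
```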

1. Map each line in the lines variable to its corresponding IP address:
    * Split the line on spaces (' ')
    * Return the first element of the split line
2. Cast the mapped object to a list, and assign it to the ip_addresses variable.
3. Print the ip_addresses variable.

In [9]:
lines = read('example_log.txt')

ip_addresses = list(map(lambda x: x.split(' ')[0], lines))

print(ip_addresses[:5])

['200.155.108.44', '36.139.255.202', '50.112.115.219', '204.132.56.4', '233.154.7.24']


The second function we'll work with is the filter() function. filter() takes in a function and an iterable, and returns a new iterable object (this time, a special filter object). The filter object yields only the elements for which the function returns a truthy value, such as True.

Here's how we could filter odd or even values from a list:

In [11]:
values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Note: We convert the returned filter object to
# a list data structure.
even = list(filter(lambda x: x % 2 == 0, values))
odd = list(filter(lambda x: x % 2 == 1, values))

print(even)
print(odd)

[2, 4, 6, 8, 10]
[1, 3, 5, 7, 9]
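Because filter() only checks whether the function's result is truthy, the predicate doesn't have to return a bool, and passing None as the function keeps the truthy elements themselves. A sketch with arbitrary values:

```python
mixed = [0, 1, '', 'a', None, [], [2]]

# With None as the function, filter() keeps the truthy elements.
truthy = list(filter(None, mixed))
print(truthy)  # [1, 'a', [2]]

# x % 2 returns 1 or 0 rather than True or False; 1 counts as truthy.
odd = list(filter(lambda x: x % 2, range(10)))
print(odd)  # [1, 3, 5, 7, 9]
```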


1. Filter the ip_addresses list down to the IP addresses whose first number (octet) is less than or equal to 20.
2. Cast the filtered object to a list, and assign it to the filtered_ips variable.
3. Print the filtered_ips variable.


In [12]:
x = '200.155.108.44'
x[:3]

'200'

In [13]:
lines = read('example_log.txt')
ip_addresses = list(map(lambda x: x.split()[0], lines))

filtered_ips = list(filter(lambda x: int(x.split('.')[0]) <= 20, ip_addresses))

print(filtered_ips[:5])

['4.31.18.29', '5.237.70.145', '4.186.143.85', '7.205.198.134', '2.98.108.99']


The last function we'll look at is the reduce() function from the functools module. reduce() takes in a function of two arguments and an iterable, such as a list, and reduces the iterable to a single value by applying the function cumulatively. It first applies the function to the first two elements, then applies it to that result and the third element, and so on, until a single value remains.

In [14]:
from functools import reduce

values = [1, 2, 3, 4]

summed = reduce(lambda a, b: a + b, values)
print(summed)

10
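Building a string instead of a number makes the order of application visible. reduce() also accepts an optional third argument, a starting value; without one, reducing an empty iterable raises a TypeError. A short sketch:

```python
from functools import reduce

# Each step wraps the running result, so the nesting shows the order.
trace = reduce(lambda a, b: f"({a} + {b})", ["1", "2", "3", "4"])
print(trace)  # (((1 + 2) + 3) + 4)

# The optional third argument is used as the starting value.
with_start = reduce(lambda a, b: a + b, [1, 2, 3, 4], 100)
print(with_start)  # 110

# With a starting value, an empty iterable returns it instead of raising.
empty_total = reduce(lambda a, b: a + b, [], 0)
print(empty_total)  # 0
```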


Note that the function doesn't have to use its second argument. For example, you can write a function that always returns the first element of an iterable:

In [15]:
from functools import reduce

values = [1, 2, 3, 4, 5]

# By convention, we add `_` as a placeholder for an input
# we do not use.
first_value = reduce(lambda a, _: a, values)
print(first_value)

1


Another important aspect of reduce is that the lambda function doesn't necessarily need to return a value that is the same type as the inputs. Imagine that we have a list of words and that we want to use ```reduce()``` to add all word lengths together. For example, for the list ```["I", "love", "data", "science"]``` the answer would be 1 + 4 + 4 + 7 = 16.

The first solution that might come to mind is the lambda function ```lambda a, b: len(a) + len(b)```, which adds the lengths of two strings. The problem, however, is that after adding the lengths of "I" and "love", reduce() applies the lambda function to that result, 5, and the next word, "data": ```len(5) + len("data")```.

This raises a TypeError, because an integer has no length (```len(5)``` isn't defined). To overcome this, we need to change the lambda function to account for two cases:

1. Both a and b are strings
2. a is already an integer, and b is a string

The following code shows this:

In [17]:
total_len = reduce(lambda x, y: len(x) + len(y) if isinstance(x, str) else x + len(y), ["I", "love", "data", "science"])
print(total_len)

16
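An alternative sketch passes 0 as reduce()'s optional starting value, so the first argument is always the running total (an int) and the second is always a word (a str). It also shows an edge case of the two-case lambda: with a single-word list, reduce() never calls the lambda and returns the word itself.

```python
from functools import reduce

words = ["I", "love", "data", "science"]

# Start from 0, so `total` is always an int and `word` is always a str.
total_len = reduce(lambda total, word: total + len(word), words, 0)
print(total_len)  # 16

# The two-case lambda from the cell above, applied to a one-word list:
# reduce() returns the only element unchanged.
two_case = lambda x, y: len(x) + len(y) if isinstance(x, str) else x + len(y)
print(reduce(two_case, ["data"]))  # data

# The starting-value version handles it correctly.
print(reduce(lambda total, word: total + len(word), ["data"], 0))  # 4
```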


To complete this exercise, you'll need to use both special cases described above: using only one of the lambda's arguments, and handling two cases in the lambda function.

1. Using ```reduce```, count the total number of elements in ```lines``` and ```filtered_ips```.
2. Find the ratio between ```filtered_ips``` and ```lines```, and assign the value to ```ratio```.
3. Print the ```ratio``` variable.

In [18]:
from functools import reduce

lines = read('example_log.txt')
ip_addresses = list(map(lambda x: x.split()[0], lines))
filtered_ips = list(filter(lambda x: int(x.split('.')[0]) <= 20, ip_addresses))

count_all = reduce(lambda x, _: 2 if isinstance(x, str) else x + 1, lines)
count_filtered = reduce(lambda x, _: 2 if isinstance(x, str) else x + 1, filtered_ips)

ratio = count_filtered / count_all

print(ratio)

0.0808
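The `2 if isinstance(x, str) else x + 1` trick assumes the elements are strings and that there are at least two of them: with a single element, reduce() returns that string unchanged; with none, it raises a TypeError; and with integer elements, the first element's value is mistaken for a running count. Passing 0 as reduce()'s starting value avoids all three. A sketch, with `count_items` as a hypothetical name:

```python
from functools import reduce

def count_items(items):
    # Add 1 per element, starting from 0. The element itself is ignored,
    # so any element type works.
    return reduce(lambda total, _: total + 1, items, 0)

print(count_items(['a', 'b', 'c']))  # 3
print(count_items(['a']))            # 1
print(count_items([]))               # 0
print(count_items([7, 8]))           # 2
```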


Because we eventually convert to lists anyway, we can rewrite the ```map()``` and ```filter()``` calls as list comprehensions instead. This is the more Pythonic way of writing them, because it takes advantage of Python's syntax for building lists. Here's how you could translate the previous ```map()``` example to a list comprehension:

In [19]:
values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Map.
add_10 = [x + 10 for x in values]
print(add_10)

[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
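The earlier filter() example translates the same way, with the condition moving into an `if` clause at the end of the comprehension. A single comprehension can also map and filter at once:

```python
values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Filter: the `if` clause keeps only the matching elements.
even = [x for x in values if x % 2 == 0]
odd = [x for x in values if x % 2 == 1]
print(even)  # [2, 4, 6, 8, 10]
print(odd)   # [1, 3, 5, 7, 9]

# Map and filter combined: transform only the elements that pass.
even_plus_10 = [x + 10 for x in values if x % 2 == 0]
print(even_plus_10)  # [12, 14, 16, 18, 20]
```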


From these examples, you can see that comprehensions don't need lambda expressions. If you want map- or filter-like behavior in your code, a comprehension is usually the recommended way. However, later on we'll show you a case where you would still use the map() and filter() functions.

1. Using list comprehension, do the following:
    * Rewrite the ip_addresses mapping
    * Rewrite the filtered_ips filter
2. Keeping everything else, print the ratio variable.

In [20]:
lines = read('example_log.txt')
# Rewrite ip_addresses, and filtered_ips.
# list(map(lambda x: x.split()[0], lines))

ip_addresses = [line.split()[0] for line in lines]

# list(filter(lambda x: int(x.split('.')[0]) <= 20, ip_addresses))

filtered_ips = [ip for ip in ip_addresses if int(ip.split('.')[0]) <= 20]

count_all = reduce(lambda x, _: 2 if isinstance(x, str) else x + 1, lines)
count_filtered = reduce(lambda x, _: 2 if isinstance(x, str) else x + 1, filtered_ips)
ratio = count_filtered / count_all
print(ratio)

0.0808


Sometimes we want to reuse the behavior of a function, but with fewer arguments. The idea is to "save" one of the inputs and create a new function that always uses the saved value. For example, perhaps we want a function that always adds 2 to any number:

In [21]:
def add_two(b):
    return 2 + b 

print(add_two(4))

6


The **add_two** function is similar to the general function ```f(a,b)=a+b```, except that one of the arguments is fixed ```(a=2)```. In Python, we can use **partial()** from the **functools** module to fix argument values like this. **partial()** takes in a function and "freezes" any number of positional arguments (starting from the first one) and keyword arguments, then returns a new function that fills in the frozen inputs automatically.

In [22]:
from functools import partial

def add(a, b):
    return a + b

add_two = partial(add, 2)
add_ten = partial(add, 10)

print(add_two(4))

6


In [23]:
print(add_ten(4))

14
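partial() can freeze keyword arguments as well as positional ones, and the object it returns records what it wraps. A short sketch:

```python
from functools import partial

# Freeze a keyword argument instead of a positional one.
parse_binary = partial(int, base=2)
print(parse_binary("1010"))  # 10

def add(a, b):
    return a + b

add_two = partial(add, 2)

# The partial object exposes the wrapped function and the frozen inputs.
print(add_two.func is add)  # True
print(add_two.args)         # (2,)
print(add_two.keywords)     # {}
```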


Partials can wrap any callable, including built-in functions such as map()!

In [25]:
# A partial that grabs IP addresses using
# the `map` function from the exercises.
extract_ips = partial(
    map,
    lambda x: x.split(' ')[0]
)
lines = read('example_log.txt')
ip_addresses = list(extract_ips(lines))
ip_addresses[:5]

['200.155.108.44',
 '36.139.255.202',
 '50.112.115.219',
 '204.132.56.4',
 '233.154.7.24']

1. Using a partial, create a count function that takes in a list and runs the reduce implementation of list counting.
2. Replace the reduce with count for the following:
    * count_all
    * count_filtered
3. Keeping everything else, print the ratio variable.

In [26]:
from functools import partial, reduce

# Freeze the counting lambda as reduce()'s first argument, so `count`
# only needs the iterable.
count = partial(
    reduce,
    lambda x, _: 2 if isinstance(x, str) else x + 1
)

lines = read('example_log.txt')
ip_addresses = [line.split()[0] for line in lines]
filtered_ips = [
    ip
    for ip in ip_addresses if int(ip.split('.')[0]) <= 20
]
count_all = count(lines)
count_filtered = count(filtered_ips)
ratio = count_filtered / count_all
print(ratio)

0.0808


Let's examine how we used each of the ```map```, ```filter```, and ```reduce``` functions. In the exercises, we first mapped a list of log lines to their IP addresses, filtered those IP addresses for IPs that start with an integer less than or equal to 20, then counted the results. Notice that the output of each function was passed as the input to the one immediately following it.

Viewing our exercises this way, it's as if we've created a chain of function calls starting from ```map``` and ending with ```reduce```. In mathematics, this kind of chaining is called function composition. Given a chain of functions ```f(x), g(x), h(x)```, composing them means passing the output of each function as the input to the next: ```h(g(f(x)))```.

This is exactly the same concept we used in our exercises:

```reduce(filter(map(...)))```

Using a function ```compose``` that takes in a sequence of **single-argument** functions and applies them from left to right, we can create a composed single-argument function similar to the example above. Here's a composed function with ```int``` types instead of iterable types:

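```
from functools import reduce

# compose() isn't built into Python; this minimal helper applies the
# functions left to right: compose(f, g, h)(x) == h(g(f(x))).
def compose(*functions):
    return lambda x: reduce(lambda acc, f: f(acc), functions, x)

def add_two(x):
    return x + 2

def multiply_by_four(x):
    return x * 4

def subtract_seven(x):
    return x - 7

composed = compose(
    add_two,  # + 2
    multiply_by_four,  # * 4
    subtract_seven  # - 7
)

# (((10 + 2) * 4) - 7) = 41
answer = composed(10)
print(answer)
```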

By restricting each of the ```map```, ```filter```, and ```reduce``` functions (with ```partial```) so that it requires only a single input (an iterable), we can rewrite our previous implementations as a composed function.

1. Using ```compose```, combine the ```map```, ```filter```, and ```reduce``` functions (with ```partial```) to create a composable function that takes in a list and returns a filtered count.
2. Assign the result of the composed function to the variable ```counted```.

In [29]:
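from functools import partial, reduce

# Left-to-right composition: compose(f, g, h)(x) == h(g(f(x))).
# The `compose` previously imported from the third-party `compose`
# package applied its arguments right to left, so `count` ran first on
# `lines` and filter() then received an int, raising
# "TypeError: 'int' object is not iterable".
def compose(*functions):
    return lambda x: reduce(lambda acc, f: f(acc), functions, x)

lines = read('example_log.txt')

extract_ips = partial(
    map,
    lambda x: x.split()[0]
)
filter_ips = partial(
    filter,
    lambda x: int(x.split('.')[0]) <= 20
)
count = partial(
    reduce,
    lambda x, _: 2 if isinstance(x, str) else x + 1
)
# Use a new name for the result; `compose = compose(...)` would shadow the helper.
count_filtered_ips = compose(
    extract_ips,
    filter_ips,
    count
)
counted = count_filtered_ips(lines)
print(counted)

808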

# Pipeline Tasks

In the previous lesson, we learned about functional programming. We briefly discussed the requirements of tasks, and how tasks combine to create a data pipeline. In this lesson, we will build on the functional programming concepts we learned, and construct a real pipeline from scratch.

The goal of our pipeline will be to take log lines, from the ```example_log.txt``` file, and create a summary CSV file of unique HTTP request types and their associated counts. Each line from the ```example_log.txt``` file is from an NGINX log file. This log file contains each client request sent to a running web server in a client-server model.

Here are the first few lines of the log:
```
200.155.108.44 - - [30/Nov/2017:11:59:54 +0000] "PUT /categories/categories/categories HTTP/1.1" 401 963 "http://www.yates.com/list/tags/category/" "Mozilla/5.0 (Windows CE) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.864.0 Safari/5332"
36.139.255.202 - - [30/Nov/2017:11:59:54 +0000] "PUT /search HTTP/1.1" 404 171 "https://www.butler.org/main/tag/category/home.php" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_5_0) AppleWebKit/5332 (KHTML, like Gecko) Chrome/15.0.813.0 Safari/5332"
50.112.115.219 - - [30/Nov/2017:11:59:54 +0000] "POST /main/blog HTTP/1.1" 404 743 "http://deleon-bender.com/categories/category.html" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_5_5 rv:2.0; apn-IN) AppleWebKit/531.48.1 (KHTML, like Gecko) Version/4.0 Safari/531.48.1"
204.132.56.4 - - [30/Nov/2017:11:59:54 +0000] "POST /list HTTP/1.1" 404 761 "http://smith.com/category.htm" "Opera/9.39.(Windows 98; Win 9x 4.90; mn-MN) Presto/2.9.163 Version/12.00"
233.154.7.24 - - [30/Nov/2017:11:59:54 +0000] "GET /app HTTP/1.1" 404 526 "http://www.cherry.com/main.htm" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5360 (KHTML, like Gecko) Chrome/13.0.839.0 Safari/5360"
```

Each line follows the convention:
```
$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"
```

* \$remote_addr: the IP address of the client making the request to the server.
* \$remote_user: if the client authenticated with basic authentication, this is the user name (shown as `-` in the examples above).
* \$time_local: the local time when the request was made.
* \$request: the full request line, i.e. the HTTP method (request type), the requested URL path, and the protocol version.
* \$status: the response status code from the server.
* \$body_bytes_sent: the number of bytes sent by the server to the client in the response body.
* \$http_referer: the page that the client was on before sending the current request (nginx follows the HTTP header's "referer" spelling).
* \$http_user_agent: information about the browser and system of the client.
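Splitting each line on whitespace works for these lines but depends on every field landing at a fixed position. As an alternative sketch, assuming every line follows the combined format above exactly (and quoted fields contain no embedded double quotes), a regular expression with named groups can pull out each field. The pattern is our own illustration, not something nginx provides:

```python
import re

# Hypothetical pattern for the combined log format shown above.
LOG_PATTERN = re.compile(
    r'(?P<remote_addr>\S+) - (?P<remote_user>\S+) '
    r'\[(?P<time_local>[^\]]+)\] '
    r'"(?P<request>[^"]*)" '
    r'(?P<status>\d{3}) (?P<body_bytes_sent>\d+) '
    r'"(?P<http_referer>[^"]*)" "(?P<http_user_agent>[^"]*)"'
)

# The second line of example_log.txt.
line = (
    '36.139.255.202 - - [30/Nov/2017:11:59:54 +0000] "PUT /search HTTP/1.1" 404 171 '
    '"https://www.butler.org/main/tag/category/home.php" '
    '"Mozilla/5.0 (Macintosh; PPC Mac OS X 10_5_0) AppleWebKit/5332 '
    '(KHTML, like Gecko) Chrome/15.0.813.0 Safari/5332"'
)

fields = LOG_PATTERN.match(line).groupdict()
print(fields['request'])             # PUT /search HTTP/1.1
print(fields['request'].split()[0])  # PUT (the request type)
print(fields['status'])              # 404
```

All captured values are strings; converting status and body_bytes_sent to int is still a separate cleaning step.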

In our exercises, we will be focusing on the **request** type that can be one of POST, GET, or PUT. To begin, we're going to learn about special iterable types in Python, called generators. Then, we will use these generators to create a highly performant data pipeline.

### Generators in Python

Before we can dive into our task pipeline, we need to introduce **generators** in Python. The best way to do this is with an example.

In the previous lesson, we read in the ```example_log.txt``` file and stored its lines in a list. Recall that when creating a list, Python loads every element of the list into RAM. For files that exceed multiple gigabytes, loading the whole file this way can cause a program to run out of memory.

Instead of reading the whole file into memory, we can take advantage of **file streaming**. File streaming works by breaking a file into small sections (called **chunks**) and loading them into memory one at a time. Once a chunk has been exhausted (all the bytes of that chunk have been read), Python requests the next chunk, which is then loaded into memory to be iterated over.

This is abstracted away when you run the following:

```
with open('example_log.txt') as file:
    for line in file:
        # The file acts like an iterator.
        print(line)
```

We can see evidence of the exhausted bytes if we try to read from the opened file again:

```
with open('example_log.txt') as file:
    first_pass = 0
    for line in file:
        first_pass += 1

    # At this point, the file has been read and
    # no unread bytes are remaining.
    second_pass = 0
    for line in file:
        # The `file` is exhausted, so this loop ends
        # immediately without running its body.
        second_pass += 1

print(first_pass, second_pass)  # 10000 0
```
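A real file can be rewound with seek(0), which starts the stream over. A small sketch using io.StringIO, an in-memory file-like stand-in for example_log.txt, shows both the exhaustion and the rewind:

```python
import io

# io.StringIO behaves like an opened text file, but lives in memory.
file = io.StringIO("line 1\nline 2\nline 3\n")

first_read = list(file)   # reads every line; the position is now at the end
second_read = list(file)  # nothing is left to read
print(len(first_read), len(second_read))  # 3 0

file.seek(0)              # rewind to the beginning
third_read = list(file)
print(len(third_read))    # 3
```

Generators have no seek() method, so an exhausted generator cannot be rewound this way.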

This stream-like behavior is extremely helpful when working with large data sets. We can replicate it for our own iterables by using **generators**. A generator is an iterator object that is created by calling a **generator function**.

A generator function differs from a regular function in two important ways:
* A generator uses ```yield``` instead of ```return``` to produce its values. (A ```return``` statement can still be used to stop iteration; more on that soon.)
* Its local variables are kept in memory between ```yield```s, until the generator completes.

Here's a simple example of a generator that calculates the squares of each (non-negative) number up to and excluding ```N```:

```
def squares(N):
    for i in range(N):
        yield i * i

for i in squares(4):
    print(i)
```
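Calling a generator function doesn't run its body; it returns a generator object, and the body only runs as that object is iterated. A short sketch using the standard library's inspect module to confirm this:

```python
import inspect

def squares(N):
    for i in range(N):
        yield i * i

# The `yield` in the body makes squares a generator function.
print(inspect.isgeneratorfunction(squares))  # True

# Calling it runs none of the body yet; it returns a generator object.
gen = squares(4)
print(type(gen).__name__)  # generator

# The body runs as the generator is iterated.
print(list(gen))  # [0, 1, 4, 9]
```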

Let's break down this snippet, highlighting the differences mentioned above. First, notice that there is no ```return``` statement; instead, there is a ```yield``` expression. The ```yield``` expression is responsible for two things:

* It signals to the Python interpreter that this function is a generator function.
* It suspends the function's execution, keeping the local variables in memory, until the next value is requested.

The suspension of execution, saving local variables, and then resuming operation is what allows the generator to act like a stream. We can even make generators that can continue execution forever. Here's a count generator that continuously counts from 1 until the program terminates:

```
def count():
    i = 1
    while True:
        yield i
        i += 1

for i in count():
    print(i)
```
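To take only a few values from an infinite generator without looping forever, `itertools.islice` stops after a fixed number of items. The standard library also ships an infinite counter, `itertools.count`. A sketch:

```python
import itertools

def count():
    i = 1
    while True:
        yield i
        i += 1

# islice() stops after 5 items, so the infinite generator is advanced
# only 5 times and the loop ends.
first_five = list(itertools.islice(count(), 5))
print(first_five)  # [1, 2, 3, 4, 5]

# itertools.count(start) counts up from `start` indefinitely.
builtin_five = list(itertools.islice(itertools.count(1), 5))
print(builtin_five)  # [1, 2, 3, 4, 5]
```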

Using the ```next()``` function, you can see the generator and its ```yield``` suspension in action. During iteration (like a ```for``` loop), the Python interpreter repeatedly calls ```next()``` to receive the "next" element of the iterable. In a generator, each call to ```next()``` resumes the function where it left off and runs it until it reaches the next ```yield```.

```
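# Create the generator once; calling count() again would
# start a new generator from 1.
counter = count()
print(next(counter))  # 1
print(next(counter))  # 2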
```


Suppose we wanted to give the ```count()``` function an upper limit. Then we need a ```return``` statement within the generator. Inside a generator, ```return``` ends the function by raising ```StopIteration```, which is how a Python loop (e.g., ```for```) knows when to stop looping. Here's how we would update ```count()``` using ```return```:

```
# Count with an upper limit of `N`.
def count(N):
    i = 1
    while True:
        if i > N:
            return
        yield i
        i += 1

for i in count(5):
    print(i)
```
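Calling next() by hand shows the StopIteration that a for loop normally catches for us. next() also takes an optional default that is returned instead of raising. A sketch using the bounded count(N):

```python
def count(N):
    i = 1
    while True:
        if i > N:
            return
        yield i
        i += 1

gen = count(2)
print(next(gen))  # 1
print(next(gen))  # 2

# The generator has reached `return`, so next() raises StopIteration.
try:
    next(gen)
except StopIteration:
    print("exhausted")

# With a default, next() returns it instead of raising.
print(next(gen, None))  # None
```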

* Rewrite the squares(N) example generator using a while True loop, instead of a for loop.
    * Using a while loop is tricky, so if you hit an infinite loop, cancel the code run and check your logic.
* Using the new squares(N) generator, create a list of squared elements for squares(20).
* Assign the list to the variable squared_values.

In [30]:
def squares(N):
    i = 0
    while True:
        if i >= N:
            return
        yield i * i
        i += 1

squared_values = [i for i in squares(20)]
print(squared_values)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]


### Generator Comprehensions

Generator comprehensions (also called generator expressions) are extremely similar to list comprehensions. We can turn any list comprehension into a generator comprehension by replacing the square brackets ```[]``` with parentheses ```()```. For example, here's how we could write the ```squares``` function from the previous section as a list comprehension and as a generator expression:

```
squared_list = [i * i for i in range(20)]
squared_gen = (i * i for i in range(20))
```
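When a generator expression is the only argument to a function, it doesn't need its own parentheses. The sketch below also compares memory use with sys.getsizeof; the exact byte counts vary between Python versions, and getsizeof measures only the container itself, but the list grows with its length while the generator stays small:

```python
import sys

# A generator expression as the sole argument needs no extra parentheses.
total = sum(i * i for i in range(20))
print(total)  # 2470

# The list stores every element; the generator stores only its current state.
big_list = [i * i for i in range(10_000)]
big_gen = (i * i for i in range(10_000))
print(sys.getsizeof(big_list) > sys.getsizeof(big_gen))  # True
```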



Before you begin replacing all your lists with generators, let's discuss a major drawback of generators. Suppose we had two places in our code that wanted to use the ```squared_gen``` generator. With a list, ```squared_list```, we could easily do:

```
num_to_square = {}
for idx, i in enumerate(squared_list):
    num_to_square[idx] = i
print(num_to_square)
```
```
{0: 0, 1: 1, 2: 4, 3: 9 ...}
```
```
for i in squared_list:
    print(i)
```
```
0
1
4
9
...
```


Using a generator, however, the second loop will **not** print anything. Like a file, a generator is exhausted once its final yield has been executed. Be cautious of this behavior when using generators like ```squared_gen``` in your code!

```
num_to_square = {}
for idx, i in enumerate(squared_gen):
    num_to_square[idx] = i
print(num_to_square)
```
```
{0: 0, 1: 1, 2: 4, 3: 9 ...}
```
```
for i in squared_gen:
    print(i)
```
```
(no output: squared_gen is already exhausted)
```
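Two common workarounds, sketched below: wrap the generator expression in a function so each consumer gets a fresh generator, or split one iterator with `itertools.tee`. Note that tee() buffers the items one copy has seen and the other hasn't yet, so it can use as much memory as a list if the copies drift far apart.

```python
import itertools

def make_squares(n):
    # A fresh generator on every call, so each consumer gets its own stream.
    return (i * i for i in range(n))

first = list(make_squares(5))
second = list(make_squares(5))
print(first, second)  # [0, 1, 4, 9, 16] [0, 1, 4, 9, 16]

# tee() splits one iterator into two independent iterators.
a, b = itertools.tee(i * i for i in range(5))
a_list, b_list = list(a), list(b)
print(a_list, b_list)  # [0, 1, 4, 9, 16] [0, 1, 4, 9, 16]
```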

### Manipulating Generators in Tasks

It's time to use generators in our pipeline. Recall from the previous lesson that we combined a sequence of maps, filters, reducers, and produced a final count output. Using a sequence of generators, instead of the built-in objects, we will mimic this compose behavior for our pipeline.

To restate our goals for the lesson, we want to perform the following to get from a raw log file to a summarized CSV:

1. Read in the ```example_log.txt``` file.
2. Parse the log file into a series of rows.
3. Format the rows into a CSV file.
4. Run some analysis on the CSV file, and count unique visitors.
5. Format the analysis to a summarized CSV file.

Each step can be isolated as an individual task. Notice that each one can also be written to take in an iterable-like object (file, generator, CSV lines, etc.) and to output an iterable. This replicable behavior is highly valuable for consistent data pipelines.

Furthermore, we still want to adhere to the general practices of functional programming, whose main tenets are highly composable functions and a focus on function purity.

To emphasize composability, we can create a general ```parse_log()``` function that takes in a log file, splits the lines, and then extracts the fields. In the next section we will perform some data cleaning, but for now we just want to get the data into a generator.




In [31]:
log = open('example_log.txt')
def parse_log(log):
    for line in log:
        split_line = line.split()
        remote_addr = split_line[0]
        time_local = split_line[3] + " " + split_line[4]
        request_type = split_line[5]
        request_path = split_line[6]
        status = split_line[8]
        body_bytes_sent = split_line[9]
        http_referrer = split_line[10]
        http_user_agent = " ".join(split_line[11:])
        yield (
            remote_addr, time_local, request_type, request_path,
            status, body_bytes_sent, http_referrer, http_user_agent
        )

first_line = next(parse_log(log))

In [32]:
first_line

('200.155.108.44',
 '[30/Nov/2017:11:59:54 +0000]',
 '"PUT',
 '/categories/categories/categories',
 '401',
 '963',
 '"http://www.yates.com/list/tags/category/"',
 '"Mozilla/5.0 (Windows CE) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.864.0 Safari/5332"')

### Data Cleaning in Parse Log

We can update the ```parse_log()``` function to also perform some data cleaning for us. Notice that the ```time_local``` field is stored with square brackets, the HTTP status code is a string, and there are unnecessary double quotes (```"```) around some fields.

To help fix these, we have exposed a couple of utility functions for you in the exercise:

```
from datetime import datetime

def parse_time(time_str):
    """
    Parses time in the format [30/Nov/2017:11:59:54 +0000]
    to a datetime object.
    """
    time_obj = datetime.strptime(time_str, '[%d/%b/%Y:%H:%M:%S %z]')
    return time_obj

def strip_quotes(s):
    return s.replace('"', '')
```


* Enhance the parse_log() function by cleaning the following:
    * Set the time_local field to a datetime object.
    * Strip the quotes off request_type, http_referrer, and http_user_agent.
    * Parse the status and body_bytes_sent to int.
* Read the example_log.txt file, and call parse_log() on the file.
* Call next() on the generator, and assign the return value to the variable first_line.

In [38]:
log = open('example_log.txt')
from datetime import datetime 

def parse_time(time_str):
    """
    Parses time in the format [30/Nov/2017:11:59:54 +0000]
    to a datetime object.
    """
    time_obj = datetime.strptime(time_str, '[%d/%b/%Y:%H:%M:%S %z]')
    return time_obj

def strip_quotes(s):
    return s.replace('"', '')

def parse_log(log):
    for line in log:
        split_line = line.split()
        remote_addr = split_line[0]
        time_local = parse_time(split_line[3] + " " + split_line[4])
        request_type = strip_quotes(split_line[5])        
        request_path = split_line[6]
        status = int(split_line[8])
        body_bytes_sent = int(split_line[9])
        http_referrer = strip_quotes(split_line[10])
        http_user_agent = strip_quotes(" ".join(split_line[11:]))
        yield (
            remote_addr, time_local, request_type, request_path,
            status, body_bytes_sent, http_referrer, http_user_agent
        )
        
first_line = next(parse_log(log))
print(first_line)

('200.155.108.44', datetime.datetime(2017, 11, 30, 11, 59, 54, tzinfo=datetime.timezone.utc), 'PUT', '/categories/categories/categories', 401, 963, 'http://www.yates.com/list/tags/category/', 'Mozilla/5.0 (Windows CE) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.864.0 Safari/5332')


### Write to CSV

After parsing our logs into a generator of tuples, it's now time to write a task that saves the rows to a CSV file. This keeps the data in a well-known storage format that we can use in future tasks. In the next lesson, we will discuss the role of files in a data pipeline.

A CSV is best understood when it has a set of header names for the columns, and the proper data types for its values. After parsing the logs, we have the proper data types, but we don't have the metadata of the column names. At the end of the exercise, we'll want to have the following output for our CSV file:

```
ip,time_local,request_type,request_path,status,bytes_sent,http_referrer,http_user_agent
200.155.108.44,2017-11-30 11:59:54+00:00,PUT,/categories/categories/categories,401,963,http://www.yates.com/list/tags/category/,"Mozilla/5.0 (Windows CE) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.864.0 Safari/5332"
36.139.255.202,2017-11-30 11:59:54+00:00,PUT,/search,404,171,https://www.butler.org/main/tag/category/home.php,"Mozilla/5.0 (Macintosh; PPC Mac OS X 10_5_0) AppleWebKit/5332 (KHTML, like Gecko) Chrome/15.0.813.0 Safari/5332"
```

We have worked with the Python csv module a few times in the data engineering track. Here's an example of how you can use the csv module to write to a file:

```
import csv

rows = [('a', 'b', 'c'), ('a1', 'b1', 'c1')]

# Open the file for writing and reading ('w+' truncates it first).
file = open('example_file.csv', 'w+')
writer = csv.writer(file, delimiter=',')
writer.writerows(rows)

# Go to the beginning of the file.
file.seek(0)
print(file.readlines())
```
```
['a,b,c\n', 'a1,b1,c1\n']
```
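The csv module quotes any field that contains the delimiter, which is why the user-agent column in the target output above is wrapped in double quotes. A small sketch writing to an in-memory io.StringIO buffer (used here only so nothing is written to disk):

```python
import csv
import io

buffer = io.StringIO()
writer = csv.writer(buffer, delimiter=',')
writer.writerow(['PUT', 'Mozilla/5.0 (KHTML, like Gecko)'])

# The field containing a comma is quoted; rows end with '\r\n' by default.
print(repr(buffer.getvalue()))
# 'PUT,"Mozilla/5.0 (KHTML, like Gecko)"\r\n'
```

Because the writer emits `\r\n` itself, the csv documentation recommends opening real files with `newline=''` so the line endings aren't translated again.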

* Write a function build_csv() that takes in two required arguments, lines (the parsed rows) and file, and an optional argument, header.
    * If there is a header argument, insert the header at the beginning of the parsed rows.
    * Write the CSV to the given file.
    * Return the file.
* Open the file temporary.csv for reading and writing.
* Call build_csv() with the following variables:
    * parsed for lines.
    * A list or tuple of the header names in the screen's example.
    * The temporary.csv file.
* Assign the build_csv() return value to the variable csv_file.
* Call csv_file.readlines() and assign the return value to the variable contents.
* Print the first 5 rows of contents using print().


In [40]:
import csv

log = open('example_log.txt')
parsed = parse_log(log)

def build_csv(lines, file, header=None):
    if header:
        lines = [header] + [l for l in lines]
    writer = csv.writer(file, delimiter=',')
    writer.writerows(lines)
    file.seek(0)
    return file

# 'w+' creates or truncates the file and also allows reading it back.
file = open('temporary.csv', 'w+')
csv_file = build_csv(
    parsed,
    file,
    header=[
        'ip', 'time_local', 'request_type',
        'request_path', 'status', 'bytes_sent',
        'http_referrer', 'http_user_agent'
    ]
)
contents = csv_file.readlines()
print(contents[:5])

['ip,time_local,request_type,request_path,status,bytes_sent,http_referrer,http_user_agent\n', '200.155.108.44,2017-11-30 11:59:54+00:00,PUT,/categories/categories/categories,401,963,http://www.yates.com/list/tags/category/,"Mozilla/5.0 (Windows CE) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.864.0 Safari/5332"\n', '36.139.255.202,2017-11-30 11:59:54+00:00,PUT,/search,404,171,https://www.butler.org/main/tag/category/home.php,"Mozilla/5.0 (Macintosh; PPC Mac OS X 10_5_0) AppleWebKit/5332 (KHTML, like Gecko) Chrome/15.0.813.0 Safari/5332"\n', '50.112.115.219,2017-11-30 11:59:54+00:00,POST,/main/blog,404,743,http://deleon-bender.com/categories/category.html,"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_5_5 rv:2.0; apn-IN) AppleWebKit/531.48.1 (KHTML, like Gecko) Version/4.0 Safari/531.48.1"\n', '204.132.56.4,2017-11-30 11:59:54+00:00,POST,/list,404,761,http://smith.com/category.htm,Opera/9.39.(Windows 98; Win 9x 4.90; mn-MN) Presto/2.9.163 Version/12.00\n']


### Chaining Iterators

Unfortunately, to prepend the **header** to the rows, we had to convert the rows from a generator to a list. That conversion loads every row into memory at once. It throws away the benefit of streaming the rows, so there would be no point in creating a generator in the first place. To keep the generator behavior and still insert a header, we can use the **itertools.chain()** function to combine the two iterables.

The **itertools.chain()** function takes any number of iterables as arguments. It returns a single iterator that yields every element of the first iterable, then every element of the second, and so on.

In [41]:
import itertools
import random

nums = [1, 2]
letters = ('a', 'b')
# Random number generator.
randoms = (random.random() for _ in range(2))

for ele in itertools.chain(nums, letters, randoms):
    print(ele)

1
2
a
b
0.48207631504821125
0.8093692433537565
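Because itertools.chain() returns an iterator, it only pulls items from each source when asked. The sketch below shows that prepending a header with chain() doesn't consume the rows in advance. It uses a hypothetical generator, noisy_rows(), which stands in for parse_log() and records each row it produces.

```python
import itertools

produced = []  # records which rows the generator has yielded so far

def noisy_rows():
    # Hypothetical stand-in for parse_log(): yields rows lazily.
    for i in range(3):
        produced.append(i)
        yield (i, i * 10)

rows = itertools.chain([("id", "value")], noisy_rows())
print(produced)    # prints [] (nothing has been pulled yet)
print(next(rows))  # prints ('id', 'value'), the header comes first
print(next(rows))  # prints (0, 0)
print(produced)    # prints [0] (only one row has been generated)
```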


In [42]:
import csv
import itertools

log = open('example_log.txt')
parsed = parse_log(log)

def build_csv(lines, file, header=None):
    # Prepend the header lazily instead of building a list.
    if header:
        lines = itertools.chain([header], lines)
    writer = csv.writer(file, delimiter=',')
    writer.writerows(lines)
    file.seek(0)
    return file

file = open('temporary.csv', 'r+')
csv_file = build_csv(
    parsed,
    file,
    header=[
        'ip', 'time_local', 'request_type',
        'request_path', 'status', 'bytes_sent',
        'http_referrer', 'http_user_agent'
    ]
)
    
contents = csv_file.readlines()
print(contents[:5])

['ip,time_local,request_type,request_path,status,bytes_sent,http_referrer,http_user_agent\n', '200.155.108.44,2017-11-30 11:59:54+00:00,PUT,/categories/categories/categories,401,963,http://www.yates.com/list/tags/category/,"Mozilla/5.0 (Windows CE) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.864.0 Safari/5332"\n', '36.139.255.202,2017-11-30 11:59:54+00:00,PUT,/search,404,171,https://www.butler.org/main/tag/category/home.php,"Mozilla/5.0 (Macintosh; PPC Mac OS X 10_5_0) AppleWebKit/5332 (KHTML, like Gecko) Chrome/15.0.813.0 Safari/5332"\n', '50.112.115.219,2017-11-30 11:59:54+00:00,POST,/main/blog,404,743,http://deleon-bender.com/categories/category.html,"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_5_5 rv:2.0; apn-IN) AppleWebKit/531.48.1 (KHTML, like Gecko) Version/4.0 Safari/531.48.1"\n', '204.132.56.4,2017-11-30 11:59:54+00:00,POST,/list,404,761,http://smith.com/category.htm,Opera/9.39.(Windows 98; Win 9x 4.90; mn-MN) Presto/2.9.163 Version/12.00\n']


### Counting Unique Request Types

With the log file parsed and formatted to a CSV file, we can finally create the data summarization. Recall that the final step of the pipeline was to count the unique request types in the log file. In this exercise, we want to create a function that takes in the raw CSV file, and returns the following dict:

```
{
    'GET': 3334,
    'PUT': 3367,
    'POST': 3299
}
```

* Create a function called count_unique_request() that takes in the CSV file and returns the dict in the example.
* Call count_unique_request() on csv_file and assign the return value to the variable uniques.
* Print the variable uniques.

In [43]:
import csv
import itertools

log = open('example_log.txt')
parsed = parse_log(log)
file = open('temporary.csv', 'r+')
csv_file = build_csv(
    parsed,
    file,
    header=[
        'ip', 'time_local', 'request_type',
        'request_path', 'status', 'bytes_sent',
        'http_referrer', 'http_user_agent'
    ]
)

def count_unique_request(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    idx = header.index('request_type')
    
    uniques = {}
    for line in reader:
        if not uniques.get(line[idx]):
            uniques[line[idx]] = 0
        uniques[line[idx]] += 1
    return uniques

uniques = count_unique_request(csv_file)
print(uniques)

{'PUT': 3367, 'POST': 3299, 'GET': 3334}
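You can also write the manual counting loop with collections.Counter from the standard library. The sketch below is an alternative, not the lesson's solution. So that it runs on its own, it reads a tiny in-memory CSV built with io.StringIO, and the sample rows are made up.

```python
import csv
import io
from collections import Counter

def count_unique_request(csv_file):
    # Count the values in the request_type column with a Counter.
    reader = csv.reader(csv_file)
    header = next(reader)
    idx = header.index('request_type')
    return dict(Counter(row[idx] for row in reader))

# Made-up sample data standing in for the parsed log CSV.
sample = io.StringIO(
    "ip,request_type\n"
    "1.1.1.1,GET\n"
    "2.2.2.2,PUT\n"
    "3.3.3.3,GET\n"
)
print(count_unique_request(sample))  # prints {'GET': 2, 'PUT': 1}
```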


### Task Reusability

Instead of returning a dictionary, we want the output of the summarize task to be consistent with the rest of the pipeline. That is, we want the summarize task to output a generator of tuples. After converting the summarize task to a generator, we can reuse the build_csv() function to create a summarized CSV file like the following:

```
request_type,count
GET,3334
PUT,3367
POST,3299
```

* Change the return value of count_unique_request() to be a generator of (key, value) tuples from the dictionary.
* Call build_csv() on the return value of count_unique_request() with the header from the example and the file summarized.csv.
* Print the lines of the opened summarized.csv file using .readlines().

In [45]:
import csv

def count_unique_request(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    idx = header.index('request_type')
    
    uniques = {}
    for line in reader:
        if not uniques.get(line[idx]):
            uniques[line[idx]] = 0
        uniques[line[idx]] += 1
    return ((k, v) for k, v in uniques.items())

log = open('example_log.txt')
parsed = parse_log(log)
file = open('temporary.csv', 'r+')
csv_file = build_csv(
    parsed,
    file,
    header=[
        'ip', 'time_local', 'request_type',
        'request_path', 'status', 'bytes_sent',
        'http_referrer', 'http_user_agent'
    ]
)
uniques = count_unique_request(csv_file)
summarized_file = open('summarized.csv', 'r+')
summarized_csv = build_csv(uniques, summarized_file, header=['request_type', 'count'])
print(summarized_file.readlines())

['request_type,count\n', 'PUT,3367\n', 'POST,3299\n', 'GET,3334\n']


In this lesson we expanded on the concept of functional programming and explored how composition naturally creates a data pipeline. We built a sequence of tasks and completed a pipeline that transformed raw log data into a summarized CSV file.

In the next lesson, we will generalize these tasks, and create a general purpose pipeline. We will learn about closures and function decorators that provide additional code reusability in functional programming. Finally, we will rebuild this pipeline using the general purpose pipeline.

# Building a Pipeline Class

Let's take a second and think about what we've accomplished so far. First, we've learned about the concepts of functional programming, and how to write Python code using this paradigm. Next, we built a sequence of tasks that transformed a raw log file into a summarized CSV file.

The problem with the sequence of tasks is that they were written **statically**. That is, each was written for one specific purpose, and we had to declare a variable for each function call. This process is difficult to extend: we have to rewrite our pipeline each time we want to add or change tasks.

Instead, we want to use a general purpose pipeline that makes it easy to build tasks and dependencies. Rather than calling a function, assigning the return value to an output variable, and then passing that variable to another function, we can use a general pipeline that works for all cases.

By the end of this lesson, you will be able to write the following with your own Pipeline class:

In [46]:
pipeline = Pipeline()

@pipeline.task()
def first_task(x):
    return x + 1

@pipeline.task(depends_on=first_task)
def second_task(x):
    return x * 2

@pipeline.task(depends_on=second_task)
def last_task(x):
    return x - 4

print(pipeline.run(20))

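NameError: name 'Pipeline' is not defined

This cell raises a NameError because the Pipeline class hasn't been written yet. We'll build it step by step in this lesson.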

This lesson introduces the concepts behind the **@pipeline.task()** syntax. It also shows how to combine object-oriented design with the functional paradigm and recreates the previous lesson's pipeline of tasks. To begin, we'll introduce the last functional concept our pipeline needs.

### Inner Functions

In the previous lesson, we wrote a ```build_csv()``` function that took in a Python file object and wrote the rows to it in CSV format. A better design accepts either a file object or a string filename. If given a filename, the function opens the file itself; otherwise, it uses the given file object.

We can write this behavior using an **inner function**. An inner function is, unsurprisingly, a function within a function. Here's what this would look like:

In [47]:
import csv
import itertools

lines = [[1,2,3], [4,5,6]]

def build_csv(lines, header=None, file=None):
    def open_file(f):
        # If it's a string, open the file for writing and reading
        # ('w+') so the caller can read it back after seek(0).
        if isinstance(f, str):
            f = open(f, 'w+')
        return f

    file = open_file(file)  # Resolve a filename to a file object.
    if header:
        lines = itertools.chain([header], lines)
    writer = csv.writer(file, delimiter=',')
    writer.writerows(lines)
    file.seek(0)
    return file

csv_file = build_csv(lines, file='example.csv')

The benefit of these inner functions is that they are **encapsulated** in the scope of the parent function. You cannot call ```open_file()``` outside of the ```build_csv()``` function! Since we will only use ```open_file()``` within ```build_csv()```, this keeps the function isolated from the global scope of the program.
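A quick way to see the encapsulation is to try looking up an inner function from outside its parent. In this minimal sketch, outer() and helper() are hypothetical names. helper() only exists while outer() runs, so a lookup from the global scope fails with a NameError.

```python
def outer(value):
    def helper(v):
        # Only visible inside outer().
        return v * 2
    return helper(value)

print(outer(21))  # prints 42

try:
    helper(21)
except NameError as err:
    print(err)  # prints: name 'helper' is not defined
```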

### Function Closures

The previous example introduced inner functions and the benefit of encapsulation, but it didn't show what makes inner functions truly useful. Before we start, recall the example of using ```partial``` for an ```add()``` function:


In [48]:
from functools import partial

def add(a, b):
    return a + b

add_two = partial(add, 2)
print(add_two(7))

9


The ```partial()``` function returns a new function with the first argument fixed to ```a=2```. We can replicate this behavior using an inner function instead. Let's see how this could work:

In [49]:
def add(A):
    def inner(b):
        # Use `A` from the argument in 
        # the parent add() function.
        return A + b
    return inner

add_two = add(2)
print(add_two(7))

9


There are two key points to notice in this new ```add()``` function:

* ```inner()``` uses the ```A``` variable from the parent ```add()``` function.
* ```add()``` returns the ```inner()``` function.

Let's break down these two points. The first shows that the inner function has access to the parent's scope. The second shows that, because functions are first-class objects in Python, ```add()``` can return the ```inner()``` function itself!

Finally, notice that calling ```add(2)``` returns the ```inner()``` function and assigns it to ```add_two``` **together with** the saved parent variable, *A=2*. That is, calling ```add_two()``` actually calls ```inner()```, which still remembers *A=2* even though ```add()``` has already returned. Each call to ```add()``` creates a new ```inner()``` function with its own saved value of ```A```.

Functions like this are called **closures**. A closure is an inner function that retains access to its parent's scope (i.e., the parent's variables) even after the parent returns. In Python, the parent function can collect any number of positional arguments with the ```*args``` syntax and make them available to the inner function. Here's how we could create an ```add()``` function that adds an arbitrary number of arguments:

In [50]:
# `*args` assigns a variable args to a tuple of
# function arguments.
# *args => args = (arg1, arg2, arg3, ...)
def add(*args):
    parent_args = args
    def inner(*inner_args):
        return sum(parent_args + inner_args)
    return inner

add_nine = add(1, 3, 5)
print(add_nine(2, 4, 6))

21
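Python stores the values a closure captures on the returned function's __closure__ attribute, as cell objects. The names of those captured variables are listed in __code__.co_freevars. The sketch below repeats the earlier add(A) example and inspects these attributes. It confirms that each returned inner() carries its own saved A.

```python
def add(A):
    def inner(b):
        return A + b
    return inner

add_two = add(2)
add_ten = add(10)

# Each call to add() produced a separate inner() with its own saved A.
print(add_two.__code__.co_freevars)          # prints ('A',)
print(add_two.__closure__[0].cell_contents)  # prints 2
print(add_ten.__closure__[0].cell_contents)  # prints 10
print(add_two(7), add_ten(7))                # prints 9 17
```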


* Using a closure, recreate the partial() function:
    * The first argument should be a function.
    * The remaining arguments should accept any number of positional arguments.
* Using the partial() function you wrote, call partial() on the add() function, and pass in 2.
    * Assign the returned function to the variable add_two.
* Call print() on add_two(7).

In [51]:
def add(a, b):
    return a + b

def partial(func, *args):
    parent_args = args
    def inner(*inner_args):
        return func(*(parent_args + inner_args))
    return inner

add_two = partial(add, 2)
print(add_two(7))

9
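The partial() above only forwards positional arguments. The slightly extended sketch below also forwards keyword arguments, and you can compare its result against functools.partial from the standard library. The names my_partial and power are illustrative. The merge rule, where call-time keywords override saved ones, is this sketch's own choice.

```python
import functools

def my_partial(func, *args, **kwargs):
    def inner(*inner_args, **inner_kwargs):
        # Keyword arguments given at call time override the saved ones.
        merged = {**kwargs, **inner_kwargs}
        return func(*args, *inner_args, **merged)
    return inner

def power(base, exponent=2):
    return base ** exponent

cube = my_partial(power, exponent=3)
print(cube(2))                                  # prints 8
print(functools.partial(power, exponent=3)(2))  # prints 8
print(cube(2, exponent=4))                      # prints 16
```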


### Python Decorators

In Python, it's sometimes convenient to log function calls for debugging purposes. Using a closure, it's possible to construct logging for any function.

In [52]:
def add(a, b):
    return a + b

def logger(func):
    def inner(*args):
        print('Calling function: {}'.format(func.__name__))
        print('With args: {}'.format(args))
        return func(*args)
    return inner

logged_add = logger(add)
print(logged_add(1, 2))

Calling function: add
With args: (1, 2)
3


The problem with our implementation is that it would be tedious to create a new ```logged_*``` variable for every function we wanted to log. Instead, Python provides **syntactic sugar** (i.e., a more convenient way of writing the same code) to make this process easier. Using the **decorator** syntax (```@```), we can rewrite the example from above:

In [53]:
def logger(func):
    def inner(*args):
        print('Calling function: {}'.format(func.__name__))
        print('With args: {}'.format(args))
        return func(*args)
    return inner

@logger
def add(a, b):
    return a + b

print(add(1, 2))

Calling function: add
With args: (1, 2)
3


Notice that we didn't change anything in the **logger()** function; we just applied some new Python syntax. A decorator is a callable object (e.g., a function) that modifies the behavior of a function, method, or class. Decorating **add()** with **@logger** tells the Python interpreter to run ```add = logger(add)``` once, when **add()** is defined. Every later call to **add()** then runs the ```inner()``` wrapper:

In [54]:
@logger
def add(a, b):
    return a + b

# Wrapping `add()` with `@logger` is equivalent to writing:
#     add = logger(add)
# so calling add(1, 2) runs logger(add)(1, 2).

* Write a decorator function named catch_error() that does the following:
    * Tries to run and return a given function.
    * If any exception is thrown, catches the exception, and returns the exception object.
* Add the decorator to the throws_error() function.
* Run and print the function call of throws_error().

In [55]:
def catch_error(func):
    def inner(*args):
        try:
            return func(*args)
        except Exception as e:
            return e
    return inner

@catch_error
def throws_error():
    raise Exception('Throws Error')
    
print(throws_error())

Throws Error
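One side effect of the wrappers above is that the decorated function's metadata now belongs to inner(). For example, add.__name__ becomes 'inner'. The standard library's functools.wraps decorator copies the original name, docstring, and other metadata onto the wrapper. Here is a sketch of logger() with that change. It also forwards keyword arguments, which the versions above don't.

```python
import functools

def logger(func):
    @functools.wraps(func)  # copy __name__, __doc__, etc. from func
    def inner(*args, **kwargs):
        print('Calling function: {}'.format(func.__name__))
        print('With args: {} {}'.format(args, kwargs))
        return func(*args, **kwargs)
    return inner

@logger
def add(a, b):
    """Return the sum of a and b."""
    return a + b

print(add(1, b=2))   # logs the call, then prints 3
print(add.__name__)  # prints add
print(add.__doc__)   # prints Return the sum of a and b.
```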


### Method Decorators

Recall that we are trying to create a pipeline class which dynamically adds tasks to be run. Rather than using functions as decorators, we can also use instance methods! Here's the behavior we're aiming for:

In [56]:
pipeline = Pipeline()

@pipeline.task()
def first_task(x):
    return x + 1

print(pipeline.tasks)

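NameError: name 'Pipeline' is not defined

Again, the NameError is expected: we write the Pipeline class and its task() method in the exercise below.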

Notice a couple of things here:
* The @pipeline.task() decorator is created by calling an instance method of the pipeline object.
* pipeline.task() is called (note the parentheses) before it wraps the function; its return value is what actually decorates first_task().
* The first_task() function is dynamically added to a list stored on the object.


* Add a task() decorator method to the given Pipeline class that dynamically adds functions to the self.tasks list.
* Instantiate a Pipeline object, and assign it to the variable pipeline.
* Wrap the first_task() function with the @pipeline.task() decorator.
* Print the pipeline.tasks property.

In [57]:
class Pipeline:
    def __init__(self):
        self.tasks = []
        
    def task(self):
        def inner(f):
            self.tasks.append(f)
            return f
        return inner

pipeline = Pipeline()
    
@pipeline.task()
def first_task(x):
    return x + 1

print(pipeline.tasks)

[<function first_task at 0x109e9b790>]
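Because pipeline.task() is called before it wraps anything, the decorator line is shorthand for two steps. First, task() is called and returns inner(). Then inner() is called on the function. The self-contained sketch below repeats the Pipeline class from the cell above. It then registers second_task (an illustrative name) without the @ syntax.

```python
class Pipeline:
    def __init__(self):
        self.tasks = []

    def task(self):
        def inner(f):
            self.tasks.append(f)  # register the function
            return f              # return it unchanged
        return inner

pipeline = Pipeline()

@pipeline.task()
def first_task(x):
    return x + 1

def second_task(x):
    return x * 2

# Equivalent to decorating second_task with @pipeline.task():
register = pipeline.task()
second_task = register(second_task)

print([f.__name__ for f in pipeline.tasks])  # prints ['first_task', 'second_task']
```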


### Decorator Arguments

Because we call the decorator (```pipeline.task()```) before it wraps the function, we can pass optional arguments to it. Let's return to the behavior we are trying to build towards:

```
pipeline = Pipeline()

@pipeline.task()
def first_task(x):
    return x + 1

@pipeline.task(depends_on=first_task)
def second_task(x):
    return x * 2

print(pipeline.tasks)
```

We want the tasks stored in execution order. The pipeline should start with the **first_task()** function and then execute **second_task()**. The **depends_on** keyword argument enforces this ordering by recording which task each new task depends on.


* Add the depends_on optional keyword argument to the Pipeline.task() method.
    * If the task depends on a function, then task() should search through the task list, and add the new task right after it.
* Decorate the second_task() and last_task() functions with pipeline.task():
    * second_task() depends on the first_task().
    * last_task() depends on the second_task().
* Print the pipeline.tasks property.

In [58]:
class Pipeline:
    def __init__(self):
        self.tasks = []
        
    def task(self, depends_on=None):
        idx = 0
        if depends_on:
            idx = self.tasks.index(depends_on) + 1
        def inner(f):
            self.tasks.insert(idx, f)
            return f
        return inner

pipeline = Pipeline()
    
@pipeline.task()
def first_task(x):
    return x + 1

@pipeline.task(depends_on=first_task)
def second_task(x):
    return x * 2

@pipeline.task(depends_on=second_task)
def last_task(x):
    return x - 4

print(pipeline.tasks)

[<function first_task at 0x109e9bb80>, <function second_task at 0x109e9b9d0>, <function last_task at 0x109e9b940>]


### Running the Pipeline

With the tasks inserted in order, we can now write the pipeline's ```run()``` method. This method should act like function composition: it passes each function's output as the input to the next function. The behavior should execute like so:

```
pipeline = Pipeline()

@pipeline.task()
def first_task(x):
    return x + 1

@pipeline.task(depends_on=first_task)
def second_task(x):
    return x * 2

@pipeline.task(depends_on=second_task)
def last_task(x):
    return x - 4

pipeline.run(20)
# (((20 + 1) * 2) - 4) = 38
```

Notice that each task takes a single argument and returns a single value.


* Add the run() method to the Pipeline class.
    * The run() method should take in an input_ argument.
    * Then, it should iterate through the self.tasks property, and call each function with the previous output.
* Call pipeline.run(20) and print() the value.

In [59]:
class Pipeline:
    def __init__(self):
        self.tasks = []
        
    def task(self, depends_on=None):
        idx = 0
        if depends_on:
            idx = self.tasks.index(depends_on) + 1
        def inner(f):
            self.tasks.insert(idx, f)
            return f
        return inner
    
    def run(self, input_):
        output = input_
        for task in self.tasks:
            output = task(output)
        return output
    
pipeline = Pipeline()
    
@pipeline.task()
def first_task(x):
    return x + 1

@pipeline.task(depends_on=first_task)
def second_task(x):
    return x * 2

@pipeline.task(depends_on=second_task)
def last_task(x):
    return x - 4

print(pipeline.run(20))

38
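run() is a left fold over the task list: it threads one value through each function in turn. You can write the same behavior with functools.reduce. run_with_reduce() below is a hypothetical helper for comparison, not part of the lesson's Pipeline class.

```python
import functools

def first_task(x):
    return x + 1

def second_task(x):
    return x * 2

def last_task(x):
    return x - 4

def run_with_reduce(tasks, input_):
    # reduce(f, [t1, t2, t3], x) computes f(f(f(x, t1), t2), t3),
    # and here f(output, task) simply calls task(output).
    return functools.reduce(lambda output, task: task(output), tasks, input_)

print(run_with_reduce([first_task, second_task, last_task], 20))  # prints 38
```

With an empty task list, reduce returns the initial value unchanged, which matches what run() does when self.tasks is empty.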


### Challenge: Making Static Tasks Dynamic

With the **pipeline.run()** method, we now have a fully functioning general pipeline! We can use this **Pipeline** class with any tasks that require a dependency ordering, and the previous lesson's task pipeline fits this use case.

Using the functions we wrote in the previous lesson, we will rebuild that pipeline with the **Pipeline** class we wrote in this lesson. Here are those functions, along with the sequence for executing the tasks:

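```
import csv
import itertools

# parse_time() and strip_quotes() are helpers defined earlier in the course.
def parse_log(log):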
    for line in log:
        split_line = line.split()
        remote_addr = split_line[0]
        time_local = parse_time(split_line[3] + " " + split_line[4])
        request_type = strip_quotes(split_line[5])
        request_path = split_line[6]
        status = split_line[8]
        body_bytes_sent = split_line[9]
        http_referrer = strip_quotes(split_line[10])
        http_user_agent = strip_quotes(" ".join(split_line[11:]))
        yield (
            remote_addr, time_local, request_type, request_path,
            status, body_bytes_sent, http_referrer, http_user_agent
        )

def build_csv(lines, header=None, file=None):
    if header:
        lines = itertools.chain([header], lines)
    writer = csv.writer(file, delimiter=',')
    writer.writerows(lines)
    file.seek(0)
    return file

def count_unique_request(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    idx = header.index('request_type')

    uniques = {}
    for line in reader:

        if not uniques.get(line[idx]):
            uniques[line[idx]] = 0
        uniques[line[idx]] += 1
    return ((k, v) for k,v in uniques.items())

# Run the static tasks.
log = open('example_log.txt')
parsed = parse_log(log)
file = open('temporary.csv', 'r+')
csv_file = build_csv(
    parsed,
    header=[
        'ip', 'time_local', 'request_type',
        'request_path', 'status', 'bytes_sent',
        'http_referrer', 'http_user_agent'
    ],
    file=file
)
uniques = count_unique_request(csv_file)
summarized_file = open('summarized.csv', 'r+')
summarized_csv = build_csv(uniques, header=['request_type', 'count'], file=summarized_file)
```

Before we conclude, there's a helpful module that you can optionally use for reading and writing files. Instead of opening a file on disk, you can use the StringIO class from the io module. A StringIO object is a file-like object that keeps its contents in memory instead of writing them to disk.

```
import io

log = open('example_log.txt')
parsed = parse_log(log)
csv_file = build_csv(
    parsed,
    header=[
        'ip', 'time_local', 'request_type',
        'request_path', 'status', 'bytes_sent',
        'http_referrer', 'http_user_agent'
    ],
    # Using file-like object instead of `temporary.csv`.
    file=io.StringIO()
)
uniques = count_unique_request(csv_file)
summarized_csv = build_csv(
    uniques,
    header=['request_type', 'count'],
    # Using file-like object instead of `summarized.csv`.
    file=io.StringIO()
)
print(summarized_csv.readlines())
```

An **io.StringIO** object is easy to pass through the pipeline, and it supports the same methods as a text file, such as write(), seek(), and readlines(). In the following exercise, we'll ask you to use **io.StringIO**.
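Here is a short sketch of that file-like behavior. Writing to an io.StringIO advances its position, just as writing to a file does. You must therefore rewind the buffer with seek(0) before reading, which is why build_csv() calls file.seek(0) before returning. The sample text is made up.

```python
import io

buffer = io.StringIO()
buffer.write("request_type,count\n")
buffer.write("GET,2\n")

print(buffer.readlines())  # prints [] (the position is at the end of the buffer)
buffer.seek(0)             # rewind, as build_csv() does
print(buffer.readlines())  # prints ['request_type,count\n', 'GET,2\n']
print(buffer.getvalue())   # the whole contents, regardless of position
```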


* Rebuild the previous lesson's pipeline using the Pipeline class.
    * The functions from the example code are provided to you in the exercise.
    * The pipeline doesn't have to be identical. You can choose to omit, combine, or add on additional functions.
    * Use io.StringIO() as the file object in the build_csv() keyword argument.
* Call pipeline.run() on the log file.
* Assign the return value to the variable summarized_csv, and print() the results of summarized_csv.readlines().

In [60]:
import io

class Pipeline:
    def __init__(self):
        self.tasks = []
        
    def task(self, depends_on=None):
        idx = 0
        if depends_on:
            idx = self.tasks.index(depends_on) + 1
        def inner(f):
            self.tasks.insert(idx, f)
            return f
        return inner
    
    def run(self, input_):
        output = input_
        for task in self.tasks:
            output = task(output)
        return output
    
pipeline = Pipeline()

@pipeline.task()
def parse_logs(logs):
    return parse_log(logs)

@pipeline.task(depends_on=parse_logs)
def build_raw_csv(lines):
    return build_csv(
        lines,
        header=[
            'ip', 'time_local', 'request_type',
            'request_path', 'status', 'bytes_sent',
            'http_referrer', 'http_user_agent'
        ],
        file=io.StringIO()
    )

@pipeline.task(depends_on=build_raw_csv)
def count_uniques(csv_file):
    return count_unique_request(csv_file)

@pipeline.task(depends_on=count_uniques)
def summarize_csv(lines):
    return build_csv(lines, header=['request_type', 'count'], file=io.StringIO())

log = open('example_log.txt')
summarized_file = pipeline.run(log)
print(summarized_file.readlines())

['request_type,count\r\n', 'PUT,3367\r\n', 'POST,3299\r\n', 'GET,3334\r\n']
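Notice the '\r\n' endings in this output, compared with '\n' in the earlier file-based runs. By default, csv.writer ends each row with '\r\n'. The earlier rows were read back from a text file opened with the default newline setting, so Python's universal newlines mode translated '\r\n' to '\n'. io.StringIO() returns the line endings untranslated. If you prefer plain '\n' endings, one option is to pass lineterminator='\n' to csv.writer, as in this sketch. This is an adjustment, not part of the original exercise.

```python
import csv
import io

rows = [('request_type', 'count'), ('GET', 2)]

default_buf = io.StringIO()
csv.writer(default_buf).writerows(rows)
default_buf.seek(0)
print(default_buf.readlines())  # prints ['request_type,count\r\n', 'GET,2\r\n']

unix_buf = io.StringIO()
csv.writer(unix_buf, lineterminator='\n').writerows(rows)
unix_buf.seek(0)
print(unix_buf.readlines())     # prints ['request_type,count\n', 'GET,2\n']
```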


### Multiple Dependency Pipeline

At the end of the previous lesson, we discussed some drawbacks of our initial pipeline implementation. One of them was that tasks could only run in a linear sequence. Using our tasks as an example, we'll show why this is a major drawback that we must address.

In our last lesson's task pipeline, the final task summarized the parsed CSV file by counting the values in the request_type column. Suppose that we also wanted to summarize the status column. This seems doable, since both summaries only need the parsed CSV. However, our linear pipeline can't express it, because each task receives only the output of the task directly before it.

Instead of a linear ordering, we need the ability to create multiple branches of dependencies. We're looking to build a data structure that can support the following task pipeline:

This multiple branching pipeline is called a directed acyclic graph, or DAG for short. In this lesson, we will start by building our own DAG in Python, and then use the DAG to enhance our pipeline task scheduling.

### Intro to DAGs

In the introduction, we briefly mentioned the concept of a DAG. To describe it, we'll reuse some of the terminology from the course on trees. Let's break down what each of the terms means:

* Graph: A data structure composed of vertices (nodes) and edges (branches).
* Directed: Each edge points in only one direction.
* Acyclic: The graph does not have any cycles.

Like the definition of a tree, the definition of a graph is abstract, so it's easier to describe with diagrams.

The vertices (or nodes) are the points in the graph, and the edges are the lines that connect them. Note that a general graph places no requirement on the direction of an edge (i.e., the edges (2, 1) and (1, 2) are both possible). If we restrict each edge to a single direction, the graph is called a directed graph.

In a directed graph, the arrows show the direction of each edge. Written out as tuples, edges look like (1, 2), (2, 6), or (3, 5). If we follow a sequence of directed edges, like [(1, 3), (3, 5), (5, 7)], we call this sequence a path of the graph.

If any path starts and ends at the same vertex, the graph contains a cycle. For example, a path like [(1, 3), (3, 5), (5, 4), (4, 1)] would be a cycle. If a graph does not contain any cycles, it is acyclic.
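To make these definitions concrete, here is a sketch that checks a directed graph for a cycle using a depth-first search. It stores the graph as a dict that maps each node to the list of nodes its edges point to. The function name has_cycle() and both example graphs are illustrative. The first graph contains the cycle [(1, 3), (3, 5), (5, 4), (4, 1)] from the text.

```python
def has_cycle(graph):
    # graph maps each node to the list of nodes its edges point to.
    VISITING, DONE = 1, 2
    state = {}

    def visit(node):
        state[node] = VISITING  # node is on the current path
        for nxt in graph.get(node, []):
            if state.get(nxt) == VISITING:
                return True  # an edge back to a node on the current path
            if nxt not in state and visit(nxt):
                return True
        state[node] = DONE
        return False

    return any(node not in state and visit(node) for node in graph)

cyclic = {1: [3], 3: [5], 5: [4], 4: [1]}
acyclic = {1: [2, 3, 4], 2: [6], 3: [5], 4: [7], 5: [7], 6: [7], 7: []}
print(has_cycle(cyclic))   # prints True
print(has_cycle(acyclic))  # prints False
```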

The following graph has a cycle. In the exercise, we will ask you to find the cycle.

### The DAG Class

Let's turn our attention back to the pipeline we wish to create:

We can see from the diagram that the pipeline meets the requirements of a DAG: it has a set of vertices and edges, each edge has a direction, and there are no cycles. But why should we write the pipeline as a DAG just because we can?

The reason is that the DAG structure naturally exposes an efficient ordering of dependent tasks. A DAG sorting algorithm, known as topological sorting, reveals this order, and we can take advantage of it when scheduling our tasks. We'll see that with a DAG, we can implement task scheduling in linear time, $O(V+E)$, where $V$ and $E$ are the numbers of vertices and edges.

Before getting too deep into the weeds, let's find a way to describe the DAG in Python. To represent dependencies, we could use a class like Vertex that holds a list of the vertices it points to. Like the root in a tree, we could build the DAG class from vertices like this:

```
class DAG:
    def __init__(self):
        self.root = Vertex()

class Vertex:
    def __init__(self):
        self.to = []
        self.data = None

dag = DAG()
second = Vertex()
dag.root.to.append(second)

print(dag.root.to)
```
While this works, a graph structure does not always start with a single root. Instead, there could be multiple starting points in a DAG:

Let's see if there's another data structure we could use. First, we need the ability to link vertices to multiple nodes in the graph. Then, we need to easily loop through these nodes to create our graph.

A simple data structure that provides both of those behaviors is a dict with list values. Here's how it could represent our example graph:

```
graph = {
    # node: list of nodes it points to.
    7: [],
    6: [7],
    5: [7],
    4: [7],
    3: [5],
    2: [6],
    1: [2, 3, 4]
}
```



* Implement the add() method of the DAG class:
    * If node isn't already in self.graph, add it with an empty list as its value.
    * If a to node is given, append it to node's list, and add to to self.graph with an empty list if it isn't already there.
* Run the code with the provided .add() test cases.

In [63]:
class DAG:
    def __init__(self):
        self.graph = {}
    
    def add(self, node, to=None):
        if node not in self.graph:
            self.graph[node] = []
        # `is not None` so that a falsy node such as 0 is still added.
        if to is not None:
            if to not in self.graph:
                self.graph[to] = []
            self.graph[node].append(to)

dag = DAG()
dag.add(1)
dag.add(1, 2)
dag.add(1, 3)
dag.add(1, 4)
dag.add(3, 5)
dag.add(2, 6)
dag.add(4, 7)
dag.add(5, 7)
dag.add(6, 7)
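To check the add() method, the sketch below repeats the class and the test edges. It then compares self.graph against the dict of lists shown earlier. Dict equality ignores key order, so the comparison passes even though the nodes were inserted in a different order. This copy uses the `is not None` check, so a node named 0 would also work.

```python
class DAG:
    def __init__(self):
        self.graph = {}

    def add(self, node, to=None):
        if node not in self.graph:
            self.graph[node] = []
        if to is not None:
            if to not in self.graph:
                self.graph[to] = []
            self.graph[node].append(to)

dag = DAG()
dag.add(1)
for edge in [(1, 2), (1, 3), (1, 4), (3, 5), (2, 6), (4, 7), (5, 7), (6, 7)]:
    dag.add(*edge)

expected = {7: [], 6: [7], 5: [7], 4: [7], 3: [5], 2: [6], 1: [2, 3, 4]}
print(dag.graph)              # prints {1: [2, 3, 4], 2: [6], 3: [5], 4: [7], 5: [7], 6: [7], 7: []}
print(dag.graph == expected)  # prints True
```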

### Sorting the DAG

What does it mean to sort the DAG? In our previous sorting algorithms, we compared object values, and sorted them in ascending or descending order. In a DAG, though, what does it mean to be in ascending or descending order?

Remember, our use case for the DAG was to place tasks in order of their dependencies. We're looking for an ordering that starts with the most depended-on tasks and ends with the least depended-on. In our pipeline example, that means starting with parsing a file and ending with summarizing.

Let's take a look back at our DAG example. At a glance, it's reasonable to assume that the most depended-on node is the first node, 1. Then, following the paths, each subsequent node decreases in importance.

Another way to phrase this is that the longer the path to a node, the less that node is depended on. Take node 1, the most depended-on node: it has the shortest directed path, 0 steps. Conversely, node 7 has the longest directed path, 3 steps (for example, 1 → 3 → 5 → 7), and it is the least depended on.

Using this hypothesis, we can perform the following:

1. Find the "root" nodes of the graph, which have 0 dependencies.
2. For each node, find the longest path from the roots to that node.
3. Sort the nodes by their longest paths.
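The three steps can be sketched in plain Python against the example DAG. This is a minimal, deliberately naive sketch (the helper names `longest_path_order` and `longest_from_root` are hypothetical): it reverses the edges so each node knows its parents, recursively finds each node's longest path from a root without caching, and then sorts by that length.

```python
# The example DAG from earlier, listed root-first.
graph = {
    1: [2, 3, 4],
    2: [6],
    3: [5],
    4: [7],
    5: [7],
    6: [7],
    7: [],
}

def longest_path_order(graph):
    # Reverse the edges so each node knows which nodes point to it.
    parents = {node: [] for node in graph}
    for node, pointers in graph.items():
        for pointed in pointers:
            parents[pointed].append(node)

    def longest_from_root(node):
        # Step 1: a root has no parents, so its path length is 0.
        if not parents[node]:
            return 0
        # Step 2: one step more than the longest path to any parent.
        # Shared sub-paths are recomputed on every call (no caching).
        return 1 + max(longest_from_root(parent) for parent in parents[node])

    lengths = {node: longest_from_root(node) for node in graph}
    # Step 3: sort the nodes by their longest path.
    return sorted(graph, key=lambda node: lengths[node]), lengths

order, lengths = longest_path_order(graph)
print(lengths)  # {1: 0, 2: 1, 3: 1, 4: 1, 5: 2, 6: 2, 7: 3}
print(order)    # [1, 2, 3, 4, 5, 6, 7]
```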

This seems reasonable, but there are some major time complexity drawbacks here. Steps 2 and 3 take $O(n^2)$ (longest path) and $O(n \log n)$ (fastest comparison sort) time, respectively! That's a worst case of $O(n^2)$ overall.

We mentioned earlier that a DAG can be sorted in linear time. In the next screen, we'll use the longest-path hypothesis to build a linear-time sorting algorithm.

### Finding Number of In Degrees

The previous screen's longest-path hypothesis wasn't wrong; it was the proposed algorithm that would have taken too long. Over the next few screens, we'll develop an efficient algorithm to sort the DAG.

First, to find a longest path, it's necessary to know which nodes "start" the directed graph. By start, we mean the root nodes that the graph expands from. The question is, what determines a root node?

Let's look at the following graph:

Clearly, the root nodes of the graph are 1, 2, and 3. But what makes the root nodes different from every other node in the graph? The answer is their in-degree.

A node's in-degree is the number of edges pointing toward it. For example, node 5 has an in-degree of 2, and node 8 has an in-degree of 3. A root node, however, always has an in-degree of 0.
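The graph discussed above isn't reproduced here, so this sketch counts in-degrees for the seven-node example DAG from earlier instead. It uses `collections.Counter`, a swapped-in shortcut rather than the lesson's method: every pointed-to node is counted once per incoming edge, and nodes that nothing points to default to 0, which identifies the roots.

```python
from collections import Counter

graph = {
    1: [2, 3, 4],
    2: [6],
    3: [5],
    4: [7],
    5: [7],
    6: [7],
    7: [],
}

# Count every edge toward the node it points to.
counts = Counter(pointed for pointers in graph.values() for pointed in pointers)
# Nodes nobody points to are absent from the Counter; counts[node] returns 0 for them.
in_degrees = {node: counts[node] for node in graph}
roots = [node for node, degree in in_degrees.items() if degree == 0]

print(in_degrees)  # {1: 0, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 3}
print(roots)       # [1]
```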


* Note that we have created a BaseDAG class containing the add() method you wrote in the last exercise.
* Implement the in_degrees() method:
    * The in_degrees() method should create a self.degrees attribute: a dictionary mapping each node to its in-degree.
    * Loop through every node and its pointers, counting each edge toward the node it points to.
* Run the code with the provided .in_degrees() tests.

In [66]:
class DAG:
    def __init__(self):
        self.graph = {}

    def in_degrees(self):
        # Map each node to the number of edges pointing toward it.
        self.degrees = {}
        for node in self.graph:
            if node not in self.degrees:
                self.degrees[node] = 0
            for pointed in self.graph[node]:
                if pointed not in self.degrees:
                    self.degrees[pointed] = 0
                self.degrees[pointed] += 1

    def add(self, node, to=None):
        if node not in self.graph:
            self.graph[node] = []
        if to is not None:
            if to not in self.graph:
                self.graph[to] = []
            self.graph[node].append(to)

dag = DAG()
dag.add(1)
dag.add(1, 2)
dag.add(1, 3)
dag.add(1, 4)
dag.add(3, 5)
dag.add(2, 6)
dag.add(4, 7)
dag.add(5, 7)
dag.add(6, 7)
dag.in_degrees()

### Challenge: Sorting Dependencies

With the in-degree of each node, we can implement the next part of the algorithm: the walk. Let's begin by filtering out the root nodes. By the definition in the last screen, these are the nodes with an in-degree of 0.

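```
# in_degrees maps each node to its in-degree (values for the example DAG).
in_degrees = {1: 0, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 3}

root_nodes = []
for node in in_degrees:
    if in_degrees[node] == 0:
        root_nodes.append(node)
```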

Now that we have the root nodes, what do we do with them? Recall the hypothesis: the shorter the path to a node, the more that node is depended on. Another way to state it: the root nodes, if there are any, are the most important nodes in the graph.

If roots are the most important nodes, then what we should do is the following:


1. Filter all the root nodes, and pop them off the graph.
2. Search their pointers, and check whether any of them have become new root nodes.
    1. If one is, append it to the root nodes list, and pop it off the graph.
    2. If not, then continue.
3. Once all the nodes have been popped from the graph, return the list of ordered root nodes.
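The steps above can be sketched directly on the dict-of-lists graph. This is a minimal, deliberately unoptimized sketch (the function name `sort_by_popping_roots` is hypothetical): checking whether a node has become a root rescans every node still left in the graph.

```python
graph = {
    1: [2, 3, 4],
    2: [6],
    3: [5],
    4: [7],
    5: [7],
    6: [7],
    7: [],
}

def sort_by_popping_roots(graph):
    # Work on a copy so the caller's graph is left intact.
    remaining = {node: list(pointers) for node, pointers in graph.items()}

    def is_root(node):
        # A node is a root once no remaining node points to it.
        return all(node not in pointers for pointers in remaining.values())

    roots = [node for node in remaining if is_root(node)]  # step 1
    ordered = []
    while roots:
        node = roots.pop(0)
        pointers = remaining.pop(node)  # pop it off the graph
        ordered.append(node)
        for pointed in pointers:  # step 2: check its pointers
            if pointed in remaining and pointed not in roots and is_root(pointed):
                roots.append(pointed)  # step 2.1: a new root
    return ordered  # step 3

print(sort_by_popping_roots(graph))  # [1, 2, 3, 4, 6, 5, 7]
```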

This high-level algorithm gives us a systematic way to order the nodes by importance. To write the code, we can implement it using a queue (similar to tree traversals):


1. Using in_degrees, place the root node(s) in a queue.
2. While the queue is not empty:
    1. Dequeue a node, node_i.
    2. Check each of the pointers in the node.
    3. Decrement the pointer's in_degrees by 1 (reduce pointers).
    4. If that pointer's # of in-degrees is 0, add it to the queue.
    5. If not, continue.
    6. Once all the pointers have been searched, append node_i to the list searched.
    7. Continue the while loop.
3. Return the searched list.

Using the description of the algorithm, we'll write the Python code for it in the DAG method sort(). This is a tricky algorithm, but using the methods we have already written, it's possible to implement.

In [67]:
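from collections import deque

# Outside the lesson platform BaseDAG is not predefined, so reuse the DAG
# class from the previous cell, which already has add() and in_degrees().
BaseDAG = DAG

class DAG(BaseDAG):
    def sort(self):
        # Kahn's algorithm: start from every node with an in-degree of 0.
        self.in_degrees()
        to_visit = deque()
        for node in self.graph:
            if self.degrees[node] == 0:
                to_visit.append(node)

        searched = []
        while to_visit:
            node = to_visit.popleft()
            for pointer in self.graph[node]:
                # Remove the edge; a node with no incoming edges left is ready.
                self.degrees[pointer] -= 1
                if self.degrees[pointer] == 0:
                    to_visit.append(pointer)
            searched.append(node)
        return searched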

dag = DAG()
dag.add(1)
dag.add(1, 2)
dag.add(1, 3)
dag.add(1, 4)
dag.add(3, 5)
dag.add(2, 6)
dag.add(4, 7)
dag.add(5, 7)
dag.add(6, 7)
dependencies = dag.sort()


### Enhance the Add Method

The algorithm we wrote in the previous screen is a topological sort. Specifically, we implemented Kahn's algorithm, a well-known algorithm for topologically sorting a DAG. An interesting property of this algorithm is that it can also determine whether a graph has a cycle.

To test for cycles, we sort the graph, take its topologically sorted list of visited nodes, and compare its length with the number of nodes in the graph. If the sorted list is shorter than the graph, there must be a cycle! Every node on a cycle has an incoming edge from another node on that cycle, so its in-degree never drops to 0, it is never added to the queue, and it never appears in the sorted list.

For robustness, we should not add a node to the DAG if it makes it cyclical.
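A minimal standalone sketch of the cycle check, assuming the dict-of-lists graph representation. Here `kahn_sort` is a hypothetical free-function version of the queue-based sort() method; it runs on one acyclic and one cyclic graph so the sorted lengths can be compared with the node counts.

```python
from collections import deque

def kahn_sort(graph):
    # Count the incoming edges of every node.
    degrees = {node: 0 for node in graph}
    for pointers in graph.values():
        for pointed in pointers:
            degrees[pointed] += 1

    # Start from the roots and release nodes as their in-degree hits 0.
    to_visit = deque(node for node in graph if degrees[node] == 0)
    searched = []
    while to_visit:
        node = to_visit.popleft()
        for pointed in graph[node]:
            degrees[pointed] -= 1
            if degrees[pointed] == 0:
                to_visit.append(pointed)
        searched.append(node)
    return searched

acyclic = {1: [2], 2: [3], 3: []}
cyclic = {1: [2], 2: [3], 3: [2]}  # 2 -> 3 -> 2 is a cycle

print(kahn_sort(acyclic))  # [1, 2, 3]: same length as the graph
print(kahn_sort(cyclic))   # [1]: shorter than the graph, so a cycle exists
```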


* Enhance the add() method to raise an error if a new node causes a cycle:
    * Call sort() in the add() method, and check whether the sorted length differs from (that is, is less than) the number of nodes in the graph.
    * Raise an Exception if a cycle is detected.
* Run the code with the provided .add() test cases.

In [68]:
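# Reuse the previous cell's DAG (which already has sort()) as the base class.
BaseDAG = DAG

class DAG(BaseDAG):
    def add(self, node, to=None):
        if node not in self.graph:
            self.graph[node] = []
        if to is not None:
            if to not in self.graph:
                self.graph[to] = []
            self.graph[node].append(to)

        # Kahn's algorithm never reaches nodes on a cycle, so a cycle
        # shows up as a sorted list that is shorter than the graph.
        if len(self.sort()) != len(self.graph):
            # Roll back the new edge so the graph stays acyclic.
            self.graph[node].remove(to)
            raise Exception("Adding this edge would create a cycle.")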
            
dag = DAG()
dag.add(1)
dag.add(1, 2)
dag.add(1, 3)
dag.add(1, 4)
dag.add(3, 5)
dag.add(2, 6)
dag.add(4, 7)
dag.add(5, 7)
dag.add(6, 7)
# Add a pointer from 7 to 4, causing a cycle.
dag.add(7, 4)


### Adding DAG to the Pipeline

With a robust DAG, we now have the scheduler we need to build our intended pipeline. To start, let's add the DAG class to the Pipeline: instead of storing tasks in a list property, we'll store them in a DAG.


* Rewrite the Pipeline.task method to add tasks to self.tasks.graph.
    * Make sure you're declaring the dependencies properly: each edge should point from a dependency to the task that depends on it.
* Add first(), second(), third(), and fourth() functions to the pipeline with @pipeline.task().
    * second() depends on first().
    * third() depends on second().
    * fourth() depends on second().
* Assign pipeline.tasks.graph to the variable graph.

In [69]:
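# DQ is the lesson platform's answer-checking class; define an empty
# stand-in when it isn't available (for example, when running locally).
try:
    DQ
except NameError:
    class DQ:
        pass

class Pipeline(DQ):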
    def __init__(self):
        self.tasks = DAG()
        
    def task(self, depends_on=None):
        def inner(f):
            self.tasks.add(f)
            if depends_on:
                self.tasks.add(depends_on, f)
            return f
        return inner

pipeline = Pipeline()
@pipeline.task()
def first():
    return 20

@pipeline.task(depends_on=first)
def second(x):
    return x * 2

@pipeline.task(depends_on=second)
def third(x):
    return x // 3

@pipeline.task(depends_on=second)
def fourth(x):
    return x // 4

graph = pipeline.tasks.graph


### Challenge: Running the Pipeline

With the tasks added in order, it's time to run the pipeline. The main thing to notice is that run() takes no input. Instead, we begin the pipeline with a task that returns a static object (in our case, first() returns 20).

Furthermore, there is no concept of a "last task" in a DAG. Therefore, we represent our tasks and their outputs during a run with a dictionary that maps each function to its output. With this dictionary, we can store each task's output on completion and use it as the input for the tasks that depend on it.
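Python functions are hashable, so they can be used directly as dictionary keys. Here is a minimal sketch of the function-to-output mapping for two tasks, run by hand in dependency order rather than by the pipeline.

```python
def first():
    return 20

def second(x):
    return x * 2

outputs = {}
# Run each task after its dependency, keyed by the function object itself.
outputs[first] = first()
outputs[second] = second(outputs[first])

print(outputs[first])   # 20
print(outputs[second])  # 40
```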

With that output dictionary, we can then choose to see the outputs of any function. This is the behavior we're looking for:

```
pipeline = Pipeline()

@pipeline.task()
def first():
    return 20

@pipeline.task(depends_on=first)
def second(x):
    return x * 2

@pipeline.task(depends_on=second)
def third(x):
    return x // 3

@pipeline.task(depends_on=second)
def fourth(x):
    return x // 4

outputs = pipeline.run()
print(outputs[third])   # 13
print(outputs[fourth])  # 10
```

* Rewrite the Pipeline.run method:
    * Run the tasks.sort() method, and save the result to a variable scheduled.
    * Initialize an empty dictionary, completed.
    * Iterate through each sorted task:
        * Check every node in the graph; if the task appears in a node's list of pointers (meaning it depends on that node), run the task with that node's output as input and add the result to completed.
        * If no node points to the task, run it without arguments and add the result to completed.
    * Return the completed dictionary.
* Run the pipeline with pipeline.run() and assign the results to the variable outputs.


In [70]:
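# DQ is the lesson platform's answer-checking class; define an empty
# stand-in when it isn't available (for example, when running locally).
try:
    DQ
except NameError:
    class DQ:
        pass

class Pipeline(DQ):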
    def __init__(self):
        self.tasks = DAG()
        
    def task(self, depends_on=None):
        def inner(f):
            self.tasks.add(f)
            if depends_on:
                self.tasks.add(depends_on, f)
            return f
        return inner
    
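    def run(self):
        # Topologically sorted tasks: every dependency runs before its dependents.
        scheduled = self.tasks.sort()
        completed = {}

        for task in scheduled:
            # If some task points to this one, feed it that task's output.
            for node, values in self.tasks.graph.items():
                if task in values:
                    completed[task] = task(completed[node])
            # Tasks with no dependencies take no arguments.
            if task not in completed:
                completed[task] = task()
        return completed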

pipeline = Pipeline()

@pipeline.task()
def first():
    return 20

@pipeline.task(depends_on=first)
def second(x):
    return x * 2

@pipeline.task(depends_on=second)
def third(x):
    return x // 3

@pipeline.task(depends_on=second)
def fourth(x):
    return x // 4

outputs = pipeline.run()


In this lesson, we solved the linear dependency mapping problem of our pipeline. First, we learned about a DAG, and how we can use it as a task scheduler. Then, we implemented the DAG, and added it to our pipeline.

In the upcoming guided project, we'll run the pipeline you built on a real-life case. We'll learn about its drawbacks and what changes need to be made. Finally, we'll introduce some third-party pipelines that we'll discuss in upcoming courses.