# Iterators

## Topics

- Stream larger-than-memory data through a pipeline
- Composable thanks to the iterator protocol

My favorite "feature" of pandas is that it's written in Python.
Python has great language-level features for handling streams of data
that may not fit in memory.
This can be a useful pre-processing step to reading the data into a DataFrame or
NumPy array.
You can get quite far using just the builtin data structures as David Beazley proves in [this PyData keynote](https://www.youtube.com/watch?v=lyDLAutA88s).

In [None]:
import os
import gzip
from itertools import islice, takewhile

import numpy as np
import pandas as pd
import seaborn as sns
import dask.dataframe as dd
from toolz import partition_all, partitionby
import matplotlib.pyplot as plt

In [None]:
%matplotlib inline

In [None]:
pd.options.display.max_rows = 10
sns.set(context='talk')
plt.style.use("default")

## Beer Reviews Dataset

- A review is a list of lines
- Each review line is formated like `meta/field: value`
- Reviews are separated by blank lines (i.e. the line is just `'\n'`)


Stanford has a [dataset on beer reviews](https://snap.stanford.edu/data/web-BeerAdvocate.html). The raw file is too large for me to include, but I split off a couple subsets for us to work with.

Pandas can't read this file natively, but we have Python!
We'll use Python to parse the raw file and tranform it into a tabular format.

In [None]:
with gzip.open("data/beer-raw-small.txt.gz", "r") as f:
    print(f.read(1500).decode('utf-8'))

The full compressed raw dataset is about 500MB, so reading it all into memory might not be pleasent (we're working with a small subset that would fit in memory, but pretend it didn't).
Fortunately, Python's iterator protocol and generators make dealing with large streams of data pleasent.

## Developing a solution

Let's build a solution together. I'll provide some guidance as we go along.

In [None]:
# Get a handle to the data
f = gzip.open("data/beer-raw-small.txt.gz", "rt")

Usually you'd use a context manager like `with gzip.open(...) as f`, but for debugging, it's OK to do it this way.

## Parsing Tasks

1. split the raw text stream into individual reviews
2. transform each individual review into a data container
3. combine a chunk of transformed individual reviews into a collection
4. store the chunk to disk

Let's grab the first review using [`takewhile`](https://docs.python.org/3/library/itertools.html#itertools.takewhile) till the first `'\n'`.
`takewhile` scans a stream, returning each item (line) until it hits the sentinal value it's looking for.

In [None]:
from itertools import takewhile

In [None]:
f.seek(0);  # make the cell idempotent
first = list(takewhile(lambda x: x != '\n', f))
first

<div class="alert alert-success" data-title="Format Review">
  <h1><i class="fa fa-tasks"  aria-hidden="true"></i> Exercise: Format Review</h1>
</div>
<p>Write a function `format_review` that converts an item like `first` into a dict</p>

It will have one entry per line, where the are the stuff to the left of the colon and the values are the stuff to the right.
For example, the first line would be

`'beer/name: Sausa Weizen\n',` => `'beer/name': 'Sausa Weizen'`

Make sure to clean up the line endings too.

- Hint: Check out the [python string methods](https://docs.python.org/3/library/stdtypes.html#string-methods)

You can check your function against `expected` by evaluating the next cell.
If you get a failure, adjust your `format_review` until it passes.

In [None]:
import unittest
from typing import List, Dict

f.seek(0);  # make the cell idempotent
review = list(takewhile(lambda x: x != '\n', f))


def format_review(review: List[str]) -> Dict[str, str]:
    """Your code goes below"""
    

class TestFormat(unittest.TestCase):
    maxDiff = None

    def test_format_review(self):
        result = format_review(review)
        expected = {
            'beer/ABV': '5.00',
            'beer/beerId': '47986',
            'beer/brewerId': '10325',
            'beer/name': 'Sausa Weizen',
            'beer/style': 'Hefeweizen',
            'review/appearance': '2.5',
            'review/aroma': '2',
            'review/overall': '1.5',
            'review/palate': '1.5',
            'review/profileName': 'stcules',
            'review/taste': '1.5',
            'review/text': 'A lot of foam. But a lot.\tIn the smell some banana, and then lactic and tart. Not a good start.\tQuite dark orange in color, with a lively carbonation (now visible, under the foam).\tAgain tending to lactic sourness.\tSame for the taste. With some yeast and banana.\t\t',
            'review/time': '1234817823'
        }
        self.assertEqual(result, expected)

suite = unittest.TestLoader().loadTestsFromModule(TestFormat())
unittest.TextTestRunner().run(suite)

In [None]:
%load solutions/groupby_format_review.py

Notice that optional argument to split, which controls the number of splits made; If a review text had contained a literal `': '`, we'd be in trouble since it'd get split again.

Make sure you executed the above solution cell twice (first to load, second to execute) as we'll be using that `format_review` function down below

## To a DataFrame

Assuming we've processed many reviews into a list, we'll then build up a DataFrame.

In [None]:
r = [format_review(first)]  # imagine a list of many reviews

col_names = {
    'beer/ABV': 'abv',
    'beer/beerId': 'beer_id',
    'beer/brewerId': 'brewer_id',
    'beer/name': 'beer_name',
    'beer/style': 'beer_style',
    'review/appearance': 'review_appearance',
    'review/aroma': 'review_aroma',
    'review/overall': 'review_overall',
    'review/palate': 'review_palate',
    'review/profileName': 'profile_name',
    'review/taste': 'review_taste',
    'review/text': 'text',
    'review/time': 'time'
}
df = pd.DataFrame(r)
numeric = ['abv', 'review_appearance', 'review_aroma',
           'review_overall', 'review_palate', 'review_taste']
df = (df.rename(columns=col_names)
        .replace('', np.nan))
df[numeric] = df[numeric].astype(float)
df['time'] = pd.to_datetime(df.time.astype(int), unit='s')
df

Again, writing that as a function:

In [None]:
def as_dataframe(reviews):
    col_names = {
        'beer/ABV': 'abv',
        'beer/beerId': 'beer_id',
        'beer/brewerId': 'brewer_id',
        'beer/name': 'beer_name',
        'beer/style': 'beer_style',
        'review/appearance': 'review_appearance',
        'review/aroma': 'review_aroma',
        'review/overall': 'review_overall',
        'review/palate': 'review_palate',
        'review/profileName': 'profile_name',
        'review/taste': 'review_taste',
        'review/text': 'text',
        'review/time': 'time'
    }
    df = pd.DataFrame(list(reviews))
    numeric = ['abv', 'review_appearance', 'review_aroma',
               'review_overall', 'review_palate', 'review_taste']
    df = (df.rename(columns=col_names)
            .replace('', np.nan))
    df[numeric] = df[numeric].astype(float)
    df['time'] = pd.to_datetime(df.time.astype(int), unit='s')
    return df

## Full pipeline

1. `file -> review_lines : List[str]`
2. `review_lines -> reviews : Dict[str, str]`
3. `reviews -> DataFrames`
4. `DataFrames -> CSV`

The full pipeline would look something like:

In [None]:
from toolz import partition_all, partitionby


BATCH_SIZE = 100  # Number of reviews to process per chunk
                  # Intentionally small for demostration    


with gzip.open("data/beer-raw-small.txt.gz", "rt") as f:

    # Filter out a null byte at the end
    lines = (x for x in f if not x.startswith('\x00'))
    
    review_lines_and_newlines = partitionby(lambda x: x == '\n', lines)
    # that goes [review, \n, review, \n, ...]
    # so filter out the newlines
    review_lines = filter(lambda x: x != ('\n',), review_lines_and_newlines)
    
    # generator expression to go from List[str] -> Dict[str, str]
    reviews = (format_review(x) for x in review_lines)
    
    # `reviews` yields one dict per review.
    # Won't fit in memory, so do `BATCH_SIZE` per chunk
    chunks = partition_all(BATCH_SIZE, reviews)
    dfs = (as_dataframe(chunk) for chunk in chunks)
    os.makedirs("data/beer/", exist_ok=True)

    # the first time we read from disk
    for i, df in enumerate(dfs):
        df.to_csv("data/beer/chunk_%s.csv.gz" % i, index=False,
                  compression="gzip")
        print(i, end='\r')


This runs comfortably in memory. At any given time, we only have `BATCH_SIZE` reviews in memory.