# Streaming approach

- Lists vs generators
- Pandas streaming approach
- Pandas + pickle
- Pandas methods vs Python built-ins

In [None]:
# this is the base of pandas - provides high performance n-dimensional array (ndarray) storage (much faster than python lists)
import numpy as np

# pandas builds on top of numpy to provide extended features (data)
import pandas as pd

# e.g. can see numpy in pandas source code
# https://github.com/pandas-dev/pandas/blob/master/pandas/core/arrays/masked.py

import os
from pathlib import Path
from datetime import datetime as dt

## Lists vs generators

In [None]:
# source values shared by both examples below
nums = [1, 2, 3, 4, 5]

# list comprehension (square brackets): every square is computed now and stored in a list
nums2_list = [n ** 2 for n in nums]

# generator expression (round brackets): nothing is computed yet, squares are produced on demand
nums2_gen = (n ** 2 for n in nums)

### Guess the output

In [None]:
nums2_list[2]

In [None]:
nums2_gen[2]

In [None]:
nums2_list

In [None]:
nums2_gen

In [None]:
len(nums2_list)

In [None]:
len(nums2_gen)

In [None]:
for n in nums2_list:
    print(n)

In [None]:
for n in nums2_gen:
    print(n)

In [None]:
for n in nums2_gen:
    print(n)

In [None]:
next(nums2_list)

In [None]:
next(nums2_gen)

## Pandas streaming approach

In [None]:
# all data files live in a 'data' folder under the current working directory
data_dir = Path.cwd() / 'data'

input_path = data_dir.joinpath('journals-large.csv')
output_path = data_dir.joinpath('journals-large-out.csv')

# heads-up only: nothing is deleted or changed when a previous output is found
if output_path.is_file():
    print('output file exists')

In [None]:
# passing chunksize makes read_csv hand back an iterator of DataFrames
# (up to 200,000 rows each) instead of loading the whole file at once
chunks = pd.read_csv(input_path, chunksize=200_000)

In [None]:
chunks

In [None]:
chunks[3]

In [None]:
len(chunks)

In [None]:
chunk = next(chunks)
chunk.head()

In [None]:
len(chunk.index)

In [None]:
del chunks, chunk

In [None]:
# fresh reader for the real run, this time with chunks of up to 1,000,000 rows
chunks = pd.read_csv(input_path, chunksize=1_000_000)

In [None]:
def time_diff(start_time):
    """Return the seconds elapsed since ``start_time``, rounded to 2 decimal places.

    ``start_time`` is a ``datetime`` previously taken with ``dt.now()``.
    """
    elapsed = dt.now() - start_time
    seconds = elapsed.total_seconds()
    return round(seconds, 2)

In [None]:
def process_and_output(chunk, headers: bool):
    """Add the derived columns to one chunk and write it to ``output_path``.

    Args:
        chunk: DataFrame holding one chunk of the input. It must contain a
            string 'Journal ID' column and a numeric 'Amount' column. It is
            modified in place: 'Journal ID clean' and 'Amount divided' are added.
        headers: True for the first chunk only. The first chunk writes the
            header row and starts the output file afresh; every later chunk
            is appended without a header.

    Prints the time spent on the calculations and on the disk write.
    """
    # whatever processing you need per chunk in here...

    start_calcs = dt.now()
    # remove Journal from journal ID column (literal match, not a regex)
    chunk['Journal ID clean'] = chunk['Journal ID'].str.replace('Journal ', '', regex=False)

    # divide amount by 100
    chunk['Amount divided'] = chunk['Amount'] / 100

    print(f'Calculations:\t{time_diff(start_calcs)}')

    start_output = dt.now()
    # doesn't have to be here - could return the chunk and output elsewhere
    chunk.to_csv(
        output_path,
        index=False,                    # don't output the index
        header=headers,                 # only output headers when this is true (first chunk)
        # first chunk overwrites any file left by a previous run; later chunks append,
        # so re-running never duplicates rows or headers
        mode='w' if headers else 'a',
    )

    print(f'Disk output:\t{time_diff(start_output)}')

In [None]:
# drive the streaming run: take one chunk at a time, process and write it,
# and keep a running row total plus timings
start = dt.now()
row_count = 0

for idx, chunk in enumerate(chunks):

    # the reader has just loaded this chunk into memory - time it from here
    start_chunk = dt.now()

    # shape[0] is the number of rows in this chunk
    row_count += chunk.shape[0]

    print(f'Processing chunk {idx+1}')
    # only the very first chunk (idx 0) asks for the header row
    process_and_output(chunk, idx == 0)
    print(f'Chunk complete:\t{time_diff(start_chunk)}\n')

    # the next iteration rebinds `chunk`, dropping the reference to this one so python can garbage collect it

print(f'Processed {row_count} rows in {time_diff(start)}')

## Disk operations vs in memory

- Reading or writing to disk is **orders of magnitude** slower than in memory operations (e.g. on the CPU or GPU)
- Optimise streaming approach to have as much in memory as possible at a time

## Pandas optimised functions

In [None]:
df = pd.read_csv(output_path)
df.head()

In [None]:
# assign with df[...] rather than df.loc[:, ...]: a whole-column .loc assignment writes
# into the existing array where it can, so an integer column may keep its integer dtype
df['Amount'] = df['Amount'].astype(float)

In [None]:
%timeit df['Amount'].

In [None]:
def divide(amount):
    """Return ``amount`` divided by 10 (plain-Python counterpart for the timing comparison)."""
    tenth = amount / 10
    return tenth

%timeit df['Amount'].apply(divide)

## Pandas pickle

- Faster and usually more compact data storage than CSV (binary format; only load pickle files you trust)

In [None]:
# build the pickle path once and reuse it for the write
pickle_path = data_dir / 'journal-pickle.pkl'
df.to_pickle(pickle_path)

In [None]:
del df

In [None]:
%timeit pd.read_pickle(data_dir / 'journal-pickle.pkl')
# 5.22 s ± 26.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [None]:
%timeit pd.read_csv(output_path)
# 14.7 s ± 144 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

## Other libraries

- Dask! - keen to explore, can do grouping and aggregate calculations with data on disk