## CSV files 3: reading larger-than-memory CSV files in batches
By the end of this lecture you will be able to:
- read larger-than-memory datasets with batching

To work with larger-than-memory datasets we must:
- process the dataset in chunks
- combine the chunks into a single output
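
As a minimal sketch of these two steps, here is the idea in plain Python rather than Polars. The in-memory CSV, the `fare` column and the `iter_chunks` helper are all hypothetical names chosen for illustration.

```python
import csv
import io
from itertools import islice

# Hypothetical in-memory CSV standing in for a file too large to load at once
csv_text = "fare\n7.25\n71.28\n7.92\n53.10\n8.05\n"


def iter_chunks(rows, chunk_size):
    """Yield lists of at most `chunk_size` rows until the iterator is exhausted."""
    while True:
        chunk = list(islice(rows, chunk_size))
        if not chunk:
            return
        yield chunk


rows = csv.DictReader(io.StringIO(csv_text))

# Step 1: process the dataset in chunks, keeping only a small partial result
partial_sums = [
    sum(float(row["fare"]) for row in chunk)
    for chunk in iter_chunks(rows, chunk_size=2)
]

# Step 2: combine the partial results into a single output
total_fare = sum(partial_sums)
print(len(partial_sums), round(total_fare, 2))
```

Only one chunk of rows is held in memory at a time. The partial results are much smaller than the data they summarise.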

We refer to each chunk of a dataset as a *batch*. We can read CSV files in batches in Polars.

In the coming lectures we see how to process larger-than-memory datasets using *streaming*. Streaming is better because Polars takes care of the batching and has algorithms to combine the chunks correctly for many operations, such as group-bys and joins.

We cover manual batching in this lecture to allow you to:
- understand how Polars carries out streaming under the hood
- create your own custom batching algorithms


In [None]:
import polars as pl

pl.Config.set_tbl_rows(8)

Although batching is intended for large datasets, we can still demonstrate it with a small dataset.

In [None]:
csvFile = "../data/titanic.csv"

## Batched reader
We read a CSV in batches by calling `pl.read_csv_batched` with the path of the CSV file. We tell Polars how many rows we want in each batch `DataFrame` with the `batch_size` argument.

In [None]:
reader = pl.read_csv_batched(csvFile,batch_size=10)
reader

The `pl.read_csv_batched` function accepts all the standard arguments for CSV processing, such as setting delimiters or changing column names.

At this stage not much work has been done:
- Polars has opened the CSV file
- Polars has calculated some statistics to estimate the length of each line

We can extract some batches from the CSV by calling `next_batches` on the `reader`. The argument is the number of batches to return.

In [None]:
reader = pl.read_csv_batched(csvFile,batch_size=10)
batches = reader.next_batches(2)

The output of `reader.next_batches` is a `list` of `DataFrames`

In [None]:
print(type(batches))

We inspect the first `DataFrame` in the `list`

In [None]:
batches[0]

We set `batch_size = 10` so we wanted each `DataFrame` to have 10 rows, but this first batch has 74 rows!

The number of rows in each batch is not guaranteed to equal the `batch_size` argument. This is because with a CSV Polars has to estimate how large a batch will be in bytes before reading it.

### Estimating batch size
When Polars opens a CSV file it cannot know:
- how many lines there are in a file
- where each new line starts

As such it cannot know exactly how many bytes to read to get 10 lines.

Polars makes an estimate by first reading a sample of lines to get the mean and standard deviation of their length in bytes. It uses this to estimate the total number of bytes per line.

If a CSV has mainly numerical or datetime data then the number of bytes per row will be very consistent and the actual batch size will closely match `batch_size`.

However, if a CSV has text data with variable length then the number of bytes per row will vary considerably and the actual batch size will differ from `batch_size`.

Typically the relative difference between the actual batch size and `batch_size` will be smaller for larger datasets. A small dataset with variable strings like Titanic is the most challenging case.
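
The effect of row-length variability can be sketched in plain Python. This is an illustration of the idea only, not the code Polars actually runs. The `estimate_bytes_for_batch` helper and the sample rows are hypothetical.

```python
from statistics import mean, stdev


def estimate_bytes_for_batch(sample_lines, batch_size):
    """Estimate how many bytes to read to get roughly `batch_size` lines.

    Returns the estimated byte count and the spread of the sampled line lengths.
    """
    # +1 accounts for the newline character that ends each line
    lengths = [len(line.encode("utf-8")) + 1 for line in sample_lines]
    return mean(lengths) * batch_size, stdev(lengths)


# Hypothetical sample rows: fixed-width numeric data vs variable-length text
numeric_rows = ["1,22.0,7.25", "2,38.0,71.3", "3,26.0,7.92", "4,35.0,53.1"]
text_rows = [
    "1,Braund Mr. Owen Harris",
    "2,Cumings Mrs. John Bradley (Florence Briggs Thayer)",
    "3,Heikkinen Miss. Laina",
    "4,Futrelle Mrs. Jacques Heath (Lily May Peel)",
]

numeric_bytes, numeric_spread = estimate_bytes_for_batch(numeric_rows, batch_size=10)
text_bytes, text_spread = estimate_bytes_for_batch(text_rows, batch_size=10)
print(numeric_bytes, numeric_spread)
print(text_bytes, text_spread)
```

The numeric rows all have the same length, so the byte estimate for 10 rows is exact. The text rows have a large spread, so reading the estimated number of bytes can return noticeably more or fewer than 10 rows.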

## Processing batches
If we keep calling `reader.next_batches`, it eventually returns `None` instead of a `list` once it has gone through all the batches.

In [None]:
reader = pl.read_csv_batched(csvFile,batch_size=10)
batches0 = reader.next_batches(5)
batches1 = reader.next_batches(5)
batches2 = reader.next_batches(5)
batches3 = reader.next_batches(5)
[type(batches0),type(batches1),type(batches2),type(batches3)]

On the last call of `reader.next_batches` that returns data, the number of `DataFrames` in the list may be smaller than the number requested.

In [None]:
reader = pl.read_csv_batched(csvFile,batch_size=10)
batches0 = reader.next_batches(5)
batches1 = reader.next_batches(5)
batches2 = reader.next_batches(5)
[len(batches0),len(batches1),len(batches2)]
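
One way to avoid writing out each call by hand is to wrap the reader in a generator. The sketch below assumes only the behaviour described above: `next_batches(n)` returns a list of up to `n` items, then `None` when nothing is left. `iter_batches` is a hypothetical helper and `FakeReader` is a hypothetical stand-in for the Polars reader, so the example runs without a CSV file.

```python
def iter_batches(reader, n):
    """Yield one batch at a time until the reader is exhausted."""
    while True:
        batches = reader.next_batches(n)
        if batches is None:  # the reader signals the end with None
            return
        yield from batches


class FakeReader:
    """Hypothetical stand-in that mimics the `next_batches` behaviour."""

    def __init__(self, items):
        self._items = list(items)

    def next_batches(self, n):
        if not self._items:
            return None
        head, self._items = self._items[:n], self._items[n:]
        return head


fake_reader = FakeReader(["b0", "b1", "b2", "b3", "b4", "b5", "b6"])
seen = list(iter_batches(fake_reader, n=5))
print(seen)
```

The second call returns only two items because that is all that remains. The third call returns `None` and ends the loop.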

## Custom batched algorithm
We run a simple algorithm on a batched CSV to get the sum of the floating point columns. To do this we:
1. create a `reader` by calling `pl.read_csv_batched`
2. get batches of 3 `DataFrames` from `reader` at a time
3. get the sum of the floating point columns for each `DataFrame`
4. get the sum of the floating point columns for the batch of 3 `DataFrames`
5. get the sum of the floating point columns for all the batches

We pre-define the following function to do step 3 (inside the list comprehension) and step 4 (the `sum` on the output of `pl.concat`).

In [None]:
def sumBatch(batch: list):
    return (
        # Step 4: sum the per-DataFrame sums for this batch
        pl.concat(
            # Step 3: sum the floating point columns of each DataFrame
            [
                df.select(pl.col(pl.Float64)).sum()
                for df in batch
            ]
        ).sum()
    )

We now process all of the batches in the cell below.

We do step 1 to create `reader`.

We do step 2 in a `while` loop that stops when `reader.next_batches` returns `None`.

We do step 5 by calling `pl.concat().sum()` at the end.

In [None]:
# Step 1
reader = pl.read_csv_batched(csvFile,batch_size=5)
proceed = True
df_list = []
# Step 2
while proceed:
    batch = reader.next_batches(3)
    if not isinstance(batch,list):
        proceed = False
    else:
        df_list.append(sumBatch(batch=batch))
# Step 5
(
    pl.concat(df_list)
    .sum()
)

This example shows why it is nice that Polars can do this with built-in streaming functions for many operations! 

## Exercises
In the exercises you will develop your understanding of:
- reading a CSV in batches
- developing a batched algorithm

### Exercise 1

Get the average of the `Age` and `Fare` columns by batch processing the CSV file.

This is a trickier exercise. If you want a challenge you can implement it yourself, otherwise you can use the step-by-step approach below.

Compare your answer with this non-batched version

In [None]:
(
    pl.read_csv(csvFile)
    .select(["Age","Fare"])
    .mean()
)

#### Step-by-step approach

Re-use the algorithm above to get the **sum** of columns using a batched approach. Rename `df_list` to `df_sum_list`

Add a function called `countBatch` that counts the number of rows in each batch. Only the rows without `null` values should be counted!

Collect the sum of each batch in `df_sum_list` and the count of each batch in `df_count_list`. Concatenate all the batches to get the total sum and the total count of rows.

Divide the sum of all the batches by the total count of all the batches and compare with the non-batched solution.

## Solutions

### Solution to exercise 1
Re-use the algorithm above to get the sum of columns using a batched approach

In [None]:
reader = pl.read_csv_batched(csvFile,batch_size=5)
proceed = True
df_sum_list = []
while proceed:
    batch = reader.next_batches(3)
    if not isinstance(batch,list):
        proceed = False
    else:
        df_sum_list.append(sumBatch(batch=batch))
        
pl.concat(df_sum_list).sum()

Add a function called `countBatch` that counts the number of rows in each batch. Only the rows without `null` values should be counted!

In [None]:
def countBatch(batch: list):
    return (
        pl.concat(
            [
                df.select(
                    [
                        # Count only the non-null values in each column
                        pl.col("Age").filter(pl.col("Age").is_not_null()).count(),
                        pl.col("Fare").filter(pl.col("Fare").is_not_null()).count(),
                    ]
                )
                for df in batch
            ]
        ).sum()
    )

Collect the sum of each batch in `df_sum_list` and the count of each batch in `df_count_list`. Concatenate all the batches to get the total sum and the total count of rows.

In [None]:
reader = pl.read_csv_batched(csvFile,batch_size=5)
proceed = True
df_sum_list = []
df_count_list = []
while proceed:
    batch = reader.next_batches(3)
    if not isinstance(batch,list):
        proceed = False
    else:
        df_sum_list.append(sumBatch(batch=batch))
        df_count_list.append(countBatch(batch=batch))
# Print both results: a notebook cell only displays its last expression
print(pl.concat(df_sum_list).sum())
print(pl.concat(df_count_list).sum())

Divide the sum of all the batches by the total count of all the batches 

In [None]:
reader = pl.read_csv_batched(csvFile,batch_size=5)
proceed = True
df_sum_list = []
df_count_list = []
while proceed:
    batch = reader.next_batches(3)
    if not isinstance(batch,list):
        proceed = False
    else:
        df_sum_list.append(sumBatch(batch=batch))
        df_count_list.append(countBatch(batch=batch))
        
(
    pl.concat(df_sum_list)
    .sum()
) / (
    pl.concat(df_count_list)
    .sum()
)
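
The reason for tracking sums and counts separately, rather than averaging per-batch means, can be checked with a small plain-Python sketch. The values below are hypothetical `Age` values split into two uneven batches, with `None` marking a missing value.

```python
# Hypothetical Age values split into two uneven batches; None marks a missing value
batches = [
    [22.0, 38.0, None, 35.0],
    [54.0, None],
]


def non_null(values):
    """Drop missing values, mirroring the null filtering in the count step."""
    return [v for v in values if v is not None]


# Combine per-batch sums and non-null counts, then divide once at the end
total_sum = sum(sum(non_null(b)) for b in batches)
total_count = sum(len(non_null(b)) for b in batches)
batched_mean = total_sum / total_count

# Non-batched reference over all values at once
all_values = non_null([v for b in batches for v in b])
reference_mean = sum(all_values) / len(all_values)

# Averaging the per-batch means weights each batch equally, which is wrong here
mean_of_means = sum(sum(non_null(b)) / len(non_null(b)) for b in batches) / len(batches)

print(batched_mean, reference_mean, mean_of_means)
```

The sum-and-count approach matches the non-batched mean. The mean of means differs because the batches hold different numbers of non-null values.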