# Reading Parquet Files - RAM/CPU Optimization 

The Bengali AI dataset is used to explore the different methods available for reading Parquet files (pandas + pyarrow).

A common source of trouble for Kernel Only Compeitions, is Out-Of-Memory errors, and the 120 minute submission time limit.

This notebook contains:
- Syntax and performance for reading Parquet via both Pandas and Pyarrow
- Kaggle Kernel RAM/CPU allocation
  - 18G RAM
  - 2x Intel(R) Xeon(R) CPU @ 2.00GHz CPU
- RAM optimized generator function around `pandas.read_parquet()`
  - trade 50% RAM (1700MB -> 780MB) for 2x disk IO time (5.8min -> 10.2min runtime) 
- RAM/CPU profiling of implict dataframe dtype casting 
  - beware of implicit cast between `unit8` -> `float64` = 8x memory usage
  - `skimage.measure.block_reduce(train, (1,2,2,1), func=np.mean, cval=0)` can downsample images


# RAM/CPU Available In Kaggle Kernel

In theory there is 18GB of Kaggle RAM, but loading the entire dataset at once often causes out of memory errors, and doesn't leave anything for the tensorflow model. In practice, datasets need to be loaded one file at a time (or even 75% of a file) to permit a successful compile and submission run.

In [1]:
!free -h

              total        used        free      shared  buff/cache   available
Mem:            18G        898M         17G        928K        757M         17G
Swap:            0B          0B          0B


2x Intel(R) Xeon(R) CPU @ 2.00GHz CPU

In theory this might allow for optimizations using `pathos.multiprocessing`

In [2]:
!cat /proc/cpuinfo

processor	: 0
vendor_id	: GenuineIntel
cpu family	: 6
model		: 85
model name	: Intel(R) Xeon(R) CPU @ 2.00GHz
stepping	: 3
microcode	: 0x1
cpu MHz		: 2000.160
cache size	: 39424 KB
physical id	: 0
siblings	: 4
core id		: 0
cpu cores	: 2
apicid		: 0
initial apicid	: 0
fpu		: yes
fpu_exception	: yes
cpuid level	: 13
wp		: yes
flags		: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single pti ssbd ibrs ibpb stibp fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm mpx avx512f avx512dq rdseed adx smap clflushopt clwb avx512cd avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves arat md_clear arch_capabilities
bugs		: cpu_meltdown spectre_v1 spectre_v2 spec_store_bypass l1tf m

# Available Libaries

Both `pandas` and `pyarrow` are the two possible libaries to use

NOTE: `parquet` and `fastparquet` are not in the Kaggle pip repo, even with the latest available docket images. Whilst these can be obtained via `!pip install parquet fastparquet`, this requires an internet connection which is not allowed for Kernel only competitions.

In [3]:
import pandas as pd
import pyarrow
import pyarrow.parquet as pq
from pyarrow.parquet import ParquetFile

try:   import parquet
except Exception as exception: print(exception)
    
try:   import fastparquet
except Exception as exception: print(exception)    

No module named 'parquet'
No module named 'fastparquet'


Other imports

In [4]:
import pandas as pd
import numpy as np
import pyarrow
import glob2
import gc
import time
import sys
import humanize
import math
import time
import psutil
import gc
import simplejson
import skimage
import skimage.measure
from timeit import timeit
from time import sleep
from pyarrow.parquet import ParquetFile
import pyarrow
import pyarrow.parquet as pq

pd.set_option('display.max_columns',   500)
pd.set_option('display.max_colwidth',   -1)

# Read Parquet via Pandas

Pandas is the simplest and recommended option
- it takes 40s seconds to physically read all the data
- pandas dataset is 6.5GB in RAM. 


In [5]:
!python --version  # Python 3.6.6 :: Anaconda, Inc == original + latest docker (2020-03-14)

Python 3.6.6 :: Anaconda, Inc.


In [6]:
pd.__version__  # 0.25.3 == original + latest docker (2020-03-14)

'0.25.3'

In [7]:
filenames = sorted(glob2.glob('../input/bengaliai-cv19/train_image_data_*.parquet')); filenames

['../input/bengaliai-cv19/train_image_data_0.parquet',
 '../input/bengaliai-cv19/train_image_data_1.parquet',
 '../input/bengaliai-cv19/train_image_data_2.parquet',
 '../input/bengaliai-cv19/train_image_data_3.parquet']

In [8]:
def read_parquet_via_pandas(files=4, cast='uint8', resize=1):
    gc.collect(); sleep(5);  # wait for gc to complete
    memory_before = psutil.virtual_memory()[3]
    # NOTE: loading all the files into a list variable, then applying pd.concat() into a second variable, uses double the memory
    df = pd.concat([ 
        pd.read_parquet(filename).set_index('image_id', drop=True).astype('uint8')
        for filename in filenames[:files] 
    ])
    memory_end= psutil.virtual_memory()[3]        

    print( "sys.getsizeof total", humanize.naturalsize(sys.getsizeof(df)) )
    print( "memory total",        humanize.naturalsize(memory_end - memory_before), '+system', humanize.naturalsize(memory_before) )        
    return df


gc.collect(); sleep(2);  # wait for gc to complete
print('single file:')
time_start = time.time()
read_parquet_via_pandas(files=1); gc.collect()
print( "time: ", time.time() - time_start )
print('----------')
print('pd.concat() all files:')
time_start = time.time()
read_parquet_via_pandas(files=4); gc.collect()
print( "time: ", time.time() - time_start )
pass

single file:
sys.getsizeof total 1.6 GB
memory total 1.8 GB +system 987.4 MB
time:  22.754446029663086
----------
pd.concat() all files:
sys.getsizeof total 6.5 GB
memory total 6.6 GB +system 1.1 GB
time:  66.73364019393921


# Read Parquet via PyArrow

Creating a `ParquetFile` is very quick, and memory efficent. It only creates a pointer to the file, but allows us to read the metadata.

However there the dataset only contains a single `row_group`, meaning the file can only be read out as a single chunk (no easy row-by-row streaming)

In [9]:
import pyarrow
import pyarrow.parquet as pq
from pyarrow.parquet import ParquetFile

pyarrow.__version__  # 0.16.0 == original + latest docker (2020-03-14)

'0.16.0'

In [10]:
# DOCS: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html
def read_parquet_via_pyarrow_file():
    pqfiles = [ ParquetFile(filename) for filename in filenames ]
    print( "sys.getsizeof", humanize.naturalsize(sys.getsizeof(pqfiles)) )
    for pqfile in pqfiles[0:1]: print(pqfile.metadata)
    return pqfiles

gc.collect(); sleep(2);  # wait for gc to complete
time_start = time.time()
read_parquet_via_pyarrow_file(); gc.collect()
print( "time: ", time.time() - time_start )
pass

sys.getsizeof 96 Bytes
<pyarrow._parquet.FileMetaData object at 0x7f3f38cd9f98>
  created_by: parquet-cpp version 1.4.1-SNAPSHOT
  num_columns: 32334
  num_rows: 50210
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 6089354
time:  1.4875917434692383


Using a pyarrow.Table is faster than pandas (`28s` vs `45s`), but uses more memory (`7.6GB` vs `6.5GB`) and causes an Out-Of-Memory exception if everything is read at once

In [11]:
# DOCS: https://arrow.apache.org/docs/python/parquet.html
# DOCS: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html
# NOTE: Attempting to read all tables into memory, causes an out of memory exception
def read_parquet_via_pyarrow_table():
    shapes  = []
    classes = []
    sizes   = 0
    for filename in filenames:
        table = pq.read_table(filename) 
        shapes.append( table.shape )
        classes.append( table.__class__ )
        size = sys.getsizeof(table); sizes += size
        print("sys.getsizeof:",   humanize.naturalsize(sys.getsizeof(table))  )        
    print("sys.getsizeof total:", humanize.naturalsize(sizes) )
    print("classes:", classes)
    print("shapes:",  shapes)    


gc.collect(); sleep(2);  # wait for gc to complete
time_start = time.time()
read_parquet_via_pyarrow_table(); gc.collect()
print( "time: ", time.time() - time_start )
pass

sys.getsizeof: 1.9 GB
sys.getsizeof: 1.9 GB
sys.getsizeof: 1.9 GB
sys.getsizeof: 1.9 GB
sys.getsizeof total: 7.6 GB
classes: [<class 'pyarrow.lib.Table'>, <class 'pyarrow.lib.Table'>, <class 'pyarrow.lib.Table'>, <class 'pyarrow.lib.Table'>]
shapes: [(50210, 32334), (50210, 32334), (50210, 32334), (50210, 32334)]
time:  29.592018842697144


A generator can be written around pyarrow, but this still reads the contents of an entire file into memory and this function is really slow

In [12]:
import time, psutil, gc

gc.collect(); sleep(2)  # wait for gc to complete
mem_before   = psutil.virtual_memory()[3]
memory_usage = []

def read_parquet_via_pyarrow_table_generator(batch_size=128):
    for filename in filenames[0:1]:  # only loop over one file for demonstration purposes
        gc.collect(); sleep(1)
        for batch in pq.read_table(filename).to_batches(batch_size):
            mem_current = psutil.virtual_memory()[3]
            memory_usage.append( mem_current - mem_before )
            yield batch.to_pandas()
  

time_start = time.time()
count = 0
for batch in read_parquet_via_pyarrow_table_generator():
    count += 1

print( "time:  ", time.time() - time_start )
print( "count: ", count )
print( "min memory_usage: ", humanize.naturalsize(min(memory_usage))  )
print( "max memory_usage: ", humanize.naturalsize(max(memory_usage))  )
print( "avg memory_usage: ", humanize.naturalsize(np.mean(memory_usage)) )
pass    

time:   438.52048230171204
count:  393
min memory_usage:  4.6 GB
max memory_usage:  5.7 GB
avg memory_usage:  5.1 GB


# Pandas Batch Generator Function

It is possible to write a batch generator using pandas. In theory this should save memory, at the expense of disk IO. 

- Timer show that disk IO increase linarly with the number of filesystem reads. 
- Memory measurements require `gc.collect(); sleep(1)`, but show that average/min memory reduces linearly with filesystem reads

There are 8 files to read (including test files in the submission), so the tradeoffs are as follows:
- reads_per_file 1 |  44s * 8 =  5.8min + 1700MB RAM (potentually crashing the kernel)
- reads_per_file 2 |  77s * 8 = 10.2min +  781MB RAM (minimum required to solve the memory bottleneck)
- reads_per_file 3 | 112s * 8 = 14.9min +  508MB RAM (1/8th of total 120min runtime)
- reads_per_file 5 | 183s * 8 = 24.4min +  314MB RAM (1/5th of total 120min runtime)

This is a memory/time tradeoff, but demonstrates a practical solution to out-of-memory errors

In [13]:
memory_before = psutil.virtual_memory()[3]
memory_usage  = []

def read_parquet_via_pandas_generator(batch_size=128, reads_per_file=5):
    for filename in filenames:
        num_rows    = ParquetFile(filename).metadata.num_rows
        cache_size  = math.ceil( num_rows / batch_size / reads_per_file ) * batch_size
        batch_count = math.ceil( cache_size / batch_size )
        for n_read in range(reads_per_file):
            cache = pd.read_parquet(filename).iloc[ cache_size * n_read : cache_size * (n_read+1) ].copy()
            gc.collect(); sleep(1);  # sleep(1) is required to allow measurement of the garbage collector
            for n_batch in range(batch_count):            
                memory_current = psutil.virtual_memory()[3]
                memory_usage.append( memory_current - memory_before )                
                yield cache[ batch_size * n_batch : batch_size * (n_batch+1) ].copy()

                
for reads_per_file in [1,2,3,5]: 
    gc.collect(); sleep(5);  # wait for gc to complete
    memory_before = psutil.virtual_memory()[3]
    memory_usage  = []
    
    time_start = time.time()
    count = 0
    for batch in read_parquet_via_pandas_generator(batch_size=128, reads_per_file=reads_per_file):
        count += 1
        
    print( "reads_per_file", reads_per_file, '|', 
           'time', int(time.time() - time_start),'s', '|', 
           'count', count,  '|',
           'memory', {
                "min": humanize.naturalsize(min(memory_usage)),
                "max": humanize.naturalsize(max(memory_usage)),
                "avg": humanize.naturalsize(np.mean(memory_usage)),
                "+system": humanize.naturalsize(memory_before),               
            }
    )
pass    

reads_per_file 1 | time 46 s | count 1572 | memory {'min': '-1277952 Bytes', 'max': '1.7 GB', 'avg': '821.7 MB', '+system': '4.9 GB'}
reads_per_file 2 | time 82 s | count 1576 | memory {'min': '-352256 Bytes', 'max': '514.7 MB', 'avg': '46.3 MB', '+system': '4.9 GB'}
reads_per_file 3 | time 116 s | count 1572 | memory {'min': '2.3 MB', 'max': '555.6 MB', 'avg': '81.9 MB', '+system': '4.9 GB'}
reads_per_file 5 | time 188 s | count 1580 | memory {'min': '10.6 MB', 'max': '809.1 MB', 'avg': '184.5 MB', '+system': '4.9 GB'}


# Dtypes and Memory Usage

Memory useage can vary by an order of magnitude based on the implcit cast dtype. 

- Raw pixel values are read from the parquet file as `uint8`
- `/ 255.0` or `skimage.measure.block_reduce()` will do an implict cast of `int` -> `float64`
- `float64` results in a datastructure 8x as large as `uint8` (`13.0 GB` vs `1.8 GB`)
  - This can be avoided by doing an explict cast to `float16` (`3.3 GB`)
- `skimage.measure.block_reduce(df, (1,n,n,1), func=np.mean, cval=0)` == `AveragePooling2D(n)` 
  - reduces data structure memory by `n^2` 

CPU time: 
- `float32` (+0.5s) is the fastest cast; `float16` (+8s) is 2x slower than cast `float64` (+4s).
- `skimage.measure.block_reduce()` is an expensive operation (3-5x IO read time)

In [14]:
def read_single_parquet_via_pandas_with_cast(dtype='uint8', normalize=False, denoise=False, invert=True, resize=1):
    gc.collect(); sleep(2);
    
    memory_before = psutil.virtual_memory()[3]
    time_start = time.time()        
    
    train = (pd.read_parquet(filenames[0])
               .set_index('image_id', drop=True)
               .values.astype(dtype)
               .reshape(-1, 137, 236, 1))
    
    if invert:                                         # Colors | 0 = black      | 255 = white
        train = (255-train)                            # invert | 0 = background | 255 = line
   
    if denoise:                                        # Set small pixel values to background 0
        if invert: train *= (train >= 25)              #   0 = background | 255 = line  | np.mean() == 12
        else:      train += (255-train)*(train >= 230) # 255 = background |   0 = line  | np.mean() == 244     
        
    if isinstance(resize, bool) and resize == True:
        resize = 2    # Reduce image size by 2x
    if resize and resize != 1:                  
        # NOTEBOOK: https://www.kaggle.com/jamesmcguigan/bengali-ai-image-processing/
        # Out of the different resize functions:
        # - np.mean(dtype=uint8) produces produces fragmented images (needs float16 to work properly - but RAM intensive)
        # - np.median() produces the most accurate downsampling
        # - np.max() produces an enhanced image with thicker lines (maybe slightly easier to read)
        # - np.min() produces a  dehanced image with thiner lines (harder to read)
        resize_fn = resize_fn or (np.max if invert else np.min)
        cval      = 0 if invert else 255
        train = skimage.measure.block_reduce(train, (1, resize,resize), cval=cval, func=resize_fn)
        
    if normalize:
        train = train / 255.0          # division casts: int -> float64 


    time_end     = time.time()
    memory_after = psutil.virtual_memory()[3] 
    return ( 
        str(round(time_end - time_start,2)).rjust(5),
        # str(sys.getsizeof(train)),
        str(memory_after - memory_before).rjust(5), 
        str(train.shape).ljust(20),
        str(train.dtype).ljust(7),
    )
    

gc.collect(); sleep(2);  # wait for gc to complete

for dtype in ['uint8', 'uint16', 'uint32', 'float16', 'float32']:  # 'float64' caused OOM error
    seconds, memory, shape, dtype = read_single_parquet_via_pandas_with_cast(dtype=dtype)
    print(f'dtype {dtype}'.ljust(18) + f'| {dtype} | {shape} | {seconds}s | {humanize.naturalsize(memory).rjust(8)}')

for denoise in [False, True]:
    seconds, memory, shape, dtype = read_single_parquet_via_pandas_with_cast(denoise=denoise)
    print(f'denoise {denoise}'.ljust(18) + f'| {dtype} | {shape} | {seconds}s | {humanize.naturalsize(memory).rjust(8)}')

for normalize in [False, True]:
    seconds, memory, shape, dtype = read_single_parquet_via_pandas_with_cast(normalize=normalize)
    print(f'normalize {normalize}'.ljust(18) + f'| {dtype} | {shape} | {seconds}s | {humanize.naturalsize(memory).rjust(8)}')
for dtype in ['float16', 'float32']:
    seconds, memory, shape, dtype = read_single_parquet_via_pandas_with_cast(dtype=dtype, normalize=True)
    print(f'normalize {dtype}'.ljust(18) + f'| {dtype} | {shape} | {seconds}s | {humanize.naturalsize(memory).rjust(8)}')
    
# skimage.measure.block_reduce() casts: unit8 -> float64    
for resize in [2, 3, 4]:
    for dtype in ['float16', 'float32', 'uint8']:
        seconds, memory, shape, dtype = read_single_parquet_via_pandas_with_cast(dtype=dtype, resize=resize)
        print(f'resize {resize} {dtype}'.ljust(18) + f'| {dtype} | {shape} | {seconds}s | {humanize.naturalsize(memory).rjust(8)}')

dtype uint8       | uint8   | (50210, 137, 236, 1) |  9.34s | 110.2 MB
dtype uint16      | uint16  | (50210, 137, 236, 1) |  10.9s |   3.2 GB
dtype uint32      | uint32  | (50210, 137, 236, 1) | 18.05s |   6.5 GB
dtype float16     | float16 | (50210, 137, 236, 1) | 57.95s |   3.3 GB
dtype float32     | float32 | (50210, 137, 236, 1) | 13.77s |   6.5 GB
denoise False     | uint8   | (50210, 137, 236, 1) |   9.7s |  50.7 MB
denoise True      | uint8   | (50210, 137, 236, 1) | 11.25s | -6905856 Bytes
normalize False   | uint8   | (50210, 137, 236, 1) |  9.31s | 153.7 MB
normalize True    | float64 | (50210, 137, 236, 1) | 15.53s |  13.0 GB
normalize float16 | float16 | (50210, 137, 236, 1) | 95.19s |   3.3 GB
normalize float32 | float32 | (50210, 137, 236, 1) |  15.6s |   6.5 GB


UnboundLocalError: local variable 'resize_fn' referenced before assignment