# Superando el reto del billón de filas con Python

In [None]:
from src.utils import generate_sample_data, generate_heatmap

from pathlib import Path
import csv
from zipfile import ZipFile
from io import TextIOWrapper
import shutil
import timeit

import numpy as np
import pandas as pd
import polars as pl
import pyarrow as pa
import pyarrow.feather as feather
import duckdb
import ibis
import modin.pandas as mpd
import dask.dataframe as dd

In [None]:
path = Path('.')
(path / 'data').mkdir(mode=0o775, exist_ok=True)
path_data = path / 'data'
filename = "sample50M"

# ¿Qué vamos a hacer?

* Vamos a seleccionar 2 operaciones de consulta sobre los datos cargados en memoria.
* Vamos a reflexionar sobre operaciones costosas y operaciones más ligeras.
* Vamos a ejecutar estas operaciones sobre las combinaciones de ficheros y herramientas que parecen cargar más rápidamente los datos en memoria y compararemos los resultados.
* ...

# Operaciones a ejecutar

Ejecutaremos 2 operaciones de consulta y compararemos su rendmiento:
1. Primero, una operación sencilla como un simple conteo de filas: operación **count**.
2. Y segundo, una operación más compleja, como calcular la media por tipo de producto y ordenar los resultados de forma decreciente por media: operación **average**.

Preguntas:
* ¿De qué forma podríamos implementar estas operaciones sobre un fichero CSV?
* ¿Y cómo podríamos aprovechar al máximo nuestras CPUs para que la ejecución sea más rápida?

Algunas preguntas adicionales:
* Para hacer un conteo de filas, ¿necesitamos leer todas las columnas?
* En el caso de ficheros como Parquet que contienen metadatos, ¿los podríamos usar para acelerar el conteo de filas?
* En el cálculo de la media, ¿qué operación es la más costosa?, ¿el cálculo de la media?, ¿hacer la agrupación de valores?, ¿la ordenación por medias final?

# Selección de formatos y herramientas

Dado que en la sección anterior hemos analizar los formatos y herramientas más convenientes para nuestro objetivo, vamos a centrarnos en este notebook en los que nos han dado mejor rendimiento.

In [None]:
resultados = pd.DataFrame(
    index=('duckdb', 'polars', 'pyarrow'),
    columns=(
        'csv - count',
        'feather - count',
        'feather - average',
        'parquet - count',
        'parquet - average',
    )
)

Importante: vamos a ejecutar las dos operaciones cargando cada vez los datos, cuando lo recomendable sería cargar los datos primero y ejecutar ambas operaciones con los datos ya cargados en memoria, para evitar que en la segunda operación tengamos que cargar los datos otra vez. De esta forma podremos comparar mejor el rendimiento de las herramientas y formatos que usaremos.

# Consulto Feather con PyArrow

In [None]:
def count_with_feather_pyarrow():
    with pa.memory_map(f'{path_data}/{filename}.feather', 'r') as source:
        table = pa.ipc.open_file(source).read_all()
    return len(table)

kk = %timeit -r 3 -o count_with_feather_pyarrow()

In [None]:
resultados.loc['pyarrow', 'feather - count'] = kk.average

In [None]:
resultados.transpose()

In [None]:
def average_with_feather_pyarrow():
    with pa.memory_map(f'{path_data}/{filename}.feather', 'r') as source:
        table = pa.ipc.open_file(source).read_all()
    return table.group_by("product").aggregate([
        ("price", "mean")
    ]).sort_by([("price_mean", "descending")])

kk = %timeit -r 3 -o average_with_feather_pyarrow()

In [None]:
resultados.loc['pyarrow', 'feather - average'] = kk.average

In [None]:
resultados.transpose()

# Consulto Parquet con DuckDB

Uno de los problemas que tenemos a la hora de medir el rendimiento de DuckDB es que tiene un sistema de caché muy efectivo. Y por lo tanto, repetir muchas veces la misma consulta lo único que hace es medir lo buena o mala que es esa caché. Por este motivo vamos a cambiar la medición para tomar únicamente 2 ejecuciones y quedarnos con la media de esos dos valores (siguiendo la idea de [H2O.ai Database-like Ops Benchmark](https://duckdb.org/2023/04/14/h2oai.html)):

In [None]:
kk = %timeit -r 1 -n 2 -o duckdb.sql(f"SELECT COUNT(*) FROM read_parquet('./{str(path_data)}/{filename}.parquet')")

In [None]:
resultados.loc['duckdb', 'parquet - count'] = kk.average

Si en lugar de usar SQL, prefieres usar Ibis, puedes escribir el mismo código de la siguiente forma:

In [None]:
def count_with_ibis():
    con = ibis.connect('duckdb://')
    table = ibis.read_parquet(f'./{str(path_data)}/{filename}.parquet')
    return table.count().execute()

kk = %timeit -r 1 -n 2 -o count_with_ibis()

In [None]:
resultados.transpose()

* ¿Por qué DuckDB con Parquet es tan rápido en el conteo?, ¿qué puede estar haciendo para tener este buen rendimiento?

Probemos ahora cargando el fichero CSV...

In [None]:
kk = %timeit -r 1 -n 2 -o duckdb.sql(f"SELECT COUNT(*) FROM read_csv('./{str(path_data)}/{filename}.csv')")

In [None]:
resultados.loc['duckdb', 'csv - count'] = kk.average

In [None]:
resultados.transpose()

El principal motivo es que DuckDB utiliza los metadatos que el fichero Parquet contiene para que su ejecución sea muy rápida y evita tener que cargar todos los datos del fichero Parquet en memoria para calcular el conteo.

Veamos ahora qué tal se comporta en el cálculo de la media por producto:

In [None]:
kk = %timeit -r 1 -n 2 -o duckdb.sql(f"SELECT product, AVG(price) FROM read_parquet('./{str(path_data)}/{filename}.parquet') GROUP BY product ORDER BY AVG(price) DESC")

DuckDB sigue la siguiente estrategia para ejecutar la anterior instrucción:
1. Crear tantos hilos como CPUs haya disponibles en nuestro PC.
2. Cada uno de estos hilos se encargan de leer una parte diferente del fichero Parquet hasta que ya no quedan filas por leer (más información: https://duckdb.org/2024/07/09/memory-management.html).
3. Para almacenar los resultados, crea una tabla hash en la memoria principal en la que cada clave de la tabla es un producto (A, B, C, D, E o F).
4. Si esa tabla hash no cabe en la memoria principal, escribe partes de esta tabla hash en la memoria secundaria (más información: https://duckdb.org/2024/03/29/external-aggregation.html). Y si no hay espacio suficiente en la memoria secundaria da un error por falta de memoria.
5. Y después de leer todas las filas del fichero Parquet devuelve los resultados después de ordenarlos en memoria principal.

In [None]:
resultados.loc['duckdb', 'parquet - average'] = kk.average

También, si en lugar de usar SQL prefieres utilizar Ibis, podrías escribir el mismo código de la siguiente forma:

In [None]:
def average_with_ibis():
    con = ibis.connect('duckdb://')
    table = ibis.read_parquet(f'./{str(path_data)}/{filename}.parquet')
    return (table.group_by('product')
            .aggregate(avg_price=table['price'].mean())
            .order_by(ibis.desc('avg_price'))
            .execute())

kk = %timeit -r 1 -n 2 -o average_with_ibis()

In [None]:
resultados.transpose()

# Consulto Parquet con Polars

En Polars también podemos utilizar los metadatos de Parquet para optimizar el conteo de filas. Para ello, debemos usar el tipo `LazyFrame` y el método `collect()` que evita cargar el fichero Parquet en la memoria principal para contar las filas:

In [None]:
def count_with_parquet_polars():
    lf = pl.scan_parquet(path_data / f"{filename}.parquet")
    return lf.select(pl.len()).collect()
    
kk = %timeit -r 3 -o count_with_parquet_polars()

In [None]:
resultados.loc['polars', 'parquet - count'] = kk.average

In [None]:
resultados.transpose()

In [None]:
def average_with_parquet_polars():
    lf = pl.scan_parquet(path_data / f"{filename}.parquet")
    return lf.group_by("product").mean().sort("price", descending=True).collect()

kk = %timeit -r 3 -o average_with_parquet_polars()

In [None]:
resultados.loc['polars', 'parquet - average'] = kk.average

In [None]:
resultados.transpose()

En este caso vemos que el tiempo dedicado en la operación **average** es mucho mayor que en el conteo, ¿cómo podríamos saber a qué operaciones dedica más tiempo Polars?

Una forma muy sencilla de hacerlo es usar `profile()` en lugar de `collect()`. También, podemos pasar el parámetro `profile(show_plot=True)` para ver adicionalmente un gráfico con matplotlib. Por ejemplo, para el conteo de filas:

In [None]:
lf = pl.scan_parquet(path_data / f"{filename}.parquet")
lf.select(pl.len()).profile(show_plot=True)

¿Podrías hacer lo mismo para la operación **average**?, ¿qué conclusiones podemos sacar de los resultados obtenidos?, ¿se te ocurre alguna forma de acelerar el nodo más lento?

In [None]:
# Escribe aquí el código para mostrar el tiempo dedicado a cada paso para el cálculo de la operación average en Polars



Finalmente, representaremos los datos obtenidos para visualizar de manera más clara la comparación entre todos los resultados de esta sección.

In [None]:
generate_heatmap(resultados.transpose(), 'Heatmap de los tiempos de consulta de datos (seg.).')

# Ejercicio final

Una vez que hemos visto diferentes técnicas y formatos de ficheros, ¿cuál es el tamaño máximo de fichero que puedes ejecutar en tu portátil? Escoge un formato de fichero y una herramienta e intenta ver cuál es el tamaño máximo que puedes ejecutar en un tiempo razonable tanto para la operación **average** (por ejemplo, menos de 60 segundos).

In [None]:
# Genera primero los ficheros que necesites...

# Por ejemplo, si quieres generar un Parquet de 500M de filas puedes hacerlo con la siguiente instrucción:
generate_sample_data(
    lines=500_000_000,
    fmt = 'parquet',
    filename = f'{str(path_data)}/sample500M'
)

In [None]:
# Escribe aquí el código para calcular la operación average con el máximo número de filas que puedas en menos de 60 segundos




## Apéndice

Aquí mostramos tiempos del código anterior ejecutado en otros PCs

---
SO: linux (Kernel: 5.15.0-122-generic x86_64)

Procesador: AMD Ryzen 9 3900X (cache: L1: 768 KiB L2: 6 MiB L3: 64 MiB)

HDD: Crucial model: CT1000BX500SSD1 size: 931.51 GiB speed: 6.0 Gb/s type: SSD serial

RAM: 16GiB DIMM DDR4 Speed 2666 MT/s (x 4) = 64 GiB

![Resultados](./images/resultados_kiko_desktop_linux_consultando.png)

---
SO: Windows 10 Enterprise (64 bits)

Procesador: 13th Gen Intel(R) Core(TM) i7-1365U

HDD: NVMe CL4-3D512-Q11 NVMe SSSTC 512GB

RAM: 2 x 8 GB 3200 MHz DDR4-SDRAM = 16 GB 

![Resultados](./images/resultados_kiko_pccurr_windows_consultando.png)

---
SO: macOS Sequoia (15.0)

Procesador: Apple M2 con 8 núcleos

HDD: APPLE SSD AP0512Z 512 GB

RAM: LPDDR5 16 GB

![Resultados](./images/resultados_jordi_mba_consultando.png)

---
SO: Windows 11 Pro 2 (64 bits)

Procesador: 14th Intel(R) Core(TM) i7-14700HX  

HDD: NVMe™ TLC M.2 de 1 TB SSD 

RAM: 32 GB de RAM DDR5-4800 MHz (2 x 16 GB)

![Resultados](./images/resultados_ernesto_windows_consultando.png)