<font size=6>

<b>Curso de Análisis de Datos con Python</b>
</font>

<font size=4>
    
Curso de formación interna, CIEMAT. <br/>
Madrid, Junio de 2023

Antonio Delgado Peris (Cristina Labajo Villaverde)
</font>

https://github.com/andelpe/curso-python-analisis-datos

<br/>

# Tema 10 - Manejo de grandes volúmenes de datos y computación paralela

## Objetivos

TODO: finish this

- Datasets de gran tamaño.

- Larger-than-memory

- Polars

- Dask

# Trabajando con datasets de gran tamaño

## Problema

Al usar algo como ``numpy.load`` o ``pandas.read_*``, cargamos en memoria (a un ndarray o DataFrame) todos los datos de un fichero. Si el tamaño del ficheros es muy grande, estaremos ralentizando nuestro equipo (o, peor, un equipo compartido), o incluso podremos llegar a desbordar la RAM disponible, y producir un error.

Vamos a limitar nuestra memoria artificalmente, para ilustrar el problema.

In [None]:
import resource

In [None]:
soft, hard = resource.getrlimit(resource.RLIMIT_AS)
print(soft, hard)

In [None]:
LOW_LIMIT = 6*1024*1024*1024  # 6 GB !
resource.setrlimit(resource.RLIMIT_AS, (LOW_LIMIT, hard))

In [None]:
soft, hard = resource.getrlimit(resource.RLIMIT_AS)
print(soft, hard)

<br/>

También vamos a preparar una utilidad para controlar la memoria capturada por Python

In [None]:
import os, psutil, gc

def getPythonMemory():
    process = psutil.Process()
    print(process.memory_info().rss/1024/1024)  # in MB

In [None]:
getPythonMemory()

<br/>

Ahora vamos a intentar leer ficheros de gran tamaño con Pandas.

El primer caso debería funcionar sin problemas (fichero de 2.3 MB)

In [None]:
import pandas as pd
fNormal = "../data/kc_house_data.csv"
fBig = "../data/BIG_house_data.csv"

In [None]:
df = pd.read_csv(fNormal)

In [None]:
getPythonMemory()

Sin embargo, el siguiente ejemplo debería fallar (fichero de 1.2 GB)

In [None]:
df = pd.read_csv(fBig)

In [None]:
getPythonMemory()

## Posibles soluciones

Vamos a explorar algunas posibilidades para afrontar este problema (existen otras).

1. Leer solo los datos que realmente necesitamos
2. Utilizar tipos de datos más pequeños
3. Trocear el archivo
4. Usar otras librerías como Polars
5. Utilizar sistemas distribuidos como Dask

### Leer solo los datos que realmente necesitamos

In [None]:
df = pd.read_csv(fNormal)

In [None]:
df.info(verbose=False, memory_usage="deep")

In [None]:
df.head()

Vamos a leer solo 3 columnas de nuestro fichero. Veremos que el DataFrame ocupa menos memoria.

In [None]:
df2 = pd.read_csv(fNormal, usecols=["id", "price", "bedrooms"])                               
df2.info(verbose=False, memory_usage="deep")

<br/>

Ahora probamos lo mismo con el fichero de mayor tamaño. Esta vez deberíamos tener éxito.
Nota: previamente, intentamos limpiar la memoria ocupada anteriormente.

In [None]:
getPythonMemory()

In [None]:
del df
del df2
del pd

In [None]:
gc.collect()

In [None]:
getPythonMemory()

In [None]:
import pandas as pd

In [None]:
df2 = pd.read_csv(fBig, usecols=["id", "price", "bedrooms"])                               
df2.info(verbose=False, memory_usage="deep")

### Utilizar tipos de datos más pequeños

Por defecto, Pandas utilizar `int64` para datos enteros, pero si el rango de nuestros valores es pequeño, podemos utilizar un tipo más reducido.

In [None]:
df2['bedrooms'].dtype

In [None]:
df2['bedrooms'].max()

In [None]:
df3 = pd.read_csv(fNormal, usecols=["id", "price", "bedrooms"], dtype={'bedrooms': 'int8'})
df3.info(verbose=False, memory_usage="deep")

<br/>
En otras ocasiones, podemos aplicar un cambio similar aunque perdamos algo de precisión. P.ej. si pasamos
de `float64` a `float32` (o incluso `int`).

In [None]:
df3.price.head()

In [None]:
df4 = df3.copy()

In [None]:
df4.price = df4.price.astype('int32')

In [None]:
df4.info(verbose=False, memory_usage="deep")

In [None]:
# Comprobamos que no hay diferencia entre las dos columnas
df3.price.compare(df4.price)

En otras ocasiones, podríamos aplicar un cambio similar aunque perdamos algo de precisión. P.ej. si pasamos
de `float64` a `float32` (o incluso `int`).


Finalmente, otra posibilidad sería convertir determinadas columnas numéricas a tipos categóricos, que como ya vimos en el tema 5, pueden necesitar menos memoria.

### Trocear el archivo


Hasta ahora siempre hemos leído todas las líneas de un fichero de una sola vez. 

P. ej., así calcularíamos la longitud y el valor medio de la columna `price` de un fichero completo:

In [None]:
df = pd.read_csv(fNormal)
print(f"Longitud total es {len(df)}")
print(f"Precio medio es {df.price.mean()/1000:.2f} k")

<br/>

Pero Pandas también nos permite leer `chunks`, "trozos", de un fichero, y, así, podríamos realizar el mismo proceso sobre trozos del fichero, iterativamente:

In [None]:
nlines = 1000
total_length = 0

iters = 0
with pd.read_csv(fNormal, chunksize=nlines) as reader:
    for chunk in reader:
        total_length += len(chunk)
        iters += 1

print(f"Número de iteraciones: {iters}")
print(f"Longitud total es {total_length}")

<br/>

De nuevo, si intentáramos cargar un archivo muy grande de una sola vez, tendríamos un error. Pero, en este caso, podemos obtener los mismos resultados usando la lectura por partes.

In [None]:
sumPrice = 0
nlines=50*1000

iters = 0
with pd.read_csv(fBig, chunksize=nlines) as reader:
    for chunk in reader:
        sumPrice += chunk.price.sum()
        iters += 1

In [None]:
print(f"Número de iteraciones: {iters}")
avgPrice = sumPrice/(iters*nlines)
print(f"Precio medio es {avgPrice/1000:.2f} k")

### Usar librerías como Polars

Librería para _DataFrames_ con énfasis en la velocidad de procesamiento. Está implementada en Rust y usa Apache Arrow para su modelo de memoria (columnar). Es una alternativa a Pandas, con un interfaz similar (aunque más parecido a `dplyr` de R), diseñado desde el principio para la computación paralela (multi-thread, en una máquina).

En primer lugar, vamos a realizar un cálculo con pandas, y repetirlo con Polars, para ver los tiempos de ejecución que consiguen.

In [None]:
%%time
# Pandas
df = pd.read_csv(fNormal)
res = df[df.price > 5000000]
avg = res['price'].mean()
print('Num expensive:', len(res))
print('Average:', avg)

<br/>
Ahora lo hacemos con Polars, y vemos que es más eficiente

In [None]:
import polars as pl

In [None]:
%%time
# Polars 
df = pl.read_csv(fNormal)
res = df.filter(pl.col('price') > 5000000)
avg = res['price'].mean()
print('Num expensive:', len(res))
print('Average:', avg)

<br/>
A continuación, vamos a intentar hacer lo mismo con el fichero de gran tamaño.

Como antes, no podemos hacerlo directamente con Pandas:

In [None]:
%%time
# Pandas
df = pd.read_csv(fBig)
res = df[df.price > 5000000]
avg = res['price'].mean()
print('Num expensive:', len(res))
print('Average:', avg)

In [None]:
getPythonMemory()

In [None]:
gc.collect()

In [None]:
getPythonMemory()

 <br/>
 Podemos recurrrir, como antes, a leerlo por trozos (de nuevo, con Pandas):

In [None]:
%%time
sumPrice = 0
nlines=50*1000
numExpensive = 0

iters = 0
with pd.read_csv(fBig, chunksize=nlines) as reader:
    for chunk in reader:
        res = chunk[chunk.price > 5000000]
        numExpensive += len(res)
        sumPrice += res.price.sum()
        iters += 1
        
print(f"Número de iteraciones: {iters}")
avgPrice = sumPrice/(iters*nlines)
print('Num expensive:', numExpensive)
print('Average:', sumPrice/numExpensive)

In [None]:
print('Average:', sumPrice/numExpensive)

Como vemos, esto nos obliga a complicar nuestro código, y además necesitamos mucho más tiempo de procesado.

<br/>

Con Polars, podemos usar el modo `streaming`, de manera más sencilla, y más eficiente (primero, liberamos memoria).

In [None]:
getPythonMemory()

In [None]:
del pd

In [None]:
gc.collect()

In [None]:
getPythonMemory()

In [None]:
import polars as pl

In [None]:
%%time

df = pl.scan_csv(fBig, low_memory=True)
res = df.filter(pl.col('price') > 5000000).collect(streaming=True)
avg = res['price'].mean()

print('Num expensive:', len(res))
print('Average:', avg)

<br/>

Sin embargo, hay que tener cuidado. Este código Polars es más eficiente en memoria, pero también tiene sus exigencias.

Si bajamos el límite más, llegaremos a desbordar la memoria disponible:

In [None]:
LOW_LIMIT = 3*1024*1024*1024  # 3 GB
resource.setrlimit(resource.RLIMIT_AS, (LOW_LIMIT, hard))

In [None]:
%%time

df = pl.scan_csv(fBig, low_memory=True)
res = df.filter(pl.col('price') > 5000000).collect(streaming=True)
avg = res['price'].mean()

print('Num expensive:', len(res))
print('Average:', avg)

### Usar un entorno distribuido como Dask
Usando `Dask` podemos dividir el trabajo de procesar un Dataframe entre varios _workers_, de manera que cada uno de ellos necesitará menos memoria. Los `workers` pueden correr en la misma máquina, o bien en un cluster distribuido, en la nube, etc.

Veremos el ejemplo con Dask en la siguiente sección.

# Dask

## Introducción

Dask es una librería para computación paralela y distribuida, que puede escalar el ecosistema existente de librerías de datos de Python. Además, Dask provee mecanismos para trabajar con datos de grandes dimensiones (_larger-than-memory_). Dask permite escalar desde una máquina a la nube.

Dask ofrece APIs para trabajar a alto o bajo nivel:

- Alto nivel: Dask ofrece _colecciones_, con interfaces similares a los de NumPy o Pandas, pero que pueden trabajar de forma paralela (rápida), y por partes (para soportar grandes conjuntos de datos). Las colecciones son `Array`, `Bag` y `DataFrame`.

- Bajo nivel: Si las colecciones de alto nivel no son apropiadas para un determinado problema (p.ej. no queremos procesar unos datos datos, sino realizar determinada computación), entonces podemos usar estructuras de bajo nivel como _Delayed_ o _Futures_ para conseguir computaciones distribuidas personalizadas.

Casi siempre que usemos Dask, estaremos utilizando un planificador de tareas distribuidas, en el contexto de un cluster Dask:

<img src="images/t10_dask_cluster.png" width="600"/>


## Dask scheduling

### Prefacio: Python GIL
TOOD: hablar sobre el GIL, y algunas maneras de evitar el problema

### Schedulers en Dask
TODO: completar con info desde https://docs.dask.org/en/stable/scheduling.html

## Dask Dataframes

Un Dataframe de Dask es una estructura distribuida compuesta por varios Dataframes de Pandas (separando por conjuntos de filas). Las diferentes partes de un Dataframe de Dask estarán repartidas entre varios _workers_ de Dask, que pueden residir en la misma máquina o en diferentes máquinas de un cluster.

<img src="images/t10_dask_dataframe.png" width="400"/>

Si el Dataframe de Dask está en una sola máquina pero es demasiado grande para ser cargado en memoria, Dask lo lee por partes desde el disco.

Nota: si un Dataframe cabe en memoria, entonces usar Pandas directamente puede ser más eficiente, salvo que se requiera un procesado muy pesado que se beneficie del paralelismo de los _workers_ de Dask.

In [None]:
# Si tenemos un cluster distribuido disponible (p.ej. lanzado desde la extensión Dask para Jupyter)
# podemos conectarnos a él, usando el siguiente código (con la IP y puerto correctos)
from dask.distributed import Client

port = "XXXX"
client = Client("tcp://127.0.0.1:"+port)
client

TODO: dar la opción de usar un scheduling local (multi-thread), para entornos donde no hay un cluster

In [None]:
import pandas as pd

In [None]:
fBig = "../data/BIG_house_data.csv"

In [None]:
%%time
df = pd.read_csv(fBig)

In [None]:
df.info()

In [None]:
df.head()

In [None]:
%%time
df2 = df[df.price > 1000000].groupby("zipcode").bedrooms.mean()

In [None]:
df2.head()

In [None]:
df2.shape

In [None]:
import dask.dataframe as dd

In [None]:
%%time
ddf =  dd.read_csv(fBig)

In [None]:
type(ddf)

In [None]:
ddf.info()

In [None]:
ddf.visualize()

In [None]:
%%time
ddf2 = ddf[ddf.price > 1000000].groupby("zipcode").bedrooms.mean()

In [None]:
type(ddf2)

In [None]:
ddf2.visualize()

In [None]:
%%time
res = ddf2.compute()

In [None]:
res.shape

In [None]:
res.head()

Ahora limitamos la memoria y comprobamos que Dask sigue pudiendo realizar la operación

In [None]:
import resource
limit = 1*1024*1024*1024  # 1 GB !
resource.setrlimit(resource.RLIMIT_AS, (limit, -1))

In [None]:
%%time
# Lo hacemos de nuevo con Dask
ddf =  dd.read_csv(fBig)
ddf2 = ddf[ddf.price > 1000000].groupby("zipcode").bedrooms.mean()
res = ddf2.compute()

In [None]:
res.shape

In [None]:
res.head()

In [None]:
%%time
# Hacerlo con pandas directamente, ahora no es posible
df = pd.read_csv(fBig)
df2 = df[df.price > 1000000].groupby("zipcode").bedrooms.mean()

In [None]:
import resource
resource.setrlimit(resource.RLIMIT_AS, (-1, -1))

## Dask Delayed

Como hemos comentado, con Dask _Delayed_ podemos paralelizar código arbitrario, sin usar una colección Dask de alto nivel.

Partamos de un ejemplo no paralelizado (incluimos un _sleep_ para simular una computación costosa).

In [None]:
from time import sleep

def calc(x):
    sleep(1)
    return x + 1

def merge(vals):
    return sum(vals)

In [None]:
%%time
# Esta función tardará 1 segundo por cada argumento incluido

vals = [5, 10, 2, 7, 9]
res = []
for val in vals:
    res.append(calc(val))
merge(res)

Vamos ahora a hacer lo mismo con Dask Delayed.

In [None]:
import dask

@dask.delayed
def calc(x):
    sleep(1)
    return x + 1

@dask.delayed
def merge(vals):
    return sum(vals)

El código siguiente es inmediato, porque solamente construye el gráfico de operaciones

In [None]:
%%time

vals = [5, 10, 2, 7, 9]
res = []
for val in vals:
    res.append(calc(val))
z = merge(res)

In [None]:
z.visualize()

In [None]:
type(z)

Ahora realicemos la verdadera computación (en paralelo!)

In [None]:
%%time
# Ahora 
z.compute()

Como todas las operaciones `calc` se han realizado a la vez, el tiempo total ha sido de solo 1 segundo.

</b>

Para acabar, cerraremos el cliente Dask.

In [None]:
client.close()