# Tarea Dask

## Instrucciones generales
- Esta tarea debe realizarse de manera individual
- Este notebook (resuelto) debe ser subido al github del proyecto en la carpeta de tareas (creen una carpeta dentro de esa carpeta y agreguen su notebook reuelto)
- Fecha límite: Lunes 25 de noviembre de 2024 a las 11:59 p.m
- Deben realizar las cuatro secciones
- Puedes agregar tantas celdas de código y explicaciones como veas necesario, solo manten la estructura general

## Sección 0 Creación y Configuración del cliente de Dask
Ejercicio 0: Configuración del cliente
1. Crea un cliente local de Dask que inicie un clúster en tu máquina.
2. Configura el cliente para que tenga las siguientes características (elige un par de las opciones de trabajadores e hilos):
    - Número de trabajadores: 2 / 4
    - Memoria máxima por trabajador: 1GB
    - Threads por trabajador: 4 / 2
3. Verifica que el cliente esté funcionando correctamente mostrando:
    - Resumen de los trabajadores activos.
    - Dashboard disponible (URL del panel de control de Dask).
    * Tip: Checa los parámetros del cliente que creeaste.

*Nota*: Puedes hacer que corra en el puerto que desees.

In [1]:
import numpy as np
import pandas as pd
import dask
from dask import delayed
from dask.distributed import Client
from dask import visualize
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client, wait
import time

In [8]:
client = Client(n_workers=4, threads_per_worker=2, memory_limit='1GB')
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 42687 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:42687/status,

0,1
Dashboard: http://127.0.0.1:42687/status,Workers: 4
Total threads: 8,Total memory: 3.73 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:35277,Workers: 4
Dashboard: http://127.0.0.1:42687/status,Total threads: 8
Started: Just now,Total memory: 3.73 GiB

0,1
Comm: tcp://127.0.0.1:42595,Total threads: 2
Dashboard: http://127.0.0.1:32929/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:35209,
Local directory: /tmp/dask-scratch-space/worker-5q9sf9lm,Local directory: /tmp/dask-scratch-space/worker-5q9sf9lm

0,1
Comm: tcp://127.0.0.1:41515,Total threads: 2
Dashboard: http://127.0.0.1:41499/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:46567,
Local directory: /tmp/dask-scratch-space/worker-h0wkayri,Local directory: /tmp/dask-scratch-space/worker-h0wkayri

0,1
Comm: tcp://127.0.0.1:46185,Total threads: 2
Dashboard: http://127.0.0.1:33915/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:38753,
Local directory: /tmp/dask-scratch-space/worker-nbx9ykzd,Local directory: /tmp/dask-scratch-space/worker-nbx9ykzd

0,1
Comm: tcp://127.0.0.1:33489,Total threads: 2
Dashboard: http://127.0.0.1:38741/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:34097,
Local directory: /tmp/dask-scratch-space/worker-wvr56xa3,Local directory: /tmp/dask-scratch-space/worker-wvr56xa3


## Sección 1 Delayed
Ejercicio 1: Procesamiento de datos 

1. Genera datos simulados (por ejemplo, ventas diarias) para 10 sucursales durante 365 días.
    - Cada sucursal debe tener datos generados aleatoriamente para "Ingresos" y "Costos".
    - Utiliza una función para generar los datos simulados.
2. Usa Dask Delayed para calcular:
    - Las ganancias diarias por sucursal.
    - La sucursal con mayor ganancia promedio.
3. Genera un grafo de tareas que visualice estas operaciones y explica por qué elegiste paralelizar de esa forma, genera una visualización del grafo.

In [None]:

def generate_branch_data(branch_id, days=365):
    np.random.seed(branch_id)  
    ingresos = np.random.randint(1000, 5000, size=days)  
    costos = np.random.randint(500, 3000, size=days)     
    return pd.DataFrame({
        "Sucursal": branch_id,
        "Día": range(1, days + 1),
        "Ingresos": ingresos,
        "Costos": costos
    })

# Crear datos para 10 sucursales 
branches = []
for branch_id in range(10):
    branches.append(delayed(generate_branch_data)(branch_id))


data = delayed(pd.concat)(branches)

# Calcular ganancias diarias por sucursal
@delayed
def calculate_daily_profits(df):
    df["Ganancias"] = df["Ingresos"] - df["Costos"]
    return df


data_with_profits = calculate_daily_profits(data)

#Encontrar la sucursal con mayor ganancia promedio
@delayed
def find_best_branch(df):
    branch_avg_profits = df.groupby("Sucursal")["Ganancias"].mean()
    best_branch = branch_avg_profits.idxmax()
    return best_branch, branch_avg_profits[best_branch]

# Identificar la mejor sucursal
best_branch = find_best_branch(data_with_profits)


dask.visualize(best_branch, filename="task_graph", format="png")


final_data, (best_branch_id, best_avg_profit) = dask.compute(data_with_profits, best_branch)

# Resultados
print("Sucursal con mayor ganancia promedio:")
print(f"Sucursal: {best_branch_id}, Ganancia promedio: {best_avg_profit}")


ExecutableNotFound: failed to execute PosixPath('dot'), make sure the Graphviz executables are on your systems' PATH

## Sección 2 Dask Dataframes
Ejercicio 2: Limpieza y análisis de datos reales

1. Descarga un conjunto de datos masivo (puedes usar la colección de *nycflights* que se encuentra en `data/nycflights/`).
2. Carga los datos en un Dask DataFrame. 
    - Elige adecuadamente el número de particiones (que quepan en memoria de los `workers`)
3. Realiza las siguientes tareas:
    - Limpia los valores faltantes en las columnas `ArrDelay` y `DepDelay`, rellenándolos con la mediana de cada columna.
    - Calcula el retraso promedio (`DepDelay`) por mes y aerolínea.
    - Encuentra el aeropuerto de origen con más vuelos retrasados.

*Nota*: **Evita** convertir el DataFrame a pandas e **intenta** realizar `.compute()` solo cuando sea necesario.

In [None]:

data_path = "data/nycflights/"  #
files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith(".csv")]


df = dd.read_csv(files, assume_missing=True)


# Calcular la mediana para cada columna 
# Calcular la mediana aproximada con el percentil 50%
arr_delay_median = df['ArrDelay'].quantile(0.5).compute()  # Mediana aproximada
dep_delay_median = df['DepDelay'].quantile(0.5).compute()

# Rellenar valores faltantes
df = df.fillna({'ArrDelay': arr_delay_median, 'DepDelay': dep_delay_median})


# Retraso promedio 
avg_delay = df.groupby(['Month', 'UniqueCarrier'])['DepDelay'].mean().compute()
print("Retraso promedio (DepDelay) por mes y aerolínea:")
print(avg_delay)

# Aeropuerto de origen con más vuelos retrasados
# Un vuelo se considera retrasado si DepDelay > 0
delayed_flights = df[df['DepDelay'] > 0]
origin_airport = delayed_flights.groupby('Origin').size().idxmax().compute()
print(f"Aeropuerto de origen con más vuelos retrasados: {origin_airport}")


NameError: name 'os' is not defined

## Sección 3 Dask Arrays

Ejercicio 3: Procesamiento numérico avanzado

1. Crea un arreglo de 10,000 x 10,000 con valores aleatorios usando Dask Array, utiliza un tamaño de chunks adecuado, ¿es mejor que sean cuadrados?.
2. Realiza las siguientes operaciones:
    - Calcula la suma de cada fila.
    - Encuentra la fila con el valor máximo promedio.
    - Multiplica todo el arreglo por un factor escalar (por ejemplo, 2.5).
3. Divide el arreglo nuevamente en 100 bloques y compara la rapidez.

In [None]:
shape = (10000, 10000)
chunk_size = (1000, 1000) 
arr = da.random.random(shape, chunks=chunk_size)


row_sums = arr.sum(axis=1)

# Encontrar la fila con el valor máximo promedio
row_means = row_sums / arr.shape[1]
max_row_index = row_means.argmax().compute()  
max_row_mean = row_means[max_row_index].compute()  

#Multiplicar todo el arreglo por un factor escalar 
scaled_array = arr * 2.5

# Re-dividir el arreglo en 100 bloques y comparar la rapidez
# Nuevo tamaño de chunk para dividir en 100 bloques
new_chunks = (arr.shape[0] // 10, arr.shape[1] // 10) 
rechunked_array = scaled_array.rechunk(new_chunks)

# Comparar tiempos 
print("Tiempo para procesar con el nuevo chunk:")
%time rechunked_array.sum().compute()

# Mostrar resultados
print(f"Índice de la fila con el promedio máximo: {max_row_index}")
print(f"Promedio máximo: {max_row_mean}")


Tiempo para procesar con el nuevo chunk:
CPU times: user 585 ms, sys: 44.1 ms, total: 629 ms
Wall time: 809 ms
Índice de la fila con el promedio máximo: 5768
Promedio máximo: 0.5109867090046055


## Sección 4 Futures
Ejercicio 4: Distribución de tareas dinámicas

1. Implementa una función que calcule la raíz cuadrada de una lista de 100,000 números enteros generados aleatoriamente.
2. Divide la lista en 10 partes iguales y usa Dask Futures para calcular la raíz cuadrada de cada parte en paralelo.
3. Recolecta los resultados y calcula:
    - El promedio de todos los números procesados.
    - El tiempo total de ejecución (incluyendo envío y recolección de tareas).
4. Observa como se distribuye la carga en el cliente.

*Nota*: en los ejercicios ya vimos como determinar si ya se cumplío una tarea.

In [None]:

data = np.random.randint(1, 1000000, size=100000)  

# Dividir la lista en 10 partes iguales
n_parts = 10
split_data = np.array_split(data, n_parts)


def compute_sqrt(numbers):
    return np.sqrt(numbers)

# Enviar tareas usando Futures
start_time = time.time()

futures = [client.submit(compute_sqrt, part) for part in split_data]


results = client.gather(futures)
end_time = time.time()

# Calcular el promedio de todos los números procesados
all_results = np.concatenate(results)
mean_result = np.mean(all_results)

# Calcular el tiempo total de ejecución
execution_time = end_time - start_time

#Mostrar resultados
print(f"Promedio de los números procesados: {mean_result}")
print(f"Tiempo total de ejecución (incluyendo envío y recolección): {execution_time:.2f} segundos")

Promedio de los números procesados: 665.5510933901787
Tiempo total de ejecución (incluyendo envío y recolección): 0.12 segundos
