In [None]:
#!pip install dask[complete] dask-ml pandas scikit-learn --user
#import os 
#!pip install --upgrade numpy pandas --user

In [1]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask_ml.model_selection as dml
from dask_ml.model_selection import train_test_split

import dask_ml.metrics as dmm
from dask_ml.linear_model import LogisticRegression
from dask.distributed import Client


  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (


# Trabajo grupal 2

## Parte I

**1. Relación entre Dask y el Método de Foster**

Dask, una biblioteca de Python para la computación en paralelo, se relaciona con el método de Foster al estructurar y descomponer tareas complejas en sub-tareas manejables. El método de Foster consta de varias etapas: particionamiento, comunicación, agregación y mapeo. Estas etapas encuentran un paralelo en cómo Dask maneja las tareas mediante su motor de ejecución de grafos.
 
Por ejemplo, cuando utilizamos Dask para procesar un gran dataframe, este se divide en múltiples dataframes más pequeños (particionamiento). Luego, Dask optimiza la distribución de estas tareas a través de la red de hardware disponible, minimizando la sobrecarga de comunicación. Además, Dask ejecuta las sub-tareas de manera eficiente y paralela (agregación y mapeo). Un ejemplo práctico sería la carga y análisis de un gran conjunto de datos, donde Dask divide el dataframe y asigna cada parte a diferentes núcleos o nodos, realizando operaciones en paralelo y reuniendo los resultados finales.

**2. Ineficiencias de la paralelización**

La paralelización puede volverse ineficiente en ciertas circunstancias, principalmente debido a la sobrecarga de comunicación y la granularidad de las tareas. Permíteme explicarte:

- **Sobrecarga de comunicación:** Cuando las tareas requieren una alta comunicación entre procesos, como en algoritmos con fuertes dependencias de datos entre los nodos, la sobrecarga de comunicación puede superar los beneficios de la paralelización. Por ejemplo, en un algoritmo de búsqueda de camino mínimo, si cada nodo debe comunicarse frecuentemente con otros, la latencia y el tiempo de transmisión de datos pueden hacer que la paralelización sea ineficiente.
- **Granularidad de las tareas:** Si las tareas son demasiado pequeñas (granularidad fina), el tiempo dedicado a administrar las tareas y coordinar entre núcleos puede ser mayor que el tiempo de ejecución de las propias tareas. Un ejemplo ilustrativo es la paralelización de operaciones simples, como la suma de una pequeña lista de números. En este caso, el costo de distribuir las tareas y reunir los resultados puede ser mayor que simplemente realizar la operación en serie

**3. Good data vs Big data**

El artículo de Desouza y Smith (2014) en la Stanford Social Innovation Review resalta la promesa y los desafíos del big data para la innovación social. Por otro lado, Andrew Ng en su presentación en el Data+AI Summit 2022 destacó la importancia de los datos de calidad ("Good Data") para el éxito de los proyectos de inteligencia artificial. La discusión sobre si "Good Data" reemplazará al "Big Data" es crucial.

¿Uno reemplazará al otro?

En base al articulo y a la conferencia, podemos deducir que es poco probable que la "Good Data" reemplace a la "Big Data". Ambos conceptos abordan necesidades **distintas y complementarias**. La "Big Data" se refiere a la capacidad de manejar grandes volúmenes de datos variados y rápidos, esenciales para descubrir patrones y tendencias a gran escala. En contraste, "Good Data" enfatiza la calidad, precisión y relevancia de los datos, cruciales para la toma de decisiones informadas y la creación de modelos de IA efectivos. Una manera de resaltar el porqué no son reemplazables es colocarnos en el caso hipotético en el que tenemos un volumen de datos gigante pero que es difuso y no tiene un etiquetado correcto (no seríamos eficientes). 

¿Se pueden complementar y cómo?
Sí, "Good Data" y "Big Data" pueden y deben complementarse. Lo mejor de la Big Data proporciona es **la amplitud y diversidad** necesarias para captar una visión completa de fenómenos complejos. Por otro lado, la Good Data asegura que las decisiones y modelos derivados sean **precisos y confiables**. Integrar ambos enfoques puede maximizar el valor de los datos, logrando resultados más robustos y significativos.

Ejemplos en el contexto peruano:

- Salud Pública: En Perú, el uso de Big Data para monitorear la propagación de enfermedades como el dengue puede ser muy efectivo. Sin embargo, la calidad de los datos recopilados es vital. Usar Good Data en la validación de fuentes y limpieza de datos asegura que las políticas de salud basadas en estos análisis sean precisas y efectivas. Por ejemplo, no nos serviría mucho tener millones de testeos de dengue con pruebas poco precisas; sería mejor optimizar los recursos y basarnos en menso pruebas que sean más precisas.

- Educación: Big Data puede ayudar a identificar tendencias en el rendimiento académico a nivel nacional. Complementarlo con Good Data, recolectando información precisa y relevante sobre factores socioeconómicos y métodos de enseñanza, permite desarrollar intervenciones educativas más acertadas y personalizadas. Por ejemplo, sería ideal tener datos de las notas de todos los estudiantes del Perú; sin embargo, si no concoemos cómo se generaron estas (las evaluaciones), no tendríamos medidas precisas que reflejen el desepeño de los estudiantes.

En conclusión, mientras que **Big Data proporciona la amplitud necesaria para capturar la complejidad de problemas sociales**, **Good Data garantiza que las soluciones derivadas sean precisas y efectivas**. Integrar ambos enfoques es esencial para abordar problemas complejos de manera holística y eficiente

**4. Dask en contextos reales** 

Dask puede ser extremadamente útil para acelerar el procesamiento y análisis de registros de datos. Dask permite manejar grandes volúmenes de datos dividiéndolos en partes más pequeñas y procesándolos en paralelo, lo que es crucial para manejar eficientemente datos a gran escala.

**Ejemplo:** Imaginemos que la empresa en la que trabajmos necesita analizar las ventas por región para identificar patrones de comportamiento de los clientes y optimizar las estrategias de marketing. Los datos de ventas incluyen millones de registros con información como ID de venta, fecha, región, producto, categoría , cantidad vendida y precio. Sigamos los siguientes pasos:


1. Cargar datos en paralelo: Utilizando dask.dataframe.read_csv, se pueden cargar múltiples archivos CSV en paralelo, lo que acelera significativamente el proceso de carga comparado con pandas.

 ```python
   import dask.dataframe as dd

   # Cargar datos de ventas en paralelo
   df = dd.read_csv('ruta_a_data_ventas/data.csv')
 ```   
 

2. Transformación de datos: Dask permite realizar operaciones de transformación, como agrupar y sumar ventas por región, utilizando una sintaxis similar a pandas, pero ejecutándose en paralelo.

 ```python
sales_by_region = df.groupby('region')['sales_amount'].sum().compute()
 ```

3. Manejo de datos que no caben en memoria: Dask puede manejar datasets que no caben en la memoria dividiéndolos en partes más pequeñas y procesándolos en discos duros. Esto es especialmente útil para grandes volúmenes de datos de ventas.

4. Visualización y análisis en tiempo real: Usando el dashboard de Dask, los analistas pueden monitorear el progreso del procesamiento y ajustar las tareas en tiempo real.

**Ventajas de Dask:**

- **Paralelización**: Dask descompone el trabajo en tareas más pequeñas que se ejecutan simultáneamente, maximizando el uso de los recursos de CPU y memoria disponibles, hacemos un uso eficiente de nuestros recursos. Esto reduce considerablemente el tiempo de procesamiento en comparación con el enfoque secuencial de pandas. Lo cual es importante en contextos donde necesitamos los datos ASAP.
- **Escalabilidad**: Dask puede adaptarse desde una sola laptop hasta un clúster de computadoras, permitiendo manejar conjuntos de datos que van desde gigabytes hasta terabytes sin necesidad de modificar el código.
- **Interoperabilidad**: Dask se integra de manera fluida con otras bibliotecas de Python como NumPy y pandas, permitiendo a los desarrolladores aprovechar sus herramientas y habilidades existentes sin tener que aprender una nueva sintaxis. Esto ayudaría un monton pues la curva de aprendizaje de esta sintaxis sería muy beneficiosa en comparacion a otro método.
- **Elasticidad y tolerancia a fallos**: Dask ajusta dinámicamente la distribución de tareas según los recursos de hardware disponibles y maneja fallos en los nodos sin interrumpir el procesamiento general.


In [2]:
# Cargar los datos con Dask
df = pd.read_csv('..\data\ENPOVE2022_V_200-300-400-500-600-700-800.csv')
df

Unnamed: 0,CCDD,DEPARTAMENTO,CCPP,PROVINCIA,CCDI,DISTRITO,CIUDAD,CONGLOMERADO,NSELV,VIVIENDA,...,P805_3,P805_4,P805_5,P805_6,P805_7,P805_8,P805_9,P806,P807,factorfinal
0,2,ANCASH,18,SANTA,1,CHIMBOTE,Chimbote,1206802,9340,3,...,,,,,,,,1,1,13.175820
1,2,ANCASH,18,SANTA,1,CHIMBOTE,Chimbote,1206802,9340,3,...,,,,,,,,,,13.175820
2,2,ANCASH,18,SANTA,1,CHIMBOTE,Chimbote,1206802,9340,3,...,,,,,,,,,,13.175820
3,2,ANCASH,18,SANTA,1,CHIMBOTE,Chimbote,1206802,9340,3,...,,,,,,,,,,13.175820
4,2,ANCASH,18,SANTA,1,CHIMBOTE,Chimbote,1206802,9340,3,...,,,,,,,,1,2,13.175820
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
12482,24,TUMBES,1,TUMBES,1,TUMBES,Tumbes,44789,396,13,...,,,,,,,,3,2,12.281439
12483,24,TUMBES,1,TUMBES,1,TUMBES,Tumbes,44789,397,14,...,,,,,,,,,,12.281439
12484,24,TUMBES,1,TUMBES,1,TUMBES,Tumbes,44789,397,14,...,,,,,,,,,,12.281439
12485,24,TUMBES,1,TUMBES,1,TUMBES,Tumbes,44789,397,14,...,,,,,,,,,,12.281439


In [3]:
# Definir las variables
variables_a_excluir = [
    'CCDD', 'DEPARTAMENTO', 'CCPP', 'PROVINCIA', 'CCDI', 'DISTRITO',
    'CIUDAD', 'CONGLOMERADO', 'NSELV', 'VIVIENDA', 'THOGAR', 'NHOGAR',
    'ESTRATO', 'VRESFIN', 'RESFIN', 'P15', 'P15_N', 'INF_200', 'P200_N',
    'P609_COD', 'P611_COD', 'P625A_COD_DEPA', 'P625_COD_PROV', 'P625_COD_DIST',
    'factorfinal', 'P602'
]

# Obtener la lista de todas las columnas
todas_las_columnas = df.columns.tolist()

# Determinar las covariables (X) excluyendo las especificadas
columnas_covariables = [col for col in todas_las_columnas if col not in variables_a_excluir]

# Definir predictores (X) y target (y)
X = df[columnas_covariables]
y = df['P602']  # Empleo


# Configuración del cliente Dask
Client(n_workers=4, threads_per_worker=2, memory_limit='4GB')

# Particionamos los datos de antemano
X = dd.from_pandas(X, npartitions=10)
y = dd.from_pandas(y, npartitions=10)


# División de datos en entrenamiento y prueba
X_train, X_test, y_train, y_test = train_test_split(
    X, y, 
    test_size=0.2, 
    random_state=4200, 
    shuffle=True  # Explicitly set shuffle
)

# Definir el modelo
model = LogisticRegression()
# Definir el grid para GridSearch
param_grid = {
    'penalty': ['l1', 'l2'],
    'C': [0.1, 1, 10]
}

# Configurar GridSearchCV
grid_search = dml.GridSearchCV(model, param_grid, cv=5, scoring='accuracy')

# Ajustar el modelo usando GridSearchCV
grid_search.fit(X_train, y_train)

# Selección del modelo óptimo
best_model = grid_search.best_estimator_

# Evaluación en el conjunto de entrenamiento
y_train_pred = best_model.predict(X_train)
train_accuracy = dmm.accuracy_score(y_train, y_train_pred)
train_roc_auc = dmm.roc_auc_score(y_train, best_model.predict_proba(X_train)[:, 1])

# Evaluación en el conjunto de prueba
y_test_pred = best_model.predict(X_test)
test_accuracy = dmm.accuracy_score(y_test, y_test_pred)
test_roc_auc = dmm.roc_auc_score(y_test, best_model.predict_proba(X_test)[:, 1])

# Imprimir resultados
print("Mejores parámetros encontrados:")
print(grid_search.best_params_)

print("\nIndicadores de calidad de ajuste en entrenamiento:")
print(f"Accuracy: {train_accuracy}")
print(f"ROC AUC: {train_roc_auc}")

print("\nIndicadores de calidad de ajuste en prueba:")
print(f"Accuracy: {test_accuracy}")
print(f"ROC AUC: {test_roc_auc}")

# Reporte de clasificación para la muestra de prueba
from dask_ml.metrics import classification_report
print("\nReporte de clasificación en prueba:")
print(classification_report(y_test, y_test_pred))
client.close()

Perhaps you already have a cluster running?
Hosting the HTTP server on port 53170 instead
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "C:\Users\DDelgado\anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 3457, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\DDelgado\AppData\Local\Temp\ipykernel_13868\1084635565.py", line 49, in <module>
    grid_search.fit(X_train, y_train)
  File "C:\Users\DDelgado\AppData\Roaming\Python\Python39\site-packages\dask_ml\model_selection\_search.py", line 1272, in fit
    for batch in ac.batches():
  File "C:\Users\DDelgado\anaconda3\lib\site-packages\distributed\client.py", line 5967, in batches
    yield self.next_batch(block=True)
  File "C:\Users\DDelgado\anaconda3\lib\site-packages\distributed\client.py", line 5939, in next_batch
    batch = [next(self)]
  File "C:\Users\DDelgado\anaconda3\lib\site-packages\distributed\client.py", line 5895, in __next__
    self.thread_condition.wait(timeout=0.100)
  File "C:\Users\DDelgado\anaconda3\lib\threading.py", line 316, in wait
    gotit = waiter.

TypeError: object of type 'NoneType' has no len()

## Limitaciones y posibles extensiones
1. El ajuste del modelo puede no ser óptimo si el grid de búsqueda es pequeño
2. Los indicadores de calidad de ajuste dependen de la calidad de los datos.
3. Se podría usar una mayor variedad de modelos y técnicas para comparación.
4. La escalabilidad puede ser un problema con datasets extremadamente grandes.