### Desarrollo de algoritmos de ensamblado

## Implementación de paralelismo con Dask

## **Conjunto de datos sobre diabetes**
Este conjunto de datos proviene del Instituto Nacional de Diabetes y
Enfermedades Digestivas y Renales. El objetivo del conjunto de datos es predecir de forma diagnóstica si un paciente tiene diabetes,
basándose en ciertas mediciones de diagnóstico incluidas en el conjunto de datos.
En particular, todos los pacientes aquí son mujeres de al menos 21 años de edad de ascendencia india

Información sobre los atributos del conjunto de datos -

Embarazos: Para expresar el Número de embarazos

Glucosa: Para expresar el nivel de glucosa en sangre.

BloodPressure: Para expresar la medición de la presión arterial.

SkinThickness: Para expresar el grosor de la piel.

Insulina: Para expresar el nivel de insulina en sangre.

IMC: Para expresar el índice de masa corporal.

DiabetesPedigreeFunción: Expresar el porcentaje de Diabetes.

Edad: Para expresar la edad.

Resultado: Para expresar el resultado final 1 es Sí y 0 es No

https://www.kaggle.com/datasets/akshaydattatraykhare/diabetes-dataset

In [25]:
!pip install xgboost



In [46]:
from sklearn.model_selection import GridSearchCV
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import make_scorer
from sklearn.model_selection import RandomizedSearchCV
import pandas as pd
import numpy as np
import time
import psutil
import cProfile
import pstats

In [28]:
#Revisamos el contenido de la data
import pandas as pd
data2=pd.read_csv('diabetes.csv')
data2.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
0,6,148,72,35,0,33.6,0.627,50,1
1,1,85,66,29,0,26.6,0.351,31,0
2,8,183,64,0,0,23.3,0.672,32,1
3,1,89,66,23,94,28.1,0.167,21,0
4,0,137,40,35,168,43.1,2.288,33,1


In [29]:
from sklearn.model_selection import train_test_split
X = data2[['Glucose','BMI','Age']]  # Seleccionamos las características adecuadas
y = data2['Outcome']
import time
# Medir el tiempo de ejecución
start_time0 = time.time()

# Divide tus datos en conjuntos de entrenamiento y prueba, utilizamos stratify para distribuir bien la cantidades de datos para las clases
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42,stratify=y)

# Calcular el tiempo de ejecución
end_time = time.time()
execution_time = end_time - start_time0

print("Tiempo de ejecución: {:.2f} segundos".format(execution_time))

Tiempo de ejecución: 0.05 segundos


In [30]:
X_train.shape, y_train.shape, X_test.shape, y_test.shape

((614, 3), (614,), (154, 3), (154,))

In [32]:
# Crear el clasificador de XGBoost
xgb_classifier = xgb.XGBClassifier(objective="binary:logistic", random_state=42)


# Medir el tiempo de ejecución
start_time0 = time.time()
# Entrenar el modelo
xgb_classifier.fit(X_train, y_train)

# Calcular el tiempo de ejecución
end_time = time.time()
execution_time = end_time - start_time0

print("Tiempo de ejecución: {:.2f} segundos".format(execution_time))

# Predicción en el conjunto de prueba
predictions_xgb = xgb_classifier.predict(X_test)
# Uso de recursos
print(f'CPU usada: {psutil.cpu_percent()}%')
print(f'Memory usada: {psutil.virtual_memory().percent}%')

Tiempo de ejecución: 0.12 segundos
CPU usada: 49.6%
Memory usada: 19.3%


**Métricas**

In [33]:
accuracy = accuracy_score(y_test, predictions_xgb)
accuracy

0.7532467532467533

**Buscar buenos parametros en paralelo con sklearn GridSearchCV**

In [41]:
parametros = {
    'max_depth': [3, 4, 5],
    'eta': [ 1, 0.1, 0.2],
    'subsample': [0.5, 0.7, 1.0],
    'objective': ['binary:logistic']
}
scoring = {
    'accuracy': make_scorer(accuracy_score)
}
grid_search = GridSearchCV(
estimator=xgb.XGBClassifier(),
    param_grid=parametros,
scoring=scoring,  # Métrica de evaluación
    refit='accuracy',
    return_train_score=True,
    n_jobs=-1
)

**n_jobs=-1:** Utiliza todos los procesadores disponibles para realizar la búsqueda de hiperparámetros en paralelo, lo cual puede acelerar significativamente el proceso.

In [42]:
%%time
grid_search.fit(X_train, y_train)

CPU times: user 2.99 s, sys: 605 ms, total: 3.59 s
Wall time: 13.9 s


In [43]:
grid_search.best_params_

{'eta': 1, 'max_depth': 3, 'objective': 'binary:logistic', 'subsample': 0.5}

Cambiamos los valores de los parametros según los calculado

In [44]:
# Crear el clasificador de XGBoost
xgb_classifier = xgb.XGBClassifier(objective="binary:logistic", eta=1, max_depth=3,subsample=0.5, random_state=42)


# Medir el tiempo de ejecución
start_time0 = time.time()
# Entrenar el modelo
xgb_classifier.fit(X_train, y_train)

# Calcular el tiempo de ejecución
end_time = time.time()
execution_time = end_time - start_time0

print("Tiempo de ejecución: {:.2f} segundos".format(execution_time))

# Predicción en el conjunto de prueba
predictions_xgb = xgb_classifier.predict(X_test)
# Uso de recursos
print(f'CPU usada: {psutil.cpu_percent()}%')
print(f'Memory usada: {psutil.virtual_memory().percent}%')

Tiempo de ejecución: 0.13 segundos
CPU usada: 66.7%
Memory usada: 21.3%


In [45]:
accuracy = accuracy_score(y_test, predictions_xgb)
accuracy

0.7272727272727273

**Buscar buenos parametros en paralelo con sklearn RandomizedSearchCV**

In [50]:
parametros = {
    'max_depth': [3, 4, 5],
    'eta': [ 1, 0.1, 0.2],
    'subsample': [0.5, 0.7, 1.0],
    'objective': ['binary:logistic']
}
scoring = {
    'accuracy': make_scorer(accuracy_score)
}
ran_search =RandomizedSearchCV(
estimator=xgb.XGBClassifier(),
    param_distributions=parametros,
scoring=scoring,  # Métrica de evaluación
    refit='accuracy',
    return_train_score=True,
    n_jobs=-1
)

In [51]:
%%time
ran_search.fit(X_train, y_train)

CPU times: user 2.08 s, sys: 381 ms, total: 2.46 s
Wall time: 10 s


In [53]:
ran_search.best_params_

{'subsample': 0.5, 'objective': 'binary:logistic', 'max_depth': 3, 'eta': 0.1}

In [56]:
# Crear el clasificador de XGBoost
xgb_classifier = xgb.XGBClassifier(objective="binary:logistic", eta=0.1, max_depth=3,subsample=0.5, random_state=42)


# Medir el tiempo de ejecución
start_time0 = time.time()
# Entrenar el modelo
xgb_classifier.fit(X_train, y_train)

# Calcular el tiempo de ejecución
end_time = time.time()
execution_time = end_time - start_time0

print("Tiempo de ejecución: {:.2f} segundos".format(execution_time))

# Predicción en el conjunto de prueba
predictions_xgb1 = xgb_classifier.predict(X_test)
# Uso de recursos
print(f'CPU usada: {psutil.cpu_percent()}%')
print(f'Memory usada: {psutil.virtual_memory().percent}%')

Tiempo de ejecución: 0.17 segundos
CPU usada: 75.4%
Memory usada: 22.7%


In [57]:
accuracy = accuracy_score(y_test, predictions_xgb1)
accuracy

0.7272727272727273

## **Dask**

In [None]:
pip install dask

In [None]:
pip install dask_ml

In [None]:
pip install dask_xgboost

In [61]:
pip install xgboost



In [68]:
from dask.distributed import Client
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split
import time
import dask
import xgboost
import dask_xgboost
import psutil
import numpy as np
from sklearn.metrics import accuracy_score
from dask_ml.xgboost import XGBClassifier
from dask_ml.model_selection import GridSearchCV, RandomizedSearchCV

In [63]:
#Configurar cliente Dask
client = Client(n_workers=4, threads_per_worker=2)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 33431 instead
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:43947
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:33431/status
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:45881'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:46613'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38913'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:46827'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:35789', name: 2, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:35789
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:36058
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:38245', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.schedul

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:33431/status,

0,1
Dashboard: http://127.0.0.1:33431/status,Workers: 4
Total threads: 8,Total memory: 12.67 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:43947,Workers: 4
Dashboard: http://127.0.0.1:33431/status,Total threads: 8
Started: Just now,Total memory: 12.67 GiB

0,1
Comm: tcp://127.0.0.1:45967,Total threads: 2
Dashboard: http://127.0.0.1:39957/status,Memory: 3.17 GiB
Nanny: tcp://127.0.0.1:45881,
Local directory: /tmp/dask-scratch-space/worker-kcrkddxx,Local directory: /tmp/dask-scratch-space/worker-kcrkddxx

0,1
Comm: tcp://127.0.0.1:38245,Total threads: 2
Dashboard: http://127.0.0.1:43653/status,Memory: 3.17 GiB
Nanny: tcp://127.0.0.1:46613,
Local directory: /tmp/dask-scratch-space/worker-p77eywzb,Local directory: /tmp/dask-scratch-space/worker-p77eywzb

0,1
Comm: tcp://127.0.0.1:35789,Total threads: 2
Dashboard: http://127.0.0.1:35541/status,Memory: 3.17 GiB
Nanny: tcp://127.0.0.1:38913,
Local directory: /tmp/dask-scratch-space/worker-hdusgatc,Local directory: /tmp/dask-scratch-space/worker-hdusgatc

0,1
Comm: tcp://127.0.0.1:40067,Total threads: 2
Dashboard: http://127.0.0.1:44089/status,Memory: 3.17 GiB
Nanny: tcp://127.0.0.1:46827,
Local directory: /tmp/dask-scratch-space/worker-qdi8q54r,Local directory: /tmp/dask-scratch-space/worker-qdi8q54r


In [64]:
#Revisamos el contenido de la data
data1 = dd.read_csv('/content/diabetes.csv')
data1.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
0,6,148,72,35,0,33.6,0.627,50,1
1,1,85,66,29,0,26.6,0.351,31,0
2,8,183,64,0,0,23.3,0.672,32,1
3,1,89,66,23,94,28.1,0.167,21,0
4,0,137,40,35,168,43.1,2.288,33,1


#### Predice si tiene diabetes sabiendo su nivel de glucosa,  índice de masa corporal y edad

In [65]:
X = data1[['Glucose','BMI','Age']]  # Seleccionamos las características adecuadas
y = data1['Outcome']

# Medir el tiempo de ejecución
start_time0 = time.time()

# Divide tus datos en conjuntos de entrenamiento y prueba, utilizamos stratify para distribuir bien la cantidades de datos para las clases
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=True,)#,stratify=y


# Calcular el tiempo de ejecución
end_time = time.time()
execution_time = end_time - start_time0

print("Tiempo de ejecución: {:.2f} segundos".format(execution_time))

Tiempo de ejecución: 0.04 segundos


In [66]:
X_train.shape, y_train.shape, X_test.shape, y_test.shape

((Delayed('int-e81bf793-e3d0-4498-985f-c951ab3d2945'), 3),
 (dd.Scalar<size-ag..., dtype=int64>,),
 (Delayed('int-dbd2a2e1-63cc-49da-b34a-b20824b7d8c6'), 3),
 (dd.Scalar<size-ag..., dtype=int64>,))

In [67]:
y_train.compute()

128    1
355    1
591    0
97     0
186    1
      ..
345    0
448    1
75     0
532    0
429    1
Name: Outcome, Length: 619, dtype: int64

### **XGBoost:**

## **Con Dask**

min_child_weight: Peso mínimo requerido en una hoja para continuar dividiendo, afecta la simplicidad del modelo.

Entrenamiento del modelo

In [76]:
parametros = {'objective': 'binary:logistic',
          'eta': 1, 'max_depth': 3, 'subsample': 0.5}

# Medir tiempo y uso de recursos antes del entrenamiento
start_time0 = time.time()
cpu_usage_before = psutil.cpu_percent(interval=1)
memory_usage_before = psutil.virtual_memory().percent


xg = dask_xgboost.train(client, parametros, X_train, y_train, num_boost_round=10)


# Medir tiempo y uso de recursos después del entrenamiento
end_time = time.time()
execution_time = end_time - start_time0
cpu_usage_after = psutil.cpu_percent(interval=1)
memory_usage_after = psutil.virtual_memory().percent
print("Tiempo de ejecución: {:.2f} segundos".format(execution_time))
print(f'CPU antes del entrenamiento: {cpu_usage_before}%')
print(f'CPU después del entrenamiento: {cpu_usage_after}%')
print(f'Memoria antes del entrenamiento: {memory_usage_before}%')
print(f'Memoria después del entrenamiento: {memory_usage_after}%')



INFO:distributed.worker:Run out-of-band function 'start_tracker'
Exception in thread Thread-16 (join):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.10/dist-packages/dask_xgboost/tracker.py", line 365, in join
    while self.thread.isAlive():
AttributeError: 'Thread' object has no attribute 'isAlive'. Did you mean: 'is_alive'?


Tiempo de ejecución: 4.26 segundos
CPU antes del entrenamiento: 97.0%
CPU después del entrenamiento: 83.8%
Memoria antes del entrenamiento: 24.6%
Memoria después del entrenamiento: 25.0%


In [77]:
y_hat = dask_xgboost.predict(client, xg, X_test).persist()
y_hat

Unnamed: 0,Array,Chunk
Bytes,unknown,unknown
Shape,"(nan,)","(nan,)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes unknown unknown Shape (nan,) (nan,) Dask graph 1 chunks in 1 graph layer Data type float32 numpy.ndarray",,

Unnamed: 0,Array,Chunk
Bytes,unknown,unknown
Shape,"(nan,)","(nan,)"
Dask graph,1 chunks in 1 graph layer,1 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [78]:
xg

<xgboost.core.Booster at 0x7cb3546bf100>

In [79]:
results = y_hat.compute()


In [80]:

# Opcional: aplicar un umbral para convertir las probabilidades en etiquetas de clase binarias
umbral = 0.5
predictions_umbral = (results >= umbral).astype(int)

# Imprimir las predicciones con umbral
print("\nPredicciones binarias con umbral de {}: ".format(umbral))
print(predictions_umbral)


Predicciones binarias con umbral de 0.5: 
[1 0 0 1 0 0 1 0 0 1 1 1 0 1 0 0 1 0 0 0 0 1 0 1 1 0 1 0 0 0 1 1 0 0 0 0 0
 0 0 0 0 0 1 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 1 0 1 1 0 0 0
 1 0 0 0 0 0 1 1 1 0 0 1 1 0 0 0 1 0 1 0 0 1 0 0 0 0 0 1 0 0 0 1 0 1 0 0 0
 0 0 0 0 1 1 0 1 0 0 1 0 0 0 0 0 1 1 0 0 0 0 0 1 1 0 0 0 1 0 0 0 0 0 0 0 0
 0]


In [81]:
y_test.head(10)

622    0
25     1
191    0
332    1
113    0
502    1
728    0
316    0
290    0
364    0
Name: Outcome, dtype: int64

In [82]:
#Calcular la precisión del modelo
accuracy = accuracy_score(y_test,predictions_umbral )
print("Precisión del modelo: {:.2f}%".format(accuracy * 100))

Precisión del modelo: 75.84%


**PERFILACIÓN DE CÓDIGO** (Detectar cuellos de botella)

In [83]:
# Perfilar el proceso de adaptación de k-means
# Creamos un objeto cProfile.Profile para rastrear el tiempo de ejecución de
#diferentes partes del código.
profiler = cProfile.Profile()
# Habilitar la creación de perfiles
profiler.enable()

#Entrenar el modelo
xg = dask_xgboost.train(client, parametros, X_train, y_train, num_boost_round=10)

profiler.disable() # Deshabilitar la creación de perfiles

# Cargar y analizar estadísticas
stats = pstats.Stats(profiler) #creamos un objeto a partir de los datos del perfilador
stats.strip_dirs().sort_stats('time').print_stats(10)  # Imprima las 10 funciones que más tiempo consumen

INFO:distributed.worker:Run out-of-band function 'start_tracker'
Exception in thread Thread-18 (join):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.10/dist-packages/dask_xgboost/tracker.py", line 365, in join
    while self.thread.isAlive():
AttributeError: 'Thread' object has no attribute 'isAlive'. Did you mean: 'is_alive'?


         121 function calls in 9.094 seconds

   Ordered by: internal time
   List reduced from 67 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        4    9.068    2.267    9.068    2.267 {method 'acquire' of '_thread.lock' objects}
        1    0.011    0.011    0.011    0.011 {method 'send' of '_socket.socket' objects}
        2    0.009    0.004    0.014    0.007 contextlib.py:279(helper)
        2    0.005    0.003    0.005    0.003 contextlib.py:102(__init__)
        2    0.000    0.000    0.000    0.000 {built-in method builtins.compile}
        2    0.000    0.000    9.080    4.540 interactiveshell.py:3512(run_code)
        1    0.000    0.000    9.079    9.079 utils.py:376(sync)
        1    0.000    0.000    0.000    0.000 functools.py:35(update_wrapper)
        1    0.000    0.000    9.079    9.079 utils.py:347(sync)
        2    0.000    0.000    0.000    0.000 interactiveshell.py:3337(_update_code_co_name)




<pstats.Stats at 0x7cb364f1ddb0>

In [84]:
client.rebalance()

In [85]:
# Perfilar el proceso de adaptación de k-means
# Creamos un objeto cProfile.Profile para rastrear el tiempo de ejecución de
#diferentes partes del código.
profiler = cProfile.Profile()
# Habilitar la creación de perfiles
profiler.enable()

#Entrenar el modelo
xg = dask_xgboost.train(client, parametros, X_train, y_train, num_boost_round=10)

profiler.disable() # Deshabilitar la creación de perfiles

# Cargar y analizar estadísticas
stats = pstats.Stats(profiler) #creamos un objeto a partir de los datos del perfilador
stats.strip_dirs().sort_stats('time').print_stats(10)  # Imprima las 10 funciones que más tiempo consumen

INFO:distributed.worker:Run out-of-band function 'start_tracker'
Exception in thread Thread-20 (join):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.10/dist-packages/dask_xgboost/tracker.py", line 365, in join
    while self.thread.isAlive():
AttributeError: 'Thread' object has no attribute 'isAlive'. Did you mean: 'is_alive'?


         121 function calls in 2.343 seconds

   Ordered by: internal time
   List reduced from 67 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        4    2.336    0.584    2.336    0.584 {method 'acquire' of '_thread.lock' objects}
        2    0.007    0.004    0.007    0.004 interactiveshell.py:3337(_update_code_co_name)
        2    0.000    0.000    0.000    0.000 {built-in method builtins.compile}
        2    0.000    0.000    2.336    1.168 interactiveshell.py:3512(run_code)
        1    0.000    0.000    2.336    2.336 utils.py:347(sync)
        1    0.000    0.000    2.336    2.336 utils.py:376(sync)
        1    0.000    0.000    0.000    0.000 gen.py:177(coroutine)
        2    0.000    0.000    0.000    0.000 codeop.py:117(__call__)
        1    0.000    0.000    0.000    0.000 {method 'send' of '_socket.socket' objects}
        1    0.000    0.000    0.000    0.000 functools.py:35(update_wrapper)




<pstats.Stats at 0x7cb35a38beb0>

In [86]:
parametros = {'objective': 'binary:logistic',
          'eta': 1, 'max_depth': 3, 'subsample': 0.5}

# Medir tiempo y uso de recursos antes del entrenamiento
start_time0 = time.time()
cpu_usage_before = psutil.cpu_percent(interval=1)
memory_usage_before = psutil.virtual_memory().percent


xg = dask_xgboost.train(client, parametros, X_train, y_train, num_boost_round=10)


# Medir tiempo y uso de recursos después del entrenamiento
end_time = time.time()
execution_time = end_time - start_time0
cpu_usage_after = psutil.cpu_percent(interval=1)
memory_usage_after = psutil.virtual_memory().percent
print("Tiempo de ejecución: {:.2f} segundos".format(execution_time))
print(f'CPU antes del entrenamiento: {cpu_usage_before}%')
print(f'CPU después del entrenamiento: {cpu_usage_after}%')
print(f'Memoria antes del entrenamiento: {memory_usage_before}%')
print(f'Memoria después del entrenamiento: {memory_usage_after}%')

INFO:distributed.worker:Run out-of-band function 'start_tracker'
Exception in thread Thread-22 (join):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.10/dist-packages/dask_xgboost/tracker.py", line 365, in join
    while self.thread.isAlive():
AttributeError: 'Thread' object has no attribute 'isAlive'. Did you mean: 'is_alive'?


Tiempo de ejecución: 2.48 segundos
CPU antes del entrenamiento: 100.0%
CPU después del entrenamiento: 98.5%
Memoria antes del entrenamiento: 26.2%
Memoria después del entrenamiento: 26.2%


## **Con Pyspark**

In [None]:
pip install sparkxgb

In [None]:
pip install pyspark

In [None]:
pip install xgboost.spark

In [None]:
pip install xgboost

In [30]:
!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("XGBoostApp").getOrCreate()
spark.sparkContext.addPyFile("https://pypi.org/packages/source/x/xgboost/xgboost-1.7.5.tar.gz")




*Importante: Cuando haya problemas al ejecutar from xgboost.spark import SparkXGBClassifier desconecta y borra el tiempo de ejecucioón y vuelve a correr .*

In [31]:
from pyspark.ml.feature import VectorAssembler
import time
import psutil
from xgboost.spark import SparkXGBClassifier

In [32]:
# Iniciar el servidor PySpark
# "local[4]" indica que Spark debe usar 4 núcleos de CPU.
spark = SparkSession.builder \
    .appName('cluster') \
    .master("local[4]") \
    .getOrCreate()
print('Spark Version: {}'.format(spark.version))

Spark Version: 3.1.1


In [33]:
#Cargar los datos
dataset = spark.read.csv("/content/diabetes.csv",header=True,inferSchema=True)

#show permite mostrar los datos del archivo
dataset.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



In [34]:
# VectorAssembler para convertir las columnas de características en un solo vector
vec_assembler = VectorAssembler(inputCols = ['Glucose','BMI','Age','Outcome'],
                                outputCol='Features')

data_final = vec_assembler.transform(dataset)
data_final.select('Features').show(5)

+--------------------+
|            Features|
+--------------------+
|[148.0,33.6,50.0,...|
|[85.0,26.6,31.0,0.0]|
|[183.0,23.3,32.0,...|
|[89.0,28.1,21.0,0.0]|
|[137.0,43.1,33.0,...|
+--------------------+
only showing top 5 rows



In [35]:
# Medir el tiempo de ejecución
start_time0 = time.time()

train_data, test = data_final.randomSplit(weights=[0.8,0.2], seed=30)

# Calcular el tiempo de ejecución
end_time = time.time()
execution_time = end_time - start_time0

print("Tiempo de ejecución: {:.2f} segundos".format(execution_time))

Tiempo de ejecución: 0.02 segundos


In [36]:
train_data.count()

617

In [37]:
test.count()

151

In [38]:
#Entrenar el modelo
xgb_classifier = SparkXGBClassifier(features_col='Features',  label_col="Outcome",  num_workers=2,eta= 1, max_depth= 3, subsample= 0.5)


# Medir tiempo y uso de recursos antes del entrenamiento
start_time0 = time.time()
cpu_usage_before = psutil.cpu_percent(interval=1)
memory_usage_before = psutil.virtual_memory().percent

# Entrenar el modelo
model=xgb_classifier.fit(train_data)

# Medir tiempo y uso de recursos después del entrenamiento
end_time = time.time()
execution_time = end_time - start_time0
cpu_usage_after = psutil.cpu_percent(interval=1)
memory_usage_after = psutil.virtual_memory().percent
print("Tiempo de ejecución: {:.2f} segundos".format(execution_time))
print(f'CPU antes del entrenamiento: {cpu_usage_before}%')
print(f'CPU después del entrenamiento: {cpu_usage_after}%')
print(f'Memoria antes del entrenamiento: {memory_usage_before}%')
print(f'Memoria después del entrenamiento: {memory_usage_after}%')

INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 2 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_depth': 3, 'subsample': 0.5, 'eta': 1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


Tiempo de ejecución: 9.02 segundos
CPU antes del entrenamiento: 24.1%
CPU después del entrenamiento: 10.2%
Memoria antes del entrenamiento: 16.2%
Memoria después del entrenamiento: 18.0%


**PERFILACIÓN DE CÓDIGO (Detectar cuellos de botella)**

https://docs.python.org/es/3/library/profile.html

La perfilación de código nos permite identificar partes de nuestro código que
son cuellos de botella, tanto en términos de tiempo de cálculo como de uso de la memoria

*profile* es un conjunto de estadísticas que describe con qué frecuencia y durante cuánto tiempo se ejecutaron varias partes del programa. Estas estadísticas pueden formatearse en informes a través del módulo *pstats*.

**ncalls:** Número de veces que se ha llamado a la función.

**tottime:** Tiempo total gastado en la función, sin contar el tiempo en llamadas a subfunciones.

**percall:** Promedio de tiempo por llamada (igual a tottime/ncalls).

**cumtime:** Tiempo acumulado gastado en esta función y todas sus subfunciones (desde la entrada hasta la salida de esta función).

**percall:** Promedio de tiempo por llamada para el tiempo acumulado (igual a cumtime/ncalls).

**filename** (function): Archivo y línea donde se define la función.

In [39]:
import cProfile
import pstats

# Perfilar el proceso de adaptación de k-means
# Creamos un objeto cProfile.Profile para rastrear el tiempo de ejecución de
#diferentes partes del código.
profiler = cProfile.Profile()
# Habilitar la creación de perfiles
profiler.enable()
# Ejecutar el código que queremos analizar
# código que queremos analizar
# Entrenar el modelo
model=xgb_classifier.fit(train_data)

profiler.disable() # Deshabilitar la creación de perfiles

# Cargar y analizar estadísticas
stats = pstats.Stats(profiler) #creamos un objeto a partir de los datos del perfilador
stats.strip_dirs().sort_stats('time').print_stats(10)  # Imprima las 10 funciones que más tiempo consumen

INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 2 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_depth': 3, 'subsample': 0.5, 'eta': 1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


         34491 function calls (34352 primitive calls) in 8.476 seconds

   Ordered by: internal time
   List reduced from 602 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      311    8.338    0.027    8.338    0.027 {method 'recv_into' of '_socket.socket' objects}
      297    0.039    0.000    0.039    0.000 {method 'sendall' of '_socket.socket' objects}
     1056    0.013    0.000    0.013    0.000 {built-in method builtins.getattr}
        1    0.008    0.008    0.008    0.008 core.py:1840(save_config)
     4589    0.006    0.000    0.006    0.000 {built-in method builtins.isinstance}
        8    0.005    0.001    0.005    0.001 socket.py:545(send)
        1    0.004    0.004    0.004    0.004 core.py:2562(load_model)
        1    0.004    0.004    0.016    0.016 __init__.py:267(<listcomp>)
      297    0.004    0.000    8.378    0.028 java_gateway.py:1178(send_command)
        3    0.003    0.001    0.005    0.002 cloudpic

<pstats.Stats at 0x7bfad84a71f0>

Aplicamos persistencia de los datos para que se obtenga más rapido los valores

In [40]:
train_data.persist()

DataFrame[Pregnancies: int, Glucose: int, BloodPressure: int, SkinThickness: int, Insulin: int, BMI: double, DiabetesPedigreeFunction: double, Age: int, Outcome: int, Features: vector]

In [41]:
#Entrenar el modelo
xgb_classifier = SparkXGBClassifier(features_col='Features',  label_col="Outcome",  num_workers=2,eta= 1, max_depth= 3, subsample= 0.5)


# Medir tiempo y uso de recursos antes del entrenamiento
start_time0 = time.time()
cpu_usage_before = psutil.cpu_percent(interval=1)
memory_usage_before = psutil.virtual_memory().percent

# Entrenar el modelo
model=xgb_classifier.fit(train_data)

# Medir tiempo y uso de recursos después del entrenamiento
end_time = time.time()
execution_time = end_time - start_time0
cpu_usage_after = psutil.cpu_percent(interval=1)
memory_usage_after = psutil.virtual_memory().percent
print("Tiempo de ejecución: {:.2f} segundos".format(execution_time))
print(f'CPU antes del entrenamiento: {cpu_usage_before}%')
print(f'CPU después del entrenamiento: {cpu_usage_after}%')
print(f'Memoria antes del entrenamiento: {memory_usage_before}%')
print(f'Memoria después del entrenamiento: {memory_usage_after}%')

INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 2 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_depth': 3, 'subsample': 0.5, 'eta': 1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


Tiempo de ejecución: 7.01 segundos
CPU antes del entrenamiento: 64.1%
CPU después del entrenamiento: 59.4%
Memoria antes del entrenamiento: 18.6%
Memoria después del entrenamiento: 18.7%


In [42]:
#Realizar predicciones
predictions_prob =model.transform(test)
predictions_prob.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+----------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            Features|       rawPrediction|prediction|         probability|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+--------------------+----------+--------------------+
|          0|     74|           52|           10|     36|27.8|                   0.269| 22|      0|[74.0,27.8,22.0,0.0]|[5.63122272491455...|       0.0|[0.99642860889434...|
|          0|     78|           88|           29|     40|36.9|                   0.434| 21|      0|[78.0,36.9,21.0,0.0]|[5.63122272491455...|       0.0|[0.99642860889434...|
|          0|     86|           68|           32|      0|35.8|                   0.238| 25|      0|[86.0,35.8,25.0,0.0]|[5.6312227

In [46]:
# Show the predictions with predicted labels
predictions_prob.select("Outcome", "prediction", "probability").show(20)

+-------+----------+--------------------+
|Outcome|prediction|         probability|
+-------+----------+--------------------+
|      0|       0.0|[0.99642860889434...|
|      0|       0.0|[0.99642860889434...|
|      0|       0.0|[0.99642860889434...|
|      0|       0.0|[0.99642860889434...|
|      0|       0.0|[0.99642860889434...|
|      0|       0.0|[0.99642860889434...|
|      0|       0.0|[0.99642860889434...|
|      0|       0.0|[0.99642860889434...|
|      0|       0.0|[0.99642860889434...|
|      0|       0.0|[0.99642860889434...|
|      0|       0.0|[0.99642860889434...|
|      1|       1.0|[0.00630712509155...|
|      0|       0.0|[0.99642860889434...|
|      1|       1.0|[0.00630712509155...|
|      1|       1.0|[0.00630712509155...|
|      1|       1.0|[0.00630712509155...|
|      0|       0.0|[0.99642860889434...|
|      0|       0.0|[0.99642860889434...|
|      1|       1.0|[0.00630712509155...|
|      1|       1.0|[0.00630712509155...|
+-------+----------+--------------

In [44]:
test.select("Outcome").show(5)

+-------+
|Outcome|
+-------+
|      0|
|      0|
|      0|
|      0|
|      0|
+-------+
only showing top 5 rows



In [47]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Evaluar el modelo
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions_prob)
print(f"Accuracy: {accuracy}")

Accuracy: 1.0


In [48]:
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction",  metricName="weightedPrecision")
precision = evaluator.evaluate(predictions_prob)
print(f"Precision: {precision}")

Precision: 1.0


In [26]:
# Cerrar la sesión de Spark al finalizar
spark.stop()