# Monitoreo y Depuración de Apache Spark

Este laboratorio te enseñará cómo monitorear y depurar una aplicación Spark a través de la interfaz web.

## Objetivos

Después de completar este laboratorio, podrás:

1. Iniciar un clúster Spark en modo independiente y conectarte con el shell de PySpark.
2. Crear un DataFrame y abrir la interfaz web de la aplicación.
3. Depurar un error en tiempo de ejecución localizando la tarea fallida en la interfaz web.
4. Ejecutar una consulta SQL para monitorear y luego escalar agregando otro trabajador al clúster.

# Ejercicio 1 : Iniciar un Clúster Standalone de Spark

En este ejercicio, inicializarás un Clúster Standalone de Spark con un Maestro y un Trabajador. A continuación, iniciarás una consola de PySpark que se conecta al clúster y abrirás la interfaz web de la aplicación Spark para monitorearla. Estaremos utilizando el terminal Theia para ejecutar comandos y contenedores basados en docker para lanzar los procesos de Spark.

### Tarea A : Descargar Datos de Ejemplo para Spark

1. Abre un terminal Theia haciendo clic en el elemento del menú `Terminal -> New Terminal`.
    
2. Utiliza el siguiente comando para descargar el conjunto de datos que utilizaremos en este laboratorio al contenedor que ejecuta Spark.
    

```bash
wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/cars.csv

```

# Tarea B : Inicializar el Clúster

1. Detén cualquier contenedor que esté corriendo previamente con el comando:

```bash
for i in `docker ps | awk '{print $1}' | grep -v CONTAINER`; do docker kill $i; done
```

2. Eliminar cualquier contenedor previamente utilizado:  
      
    Ignorar cualquier error que diga “No existe tal contenedor”

```bash
docker rm spark-master spark-worker-1 spark-worker-2
```

3. Inicia el servidor Master de Spark:

```bash
docker run \
    --name spark-master \
    -h spark-master \
    -e ENABLE_INIT_DAEMON=false \
    -p 4040:4040 \
    -p 8080:8080 \
    -v `pwd`:/home/root \
    -d bde2020/spark-master:3.1.1-hadoop3.2
```

4. Iniciar un trabajador de Spark que se conectará al Maestro:

```bash
docker run \
    --name spark-worker-1 \
    --link spark-master:spark-master \
    -e ENABLE_INIT_DAEMON=false \
    -p 8081:8081 \
    -v `pwd`:/home/root \
    -d bde2020/spark-worker:3.1.1-hadoop3.2
```

# Tarea C : Conectar un Shell de PySpark al Clúster y Abrir la Interfaz de Usuario

1. Inicie un shell de PySpark en el contenedor del Maestro de Spark en ejecución:

```bash
docker exec \
    -it `docker ps | grep spark-master | awk '{print $1}'` \
    /spark/bin/pyspark \
    --master spark://spark-master:7077
```

2. Crea un DataFrame en la terminal con:
    
    > **NOTA:** Presiona Enter dos veces para continuar después de ejecutar el comando en la terminal.
    

```bash
df = spark.read.csv("/home/root/cars.csv", header=True, inferSchema=True) \
    .repartition(32) \
    .cache()
df.show()
```

3. Haz clic en el botón Skills Network a la izquierda, se abrirá la “Caja de herramientas de Skills Network”. Luego haz clic en `OTHER` y después en `Launch Application`. Desde allí deberías poder ingresar el número de puerto como `4040` y lanzar la interfaz de usuario de la aplicación Spark en tu navegador.

    ![](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/images/Launch_Application--new_IDE.png)

  
  

4. Verifica que puedes ver la página de trabajos de la aplicación que debería verse como la siguiente, aunque no necesariamente exactamente igual:

    ![image](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/images/SparkUI-Initial-Page.png)
    

# Ejercicio 2 : Ejecutar una consulta SQL y depurar en la interfaz de la aplicación

En este ejercicio, definirás una función definida por el usuario (UDF) y ejecutarás una consulta que resulta en un error. Localizaremos ese error en la interfaz de la aplicación y encontraremos la causa raíz. Finalmente, corregiremos el error y volveremos a ejecutar la consulta.

### Tarea A : Ejecutar una consulta SQL

1. Define una UDF para mostrar el tipo de motor. Copia y pega el código y haz clic en `Enter`.

```python
from pyspark.sql.functions import udf
import time

@udf("string")
def engine(cylinders):
    time.sleep(0.2)  # Intentionally delay task
    eng = {6: "V6", 8: "V8"}
    return eng[cylinders]
```

2. Agrega la UDF como una columna en el DataFrame

```
df = df.withColumn("engine", engine("cylinders"))
```

3. Agrupar el DataFrame por “cylinders” y agregar otras columnas

```python
dfg = df.groupby("cylinders")
```
```python
dfa = dfg.agg({"mpg": "avg", "engine": "first"})
```
```python
dfa.show()
```

4. La consulta habrá fallado y deberías ver muchos mensajes y salidas en la consola. La siguiente tarea será localizar el error en la interfaz de usuario de la aplicación y determinar la causa raíz.

# Tarea B : Depurar el error en la interfaz de la aplicación

1. Encuentra el error en la interfaz de la aplicaciónAbre la interfaz de usuario en los trabajos, mira la lista de trabajos fallidos, haz clic en el primer trabajo.

    ![image](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/images/Failure-Jobs-tab.png)

2. Esto mostrará los detalles del trabajo  
    con una lista de etapas para ese trabajo. En la lista de etapas fallidas, haz clic en la primera  
    etapa fallida para mostrar los detalles de la etapa con una lista de tareas para esa etapa.

    ![image](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/images/Failure-Jobs-detail.png)

3. Aquí vemos muchastareas fallidas. Al mirar la primera, la columna de la derecha muestra detalles de la falla.

    ![image](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/images/Failure-Stage-list.png)

Haz clic para expandir los detalles.

![image](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/images/Failure-Task-details.png)

Desplázate hacia abajo un poco hasta que puedas ver la última parte del error de Python que muestra la causa. Deberías poder ver que esto fue causado por un KeyError en nuestra UDF  
`engine()`.

También podrías ver estos errores mirando la columna que tiene enlaces a los registros y hacer clic en  
“std err” para mostrar el registro de errores estándar.

Cierra la pestaña del navegador de PySpark.

4. En la terminal de Theia, corrige la UDF agregando una entrada al diccionario de tipos de motor y proporciona un valor predeterminado para todos los demás tipos. Copia y pega este código y presiona `Enter`.

```python
@udf("string")
def engine(cylinders):
    time.sleep(0.2)  # Retrasar intencionadamente la tarea
    eng = {4: "inline-four", 6: "V6", 8: "V8"}
    return eng.get(cylinders, "other")
```

5. Vuelve a ejecutar la consulta. Tendrás que agregar la columna “engine” nuevamente e ingresar la consulta ya que cambiamos la UDF.

```python
df = df.withColumn("engine", engine("cylinders"))
```
```python
dfg = df.groupby("cylinders")
```
```python
dfa = dfg.agg({"mpg": "avg", "engine": "first"})
```
```python
dfa.show()
```

Once the query completes without errors, you should see output similar to this.

```bash
+---------+------------------+-------------+                                    
|cylinders|          avg(mpg)|first(engine)|
+---------+------------------+-------------+
|        6|19.985714285714288|           V6|
|        3|             20.55|        other|
|        5|27.366666666666664|        other|
|        4|29.286764705882348|  inline-four|
|        8|14.963106796116506|           V8|
+---------+------------------+-------------+
```

# Ejercicio 3 : Monitorear el Rendimiento de la Aplicación con la UI

Ahora que hemos ejecutado nuestra consulta con éxito, escalaremos nuestra aplicación añadiendo un trabajador al clúster. Esto permitirá que el clúster ejecute más tareas en paralelo y mejore el rendimiento general.

### Tarea A : Agregar un Trabajador al Clúster

1. Ve a la pestaña de Etapas, luego haz clic en la etapa con 32 tareas. En esa etapa, nuestra UDF se está aplicando a cada partición del DataFrame.

![image](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/images/Perf-stage-udf.png)

Al observar la línea de tiempo, puedes ver que hay un único trabajador con id `0 / <ip-address>` que puede ejecutar hasta una cierta cantidad de tareas en paralelo al mismo tiempo. Agregar otro trabajador permitirá que se ejecute una tarea adicional en paralelo.

![image](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/images/Perf-parallel-tasks-1.png)

2. Abre un nuevo terminal de Theia haciendo clic en el elemento del menú `Terminal -> New Terminal`.
    
3. Agrega un segundo trabajador al clúster con el comando en el nuevo terminal:
    

```bash
docker run \
    --name spark-worker-2 \
    --link spark-master:spark-master \
    -e ENABLE_INIT_DAEMON=false \
    -p 8082:8082 \
    -d bde2020/spark-worker:3.1.1-hadoop3.2
```

4. Si el comando es exitoso, habrá una única salida que mostrará el id del contenedor:

```bash
theia@theiadocker-user:/home/project$ docker run \
>     --name spark-worker-2 \
>     --link spark-master:spark-master \
>     -e ENABLE_INIT_DAEMON=false \
>     -p 8082:8082 \
>     -d bde2020/spark-worker:3.1.1-hadoop3.2
1935a71827668ae3476e6a16f0bebcd4c2a342a21271dc22be487aa1b1731708
theia@theiadocker-user:/home/project$
```

5. Haz clic de nuevo en la primera terminal que tiene el shell de PySpark abierto para continuar.

# Tarea B : Volver a ejecutar la consulta y verificar el rendimiento

1. Vuelve a ejecutar la consulta, esta vez simplemente podemos llamar a `show()` nuevamente:

```python
dfa.show()
```

2. Inicie la aplicación en el número de puerto `4040` siguiendo el mismo proceso que arriba, para abrir el navegador de PySpark.  
    Vaya a la pestaña **Stages** y vea el Id de la etapa más reciente. 
     
    ![image](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/images/RecentStageView.png)
    
3. Verá que el trabajador adicional con id `1 / <ip-address>` está listado y ahora permite que se ejecuten más tareas en paralelo. La línea de tiempo de las tareas debería parecerse a lo siguiente.
    

    ![image](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/images/Perf-parallel-tasks-2.png)