# Práctica 5:
En el siguiente notebook vamos a trabajar con PySpark. En él, vamos a usar las sentencias necesarias para obtener una muestra de un conjunto de datos.

El primer paso que debemos dar es conectar Google Colab con nuestro Google Drive. Para ello, lanzaremos el siguiente trozo de código.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!ls

drive  sample_data


Una vez hemos conectado Google Drive, pasamos a instalar Apache Spark en nuestro notebook. En concreto, vamos a trabajar con la versión 3.4.4.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q --show-progress https://dlcdn.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz



In [None]:
!ls


drive  sample_data  spark-3.5.4-bin-hadoop3.tgz


In [None]:
!tar xf  spark-3.5.4-bin-hadoop3.tgz

Además de la instalación habitual, hay un paso más que debemos dar, y es la instalación de la librería *findspark*. Esta librería nos permitirá encontrar la instalación de Apache Spark en nuestro sistema.

Establecemos también un par de variables de entorno.

In [None]:
!pip install -q findspark
!pip show findspark

Name: findspark
Version: 2.0.1
Summary: Find pyspark to make it importable.
Home-page: https://github.com/minrk/findspark
Author: Min RK
Author-email: benjaminrk@gmail.com
License: BSD (3-clause)
Location: /usr/local/lib/python3.11/dist-packages
Requires: 
Required-by: 


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"

In [None]:
import findspark
findspark.init()

In [None]:
findspark.find()

'/content/spark-3.5.4-bin-hadoop3'

Una vez tenemos instalado nuestro framework, es momento de crear nuestra variable SparkSession, de la cual parten la mayoría de funcionalidades de SparkSQL

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Pyspark_SQL")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [None]:
spark

Además de la instalación habitual, hay un paso más que debemos dar, y es la instalación de la librería *findspark*. Esta librería nos permitirá encontrar la instalación de Apache Spark en nuestro sistema.

Establecemos también un par de variables de entorno.

Una vez tenemos instalado nuestro framework, es momento de crear nuestra variable SparkSession, de la cual parten la mayoría de funcionalidades de SparkSQL

### Lectura de datos

Ahora sí, podemos comenzar a trabajar con SparkSQL. Comenzaremos leyendo nuestro conjunto de datos, el cual contiene información de las calificaciones de alumnos en algunas materias.

In [None]:
!pwd

/content


In [None]:
# Importar las librerías necesarias
from google.colab import files
import pandas as pd

data_path = '/content/drive/MyDrive/TokioSchool/data/datos_dashboard_simulados.xlsx'

# Cargar el archivo Excel
excel_df = pd.read_excel(data_path, engine='openpyxl')

# 3. Revisar los tipos de datos en el DataFrame de pandas
print("Tipos de datos antes de la conversión:")
print(excel_df.dtypes)

# 4. Uniformizar los tipos de datos
# Supongamos que quieres que todas las columnas numéricas sean del tipo 'int'
# También puedes optar por 'float' si es necesario

# Cambiar tipos de datos a int, si son numéricos
for column in excel_df.columns:
    if pd.api.types.is_numeric_dtype(excel_df[column]):
        excel_df[column] = pd.to_numeric(excel_df[column], downcast='integer', errors='coerce')

# Verificar los tipos de datos después de la conversión
print("Tipos de datos después de la conversión:")
print(excel_df.dtypes)

# 5. Convertir el DataFrame de pandas a un DataFrame de PySpark
df = spark.createDataFrame(excel_df)

# Verificar la estructura del DataFrame de PySpark
df.printSchema()
df.show()

Tipos de datos antes de la conversión:
ID_Estudiante                         int64
Conexiones_Semanales                float64
Tareas_Entregadas                     int64
Participacion_Foros                   int64
Riesgo_Abandono_Cal                   int64
Abandono                              int64
Calificacion_Antes                  float64
Calificacion_Despues                float64
Tiempo_Respuesta_Tutor (minutos)      int64
Microabandono                         int64
EvoluciónDesempeño                  float64
Motivacion_Encuesta                   int64
Tasa_Retención_Estudiantes          float64
Participación_Estudiante            float64
Índice_Progreso_Curso               float64
Evolución_desempeño                 float64
Tiempo_Respuesta_Tutor              float64
Tasa_Abandono                       float64
Porcentaje_Tareas_entregadas          int64
Porcentaje_participación_foro         int64
Porcentaje_acceso                   float64
Índice_participación_general        f





### Cálculos

In [None]:
#Importamos las librerías necesarias
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

#Modificación del tipo de dato a float
df = df.withColumn('Conexiones_Semanales', col('Conexiones_Semanales').cast("float"))
df = df.withColumn('Tareas_Entregadas', col('Tareas_Entregadas').cast("float"))
df = df.withColumn('Participacion_Foros', col('Participacion_Foros').cast("float"))
df = df.withColumn('Riesgo_Abandono_Cal', col('Riesgo_Abandono_Cal').cast("float"))

# Mostrar el esquema para verificar los tipos de datos
df.printSchema()
df.show()

root
 |-- ID_Estudiante: long (nullable = true)
 |-- Conexiones_Semanales: float (nullable = true)
 |-- Tareas_Entregadas: float (nullable = true)
 |-- Participacion_Foros: float (nullable = true)
 |-- Riesgo_Abandono_Cal: float (nullable = true)
 |-- Abandono: long (nullable = true)
 |-- Calificacion_Antes: double (nullable = true)
 |-- Calificacion_Despues: double (nullable = true)
 |-- Tiempo_Respuesta_Tutor (minutos): long (nullable = true)
 |-- Microabandono: long (nullable = true)
 |-- EvoluciónDesempeño: double (nullable = true)
 |-- Motivacion_Encuesta: long (nullable = true)
 |-- Tasa_Retención_Estudiantes: double (nullable = true)
 |-- Participación_Estudiante: double (nullable = true)
 |-- Índice_Progreso_Curso: double (nullable = true)
 |-- Evolución_desempeño: double (nullable = true)
 |-- Tiempo_Respuesta_Tutor: double (nullable = true)
 |-- Tasa_Abandono: double (nullable = true)
 |-- Porcentaje_Tareas_entregadas: long (nullable = true)
 |-- Porcentaje_participación_foro

In [None]:
#Creación de la columna 'features'
feat = VectorAssembler(inputCols=["Conexiones_Semanales", "Tareas_Entregadas", "Participacion_Foros","Riesgo_Abandono_Cal"], outputCol="features")
feat_df = feat.transform(df)

In [None]:
#Mostramos como se ha creado correctamente la columna 'features'
feat_df.show()

+-------------+--------------------+-----------------+-------------------+-------------------+--------+------------------+--------------------+--------------------------------+-------------+------------------+-------------------+--------------------------+------------------------+---------------------+-------------------+----------------------+-------------+----------------------------+-----------------------------+-----------------+----------------------------+--------------------+------------------+-----------------+
|ID_Estudiante|Conexiones_Semanales|Tareas_Entregadas|Participacion_Foros|Riesgo_Abandono_Cal|Abandono|Calificacion_Antes|Calificacion_Despues|Tiempo_Respuesta_Tutor (minutos)|Microabandono|EvoluciónDesempeño|Motivacion_Encuesta|Tasa_Retención_Estudiantes|Participación_Estudiante|Índice_Progreso_Curso|Evolución_desempeño|Tiempo_Respuesta_Tutor|Tasa_Abandono|Porcentaje_Tareas_entregadas|Porcentaje_participación_foro|Porcentaje_acceso|Índice_participación_general|Riesgo_Mi

In [None]:
#Importamos las librerías
from pyspark.ml.clustering import BisectingKMeans

#Definimos el número de clusters que vamos a crear (3 en este caso)
bkm = BisectingKMeans().setK(3).setSeed(1)

In [None]:
#Entrenamos el modelo
model = bkm.fit(feat_df)

#Asignamos un custer a cada una de las filas de nuestro dataset
predictions = model.transform(feat_df)

#Mostramos las predicciones
predictions.show(100)

+-------------+--------------------+-----------------+-------------------+-------------------+--------+------------------+--------------------+--------------------------------+-------------+------------------+-------------------+--------------------------+------------------------+---------------------+-------------------+----------------------+-------------+----------------------------+-----------------------------+-----------------+----------------------------+--------------------+------------------+--------------------+----------+
|ID_Estudiante|Conexiones_Semanales|Tareas_Entregadas|Participacion_Foros|Riesgo_Abandono_Cal|Abandono|Calificacion_Antes|Calificacion_Despues|Tiempo_Respuesta_Tutor (minutos)|Microabandono|EvoluciónDesempeño|Motivacion_Encuesta|Tasa_Retención_Estudiantes|Participación_Estudiante|Índice_Progreso_Curso|Evolución_desempeño|Tiempo_Respuesta_Tutor|Tasa_Abandono|Porcentaje_Tareas_entregadas|Porcentaje_participación_foro|Porcentaje_acceso|Índice_participación_gen

In [None]:
# Eliminar la columna 'features' y seleccionar columnas necesarias
result_df = predictions.select("ID_Estudiante", "Conexiones_Semanales", "Tareas_Entregadas", "Participacion_Foros", "Riesgo_Abandono_Cal", "prediction")

# Descargar tabla
output_path = '/content/drive/MyDrive/TokioSchool/data/predicciones_kmeans.csv'

# Guardar las predicciones como CSV
result_df.write.mode("overwrite").csv(output_path, header=True)