
### Pontificia Universidad Javeriana

*Autor:* S. Gonzalez

*Fecha:* 5-nov-24

*Cuaderno:* Primeros pasos con PySpark

*Tema:* Limpieza de datos y predicción usando PySpark

Se instalan las bibliotecas para (Una sola vez se instala):

- Spark (findspark)
- Numpy
- Seaborn
- matplotlib
- scikit-learn
- squarify

In [1]:
!pip install numpy



In [2]:
!pip install pyspark



In [3]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext, SparkSession

from pyspark.sql.types import *
from pyspark.sql.functions import input_file_name, mean, col, split, regexp_extract, when, lit, isnan, count
from pyspark import SparkFiles

from pyspark.conf import SparkConf
import os 

## Se configura el entorno spark



In [4]:
SPARK_MASTER_URL = os.getenv("SPARK_MASTER_URL","spark://dos01:7077")
configura=SparkConf()
configura.setMaster(SPARK_MASTER_URL)
configura.set('spark.local.dir','/almacen/TrabajosSpark/')
configura.setAppName("Proyecto00_Spark_Stroke_Gonzalez_Meneses")

configura.set('spark.cores.max',6)
configura.set('spark.executor.cores',6)

spark = SparkSession.builder.config(conf=configura).getOrCreate()
SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

print("Sesion creada: HPC")
spark

/almacen/Spark/conf/spark-env.sh: línea 49: SPARK_MASTER_HOST: orden no encontrada
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:03:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/12 21:03:21 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


Sesion creada: HPC


In [5]:
dfStroke00 = spark.read.csv("stroke_pyspark.csv", header=True, inferSchema=True)

                                                                                

In [6]:
dfStroke00.show(2)

+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
| 9046|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|51676|Female|61.0|           0|            0|         Yes|Self-employed|         Rural|           202.21| N/A|   never smoked|     1|
+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
only showing top 2 rows



In [7]:
### Tipo de datos

dfStroke00.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [8]:
# Se requiere cambiar los nombres de las columnas

nuevosNombres= ['ID',
 'Genero',
 'Edad',
 'Hipertension',
 'Enfermedad',
 'Casado',
 'Trabajo',
 'Tipo_Residencia',
 'Nivel_Prom_Glucosa',
 'IMC',
 'Fumador',
 'Paro_Cardiaco']

dfStroke01=dfStroke00

for antes, nuevo in zip(dfStroke00.columns, nuevosNombres):
    dfStroke01 = dfStroke01.withColumnRenamed(antes, nuevo) 

In [9]:
### Pasar IMC a double

dfStroke01=dfStroke01.withColumn('IMC', dfStroke01.IMC.cast(DoubleType()))

In [10]:
nulos = (201/dfStroke01.count())*100
print(nulos)

3.9334637964774952


                                                                                

In [11]:
## Se estratifica por edades y género, de modo que cada 10años se saca el ICM promedio por género

dfStroke01.groupby(['Genero']).count().show()

+------+-----+
|Genero|count|
+------+-----+
|Female| 2994|
| Other|    1|
|  Male| 2115|
+------+-----+



In [12]:
# Se elimina el other

dfStroke02 = dfStroke01.where("Genero <> 'Other'")
dfStroke02.groupby(['Genero']).count().show()

[Stage 11:>                                                         (0 + 1) / 1]

+------+-----+
|Genero|count|
+------+-----+
|Female| 2994|
|  Male| 2115|
+------+-----+



                                                                                

In [13]:
AvIMC_0_10 = dfStroke02.where((col("Genero") == lit("Female")) & (col("Edad") < 10)).select(mean(col("IMC"))).collect()

In [14]:
print(AvIMC_0_10)

[Row(avg(IMC)=18.687962962962963)]


In [15]:
#Se hace la funcion para el cambio

def cambioPromedio(df, col01, catGenero, col02, minEdad, maxEdad, col03):
    prom = df.where((col(col01) == lit(catGenero)) & (col(col02) > minEdad) & (col(col02) < maxEdad)).select(mean(col(col03))).collect()[0][0]
    print(prom)
    dfResul = df.withColumn(col03, when(((df[col01] == catGenero) & (df[col03].isNull()) & (df[col02]<maxEdad)), prom).otherwise(df[col03]))
    return dfResul

In [16]:
dfStroke03 = dfStroke02
for promedio in range(0,100,10):
    dfStroke03 = cambioPromedio(dfStroke03, "Genero", "Female", "Edad", promedio, promedio+10, "IMC")

18.687962962962963


                                                                                

25.553556485355664
28.418373493975906


                                                                                

31.142359249329765
31.123333333333324


                                                                                

31.915686274509792
30.62181208053691
29.3374558303887
28.101428571428578
None


                                                                                