In [2]:
import operator
import os
from functools import reduce
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import col, expr, count, when, sum as spark_sum
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Rutas

In [3]:
# The project root is taken to be the parent of the kernel's working directory
# (presumably the notebook is launched from a subfolder such as notebooks/ — TODO confirm).
BASE_DIR = Path.cwd().parent
RAW_DATA_FILE = BASE_DIR.joinpath('data', 'raw', '220720COVID19MEXICO.csv')

# Crear SparkSession

In [4]:
# Reuse the active Spark session if there is one, otherwise start it.
spark = (
    SparkSession.builder
    .appName('comorbilidades')
    .getOrCreate()
)

# Load the raw CSV. The first row is the header, and column types are inferred
# (inferSchema costs an extra pass over the file). Records Spark cannot parse
# are dropped (DROPMALFORMED). Records are single-line and '"' is the escape character.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .option('mode', 'DROPMALFORMED')
    .option('multiLine', False)
    .option('escape', '"')
    .csv(str(RAW_DATA_FILE))
)
df.show()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/21 14:05:06 WARN Utils: Your hostname, jair-Vivobook-ASUSLaptop-X1404ZA-F1404ZA, resolves to a loopback address: 127.0.1.1; using 192.168.0.13 instead (on interface wlo1)
26/02/21 14:05:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/21 14:05:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/02/21 14:06:02 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------------------+-----------+------+------+----------+----+-----------+-----------+-------------+-------------+-------------+--------------+----------+--------+--------+----+------------+--------+------------------+--------+--------+----+----+--------+------------+--------+--------------+--------+-------------+----------+---------+----------------+-------------+---------------------+------------------+-------------------+--------+-----------------+-----------+---+
|FECHA_ACTUALIZACION|ID_REGISTRO|ORIGEN|SECTOR|ENTIDAD_UM|SEXO|ENTIDAD_NAC|ENTIDAD_RES|MUNICIPIO_RES|TIPO_PACIENTE|FECHA_INGRESO|FECHA_SINTOMAS| FECHA_DEF|INTUBADO|NEUMONIA|EDAD|NACIONALIDAD|EMBARAZO|HABLA_LENGUA_INDIG|INDIGENA|DIABETES|EPOC|ASMA|INMUSUPR|HIPERTENSION|OTRA_COM|CARDIOVASCULAR|OBESIDAD|RENAL_CRONICA|TABAQUISMO|OTRO_CASO|TOMA_MUESTRA_LAB|RESULTADO_LAB|TOMA_MUESTRA_ANTIGENO|RESULTADO_ANTIGENO|CLASIFICACION_FINAL|MIGRANTE|PAIS_NACIONALIDAD|PAIS_ORIGEN|UCI|
+-------------------+-----------+------+------+-------

### Eliminar ID_REGISTRO y las columnas de fecha (excepto FECHA_INGRESO y FECHA_DEF)

In [6]:
# Drop the record identifier and every FECHA_* column except the admission
# date and the death date, which later cells still use.
fechas = [
    nombre
    for nombre in df.columns
    if 'FECHA_' in nombre and nombre not in ('FECHA_DEF', 'FECHA_INGRESO')
]
eliminar = ['ID_REGISTRO', *fechas]
df = df.drop(*eliminar)
df.show()

+------+------+----------+----+-----------+-----------+-------------+-------------+-------------+----------+--------+--------+----+------------+--------+------------------+--------+--------+----+----+--------+------------+--------+--------------+--------+-------------+----------+---------+----------------+-------------+---------------------+------------------+-------------------+--------+-----------------+-----------+---+
|ORIGEN|SECTOR|ENTIDAD_UM|SEXO|ENTIDAD_NAC|ENTIDAD_RES|MUNICIPIO_RES|TIPO_PACIENTE|FECHA_INGRESO| FECHA_DEF|INTUBADO|NEUMONIA|EDAD|NACIONALIDAD|EMBARAZO|HABLA_LENGUA_INDIG|INDIGENA|DIABETES|EPOC|ASMA|INMUSUPR|HIPERTENSION|OTRA_COM|CARDIOVASCULAR|OBESIDAD|RENAL_CRONICA|TABAQUISMO|OTRO_CASO|TOMA_MUESTRA_LAB|RESULTADO_LAB|TOMA_MUESTRA_ANTIGENO|RESULTADO_ANTIGENO|CLASIFICACION_FINAL|MIGRANTE|PAIS_NACIONALIDAD|PAIS_ORIGEN|UCI|
+------+------+----------+----+-----------+-----------+-------------+-------------+-------------+----------+--------+--------+----+------------+----

### Eliminar las filas con valores nulos


In [8]:
# Drop every row that has a NULL in any column, then report how many ROWS were
# removed. This is not the number of NULL values: a row with several NULLs counts once.
# Each count() is a separate Spark action; nothing is cached here, so each one
# recomputes the DataFrame from its source.
antes_null = df.count()
df = df.na.drop()  # default how='any': a single NULL anywhere drops the row
despues_null = df.count()
print(f'Filas eliminadas por contener nulos: {antes_null - despues_null}')



Valores nulos: 13


                                                                                

### Correcciones de codificación, fechas y edad

In [9]:
# Repair a mis-decoded nationality. 'MÃ©xico' is what the UTF-8 text 'México' looks like
# when its bytes are decoded as Latin-1 somewhere upstream.
# otherwise() keeps every other value unchanged. Without it, when() returns NULL for
# all non-matching rows, which wiped the rest of the column.
df = df.withColumn(
    'PAIS_NACIONALIDAD',
    when(col('PAIS_NACIONALIDAD') == 'MÃ©xico', 'México')
    .otherwise(col('PAIS_NACIONALIDAD')),
)

# '9999-99-99' is a placeholder date (presumably "no death recorded" — confirm against
# the data dictionary). Map it to NULL, then parse the remaining strings as dates.
# try_cast yields NULL instead of raising for strings that are not valid dates.
df = df.withColumn('FECHA_DEF',
                   when(col('FECHA_DEF') != '9999-99-99', col('FECHA_DEF')))
df = df.withColumn('FECHA_DEF', expr('try_cast(FECHA_DEF as date)'))

# Keep ages in the closed range [0, 100]; between() is inclusive on both ends.
# Rows with a NULL EDAD are dropped as well, because the predicate is not true for them.
df = df.filter(col('EDAD').between(0, 100))

### Convertir a entero las columnas numéricas

In [10]:
# Every column except the free-text nationality and the two date columns is
# treated as an integer code.
numericos = [
    c for c in df.columns
    if c not in ('PAIS_NACIONALIDAD', 'FECHA_DEF', 'FECHA_INGRESO')
]

# DataFrames are immutable: withColumn() returns a new DataFrame, so its result
# must be assigned or the cast is lost. A single select() applies all casts in one
# projection instead of stacking one plan node per column. Column order is preserved.
# try_cast yields NULL (instead of raising) for values that are not integers.
# NOTE(review): if any listed column holds text (e.g. PAIS_ORIGEN, if it stores
# country names — TODO confirm), its values become NULL here, after na.drop() ran.
df = df.select(*[
    expr(f'try_cast(`{c}` as int)').alias(c) if c in numericos else col(c)
    for c in df.columns
])

### Nuevas columnas para un análisis estadístico

In [17]:
def _presente(*columnas):
    """Return a 0/1 integer Column that is 1 when any of ``columnas`` equals 1, else 0.

    A NULL never satisfies ``== 1``, so a NULL value does not mark the condition as present.
    """
    alguna = reduce(operator.or_, [col(c) == 1 for c in columnas])
    return when(alguna, 1).otherwise(0)


def crear_variables_comorblidad(df):
    """Add comorbidity indicator columns and a risk band based on their count.

    A condition counts as present only when its column equals 1 (presumably the
    dataset's "yes" code — TODO confirm against the data dictionary). Any other
    value, including NULL, counts as absent.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain the columns listed in ``condiciones``.

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` plus these columns:

        - ``N_COMORBILIDADES``: number of conditions present (0-8).
        - ``COMOR_CARSIOVASCULAR``: 1 if HIPERTENSION, DIABETES or OBESIDAD is present.
        - ``COMOR_RESPIRATORIA``: 1 if ASMA or EPOC is present.
        - ``COMR_INMUSUPR``: 1 if INMUSUPR is present.
        - ``COMR_RENAL``: 1 if RENAL_CRONICA is present.
        - ``RIESGO``: 'NORMAL' (0), 'MODERADO' (1), 'ALTO' (2), 'EXTREMO' (3 or more).
    """
    condiciones = ['DIABETES', 'HIPERTENSION', 'OBESIDAD', 'ASMA', 'EPOC',
                   'INMUSUPR', 'RENAL_CRONICA', 'TABAQUISMO']
    # NOTE(review): the dataset's own CARDIOVASCULAR and OTRA_COM columns are not in
    # this list, so they do not contribute to the count — confirm this is intended.

    # Sum of the per-condition 0/1 flags, added left to right in list order.
    df = df.withColumn('N_COMORBILIDADES',
                       reduce(operator.add, [_presente(c) for c in condiciones]))

    # Group flags. The output names (including the 'CARSIOVASCULAR' spelling and the
    # 'COMR_' prefix) are kept unchanged in case later cells reference them.
    df = df.withColumn('COMOR_CARSIOVASCULAR',
                       _presente('HIPERTENSION', 'DIABETES', 'OBESIDAD'))
    df = df.withColumn('COMOR_RESPIRATORIA', _presente('ASMA', 'EPOC'))
    df = df.withColumn('COMR_INMUSUPR', _presente('INMUSUPR'))
    df = df.withColumn('COMR_RENAL', _presente('RENAL_CRONICA'))

    n_comorbilidades = col('N_COMORBILIDADES')
    df = df.withColumn('RIESGO',
                       when(n_comorbilidades >= 3, 'EXTREMO')
                       .when(n_comorbilidades == 2, 'ALTO')
                       .when(n_comorbilidades == 1, 'MODERADO')
                       .when(n_comorbilidades == 0, 'NORMAL'))
    return df


df = crear_variables_comorblidad(df)