# Modelo de Regresión Logística

---

#### Carlos David Nieto Loya

#### Tarea 4

* **Objetivo:** Predecir -a partir de sus características físicas y médicas- si una persona infectada de COVID-19 tiene probabilidades altas de ser internada en un hospital a causa de que la enfermedad se complicó.

In [1]:
import math
import requests
import warnings
warnings.filterwarnings("ignore")
import pandas as pd
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt
from requests.exceptions import ConnectionError

In [2]:
try:
    from pyspark.sql import SparkSession
except:
    import findspark
    findspark.init()
    from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("COVID") \
        .config("hive.exec.dynamic.partition", "true")\
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .enableHiveSupport()\
        .getOrCreate()

sc=spark.sparkContext

In [4]:
spark.version

'2.4.5'

In [5]:
#configurado para traer 10000 rows
# url = 'https://datos.cdmx.gob.mx/api/records/1.0/search/?dataset=casos-asociados-a-covid-19&q=&rows=10000&facet=fecha_actualizacion&facet=origen&facet=sector&facet=entidad_um&facet=sexo&facet=entidad_nac&facet=entidad_res&facet=municipio_res&facet=tipo_paciente&facet=fecha_ingreso&facet=fecha_def&facet=edad&facet=nacionalidad&facet=embarazo&facet=habla_lengua_indi&facet=diabetes&facet=epoc&facet=asma&facet=inmusupr&facet=hipertension&facet=cardiovascular&facet=obesidad&facet=renal_cronica&facet=tabaquismo&facet=resultado&facet=migrante&facet=pais_nacionalidad&facet=rango_edad'

In [6]:
# def getData(url):
#     """
#     Funcion de obtiene la data de la API REST
#     """
#     try:
#         resp = requests.get(url)
#         if resp.status_code != 200:
#             print("Pagina no encontrada se produjo el error " + str(resp.status_code))
#             return None
#         else:
#             print("El numero de hits(registros) es",resp.json()["nhits"])
#             rdd=sc.parallelize(resp.json()["records"])
#             return rdd
#     except ConnectionError as e:    
#            print("Se produjo un error de conexión revise su url\n" +str(e))

#### Ejercicio 1 
1. La funcion deberá recibir el rdd de arriba y devolver un dataframe, haga las validaciones necesarias con excepciones ya que el rdd puede ser vacío.    
    
    1. Investigue y explique que es una API REST
    2. No deberá incluir ninguna acción, únicamente una transformación no print's.
    3. Deberan comentar todo.

**Respuesta A:** 

Una API REST es un servicio web que sigue una serie de [métodos](https://www.restapitutorial.com/lessons/httpmethods.html) (POST, GET, PUT, PATCH, DELETE) del protocolo HTTP para la transferencia de datos. Normalmente estas API REST devuelven los datos en XML o en JSON.


En la actualidad se prefiere enviar los datos en JSON ya que son archivos más ligeros.


Es un backend que tiene una serie de urls en el que hacemos una serie de peticiones, y ésta nos devuelve los datos que solicitamos. Es una herramienta muy potente en la actualidad para el desarrollo de aplicaciones por el lado de servidores.

In [7]:
# def rddtoDF(rdd,features):
#     """
#     Esta funcion transforma el rdd en un dtaframe conservando los features que recibe como paraetros 
#     :param rdd: un rdd con los datos de la api
#     :param features:  los covariables que deseo conservar
#     :return: df, regresa un dataframe con la información
#     """
#     #Variable temporal con los datos que necesitamos
#     temp = rdd.map(lambda row: row['fields'])
    
#     #Transformamos el RDD en un DataFrame
#     df = temp.toDF()
    
#     #Seleccionamos unicamente las covariables que se desean conservar
#     df = df.select(features)
    
#     #Regresamos el DataFrame resultado
#     return df

In [8]:
# rdd = getData(url)

#### Ejercicio 2 

2. La función deberá recibir el dataframe de arriba y guardar la información en una tabla en HIVE en hdfs, para fines de tener un historico.

    1. El formato de guardado de datos de la tabla Hive debe ser un parquet file.
    2. La tabla debe estar particionada por fecha.
    3. Suponga que el proceso se correrá todos los dias, así que deben contemplar que una vez que corra la    primera vez el proceso, para los dias siguientes únicamente se deberá almacenar los datos de ese día.
    4. El modo es un parametro que indica si desea reescribir los datos o hacer un append, para pruebas use el   modo overwrite.
    5. Cuando su modelo funcione correctamente debera cambiar el número de rows por el número de Hits

In [9]:
# #funcion axiliar por si la necesita
# def recorreVentana(fecha):
#     #insertar codigo
#     return None   

In [10]:
def guardardatosenHive(df,modo,table="covid.casoscovid"):
    #insertar codigo
#     df.write.partitionBy("particion").mode("append").format("parquet").saveAsTable(tableNameResult)
    return None

#### Ejercicio 3
3. Implemente una función que filtre aquellos registros que dieron positivo para sars-covid

In [11]:
def filtraPositivos(df):
    """
    Esta función filtra los casos positivos de un DataFrame dado.
    df: spark DataFrame con la columna 'resultado'
    """
    #Devolvemos nuestro df con los casos positivos
    return df.filter(df.RESULTADO == 'Positivo SARS-CoV-2')

#### Ejercicio 4
4. Implemente una función que filtre los nulls si es que existen. Cuente el número de registros nulos en cada campo.

In [12]:
def limpiaNulls(df):
    """
    Función que limpia nulls en un DF de spark e imprime
    la cantidad de registros que borró (cantidad de valores
    nulos).
    """
    cleanDF = df.na.drop()

    print("Numero de registros con algun valor nulo: ", df.count() - cleanDF.count())
    return cleanDF

In [13]:
# #características físcias de las personas que pueden
# #ayudar a que se hospitalice y las fechas para
# #graficar las series de tiempo
# features = ['inmusupr', 'tipo_paciente', 'cardiovascular','neumonia', 
#             'hipertension', 'otra_com', 'epoc', 'asma', 'tabaquismo',
#             'embarazo', 'sexo', 'obesidad', 'fecha_sintomas',
#             'diabetes', 'edad', 'fecha_ingreso', 'renal_cronica','resultado']

# #transformamos nuestro RDD en un SparkDataFrame
# df = rddtoDF(rdd, features)
# #filtramos los casos positivos
# df = filtraPositivos(df)
# #limpiamos nuestra información
# df = limpiaNulls(df)
# #creamos una vista temporal en spark
# df.createOrReplaceTempView('covid')

In [None]:
#path del archivo csv
file_local_path = '/Users/carlosnieto/Desktop/casos-asociados-a-covid-19.csv'

#cargamos el archivo en un spark Dat~Frame
df_origen = spark.read.csv(file_local_path, header=True, encoding='UTF-8', inferSchema=True)

#covariables que vamos a tomar en cuenta para el modelo
features = ["TIPO PACIENTE", "SEXO", "EDAD", "EMBARAZO", "DIABETES", "EPOC", "ASMA", "INMUNOSUPRESION", "HIPERTENSION",
            "OTRA COMPLICACION", "CARDIOVASCULAR", "OBESIDAD", "RENAL CRONICA", "TABAQUISMO","RESULTADO"]
df = df_origen.select(features)
#filtramos los casos positivos
df = filtraPositivos(df)
#ya no necesitamos la columna resultado
df = df.drop('RESULTADO')
#limpiamos nuestra información
df = limpiaNulls(df)
#creamos una vista temporal en spark
df.createOrReplaceTempView('covid')

#### Ejercicio 5
5. Responda lo siguiente:    

    2. Instale la siguiente biblioteca pyarrow para agilizar el intercambio entre pandas y spark.
    3. ¿Cuál es la media de las edades de las personas contagiadas?. Haga un histograma
    4. ¿Se tienen datos balaceados?. Investigue técnicas para datos desbalanceados.
    5. Haga tablas cruzadas de tipo_paciente con obesidad, diabetes, epoc, etc. Explique lo que observa.
    8. Haga un agrupamiento de casos y decesos por estado y grafique (plotly) las series de tiempo con los 10 estados con más casos.
    
***
    Puede usar Pandas para la generación de la tablas cruzadas.
     Use la sig intrucción para instalar la biblioteca
     conda install -c conda-forge pyarrow
     
***

#### C. Edades de las personas contagiadas

Aquí andalizamos las edades de las personas contagiadas con histogramas y boxplots para entender mejor a que sector de la sociedad por edad se ve mas afectada por esta enfermedad.


En este análisis llegamos a concluir lo siguiente:

* La media de las edades de las personas contagiadas es de: **46.2 años**.


* La mayoría de las personas contagiadas se encuentra entre los 40 y 50 años.


* Tenemos observaciones de hasta 120 años de edad (outliers).


* La edad promedio de un hospitalizado es de: **54.7 años**


* La edad promedio de los casos ambulatorios es de: **41.7 años**


* Las personas hospitalizadas son mayores (en general) que las personas que no fueron hospitalizadas.


> De esto podemos concluir que la edad es una característica física que nos puede ayudar a asignar una probabilidad de ser hospitalizado.

In [None]:
#iniciaizamos nuestras gráficas
f,axes = plt.subplots(figsize=(15,6), ncols=2)
#extraemos las edades en un array de python
edad = df.select("EDAD").toPandas()["EDAD"].values

#primero graficamos el histograma
sns.distplot(edad, ax=axes[0], bins=10, kde = False, norm_hist=True,
             hist_kws={
                 'edgecolor':'black',
                 'color':'steelblue',
                 'linewidth':1
             })
#añadimos una linea vertical señalando el promedio
prom = edad.mean()
axes[0].axvline(prom, 0,1, color='red', linestyle='--')
#anotamos el promedio en el histograma
text = 'Promedio: {:.1f}'.format(prom)
axes[0].annotate(text, xy=(prom+3,0.965), xycoords=('data', 'axes fraction'), color='red')

#graficamos el boxplot
sns.boxplot(edad, orient='v', ax=axes[1], color='lightblue')

#titulos para el histograma y el boxplot
axes[0].title.set_text('Figura 1.0: Histograma de casos de COVID-19 en México por edad')
axes[1].title.set_text('Figura 1.1: Boxplot')

In [None]:
temp = df[df["TIPO PACIENTE"] == 'HOSPITALIZADO'].select("EDAD")
edades = temp.toPandas()["EDAD"].values

#iniciaizamos nuestras gráficas
f,axes = plt.subplots(figsize=(15,6), ncols=2)

#primero graficamos el histograma
sns.distplot(edades, ax=axes[0], bins=10, kde = False, norm_hist=True,
             hist_kws={
                 'edgecolor':'black',
                 'color':'steelblue',
                 'linewidth':1
             })
#añadimos una linea vertical señalando el promedio
prom = edades.mean()
axes[0].axvline(prom, 0,1, color='red', linestyle='--')
#anotamos el promedio en el histograma
text = 'Promedio: {:.1f}'.format(prom)
axes[0].annotate(text, xy=(prom+3,0.965), xycoords=('data', 'axes fraction'), color='red')

#graficamos el boxplot
sns.boxplot(edades, orient='v', ax=axes[1], color='lightblue')

#titulos para el histograma y el boxplot
axes[0].title.set_text('Figura 2.0: HOSPITALIZADOS en México por COVID-19 por edad')
axes[1].title.set_text('Figura 2.1: HOSPITALIZADOS Boxplot')

f.show()

temp = df[df["TIPO PACIENTE"] == 'AMBULATORIO'].select("EDAD")
edades = temp.toPandas()["EDAD"].values

#iniciaizamos nuestras gráficas
g,axes = plt.subplots(figsize=(15,6), ncols=2)


#primero graficamos el histograma
sns.distplot(edades, ax=axes[0], bins=10, kde = False, norm_hist=True,
             hist_kws={
                 'edgecolor':'black',
                 'color':'steelblue',
                 'linewidth':1
             })
#añadimos una linea vertical señalando el promedio
prom = edades.mean()
axes[0].axvline(prom, 0,1, color='red', linestyle='--')
#anotamos el promedio en el histograma
text = 'Promedio: {:.1f}'.format(prom)
axes[0].annotate(text, xy=(prom+3,0.965), xycoords=('data', 'axes fraction'), color='red')

#graficamos el boxplot
sns.boxplot(edades, orient='v', ax=axes[1], color='lightblue')

#titulos para el histograma y el boxplot
axes[0].title.set_text('Figura 2.3: Casos AMBULATORIOS en México por COVID-19 por edad')
axes[1].title.set_text('Figura 2.4: Casos AMBULATORIOS Boxplot')

g.show()

#### D. ¿Se tienen datos balanceados?

Haciendo un **``groupby``** sobre la columna **``TIPO PACIENTE``** (la cual es nuestra variable objetivo en este caso) podemos observar que hay más casos ambulatorios que casos de hospitalización en México aunque la diferencia no es mucha, puede causar problemas al momento de ajustar el modelo. 

Algo bueno es que contamos con un método que se llama **"Class Weighting"**, en la regresión logística de spark, que nos sirve para poder darle más peso a la clase minoritaria, y así, hacer que el modelo no se vea afectado por el desbalance en los datos.

Este método consiste en sacar el **``BalancingRatio``** de nuestros datos, el cual vamos a definir como:

$$BalancingRatio = \frac{numNegatives}{DatasetSize}$$

In [None]:
df.groupby("TIPO PACIENTE").count().show()

#### E. Tablas cruzadas
Haremos tablas y gráficas cruzadas para poder observar qué tanto influyen las covariables sobre nuestra variable objetivo (**``TIPO PACIENTE``**).


Para esto primero haremos un **Pandas DataFrame** para poder manipular los datos en python.

In [None]:
#dfp = spark.sql('select * from covid').toPandas()

In [None]:
#En esta celda hacemos todas las tablas cruzadas con nuestras covariables y la variable respuesta
#excepto con la edad porque esa ya la analizamos
cols = list(df.columns)
cols.remove("TIPO PACIENTE")
cols.remove("EDAD")

for col in cols:
    print(col,":")
    df.groupby(col).pivot("TIPO PACIENTE").count().show()
    print("---------------------------------------------------")

#### Análisis de covariables

En este análisis haremos gráficas cruzadas para poder observar qué covariables de padecimientos o condiciones físicas podemos tomar en cuenta para predecir si una persona con COVID-19 puede ser hospitalizada o no.


También vamos a analizar la tasa de hospitalización por cada padecimiento físico, por ejemplo: de las personas que dieron positivo a COVID-19 que tienen diabetes, ¿qué porcentaje de éstas fueron hospitalizadas?. Calcularemos los valores de la siguiente manera:


$$\text{Tasa de hospitalizacion} = \frac{\text{No. de personas con diabetes hospitalizadas}}{\text{No. total de personas con diabetes}} = \frac{n}{N}$$


In [None]:
def graficaPorTipoPaciente(data, feature):
    """
    Función que hace una gráfica para analisar una característica dada
    contra el tipo de paciente (HOSPITALIZADO, AMBULATORIO)
    - feature: característica a analizar
    - data: pandas DataFrame de donde sale la información
    Devuelve una figura de matplotlib
    """
    #inicializamos la gráfica y graficamos con seaborn
    g,ax = plt.subplots()
    sns.countplot(data=data, x=feature, hue='TIPO PACIENTE', ax=ax)
    
    #totales por valor de la característica dada
    totales = [data[data[feature]==val][feature].count().sum() for val in data[feature].unique()]
    #duplicamos la lista para anotar los % de hospitalizados y ambulatorios
    #para cada valor de la característica dada
    totales = totales * 2
    
    #anotamos los % mencionados arriba
    for patch,i in zip(ax.patches,range(len(ax.patches))):
        x = patch.get_x()
        y = patch.get_height()
        if totales[i] != 0: #checamos que el total no sea cero
            text = '{}%'.format(round(100*y/totales[i],1))
        else:
            text = '0%'
        ax.annotate(text, xy=(x+0.05,y), color='black')
    
    return g,ax

In [None]:
dfp = spark.sql('select * from covid').toPandas()
cols = dfp.columns.to_list()
cols.remove('TIPO PACIENTE')
cols.remove('EDAD')
cols.remove('SEXO')
porcentajes = []

for feature in cols:
    #numero total de casos que tienen caracteristica i
    N = dfp[dfp[feature]=='SI'].count().sum()
    #numero de hospitalizados dentro de los que tienen la característica i
    n = dfp[(dfp[feature]=='SI') & (dfp['TIPO PACIENTE']=='HOSPITALIZADO')].count().sum()
    #porcentaje de hospitalizados de la característica i
    porc = n/N
    #almacenamos el porcentaje
    porcentajes.append(porc)

print("Tabla de tasas de hospitalización por padecimiento")
tasas_df = pd.DataFrame({'Padecimiento':cols, 'Tasa':porcentajes})
tasas_df = tasas_df.sort_values(by='Tasa', ascending=False, ignore_index=True)
tasas_df

In [None]:
padecimientos = tasas_df.Padecimiento.values

for col in padecimientos:
    graficaPorTipoPaciente(data=dfp, feature=col)

#### F. Haga un agrupamiento de casos y decesos por estado y grafique (plotly) las series de tiempo con los 10 estados con más casos.

Para hacer esto, necesitaremos un DataFrame con las siguientes covariables:

* Entidad de residencia
* Fecha de inicio de síntomas
* Fecha de defunción
* Resultado (para filtrar solamente los positivos)

In [None]:
#dataframe de spark con solo la información que necesitamos para graficar
dff = df_origen.select('ENTIDAD RESIDENCIA', 'FECHA SINTOMAS', 'FECHA DEFUNCION', 'RESULTADO')
dff = filtraPositivos(dff)

Luego vamos a necesitar 3 funciones diferentes que nos van a ayudar a graficar la información en plotly

In [None]:
def filter_spark_dataframe_by_list(df, column_name, filter_list):
    """Regresa un subconjunto del df donde df[column_name] esta en filter_list"""
    sparky = SparkSession.builder.getOrCreate()
    filter_df = sparky.createDataFrame(filter_list, df.schema[column_name].dataType)
    return df.join(filter_df, df[column_name] == filter_df["value"])

In [None]:
def getTopCrossTableByDate(df,states_col,date_col,top=10):
    """
    Esta funcion genera la serie de tiempo del top por estado de residencia.
    
    :param df: dataframe de donde salen los datos
    :param states_col: nombre de la columna donde se encuentran los estados
    :param date_col: nombre de la columna donde se encuentran las fechas
    :param top: señala el # del top que queremos.
    :returns: Pandas DataFrame con la serie de tiempo del toplist
    """ 
    #sacamos el # de casos por estado (top solamente)
    top_df = df.groupby(states_col).count().orderBy('count', ascending=False).limit(top)
    #lista de los top estados
    top_estados = list(top_df.select(states_col).toPandas()[states_col])
    #filtramos el df de la info solamente de los top estados
    filtered_df = filter_spark_dataframe_by_list(df, states_col, top_estados)
    #hacemos la tabla cruzada de fechas (indices) vs estados (columnas)
    timeSeries = filtered_df.crosstab(date_col, states_col).toPandas()
    #nueva columna que se genera sola
    new_col = date_col + "_" + states_col
    #borramos los nulls (para el caso de las muertes nos genera nulls)
    timeSeries = timeSeries[timeSeries[new_col]!='null']
    #la columna que contiene las fechas la convertimos en fecha de python
    timeSeries[new_col] = pd.to_datetime(timeSeries[new_col])
    #ordenamos por fecha más antigua a la más reciente
    timeSeries = timeSeries.sort_values(by=new_col, ascending=True, ignore_index=True)
    #colocamos a la columna de las fechas en el index
    timeSeries = timeSeries.set_index(new_col)
    return timeSeries


In [None]:
def graficaSeriesDeTiempo(dataframe, titulo):
    """
    Esta función grafica las series de tiempo de los estados (columnas) de
    un dataframe donde las fechas vienen en el index del dataframe.
    :param dataframe: pandas dataframe con la información de las series de tiempo por estado.
    :param titulo: string que contiene el título del gráfico."""
    fig = go.Figure()
    
    #las columnas del dataframe deben de ser los estados que vamos a graficar
    for estado in dataframe.columns:
        trace = go.Scatter(
            x = dataframe.index, # fechas
            y = dataframe[estado].values, # serie de tiempo de cada estado
            name = estado,
            opacity = 0.8
        )
        fig.add_trace(trace)

    layout = dict(title=titulo)
    fig.layout = layout
    fig.show()

In [None]:
casos_positivos_por_estado = getTopCrossTableByDate(dff,"ENTIDAD RESIDENCIA", "FECHA SINTOMAS")
graficaSeriesDeTiempo(casos_positivos_por_estado.cumsum(),
                      titulo="Casos positivos acumulados de los 10 estados con más casos en México")

In [None]:
muertes_por_estado = getTopCrossTableByDate(dff,"ENTIDAD RESIDENCIA", "FECHA DEFUNCION")
graficaSeriesDeTiempo(muertes_por_estado.cumsum(),
                      titulo="Muertes acumuladas de los 10 estados con más defunciones en México")

#### Ejercicio 6
6. Implemente la sig función para el preprocesamiento, deberá realizar lo siguiente:
    1. Codifique sus varibles catégoricas a tipo númerico.
    2. Aplique oneHotEnconder(codificación parcial) a sus variables, explique cual es la raazón de hacerlo.
    3. Guarde en un vector sparse su salida.

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.sql.functions import when
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import  StringIndexer, VectorAssembler, MinMaxScaler, ChiSqSelector, OneHotEncoder

In [None]:
def dataProcessing(train, categoricalCols, numericalCols, labelCol="TIPO PACIENTE"):
    """Función que hace todo el preprocesamiento de los datos
    categóricos de un conjunto de datos de entrenamiento (o no).
    :param train spark df: conjunto de datos de entrenamiento.
    :param categoricalCols list,array: conjunto de nombres de columnas categoricas del
        conjunto de datos train.
    :param numericalCols list,array: conjunto de nombres de columnas numéricas del 
        conjunto de datos train.
    :param labelCol str: variable objetivo o etiqueta
    
    :Returns spark dataframe con las columnas 'label' y 'features'
    """
    
    #codificamos todas las variables categóricas
    stages = []
    for categoricalCol in categoricalCols:
        stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol= categoricalCol + "Index")
        encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                                        outputCols=[categoricalCol + "ohe"])
        stages += [stringIndexer, encoder]

    #variable objetivo (etiqueta)
    label_strIdx = StringIndexer(inputCol=labelCol, outputCol="label")
    stages += [label_strIdx]

    #ponemos todas las covariables en un vector
    assemblerInputs = [c + "ohe" for c in categoricalCols] + numericalCols
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="feat")
    stages += [assembler]
    
    #escala de 0-1
    scala = MinMaxScaler(inputCol="feat", outputCol="features")
    stages += [scala]
    
    #pipeline donde vamos a hacer todo el proceso
    pipe = Pipeline(stages=stages)
    pipeModel = pipe.fit(train)
    train = pipeModel.transform(train)
    
    #regresamos nuestro df con lo que necesitamos
    return train

#### Ejercicio 7
6. Implemente un modelo Logístico.
    1. Aplique un Hold-out a sus datos con un 70-30 para validación y entrenamiento. 
    2. Que coeficientes obtuvo.
    3. Con la muestra de validación, obtenga el accuracy.
    4. Deberá devolver un dataframe con las probalidad P(Y=1) y la predicción.
    5. Obtenga la curva ROC

In [None]:
def modeloLogistico(data, labelCol="label", featuresCol="features", weightCol="classWeights"):
    """
    Función que se encarga de ajustar un modelo logístico
    a partir de un dataframe de spark con el esquema ya procesado
    a partir de la función dataProcessing().
    
    :param data: spark dataframe.
    :param labelCol: string nombre de la columna con la variable respuesta.
    :param featuresCol: string nombre de la columna con los vectores de las
        covariables.
    
    :returns modelo ajustado:
    """
    from pyspark.ml.classification import LogisticRegression
    
    model = LogisticRegression(featuresCol=featuresCol, labelCol=labelCol, weightCol=weightCol)
    return model.fit(data)

In [None]:
def predictLogistico(test, model):
    """
    Esta función predice un modelo logístico con columnas categóricas
    y numéricas sobre un conjunto de datos de prueba.
    """
    #predecimos el modelo
    predictions = model.transform(test)
    #regresamos el df con las predicciones
    return predictions

### Ajuste del modelo

In [None]:
#Primero definimos nuestras columnas categóricas, numéricas y nuestra variable objetivo
categoricalCols = ['SEXO','EMBARAZO','DIABETES','EPOC','ASMA','INMUNOSUPRESION','HIPERTENSION','OTRA COMPLICACION',
                   'CARDIOVASCULAR','OBESIDAD','RENAL CRONICA','TABAQUISMO']
numericalCols = ["EDAD"]
labelCol = "TIPO PACIENTE"

#Procesamos nuestros datos con la función dataProcessing()
raw_data = dataProcessing(df, categoricalCols, numericalCols, labelCol)

In [None]:
#conjunto de entrenamiento y de prueba
train, test = raw_data.randomSplit([0.70,0.30])

#obtenemos el balancing ratio
numHosp = train.filter(train["TIPO PACIENTE"]=="HOSPITALIZADO").count()
numAmb = train.filter(train["TIPO PACIENTE"]=="AMBULATORIO").count()
BalancingRatio = numAmb / (numHosp + numAmb)
print("Balancing Ratio: ", BalancingRatio)

#agregamos una columna con el BalancingRatio respectivo para cada label
train=train.withColumn("classWeights", when(train.label == 1,BalancingRatio).otherwise(1-BalancingRatio))

#modelo de regresión logística
model = modeloLogistico(data=train, labelCol="label", featuresCol="features", weightCol="classWeights")
#imprimimos los coeficientes
print("Coeficientes: ",str(model.coefficientMatrix))
print("Intercepto: ", str(model.interceptVector))

#predicciones con el conjunto de prueba
predictions = predictLogistico(test, model)

### Evaluación del modelo

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
modelSummary = model.summary

roc = modelSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()

print('Training set areaUnderROC: ' + str(modelSummary.areaUnderROC))
#AUROC del test set
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))

In [None]:
pr = modelSummary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.show()

In [None]:
#Matriz de confusión en las predicciones del test set
predictions.crosstab("label","prediction").show()

In [None]:
#otras métricas de evaluación
print("clases:", modelSummary.labels)
print("f-measure",modelSummary.fMeasureByLabel(beta=1.0))
print("false-positive rate by label:", modelSummary.falsePositiveRateByLabel)
print("Precisión: ", modelSummary.precisionByLabel)
print("Exhaustividad (recall): ", modelSummary.recallByLabel)

#### Ejercicio 8
6. Pruebas de hipotesis.
    1. Aplique la prueba de la $\chi^2$ para la selección de las variables relevantes en su modelo. 
    2. Obtenga la probablidad de riesgo de un sujeto de prueba. Deberá crear un dataframe de spark con el Esquema bien definido.
    3. Crear un job que pueda ser lanzado mediante spark-submit.
    4. Persista el modelo.



In [None]:
def pruebaChi(dataframe, categoricalCols, numericalCols, labelCol="TIPO PACIENTE"):
    """
    Función que procesa los datos pero añadiéndo la Prueba Chi para seleccionar variables.
    """
    
    #codificamos todas las variables categóricas
    stages = []
    for categoricalCol in categoricalCols:
        stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol= categoricalCol + "Index")
        encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                                        outputCols=[categoricalCol + "ohe"])
        stages += [stringIndexer, encoder]

    #variable objetivo (etiqueta)
    label_strIdx = StringIndexer(inputCol=labelCol, outputCol="label")
    stages += [label_strIdx]

    #ponemos todas las covariables en un vector
    assemblerInputs = [c + "ohe" for c in categoricalCols] + numericalCols
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="feat")
    stages += [assembler]
    
    #seleccionamos las variables que nos sirven con ChiSqSelector
    selector = ChiSqSelector(featuresCol="feat", outputCol="feature", labelCol="label", fpr=0.05,
                            selectorType='fpr')
    stages += [selector]
    
    #escala de 0-1
    scala = MinMaxScaler(inputCol="feature", outputCol="features")
    stages += [scala]
    
    #pipeline donde vamos a hacer todo el proceso
    pipe = Pipeline(stages=stages)
    pipeModel = pipe.fit(dataframe)
    df = pipeModel.transform(dataframe)
    
    #regresamos nuestro df con lo que necesitamos
    return df

### Prueba Chi

Ahora haremos el modelo pero sobre los datos donde corremos la prueba chi para selección de variables. Luego compararemos la diferencia entre el modelo sin y con la prueba Chi.

In [None]:
raw_data2 = pruebaChi(df, categoricalCols, numericalCols, labelCol="TIPO PACIENTE")

In [None]:
#conjunto de entrenamiento y de prueba
train2, test2 = raw_data2.randomSplit([0.70,0.30])

#obtenemos el balancing ratio
numHosp = train2.filter(train["TIPO PACIENTE"]=="HOSPITALIZADO").count()
numAmb = train2.filter(train["TIPO PACIENTE"]=="AMBULATORIO").count()
BalancingRatio = numAmb / (numHosp + numAmb)
print("Balancing Ratio: ", BalancingRatio)

#agregamos una columna con el BalancingRatio respectivo para cada label
train2=train2.withColumn("classWeights", when(train2.label == 1,BalancingRatio).otherwise(1-BalancingRatio))

#modelo de regresión logística
model2 = modeloLogistico(data=train2, labelCol="label", featuresCol="features", weightCol="classWeights")
#imprimimos los coeficientes
print("Coeficientes: ",str(model2.coefficientMatrix))
print("Intercepto: ", str(model2.interceptVector))

#predicciones con el conjunto de prueba
predictions = predictLogistico(test2, model2)

In [None]:
modelSummary2 = model2.summary

roc2 = modelSummary2.roc.toPandas()
plt.plot(roc2['FPR'],roc2['TPR'])
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()

print('Training set areaUnderROC: ' + str(modelSummary2.areaUnderROC))
#AUROC del test set
evaluator2 = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator2.evaluate(predictions))

Podemos ver que las AUROC de ambos modelos no cambia mucho, por lo que la prueba Chi en esta caso no "mejora" el modelo, sino lo deja igual.

In [None]:
def probSujetodeprueba(df):
    #insertar codigo
    return probabilidad

In [None]:
def getModeloPersistente(modelo):
    #insertar codigo
    return modeloserializado