In [1]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, MinMaxScaler, ChiSqSelector
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import *

In [2]:
try:
  from pyspark.sql import SparkSession
except:
  import findspark
  findspark.init()
  from pyspark.sql import SparkSession

  spark = SparkSession.builder.appName("COVID") \
          .config("hive.exec.dynamic.partition", "true") \
          .config("hive.exec.dynamic.partition.mode", "nonstrict")\
          .enableHiveSupport()\
          .getOrCreate()

In [3]:
def generaDataset(fileloc, columnas):
  """
  fileloc - string que da la dirección del csv con nuestros datos
  columnas - lista con las columnas que queremos utilizar en el dataframe
  
  Devuelve un dataframe a partir del csv en fileloc con las columnas requeridas
  """
  df = spark.read.csv(fileloc, sep = ",", header = True, encoding = "UTF-8", inferSchema = True)
  df = dataClean(df, columnas)
  print("Se generó el dataframe:")
  print(df.show(5))
  print("con %d registros" % df.count())
  return df

def dataClean(df, columnas):
  """
  Filtra el dataframe df  por positivos a Covid
  """
  df = df.filter(df["RESULTADO"] == "Positivo SARS-CoV-2")
  df = df.select(columnas)
  return df

In [4]:
#columnas a utilizar
columnas = ["SEXO", "OBESIDAD", "DIABETES", "EPOC", "ASMA", "CARDIOVASCULAR", "RENAL CRONICA", "TABAQUISMO", "TIPO PACIENTE", "EDAD"]
file = "/FileStore/tables/casos_asociados_a_covid_19-4388c.csv"

df = generaDataset(file, columnas)
train, test = df.randomSplit([0.70, 0.30])

In [5]:
df.groupBy("TIPO PACIENTE").count().show()

In [6]:
def pruebaChiSquare(data, numCols, catCols, target):
  """
  Realiza la prueba de chi^2 en el dataset provisto después de un poco de procesamiento 
  Regresa el dataset procesado y con sólo con las variables relevantes, a partir de la prueba de chi^2
  
  data -- dataframe a utilizar
  numCols -- lista con las features numéricas del dataset
  catCols -- lista con las features categóricas del dataset
  target -- string con el nombre la variable objetivo a utilizar para la prueba
  """
  stages = []
  for categoricalCol in catCols:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + "index")
    stages += [stringIndexer]
  labelStringIndexer = StringIndexer(inputCol = target, outputCol = "label")
  stages += [labelStringIndexer]
  assemblerInputs = [c + "index" for c in catCols] + numCols
  assembler = VectorAssembler(inputCols = assemblerInputs, outputs = "features")
  css = ChiSqSelector(features = 'features', outputCol = "salidaf", labelCol = 'label', fpr = 0.05)
  stages += [assembler, css]
  pipe = Pipeline().setStages(stages)
  result = pipe.fit(data).transform(data)
  return result

In [7]:
def entrenamiento(data, catCols, numCols, target):
  """
  Crea un pipeline donde las variables categóricas se procesan usando one-hot-encoding
  pasa la data transformada al modelo logístico y regresa el modelo ajustado.
  
  data -- el dataframe a utilizar
  catCols -- lista con las columnas categóricas de las features a usar
  numCols -- lista con las columnas numéricas de las features
  target -- la variable que deseamos modelar con la regresión logística
  """
  stages = []
  for categoricalCol in catCols:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + "index")
    stages += [stringIndexer]
    ohe = OneHotEncoderEstimator(inputCols = [stringIndexer.getOutputCol()], outputCols = [categoricalCol + "ohe"])
    stages += [ohe]
  label_stringIdx = StringIndexer(inputCol = target, outputCol = "label")
  stages += [label_stringIdx]
  assemblerInputs = [c+"ohe" for c in catCols] + numCols
  assembler = VectorAssembler(inputCols = assemblerInputs, outputCol = "featuresVec")
  minmax = MinMaxScaler(inputCol = "featuresVec", outputCol = "features")
  lr = LogisticRegression(labelCol = "label", featuresCol = "features", maxIter = 10)
  stages += [assembler, minmax, lr]
  pipe = Pipeline().setStages(stages)
  model = pipe.fit(data)
  
  print("Coeficientes: " + str(model.stages[-1].coefficientMatrix))
  print("Intercepto: " + str(model.stages[-1].interceptVector))
  return model

In [8]:
def metricas(prediccion):
  """
  Regresa el accuracy y la curva ROC del modelo 
  
  prediccion -- dataframe con el dataset de test
  """
  score = MulticlassClassificationEvaluator(
  predictionCol = "prediction",
  labelCol = "label",
  metricName = "accuracy")
  
  evaluator = BinaryClassificationEvaluator(rawPredictionCol = "rawPrediction", labelCol = "label")
  print("El accuracy del modelo es:")
  print(score.evaluate(prediccion))
  print("La curva ROC del modelo es:")
  print(evaluator.evaluate(prediccion))

In [9]:
numericalColumns = columnas[-1:]
target = columnas[-2]
categoricalColumns = columnas[:-2]
modelo = entrenamiento(train, categoricalColumns, numericalColumns, target)

In [10]:
prediccion = modelo.transform(train)
metricas(prediccion)

In [11]:
def structDataframe(data):
  """
  Dada una lista con entradas de datos, regresa un dataframe con dichos datos y el esquema de abajo
  data -- lista con listas, cada lista tiene datos de EDAD, SEXO, etc...
  """
  schema = StructType([
    StructField("EDAD", IntegerType(), True),
    StructField("SEXO", StringType(), True),
    StructField("OBESIDAD", StringType(), True),
    StructField("DIABETES", StringType(), True),
    StructField("EPOC", StringType(), True),
    StructField("ASMA", StringType(), True),
    StructField("CARDIOVASCULAR", StringType(), True),
    StructField("TABAQUISMO", StringType(), True),
    StructField("RENAL CRONICA", StringType(), True)
  ])
  
  df = spark.createDataFrame(data, schema)
  return df

In [12]:
data = [[25, "HOMBRE", "NO", "SI", "SI", "NO", "NO", "NO", "SI"], [70, "MUJER", "NO", "SI", "NO", "NO", "NO", "NO", "NO"]]
df2 = structDataframe(data)
result = modelo.transform(df2)