In [90]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer,OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler,MinMaxScaler,ChiSqSelector
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator 
from pyspark.sql.types import DoubleType



In [23]:
try:
    from pyspark.sql import SparkSession
except:
    import findspark
    findspark.init()
    from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("COVID") \
        .config("hive.exec.dynamic.partition", "true")\
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .enableHiveSupport()\
        .getOrCreate()    


In [105]:
def dataframe(rdd,features):
    df=spark.read.csv("D:\Descargas\casos-asociados-a-covid-19.csv",sep=",",header=True,encoding="UTF-8",inferSchema=True)
    df=df.filter(df["RESULTADO"].isin("Positivo SARS-CoV-2")) 
    df=df.select(features)
    df = df.withColumn("EDAD", df["EDAD"].cast(DoubleType()))
    
    diz={"NO APLICA":"NO"}
    f="SE IGNORA"
    df=df.filter(~df["EMBARAZO"].isin(f) & ~df["DIABETES"].isin(f) & ~df["EPOC"].isin(f) 
                 & ~df["ASMA"].isin(f) & ~df["INMUNOSUPRESION"].isin(f) & ~df["HIPERTENSION"].isin(f) 
                 & ~df["CARDIOVASCULAR"].isin(f) & ~df["OBESIDAD"].isin(f) & ~df["RENAL CRONICA"].isin(f) 
                 & ~df["TABAQUISMO"].isin(f) & ~df["EDAD"].isin(0))
    print("Se ha generado el siguiente dataframe")
    print(df.show(2))
    print("Se tiene el siguiente numero de registros")
    print(df.count())
    #df=pd.DataFrame(information)
    
    return df.select(features).replace(diz,1,"EMBARAZO")

features=["SEXO","EDAD","EMBARAZO","DIABETES","EPOC",
          "ASMA","INMUNOSUPRESION","HIPERTENSION","CARDIOVASCULAR","OBESIDAD","RENAL CRONICA",
          "TABAQUISMO","TIPO PACIENTE"]
df=dataframe(1,features)

Se ha generado el siguiente dataframe
+-----+----+--------+--------+----+----+---------------+------------+--------------+--------+-------------+----------+-------------+
| SEXO|EDAD|EMBARAZO|DIABETES|EPOC|ASMA|INMUNOSUPRESION|HIPERTENSION|CARDIOVASCULAR|OBESIDAD|RENAL CRONICA|TABAQUISMO|TIPO PACIENTE|
+-----+----+--------+--------+----+----+---------------+------------+--------------+--------+-------------+----------+-------------+
|MUJER|64.0|      NO|      SI|  NO|  NO|             NO|          SI|            NO|      NO|           NO|        NO|  AMBULATORIO|
|MUJER|52.0|      NO|      SI|  NO|  NO|             NO|          NO|            NO|      NO|           NO|        NO|HOSPITALIZADO|
+-----+----+--------+--------+----+----+---------------+------------+--------------+--------+-------------+----------+-------------+
only showing top 2 rows

None
Se tiene el siguiente numero de registros
104307


In [123]:
train,test= df.randomSplit([0.70,0.30])

In [106]:
def pruebachi(df):
    features2=["SEXO","EDAD","EMBARAZO","DIABETES","EPOC",
          "ASMA","INMUNOSUPRESION","HIPERTENSION","CARDIOVASCULAR","OBESIDAD","RENAL CRONICA",
          "TABAQUISMO","NEUMONIA"]
    stages=[]
    for fea in features2:
        string_Indexer =StringIndexer(inputCol=fea,outputCol="{}_Index".format(fea))
        stages+= [string_Indexer]
    paciente_Index=StringIndexer(inputCol="TIPO PACIENTE",outputCol="label")
    stages += [paciente_Index]
    ai=[c+"_Index" for c in features2]+["EDAD"]
    a=VectorAssembler(inputCols=ai,outputCol="features")
    
    css=ChiSqSelector(featuresCol="features",outputCol="salidaf",labelCol="label",fpr=0.05)
    stages+=[a,css]
    pipe=Pipeline().setStages(stages)
    Result=pipe.fit(df).transform(df)
    return Result 

Result=pruebachi(train)


In [117]:
#Result.select("features","salidaf","features").show(truncate=False)


+-------------+-----+
|TIPO PACIENTE|count|
+-------------+-----+
|  AMBULATORIO|35721|
|HOSPITALIZADO|35499|
+-------------+-----+



In [124]:
def dataProcessing(df):
    features2=["SEXO","EMBARAZO","DIABETES","EPOC",
          "ASMA","INMUNOSUPRESION","HIPERTENSION","CARDIOVASCULAR","OBESIDAD","RENAL CRONICA",
          "TABAQUISMO"]
    
    stages=[]
    for fea in features2:
        string_Indexer =StringIndexer(inputCol=fea,outputCol="{}_Index".format(fea))
        ohe=OneHotEncoderEstimator(inputCols=[string_Indexer.getOutputCol()],outputCols=["{}_ohe".format(fea)])
        stages+= [string_Indexer,ohe]
    paciente_Index=StringIndexer(inputCol="TIPO PACIENTE",outputCol="label")
    stages += [paciente_Index]
    ai=[c+"_ohe" for c in features2]+["EDAD"]#["asma_ohe","edad"]
    asu=VectorAssembler(inputCols=ai,outputCol="features_vec")
    stages+=[asu]
    escalado=MinMaxScaler(inputCol="features_vec",outputCol="features")
    stages+=[escalado]
    mlr=LogisticRegression(labelCol="label",featuresCol="features")
    stages+=[mlr]
    pipe=Pipeline(stages=stages)
    Model=pipe.fit(df)
    print("Coeficientes: " + str(Model.stages[-1].coefficientMatrix))
    print("intercepto: "+ str(Model.stages[-1].interceptVector))
    return Model 

mod_train=dataProcessing(train)

Coeficientes: DenseMatrix([[ 0.53466211, -1.35759011, -0.66195146, -0.32543677,  0.2849297 ,
              -0.64444482, -0.19855877, -0.00614032, -0.23973089, -0.89166918,
               0.08451465,  5.96085672]])
intercepto: [0.3667877566693125]


In [125]:
def metricas(prediccion):
    score=MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="accuracy")
    evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="label")
    print("Elaccuracy del modelo es:")
    print(score.evaluate(prediccion))
    print("La curva ROC del modelo es:")
    print(evaluator.evaluate(prediccion))

In [126]:
predic=mod_train.transform(train)
metricas(predic)

Elaccuracy del modelo es:
0.7258760478219046
La curva ROC del modelo es:
0.7651197274170215


In [137]:
from pyspark.sql.types import *

def structDataframe(data):
    features2=["SEXO","EMBARAZO","DIABETES","EPOC",
          "ASMA","INMUNOSUPRESION","HIPERTENSION","CARDIOVASCULAR","OBESIDAD","RENAL CRONICA",
          "TABAQUISMO"]
    schema= StructType(
    [
        StructField("SEXO",StringType(),True),
        StructField("EMBARAZO",StringType(),True),
        StructField("DIABETES",StringType(),True),
        StructField("EPOC",StringType(),True),
        StructField("ASMA",StringType(),True),
        StructField("INMUNOSUPRESION",StringType(),True),
        StructField("HIPERTENSION",StringType(),True),
        StructField("CARDIOVASCULAR",StringType(),True),
        StructField("OBESIDAD",StringType(),True),
        StructField("RENAL CRONICA",StringType(),True),
        StructField("TABAQUISMO",StringType(),True),
        StructField("EDAD",DoubleType(),True)
    ]
    )
    df=spark.createDataFrame(data,schema)
    return df
data=[["HOMBRE","NO","NO","NO","NO","NO","NO","NO","SI","NO","NO",30.0],["HOMBRE","NO","NO","NO","NO","NO","NO","NO","SI","NO","NO",70.0],
      ["HOMBRE","NO","NO","NO","NO","NO","NO","NO","NO","NO","NO",70.0],["HOMBRE","NO","SI","NO","NO","NO","NO","NO","SI","NO","NO",70.0]]
df2=structDataframe(data)
df2.show()

+------+--------+--------+----+----+---------------+------------+--------------+--------+-------------+----------+----+
|  SEXO|EMBARAZO|DIABETES|EPOC|ASMA|INMUNOSUPRESION|HIPERTENSION|CARDIOVASCULAR|OBESIDAD|RENAL CRONICA|TABAQUISMO|EDAD|
+------+--------+--------+----+----+---------------+------------+--------------+--------+-------------+----------+----+
|HOMBRE|      NO|      NO|  NO|  NO|             NO|          NO|            NO|      SI|           NO|        NO|30.0|
|HOMBRE|      NO|      NO|  NO|  NO|             NO|          NO|            NO|      SI|           NO|        NO|70.0|
|HOMBRE|      NO|      NO|  NO|  NO|             NO|          NO|            NO|      NO|           NO|        NO|70.0|
|HOMBRE|      NO|      SI|  NO|  NO|             NO|          NO|            NO|      SI|           NO|        NO|70.0|
+------+--------+--------+----+----+---------------+------------+--------------+--------+-------------+----------+----+



In [140]:
result=mod_train.transform(df2)

In [142]:
result.select("features").show(truncate=False)

+-----------------------------------------------------------------+
|features                                                         |
+-----------------------------------------------------------------+
|[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,1.0,0.24369747899159663]|
|[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,1.0,0.5798319327731093] |
|[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,0.5798319327731093] |
|[1.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,1.0,0.5798319327731093] |
+-----------------------------------------------------------------+



In [143]:
result.select("probability").show(truncate=False)

+----------------------------------------+
|probability                             |
+----------------------------------------+
|[0.7961253732094232,0.2038746267905768] |
|[0.3449306169809247,0.6550693830190752] |
|[0.4009118950773944,0.5990881049226056] |
|[0.21360187041246081,0.7863981295875392]|
+----------------------------------------+

