In [3]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=0586a6c2bbe2a6be1599bbf604426b6e7ec09297bd18ffc09799d5a67d3c8763
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [21]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col

In [10]:
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import IntegerType
# Importar librerias
from pyspark.sql.functions import udf
# Crear una configuración de Spark
conf = SparkConf().setAppName("MyApp").setMaster("local[*]")

sc = SparkContext(conf=conf)

from pyspark.sql import SparkSession
spark = SparkSession(sc)



In [11]:
df = spark.read.csv("heart.dat", header=False, sep=' ', inferSchema=True)

# set column names
schema = ['Años','Sexo','Tipo_dolor_toracico','Presion_arterial_reposo','Colesterol_serico','Azucar_sangre_ayunas',
            'Resultados_electrocardiográficos_reposo', 'Frecuencia_cardíaca_máxima', 'Angina_inducida_ejercicio',
            'Oldpeak', 'Pendiente_segmento ST','Número_vasos_principales','Thal','Target']
for i in range(len(schema)):
    col = '_c'+str(i) # nombre de la columna
    #df = df.withColumn(col, df[col].cast(FloatType())) # convertir a FloatType
    df = df.withColumnRenamed(col, schema[i]) # renombrar columnas

# drop column Target
df = df.drop('Target')



# Ver el esquema de los datos
df.printSchema()

# Ver los datos
df.show()

root
 |-- Años: double (nullable = true)
 |-- Sexo: double (nullable = true)
 |-- Tipo_dolor_toracico: double (nullable = true)
 |-- Presion_arterial_reposo: double (nullable = true)
 |-- Colesterol_serico: double (nullable = true)
 |-- Azucar_sangre_ayunas: double (nullable = true)
 |-- Resultados_electrocardiográficos_reposo: double (nullable = true)
 |-- Frecuencia_cardíaca_máxima: double (nullable = true)
 |-- Angina_inducida_ejercicio: double (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- Pendiente_segmento ST: double (nullable = true)
 |-- Número_vasos_principales: double (nullable = true)
 |-- Thal: double (nullable = true)

+----+----+-------------------+-----------------------+-----------------+--------------------+---------------------------------------+--------------------------+-------------------------+-------+---------------------+------------------------+----+
|Años|Sexo|Tipo_dolor_toracico|Presion_arterial_reposo|Colesterol_serico|Azucar_sangre_ayunas|Res

In [12]:


# Crear la funcion
def ENFERMO(x):
    if x in(3, 6): # 3 = normal; 6 = problema solucionado
        return 0
    else:
        return 1

# Crear la funcion UDf
udfENFERMO = udf(ENFERMO, IntegerType())

# Aplicar la funcion UDF a la columna target usando el valor de that
df = df.withColumn("Target", udfENFERMO("Thal") )

# Ver los datos
df.show(5)


+----+----+-------------------+-----------------------+-----------------+--------------------+---------------------------------------+--------------------------+-------------------------+-------+---------------------+------------------------+----+------+
|Años|Sexo|Tipo_dolor_toracico|Presion_arterial_reposo|Colesterol_serico|Azucar_sangre_ayunas|Resultados_electrocardiográficos_reposo|Frecuencia_cardíaca_máxima|Angina_inducida_ejercicio|Oldpeak|Pendiente_segmento ST|Número_vasos_principales|Thal|Target|
+----+----+-------------------+-----------------------+-----------------+--------------------+---------------------------------------+--------------------------+-------------------------+-------+---------------------+------------------------+----+------+
|70.0| 1.0|                4.0|                  130.0|            322.0|                 0.0|                                    2.0|                     109.0|                      0.0|    2.4|                  2.0|                  

In [16]:
# set columns except the col Target
columns = df.columns
columns.remove('Target')


# Crear el vector assembler
assembler = VectorAssembler(inputCols=columns, outputCol='features')

# Transformar los datos
raw_data = assembler.transform(df).select("features", "Target")

# Ver los datos
raw_data.show(5)


+--------------------+------+
|            features|Target|
+--------------------+------+
|[70.0,1.0,4.0,130...|     0|
|[67.0,0.0,3.0,115...|     1|
|[57.0,1.0,2.0,124...|     1|
|[64.0,1.0,4.0,128...|     1|
|[74.0,0.0,2.0,120...|     0|
+--------------------+------+
only showing top 5 rows



In [17]:
# Crear el objeto StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=True)

# Calcular la media y la desviacion estandar
scalerModel = scaler.fit(raw_data)

# Normalizar los datos
scaledData = scalerModel.transform(raw_data)


In [18]:
scaledData.show(5)

+--------------------+------+--------------------+
|            features|Target|      scaledFeatures|
+--------------------+------+--------------------+
|[70.0,1.0,4.0,130...|     0|[1.70892007713705...|
|[67.0,0.0,3.0,115...|     1|[1.37957787811706...|
|[57.0,1.0,2.0,124...|     1|[0.28177054805043...|
|[64.0,1.0,4.0,128...|     1|[1.05023567909707...|
|[74.0,0.0,2.0,120...|     0|[2.14804300916370...|
+--------------------+------+--------------------+
only showing top 5 rows



In [19]:
train_data, test_data = scaledData.randomSplit([0.5, 0.5])

train_data.show(5)

+--------------------+------+--------------------+
|            features|Target|      scaledFeatures|
+--------------------+------+--------------------+
|(13,[0,2,3,4,7,10...|     0|[-1.9138441120828...|
|(13,[0,2,3,4,7,10...|     0|[-1.4747211800561...|
|(13,[0,2,3,4,7,10...|     0|[-1.3649404470495...|
|(13,[0,2,3,4,7,10...|     0|[-1.0355982480295...|
|(13,[0,2,3,4,7,10...|     0|[-0.9258175150228...|
+--------------------+------+--------------------+
only showing top 5 rows



In [22]:
model = LogisticRegression(labelCol="Target", featuresCol="scaledFeatures")

# Entrenar
trainedModel = model.fit(train_data)

# Imprima los coeficientes y el intercepto para el modelo de regresión logística
print("Coeficientes: " + str(trainedModel.coefficients))
print("Intercepto: " + str(trainedModel.intercept))


Coeficientes: [2.279236582018356,-7.321900982973219,-3.6188770786485365,-3.499863200458144,-4.382683554822123,2.401887609668977,9.395467841324818,9.069128223844562,4.2054576185988095,9.489100836342388,-5.521908425622844,7.589740492829675,41.936818804996584]
Intercepto: -10.2440096335291


In [23]:
def RESULT(x, y):
    if int(x) == int(y):
        return 1
    else:
        return 0

udfCHECK_RESULT = udf(RESULT, IntegerType())
predictions = trainedModel.transform(test_data)
predictions = predictions.withColumn("correct", udfCHECK_RESULT("Target", "prediction"))

# Ver los datos
predictions.show(5)

# conteo de datos
print(f"Aciertos: ",predictions.filter(predictions.correct == 1).count()," de ", predictions.count())

+--------------------+------+--------------------+--------------------+--------------------+----------+-------+
|            features|Target|      scaledFeatures|       rawPrediction|         probability|prediction|correct|
+--------------------+------+--------------------+--------------------+--------------------+----------+-------+
|(13,[0,2,3,4,7,10...|     0|[-1.6942826460694...|[38.8060576690031...|           [1.0,0.0]|       0.0|      1|
|(13,[0,2,3,4,7,10...|     0|[-0.5964753160028...|[59.3662470566400...|           [1.0,0.0]|       0.0|      1|
|(13,[0,2,3,4,7,10...|     0|[0.83067421308374...|[49.8501347095161...|           [1.0,0.0]|       0.0|      1|
|[35.0,0.0,4.0,138...|     0|[-2.1334055780961...|[38.1059730419107...|           [1.0,0.0]|       0.0|      1|
|[35.0,1.0,4.0,120...|     1|[-2.1334055780961...|[-16.190660493565...|[9.30005354236634...|       1.0|      1|
+--------------------+------+--------------------+--------------------+--------------------+----------+-

In [24]:
spark.stop()
