In [1]:
import seaborn as sns
import pandas as pd
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session used throughout the notebook.
spark = (
    SparkSession.builder
    .appName('clasificacion_penguins')
    .getOrCreate()
)

# Load the seaborn "penguins" sample dataset and hand it to Spark.
penguins_raw = sns.load_dataset('penguins')
df = spark.createDataFrame(penguins_raw)
df.show(5)

+-------+---------+--------------+-------------+-----------------+-----------+------+
|species|   island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|   sex|
+-------+---------+--------------+-------------+-----------------+-----------+------+
| Adelie|Torgersen|          39.1|         18.7|            181.0|     3750.0|  Male|
| Adelie|Torgersen|          39.5|         17.4|            186.0|     3800.0|Female|
| Adelie|Torgersen|          40.3|         18.0|            195.0|     3250.0|Female|
| Adelie|Torgersen|           NaN|          NaN|              NaN|        NaN|   NaN|
| Adelie|Torgersen|          36.7|         19.3|            193.0|     3450.0|Female|
+-------+---------+--------------+-------------+-----------------+-----------+------+
only showing top 5 rows



In [2]:
# `species` is the prediction target, so rows without a usable label are dropped.
# dropna() only removes real nulls; in this notebook missing string values show up
# as the literal text 'NaN' (see the `sex` column in the preview above), so that
# placeholder is filtered out explicitly as well.
df = df.dropna(subset=['species'])
df = df.filter(df['species'] != 'NaN')
# With a pandas DataFrame a categorical fill would look like:
# df['island'] = df['island'].fillna('other')

In [3]:
# 80/20 train/test split with a fixed seed for the random partition.
split_weights = [0.8, 0.2]
df_train, df_test = df.randomSplit(split_weights, seed=42)

## Imputer

In [4]:
from pyspark.sql.types import NumericType, StringType

# Split the schema into numeric and categorical feature columns.
# `species` is skipped on purpose: it is the target and is handled separately.
numeric_cols = []
categorical_cols = []
for field in df.schema.fields:
    if isinstance(field.dataType, NumericType):
        numeric_cols.append(field.name)
    elif isinstance(field.dataType, StringType) and field.name != 'species':
        categorical_cols.append(field.name)

print(numeric_cols)
print(categorical_cols)

['bill_length_mm', 'bill_depth_mm', 'flipper_length_mm', 'body_mass_g']
['island', 'sex']


In [5]:
from pyspark.ml.feature import Imputer

# Median imputation for every numeric column; results go to new `<name>_imputed` columns.
imputed_names = [f'{name}_imputed' for name in numeric_cols]
imputer = Imputer(
    inputCols=numeric_cols,
    outputCols=imputed_names,
    strategy='median',
)

# Fit on the training split only to avoid data leakage, then apply to both splits.
imputer_model = imputer.fit(df_train)
df_train, df_test = (
    imputer_model.transform(df_train),
    imputer_model.transform(df_test),
)
df_train.show(3)

+-------+------+--------------+-------------+-----------------+-----------+------+----------------------+---------------------+-------------------------+-------------------+
|species|island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|   sex|bill_length_mm_imputed|bill_depth_mm_imputed|flipper_length_mm_imputed|body_mass_g_imputed|
+-------+------+--------------+-------------+-----------------+-----------+------+----------------------+---------------------+-------------------------+-------------------+
| Adelie|Biscoe|          35.3|         18.9|            187.0|     3800.0|Female|                  35.3|                 18.9|                    187.0|             3800.0|
| Adelie|Biscoe|          35.9|         19.2|            189.0|     3800.0|Female|                  35.9|                 19.2|                    189.0|             3800.0|
| Adelie|Biscoe|          37.8|         18.3|            174.0|     3400.0|Female|                  37.8|                 18.3|   

In [6]:
# Inspect the training schema to confirm the `*_imputed` columns were added next to the originals.
df_train.printSchema()

root
 |-- species: string (nullable = true)
 |-- island: string (nullable = true)
 |-- bill_length_mm: double (nullable = true)
 |-- bill_depth_mm: double (nullable = true)
 |-- flipper_length_mm: double (nullable = true)
 |-- body_mass_g: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- bill_length_mm_imputed: double (nullable = true)
 |-- bill_depth_mm_imputed: double (nullable = true)
 |-- flipper_length_mm_imputed: double (nullable = true)
 |-- body_mass_g_imputed: double (nullable = true)



In [7]:
# Option 1: fill with a fixed value.
# The missing categorical values are not real nulls: they arrived as the literal text 'NaN'
# (the inferred schema does report the string columns as nullable, but they contain no nulls).
# na.fill / fillna therefore have nothing to fill, so replace() is used instead:

# df_train = df_train.na.fill('other', subset=categorical_cols)
# df_test = df_test.na.fill('other', subset=categorical_cols)

# df_train = df_train.fillna('other', subset=categorical_cols)
# df_test = df_test.fillna('other', subset=categorical_cols)

# df_train = df_train.replace('NaN', 'other', subset=categorical_cols)
# df_test = df_test.replace('NaN', 'other', subset=categorical_cols)

# df_train.show(5)

In [8]:
# Option 2: use Imputer.
# Not possible directly: Imputer raises IllegalArgumentException because it requires numeric columns.
# It could be done after a StringIndexer has converted the string categories to numeric indices.
# imputer = Imputer(
#     strategy='mode',
#     inputCols=categorical_cols,
#     outputCols=[col + '_imputed' for col in categorical_cols]
# )
# imputer_model = imputer.fit(df_train) # fit solo sobre train para evitar fuga de datos data leakage
# df_train = imputer_model.transform(df_train)
# df_test = imputer_model.transform(df_test)

In [9]:
# Value counts per island, to see which values are most frequent.
from pyspark.sql.functions import col

island_counts = df.groupBy('island').count()
island_counts.orderBy(col('count').desc()).show()

+---------+-----+
|   island|count|
+---------+-----+
|   Biscoe|  168|
|    Dream|  124|
|Torgersen|   52|
+---------+-----+



In [10]:
# Value counts for `sex`, most frequent first (the 'NaN' text placeholder shows up as its own group).
sex_counts = df.groupBy('sex').count()
sex_counts.orderBy(col('count').desc()).show()

+------+-----+
|   sex|count|
+------+-----+
|  Male|  168|
|Female|  165|
|   NaN|   11|
+------+-----+



In [11]:
# Option 3: fill with the mode.
def column_mode(frame, column, missing_token='NaN'):
    """Return the most frequent value of `column` in `frame`.

    Rows equal to `missing_token` (and nulls, which never satisfy the comparison)
    are ignored so the placeholder cannot be picked as its own replacement.
    Ties on the count are ordered by the value itself so the choice is
    deterministic. Returns None when no usable rows remain.
    """
    ranked = (
        frame.filter(col(column) != missing_token)
        .groupBy(column)
        .count()
        .orderBy(col('count').desc(), col(column).asc())
    )
    top_row = ranked.first()
    return None if top_row is None else top_row[column]


# The modes are learned from the training split only, for the same reason the
# numeric Imputer is fitted only on train: statistics taken from the full
# dataset would leak information from the test rows.
island_mode = column_mode(df_train, 'island')
sex_mode = column_mode(df_train, 'sex')

for column_name, mode_value in (('island', island_mode), ('sex', sex_mode)):
    if mode_value is None:
        # Nothing to fill with (the column had no usable values in train).
        continue
    df_train = df_train.replace('NaN', mode_value, subset=[column_name])
    df_test = df_test.replace('NaN', mode_value, subset=[column_name])

df_train.show(5)

+-------+------+--------------+-------------+-----------------+-----------+------+----------------------+---------------------+-------------------------+-------------------+
|species|island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|   sex|bill_length_mm_imputed|bill_depth_mm_imputed|flipper_length_mm_imputed|body_mass_g_imputed|
+-------+------+--------------+-------------+-----------------+-----------+------+----------------------+---------------------+-------------------------+-------------------+
| Adelie|Biscoe|          35.3|         18.9|            187.0|     3800.0|Female|                  35.3|                 18.9|                    187.0|             3800.0|
| Adelie|Biscoe|          35.9|         19.2|            189.0|     3800.0|Female|                  35.9|                 19.2|                    189.0|             3800.0|
| Adelie|Biscoe|          37.8|         18.3|            174.0|     3400.0|Female|                  37.8|                 18.3|   

## StringIndexer + OneHotEncoder

## Scaler

In [12]:
from pyspark.ml.feature import StringIndexer
# Encode the target `species` as a numeric `label`; the mapping is learned on train only.
indexed_label = StringIndexer().setInputCol('species').setOutputCol('label')
indexed_model = indexed_label.fit(df_train)
df_train, df_test = indexed_model.transform(df_train), indexed_model.transform(df_test)

# Index each categorical feature into its own `<name>_indexed` column.
for categorical_col in categorical_cols:
    indexer = StringIndexer(
        inputCol=categorical_col,
        outputCol=f'{categorical_col}_indexed',
    )
    model = indexer.fit(df_train)
    df_train, df_test = model.transform(df_train), model.transform(df_test)

df_train.show(3)

+-------+------+--------------+-------------+-----------------+-----------+------+----------------------+---------------------+-------------------------+-------------------+-----+--------------+-----------+
|species|island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|   sex|bill_length_mm_imputed|bill_depth_mm_imputed|flipper_length_mm_imputed|body_mass_g_imputed|label|island_indexed|sex_indexed|
+-------+------+--------------+-------------+-----------------+-----------+------+----------------------+---------------------+-------------------------+-------------------+-----+--------------+-----------+
| Adelie|Biscoe|          35.3|         18.9|            187.0|     3800.0|Female|                  35.3|                 18.9|                    187.0|             3800.0|  0.0|           0.0|        1.0|
| Adelie|Biscoe|          35.9|         19.2|            189.0|     3800.0|Female|                  35.9|                 19.2|                    189.0|             3800.0

In [13]:
from pyspark.ml.feature import OneHotEncoder

# One-hot encode the indexed categorical columns (`<name>_indexed` -> `<name>_onehot`).
indexed_inputs = [f'{name}_indexed' for name in categorical_cols]
onehot_outputs = [f'{name}_onehot' for name in categorical_cols]
encoder = OneHotEncoder(inputCols=indexed_inputs, outputCols=onehot_outputs)

# Fit on train, then apply the same encoding to both splits.
model = encoder.fit(df_train)
df_train, df_test = model.transform(df_train), model.transform(df_test)

df_train.show(5)

+-------+------+--------------+-------------+-----------------+-----------+------+----------------------+---------------------+-------------------------+-------------------+-----+--------------+-----------+-------------+-------------+
|species|island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|   sex|bill_length_mm_imputed|bill_depth_mm_imputed|flipper_length_mm_imputed|body_mass_g_imputed|label|island_indexed|sex_indexed|island_onehot|   sex_onehot|
+-------+------+--------------+-------------+-----------------+-----------+------+----------------------+---------------------+-------------------------+-------------------+-----+--------------+-----------+-------------+-------------+
| Adelie|Biscoe|          35.3|         18.9|            187.0|     3800.0|Female|                  35.3|                 18.9|                    187.0|             3800.0|  0.0|           0.0|        1.0|(2,[0],[1.0])|    (1,[],[])|
| Adelie|Biscoe|          35.9|         19.2|            189

In [14]:
# Names of the columns that will feed the feature vector:
# one-hot encoded categoricals first, then the imputed numerics.
onehot = [f'{name}_onehot' for name in categorical_cols]
imputed = [f'{name}_imputed' for name in numeric_cols]
onehot + imputed

['island_onehot',
 'sex_onehot',
 'bill_length_mm_imputed',
 'bill_depth_mm_imputed',
 'flipper_length_mm_imputed',
 'body_mass_g_imputed']

In [15]:
from pyspark.ml.feature import VectorAssembler
# Combine the one-hot and imputed columns into a single `features` vector column.
feature_inputs = onehot + imputed
assembler = VectorAssembler(inputCols=feature_inputs, outputCol='features')
df_train, df_test = assembler.transform(df_train), assembler.transform(df_test)
df_train.show(2)

+-------+------+--------------+-------------+-----------------+-----------+------+----------------------+---------------------+-------------------------+-------------------+-----+--------------+-----------+-------------+----------+--------------------+
|species|island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|   sex|bill_length_mm_imputed|bill_depth_mm_imputed|flipper_length_mm_imputed|body_mass_g_imputed|label|island_indexed|sex_indexed|island_onehot|sex_onehot|            features|
+-------+------+--------------+-------------+-----------------+-----------+------+----------------------+---------------------+-------------------------+-------------------+-----+--------------+-----------+-------------+----------+--------------------+
| Adelie|Biscoe|          35.3|         18.9|            187.0|     3800.0|Female|                  35.3|                 18.9|                    187.0|             3800.0|  0.0|           0.0|        1.0|(2,[0],[1.0])| (1,[],[])|[1.0,0.0,0

In [16]:
from pyspark.ml.feature import StandardScaler

# Standardize the assembled feature vector, centering with the mean (withMean=True).
# NOTE: the output column name is misspelled ('festures'); it is kept exactly as-is
# because the scaled logistic-regression cell refers to the column by this name.
scaler = StandardScaler(
    withMean=True,
    inputCol='features',
    outputCol='scaled_festures',
)

# Scaling statistics come from the training split and are applied to both splits.
model = scaler.fit(df_train)
df_train, df_test = model.transform(df_train), model.transform(df_test)

df_train.show(5)

+-------+------+--------------+-------------+-----------------+-----------+------+----------------------+---------------------+-------------------------+-------------------+-----+--------------+-----------+-------------+-------------+--------------------+--------------------+
|species|island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|   sex|bill_length_mm_imputed|bill_depth_mm_imputed|flipper_length_mm_imputed|body_mass_g_imputed|label|island_indexed|sex_indexed|island_onehot|   sex_onehot|            features|     scaled_festures|
+-------+------+--------------+-------------+-----------------+-----------+------+----------------------+---------------------+-------------------------+-------------------+-----+--------------+-----------+-------------+-------------+--------------------+--------------------+
| Adelie|Biscoe|          35.3|         18.9|            187.0|     3800.0|Female|                  35.3|                 18.9|                    187.0|             380

In [17]:
from pyspark.ml.classification import (
    LogisticRegression, 
    DecisionTreeClassifier, 
    RandomForestClassifier, 
    GBTClassifier, 
    MultilayerPerceptronClassifier
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [18]:
# Baseline: logistic regression constructed with no arguments, trained on train, scored on test.
lr = LogisticRegression()
model = lr.fit(df_train)
df_pred = model.transform(df_test)

# One evaluator per metric; they are reused by the evaluation cells below.
(
    evaluator_accuracy,
    evaluator_f1,
    evaluator_precision,
    evaluator_recall,
) = (
    MulticlassClassificationEvaluator(metricName=metric)
    for metric in ('accuracy', 'f1', 'weightedPrecision', 'weightedRecall')
)

In [20]:
# Report every metric for the current predictions, one per line.
metric_evaluators = (
    ('accuracy', evaluator_accuracy),
    ('f1', evaluator_f1),
    ('precision', evaluator_precision),
    ('recall', evaluator_recall),
)
for metric_name, evaluator in metric_evaluators:
    print(metric_name, evaluator.evaluate(df_pred))

accuracy 1.0
f1 1.0
precision 1.0
recall 1.0


In [21]:
# Same model, but trained on the standardized feature column
# (the column name is spelled 'scaled_festures' where the scaler creates it).
lr = LogisticRegression(featuresCol='scaled_festures')
model = lr.fit(df_train)
df_pred = model.transform(df_test)

# Report the same four metrics as for the baseline model.
scaled_metric_evaluators = {
    'accuracy': evaluator_accuracy,
    'f1': evaluator_f1,
    'precision': evaluator_precision,
    'recall': evaluator_recall,
}
for metric_name, evaluator in scaled_metric_evaluators.items():
    print(metric_name, evaluator.evaluate(df_pred))

accuracy 1.0
f1 1.0
precision 1.0
recall 1.0


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 59156)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "/usr/local/spark/python/pyspark/accumulators.py", line 271, in accum_updates
    num_updates =