# ANÁLISIS DATAFRAME 'DIAMONDS' CON PYSPARK.

## OBJETIVO DEL ESTUDIO.

<p>El objetivo del estudio es crear un modelo que sea capaz de predecir el precio de un diamante.</p>

## 1.-CARGA DE LIBRERÍAS.

In [10]:
import seaborn as sns
import pandas as pd
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, StringType, NumericType, IntegerType
from pyspark.sql.functions import col, sum
from pyspark.ml.feature import StringIndexer, Imputer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

## 2.-CREACIÓN SESIÓN PYSPARK.

In [11]:
spark = SparkSession.builder.appName("pipeline_diamonds").getOrCreate()

## 3.-CARGA DEL DATAFRAME.

In [12]:
url = 'https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/diamonds.csv' # Se define la URL del archivo csv.
csv_path = 'diamonds.csv'           # Se define el nombre del archivo csv.

with open(csv_path, 'wb') as file:                  # Se descarga el archivo csv.
    file.write(requests.get(url).content)
    
schema = StructType([                               # Se define el Schema del DataFrame.
    StructField('caract', FloatType(), True),
    StructField('cut', StringType(), True),
    StructField('color', StringType(), True),
    StructField('clarity', StringType(), True),
    StructField('depth', FloatType(), True),
    StructField('table', FloatType(), True),
    StructField('price', FloatType(), True),
    StructField('x', FloatType(), True),
    StructField('y', FloatType(), True),
    StructField('z', FloatType(), True)
])

df = spark.read.csv(csv_path, header=True, inferSchema=False, schema=schema)    # Se lee el archivo csv con el Schema definido.
df.show(5)          # Se muestran los primeros 5 registros del DataFrame.
df.printSchema()    # Se muestra el Schema del DataFrame.

+------+-------+-----+-------+-----+-----+-----+----+----+----+
|caract|    cut|color|clarity|depth|table|price|   x|   y|   z|
+------+-------+-----+-------+-----+-----+-----+----+----+----+
|  0.23|  Ideal|    E|    SI2| 61.5| 55.0|326.0|3.95|3.98|2.43|
|  0.21|Premium|    E|    SI1| 59.8| 61.0|326.0|3.89|3.84|2.31|
|  0.23|   Good|    E|    VS1| 56.9| 65.0|327.0|4.05|4.07|2.31|
|  0.29|Premium|    I|    VS2| 62.4| 58.0|334.0| 4.2|4.23|2.63|
|  0.31|   Good|    J|    SI2| 63.3| 58.0|335.0|4.34|4.35|2.75|
+------+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows

root
 |-- caract: float (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: float (nullable = true)
 |-- table: float (nullable = true)
 |-- price: float (nullable = true)
 |-- x: float (nullable = true)
 |-- y: float (nullable = true)
 |-- z: float (nullable = true)



## 4.-PRE-PROCESADOS.

### 4.1.-Preparación previa del DataFRame.

#### 4.1.1.-Eliminación de valores 'NaN' en la columna objetivo.

In [13]:
# La variable objetivo es 'precio'. Por tanto, se eliminan las filas con valores nulos en esta variable.
df = df.dropna(subset=['price'])

#### 4.1.2.-Contabilizar valores 'NaN' en las columnas del DataFrame.

In [14]:
# Se cuentan los valores nulos en cada columna.
df.select([sum(col(c).isNull().cast('int')).alias(c) for c in df.columns]).show()

+------+---+-----+-------+-----+-----+-----+---+---+---+
|caract|cut|color|clarity|depth|table|price|  x|  y|  z|
+------+---+-----+-------+-----+-----+-----+---+---+---+
|     0|  0|    0|      0|    0|    0|    0|  0|  0|  0|
+------+---+-----+-------+-----+-----+-----+---+---+---+



## 5.-REGRESIÓN DE LA COLUMNA NUMÉRICA 'price'.

### 5.1.-Separación de las columnas numéricas, no-numéricas y variable objetivo.

In [15]:
# Se seleccionan los nombres de las columnas a las que aplicar Preprocesados.
num_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]
cat_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType) and field.name != 'price']
label_col = 'price'

### 5.2.-Indexados.

#### 5.2.1.-Indexado de la columna objetivo: 'price'.

In [None]:
# Indexer para la columna a predecir 'price'.
indexer_label = StringIndexer(
    inputCol=label_col,
    outputCol='label',
    handleInvalid='keep'
)

### 5.2.2.-Indexado de la columnas categóricas.

In [None]:
# Indexers para las features de la entrada que no son la columna label a predecir.
# Se crea un objeto StringIndexer por cada columna categórica a indexar.
indexers_features = [
    StringIndexer(inputCol=c, outputCol=c + '_indexed', handleInvalid='keep') for c in cat_cols
]
cat_cols_indexed = [c + '_indexed' for c in cat_cols]
print(cat_cols_indexed)

['cut_indexed', 'color_indexed', 'clarity_indexed']


### 5.3.-Procesados.

#### 5.3.1.-Columnas categóricas.