# Seed Clustering

The examined group comprised kernels from three different varieties of wheat: Kama, Rosa and Canadian, 70 kernels each, randomly selected for the experiment. High-quality visualization of the internal kernel structure was obtained with a soft X-ray technique. The technique is non-destructive and considerably cheaper than more sophisticated imaging techniques such as scanning microscopy or laser technology. The images were recorded on 13 x 18 cm KODAK X-ray plates. The studies used combine-harvested wheat grain from experimental fields, explored at the Institute of Agrophysics of the Polish Academy of Sciences in Lublin.

**Attribute information**
1. area A
2. perimeter P
3. compactness C = 4*pi*A/P^2
4. length of kernel
5. width of kernel
6. asymmetry coefficient
7. length of kernel groove
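
As a quick check of the compactness definition, the sketch below recomputes C from the area and perimeter of the first row printed by `data.show(5)` further down (area 15.26, perimeter 14.84). The function name `compactness` is an illustrative assumption and is not part of the notebook.

```python
import math

def compactness(area: float, perimeter: float) -> float:
    """Compactness C = 4*pi*A / P^2; a perfect circle gives 1.0."""
    return 4 * math.pi * area / perimeter ** 2

# First row of the dataset: area = 15.26, perimeter = 14.84
c = compactness(15.26, 14.84)
print(round(c, 3))  # agrees with the 0.871 stored in the 'compactness' column
```

Because compactness is fully determined by area and perimeter, the third attribute adds no independent measurement; it is a derived shape descriptor.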

**Hypothesis: KMeans should group the data into 3 clusters**

In [1]:
import warnings
warnings.filterwarnings("ignore")

## Creating the Spark Session

In [2]:
import os, subprocess

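# Path to a local Java 8 installation (machine-specific)
java8_home = "/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home"

# Make the kernel, and any JVM it launches, use that Java installation
os.environ["JAVA_HOME"] = java8_home
os.environ["PATH"] = os.path.join(java8_home, "bin") + os.pathsep + os.environ.get("PATH","")

# Use the current OS user as the Hadoop user, falling back to "tomas"
os.environ["HADOOP_USER_NAME"] = os.environ.get("USER", "tomas")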

print("JAVA_HOME set to:", os.environ["JAVA_HOME"])
try:
    print("which java (kernel):", subprocess.check_output(["which","java"]).decode().strip())
    print("java -version (kernel):")
    # java -version writes to stderr, so merge stderr into the captured output
    print(subprocess.check_output(["java","-version"], stderr=subprocess.STDOUT).decode())
except Exception as e:
    print("Error calling java from the kernel:", e)

JAVA_HOME set to: /Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home
which java (kernel): /Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/bin/java
java -version (kernel):
openjdk version "1.8.0_292"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_292-b10)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.292-b10, mixed mode)



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('cluster').getOrCreate()

25/09/13 21:54:29 WARN Utils: Your hostname, MacBook-Air-de-Tomas-3.local resolves to a loopback address: 127.0.0.1; using 192.168.1.4 instead (on interface en0)
25/09/13 21:54:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/09/13 21:54:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/13 21:54:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/09/13 21:54:32 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/09/13 21:54:32 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


## Importing the Data

In [4]:
data = spark.read.csv("../PySparkCourse/MLData/seeds_dataset.csv",header=True,inferSchema=True)

In [10]:
data.show(5)

+-----+---------+-----------+------------------+------------------+---------------------+----------------+
| area|perimeter|compactness|  length_of_kernel|   width_of_kernel|asymmetry_coefficient|length_of_groove|
+-----+---------+-----------+------------------+------------------+---------------------+----------------+
|15.26|    14.84|      0.871|             5.763|             3.312|                2.221|            5.22|
|14.88|    14.57|     0.8811| 5.553999999999999|             3.333|                1.018|           4.956|
|14.29|    14.09|      0.905|             5.291|3.3369999999999997|                2.699|           4.825|
|13.84|    13.94|     0.8955|             5.324|3.3789999999999996|                2.259|           4.805|
|16.14|    14.99|     0.9034|5.6579999999999995|             3.562|                1.355|           5.175|
+-----+---------+-----------+------------------+------------------+---------------------+----------------+
only showing top 5 rows



In [7]:
data.describe().show()

+-------+------------------+------------------+--------------------+-------------------+------------------+---------------------+-------------------+
|summary|              area|         perimeter|         compactness|   length_of_kernel|   width_of_kernel|asymmetry_coefficient|   length_of_groove|
+-------+------------------+------------------+--------------------+-------------------+------------------+---------------------+-------------------+
|  count|               210|               210|                 210|                210|               210|                  210|                210|
|   mean|14.847523809523816|14.559285714285718|  0.8709985714285714|  5.628533333333335| 3.258604761904762|   3.7001999999999997|  5.408071428571429|
| stddev|2.9096994306873647|1.3059587265640225|0.023629416583846364|0.44306347772644983|0.3777144449065867|   1.5035589702547392|0.49148049910240543|
|    min|             10.59|             12.41|              0.8081|              4.899|            

## Data Transformation

- **VectorAssembler:** Combines the input columns into a single `features` vector column, the format the model expects

In [11]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [12]:
data.columns

['area',
 'perimeter',
 'compactness',
 'length_of_kernel',
 'width_of_kernel',
 'asymmetry_coefficient',
 'length_of_groove']

In [13]:
vec_assembler = VectorAssembler(inputCols = data.columns, outputCol='features')
final_data = vec_assembler.transform(data)

## Scaling the Data

- **StandardScaler:** Brings each feature to a comparable scale by dividing it by its standard deviation (and optionally centering it on the mean)
- **MinMaxScaler:** Rescales each feature linearly to the range [0, 1]; this is the scaler applied below
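
To make the difference between the two scalers concrete, here is a minimal pure-Python sketch applied to the five `area` values printed by `data.show(5)` above. The helper names `min_max_scale` and `standard_scale` are assumptions for illustration; they mimic the idea behind the Spark transformers, not their implementation, and scaling five rows will not reproduce the values obtained from all 210 rows.

```python
import statistics

def min_max_scale(values, lo=0.0, hi=1.0):
    """Linearly rescale values so the smallest maps to lo and the largest to hi."""
    v_min, v_max = min(values), max(values)
    if v_max == v_min:
        # Constant feature: no spread to rescale, so use the midpoint of the target range
        return [0.5 * (lo + hi) for _ in values]
    return [(v - v_min) / (v_max - v_min) * (hi - lo) + lo for v in values]

def standard_scale(values):
    """Divide by the sample standard deviation, without centering on the mean."""
    std = statistics.stdev(values)
    return [v / std for v in values]

# 'area' values of the five rows printed by data.show(5)
area = [15.26, 14.88, 14.29, 13.84, 16.14]
area_minmax = min_max_scale(area)
area_std = standard_scale(area)

print([round(v, 3) for v in area_minmax])    # smallest value -> 0.0, largest -> 1.0
print(round(statistics.stdev(area_std), 3))  # spread after scaling is one standard deviation
```

Either choice keeps a wide-range feature such as area from dominating the Euclidean distances that KMeans relies on; min-max scaling additionally bounds every feature to the same interval.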

In [14]:
from pyspark.ml.feature import MinMaxScaler, StandardScaler

In [17]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures", min=0, max=1)
scalerModel = scaler.fit(final_data)
final_data = scalerModel.transform(final_data)

## Model

In [18]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import col

### Choosing K with the Silhouette Score

In [19]:
silhouette_scores = []
ks = list(range(2, 11))

for k in ks:
    kmeans = KMeans(featuresCol="scaledFeatures", k=k, seed=42, initMode="k-means||")
    model_k = kmeans.fit(final_data)
    preds = model_k.transform(final_data)  # adds a "prediction" column
    evaluator = ClusteringEvaluator(featuresCol="scaledFeatures", predictionCol="prediction",
                                     metricName="silhouette", distanceMeasure="squaredEuclidean")
    sil = evaluator.evaluate(preds)
    silhouette_scores.append((k, sil))
    print(f"k={k}, silhouette={sil:.4f}")

25/09/13 22:02:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
25/09/13 22:02:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

k=2, silhouette=0.7095
k=3, silhouette=0.6209
k=4, silhouette=0.5321
k=5, silhouette=0.4433
k=6, silhouette=0.4390
k=7, silhouette=0.4012
k=8, silhouette=0.4398
k=9, silhouette=0.4199
k=10, silhouette=0.4127


In [21]:
best_k, best_sil = max(silhouette_scores, key=lambda x: x[1])
print(f"Best k: {best_k} with silhouette score: {best_sil:.4f}")

Best k: 2 with silhouette score: 0.7095
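
For intuition about what the score measures, the following is a small pure-Python sketch of the textbook silhouette coefficient, using squared Euclidean distance as the dissimilarity (the measure the evaluator above is configured with). The function names, the toy points, and the zero score for singleton clusters are assumptions made for illustration; Spark's distributed implementation may handle edge cases differently.

```python
def sq_dist(a, b):
    """Squared Euclidean distance between two points."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

def silhouette(points, labels):
    """Mean silhouette coefficient; needs at least two distinct labels."""
    clusters = {}
    for p, label in zip(points, labels):
        clusters.setdefault(label, []).append(p)

    scores = []
    for p, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)  # assumed convention for singleton clusters
            continue
        # a: mean dissimilarity to the other members of the point's own cluster
        # (the point itself contributes 0 to the sum, hence len(own) - 1)
        a = sum(sq_dist(p, q) for q in own) / (len(own) - 1)
        # b: smallest mean dissimilarity to the members of any other cluster
        b = min(sum(sq_dist(p, q) for q in other) / len(other)
                for key, other in clusters.items() if key != label)
        denom = max(a, b)
        scores.append((b - a) / denom if denom > 0 else 0.0)
    return sum(scores) / len(scores)

# Toy data (hypothetical): two tight groups far apart
points = [(0.0, 0.0), (0.0, 1.0), (10.0, 0.0), (10.0, 1.0)]
good = silhouette(points, [0, 0, 1, 1])  # labels follow the two groups
bad = silhouette(points, [0, 1, 0, 1])   # labels cut across the groups
print(good > bad)
```

Scores near 1 indicate points that sit much closer to their own cluster than to the nearest other cluster, while negative scores indicate points that would fit better elsewhere.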


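### Training the Model with the Best Silhouette Score

The silhouette score is highest at k=2 rather than at the hypothesized k=3, so the final model is trained with `best_k`.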

In [22]:
kmeans_final = KMeans(featuresCol="scaledFeatures", k=best_k, seed=42, initMode="k-means||")
kmodel = kmeans_final.fit(final_data)

### Centroids and Cluster Distribution

In [23]:
centers = kmodel.clusterCenters()
print("Centers (scaled):")
for i, c in enumerate(centers):
    print(i, c)

Centers (scaled):
0 [0.71752149 0.75619835 0.68529707 0.69545697 0.73595116 0.35770963
 0.72098914]
1 [0.22309839 0.26703775 0.50581033 0.24931088 0.28475229 0.39521383
 0.27710781]
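
The centers above are expressed in the [0, 1] space produced by the MinMaxScaler, which makes them hard to read as physical measurements. A minimal sketch of inverting the min-max transform follows; the helper `unscale_center` and the minimum and maximum values in the example are hypothetical. In the notebook, the real per-feature bounds would presumably come from `scalerModel.originalMin` and `scalerModel.originalMax`.

```python
def unscale_center(center, mins, maxs):
    """Invert min-max scaling to [0, 1]: x = scaled * (max - min) + min."""
    return [s * (hi - lo) + lo for s, lo, hi in zip(center, mins, maxs)]

# Hypothetical two-feature example; these bounds are NOT taken from the dataset
mins = [10.0, 12.0]
maxs = [20.0, 17.0]
scaled_center = [0.5, 0.2]

original_center = unscale_center(scaled_center, mins, maxs)
print(original_center)
```

Reporting centroids in original units makes it easier to compare each cluster against the summary statistics from `data.describe()`.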


In [24]:
clustered = kmodel.transform(final_data).withColumnRenamed("prediction", "cluster")
clustered.groupBy("cluster").count().orderBy("cluster").show()



+-------+-----+
|cluster|count|
+-------+-----+
|      0|   76|
|      1|  134|
+-------+-----+


