# **Open-ended analysis**

Carry out an analysis of your choice on a dataset of your choice. It can be a clustering analysis, one or more classification algorithms of any kind, association rules, etc.

The grade will depend on:

* Use of techniques not covered in class (examples provided)
* Explanation of the steps followed
* Interest of the problem/solution

# **Environment setup**

## Installing Spark

We install Apache Spark 3.2.4, pre-built for Hadoop 3.2.
We recommend visiting the Apache Spark site to download this version:

https://spark.apache.org/downloads.html

We then install Java 8 and set the `JAVA_HOME` and `SPARK_HOME` environment variables.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.4-bin-hadoop3.2"

Downloading Spark can take a while, depending on the available connection. Once extracted, the `.tgz` archive is deleted from the virtual machine.

In [None]:
!wget https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
!tar -xf spark-3.2.4-bin-hadoop3.2.tgz
!rm spark-3.2.4-bin-hadoop3.2.tgz
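
Once the archive is extracted, it can help to confirm that both environment variables point to real directories before starting Spark. The following is a minimal sketch; the helper name `missing_env_dirs` and the throwaway mapping used in the demonstration are illustrative assumptions, not part of the original notebook.

```python
import os
import tempfile


def missing_env_dirs(names, environ=os.environ):
    """Return the variables in `names` that are unset or do not point to a directory."""
    return [name for name in names if not os.path.isdir(environ.get(name, ""))]


# Demonstration with a fake environment mapping (hypothetical paths):
# JAVA_HOME points to an existing temporary directory, SPARK_HOME does not exist.
with tempfile.TemporaryDirectory() as tmp:
    fake_env = {"JAVA_HOME": tmp, "SPARK_HOME": os.path.join(tmp, "not-there")}
    demo_missing = missing_env_dirs(["JAVA_HOME", "SPARK_HOME"], fake_env)

print(demo_missing)
```

In the notebook itself, calling `missing_env_dirs(["JAVA_HOME", "SPARK_HOME"])` with the default environment should return an empty list if the installation and extraction steps succeeded.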

## Installing findspark and starting the Spark session

* We install the `findspark` library with `pip install`

In [None]:
!pip install findspark

* We initialize the session with `SparkSession`

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Spark_Dataframes")
    .getOrCreate()
)

spark

# **Reading the dataset**

In [None]:
!pip install ucimlrepo

In [None]:
from ucimlrepo import fetch_ucirepo

# fetch dataset
cdc_diabetes_health_indicators = fetch_ucirepo(id=891)

# data (as pandas dataframes)
X = cdc_diabetes_health_indicators.data.features
y = cdc_diabetes_health_indicators.data.targets
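
Before clustering, a quick look at the shape, missing values, and target balance of the fetched frames can be useful. The sketch below assumes `X` and `y` are pandas DataFrames, as the comment above states. To stay self-contained it runs on two tiny made-up frames rather than the real data; the helper name `summarize` and the column names in the toy frames are assumptions for illustration only.

```python
import pandas as pd


def summarize(features: pd.DataFrame, targets: pd.DataFrame) -> dict:
    """Return basic facts about a features/targets pair."""
    combined = pd.concat([features, targets], axis=1)
    return {
        "rows": len(combined),
        "columns": list(combined.columns),
        # Total number of missing cells across all columns
        "missing": int(combined.isna().sum().sum()),
        # Class balance of the first (here: only) target column
        "target_counts": targets.iloc[:, 0].value_counts().to_dict(),
    }


# Tiny made-up frames standing in for X and y (not real CDC records).
toy_X = pd.DataFrame({"BMI": [22.0, 31.5, None], "HighBP": [0, 1, 1]})
toy_y = pd.DataFrame({"Diabetes_binary": [0, 1, 0]})
summary = summarize(toy_X, toy_y)
print(summary)
```

In the notebook, `summarize(X, y)` would report the same fields for the fetched dataset.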

# **Clustering analysis**

In [None]:
#######################################
# (A) FETCH DATA & WRITE TO CSV (LOCAL)
#######################################
!pip install ucimlrepo pyspark cloudpickle seaborn scikit-learn --quiet

from ucimlrepo import fetch_ucirepo
import pandas as pd
import numpy as np
import os

# 1. Fetch data locally in pure Python/Pandas
cdc_data = fetch_ucirepo(id=891)
X = cdc_data.data.features
y = cdc_data.data.targets
data_df = pd.concat([X, y], axis=1)

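# 2. Convert to numeric in Pandas; unparseable values become NaN, which is
#    written as an empty CSV field and imputed with the column mean in Spark (B.3)
numeric_data_df = data_df.apply(pd.to_numeric, errors='coerce').astype(float)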

# 3. Save to CSV
csv_path = "/tmp/cdc_diabetes.csv"
numeric_data_df.to_csv(csv_path, index=False)

########################################
# (B) SPARK SESSION & KMEANS CLUSTERING
########################################
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("HealthClustering").getOrCreate()

# 1. Read the CSV into Spark DataFrame
df = spark.read.csv(csv_path, header=True, inferSchema=True)

# 2. Choose columns
all_features = [
    "BMI", "MentHlth", "PhysHlth", "HighBP", "HighChol",
    "Smoker", "PhysActivity", "Fruits", "Veggies", "HvyAlcoholConsump",
    "Age", "Education", "Income"
]
valid_features = [f for f in all_features if f in df.columns]
data = df.select(valid_features)

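# 3. Fill missing with mean (all column means computed in a single Spark job)
from pyspark.sql.functions import mean
mean_row = data.agg(*[mean(f).alias(f) for f in valid_features]).first()
means = {f: mean_row[f] for f in valid_features if mean_row[f] is not None}
if means:
    data = data.fillna(means)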

# 4. Assemble & scale
from pyspark.ml.feature import VectorAssembler, StandardScaler
assembler = VectorAssembler(inputCols=valid_features, outputCol="features")
data = assembler.transform(data)

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(data)
data = scaler_model.transform(data)

# 5. KMeans
from pyspark.ml.clustering import KMeans
kmeans = KMeans(featuresCol="scaledFeatures", k=4, seed=1)
model = kmeans.fit(data)
predictions = model.transform(data)

# 6. Silhouette score
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator(featuresCol="scaledFeatures", metricName="silhouette")
silhouette = evaluator.evaluate(predictions)
print("Silhouette Score:", silhouette)

# 7. Show cluster centers
centers = model.clusterCenters()
for i, c in enumerate(centers):
    print(f"Center {i}:", c)

# 8. Show cluster counts
predictions.groupBy("prediction").count().show()

##########################################
# (C) COLLECT TO PANDAS & STOP SPARK
##########################################
# 1. Convert Spark predictions to Pandas
pdf = predictions.select(*valid_features, "prediction").toPandas()

# 2. *Stop* Spark so we don't trigger pickling errors when plotting
spark.stop()

##########################################
# (D) VISUALIZATION IN PURE PYTHON
##########################################
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style("whitegrid")  # A nicer background

###############
# (D1) Bar Plot
###############
cluster_counts = pdf["prediction"].value_counts().sort_index()
df_counts = pd.DataFrame({"cluster": cluster_counts.index.astype(str),
                          "count": cluster_counts.values})

plt.figure(figsize=(6,4))
sns.barplot(data=df_counts, x="cluster", y="count", palette="Set2")
plt.xlabel("Cluster")
plt.ylabel("Count")
plt.title("Number of Points per Cluster")
plt.show()

################################
# (D2) Optional PCA Scatter Plot
################################
# Project the standardized features to 2D with PCA to visualize cluster separation.
# Spark has already been stopped, so instead of collecting "scaledFeatures" we
# re-scale the raw columns from pdf with scikit-learn. Note that scikit-learn's
# StandardScaler also centers the data, whereas Spark's defaults to withMean=False.
subpdf = pdf[valid_features].values  # raw features
from sklearn.preprocessing import StandardScaler as SkScaler
subpdf_scaled = SkScaler().fit_transform(subpdf)

from sklearn.decomposition import PCA
pca = PCA(n_components=2)
pca_data = pca.fit_transform(subpdf_scaled)

pdf["pca1"] = pca_data[:, 0]
pdf["pca2"] = pca_data[:, 1]

plt.figure(figsize=(7,5))
sns.scatterplot(
    data=pdf, x="pca1", y="pca2", hue="prediction",
    palette="Set2", alpha=0.6, edgecolor=None
)
plt.title("Clusters in 2D PCA space")
plt.legend(title="Cluster", loc="best")
plt.show()
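
To make the silhouette score printed in step (B.6) easier to interpret, here is a minimal pure-Python sketch of the metric using squared Euclidean distance, which is the default distance measure of Spark's `ClusteringEvaluator`. It is an illustration of the definition, not Spark's optimized implementation, and it may differ from it in edge cases. The function names `sq_dist` and `mean_silhouette` and the toy points are assumptions made for this example.

```python
def sq_dist(p, q):
    """Squared Euclidean distance between two equal-length points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def mean_silhouette(points, labels):
    """Mean silhouette coefficient using squared Euclidean distance."""
    clusters = {}
    for point, label in zip(points, labels):
        clusters.setdefault(label, []).append(point)
    if len(clusters) < 2:
        raise ValueError("silhouette needs at least two clusters")

    scores = []
    for point, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)  # convention used here for singleton clusters
            continue
        # a: mean distance to the other members of the point's own cluster
        # (the point itself contributes 0 to the sum, hence len(own) - 1).
        a = sum(sq_dist(point, q) for q in own) / (len(own) - 1)
        # b: smallest mean distance to the members of any other cluster.
        b = min(
            sum(sq_dist(point, q) for q in members) / len(members)
            for other, members in clusters.items()
            if other != label
        )
        denom = max(a, b)
        scores.append((b - a) / denom if denom > 0 else 0.0)
    return sum(scores) / len(scores)


# Made-up points: two tight, well-separated groups.
toy_points = [(0, 0), (0, 1), (10, 10), (10, 11)]
good = mean_silhouette(toy_points, [0, 0, 1, 1])   # labels match the groups
bad = mean_silhouette(toy_points, [0, 1, 0, 1])    # labels mix the groups
print(f"matching labels: {good:.3f}, mixed labels: {bad:.3f}")
```

Values close to 1 indicate compact, well-separated clusters, values near 0 indicate overlapping clusters, and negative values suggest points assigned to the wrong cluster. Comparing the score across several values of `k` is one common way to judge whether `k=4` is a reasonable choice.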

