### Frida Lizett Zavala Pérez

### A01275226

# Clustering de Datos de Taxis con PySpark

En este script, se utiliza Apache Spark para realizar clustering en datos de taxis de Nueva York. El proceso incluye la carga de datos desde Google Drive, la limpieza de los datos, la selección de características relevantes, la creación de un modelo de clustering K-means, y la evaluación de la calidad del clustering.

El dataset seleccionado para la implementación fue extraido de kaggle, el cual es un registro de viajes de taxis de Nueva York en el año 2016. El archivo csv tiene un tamaño de 1.78 GB y cuenta con 19 columnas, de las cuales para prpósitos de esta implementacón se seleccionaron las más relevantes.



Se realiza la instalación de pyspark sobre el entorno virtual.

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=1bfe8065903d23311fa6e401c49c10b43ef9765b46d7cf6a7839a533235e605f
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


Montamos drive para poder acceder a la carpeta donde se encuentra alojado el conjunto de datos que usaremos. Enseguida se configuran las rutas a las carpetas lara poder usarlas en el código.

In [3]:
from google.colab import drive

# Montar Google Drive
drive.mount('/content/gdrive')

import os
drive_path = '/content/gdrive/MyDrive/IA concentracion/Bloque 2/Mod 1/Entregable'
data_path = os.path.join(drive_path, "data")
df4_path = os.path.join(data_path, "yellow_tripdata_2016-03.csv")

Mounted at /content/gdrive


Se instalan las librerías de pyspark que necesitaremos para el problema de clustering, en este caso contruiremos el modelo de agrupamiento con KMeans, del mismo modo añadimos las herramientas para su evaluación posterior.
Se inicia la sesión de spark.

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import col
from pyspark.sql import functions as F
# Crear una sesión de Spark
spark = SparkSession.builder.appName("TaxiClustering").getOrCreate()

Como en cualquier modelo, es necesario realizar una limpieza y análisis de datos previa para preparar el conjunto de datos y mejorar el desempeño del modelo generado, evitando sesgos importamtes. En este caso se realizó la eliminación de filas con valores nulos usando el método ```na.drop()```, esto ayuda a garantizar que el dataset esté completo y no existan valores faltantes que afecten el entrenamiento del modelo.
También se aplicó un filtrado de valores atípicos para eliminar valores que pudiesen generar ruido en columnas específicas. Tales como valores inválidos o no probables.




In [None]:
df4 = spark.read.csv(df4_path, header=True, inferSchema=True)

df = df4
# Eliminar filas con valores nulos
df = df.na.drop()

# Filtrar valores atípicos en columnas específicas
    (col("passenger_count") > 0) &
    (col("trip_distance") > 0) &
    (col("pickup_longitude").isNotNull()) &
    (col("pickup_latitude").isNotNull()) &
    (col("dropoff_longitude").isNotNull()) &
    (col("payment_type").isNotNull()) &
    (col("fare_amount").isNotNull()) &
    (col("tip_amount").isNotNull()) &
    (col("total_amount").isNotNull())
)

Como paso siguiente se seleccionan las columnas que aporten información más relevante, las cuales representas características como el número de pasajeros, la distancia del viaje, las coordenadas de ascenso y descenso, tipo de pago, y las cantidades de la tarifa y propinas.


In [None]:
# Seleccionar las columnas relevantes para el clustering
selected_columns = [
    "passenger_count",
    "trip_distance",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "payment_type",
    "fare_amount",
    "tip_amount",
    "total_amount"
]

Haciendo uso de ``` VectorAssembler``` se combinan las columnas anteriores en una misma llamada "features", que representa un vector de características.

In [None]:
# Combinar las características en una sola columna de características vectoriales
assembler = VectorAssembler(inputCols=selected_columns, outputCol="features")
df = assembler.transform(df)


Para generar el aprendizaje y la validación del modelo, se dividen los datos en el conjunto de entrenamiento y prueba, con una proporción de 80% y 20% respectivamente.

In [None]:
# Dividir el conjunto de datos en entrenamiento y validación
train_df, test_df = df.randomSplit([0.8, 0.2], seed=1)

Seguimos con el entrenamiento del modelo, para el cual se crea una instancia KMeans con un número predeterminado de clústers, en este caso fueron 3. y se entrena con el conjunto de entrenamiento.


In [None]:
# Entrenar el modelo K-means
kmeans = KMeans(k=3, seed=1)  # Puedes ajustar el número de clústeres (k) según sea necesario
model = kmeans.fit(train_df)

Se usan las instancias del modelo previamente entrenado para generar predicciones con el conjunto de prueba.

In [4]:
# Hacer predicciones en el conjunto de prueba
predictions = model.transform(test_df)

Finalmente de hace la evaluación del modelo a partir de las predicciones hechas.
Una métrica empleada comunmente para la evaluación de modelos de agrupamiento es *Silhouette with squared euclidean distance* dónde se mide de -1 a 1,muentras más grande sea, mejor se encuentran definidos cada uno de los clústers. En es te caso los resultados obtenidos son buenos ya que se obtuvo un valor de 0.99.

Se muestran los resultados imprimiendo los centroides, así cómo un resumen del modelo en general, (número de clústers y número de puntos).


In [5]:
# Evaluar la calidad del clustering utilizando el evaluador de clustering
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Mostrar los resultados
print("Centroides:")
centers = model.clusterCenters()
for center in centers:
    print(center)

# Imprimir algunas métricas adicionales
print("Métricas adicionales:")
print("Número de clústeres:", len(centers))
print("Tamaño de los clústeres:")
cluster_sizes = predictions.groupBy("prediction").count().collect()
for cluster_size in cluster_sizes:
    print(f"Clúster {cluster_size['prediction']}: {cluster_size['count']} puntos")

Silhouette with squared euclidean distance = 0.9999995561860565
Centroides:
[  1.66119218   3.48950621 -72.93964662  40.18120795 -73.05734287
   1.33819889  12.69799042   1.7888611   15.9448992 ]
[ 1.00000000e+00  1.90726288e+07 -7.39957962e+01  4.07612038e+01
 -7.39957886e+01  2.00000000e+00  2.50000000e+00  0.00000000e+00
  4.30000000e+00]
[ 2.00000000e+00  8.33008320e+06 -7.40049286e+01  4.07300186e+01
 -7.39946136e+01  2.00000000e+00  9.00000000e+00  0.00000000e+00
  9.80000000e+00]
Métricas adicionales:
Número de clústeres: 3
Tamaño de los clústeres:
Clúster 2: 1 puntos
Clúster 0: 2429123 puntos


Finalmente se la por terminada la sesión de spark.

In [7]:
# Terminar la sesión de Spark
spark.stop()

Se adjunta un dashboard dónde se visualiza información relevante del dataset. De dichos gráficos se puede ontener un análisis y entendimiento de los datos más exahustivo ya que nos permite observar mejor diversos comportaminetos o patrones que podrían estar presentes de entrada en el conjunto de datos.

