<a href="https://colab.research.google.com/github/PedroAdair/TesisMaestria/blob/main/PySpark_Gower.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
#@title Instalar librerias
from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import DenseVector, VectorUDT, Vector
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline 
from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.sql.functions import udf
from pyspark.sql.functions import udf, array
from pyspark.ml.linalg import DenseVector, SparseVector
from pyspark.sql.functions import udf, expr
from pyspark.sql.types import DoubleType

import numpy as np

In [None]:
# Creamos una sesión de Spark
spark = SparkSession.builder.getOrCreate()

## Del dataset a la matriz de distancias

In [47]:
# Creamos un DataFrame de ejemplo
data = [(1, "M", 25, 150),
        (2, "F", 30, 180),
        (3, "F", 25, 160),
        (4, "M", 35, 170)]
columns = ["id", "gender", "age", "height"]
df = spark.createDataFrame(data, columns)

# Definimos las variables numéricas y categóricas
numeric_cols = ["age", "height"]
categorical_cols = ["gender"]

In [48]:
df.show()

+---+------+---+------+
| id|gender|age|height|
+---+------+---+------+
|  1|     M| 25|   150|
|  2|     F| 30|   180|
|  3|     F| 25|   160|
|  4|     M| 35|   170|
+---+------+---+------+



In [49]:
# Codificación de variables categóricas
indexers = [StringIndexer(inputCol=col_name, outputCol=col_name + "_index", handleInvalid="keep") for col_name in categorical_cols]

# Pipeline para encadenar los pasos de codificación
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)


In [50]:
df.show()

+---+------+---+------+------------+
| id|gender|age|height|gender_index|
+---+------+---+------+------------+
|  1|     M| 25|   150|         1.0|
|  2|     F| 30|   180|         0.0|
|  3|     F| 25|   160|         0.0|
|  4|     M| 35|   170|         1.0|
+---+------+---+------+------------+



In [51]:
# VectorAssembler para combinar todas las variables en un solo vector
assembler = VectorAssembler(inputCols=numeric_cols + [col_name + "_index" for col_name in categorical_cols], outputCol="features")
df = assembler.transform(df)

In [52]:
df.show()

+---+------+---+------+------------+----------------+
| id|gender|age|height|gender_index|        features|
+---+------+---+------+------------+----------------+
|  1|     M| 25|   150|         1.0|[25.0,150.0,1.0]|
|  2|     F| 30|   180|         0.0|[30.0,180.0,0.0]|
|  3|     F| 25|   160|         0.0|[25.0,160.0,0.0]|
|  4|     M| 35|   170|         1.0|[35.0,170.0,1.0]|
+---+------+---+------+------------+----------------+



In [None]:
# Definimos una función para calcular la distancia de Gower
def calculate_gower_distance(row, features):
    values = features.toArray()
    gower_sum = sum([abs(float(values[i]) - float(row[i])) for i in range(len(values))])
    return gower_sum / len(values)

# Registramos la función como UDF
calculate_gower_distance_udf = spark.udf.register("calculate_gower_distance", calculate_gower_distance, DoubleType())

# Agregamos una columna con la distancia de Gower calculada para cada fila
df = df.withColumn("gower_distance", calculate_gower_distance_udf(col("features"), col("features")))


In [None]:
df.show()

+---+------+---+------+------------+----------------+--------------+
| id|gender|age|height|gender_index|        features|gower_distance|
+---+------+---+------+------------+----------------+--------------+
|  1|     M| 25|   150|         1.0|[25.0,150.0,1.0]|           0.0|
|  2|     F| 30|   180|         0.0|[30.0,180.0,0.0]|           0.0|
|  3|     F| 25|   160|         0.0|[25.0,160.0,0.0]|           0.0|
|  4|     M| 35|   170|         1.0|[35.0,170.0,1.0]|           0.0|
+---+------+---+------+------------+----------------+--------------+



In [56]:
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.linalg import DenseVector

# Creamos un DataFrame de ejemplo
data = [(1, "M", 25, 150),
        (2, "F", 30, 180),
        (3, "F", 25, 160),
        (4, "M", 35, 170)]
columns = ["id", "gender", "age", "height"]
df = spark.createDataFrame(data, columns)

# Definimos las variables numéricas y categóricas
numeric_cols = ["age", "height"]
categorical_cols = ["gender"]

# Codificación de variables categóricas
indexers = [StringIndexer(inputCol=col_name, outputCol=col_name + "_index", handleInvalid="keep") for col_name in categorical_cols]
df = df
for indexer in indexers:
    df = indexer.fit(df).transform(df)

# VectorAssembler para combinar todas las variables en un solo vector
assembler = VectorAssembler(inputCols=numeric_cols + [col_name + "_index" for col_name in categorical_cols], outputCol="features")
df = assembler.transform(df)

# IDs de las observaciones que deseas comparar
id1 = 1
id2 = 2

# Filtrar las observaciones del DataFrame
row1 = df.filter(col("id") == id1).select("features").first()
row2 = df.filter(col("id") == id2).select("features").first()

# Calcular la distancia de Gower entre las observaciones seleccionadas
values1 = row1.features.toArray()
values2 = row2.features.toArray()
gower_sum = sum([abs(float(values1[i]) - float(values2[i]))(max()) for i in range(len(values1))])
gower_distance = gower_sum / len(values1)

print(gower_distance)


12.0


##Eliminar aristas y construir el grafo