# Letterhead

<img src="https://upload.wikimedia.org/wikipedia/commons/6/6c/Javeriana.svg" alt="Logo Javeriana" width="150"/>

- **Name:** Alberto Luis Vigna Arroyo
- **University:** Pontificia Universidad Javeriana
- **Course:** Procesamiento de Datos a Gran Escala (Large-Scale Data Processing)
- **Professor:** John Corredor
- **Email:** a-vigna@javeriana.edu.co
- **Date:** February 5, 2024
- **Objective:** Present the different PySpark methods for data processing and take the first steps in ML with Spark.


**Data: 1990 California Census**

The data contain median house prices for California for the year 1990.

- longitude: continuous.
- latitude: continuous.
- housingMedianAge: continuous. 
- totalRooms: continuous. 
- totalBedrooms: continuous. 
- population: continuous. 
- households: continuous. 
- medianIncome: continuous. 
- medianHouseValue: continuous. 

## Required Libraries

In [0]:
# Import the required libraries:

# PySpark
import pyspark

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import col


# Pandas
import pandas as pd


# Wildcard imports, to bring in everything:
from pyspark import *
from pandas import *

## Starting the PySpark Session

In [0]:
# Start the PySpark session in order to use the methods and tools it provides.

sc = SparkContext.getOrCreate()
sql_sc = SQLContext(sc)

sc



In [0]:
# Load the data from the professor's (John Corredor) repository: HC = Housing California

headerHC = [
    "longitude", 
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
    "medianHouseValue"
]

print("Data header: \n", headerHC)

pathHC = "https://raw.githubusercontent.com/corredor-john/ExploratoryDataAnalisys/main/Varios/CaliforniaHousing/cal_housing.data"

Data header: 
 ['longitude', 'latitude', 'housingMedianAge', 'totalRooms', 'totalBedrooms', 'population', 'households', 'medianIncome', 'medianHouseValue']


### Viewing the Data with Pandas

In [0]:
# Load the data into a pandas DataFrame and display it
housing_DFP = pd.read_csv(pathHC, sep = ',', names = headerHC)
housing_DFP

Unnamed: 0,longitude,latitude,housingMedianAge,totalRooms,totalBedrooms,population,households,medianIncome,medianHouseValue
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0
...,...,...,...,...,...,...,...,...,...
20635,-121.09,39.48,25.0,1665.0,374.0,845.0,330.0,1.5603,78100.0
20636,-121.21,39.49,18.0,697.0,150.0,356.0,114.0,2.5568,77100.0
20637,-121.22,39.43,17.0,2254.0,485.0,1007.0,433.0,1.7000,92300.0
20638,-121.32,39.43,18.0,1860.0,409.0,741.0,349.0,1.8672,84700.0
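
The source file has no header row, which is why `names = headerHC` is passed to `read_csv`. As a rough illustration of that idea, the following standard-library sketch parses two of the rows shown above from an in-memory string and attaches the same column names. The names `header_hc`, `sample`, and `rows` are assumptions made for the example; `sample` merely stands in for the remote file.

```python
import csv
import io

# Column names, as defined in headerHC in the notebook.
header_hc = [
    "longitude", "latitude", "housingMedianAge", "totalRooms",
    "totalBedrooms", "population", "households",
    "medianIncome", "medianHouseValue",
]

# Two rows copied from the output above; this in-memory string is an
# assumption that stands in for the remote, header-less data file.
sample = (
    "-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0\n"
    "-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0\n"
)

# fieldnames plays the role of names= in pd.read_csv: the first line is
# treated as data, not as a header.
reader = csv.DictReader(io.StringIO(sample), fieldnames=header_hc)
rows = [{name: float(value) for name, value in row.items()} for row in reader]

print(len(rows), rows[0]["medianHouseValue"])
```

Without the explicit field names, the first data row would be consumed as the header and silently lost.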


### Viewing the Data with Spark

In [0]:
# Load the data into a PySpark DataFrame and display it directly with Spark. (The pandas DataFrame is converted to a Spark DataFrame, which is much more lightweight and has no polished display format. The goal is speed, not looks.)
housing_DFS = sql_sc.createDataFrame(housing_DFP)
housing_DFS.show(10)

+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+
|longitude|latitude|housingMedianAge|totalRooms|totalBedrooms|population|households|medianIncome|medianHouseValue|
+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+
|  -122.23|   37.88|            41.0|     880.0|        129.0|     322.0|     126.0|      8.3252|        452600.0|
|  -122.22|   37.86|            21.0|    7099.0|       1106.0|    2401.0|    1138.0|      8.3014|        358500.0|
|  -122.24|   37.85|            52.0|    1467.0|        190.0|     496.0|     177.0|      7.2574|        352100.0|
|  -122.25|   37.85|            52.0|    1274.0|        235.0|     558.0|     219.0|      5.6431|        341300.0|
|  -122.25|   37.85|            52.0|    1627.0|        280.0|     565.0|     259.0|      3.8462|        342200.0|
|  -122.25|   37.85|            52.0|     919.0|        213.0|     413.0|     19

In [0]:
# Show the data type of each column in the dataset and whether the column is nullable.
housing_DFS.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housingMedianAge: double (nullable = true)
 |-- totalRooms: double (nullable = true)
 |-- totalBedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- medianIncome: double (nullable = true)
 |-- medianHouseValue: double (nullable = true)



### Data Type Conversion

In [0]:
# Some columns need to be cast to another type, for example integer. The following function casts the given columns to the new type:

def convertirColumnaTipo(dataframe, nombres, nuevoTipo):
    for nombre in nombres:
        dataframe = dataframe.withColumn(nombre, dataframe[nombre].cast(nuevoTipo))
    
    return dataframe    


## List of columns to convert:
ColumnasAEntero = ["totalRooms", "totalBedrooms", "population", "households"]
housing_DFS_Entero = convertirColumnaTipo(housing_DFS, ColumnasAEntero, IntegerType())


## Verify that the conversion succeeded:
housing_DFS_Entero.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housingMedianAge: double (nullable = true)
 |-- totalRooms: integer (nullable = true)
 |-- totalBedrooms: integer (nullable = true)
 |-- population: integer (nullable = true)
 |-- households: integer (nullable = true)
 |-- medianIncome: double (nullable = true)
 |-- medianHouseValue: double (nullable = true)
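
As a plain-Python analogue of `convertirColumnaTipo`, the sketch below applies a type constructor to selected keys of a list of dictionaries. It is only an illustration: the helper name `cast_columns` and the sample row are assumptions, and Python's `int()` truncates toward zero, which for the whole-number values in this dataset gives the same integers that appear in the later outputs.

```python
def cast_columns(rows, names, new_type):
    """Return copies of the rows with the named keys converted to new_type."""
    return [
        {key: new_type(value) if key in names else value
         for key, value in row.items()}
        for row in rows
    ]

# First row of the dataset, restricted to a few columns for brevity.
rows = [{"totalRooms": 880.0, "population": 322.0, "medianIncome": 8.3252}]

converted = cast_columns(rows, ["totalRooms", "population"], int)
print(converted)
```

Like the Spark version, the helper returns new rows and leaves the input untouched, so the original values remain available.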



### Queries

#### Query on "population" and "totalBedrooms"

In [0]:
# Query on "population" and "totalBedrooms"
housing_DFS_Entero.select("totalBedrooms", "population").show(10)

+-------------+----------+
|totalBedrooms|population|
+-------------+----------+
|          129|       322|
|         1106|      2401|
|          190|       496|
|          235|       558|
|          280|       565|
|          213|       413|
|          489|      1094|
|          687|      1157|
|          665|      1206|
|          707|      1551|
+-------------+----------+
only showing top 10 rows



#### Query on "housingMedianAge"

In [0]:
# Count the number of records for each "housingMedianAge" value:
housing_DFS_Entero.groupBy("housingMedianAge").count().show(10)

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|             8.0|  206|
|             7.0|  175|
|            49.0|  134|
|            29.0|  461|
|            47.0|  198|
|            42.0|  368|
|            44.0|  356|
|            35.0|  824|
|            18.0|  570|
|            39.0|  369|
+----------------+-----+
only showing top 10 rows



#### Query on "housingMedianAge", sorted by age in descending order

In [0]:
# Count the records for each "housingMedianAge" value, sorted by age in descending order:
housing_DFS_Entero.groupBy("housingMedianAge").count().sort("housingMedianAge", ascending = False).show(10)

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
+----------------+-----+
only showing top 10 rows
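
The two possible orderings are easy to confuse: the query above sorts by the grouping key (`housingMedianAge`), not by the count. A minimal plain-Python sketch of both orderings follows, using `collections.Counter` on a small hypothetical list of ages (the values in `ages` are made up for the example, not taken from the dataset).

```python
from collections import Counter

# Hypothetical ages, made up for the example.
ages = [52.0, 41.0, 41.0, 21.0, 41.0, 52.0, 18.0, 18.0, 18.0, 18.0]

counts = Counter(ages)  # roughly analogous to groupBy(...).count()

# Sorted by the key, descending (what the query above does).
by_age = sorted(counts.items(), key=lambda item: item[0], reverse=True)

# Sorted by the count, descending.
by_count = sorted(counts.items(), key=lambda item: item[1], reverse=True)

print(by_age)
print(by_count)
```

In PySpark, sorting by frequency instead would presumably mean passing the `"count"` column to `sort` in place of `"housingMedianAge"`.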



#### Statistical summary of the numeric data (with show()):

In [0]:
# Produce a statistical summary of the numeric data:
housing_DFS_Entero.describe().show()

+-------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|          longitude|          latitude|  housingMedianAge|        totalRooms|     totalBedrooms|        population|        households|      medianIncome|  medianHouseValue|
+-------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|              20640|             20640|             20640|             20640|             20640|             20640|             20640|             20640|             20640|
|   mean|-119.56970445736447| 35.63186143410853|28.639486434108527|2635.7630813953488| 537.8980135658915|1425.4767441860465| 499.5396802325581| 3.870671002906976|206855.81690891474|
| stddev| 2.0035317235026016|2.1359523974570953|12.585557612111653|2181.6152515827957|421.

#### Statistical summary of the numeric data (with display()):

In [0]:
# Produce a statistical summary of the numeric data (using display()):
display(housing_DFS_Entero.describe())

summary,longitude,latitude,housingMedianAge,totalRooms,totalBedrooms,population,households,medianIncome,medianHouseValue
count,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0
mean,-119.56970445736448,35.63186143410853,28.639486434108527,2635.7630813953488,537.8980135658915,1425.4767441860463,499.5396802325581,3.870671002906976,206855.81690891477
stddev,2.0035317235026016,2.1359523974570958,12.585557612111652,2181.615251582796,421.247905943132,1132.4621217653412,382.3297528316109,1.8998217179452688,115395.61587441388
min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0
max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0
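
For intuition about what each row of the summary means, the same five statistics can be computed with Python's `statistics` module. The sketch below uses only the ten `population` values shown in the earlier query output, so its results describe that small sample and not the full dataset. It uses the sample standard deviation (`statistics.stdev`, with an n - 1 denominator), which, as far as I know, is also what Spark's `describe()` reports.

```python
import statistics

# The ten population values from the select(...).show(10) output above.
population = [322, 2401, 496, 558, 565, 413, 1094, 1157, 1206, 1551]

summary = {
    "count": len(population),
    "mean": statistics.mean(population),
    "stddev": statistics.stdev(population),  # sample standard deviation
    "min": min(population),
    "max": max(population),
}

for name, value in summary.items():
    print(f"{name}: {value}")
```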


#### Observations
- No null, impossible, or invalid values were found.
- Normalizing the data is recommended, so that all variables are on the same scale.
- Adding derived variables that capture additional attributes (for example, rooms per household) is recommended.
- The target variable, *medianHouseValue*, has a large order of magnitude.

### Corrections Based on the Observations

#### Adjusting the order of magnitude of the target variable:

In [0]:
# Adjust the order of magnitude of the target variable: the values are on the order of 100,000, so the whole column is divided by 100,000:
housing_DFS_Entero = housing_DFS_Entero.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Visual inspection of the first 2 rows:
housing_DFS_Entero.take(2)

[Row(longitude=-122.23, latitude=37.88, housingMedianAge=41.0, totalRooms=880, totalBedrooms=129, population=322, households=126, medianIncome=8.3252, medianHouseValue=4.526),
 Row(longitude=-122.22, latitude=37.86, housingMedianAge=21.0, totalRooms=7099, totalBedrooms=1106, population=2401, households=1138, medianIncome=8.3014, medianHouseValue=3.585)]


## Feature Engineering

The following derived variables are added to the dataset:

- **HabitacionesHogares**: Number of rooms per household (`totalRooms / households`).
- **PersonasBloques**: Number of people per household (`population / households`).
- **HabitacionesBloques**: Number of bedrooms per room (`totalBedrooms / totalRooms`).


### Column Handling

In [0]:
# Print the column names for reference:
housing_DFS_Entero.columns

['longitude',
 'latitude',
 'housingMedianAge',
 'totalRooms',
 'totalBedrooms',
 'population',
 'households',
 'medianIncome',
 'medianHouseValue']

In [0]:
# Rooms per household
housing_DFS_Entero = housing_DFS_Entero.withColumn("HabitacionesHogares", col("totalRooms")/col("households"))
# People per household
housing_DFS_Entero = housing_DFS_Entero.withColumn("PersonasBloques", col("population")/col("households"))
# Bedrooms per room
housing_DFS_Entero = housing_DFS_Entero.withColumn("HabitacionesBloques", col("totalBedrooms")/col("totalRooms"))

housing_DFS_Entero.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housingMedianAge: double (nullable = true)
 |-- totalRooms: integer (nullable = true)
 |-- totalBedrooms: integer (nullable = true)
 |-- population: integer (nullable = true)
 |-- households: integer (nullable = true)
 |-- medianIncome: double (nullable = true)
 |-- medianHouseValue: double (nullable = true)
 |-- HabitacionesHogares: double (nullable = true)
 |-- PersonasBloques: double (nullable = true)
 |-- HabitacionesBloques: double (nullable = true)
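
To make the three ratios concrete, here is the same arithmetic in plain Python for the first row of the dataset (values taken from the outputs above). Note what each ratio actually divides: `PersonasBloques` is population per household, and `HabitacionesBloques` is bedrooms per room. The dictionary names `row` and `derived` are assumptions made for the example.

```python
# First row of the dataset, after the integer cast.
row = {"totalRooms": 880, "totalBedrooms": 129,
       "population": 322, "households": 126}

derived = {
    # Rooms per household.
    "HabitacionesHogares": row["totalRooms"] / row["households"],
    # People per household.
    "PersonasBloques": row["population"] / row["households"],
    # Bedrooms per room.
    "HabitacionesBloques": row["totalBedrooms"] / row["totalRooms"],
}

for name, value in derived.items():
    print(f"{name}: {value:.4f}")
```

Rounded to four decimals, these agree with the values Spark reports for the first row later in the notebook (6.9841, 2.5556, and 0.1466).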



In [0]:
# Print the columns
housing_DFS_Entero.columns

['longitude',
 'latitude',
 'housingMedianAge',
 'totalRooms',
 'totalBedrooms',
 'population',
 'households',
 'medianIncome',
 'medianHouseValue',
 'HabitacionesHogares',
 'PersonasBloques',
 'HabitacionesBloques']


### Column Analysis

In [0]:
# Select only the columns to be analyzed

housing_DFS_Entero = housing_DFS_Entero.select("medianHouseValue", "totalRooms", "population", "households", "medianIncome", "HabitacionesHogares", "HabitacionesBloques", "PersonasBloques")

display(housing_DFS_Entero)

medianHouseValue,totalRooms,population,households,medianIncome,HabitacionesHogares,HabitacionesBloques,PersonasBloques
4.526,880,322,126,8.3252,6.984126984126984,0.146590909090909,2.555555555555556
3.585,7099,2401,1138,8.3014,6.238137082601054,0.1557965910691646,2.109841827768014
3.521,1467,496,177,7.2574,8.288135593220339,0.1295160190865712,2.8022598870056497
3.413,1274,558,219,5.6431,5.817351598173516,0.184458398744113,2.547945205479452
3.422,1627,565,259,3.8462,6.281853281853282,0.1720958819913952,2.1814671814671813
2.697,919,413,193,4.0368,4.761658031088083,0.2317736670293797,2.139896373056995
2.992,2535,1094,514,3.6591,4.9319066147859925,0.1928994082840236,2.1284046692607004
2.414,3104,1157,647,3.12,4.797527047913447,0.2213273195876288,1.7882534775888717
2.267,2555,1206,595,2.0804,4.294117647058823,0.2602739726027397,2.026890756302521
2.611,3549,1551,714,3.6912,4.970588235294118,0.1992110453648915,2.172268907563025


In [0]:
housing_DFS_Entero.take(1)

[Row(medianHouseValue=4.526, totalRooms=880, population=322, households=126, medianIncome=8.3252, HabitacionesHogares=6.984126984126984, HabitacionesBloques=0.14659090909090908, PersonasBloques=2.5555555555555554)]


## Normalization

- The *map()* function is used to separate the features from the target variable, medianHouseValue.
- *DenseVector()* stores the feature values of each row as a dense vector.
- The DataFrame is then recreated from the data, with its columns labeled "label" and "features".


### Instantiating the Object

In [0]:
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import StandardScaler

## Map first to extract the "label" and the "features"
### The first column is the label; the remaining columns become the feature vector
datosEntrada = housing_DFS_Entero.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

datosEntrada.take(1)

[(4.526, DenseVector([880.0, 322.0, 126.0, 8.3252, 6.9841, 0.1466, 2.5556]))]
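
The lambda relies on positional slicing: because `medianHouseValue` was selected first, `x[0]` is the label and `x[1:]` is everything else. Here is a plain-Python sketch of the same split on an ordinary tuple. The function name `split_label_features` is hypothetical, and a list of floats stands in for `DenseVector`, which is a simplification.

```python
# First row, in the column order produced by the select(...) above.
row = (4.526, 880, 322, 126, 8.3252,
       6.984126984126984, 0.14659090909090908, 2.5555555555555554)

def split_label_features(record):
    """Return (label, features) with the features converted to floats."""
    return record[0], [float(value) for value in record[1:]]

label, features = split_label_features(row)
print(label, len(features), features[:3])
```

The split only works as long as the target stays in the first position, so reordering the `select` would silently change which column is used as the label.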


### Creating the DataFrame

In [0]:
## Create a DataFrame from the (label, dense feature vector) pairs, using the SQLContext created earlier
df_label_features = sql_sc.createDataFrame(datosEntrada, ["label", "features"])

display(df_label_features)

label,features
4.526,"Map(vectorType -> dense, length -> 7, values -> List(880.0, 322.0, 126.0, 8.3252, 6.984126984126984, 0.14659090909090908, 2.5555555555555554))"
3.585,"Map(vectorType -> dense, length -> 7, values -> List(7099.0, 2401.0, 1138.0, 8.3014, 6.238137082601054, 0.15579659106916466, 2.109841827768014))"
3.521,"Map(vectorType -> dense, length -> 7, values -> List(1467.0, 496.0, 177.0, 7.2574, 8.288135593220339, 0.12951601908657123, 2.8022598870056497))"
3.413,"Map(vectorType -> dense, length -> 7, values -> List(1274.0, 558.0, 219.0, 5.6431, 5.8173515981735155, 0.18445839874411302, 2.547945205479452))"
3.422,"Map(vectorType -> dense, length -> 7, values -> List(1627.0, 565.0, 259.0, 3.8462, 6.281853281853282, 0.1720958819913952, 2.1814671814671813))"
2.697,"Map(vectorType -> dense, length -> 7, values -> List(919.0, 413.0, 193.0, 4.0368, 4.761658031088083, 0.23177366702937977, 2.139896373056995))"
2.992,"Map(vectorType -> dense, length -> 7, values -> List(2535.0, 1094.0, 514.0, 3.6591, 4.9319066147859925, 0.19289940828402366, 2.1284046692607004))"
2.414,"Map(vectorType -> dense, length -> 7, values -> List(3104.0, 1157.0, 647.0, 3.12, 4.797527047913447, 0.22132731958762886, 1.7882534775888717))"
2.267,"Map(vectorType -> dense, length -> 7, values -> List(2555.0, 1206.0, 595.0, 2.0804, 4.294117647058823, 0.2602739726027397, 2.026890756302521))"
2.611,"Map(vectorType -> dense, length -> 7, values -> List(3549.0, 1551.0, 714.0, 3.6912, 4.970588235294118, 0.1992110453648915, 2.172268907563025))"


In [0]:
## Instantiate the scaler
estandar = StandardScaler(inputCol="features", outputCol="features_escaladas")


## Fit the scaler to the data
objetoEscalado = estandar.fit(df_label_features)


## Transform the data with the fitted scaler
objetoEscaladorDF = objetoEscalado.transform(df_label_features)


## Inspect 1 row
objetoEscaladorDF.take(1)

[Row(label=4.526, features=DenseVector([880.0, 322.0, 126.0, 8.3252, 6.9841, 0.1466, 2.5556]), features_escaladas=DenseVector([0.4034, 0.2843, 0.3296, 4.3821, 2.8228, 2.5264, 0.2461]))]
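
With its default settings (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its standard deviation without centering it. That can be checked by hand for the four features whose standard deviations appear in the `describe()` output earlier in the notebook. The sketch below is only that arithmetic check, not Spark's implementation, and the variable names are assumptions made for the example.

```python
# First four features of the first row: totalRooms, population,
# households, medianIncome.
features = [880.0, 322.0, 126.0, 8.3252]

# Standard deviations copied from the describe() output.
stddevs = [2181.615251582796, 1132.4621217653412,
           382.3297528316109, 1.8998217179452688]

# Scale to unit standard deviation without subtracting the mean.
scaled = [round(x / s, 4) for x, s in zip(features, stddevs)]
print(scaled)  # [0.4034, 0.2843, 0.3296, 4.3821]
```

These four numbers match the first four entries of `features_escaladas` in the row above, which suggests the scaled vector is the raw vector divided element-wise by the column standard deviations.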