# Entregable Modulo 1: Utilización, procesamiento y visualización de grandes volúmenes de datos 

## Jose Pablo Cobos Austria  A01274631 

### 1.- Descripcion de la actividad


Este documento trata sobre el uso de la herramienta PySpark para el manejo de un conjunto de datos de gran volumen, para poder generar un modelo inteligente,y asimismo hacer uso de la aplicacion de Tableau para poder visualizar los datos 

### 2.- Configuracion del entorno de trabajo para usar PySpark 

Lo primero que vamos hacer antes de iniciar con nuestra generación el modelo es la configuración del entorno que utilizaremos para trabajar con PySpark. 

En cuánto a la información o características del sistema que podemos trabajar tenemos que estamos trabajando con el sistema operativo Arch Linux v6.0.9, un procesador Intel i5-10500H y un total de 16 gigas de RAM

A continuación importamos lo que son las todas las librerías Spark que utilizaremos para poder trabajar con los datos y generar el modelo

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import matplotlib.pyplot as plt


Lo siguiente que se realizó fue iniciar una sesión de Spark qué será nuestro entorno de trabajo, bajo el nombre de Cobos Big Data

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Cobos Big Data")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

22/11/26 09:43:15 WARN Utils: Your hostname, CubesLaptop resolves to a loopback address: 127.0.1.1; using 10.25.65.197 instead (on interface wlp0s20f3)
22/11/26 09:43:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/26 09:43:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Nos aseguramos que nuestra configuracion fuese la correcta y estuviera funcionando 

In [3]:
spark

### 3.- Seleccion y carga de datos 

Ya con nuestro entorno de trabajo ya configurado correctamente el siguiente paso a seguir fue la selección y carga de datos, utilizará fue encontrado del siguiente link: https://bit.ly/3EzU4ai

El peso total de dicho es más de  1 GB, por lo que cumple con el objetivo de analizar un dataset con un gran volumen de datos

In [7]:
df_main = spark.read.option("inferSchema", "true").csv("geometries.csv",header=True)


                                                                                

### 4.- Creacion del modelo inteligente usando MLib

##### 4.1 EDA

Para la generacion del modelo inteligente lo primero que vamos a realizar es el EDA (Exploratory Data Analysis), para poder visualizar la informacion que tenemos de nuestros datos 

Imprimimos un esquema de nuestro dataset, para poder visualizar cuales son las columnas y que tipo de dato contienen

In [8]:
df_main.printSchema()

root
 |-- apartment_id: string (nullable = true)
 |-- site_id: integer (nullable = true)
 |-- building_id: integer (nullable = true)
 |-- plan_id: integer (nullable = true)
 |-- floor_id: integer (nullable = true)
 |-- unit_id: integer (nullable = true)
 |-- area_id: string (nullable = true)
 |-- unit_usage: string (nullable = true)
 |-- entity_type: string (nullable = true)
 |-- entity_subtype: string (nullable = true)
 |-- geometry: string (nullable = true)
 |-- elevation: double (nullable = true)
 |-- height: double (nullable = true)



In [9]:
df_main.show()

+--------------------+-------+-----------+-------+--------+-------+--------+----------+-----------+--------------+--------------------+---------+------+
|        apartment_id|site_id|building_id|plan_id|floor_id|unit_id| area_id|unit_usage|entity_type|entity_subtype|            geometry|elevation|height|
+--------------------+-------+-----------+-------+--------+-------+--------+----------+-----------+--------------+--------------------+---------+------+
|d338ccd5607781e63...|    692|       1460|   4185|    6365|  40063|665573.0|COMMERCIAL|       area|         LOBBY|POLYGON ((1.60122...|     14.5|   2.6|
|d338ccd5607781e63...|    692|       1460|   4185|    6365|  40063|368602.0|COMMERCIAL|       area|     STOREROOM|POLYGON ((1.81678...|     14.5|   2.6|
|d338ccd5607781e63...|    692|       1460|   4185|    6365|  40063|368605.0|COMMERCIAL|       area|       BALCONY|POLYGON ((-5.8129...|     14.5|   2.6|
|d338ccd5607781e63...|    692|       1460|   4185|    6365|  40063|368617.0|COMMER

Como podemos observar arriba el dataset anterior y lo que se vio en el el sitio web del dataset de Kaggle este dataset es de un conjunto de apartamentos, teniendo informacion de un conjunto de mas de 42,500 departamentos

Por eso para este caso, se realizara una realizara un regresion logistica para saber si unit_sage sera Residencial o Publica

##### 4.2 ETL

Primero vamos a quitar todas las filas que tienen Comercial en la columna de unit_sage

In [95]:
df_main_filtrado = df_main.where(df_main.unit_usage!="COMMERCIAL")

df_main_filtrado = df_main_filtrado.where(df_main.unit_usage!="JANITOR")

In [96]:
df_main_filtrado.show()

+------------+-------+-----------+-------+--------+-------+--------+----------+-----------+--------------+--------------------+---------+------+
|apartment_id|site_id|building_id|plan_id|floor_id|unit_id| area_id|unit_usage|entity_type|entity_subtype|            geometry|elevation|height|
+------------+-------+-----------+-------+--------+-------+--------+----------+-----------+--------------+--------------------+---------+------+
|        null|    692|       1460|   4185|    6365|   null|368603.0|    PUBLIC|       area|     STAIRCASE|POLYGON ((4.41013...|     14.5|   2.6|
|        null|    692|       1460|   4185|    6365|   null|368604.0|    PUBLIC|       area|     STAIRCASE|POLYGON ((2.16726...|     14.5|   2.6|
|        null|    692|       1460|   4185|    6365|   null|368616.0|    PUBLIC|       area|      ELEVATOR|POLYGON ((4.03602...|     14.5|   2.6|
|        null|    692|       1460|   4185|    6365|   null|918411.0|    PUBLIC|       area|         SHAFT|POLYGON ((3.88863...|   

A continuacion buscaremos todos los los valores nulos en el dataframe

In [97]:
from pyspark.sql.functions import col,isnan, when, count
df_main_filtrado.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_main_filtrado.columns]
   ).show()



+------------+-------+-----------+-------+--------+-------+-------+----------+-----------+--------------+--------+---------+------+
|apartment_id|site_id|building_id|plan_id|floor_id|unit_id|area_id|unit_usage|entity_type|entity_subtype|geometry|elevation|height|
+------------+-------+-----------+-------+--------+-------+-------+----------+-----------+--------------+--------+---------+------+
|      533477|      0|          0|      0|       0| 533477|2254731|         0|          0|             0|       0|        0|     0|
+------------+-------+-----------+-------+--------+-------+-------+----------+-----------+--------------+--------+---------+------+



                                                                                

Como podemos observar, debido a la gran cantidad de nulls y que no son tan rellevantes para el problema,podemos descartar a las columnas: apartment_id, unit_id, aread_id.

Ademas eliminaremos geometry con el simplificar un poco el modelo

In [98]:
df_main_filtrado = df_main_filtrado.drop(*('apartment_id','unit_id','area_id','geometry'))

In [99]:
df_main_filtrado.show()

+-------+-----------+-------+--------+----------+-----------+--------------+---------+------+
|site_id|building_id|plan_id|floor_id|unit_usage|entity_type|entity_subtype|elevation|height|
+-------+-----------+-------+--------+----------+-----------+--------------+---------+------+
|    692|       1460|   4185|    6365|    PUBLIC|       area|     STAIRCASE|     14.5|   2.6|
|    692|       1460|   4185|    6365|    PUBLIC|       area|     STAIRCASE|     14.5|   2.6|
|    692|       1460|   4185|    6365|    PUBLIC|       area|      ELEVATOR|     14.5|   2.6|
|    692|       1460|   4185|    6365|    PUBLIC|       area|         SHAFT|     14.5|   2.6|
|    692|       1460|   4185|    6365|    PUBLIC|       area|         SHAFT|     14.5|   2.6|
|    692|       1460|   4185|    6365|    PUBLIC|    feature|        STAIRS|     14.5|   2.6|
|    692|       1460|   4185|    6365|    PUBLIC|    feature|        STAIRS|     14.5|   2.6|
|    692|       1460|   4185|    6365|    PUBLIC|    feature

Volvemos a buscar valores null 

In [61]:
df_main_filtrado.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_main_filtrado.columns]
   ).show()



+-------+-----------+-------+--------+----------+-----------+--------------+---------+------+
|site_id|building_id|plan_id|floor_id|unit_usage|entity_type|entity_subtype|elevation|height|
+-------+-----------+-------+--------+----------+-----------+--------------+---------+------+
|      0|          0|      0|       0|         0|          0|             0|        0|     0|
+-------+-----------+-------+--------+----------+-----------+--------------+---------+------+



                                                                                

Y como no tenemos 0 ya casi estamos listos para ver la regresion logistica, solo faltaria darle formato a las columnas

In [100]:
from pyspark.ml.feature import StringIndexer,OneHotEncoder,StandardScaler

Gracias a las herramientas antes importadas, lo que haremos sera formatear todas la variables categoricas que tengamos para que puedan ser usadas en nuestro modelo 

In [101]:
categoricalColumns = ['entity_type','entity_subtype']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol,outputCol =  categoricalCol + 'Index')
    enconder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols = [categoricalCol + "classVec"])
    stages += [stringIndexer, enconder]

label_stringIdx = StringIndexer(inputCol='unit_usage',outputCol= 'label')

stages += [label_stringIdx]

numericCols = ['site_id','building_id','plan_id','floor_id','elevation','height']

assemblerInputs = [c + 'classVec' for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs,outputCol="vectorized_features")
stages += [assembler]
scaler = StandardScaler(inputCol= 'vectorized_features',outputCol="features")
stages+= [scaler]

In [105]:
df_main_filtrado.columns

['label',
 'features',
 'site_id',
 'building_id',
 'plan_id',
 'floor_id',
 'unit_usage',
 'entity_type',
 'entity_subtype',
 'elevation',
 'height']

Tambien hacemos uso de pipelines para nuestras nuevas columnas 

In [104]:
from pyspark.ml import Pipeline

cols = df_main_filtrado.columns
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_main_filtrado)
df_main_filtrado = pipelineModel.transform(df_main_filtrado)
selectedCols = ['label','features']+cols
df_main_filtrado = df_main_filtrado.select(selectedCols)

                                                                                

Imprimimos de nuevo el esquema

In [106]:
df_main_filtrado.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- site_id: integer (nullable = true)
 |-- building_id: integer (nullable = true)
 |-- plan_id: integer (nullable = true)
 |-- floor_id: integer (nullable = true)
 |-- unit_usage: string (nullable = true)
 |-- entity_type: string (nullable = true)
 |-- entity_subtype: string (nullable = true)
 |-- elevation: double (nullable = true)
 |-- height: double (nullable = true)



##### 4.3  Generacion del modelo


Primero dividimos todos nuestros datos en parte train, test

In [107]:
train, test = df_main_filtrado.randomSplit([.8,.2],seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Data Count: "+str(test.count()))

                                                                                

Training Dataset Count: 2425437




Test Data Count: 606787


                                                                                

In [108]:
train.groupBy("unit_usage").count().show()



+-----------+-------+
| unit_usage|  count|
+-----------+-------+
|RESIDENTIAL|1998835|
|     PUBLIC| 426602|
+-----------+-------+



                                                                                

Y aqui es donde vamos a entrenar a nuestro modelo de regresion logistica 

In [110]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol= 'features',labelCol='label',maxIter=5)
lrModel = lr.fit(train)
predictions = lrModel.transform(test)
#predictions.select('label','features','rawPrediction','prediction','probability').toPandas().head(5)



22/11/26 13:07:07 ERROR RetryingBlockTransferor: Exception while beginning fetch of 1 outstanding blocks (after 1 retries)
java.io.IOException: Connecting to /10.25.65.197:39341 failed in the last 4750 ms, fail this connection directly
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:126)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:154)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.lambda$initiateRetry$0(RetryingBlockTransferor.java:184)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecut

                                                                                



### 5.- Evaluacion del modelo



Finalmente para evaludar que el modelo sirva se hace uso de una herramienta conocida como BinaryClassificationEvaluator, que es el area debajo de ROC. RORC es la cuva de probabildiad y AUC es el grado de separabilidad. Esto siginifca que mientras mas alto AUC, mejor es el modelo para distinguir 

In [111]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC',evaluator.evaluate(predictions))

                                                                                

Test Area Under ROC 0.7503062738527361




22/11/26 13:11:17 ERROR RetryingBlockTransferor: Exception while beginning fetch of 1 outstanding blocks (after 3 retries)
java.io.IOException: Connecting to /10.25.65.197:39341 failed in the last 4750 ms, fail this connection directly
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:126)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:154)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.lambda$initiateRetry$0(RetryingBlockTransferor.java:184)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecut

In [112]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
print("Accuracy : ",accuracy)



Accuracy :  0.8445698408172885


                                                                                

Y finalmente lo probamos la precision del modelo

### 6.- Visualizacion de los datos usando la herramienta de Tableu 