<a href="https://colab.research.google.com/github/edinaldoab/challenge_data_science_2/blob/main/notebooks/semanas_03_e_04_recomendador.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Semanas 03 e 04

Durante as semanas 3 e 4 vamos focar em melhorar o sistema de recomendação da InsightPlaces.
Nosso sistema não está recebendo os clicks que esperava apenas recomendando imoveis das mesmas regiões e na mesma faixa de preço, por isso como parte do time de Data Science precisamos criar uma nova mecânica de recomendação de imóveis.

O objetivo é criar um sistema de recomendação de imóveis baseado em similaridade de características. A base de dados utilizada será a base de dados tratada na semana 2, após transformar as variáveis categóricas em variáveis binárias e antes de aplicarmos a vetorização.

# 1. Preparação do Ambiente

## a. Instalação das dependências

In [1]:
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

## b. Inicialização da SparkSession

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("recomendador") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

## c. Montagem do Drive

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## d. Inicializando a  UI do Spark 

In [6]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [7]:
with open('/content/drive/MyDrive/Colab Notebooks/projeto_data_science_imobni/dados/authTokenngrok.txt', 'r') as file:
  content = file.read()
  file.close()

my_authtoken = str(content)

In [8]:
get_ipython().system_raw('./ngrok authtoken ' + my_authtoken)
get_ipython().system_raw('./ngrok http 4050 &')

!curl -s http://localhost:4040/api/tunnels

{"tunnels":[],"uri":"/api/tunnels"}


In [9]:
spark

Carregamento dos dados:

# 2. Carregamento e preparação dos dados

## a. Leitura do dataset

In [10]:
path='/content/drive/MyDrive/Colab Notebooks/projeto_data_science_imobni/dataset_ml_regressao'

data = spark.read.parquet(path)

In [11]:
data.show(5)

+--------------------+-----+---------+---------+-------+------+----+---------------+----------+-------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|         bairro|condominio|   iptu|    valor|zona_central|zona_norte|zona_oeste|zona_sul|Academia|animais_permitidos|Churrasqueira|condominio_fechado|Elevador|Piscina|Playground|portaria_24h|portao_eletronico|salao_de_festas|
+--------------------+-----+---------+---------+-------+------+----+---------------+----------+-------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|00790b85-56e3-43a...|   11|    166.0|        5|      4|     4|   2|    Jacarepaguá|    2100.0| 4600.0|1750000.0|     

In [12]:
data.printSchema()

root
 |-- id: string (nullable = true)
 |-- andar: integer (nullable = true)
 |-- area_util: double (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- bairro: string (nullable = true)
 |-- condominio: double (nullable = true)
 |-- iptu: double (nullable = true)
 |-- valor: double (nullable = true)
 |-- zona_central: integer (nullable = true)
 |-- zona_norte: integer (nullable = true)
 |-- zona_oeste: integer (nullable = true)
 |-- zona_sul: integer (nullable = true)
 |-- Academia: integer (nullable = true)
 |-- animais_permitidos: integer (nullable = true)
 |-- Churrasqueira: integer (nullable = true)
 |-- condominio_fechado: integer (nullable = true)
 |-- Elevador: integer (nullable = true)
 |-- Piscina: integer (nullable = true)
 |-- Playground: integer (nullable = true)
 |-- portaria_24h: integer (nullable = true)
 |-- portao_eletronico: integer (nullable

In [14]:
print(f'Quantidade de linhas: {data.count()}')
print(f'Quantidade de colunas: {len(data.columns)}')

Quantidade de linhas: 66551
Quantidade de colunas: 25


Checagem de nulos: 

In [15]:
from pyspark.sql import functions as f

In [18]:
data.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in data.columns]).show()

+---+-----+---------+---------+-------+------+----+------+----------+----+-----+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
| id|andar|area_util|banheiros|quartos|suites|vaga|bairro|condominio|iptu|valor|zona_central|zona_norte|zona_oeste|zona_sul|Academia|animais_permitidos|Churrasqueira|condominio_fechado|Elevador|Piscina|Playground|portaria_24h|portao_eletronico|salao_de_festas|
+---+-----+---------+---------+-------+------+----+------+----------+----+-----+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|  0|    0|        0|        0|      0|     0|   0|     0|         0|   0|    0|           0|         0|         0|       0|       0|                 0|            0|                 0|       0|      0|         0|    

Nenhuma variável com atributo nulo, como previsto no último tratamento!

## b. Preparração dos dados

O modddelo de clustering a ser montado também demanda a vetorizaçõ dos dados, assim como os modelos de regressão demandavam:

In [27]:
from pyspark.ml.feature import VectorAssembler

In [19]:
data.columns

['id',
 'andar',
 'area_util',
 'banheiros',
 'quartos',
 'suites',
 'vaga',
 'bairro',
 'condominio',
 'iptu',
 'valor',
 'zona_central',
 'zona_norte',
 'zona_oeste',
 'zona_sul',
 'Academia',
 'animais_permitidos',
 'Churrasqueira',
 'condominio_fechado',
 'Elevador',
 'Piscina',
 'Playground',
 'portaria_24h',
 'portao_eletronico',
 'salao_de_festas']

Desta vez, todas as colunas serão utilizadas no modelo, exceto:
*  `id`: identificador único e 
*  `bairro`: não foi transformada em variável dummy por possuir muitas atribuições

Aqui, outra especificidade: a coluna `valor` não é removida para o modelo, pois o cluster será criado de acordo com um algoritmo não supervisionado, em que não é necessário uma variável alvo. 

In [25]:
input_col = [i for i in data.columns if i not in ['id', 'bairro']]

print(input_col)

['andar', 'area_util', 'banheiros', 'quartos', 'suites', 'vaga', 'condominio', 'iptu', 'valor', 'zona_central', 'zona_norte', 'zona_oeste', 'zona_sul', 'Academia', 'animais_permitidos', 'Churrasqueira', 'condominio_fechado', 'Elevador', 'Piscina', 'Playground', 'portaria_24h', 'portao_eletronico', 'salao_de_festas']


In [28]:
assembler = VectorAssembler(inputCols=input_col, outputCol='features')

In [29]:
data_prep = assembler.transform(data).select(['features'])

In [30]:
data_prep.show(truncate=False, n=5)

+------------------------------------------------------------------------------------------------------------+
|features                                                                                                    |
+------------------------------------------------------------------------------------------------------------+
|[11.0,166.0,5.0,4.0,4.0,2.0,2100.0,4600.0,1750000.0,0.0,0.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]|
|(23,[0,1,2,3,4,6,7,8,12,14,21],[4.0,640.0,5.0,11.0,3.0,3060.0,20030.0,3800000.0,1.0,1.0,1.0])               |
|[1.0,50.0,1.0,2.0,0.0,1.0,363.0,97.0,192000.0,0.0,1.0,0.0,0.0,1.0,1.0,1.0,1.0,0.0,1.0,1.0,1.0,0.0,1.0]      |
|(23,[1,2,3,4,5,6,7,8,12,17],[160.0,4.0,3.0,1.0,3.0,1530.0,7440.0,3490000.0,1.0,1.0])                        |
|(23,[1,2,3,4,5,8,11,15,18,19,22],[52.0,1.0,2.0,1.0,1.0,440000.0,1.0,1.0,1.0,1.0,1.0])                       |
+------------------------------------------------------------------------------------------------------------+
o

## c. Padronização dos dados

Com a classe `StandardScaler` do PySpark, pode-se padronizar os dados, fazendo com que adquiram média 0 e desvio padrão 1, de acordo com a fórmula *z-score*:

<center>
$x_{i} = \dfrac{x_{i} - \mu}{\sigma}$
</center>

em que a $\mu$ representa a média aritmética do conjunto e $\sigma$ seu desvio padrão.

In [31]:
from pyspark.ml.feature import StandardScaler

In [33]:
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
scaler_model = scaler.fit(data_prep)
data_scaler = scaler_model.transform(data_prep)

In [35]:
data_scaler.show(5, truncate=False)

+------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                    |scaled_features                                                                                                                                                                                                                                                                                                                                                                                     |


## d. Redução de dimensionalidade

Para gerar uma combinação linear de todas as colunas e assim diminuir a dimensão do sistema, realiza-se a redução de dimensionalidade através da classe `PCA`:

In [37]:
from pyspark.ml.feature import PCA

In [39]:
pca = PCA(k=2, inputCol='scaled_features', outputCol='pca_features')
model_pca = pca.fit(data_scaler)
data_pca = model_pca.transform(data_scaler)

In [40]:
data_pca.select('pca_features').show(5, truncate=False)

+-----------------------------------------+
|pca_features                             |
+-----------------------------------------+
|[-7.914179810286645,-5.023192740643855]  |
|[-2.9387320766371854,-11.039000981915969]|
|[-5.091216714107638,-0.14189723285462086]|
|[-1.3047552598967092,-5.1610096090868485]|
|[-3.394864973027069,-1.895405603870326]  |
+-----------------------------------------+
only showing top 5 rows



# Criação de um pipeline de transformação

A implementação da classe `Pipeline` facilita muito a forma como se estabelece um conjunto de transformações de dados nos scripts PySpark: 

In [41]:
from pyspark.ml import Pipeline

Como se viu nas células anteriores, o pipeline deste projeto é constituído pelos seguintes estágios:

`vectorr_assembler` > `standard_scaler` > `pca`

In [42]:
pca_pipeline = Pipeline(stages=[assembler, scaler, pca])

In [44]:
pca_pipeline_model = pca_pipeline.fit(data)

In [47]:
data_pca = pca_pipeline_model.transform(data)

In [48]:
data_pca.show(5)

+--------------------+-----+---------+---------+-------+------+----+---------------+----------+-------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+--------------------+--------------------+--------------------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|         bairro|condominio|   iptu|    valor|zona_central|zona_norte|zona_oeste|zona_sul|Academia|animais_permitidos|Churrasqueira|condominio_fechado|Elevador|Piscina|Playground|portaria_24h|portao_eletronico|salao_de_festas|            features|     scaled_features|        pca_features|
+--------------------+-----+---------+---------+-------+------+----+---------------+----------+-------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------

Resultado da redução:

In [50]:
data_pca.select('pca_features').show(5, truncate=False)

+-----------------------------------------+
|pca_features                             |
+-----------------------------------------+
|[-7.914179810286645,-5.023192740643855]  |
|[-2.9387320766371854,-11.039000981915969]|
|[-5.091216714107638,-0.14189723285462086]|
|[-1.3047552598967092,-5.1610096090868485]|
|[-3.394864973027069,-1.895405603870326]  |
+-----------------------------------------+
only showing top 5 rows



#Clusterização dos dados com o algoritmo KMeans

Com a classe `KMeans` implementa-se o algoritmo que será responsável por agrupar os dados que possuem características semelhantes (cluster).

In [51]:
from pyspark.ml.clustering import KMeans

In [52]:
SEED = 1224

O algoritmo `KMeans` precisa das seguintes entradas:

*  `featuresCol`: coluna dos dados a serem clusterizados
*  `predictionCol`: coluna de clusters a que cada dado pertence
*  `k`: número de cluster a serem criados
*  `seed`: número para geração de números pseudoaletórios


In [53]:
kmeans = KMeans(featuresCol='pca_features', predictionCol='cluster_pca', k=5, seed=SEED)

In [54]:
model_kmeans = kmeans.fit(data_pca)

In [55]:
predictions_kmeans = model_kmeans.transform(data_pca)

In [56]:
predictions_kmeans.select('pca_features', 'cluster_pca')\
    .show(5, truncate=False)

+-----------------------------------------+-----------+
|pca_features                             |cluster_pca|
+-----------------------------------------+-----------+
|[-7.914179810286645,-5.023192740643855]  |1          |
|[-2.9387320766371854,-11.039000981915969]|3          |
|[-5.091216714107638,-0.14189723285462086]|2          |
|[-1.3047552598967092,-5.1610096090868485]|3          |
|[-3.394864973027069,-1.895405603870326]  |4          |
+-----------------------------------------+-----------+
only showing top 5 rows



## a. Análise dos clusters criados

A query a seguir mostra um resumo para informações dos clusters de acordo com as informações dos cômodos e de tamanho do imóvel:

In [57]:
data_pca\
    .join(predictions_kmeans.select('id', 'cluster_pca'), on='id')\
    .groupBy('cluster_pca')\
    .agg(
        f.count('id').alias('quantidade'),
        f.mean('valor').alias('valor_medio'),
        f.mean('area_util').alias('area_media'),
        f.round(f.mean('quartos'),0).alias('quartos_medio'),
        f.round(f.mean('vaga'), 0).alias('vagas_medio'),
        f.round(f.mean('banheiros'), 0).alias('banheiros_medio'),
        f.round(f.mean('suites'), 0).alias('suites_medio'),
        f.mean('condominio').alias('condominio_medio'),
        f.mean('iptu').alias('iptu_medio'),
    )\
    .orderBy('cluster_pca')\
    .show()

+-----------+----------+------------------+------------------+-------------+-----------+---------------+------------+------------------+------------------+
|cluster_pca|quantidade|       valor_medio|        area_media|quartos_medio|vagas_medio|banheiros_medio|suites_medio|  condominio_medio|        iptu_medio|
+-----------+----------+------------------+------------------+-------------+-----------+---------------+------------+------------------+------------------+
|          0|     17674| 808818.4721059183| 84.18343329184113|          2.0|        1.0|            2.0|         0.0|1672.2981215344573|1728.4869299536042|
|          1|      5187| 3847158.789666474| 282.5008675534991|          4.0|        3.0|            5.0|         3.0| 16862.68016194332|16832.253518411413|
|          2|     20501| 856104.6322618409| 90.19145407541096|          2.0|        1.0|            2.0|         1.0| 4294.944929515634|3311.0277547436713|
|          3|      8300|2702813.2746987953|207.83048192771085|  

## b. Pesquisa pelo número ideal de componentes após a transformação PCA

Como atividade extra, verifica-se a quantidade ideal de componentes que os dados devem possuir após a transformação PCA. Para isto, vale-se da medida de **variância explicada**.

In [58]:
pca_pipeline_model.stages[2].explainedVariance

DenseVector([0.2655, 0.1721])

In [59]:
k = len(input_col)

In [60]:
print(k)

23


In [61]:
data_pca.select('pca_features').show(5, truncate=False)

+-----------------------------------------+
|pca_features                             |
+-----------------------------------------+
|[-7.914179810286645,-5.023192740643855]  |
|[-2.9387320766371854,-11.039000981915969]|
|[-5.091216714107638,-0.14189723285462086]|
|[-1.3047552598967092,-5.1610096090868485]|
|[-3.394864973027069,-1.895405603870326]  |
+-----------------------------------------+
only showing top 5 rows



In [63]:
pca = PCA(k=k, inputCol='scaled_features', outputCol='pca_features')
model_pca = pca.fit(data_pca.drop('pca_features'))
data_imoveis_pca = model_pca.transform(data_pca.drop('pca_features'))

In [64]:
data_imoveis_pca.select("pca_features").show(5, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|pca_features                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
+---------------------------------------------------------------------

In [66]:
round(sum(model_pca.explainedVariance)*100)

100

As 23 colunas explicam  100% dos daods.

In [67]:
model_pca.explainedVariance[:]

array([2.65457494e-01, 1.72067301e-01, 9.12744818e-02, 5.43737219e-02,
       5.22298117e-02, 4.66203459e-02, 4.42659008e-02, 4.16021312e-02,
       3.46818214e-02, 2.72266440e-02, 2.43947461e-02, 2.01043117e-02,
       1.92103152e-02, 1.76122901e-02, 1.55070658e-02, 1.38947785e-02,
       1.20074124e-02, 1.13351577e-02, 1.01096341e-02, 9.23012235e-03,
       8.89844650e-03, 7.89606599e-03, 8.58369421e-15])

Soma cumulativa (`cumsum`) dos valores de variância para cada componente principal:

In [68]:
import numpy as np

In [69]:
lista_valores = np.cumsum(model_pca.explainedVariance[:])

In [70]:
print(lista_valores)

[0.26545749 0.43752479 0.52879928 0.583173   0.63540281 0.68202316
 0.72628906 0.76789119 0.80257301 0.82979965 0.8541944  0.87429871
 0.89350903 0.91112132 0.92662838 0.94052316 0.95253057 0.96386573
 0.97397537 0.98320549 0.99210393 1.         1.        ]


Com o intuito de se explicar até 70% dos dados, atualiza-se o valor de k:

In [73]:
k = sum(lista_valores <= 0.7)

print(f'Novo k: {k}')

Novo k: 6


E atualiza-se a redução também:

In [75]:
pca = PCA(k=k, inputCol='scaled_features', outputCol='pca_features')
model_pca = pca.fit(data_pca.drop('pca_features'))
data_imoveis_pca_final = model_pca.transform(data_pca.drop('pca_features'))

In [76]:
data_imoveis_pca_final.select('pca_features').show(truncate=False, n=5)

+-----------------------------------------------------------------------------------------------------------------------+
|pca_features                                                                                                           |
+-----------------------------------------------------------------------------------------------------------------------+
|[-7.914179810286645,-5.023192740643855,-1.0110361124443572,0.9748710818561165,-0.2847670025663571,-0.2961858190165721] |
|[-2.9387320766371854,-11.039000981915969,-4.038490438016008,2.187986153253321,-0.4378607661628024,-0.09260027190872544]|
|[-5.091216714107638,-0.14189723285462086,-0.14369304838874064,2.7753925806355486,0.1370110834766389,1.0604617546093669]|
|[-1.3047552598967092,-5.1610096090868485,-2.151348198589035,0.5930678901415247,-0.04180116151994076,1.1330543488639435]|
|[-3.394864973027069,-1.895405603870326,2.041885447153389,0.021934892842320086,0.08521905610799077,0.9684732130864777]  |
+-----------------------

Validando a porcentagem de dados explicados pelo conjunto principal:

In [77]:
sum(model_pca.explainedVariance) *100

68.20231562211643

Novo pipeline:

In [78]:
pca_pipeline = Pipeline(stages=[assembler, scaler, pca])

In [81]:
model_pca_pipeline = pca_pipeline.fit(data)

In [82]:
projection = model_pca_pipeline.transform(data)

In [83]:
projection.select('pca_features').show(truncate=False, n=5)

+-----------------------------------------------------------------------------------------------------------------------+
|pca_features                                                                                                           |
+-----------------------------------------------------------------------------------------------------------------------+
|[-7.914179810286645,-5.023192740643855,-1.0110361124443572,0.9748710818561165,-0.2847670025663571,-0.2961858190165721] |
|[-2.9387320766371854,-11.039000981915969,-4.038490438016008,2.187986153253321,-0.4378607661628024,-0.09260027190872544]|
|[-5.091216714107638,-0.14189723285462086,-0.14369304838874064,2.7753925806355486,0.1370110834766389,1.0604617546093669]|
|[-1.3047552598967092,-5.1610096090868485,-2.151348198589035,0.5930678901415247,-0.04180116151994076,1.1330543488639435]|
|[-3.394864973027069,-1.895405603870326,2.041885447153389,0.021934892842320086,0.08521905610799077,0.9684732130864777]  |
+-----------------------

Cálculo do Silhoute Score

> Silhoutte Score é uma métrica usada para calculara o quão bom está a clusterização utilizada. Esse valor varia de -1 a 1.

*  1: Significa que os clusters estão bem separados uns dos outros.

*  0: Significa que os clusters são indiferentes, ou podemos dizer que a distância entre os clusters não é significativa.

*  -1: Significa que os clusters são atribuídos da maneira errada.

Reprodução: Alura

Aumentando-se a quantidade de cluster para 50, garante-se que todos sejam mais bem definidos, uma vez que a volumetria de dados é grande.

In [84]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
silhouette_score=[]

evaluator = ClusteringEvaluator(predictionCol='cluster_pca', featuresCol='pca_features', \
                                metricName='silhouette', distanceMeasure='squaredEuclidean')
silhouette_score = {}

for i in range(2,51):
    
    KMeans_algo = KMeans(k=i, featuresCol='pca_features', predictionCol='cluster_pca', seed=SEED)
    
    KMeans_fit = KMeans_algo.fit(projection)
    
    output = KMeans_fit.transform(projection)
    
    score = evaluator.evaluate(output)
    silhouette_score[i] = score

In [86]:
silhouette_score

{2: 0.4806427415486872,
 3: 0.49203887905769156,
 4: 0.4191525822925099,
 5: 0.44274988480339267,
 6: 0.4315961667254472,
 7: 0.4271389330967459,
 8: 0.48762369061277383,
 9: 0.46664547667636025,
 10: 0.44835129159163506,
 11: 0.4367608779937447,
 12: 0.473656680512776,
 13: 0.4601000892125327,
 14: 0.508133224849908,
 15: 0.49105002889659577,
 16: 0.5094277635859876,
 17: 0.5074604040822153,
 18: 0.501428736996892,
 19: 0.4935339599833687,
 20: 0.5156679386505055,
 21: 0.49304150672411695,
 22: 0.5296317789910818,
 23: 0.5094133207960251,
 24: 0.5155370675485829,
 25: 0.5314632354412274,
 26: 0.5208086043856796,
 27: 0.49139656446765295,
 28: 0.5100374807517398,
 29: 0.5210941580269356,
 30: 0.5142727228943784,
 31: 0.49870307713458223,
 32: 0.48135266233898655,
 33: 0.5145046203612771,
 34: 0.5089193027984322,
 35: 0.5175029147861618,
 36: 0.5110855838904148,
 37: 0.4953987374557303,
 38: 0.47604207188868086,
 39: 0.5204852078331769,
 40: 0.48429500229406935,
 41: 0.4883790145856585,

In [87]:
k_best = [key for key, value in silhouette_score.items() if value == max(silhouette_score.values())]
k_best[0]

25

In [89]:
kmeans = KMeans(k=k_best[0], featuresCol='pca_features', predictionCol='cluster_pca', seed=SEED)
modelo_kmeans = kmeans.fit(projection)
projection_kmeans = modelo_kmeans.transform(projection)

In [90]:
projection_kmeans.select(['pca_features','cluster_pca', "id"]).show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------+-----------+------------------------------------+
|pca_features                                                                                                              |cluster_pca|id                                  |
+--------------------------------------------------------------------------------------------------------------------------+-----------+------------------------------------+
|[-7.914179810286645,-5.023192740643855,-1.0110361124443572,0.9748710818561165,-0.2847670025663571,-0.2961858190165721]    |21         |00790b85-56e3-43a5-a499-9e4c3708b95c|
|[-2.9387320766371854,-11.039000981915969,-4.038490438016008,2.187986153253321,-0.4378607661628024,-0.09260027190872544]   |5          |007f8099-8e1d-45f6-9cdf-dee58ea462f4|
|[-5.091216714107638,-0.14189723285462086,-0.14369304838874064,2.7753925806355486,0.1370110834766389,1.0604617546093669]   |12    

## c. Visualização de um cluster

Com o intuito de verificar a validação de um dos clusteres, escolhe-se um id aleatório e, assim, mostra-se o cluster em que ele está alocado:

In [91]:
id_imovel = '04171318-00f9-4306-b404-2a07b0a9b977'

In [92]:
projection_kmeans\
    .filter(projection_kmeans.id == id_imovel)\
    .select('cluster_pca')\
    .show()

+-----------+
|cluster_pca|
+-----------+
|          1|
+-----------+



In [93]:
cluster = projection_kmeans\
    .filter(projection_kmeans.id == id_imovel)\
    .select('cluster_pca')\
    .collect()[0][0]

In [94]:
print(cluster)

1


In [95]:
recomendacoes = projection_kmeans\
    .filter(projection_kmeans.cluster_pca == cluster)\
    .select('bairro', 'id', 'pca_features')

In [97]:
recomendacoes.show(8, truncate=False)

+------------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------------+
|bairro                  |id                                  |pca_features                                                                                                              |
+------------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------------+
|Recreio dos Bandeirantes|04171318-00f9-4306-b404-2a07b0a9b977|[-6.386073506411739,-1.7594200413402494,-0.8939008490586634,0.2697491037526369,-0.23866412022884329,-0.5796882609888405]  |
|Recreio dos Bandeirantes|0a35241d-3051-40c4-a2d9-669d32e7ac14|[-6.910302164356212,-1.1782853321853928,-0.6266002946841187,0.20466208470220532,-0.13689591300900014,0.044564760603640785]|
|Recreio dos Bandeirantes|0ed4db7a-f9a8-4857-bcd5-9f09ed7ab46b|[-

#Finalização

## a. Busca pelas 10 melhores recomendações

Com o cálculo da distância euclidiana entre os elementos de um cluster, é possível elencar as 10 melhores recomendações. Para este teste, ainda se vale do `id` aleatório `id_imovel`.

In [99]:
imovel_procurado = recomendacoes\
    .filter(recomendacoes.id == id_imovel)\
    .select('pca_features')\
    .collect()[0][0]

In [101]:
print(f'Vetor: {imovel_procurado}')

Vetor: [-6.386073506411739,-1.7594200413402494,-0.8939008490586634,0.2697491037526369,-0.23866412022884329,-0.5796882609888405]


In [102]:
from scipy.spatial.distance import euclidean
from pyspark.sql.types import FloatType

In [103]:
def calculate_euclidean_distance(imovel, valor):
    return euclidean(imovel, valor)

euclidean_udf = f.udf(lambda x: calculate_euclidean_distance(imovel_procurado, x), FloatType())

recomendacoes\
    .withColumn('distancia', euclidean_udf('pca_features'))\
    .select('bairro', 'id', 'distancia')\
    .orderBy('distancia')\
    .show(5)

+--------------------+--------------------+-----------+
|              bairro|                  id|  distancia|
+--------------------+--------------------+-----------+
|Recreio dos Bande...|04171318-00f9-430...|        0.0|
|Recreio dos Bande...|3dc65040-ff74-403...|0.015714277|
|         Jacarepaguá|9c8a005d-e127-4d8...|0.016470006|
|Recreio dos Bande...|c7dc293c-3a36-4db...|0.016532231|
|Freguesia (Jacare...|01a5aae1-6767-463...|0.020644499|
+--------------------+--------------------+-----------+
only showing top 5 rows



In [106]:
top_recomendacoes = recomendacoes\
    .withColumn('distancia', euclidean_udf('pca_features'))\
    .select('bairro', 'id', 'distancia')\
    .orderBy('distancia')

top_recomendacoes.show(10, truncate=False)

+------------------------+------------------------------------+-----------+
|bairro                  |id                                  |distancia  |
+------------------------+------------------------------------+-----------+
|Recreio dos Bandeirantes|04171318-00f9-4306-b404-2a07b0a9b977|0.0        |
|Recreio dos Bandeirantes|3dc65040-ff74-4034-af77-6b5f575ce8b6|0.015714277|
|Jacarepaguá             |9c8a005d-e127-4d8b-9c44-da0523637e5f|0.016470006|
|Recreio dos Bandeirantes|c7dc293c-3a36-4db8-8a91-3a9d37f55aaa|0.016532231|
|Freguesia (Jacarepaguá) |01a5aae1-6767-4634-9e87-490724b6ed90|0.020644499|
|Freguesia (Jacarepaguá) |f3e29161-e611-48d1-9678-82210822855a|0.023297567|
|Pechincha               |31685d3f-3b08-4027-8669-d34adb46edce|0.026400762|
|Freguesia (Jacarepaguá) |624060ee-2412-4d90-93f0-9f86c87d0898|0.03780748 |
|Pechincha               |a68a378a-a0ec-4d0f-a762-22ab663cdf85|0.038690396|
|Freguesia (Jacarepaguá) |5c253135-5609-4d63-a330-fa5b9a6fdabc|0.042061623|
+-----------

## b. Disponibilização da ref. de função recomendadora para o time de dev 

Consolidando tudo o que foi discutido e implementado, chega-se à função que o time de desenvolvimento usará como referência para o recomendador:

In [107]:
def calculate_euclidean_distance(imovel, valor):
    return euclidean(imovel, valor)


def recommender(id_imovel, dataframe_kmeans):
    cluster = dataframe_kmeans\
        .filter(dataframe_kmeans.id == id_imovel)\
        .select('cluster_pca')\
        .collect()[0][0]

    imoveis_recomendadas = dataframe_kmeans\
        .filter(dataframe_kmeans.cluster_pca == cluster)

    imovel_procurado = imoveis_recomendadas\
        .filter(imoveis_recomendadas.id == id_imovel)\
        .select('pca_features')\
        .collect()[0][0]

    euclidean_udf = f.udf(lambda x: calculate_euclidean_distance(
        imovel_procurado, x), FloatType())

    colunas_nao_utilizadas = [
        'features', 'scaled_features', 'pca_features', 'cluster_pca', 'distancia']

    recomendadas = imoveis_recomendadas\
        .withColumn('distancia', euclidean_udf('pca_features'))\
        .select([col for col in imoveis_recomendadas.columns if col not in colunas_nao_utilizadas])\
        .orderBy('distancia')

    return recomendadas

Realiza-se um teste para recomendar imóveis parecidos com o imóvel `id = '00b23c6d-0e9d-4be3-a4cc-cf64ca06f3d2'`

In [110]:
id_validacao = '00b23c6d-0e9d-4be3-a4cc-cf64ca06f3d2'

In [138]:
recommender(id_validacao, projection_kmeans)\
    .select('id', 'bairro')\
    .show(10, truncate=False)

+------------------------------------+---------------+
|id                                  |bairro         |
+------------------------------------+---------------+
|00b23c6d-0e9d-4be3-a4cc-cf64ca06f3d2|Parada de Lucas|
|03b3fee4-fd36-48c5-8911-00bfdfc79762|Tomás Coelho   |
|db0ad902-d0d0-4233-8a67-63428d698023|Todos os Santos|
|69e53fc2-2885-44d5-a675-9d09750f879a|Cachambi       |
|f32d7f78-f06f-4b43-9d98-62b3d7dc5d39|Tijuca         |
|d51fd7fc-ab7e-4755-9e91-0fb347ed1541|São Cristóvão  |
|b5ac5c83-4d2f-44f7-b1b6-97764decf782|Vila Isabel    |
|d7dfc69b-b974-4301-9cde-61ea052423c8|Engenho Novo   |
|2e0a73cb-5c7a-4cf8-8ded-58d0ccab3cba|Todos os Santos|
|5eb84915-78e1-4748-8b0a-a1feba703210|Andaraí        |
+------------------------------------+---------------+
only showing top 10 rows

