# Preparando o Ambiente

Instalando o PySpark, SparkSession, montando o drive e lendo os dados que foram tratados na última semana ao preparar os dados para a Regressão.

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=fe873ca4862a6c673a3fb3f8e1c996ec80723246c1fb53079880fca7ce2fd775
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master('local[*]') \
        .appName("Challenge da Alura") \
        .getOrCreate()

spark

Montando o Drive

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Adicionando os Dados


In [5]:
dados = spark.read.parquet('/content/drive/MyDrive/Challenge_Pyspark_Alura/Data/Dataset_ML_Regressao')

In [6]:
dados.printSchema()

root
 |-- id: string (nullable = true)
 |-- andar: integer (nullable = true)
 |-- area_util: double (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- vaga: long (nullable = true)
 |-- bairro: string (nullable = true)
 |-- condominio: double (nullable = true)
 |-- iptu: double (nullable = true)
 |-- valor: double (nullable = true)
 |-- Zona Central: integer (nullable = true)
 |-- Zona Norte: integer (nullable = true)
 |-- Zona Oeste: integer (nullable = true)
 |-- Zona Sul: integer (nullable = true)
 |-- Academia: integer (nullable = true)
 |-- Animais permitidos: integer (nullable = true)
 |-- Churrasqueira: integer (nullable = true)
 |-- Condomínio fechado: integer (nullable = true)
 |-- Elevador: integer (nullable = true)
 |-- Piscina: integer (nullable = true)
 |-- Playground: integer (nullable = true)
 |-- Portaria 24h: integer (nullable = true)
 |-- Portão eletrônico: integer (nullable = 

In [7]:
dados.show(5)

+--------------------+-----+---------+---------+-------+------+----+------------+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|      bairro|condominio|  iptu|    valor|Zona Central|Zona Norte|Zona Oeste|Zona Sul|Academia|Animais permitidos|Churrasqueira|Condomínio fechado|Elevador|Piscina|Playground|Portaria 24h|Portão eletrônico|Salão de festas|
+--------------------+-----+---------+---------+-------+------+----+------------+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|00002dd9-cc74-480...|    2|     35.0|        1|      1|     0|   0|Santo Cristo|     100.0| 100.0| 245000.0|           1|        

# Preparando os dados

Aplicando as transfromações para utilizar os dados nos modelos de clustering usando o Vector Assembler.

In [8]:
from pyspark.ml.feature import VectorAssembler

In [9]:
x = dados.columns
x.remove('id')
x.remove('bairro')
x

['andar',
 'area_util',
 'banheiros',
 'quartos',
 'suites',
 'vaga',
 'condominio',
 'iptu',
 'valor',
 'Zona Central',
 'Zona Norte',
 'Zona Oeste',
 'Zona Sul',
 'Academia',
 'Animais permitidos',
 'Churrasqueira',
 'Condomínio fechado',
 'Elevador',
 'Piscina',
 'Playground',
 'Portaria 24h',
 'Portão eletrônico',
 'Salão de festas']

In [10]:
dados_vector = VectorAssembler(inputCols=x, outputCol='features').transform(dados).select(['features', 'bairro'])

In [11]:
dados_vector.select('features').show(truncate=False, n=5)

+-----------------------------------------------------------------------------------------------------------+
|features                                                                                                   |
+-----------------------------------------------------------------------------------------------------------+
|[2.0,35.0,1.0,1.0,0.0,0.0,100.0,100.0,245000.0,1.0,0.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]    |
|[5.0,169.0,4.0,4.0,2.0,2.0,998.0,2600.0,955000.0,0.0,0.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]  |
|[1.0,82.0,2.0,2.0,1.0,1.0,736.0,998.0,280000.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,1.0,0.0,1.0,1.0,1.0,1.0]    |
|(23,[1,2,3,4,5,6,7,8,11,13,15,17,18,19],[50.0,2.0,2.0,1.0,1.0,504.0,50.0,249900.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|[3.0,70.0,2.0,2.0,2.0,1.0,1.0,1.0,1350000.0,0.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]       |
+-----------------------------------------------------------------------------------------------------------+
only showi

# Padronizando os dados

Padronizando os dados para conseguir utilizar o PCA. Uutilizando a classe StandardScaler do PySpark para realizar esse processo. 

In [12]:
from pyspark.ml.feature import StandardScaler

In [13]:
scaler = StandardScaler(inputCol='features', outputCol='features_scaled')
model_scaler = scaler.fit(dados_vector)
dados_moveis_scaler = model_scaler.transform(dados_vector)

In [14]:
dados_moveis_scaler.show()

+--------------------+--------------------+--------------------+
|            features|              bairro|     features_scaled|
+--------------------+--------------------+--------------------+
|[2.0,35.0,1.0,1.0...|        Santo Cristo|[0.13607726247524...|
|[5.0,169.0,4.0,4....|                Anil|[0.34019315618810...|
|[1.0,82.0,2.0,2.0...|             Taquara|[0.06803863123762...|
|(23,[1,2,3,4,5,6,...|         Jacarepaguá|(23,[1,2,3,4,5,6,...|
|[3.0,70.0,2.0,2.0...|               Gávea|[0.20411589371286...|
|(23,[0,1,2,3,5,6,...|              Tijuca|(23,[0,1,2,3,5,6,...|
|(23,[1,2,3,4,5,6,...|Recreio dos Bande...|(23,[1,2,3,4,5,6,...|
|(23,[1,2,3,6,7,8,...|        Santa Teresa|(23,[1,2,3,6,7,8,...|
|(23,[1,2,3,5,6,7,...|           Pechincha|(23,[1,2,3,5,6,7,...|
|[0.0,35.0,1.0,1.0...|              Leblon|[0.0,0.3928347118...|
|(23,[1,2,3,4,5,6,...|Recreio dos Bande...|(23,[1,2,3,4,5,6,...|
|(23,[1,2,3,4,5,6,...|              Tijuca|(23,[1,2,3,4,5,6,...|
|(23,[1,2,3,4,5,6,...|Fre

# Redução de dimensionalidade

Para criar o modelo de recomendação, precisa reduzir a dimensão dos nossos dados. Para fazer isso, utilizei a técnica chamada PCA.

Aplicando essa técnica, teremos um novo conjunto de dados onde as colunas serão uma combinação linear das colunas originais.

Para fazer a redução da dimensionalidade, ou seja, reduzir o número de colunas, podemos utilizar a classe PCA do PySpark. 

Após a transformação com o PCA, podemos utilizar o método **explainedVariance** que retorna a variância explicada por cada componente principal.

In [15]:
from pyspark.ml.feature import PCA

In [16]:
k = len(x)
k

23

In [17]:
pca = PCA(k=k, inputCol='features_scaled', outputCol='pca_features')
model_pca = pca.fit(dados_moveis_scaler)
dados_moveis_pca = model_pca.transform(dados_moveis_scaler)

In [18]:
dados_moveis_pca.select('pca_features').show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|pca_features                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+---------------------------------------------------------

# Extra: Encontrar o número ideal de componentes


Utilizando o método explainedVariance que retorna a variância explicada por cada componente para encontrar o número ideal de componentes principais que os dados devem possuir após a transformação com o PCA.

In [19]:
sum(model_pca.explainedVariance) * 100

99.99999999999997

In [20]:
lista_valores = [sum(model_pca.explainedVariance[0:i+1]) for i in range(k)]
lista_valores

[0.2654574936099732,
 0.4375247949776239,
 0.5287992767783327,
 0.5831729986362465,
 0.6354028103524776,
 0.6820231562211663,
 0.7262890569978606,
 0.7678911881686642,
 0.8025730095613361,
 0.8297996535128314,
 0.854194399626071,
 0.8742987112956455,
 0.8935090265260472,
 0.9111213166740545,
 0.926628382434864,
 0.9405231608927341,
 0.9525305733295556,
 0.9638657310284781,
 0.9739753651565548,
 0.9832054875066132,
 0.9921039340092769,
 0.9999999999999951,
 0.9999999999999998]

In [21]:
import numpy as np

In [22]:
K = sum(np.array(lista_valores) <= 0.7)
K

6

In [23]:
pca = PCA(k=K, inputCol='features_scaled', outputCol='pca_features')
model_pca = pca.fit(dados_moveis_scaler)
dados_moveis_pca_final = model_pca.transform(dados_moveis_scaler)

In [24]:
dados_moveis_pca_final.select('pca_features').show(truncate=False, n=5)

+------------------------------------------------------------------------------------------------------------------------+
|pca_features                                                                                                            |
+------------------------------------------------------------------------------------------------------------------------+
|[-6.1651250495338035,1.3380985269405783,-1.7052299823820085,-0.5338289630560942,0.08903815478586295,-0.3134396180113537]|
|[-7.524672388434003,-3.8085058714021036,-0.8611234467676232,0.8708163848549987,-0.2490353016571682,-0.2218027235525904] |
|[-5.764448661386763,-1.4404347671314566,-0.39044571273099243,0.2764893984765971,-0.128001622939613,0.06220040040452328] |
|[-4.020466262019256,-2.0336728307621184,1.569423064692716,-0.027868375525824413,0.08506936638624416,1.1097064613413807] |
|[-6.383393688088925,-1.6478698820412068,-2.584854648625932,0.258497469477115,0.007853689403455273,1.8271740465159787]   |
+---------------

In [25]:
sum(model_pca.explainedVariance) *100

68.20231562211663

# Extra: Transformando os dados usando Pipeline

Construir um Pipeline para facilitar a aplicação das transformações nos dados e ele é um conjunto de transformações aplicadas sequencialmente no conjunto de dados.

In [68]:
from pyspark.ml import Pipeline

In [27]:
pca_pipeline = Pipeline(stages=[VectorAssembler(inputCols=x, outputCol='features'),
                                 StandardScaler(inputCol='features', outputCol='features_scaled'),
                                 PCA(k=6, inputCol='features_scaled', outputCol='pca_features')])

In [28]:
model_pca_pipeline = pca_pipeline.fit(dados)

In [29]:
projection = model_pca_pipeline.transform(dados)

In [30]:
projection.select('pca_features').show(truncate=False, n=5)

+------------------------------------------------------------------------------------------------------------------------+
|pca_features                                                                                                            |
+------------------------------------------------------------------------------------------------------------------------+
|[-6.1651250495338035,1.3380985269405783,-1.7052299823820085,-0.5338289630560942,0.08903815478586295,-0.3134396180113537]|
|[-7.524672388434003,-3.8085058714021036,-0.8611234467676232,0.8708163848549987,-0.2490353016571682,-0.2218027235525904] |
|[-5.764448661386763,-1.4404347671314566,-0.39044571273099243,0.2764893984765971,-0.128001622939613,0.06220040040452328] |
|[-4.020466262019256,-2.0336728307621184,1.569423064692716,-0.027868375525824413,0.08506936638624416,1.1097064613413807] |
|[-6.383393688088925,-1.6478698820412068,-2.584854648625932,0.258497469477115,0.007853689403455273,1.8271740465159787]   |
+---------------

# Criando os Clusters com Kmeans

O KMeans é um algoritmo de aprendizado não supervisionado que possui o objetivo de encontrar grupos de dados que possuem características semelhantes e colocá-los em um mesmo cluster. 

In [67]:
from pyspark.ml.clustering import KMeans

In [31]:
kmeans = KMeans(k=50, featuresCol='pca_features', predictionCol='cluster_pca')

In [32]:
modelo_kmeans = kmeans.fit(projection)

In [33]:
projetion_kmeans = modelo_kmeans.transform(projection) 

In [34]:
projetion_kmeans.select(['pca_features','cluster_pca']).show()

+--------------------+-----------+
|        pca_features|cluster_pca|
+--------------------+-----------+
|[-6.1651250495338...|          4|
|[-7.5246723884340...|         36|
|[-5.7644486613867...|          1|
|[-4.0204662620192...|         14|
|[-6.3833936880889...|         34|
|[-3.2529111812184...|         47|
|[-1.8302469503590...|         43|
|[-1.0611769329629...|         49|
|[-1.9958119007838...|         44|
|[-4.0420619793314...|         23|
|[-1.2282955688173...|         16|
|[-0.4181406070145...|          5|
|[-1.0739321850406...|         16|
|[-7.0282944130052...|         38|
|[-0.6012168547037...|         28|
|[-0.8789218397962...|         28|
|[-6.3956345386992...|         33|
|[-0.0574094927862...|          0|
|[-6.9015238649802...|          9|
|[-0.7164221492881...|          5|
+--------------------+-----------+
only showing top 20 rows



In [35]:
from pyspark.ml.functions import vector_to_array

In [36]:
projetion_kmeans = projetion_kmeans.withColumn('x', vector_to_array('pca_features')[0])\
                                   .withColumn('y', vector_to_array('pca_features')[1])

In [37]:
projetion_kmeans.select(['x', 'y', 'cluster_pca', 'bairro']).show()

+--------------------+--------------------+-----------+--------------------+
|                   x|                   y|cluster_pca|              bairro|
+--------------------+--------------------+-----------+--------------------+
| -6.1651250495338035|  1.3380985269405783|          4|        Santo Cristo|
|  -7.524672388434003| -3.8085058714021036|         36|                Anil|
|  -5.764448661386763| -1.4404347671314566|          1|             Taquara|
|  -4.020466262019256| -2.0336728307621184|         14|         Jacarepaguá|
|  -6.383393688088925| -1.6478698820412068|         34|               Gávea|
|  -3.252911181218482| -1.1179591836228746|         47|              Tijuca|
| -1.8302469503590988|  -6.681699215588388|         43|Recreio dos Bande...|
| -1.0611769329629523| -1.6685040058694276|         49|        Santa Teresa|
|  -1.995811900783876| -1.7655796610847758|         44|           Pechincha|
| -4.0420619793314945|-0.34692123377142114|         23|              Leblon|

# Extra: Avaliando as informações de cada cluster


Avaliando os clusters de acordo com as informações dos imóveis. Por exemplo: quantidade banheiros, quartos e tamanho do imóvel.

In [70]:
projection\
    .join(projetion_kmeans.select('id', 'cluster_pca'), on='id')\
    .groupBy('cluster_pca')\
    .agg(
        f.count('id').alias('quantidade'),
        f.mean('valor').alias('valor_medio'),
        f.mean('area_util').alias('area_media'),
        f.round(f.mean('quartos'),0).alias('quartos_medio'),
        f.round(f.mean('vaga'), 0).alias('vagas_medio'),
        f.round(f.mean('banheiros'), 0).alias('banheiros_medio'),
        f.round(f.mean('suites'), 0).alias('suites_medio'),
        f.mean('condominio').alias('condominio_medio'),
        f.mean('iptu').alias('iptu_medio'),
    )\
    .orderBy('cluster_pca')\
    .show()

+-----------+----------+------------------+------------------+-------------+-----------+---------------+------------+------------------+------------------+
|cluster_pca|quantidade|       valor_medio|        area_media|quartos_medio|vagas_medio|banheiros_medio|suites_medio|  condominio_medio|        iptu_medio|
+-----------+----------+------------------+------------------+-------------+-----------+---------------+------------+------------------+------------------+
|          0|      2666| 780112.8679669917|  62.6072768192048|          2.0|        0.0|            1.0|         0.0|1823.9062265566392|1159.9478619654915|
|          1|      2262| 641429.8974358974| 78.25641025641026|          2.0|        1.0|            2.0|         1.0| 870.0534924845269|1920.4266136162687|
|          2|       777|  3965997.77992278|237.38095238095238|          4.0|        2.0|            4.0|         2.0|2466.8159588159588| 4571.371943371943|
|          3|      3109|1072260.7867481506|  118.117722740431|  

# Extra: Representação do Cluster

Usei o **plotly.express** para fazer uma analise do Cluster assim como tinha aprendido em um curso que era sobre esse assunto/tema. 

Nesse gráfico consegui ver todos os dados dos imoveis que estão no Cluster, qual o cluster especifico e quais bairros são de cada um dos dados.

In [38]:
import plotly.express as px

In [39]:
fig = px.scatter(projetion_kmeans.toPandas(), x='x', y='y', color='cluster_pca', hover_data=['bairro'])
fig.show()

# Filtrar imóveis do mesmo cluster

In [40]:
projetion_kmeans.select(['pca_features','cluster_pca', "id"]).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------+-----------+------------------------------------+
|pca_features                                                                                                               |cluster_pca|id                                  |
+---------------------------------------------------------------------------------------------------------------------------+-----------+------------------------------------+
|[-6.1651250495338035,1.3380985269405783,-1.7052299823820085,-0.5338289630560942,0.08903815478586295,-0.3134396180113537]   |4          |00002dd9-cc74-4809-b5a5-850adf0e7526|
|[-7.524672388434003,-3.8085058714021036,-0.8611234467676232,0.8708163848549987,-0.2490353016571682,-0.2218027235525904]    |36         |00003eb1-1d77-43f3-ac4c-0c34060bb5ec|
|[-5.764448661386763,-1.4404347671314566,-0.39044571273099243,0.2764893984765971,-0.128001622939613,0.06220040040452328]    |

In [41]:
id_imovel = '00118f9c-44b1-4132-b68b-8bc5829fa5e5'

In [42]:
projetion_kmeans.filter(projetion_kmeans.id == id_imovel).select('cluster_pca').show()

+-----------+
|cluster_pca|
+-----------+
|         23|
+-----------+



In [43]:
cluster = projetion_kmeans.filter(projetion_kmeans.id == id_imovel).select('cluster_pca').collect()[0][0]
cluster

23

In [44]:
imoveis_recomendados = projetion_kmeans.filter(projetion_kmeans.cluster_pca == cluster)\
                      .select('bairro', 'id', 'pca_features')
                      
imoveis_recomendados.show()

+-----------+--------------------+--------------------+
|     bairro|                  id|        pca_features|
+-----------+--------------------+--------------------+
|     Leblon|00118f9c-44b1-413...|[-4.0420619793314...|
| Copacabana|002fb311-cd71-4e2...|[-2.6411403155780...|
|   Botafogo|005b5441-16b2-483...|[-2.8493633914843...|
|     Leblon|0089a287-fab6-4fb...|[-2.6222774657522...|
|   Botafogo|0090eb0d-5fee-4f7...|[-3.3565703070475...|
| Copacabana|00abd716-4d6b-457...|[-3.4045704097294...|
| Copacabana|00e2ee5b-dfd0-4b7...|[-2.6341433996464...|
| Copacabana|00e93d0f-7917-455...|[-2.7760596591941...|
|   Botafogo|00eb8693-5de8-457...|[-2.6121806053735...|
| Copacabana|0184f3d2-61c4-495...|[-2.1429200663488...|
| Copacabana|01a7cd87-365d-4f2...|[-2.8641506741669...|
| Copacabana|022e954b-9555-4ff...|[-2.7280420338312...|
| Copacabana|02ac1601-ab97-431...|[-2.6326945643408...|
| Copacabana|02cc5233-3f9d-4bd...|[-2.6138941247180...|
| Copacabana|02e90d70-68f9-450...|[-2.8441197402

# Calcular a distância euclidiana

A mecânica para selecionar quais imóveis do cluster serão recomendadas será utilizando a distância dos imóveis em relação às suas componentes. 

Para isso, vou calcular a distância euclidiana para encontrar as 10 melhores recomendações.

In [45]:
id_imovel

'00118f9c-44b1-4132-b68b-8bc5829fa5e5'

In [46]:
imovel_procurado = imoveis_recomendados.filter(imoveis_recomendados.id == id_imovel)\
                                          .select('pca_features').collect()[0][0]
imovel_procurado                             

DenseVector([-4.0421, -0.3469, -3.4128, -0.3398, -0.0995, 0.4823])

In [49]:
from scipy.spatial.distance import euclidean
from pyspark.sql.types import FloatType
from pyspark.sql import functions as f

In [61]:
def calcula_distancia_euclidean(imovel, valor):
    return euclidean(imovel, valor)

euclidean_udf = f.udf(lambda x: calcula_distancia_euclidean(imovel_procurado, x), FloatType())

imoveis_recomendados\
        .withColumn('distancia', euclidean_udf('pca_features'))\
        .select('bairro', 'id', 'distancia')\
        .orderBy('distancia')\
        .show(10)

+-----------+--------------------+-----------+
|     bairro|                  id|  distancia|
+-----------+--------------------+-----------+
|     Leblon|00118f9c-44b1-413...|        0.0|
| Copacabana|c9ca971f-dec6-42c...| 0.07294781|
|    Ipanema|23fc411e-973c-489...|0.072998434|
|    Ipanema|39833268-492d-470...| 0.08764445|
|Laranjeiras|cccd963b-1540-495...| 0.09031644|
| Copacabana|2e37cb8c-903a-4cd...|0.092567734|
|      Lagoa|b1252445-12fd-4f6...| 0.13264999|
| Copacabana|f22328da-c5e7-464...| 0.13946792|
|     Leblon|90edc097-587c-4ce...| 0.17081119|
|    Ipanema|91c5876a-7b2d-4a0...| 0.17964944|
+-----------+--------------------+-----------+
only showing top 10 rows



In [62]:
recomendadas = imoveis_recomendados\
        .withColumn('distancia', euclidean_udf('pca_features'))\
        .select('bairro', 'id', 'distancia')\
        .orderBy('distancia')
        

recomendadas.show(10)

+-----------+--------------------+-----------+
|     bairro|                  id|  distancia|
+-----------+--------------------+-----------+
|     Leblon|00118f9c-44b1-413...|        0.0|
| Copacabana|c9ca971f-dec6-42c...| 0.07294781|
|    Ipanema|23fc411e-973c-489...|0.072998434|
|    Ipanema|39833268-492d-470...| 0.08764445|
|Laranjeiras|cccd963b-1540-495...| 0.09031644|
| Copacabana|2e37cb8c-903a-4cd...|0.092567734|
|      Lagoa|b1252445-12fd-4f6...| 0.13264999|
| Copacabana|f22328da-c5e7-464...| 0.13946792|
|     Leblon|90edc097-587c-4ce...| 0.17081119|
|    Ipanema|91c5876a-7b2d-4a0...| 0.17964944|
+-----------+--------------------+-----------+
only showing top 10 rows



# Criar uma função recomendadora

In [64]:
def calcula_distancia_euclidean(imovel, valor):
    return euclidean(imovel, valor)
                     

def recomendador(id_movel, valor):
    cluster = projetion_kmeans\
              .filter(projetion_kmeans.id == id_imovel)\
              .select('cluster_pca')\
              .collect()[0][0]
    
    imoveis_recomendados = projetion_kmeans\
                          .filter(projetion_kmeans.cluster_pca == cluster)\
                          .select('bairro', 'id', 'pca_features')
    
    imovel_procurado = imoveis_recomendados\
                      .filter(imoveis_recomendados.id == id_imovel)\
                      .select('pca_features')\
                      .collect()[0][0]

    euclidean_udf = f.udf(lambda x: calcula_distancia_euclidean(imovel_procurado, x), FloatType())

    recomendadas = imoveis_recomendados\
        .withColumn('distancia', euclidean_udf('pca_features'))\
        .select('bairro', 'id', 'distancia')\
        .orderBy('distancia')

    return recomendadas


In [66]:
recomendador('002dc4e0-f1a5-4add-9c1c-3f43904645da', projetion_kmeans).select("id", "bairro").show(truncate=False)

+------------------------------------+-----------+
|id                                  |bairro     |
+------------------------------------+-----------+
|00118f9c-44b1-4132-b68b-8bc5829fa5e5|Leblon     |
|c9ca971f-dec6-42c8-bc0a-88e8fded3af2|Copacabana |
|23fc411e-973c-489c-8e30-15a1c9e1c40f|Ipanema    |
|39833268-492d-4704-8948-784bd004b567|Ipanema    |
|cccd963b-1540-4952-a9d9-9644528cbacb|Laranjeiras|
|2e37cb8c-903a-4cd3-9888-3c4feeca2545|Copacabana |
|b1252445-12fd-4f6d-b80a-4afe1474bb82|Lagoa      |
|f22328da-c5e7-4645-8157-9c9cf3711695|Copacabana |
|90edc097-587c-4cef-ad5f-18a34fe73a7a|Leblon     |
|91c5876a-7b2d-4a08-86fd-83156fe7912a|Ipanema    |
|be95a541-e1e1-421b-bfbb-467a224368a4|Ipanema    |
|3e415433-d2b7-4608-8cea-08724854a6da|Leblon     |
|16057e6f-4781-400c-9a53-9f8e41b592a8|Ipanema    |
|e21725df-a33b-450a-9dd4-dd81fe3a4ca3|Ipanema    |
|35ef105f-419b-489f-a671-67b50feb954a|Ipanema    |
|bc9af4ca-142f-4a05-b6ea-1acc200991bc|Copacabana |
|56c7f290-d146-4e45-89e4-3c4b5a