In [1]:
import os
import sys

# Raw strings keep the Windows backslashes literal. In a regular string "\s"
# and "\h" are invalid escape sequences, which newer Python versions warn about.
os.environ['SPARK_HOME'] = r'C:\spark'
os.environ['HADOOP_HOME'] = r'C:\spark\hadoop'
# Point both PySpark interpreter variables at the interpreter running this notebook.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable


In [2]:
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession for this notebook.
spark = (
    SparkSession.builder
    .appName('Modelo_Recomendacao')
    .getOrCreate()
)



In [3]:
# Load the Parquet data stored under DADOS/dataset_preparado.
dados = spark.read.parquet(
    'DADOS/dataset_preparado',
)

Vetorização

In [4]:
from pyspark.ml.feature import VectorAssembler

# Feature columns: every column of `dados` except 'id'.
# `index` raises ValueError when 'id' is absent, just like `list.remove`.
x = dados.columns
del x[x.index('id')]

In [5]:
# Pack the columns listed in `x` into a single vector column named 'features'.
assembler = VectorAssembler(
    inputCols=x,
    outputCol='features',
)
# Keep only the assembled vector for the step-by-step walkthrough below.
dados_treino = assembler.transform(dados).select('features')

Scaler

In [6]:
from pyspark.ml.feature import StandardScaler


In [7]:
# Standardise the assembled 'features' vector into 'scaledFeatures'.
scaler = StandardScaler(
    inputCol='features',
    outputCol='scaledFeatures',
)
# Fit the scaler on the assembled vectors, then apply it to the same data.
scaler_model = scaler.fit(dataset=dados_treino)
dados_scaler = scaler_model.transform(dataset=dados_treino)

Redução da Dimensionalidade

In [8]:
from pyspark.ml.feature import PCA


In [9]:
# Project the scaled vectors onto 2 principal components ('pca_features').
pca = PCA(
    k=2,
    inputCol='scaledFeatures',
    outputCol='pca_features',
)
model_pca = pca.fit(dataset=dados_scaler)
dados_pca = model_pca.transform(dataset=dados_scaler)


Pipeline

In [10]:
from pyspark.ml import Pipeline


In [11]:
# Chain the three stages built above (assemble -> scale -> PCA) and run them
# over the full `dados` frame; this replaces the earlier `dados_pca`.
pca_pipeline = Pipeline(
    stages=[
        assembler,
        scaler,
        pca,
    ],
)
pca_pipeline_model = pca_pipeline.fit(dataset=dados)
dados_pca = pca_pipeline_model.transform(dataset=dados)


Clusterizando

In [12]:
from pyspark.ml.clustering import KMeans
# Seed passed to the KMeans estimators created in this notebook.
SEED = 1224


In [13]:
# First clustering attempt: 5 clusters over the PCA projection.
cluster_kmeans = KMeans(
    k=5,
    seed=SEED,
    featuresCol='pca_features',
    predictionCol='cluster_pca',
)
model_kmeans = cluster_kmeans.fit(dataset=dados_pca)
# Adds the 'cluster_pca' assignment column to the projected data.
predictions_kmeans = model_kmeans.transform(dataset=dados_pca)

In [14]:

# Preview the projected vector and assigned cluster for the first 5 rows.
(
    predictions_kmeans
    .select('pca_features', 'cluster_pca')
    .show(n=5, truncate=False)
)


+-----------------------------------------+-----------+
|pca_features                             |cluster_pca|
+-----------------------------------------+-----------+
|[-5.512118758465008,-0.43476839552976954]|0          |
|[-4.8256678817413174,-0.5295971192273197]|3          |
|[-4.316107109820567,-0.7522300656417595] |3          |
|[-5.389508969071561,-0.27188950959557334]|0          |
|[-5.509200590399002,-0.4352124273782808] |0          |
+-----------------------------------------+-----------+
only showing top 5 rows



In [15]:
# Number of rows assigned to each cluster.
(
    predictions_kmeans
    .groupBy('cluster_pca')
    .count()
    .show()
)

+-----------+-----+
|cluster_pca|count|
+-----------+-----+
|          1| 8472|
|          3|14760|
|          4|17437|
|          2| 5127|
|          0|20541|
+-----------+-----+



In [16]:
#Função copiada da instrutora, achei muito interessante a forma como ela fez a contagem dos dados.

from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql import functions as f

# Per-cluster profile: row count plus the mean of each listing attribute.
# `predictions_kmeans` is `dados_pca` with the `cluster_pca` column appended,
# so it is aggregated directly. Joining it back onto `dados_pca` by `id` only
# added a shuffle and would multiply rows if an `id` ever repeated.
(
    predictions_kmeans
    .groupBy('cluster_pca')
    .agg(
        f.count('id').alias('quantidade'),
        f.mean('valor').alias('valor_medio'),
        f.mean('area_util').alias('area_media'),
        # Room-like counts are rounded to whole units for readability.
        f.round(f.mean('quartos'), 0).alias('quartos_medio'),
        f.round(f.mean('vaga'), 0).alias('vagas_medio'),
        f.round(f.mean('banheiros'), 0).alias('banheiros_medio'),
        f.round(f.mean('suites'), 0).alias('suites_medio'),
        f.mean('condominio').alias('condominio_medio'),
        f.mean('iptu').alias('iptu_medio'),
    )
    .orderBy('cluster_pca')
    .show()
)


+-----------+----------+------------------+-----------------+-------------+-----------+---------------+------------+------------------+------------------+
|cluster_pca|quantidade|       valor_medio|       area_media|quartos_medio|vagas_medio|banheiros_medio|suites_medio|  condominio_medio|        iptu_medio|
+-----------+----------+------------------+-----------------+-------------+-----------+---------------+------------+------------------+------------------+
|          0|     20541| 863595.3336254321|90.76690521396232|          2.0|        1.0|            2.0|         1.0| 4356.066160362202| 3676.753955503627|
|          1|      8472|2646218.1606468367|204.4748583569405|          4.0|        2.0|            4.0|         2.0| 6477.303706326723|12652.581090651558|
|          2|      5127| 3893152.328067096|285.2802808660035|          4.0|        3.0|            5.0|         3.0| 16801.08991613029|15630.653013458163|
|          3|     14760| 793718.8814363143|82.92960704607046|         

Otimização

In [17]:
# Number of input feature columns; the next PCA fit uses it as `k`.
k = len(x)
# Kept as the cell's last expression so the notebook actually displays the
# variance explained by the PCA stage (index 2) of the fitted pipeline.
pca_pipeline_model.stages[2].explainedVariance

In [18]:
# Refit PCA with the current `k` on the already-scaled data. The previous
# projection column is dropped first so the output column name is free.
dados_sem_projecao = dados_pca.drop('pca_features')
pca = PCA(
    k=k,
    inputCol='scaledFeatures',
    outputCol='pca_features',
)
model_pca = pca.fit(dataset=dados_sem_projecao)
dados_imoveis_pca = model_pca.transform(dataset=dados_sem_projecao)


In [19]:
# Total variance explained by the fitted components, as a percentage.
sum(model_pca.explainedVariance) * 100


100.0

Melhor Dimensionalidade

In [20]:
import numpy as np

# Cumulative share of variance explained by the first 1, 2, ... components.
lista_valores = np.cumsum(model_pca.explainedVariance.toArray())
# Count the components whose cumulative explained variance stays within 70%.
# Clamp to at least one component: if the first component alone exceeded the
# threshold the count would be 0, which PCA does not accept as `k`.
k = max(1, int((lista_valores <= 0.7).sum()))


In [21]:
# Fit the PCA estimator with the selected `k` on the scaled data (the old
# projection column is dropped so 'pca_features' can be written again).
dados_escalados = dados_pca.drop('pca_features')
pca = PCA(
    k=k,
    inputCol='scaledFeatures',
    outputCol='pca_features',
)
model_pca = pca.fit(dataset=dados_escalados)
dados_imoveis_pca_final = model_pca.transform(dataset=dados_escalados)


In [22]:
# Rebuild the pipeline so that it uses the current `pca` estimator.
pca_pipeline = Pipeline(
    stages=[
        assembler,
        scaler,
        pca,
    ],
)

In [23]:
# Fit every pipeline stage on the full dataset.
model_pca_pipeline = pca_pipeline.fit(dataset=dados)


In [24]:
# Apply the fitted pipeline to obtain the PCA projection of every row.
projection = model_pca_pipeline.transform(dataset=dados)


Melhor Clusterização

In [25]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Silhouette evaluator for the KMeans assignments made on the PCA projection.
evaluator = ClusteringEvaluator(
    predictionCol='cluster_pca',
    featuresCol='pca_features',
    metricName='silhouette',
    distanceMeasure='squaredEuclidean',
)

# Maps each candidate number of clusters to its silhouette score.
silhouette_score = {}

# Try every k from 2 to 50 (inclusive) with the same seed so runs are comparable.
for i in range(2, 51):
    KMeans_algo = KMeans(k=i, featuresCol='pca_features',
                         predictionCol='cluster_pca', seed=SEED)
    KMeans_fit = KMeans_algo.fit(projection)
    output = KMeans_fit.transform(projection)
    score = evaluator.evaluate(output)
    silhouette_score[i] = score


In [26]:
# Best silhouette score, computed once instead of once per dictionary entry.
# `default=None` keeps an empty dictionary yielding an empty `k_best` list.
melhor_score = max(silhouette_score.values(), default=None)
# Every k that reaches the best score; ties are kept in insertion order.
k_best = [key for key, value in silhouette_score.items()
          if value == melhor_score]
k_best[0]


35

In [27]:
# Final estimator, using the first k that achieved the best silhouette score.
kmeans = KMeans(
    k=k_best[0],
    seed=SEED,
    featuresCol='pca_features',
    predictionCol='cluster_pca',
)


In [28]:
# Fit the final KMeans model on the PCA projection.
modelo_kmeans = kmeans.fit(dataset=projection)


In [29]:
# Attach the final 'cluster_pca' assignment to every projected row.
projetion_kmeans = modelo_kmeans.transform(dataset=projection)


In [31]:
# Persist the clustered projection as Parquet, replacing any previous export.
(
    projetion_kmeans.write
    .mode('overwrite')
    .parquet('DADOS/dataset_projecao_kmeans')
)


Criação das Funções de Consultas

In [32]:
# Value matched against the `id` column in the lookup cells that follow.
id_imovel = '0034df72-124a-4383-a89f-a019850a2ba0'


In [33]:
# Look up the cluster assigned to the requested listing. `first()` returns
# None when no row matches, so a missing id is reported explicitly while any
# other failure (Spark errors, typos) still surfaces instead of being hidden
# behind a bare `except`.
linha_cluster = (
    projetion_kmeans
    .filter(projetion_kmeans.id == id_imovel)
    .select('cluster_pca')
    .first()
)
if linha_cluster is None:
    # Reset so a value left over from a previous run cannot be reused by mistake.
    cluster = None
    print("Não foi possível encontrar o imóvel com o id informado")
else:
    cluster = linha_cluster[0]
cluster


In [38]:
# Candidate listings: every row that falls in the same cluster as the query.
imoveis_recomendadas = (
    projetion_kmeans
    .where(projetion_kmeans.cluster_pca == cluster)
    .select('id', 'pca_features')
)
imoveis_recomendadas.show(n=5)


+--------------------+--------------------+
|                  id|        pca_features|
+--------------------+--------------------+
|34aea9e8-cd98-4a0...|[-4.4914908601853...|
|d6c67f6b-5515-4f6...|[-4.4284664548113...|
|a6385b02-8349-409...|[-4.5494351090068...|
|cca0dbb3-7c52-4ed...|[-4.3805173611370...|
|6b6c2bee-793b-473...|[-4.4546925726016...|
+--------------------+--------------------+
only showing top 5 rows



In [39]:
# PCA vector of the queried listing; indexing raises IndexError if it is absent.
linha_procurada = (
    imoveis_recomendadas
    .where(imoveis_recomendadas.id == id_imovel)
    .select('pca_features')
    .collect()[0]
)
imovel_procurado = linha_procurada[0]
imovel_procurado


DenseVector([-4.6145, -4.0154, 1.3826, 0.2565, -0.0629, 0.3156])

In [36]:
from scipy.spatial.distance import euclidean
from pyspark.sql.types import FloatType


In [37]:
def calculate_euclidean_distance(imovel, valor):
    """Return the Euclidean distance between two vectors as a built-in float.

    Args:
        imovel: First vector (any 1-D sequence accepted by SciPy's `euclidean`).
        valor: Second vector, with the same length as `imovel`.

    Returns:
        float: The distance. SciPy returns a NumPy scalar; it is converted to a
        plain Python float because this function is used as a Spark UDF and
        UDF results should be built-in Python types.
    """
    return float(euclidean(imovel, valor))


# UDF that measures how far each row's PCA vector is from the queried listing.
euclidean_udf = f.udf(
    lambda vetor: calculate_euclidean_distance(imovel_procurado, vetor),
    FloatType(),
)

# Rank the candidates by that distance and preview the 5 closest ones.
(
    imoveis_recomendadas
    .withColumn('distancia', euclidean_udf('pca_features'))
    .select('id', 'distancia')
    .orderBy('distancia')
    .show(5)
)


+--------------------+----------+
|                  id| distancia|
+--------------------+----------+
|0034df72-124a-438...|       0.0|
|e5ec2e28-c81f-47a...| 0.1730086|
|12012273-e197-488...| 0.1799632|
|c4585166-c411-4e5...|0.18049306|
|acbccbb4-4aec-452...| 0.2254096|
+--------------------+----------+
only showing top 5 rows



In [40]:
# NOTE(review): a function with this same name is also defined in an earlier
# cell; this later definition is the one in effect after a full run.
def calculate_euclidean_distance(imovel, valor):
    """Return the Euclidean distance between two vectors as a built-in float.

    Args:
        imovel: First vector (any 1-D sequence accepted by SciPy's `euclidean`).
        valor: Second vector, with the same length as `imovel`.

    Returns:
        float: The distance. SciPy returns a NumPy scalar; it is converted to a
        plain Python float because this function is used as a Spark UDF and
        UDF results should be built-in Python types.
    """
    return float(euclidean(imovel, valor))


def recommender(id_imovel, dataframe_kmeans):
    """Return the listings from the query's cluster, closest first.

    Args:
        id_imovel: Value of the `id` column identifying the query listing.
        dataframe_kmeans: Spark DataFrame holding at least the `id`,
            `pca_features` and `cluster_pca` columns.

    Returns:
        A Spark DataFrame with every row that shares the query's cluster,
        ordered by Euclidean distance (in PCA space) to the query row. The
        intermediate columns (`features`, `scaledFeatures`, `pca_features`,
        `cluster_pca`, `distancia`) are not included in the result.

    Raises:
        IndexError: If no row has the given `id_imovel`. The type matches what
            the previous `collect()[0]` lookup raised, so existing handlers
            keep working.
    """
    # One lookup fetches both the cluster and the PCA vector of the query row.
    linha_consulta = (
        dataframe_kmeans
        .filter(dataframe_kmeans.id == id_imovel)
        .select('cluster_pca', 'pca_features')
        .first()
    )
    if linha_consulta is None:
        raise IndexError(
            f"Não foi possível encontrar o imóvel com o id informado: {id_imovel!r}")
    cluster, imovel_procurado = linha_consulta

    # Cast to a built-in float so the UDF result matches FloatType regardless
    # of which numeric scalar type the distance helper returns.
    euclidean_udf = f.udf(
        lambda vetor: float(calculate_euclidean_distance(imovel_procurado, vetor)),
        FloatType(),
    )

    # 'scaledFeatures' is the name the scaler stage actually writes;
    # 'scaled_features' is kept as well in case a frame uses that spelling.
    colunas_nao_utilizadas = [
        'features', 'scaledFeatures', 'scaled_features',
        'pca_features', 'cluster_pca', 'distancia']

    # Sort while 'distancia' is still present, then drop the helper columns
    # (`drop` ignores names that are not in the frame).
    recomendadas = (
        dataframe_kmeans
        .filter(dataframe_kmeans.cluster_pca == cluster)
        .withColumn('distancia', euclidean_udf('pca_features'))
        .orderBy('distancia')
        .drop(*colunas_nao_utilizadas)
    )

    return recomendadas


In [41]:
# Show the ids of the 20 listings closest to the queried one.
(
    recommender("0034df72-124a-4383-a89f-a019850a2ba0", projetion_kmeans)
    .select("id")
    .show(20, truncate=False)
)


+------------------------------------+
|id                                  |
+------------------------------------+
|0034df72-124a-4383-a89f-a019850a2ba0|
|e5ec2e28-c81f-47a6-aeff-1288d6773c7b|
|12012273-e197-488f-bf81-784cb02ec875|
|c4585166-c411-4e57-9095-9b7ac2f5dccc|
|acbccbb4-4aec-4527-88fa-7933de023633|
|a5ff60b0-bc83-4e98-b56f-3e855b7b3711|
|7e96df1b-e77d-49c5-82e2-cc5ac4f6f960|
|d155b386-04e3-48e1-9bb1-f6f14a84a675|
|612f2a2c-a28a-44ec-b6d1-4d7606932f84|
|dbe16cf4-439c-4812-9233-34507b399d53|
|7c3e3897-3734-4a10-bd7f-2efbb28aaef8|
|5788a47e-04e1-41fd-8d81-dfaeff13d942|
|9f12771a-93e0-4fe6-bfd2-02d40e429644|
|a6b8131b-9bec-46cd-83a7-aa491b83dbe1|
|5cbe6ea9-ae38-4f19-8f45-c5f75431b9cd|
|ef6cf68f-ea1a-46c2-aeb4-84e3a4822b51|
|e1302c47-8a02-4044-92db-7975d743caa6|
|e8d36b03-9a0b-47cf-aed2-01b063fa07f0|
|0c7b4266-d8fd-4cf2-95d8-d0ba4121feb9|
|c3f03e00-b8f8-4b28-9e4c-2590b0d1b083|
+------------------------------------+
only showing top 20 rows

