In [0]:
from scipy.spatial.distance import euclidean

from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import CountVectorizer, StringIndexer, VectorAssembler, StandardScalerPCA
from pyspark.ml.functions import vector_to_array
from pyspark.ml import Pipeline

In [0]:
def projetar_kmeans(df):

    df = df.withColumn("published_date", year(df.publishedDate))

    cv = CountVectorizer(inputCol="genre", outputCol="genre_vetor")
    df = cv.fit(df).transform(df)

    indexer = StringIndexer(inputCol="author", outputCol="autor_index")
    df = indexer.fit(df).transform(df)

    assembler = VectorAssembler(
        inputCols=["pages", "stars", "genre_vetor", "reviews", "totalratings", "price", "autor_index", "published_date"], 
        outputCol="features"
    )
    df_features = assembler.transform(df)

    scaler = StandardScaler(inputCol='atributos', outputCol='atributos_dimensionados')
    model_scaler = scaler.fit(df_atributos)
    dados_livros_scaler = model_scaler.transform(df_atributos)

    pca = PCA(k=6, inputCol='atributos_dimensionados', outputCol='pca_atributos')
    modelo_pca = pca.fit(dados_livros_scaler)
    dados_livros_pca = modelo_pca.transform(dados_livros_scaler)

    kmeans = KMeans(k=50, featuresCol='pca_atributos', predictionCol='cluster_pca', seed=1224)
    pca_pipeline = Pipeline(stages=[VectorAssembler(inputCols=X, outputCol='atributos'),
                                StandardScaler(inputCol='atributos', outputCol='atributos_dimensionados'),
                                PCA(k=6, inputCol='atributos_dimensionados', outputCol='pca_atributos')])
    
    modelo_pca_pipeline = pca_pipeline.fit(df)
    projection = modelo_pca_pipeline.transform(df)
    
    modelo_kmeans = kmeans.fit(projection)
    projecao_kmeans = modelo_kmeans.transform(projection)
    projecao_kmeans = projecao_kmeans.withColumn('x', vector_to_array('pca_atributos')[0])\
                                       .withColumn('y', vector_to_array('pca_atributos')[1])

    return projecao_kmeans

In [0]:
def recomendar_livros(titulo_livro, df):

    projecao_kmeans = projetar_kmeans(df)
  
    cluster = projecao_kmeans.filter(projecao_kmeans.title.isin(titulo_livro)).select('cluster_pca').collect()[0][0]
    livros_recomendados = projecao_kmeans.filter(projecao_kmeans.cluster_pca == cluster)\
                                       .select('title', 'id', 'pca_atributos')

    #componenetes_livro = livros_recomendados.filter(livros_recomendados.title.isin(titulo_livro)).select('pca_atributos').collect()[0][0]

    def calcular_distaniae(valor):
        return euclidean(componenetes_livro, valor)

    udf_calcula_distancia = udf(calcular_distancia, FloatType())

    livros_recomendados_dist = livros_recomendados.withColumn('Dist', udf_calcula_distancia('pca_atributos'))

    recomendados = spark.createDataFrame(livros_recomendados_dist.sort('Dist').take(10)).select(['title', 'id', 'Dist'])

    return recomendados

In [0]:
df_goodreads = spark.read.format('delta').load('abfss://livros@recomendador.dfs.core.windows.net/silver/goodreads')
df_kindle = spark.read.format('delta').load('abfss://livros@recomendador.dfs.core.windows.net/silver/kindle')

In [0]:
df_livros = df_goodreads.join(df_kindle, df_goodreads.title == df_kindle.title, 'inner') \
                    .drop(df_goodreads.title, df_goodreads.author, df_goodreads.rating, df_kindle.reviews)

df_livros = df_livros.na.fill(0)
df_livros = df_livros.repartition(200)

In [0]:
del df_goodreads, df_kindle

In [0]:
#titulo_livro = input("Digite o título do livro para receber recomendação: ")
titulo_livro = "Harry Potter"
resultado = recomendar_livros(titulo_livro, df_livros)
display(resultado)