In [1]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import hsfs

# Open a connection to the feature store backend using the default settings
# (no arguments are passed). Call `connection.close()` when finished.
connection = hsfs.connection()
# Handle to the feature store; used below to fetch and create feature groups.
fs = connection.get_feature_store()

Starting Spark application


ID,Application ID,Kind,State,Spark UI,Driver log
9,application_1645967682797_0010,pyspark,idle,Link,Link


SparkSession available as 'spark'.
Connected. Call `.close()` to terminate connection gracefully.

In [None]:
# Load the raw feature groups and materialise them as Spark DataFrames.
# Every cell below uses DataFrame operations (`distinct`, `join`,
# `withColumn`, ...), so the feature group handles must be read first.
books = fs.get_feature_group("books_raw", version=1).read()
users = fs.get_feature_group("users_raw", version=1).read()
reviews = fs.get_feature_group("reviews_raw", version=1).read()

# The cells below refer to the review data as `ratings`, which was never
# defined, so the notebook failed on a fresh kernel (Restart & Run All).
# NOTE(review): presumably `reviews_raw` is the table holding the
# `ISBN` / `User-ID` / `Book-Rating` columns used below -- confirm.
ratings = reviews

In [7]:
# TODO(fabio): one-hot-encode
# Assign a surrogate numeric id to every distinct author name.
distinct_authors = books.select("Book-Author").distinct()
authors = distinct_authors.withColumn(
    "author_id", F.monotonically_increasing_id()
)

In [8]:
# Replace the author name with its surrogate `author_id`.
# NOTE: this rebinds `books` and drops the join key, so the cell cannot be
# re-run on its own; re-run the loading cell first.
# NOTE(review): the default inner join never matches null keys, so a book
# with a null `Book-Author` is dropped here -- confirm this is intended.
books = (
    books
    .join(authors, on="Book-Author")
    .drop("Book-Author")
)

In [9]:
# Number of reviews per book. The inner join means only books that have at
# least one review appear in the result.
books_reviews = (
    books
    .join(ratings, on="ISBN")
    .groupBy("ISBN")
    .count()
    .withColumnRenamed("count", "num_reviews")
)

In [10]:
# A book counts as popular when it has strictly more than 100 reviews.
# The result holds only the popular ISBNs, each flagged with is_popular = 1.
popular_books = (
    books_reviews
    .where(F.col("num_reviews") > 100)
    .select("ISBN", F.lit(1).alias("is_popular"))
)

In [11]:
# Mean `Book-Rating` per book, over the books that have at least one rating.
avg_rating = (
    books
    .join(ratings, on="ISBN")
    .groupBy("ISBN")
    .agg(F.avg("Book-Rating").alias("avg_rating"))
)

In [12]:
# Compute the number of ratings for each class (Book-Rating values 0..10).
rating_classes_tmp = books.join(ratings, on="ISBN")

# All eleven counters are computed in a single aggregation instead of one
# filter + groupBy + left join per class. `when` without `otherwise` yields
# null for non-matching rows and the sum of an all-null group is null, so an
# ISBN with no rating of class i still gets null in `num_ratings_i`.
rating_class_counts = rating_classes_tmp.groupBy("ISBN").agg(*[
    F.sum(F.when(F.col("Book-Rating") == i, 1)).alias("num_ratings_{}".format(i))
    for i in range(0, 11)
])

# Left join onto the full list of books so that books without any rating are
# kept, with null in every counter column.
rating_classes = books.select("ISBN").join(rating_class_counts, on="ISBN", how="left")

In [13]:
# Compute each author's top books, ranked by average rating (best first).
# ISBN is a secondary sort key: without a tie-breaker, row_number() assigns
# ranks to equally rated books in an arbitrary order, so the top 3 could
# change from run to run.
author_rating_window = Window.partitionBy("author_id") \
                             .orderBy(F.col("avg_rating").desc(), F.col("ISBN").asc())

# Rank every rated book within its author. `author_id` is only needed for the
# window and is dropped afterwards.
author_ranking = avg_rating.join(books, on="ISBN") \
                           .withColumn('author_book_rank', F.row_number().over(author_rating_window)) \
                           .drop('author_id')

# `author_ranking` already has one row per ranked book, so the flag is derived
# from it directly. Joining it back to `books` on ISBN only duplicated the
# book columns.
top_3_author_books = author_ranking.filter(F.col("author_book_rank") <= 3) \
                                   .withColumn("is_author_popular_book", F.lit(1)) \
                                   .select("ISBN", "is_author_popular_book")

In [15]:
# Average rating per book, split by the age of the reviewer.
# Brackets exclude the lower bound and include the upper bound:
#   (0, 15], (15, 30], (30, 45], (45, 60], (60, 75], other
# "other" covers ages above 75 and missing ages.
# NOTE(review): Age == 0 matches no bracket, not even "other" -- confirm that
# such ratings are meant to be ignored.
age_brackets = [(0, 15), (15, 30), (30, 45), (45, 60), (60, 75)]

# Build the books -> ratings -> users join once; it is the same for every bracket.
ratings_with_age = books.select("ISBN") \
        .join(ratings, on='ISBN') \
        .join(users, on="User-ID")

# One conditional average per bracket. `when` without `otherwise` yields null
# for ratings outside the bracket, and avg() ignores nulls, so a book with no
# rating in a bracket gets null for that bracket.
age_bracket_aggs = [
    F.avg(F.when((F.col('Age') > lower) & (F.col('Age') <= upper), F.col('Book-Rating')))
     .alias('avg_rating_age_{}_{}'.format(lower, upper))
    for lower, upper in age_brackets
]

# Other
age_bracket_aggs.append(
    F.avg(F.when((F.col('Age') > 75) | (F.isnull('Age')), F.col('Book-Rating')))
     .alias('avg_rating_age_other')
)

# Single aggregation pass instead of six separate join + groupBy rounds.
age_bracket_tmp = ratings_with_age.groupBy("ISBN").agg(*age_bracket_aggs)

# Left join onto the full list of books so that unrated books are kept, with
# null in every bracket column.
age_bracket = books.select("ISBN").join(age_bracket_tmp, on="ISBN", how="left")

In [16]:
# Assemble the final feature frame: start from the book attributes and
# left-join every derived feature table on ISBN, so books missing from a
# feature table are still kept.
books_fg = books
for feature_frame in (
    books_reviews,
    popular_books,
    avg_rating,
    rating_classes,
    top_3_author_books,
    age_bracket,
):
    books_fg = books_fg.join(feature_frame, on="ISBN", how="left")

# Replace the nulls introduced by the left joins with 0.
# NOTE(review): this also turns a missing average rating into 0, which is
# then indistinguishable from a real average of 0 -- confirm this is intended.
books_fg = books_fg.fillna(0)

In [17]:
# Normalise column names: lower-case them and turn hyphens into underscores
# (e.g. `Book-Title` becomes `book_title`). Source names are back-quoted so
# that the hyphen is read as part of the name.
books_fg = books_fg.select([
    F.col("`{}`".format(name)).alias(name.lower().replace('-', '_'))
    for name in books_fg.columns
])

In [18]:
# Register version 1 of the "books" feature group, keyed by `isbn` and not
# enabled for online serving, then write the assembled features into it.
# NOTE: this rebinds `books` from the Spark DataFrame used above to the
# feature group handle, so earlier cells cannot be re-run after this one
# without reloading the data first.
books = fs.create_feature_group(
    "books",
    version=1,
    description="Books related features and ratings",
    primary_key=["isbn"],
    online_enabled=False,
)
books.save(books_fg)

In [20]:
# Sanity check: number of distinct ISBNs in the final feature frame.
# The column is named `isbn` after the clean-up cell; "ISBN" still resolves
# while Spark's column resolution is case-insensitive (the default).
books_fg.select("ISBN").dropDuplicates().count()

271359