<a href="https://colab.research.google.com/github/EmanueleGiavardi/AMD_project/blob/main/src/amd.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -q kaggle pyspark

In [2]:
import pyspark
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
import os
from google.colab import files
from collections import Counter

In [3]:
# please upload your kaggle.json file here
files.upload()
!ls -lha kaggle.json
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

Saving kaggle.json to kaggle.json
-rw-r--r-- 1 root root 72 May 28 07:10 kaggle.json


In [4]:
!kaggle datasets download -d "mohamedbakhet/amazon-books-reviews"
!unzip amazon-books-reviews.zip
!rm -r amazon-books-reviews.zip

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
Downloading amazon-books-reviews.zip to /content
 98% 1.04G/1.06G [00:13<00:00, 233MB/s]
100% 1.06G/1.06G [00:13<00:00, 82.5MB/s]
Archive:  amazon-books-reviews.zip
  inflating: Books_rating.csv        
  inflating: books_data.csv          


In [5]:
spark = pyspark.sql.SparkSession.builder.master("local[*]").appName("AMD_project").getOrCreate()
sc = spark.sparkContext

In [61]:
books_rating_df = spark.read.csv("Books_rating.csv", header=True, inferSchema=True)
books_data_df = spark.read.csv("books_data.csv", header=True, inferSchema=True)
books_rating_df.printSchema()
books_data_df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/score: string (nullable = true)
 |-- review/time: string (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)

root
 |-- Title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- image: string (nullable = true)
 |-- previewLink: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publishedDate: string (nullable = true)
 |-- infoLink: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- ratingsCount: string (nullable = true)



In [64]:
# books rating subsampling

random_state = 42
ratings_count = books_rating_df.count()
sampling_frac = 0.01

# probabilistic approach: keeps each line with prob = fraction
# books_rating_df_sub = books_rating_df.sample(fraction=sampling_frac, seed=random_state)

# keeps exactly int(sampling_frac * count) lines, assuming the rows are already in random order
books_rating_df_sub = books_rating_df.limit(int(sampling_frac * ratings_count))
print(f"sample has {int(sampling_frac * ratings_count)} lines")

sample has 30000 lines


In [62]:
books_data_df.select("categories").take(10)

[Row(categories="['Comics & Graphic Novels']"),
 Row(categories='http://books.google.nl/books?id=IjvHQsCn_pgC&printsec=frontcover&dq=Dr.+Seuss:+American+Icon&hl=&cd=1&source=gbs_api'),
 Row(categories="['Religion']"),
 Row(categories="['Fiction']"),
 Row(categories=None),
 Row(categories="['Religion']"),
 Row(categories=None),
 Row(categories=None),
 Row(categories="['Biography & Autobiography']"),
 Row(categories="['Social Science']")]

# **Link Analysis: finding influential/authoritative users**

## **Graph creation**

- nodes → users
- edges → links between users if two users reviewed the same book

The graph is **directed**: a link from ```u2``` to ```u1``` exists if ```u1``` and ```u2``` reviewed the same book and the helpfulness score of ```u1```'s review of that book is higher than the score of ```u2```'s review of the same book.

Actually, we only take into account a subset of all the possible books: in the books_data csv there's a column named "categories" that contains:
- in most cases, a string representing a list of categories, such as ['Religion', 'Politics', ...]
- in some cases, malformed data, for instance None values or links to the Google Books store

Thus, we only consider the subset of books for which the value of the column "categories" matches the regular expression ```^\[.*\]$```, so that this string can be turned into an actual list of strings.
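
The filtering idea can be illustrated in plain Python. This is only a sketch: the helper name `parse_categories` is hypothetical, and `ast.literal_eval` is used here as a stand-in for the Spark-side parsing done later in the notebook.

```python
import ast
import re

# same pattern as the one described above: the whole string must look like "[...]"
WELL_FORMED = re.compile(r"^\[.*\]$")

def parse_categories(raw):
    """Return a list of genre strings, or None if the value is not well-formatted."""
    if raw is None or not WELL_FORMED.match(raw):
        return None
    try:
        value = ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        # looks like a list but cannot be parsed as one
        return None
    if isinstance(value, list) and all(isinstance(g, str) for g in value):
        return value
    return None

good = parse_categories("['Religion', 'Politics']")
missing = parse_categories(None)
link = parse_categories("http://books.google.nl/books?id=IjvHQsCn_pgC")
unquoted = parse_categories("[Religion]")
```

Values that pass the regular expression but are not valid list literals (such as `[Religion]`) are rejected as well, which is one possible form of the "additional checks" that a regular expression alone does not provide.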

This choice was made because the idea is to apply Topic Sensitive PageRank to the graph, where the "topic" associated with each node (user) is a "preferred literary genre", given by the genre that the user reviewed most often.

```
# graph creation pseudocode:

for each book b (with well-formatted categories):
    for each (u1, u2) such that both u1 and u2 reviewed b:
        if helpfulness(u1, b) > helpfulness(u2, b):
            add edge from u2 to u1
```
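
As a minimal, non-distributed sketch of the pseudocode above, the same rule can be applied to a handful of made-up reviews. The data and the function name `build_edges` are hypothetical and only serve as illustration.

```python
from itertools import permutations

# hypothetical toy data: (title, user_id, helpfulness score in [0, 1])
reviews = [
    ("book_a", "u1", 0.9),
    ("book_a", "u2", 0.5),
    ("book_a", "u3", 0.5),
    ("book_b", "u2", 0.8),
    ("book_b", "u1", 0.1),
]

def build_edges(reviews):
    """Return directed edges (src, dst), where src wrote the less helpful review."""
    by_book = {}
    for title, user, score in reviews:
        by_book.setdefault(title, []).append((user, score))
    edges = []
    for entries in by_book.values():
        # every ordered pair of reviewers of the same book
        for (u1, s1), (u2, s2) in permutations(entries, 2):
            if s1 > s2:
                edges.append((u2, u1))
    return edges

edges = build_edges(reviews)
```

Note that `u2` and `u3` obtained the same score for `book_a`, so no edge is created between them, and that `u1` and `u2` point at each other because each one wrote the better review for a different book.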


---------------

Given the review table $R$ and $R' = \Pi_{Title, User\_id, helpfulness}(R)$, we create the table

$$J = \sigma_{helpfulness_1 > helpfulness_2}(R' \bowtie_{Title} R') $$

This table has the schema
```
root
 |-- User_id_1
 |-- Title
 |-- User_id_2
 |-- Helpfulness_1
 |-- Helpfulness_2
```
and it's built such that both ```User_id_1``` and ```User_id_2``` reviewed the book named ```Title``` and ```Helpfulness_1``` $>$ ```Helpfulness_2```

Starting from this table we create the graph according to the criterion explained above

**NOTE**:

the helpfulness score of each review is not expressed on a common scale (we have things like 0/0, 4/5, 8/10, 78/82 ...)
Just for now, the score is simply obtained by evaluating the string "X/Y" as the fraction X/Y (with a zero denominator mapped to a score of 0).

> TODO NEXT: find a cleverer way to deal with helpfulness. The "X/Y" could be interpreted as "people who found the review useful/total people who voted", even though this is not clear from the dataset specifications (or from the Amazon website). With this assumption, however, it becomes important to take into account the number of people who voted, instead of just considering the fraction of appreciation.
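
One possible direction for this TODO, given purely as a sketch, is to rank reviews by the lower bound of the Wilson score interval instead of the raw fraction. This rests on the assumption stated above ("X/Y" = helpful votes / total votes), which the dataset does not confirm; the function names and the choice of `z = 1.96` are illustrative and not part of the notebook.

```python
import math

def parse_helpfulness(raw):
    """Split an "X/Y" string into (helpful_votes, total_votes)."""
    helpful, total = raw.split("/")
    return int(helpful), int(total)

def wilson_lower_bound(helpful, total, z=1.96):
    """Lower bound of the Wilson score interval; 0.0 when nobody voted."""
    if total == 0:
        return 0.0
    p = helpful / total
    denom = 1 + z * z / total
    centre = p + z * z / (2 * total)
    margin = z * math.sqrt((p * (1 - p) + z * z / (4 * total)) / total)
    return (centre - margin) / denom

few_votes = wilson_lower_bound(*parse_helpfulness("4/5"))
more_votes = wilson_lower_bound(*parse_helpfulness("8/10"))
many_votes = wilson_lower_bound(*parse_helpfulness("78/82"))
no_votes = wilson_lower_bound(*parse_helpfulness("0/0"))
```

With this score, "4/5" and "8/10" no longer tie even though both fractions equal 0.8: the review with more votes gets the higher value, which is the behaviour the TODO asks for.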



In [94]:
from pyspark.sql.functions import split, col, when

# TODO: replace with normalized version
def get_helpfulness_score(col_name):
    num = split(col(col_name), "/").getItem(0).cast("float")
    den = split(col(col_name), "/").getItem(1).cast("float")
    return when(den != 0, num / den).otherwise(0.0)

R_first = books_rating_df_sub.select(["Title", "User_id", "review/helpfulness"]).filter(col('User_id').isNotNull())
R1 = R_first.alias("R1")
R2 = R_first.alias("R2")

J = R1.join(R2, col("R1.Title") == col("R2.Title")) \
      .filter(col("R1.User_id") != col("R2.User_id")) \
      .select(
          col("R1.Title").alias("Title"),
          col("R1.User_id").alias("User_id_1"),
          col("R2.User_id").alias("User_id_2"),
          get_helpfulness_score("R1.review/helpfulness").alias("helpfulness_1"),
          get_helpfulness_score("R2.review/helpfulness").alias("helpfulness_2")
      ).filter(col("helpfulness_1") > col("helpfulness_2"))

# J schema: | Title | User_id_1 | User_id_2 | helpfulness_1 | helpfulness_2 |

In [95]:
# finding books with well-formatted "categories" value
# TODO: additional checks on these values... a simple regular expression [*] might not be enough!

from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType, StringType

genres_schema = ArrayType(StringType())

# output example: Row(Title='Wonderful Worship in Smaller Churches', Genres=['Religion', 'Politics']),
genres = books_data_df.filter((col("categories").isNotNull()) & (col("categories").rlike(r"^\[.*\]$"))).withColumn("Genres", from_json("categories", genres_schema)).select("Title", "Genres")

# filtering J, keeping only books with well-formatted categories
J_filtered = J.join(genres, on="Title", how="inner")
J_filtered.take(10)

[Row(Title='"""Our Brown-Eyed Boy"""', User_id_1='A3KDWTJK489NTM', User_id_2='A3JQK9NSCJI4AR', helpfulness_1=0.5, helpfulness_2=0.0, Genres=['Biography & Autobiography']),
 Row(Title='"""Our Brown-Eyed Boy"""', User_id_1='A3KDWTJK489NTM', User_id_2='A2H4Z5FPW4ER5K', helpfulness_1=0.5, helpfulness_2=0.0, Genres=['Biography & Autobiography']),
 Row(Title='"""Our Brown-Eyed Boy"""', User_id_1='A2DSK1GG7GBDNW', User_id_2='AZA7EKIK1TKZX', helpfulness_1=0.8333333333333334, helpfulness_2=0.6666666666666666, Genres=['Biography & Autobiography']),
 Row(Title='"""Our Brown-Eyed Boy"""', User_id_1='A2DSK1GG7GBDNW', User_id_2='A1AJ1FG9SNJMZ1', helpfulness_1=0.8333333333333334, helpfulness_2=0.7142857142857143, Genres=['Biography & Autobiography']),
 Row(Title='"""Our Brown-Eyed Boy"""', User_id_1='A2DSK1GG7GBDNW', User_id_2='A3JQK9NSCJI4AR', helpfulness_1=0.8333333333333334, helpfulness_2=0.0, Genres=['Biography & Autobiography']),
 Row(Title='"""Our Brown-Eyed Boy"""', User_id_1='A2DSK1GG7GBDNW',

In [98]:
# finding preferred genre for each user in subsampled dataset

T1 = books_rating_df_sub.select(col("User_id"), col("Title")).alias("T1")
T2 = genres.alias("T2")

genres_per_review = T1.join(T2, T1.Title == T2.Title).select("User_id", "T1.Title", "Genres").filter(col("User_id").isNotNull())
genres_per_review.take(5)

[Row(User_id='AVCGYZL8FQQTD', Title='Its Only Art If Its Well Hung!', Genres=['Comics & Graphic Novels']),
 Row(User_id='A3OQWLU31BU1Y', Title='Wonderful Worship in Smaller Churches', Genres=['Religion']),
 Row(User_id='AGKGOH65VTRR4', Title='Wonderful Worship in Smaller Churches', Genres=['Religion']),
 Row(User_id='A373VVEU6Z9M0N', Title='Wonderful Worship in Smaller Churches', Genres=['Religion']),
 Row(User_id='AZ0IOBU20TBOP', Title='Wonderful Worship in Smaller Churches', Genres=['Religion'])]

In [99]:
# setting the user as key and grouping by key we have something like:
# ('A3U1XS6XK6YUU4' -> [['Fiction', 'Drama'], ['Religion'], ['Religion', 'Politics']]). So the value is a list of lists: each internal list contains the genres of one book reviewed by the user

# [genre for book_genres in external_list for genre in book_genres] just flattens the list:
# [['Fiction', 'Drama'], ['Religion'], ['Religion', 'Politics']] => ['Fiction', 'Drama', 'Religion', 'Religion', 'Politics']
# same as:
#for book_genres in external_list:
#    for genre in book_genres:
#        append genre to resulting list

# the Counter is used to get the most common genre (in the example: 'Religion')
# if two distinct genres have the same count, the first one encountered in the list is returned

genres_per_user = genres_per_review.rdd.map(lambda row: (row['User_id'], row['Genres'])).groupByKey() \
    .mapValues(lambda external_list: [genre for book_genres in external_list for genre in book_genres]).mapValues(
    lambda genres: Counter(genres).most_common(1)[0][0] if genres else None
)

# so here we have a "Topic" related to each user for TSPR
genres_per_user.take(10)

[('AVCGYZL8FQQTD', 'Comics & Graphic Novels'),
 ('AGKGOH65VTRR4', 'Religion'),
 ('A3OZDTEEAF8GS9', 'Fiction'),
 ('A3U1XS6XK6YUU4', 'Fiction'),
 ('A1XNI3654I4ME2', 'Fiction'),
 ('A3T591DTKPYCVW', 'Fiction'),
 ('A3S5YQQWBO0LGI', 'Fiction'),
 ('A3OS2QHEH495TD', 'Fiction'),
 ('A2XXVRH6VJ8S7Q', 'Fiction'),
 ('A2ABV8FXF88O9P', 'Fiction')]
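
The flattening and counting step can be checked locally on the example from the comments above. This is a plain-Python sketch with made-up lists; it also shows the tie behaviour of `Counter.most_common`, which returns elements with equal counts in the order they were first encountered.

```python
from collections import Counter

# genres of the books reviewed by one hypothetical user
genres_per_book = [["Fiction", "Drama"], ["Religion"], ["Religion", "Politics"]]

# flatten the list of lists into a single list of genres
flat = [genre for book_genres in genres_per_book for genre in book_genres]

# most frequent genre for this user
preferred = Counter(flat).most_common(1)[0][0]

# with a tie, the genre seen first wins
tie_winner = Counter(["Drama", "Fiction"]).most_common(1)[0][0]
```

Since the order of the values produced by `groupByKey` is not guaranteed, the winner of a tie may change from one run to another in the distributed version.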

In [100]:
# just for curiosity: top 10 preferred genres

d = genres_per_user.collectAsMap()
Counter(d.values()).most_common(10)

[('Fiction', 5727),
 ('Book burning', 980),
 ('Business & Economics', 587),
 ('Education', 575),
 ('Biography & Autobiography', 524),
 ('Religion', 522),
 ('American fiction', 463),
 ('History', 411),
 ('True Crime', 353),
 ('Body, Mind & Spirit', 337)]

The idea now is to associate an **increasing integer value from $0$ to $N-1$** to each one of the $N$ user ids. In this way:
- An edge is simply represented as a pair of integers $(i, j)$, where $i$ is the integer value of the user with the outgoing connection and $j$ is the integer value of the user with the incoming connection
- PageRank values are stored in a simple array $V$ of $N$ elements, such that $V[i]$ = PageRank value of the user associated with integer value $i$
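
A small local sketch of this encoding, using made-up user ids. Sorting the ids is an assumption made here only to get a deterministic mapping; any one-to-one assignment of the integers $0, \dots, N-1$ works.

```python
# hypothetical edges expressed with user ids: (src, dst)
edges_by_id = [("u2", "u1"), ("u3", "u1"), ("u1", "u2")]

# one integer in [0, N-1] for each distinct user
users = sorted({user for edge in edges_by_id for user in edge})
index_of = {user: i for i, user in enumerate(users)}
N = len(users)

# edges as pairs of integers (i, j): i has the outgoing link, j the incoming one
edges_by_index = [(index_of[src], index_of[dst]) for src, dst in edges_by_id]

# V[i] holds the PageRank value of the user mapped to integer i
V = [1.0 / N] * N
```

Keeping the inverse mapping (integer back to user id) is useful at the end, when the top-ranked integers have to be translated back into users.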



In [96]:
unique_users = J_filtered.select(col("User_id_1").alias("User_id")) \
    .union(J_filtered.select(col("User_id_2").alias("User_id"))) \
    .distinct()

# [('user1_id', integer1), ... ('userN_id', integerN)]
user_ids_rdd = unique_users.rdd.map(lambda row: row["User_id"]).zipWithIndex()
N = user_ids_rdd.count()
print(f"There are {N} unique users")

There are 15435 unique users


Now there are two possible ways of creating the $(i, j)$ pairs:
1. create from ```user_ids_rdd``` a DataFrame with schema ```[User_id, Integer_value]```, and use join operations to associate an ```Integer_value``` with both ```User_id_1``` and ```User_id_2```.
2. convert ```user_ids_rdd``` into a dictionary that is broadcast to every computing node of the cluster, so that the ```Integer_value``` can be retrieved with a simple lookup on the ```User_id``` key

Since the number of unique users is not expected to be _that high_, the second option is chosen

In [84]:
user_ids_dict = user_ids_rdd.collectAsMap()
bdcast = sc.broadcast(user_ids_dict)

In [85]:
# list of couples (node_src, node_dst)
edges = J_filtered.rdd.map(lambda row : (bdcast.value[row[2]], bdcast.value[row[1]]))

## **PageRank**

First of all we need to compute the adjacency list, so that we can initialize the transition matrix $M$ such that $M_{ij} = \frac{1}{\alpha}$ if there's a link from $j$ to $i$ (and $0$ otherwise), where $\alpha$ is the number of outgoing edges from node $j$

In [86]:
# TODO: find a more efficient way

# list of couples (node_src, [iterable of dst nodes])
adjacency_list = edges.groupByKey()

Now ```adjacency_list``` is an RDD in which each element has the form ```(node, [neighbours])```. Since the transition matrix $M$ is heavily sparse, we are going to represent it using triplets $(i, j, M_{ij})$ only if $M_{ij} \neq 0$

**NOTE**: in this setting, we could potentially have many arcs from a certain node $A$ to another node $B$, because there could be many books for which user $B$ wrote a better review than user $A$.

The idea in this case is to collapse all the possible arcs from $A$ to $B$ into a single arc, weighting the associated entry of $M$ according to the actual number of books for which $B$ obtained a better score than $A$.

So triplets are actually stored in the form $((i, j), M_{ij})$, so that it becomes easy to group triplets with the same key $(i, j)$ and sum up all the contributions $M_{ij}$ associated with the same src-dest nodes.

_Example:_
- ```edges = [(A, B), (A, B), (A, C)]```
- ```adjacency_list = [(A, [B, B, C])]```
- ```
triplets (before grouping) = [
    ((B, A), 1/3)
    ((B, A), 1/3)
    ((C, A), 1/3)
]
```
- ```
triplets (after grouping) = [
    ((B, A), 2/3)
    ((C, A), 1/3)
]
```
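
The example above can be reproduced with a few lines of plain Python. This is a local sketch (no Spark) that uses exact fractions to make the result easy to compare with the lists above.

```python
from collections import defaultdict
from fractions import Fraction

edges = [("A", "B"), ("A", "B"), ("A", "C")]

# adjacency list: src -> list of dst nodes (repetitions are kept)
adjacency = defaultdict(list)
for src, dst in edges:
    adjacency[src].append(dst)

# triplets keyed by (dst, src); contributions with the same key are summed
triplets = defaultdict(Fraction)
for src, neighbours in adjacency.items():
    for dst in neighbours:
        triplets[(dst, src)] += Fraction(1, len(neighbours))

# sum of the column associated with source node A
column_sum_A = sum(value for (dst, src), value in triplets.items() if src == "A")
```

The column of $A$ still sums to $1$ after grouping, because collapsing parallel arcs only moves weight between entries of the same column.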



In [87]:
# if el is an element of the adjacency_list rdd,
# el[0] => node
# el[1] => list of neighbours of that node

# NOTE: now the semantics of the triplets is (i, j, Mij) => (dst_node, src_node, value)
# it's a bit counterintuitive, but COLUMNS REPRESENT SOURCE NODES, while ROWS REPRESENT DESTINATION NODES

triplets = adjacency_list.flatMap(
    lambda el: [((neighbour, el[0]), 1.0/len(el[1])) for neighbour in el[1] if len(el[1]) > 0]
).reduceByKey(lambda x, y: x + y)

In [88]:
print(f"transition graph has {N} nodes and {triplets.count()} edges")

transition graph has 15435 nodes and 2049866 edges


In [89]:
# mapping back to canonical (i, j, Mij) form, for simplicity (and for coherence with lecture notes)
M = triplets.map(lambda triplet: (triplet[0][0], triplet[0][1], triplet[1])).cache()

In [90]:
# CHECK: M should be column-wise stochastic, so the values of each column should sum up to 1
# m[1] -> column index
# m[2] -> value of M[i, j]
check = M.map(lambda m: (m[1], m[2])).reduceByKey(lambda x, y: x+y)
# now we have key-value pairs such that key => column index j and value = SUM(M[i,j]) for i = 0, ..., # rows - 1.
# Columns of nodes without outgoing edges have no triplets, so they are not checked here.
# Check if some values are far from 1 (with a tolerance of epsilon)
epsilon = 1e-6
far_from_one = check.filter(lambda pair: abs(pair[1] - 1.0) > epsilon).count()
if far_from_one == 0: print("M is column-wise stochastic")
else: print(f"M IS NOT column-wise stochastic: there are {far_from_one} columns that don't sum up to 1")

M is column-wise stochastic


PageRank (with damping factor $\beta$):
$$
\begin{equation}
    \begin{cases}
        v(0) = \frac{1}{N}\underline{1} \\
        v(t+1) = \beta Mv(t) + (1-\beta)\frac{1}{N}\underline{1}  
    \end{cases}\,
\end{equation}
$$
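
To make the update rule concrete, here is a minimal single-machine sketch on a hypothetical 3-node graph, with $M$ given as $(i, j, M_{ij})$ triplets. The function name `pagerank_local` and the toy graph are assumptions for illustration; unlike the distributed code, every entry of the vector is recomputed from $(1-\beta)/N$ at each iteration.

```python
def pagerank_local(triplets, n, beta=0.8, tolerance=1e-9, max_iterations=100):
    """Power iteration v(t+1) = beta * M v(t) + (1 - beta) / n on sparse triplets."""
    v = [1.0 / n] * n
    for _ in range(max_iterations):
        # teleport term for every node, then add the link contributions
        new_v = [(1.0 - beta) / n] * n
        for i, j, m_ij in triplets:
            new_v[i] += beta * m_ij * v[j]
        dist = sum((a - b) ** 2 for a, b in zip(new_v, v)) ** 0.5
        v = new_v
        if dist < tolerance:
            break
    return v

# toy graph: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0 (rows = destination, columns = source)
M_toy = [(1, 0, 0.5), (2, 0, 0.5), (2, 1, 1.0), (0, 2, 1.0)]
scores = pagerank_local(M_toy, n=3)
```

In this toy graph node 2 receives links from both other nodes and ends up with the highest score, followed by node 0 (which receives all of node 2's weight) and node 1.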

In [91]:
def pageRank(M, v, max_iterations=100, tolerance=10e-5, beta=0.8):
    iteration = 0
    while iteration < max_iterations:
        prev_v = v.copy()

        # broadcast v to each node of the cluster
        v_bdcast = sc.broadcast(v)

        # matrix - vector multiplication (distributed)
        pr_scores = M.map(lambda m: (m[0], m[2]*v_bdcast.value[m[1]])).reduceByKey(lambda x, y: x + y).collect()
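        # update vector v (local)
        # NOTE: only nodes with at least one incoming edge appear in pr_scores,
        # so the other entries of v keep their previous value
        for (user, pr_score) in pr_scores: v[user] = beta * pr_score + (1 - beta) / N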

        dist = np.linalg.norm(v - prev_v)

        if dist < tolerance:
            print(f"Convergence reached after {iteration} iterations with distance {dist}")
            break

        print(f"iteration {iteration}: distance = {dist}")
        iteration += 1
    return v

In [92]:
%%time
v = np.ones(N) / N

max_iterations = 100
tolerance = 10e-5
beta = 0.8

pg_scores = pageRank(M, v, max_iterations=max_iterations, tolerance=tolerance, beta=beta)

iteration 0: distance = 0.004508294288083763
iteration 1: distance = 0.001392548552744607
iteration 2: distance = 0.0005269557574718849
iteration 3: distance = 0.00022173774433604593
iteration 4: distance = 0.00011146929390574273
Convergence reached after 5 iterations with distance 5.992418102615781e-05
CPU times: user 522 ms, sys: 24.9 ms, total: 547 ms
Wall time: 26.3 s


In [93]:
k = 10
pagerank_top_k_users = np.argsort(v)[-k:][::-1]
print(f"PageRank top {k} users")
for user in pagerank_top_k_users: print(f"user id: {user} -> PageRank score: {v[user]}")

PageRank top 10 users
user id: 271 -> PageRank score: 0.0013815026485379056
user id: 6510 -> PageRank score: 0.0008588802397190114
user id: 2909 -> PageRank score: 0.0008068922060201434
user id: 8066 -> PageRank score: 0.0006907966501004309
user id: 6845 -> PageRank score: 0.0005986556961684647
user id: 4729 -> PageRank score: 0.0005938403834758282
user id: 3703 -> PageRank score: 0.0005904407361013377
user id: 5559 -> PageRank score: 0.0005904407361013377
user id: 8153 -> PageRank score: 0.0005904407361013377
user id: 6841 -> PageRank score: 0.0005764279607395862


## **HITS (Hubs and Authorities)**

For this algorithm we need two matrices, built from the graph:
1. $L$, an $N \times N$ matrix in which $L_{ij} = 1$ if there's a link from node $i$ to node $j$ in the graph, $0$ otherwise
2. $L^T$, an $N \times N$ matrix in which $(L^T)_{ij} = 1$ if there's a link from node $j$ to node $i$ in the graph, $0$ otherwise
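
The role of the two matrices can be seen in a small local sketch: authorities are updated with $L^T$ (summing the hub scores of the nodes that link to them) and hubs with $L$ (summing the authority scores of the nodes they link to). The function name `hits_local`, the fixed number of iterations and the toy graph are assumptions made for illustration.

```python
import math

def normalize(vec):
    """Scale a vector to unit Euclidean norm."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec]

def hits_local(edges, n, iterations=50):
    """Alternate a = L^T h and h = L a, normalizing after each update."""
    hubs = [1.0] * n
    authorities = [1.0] * n
    for _ in range(iterations):
        new_auth = [0.0] * n
        for src, dst in edges:
            new_auth[dst] += hubs[src]          # a = L^T h
        authorities = normalize(new_auth)
        new_hubs = [0.0] * n
        for src, dst in edges:
            new_hubs[src] += authorities[dst]   # h = L a
        hubs = normalize(new_hubs)
    return authorities, hubs

# toy graph: 0 -> 2, 1 -> 2, 1 -> 3
toy_edges = [(0, 2), (1, 2), (1, 3)]
toy_authorities, toy_hubs = hits_local(toy_edges, n=4)
```

Node 2 is pointed to by both hubs and gets the highest authority score, while node 1 points to both authorities and gets the highest hub score.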

In [26]:
# edges contains couples (node_src, node_dst), so it's easy to represent both L and its transposed in triplets form starting from edges
L = edges.map(lambda couple: (couple[0], couple[1], 1.0)).cache()
L_T = edges.map(lambda couple: (couple[1], couple[0], 1.0)).cache()

In [27]:
def HITS(L, L_T, authorities, hubs, max_iterations=100, tolerance=10e-5, beta=0.8):
    iteration = 0
    while iteration < max_iterations:
        prev_hubs = hubs.copy()
        prev_authorities = authorities.copy()

        # broadcast hubs to each node of the cluster
        hubs_bdcast = sc.broadcast(hubs)

        # AUTHORITIES UPDATE
        # matrix - vector multiplication (distributed): L^T * hubs
        authority_scores = L_T.map(lambda l: (l[0], l[2]*hubs_bdcast.value[l[1]])).reduceByKey(lambda x, y: x + y).collect()
        # update authorities vector (local)
        for (user, auth_score) in authority_scores: authorities[user] = auth_score
        # authorities vector normalization
        authorities /= np.linalg.norm(authorities)

        # broadcast the updated authorities, so that the hubs update uses the current values
        authorities_bdcast = sc.broadcast(authorities)

        # HUBS UPDATE
        # matrix - vector multiplication (distributed): L * authorities
        hubs_scores = L.map(lambda l: (l[0], l[2]*authorities_bdcast.value[l[1]])).reduceByKey(lambda x, y: x + y).collect()
        # update hubs vector (local)
        for (user, hub_score) in hubs_scores: hubs[user] = hub_score
        # hubs vector normalization
        hubs /= np.linalg.norm(hubs)

        hub_dist = np.linalg.norm(hubs - prev_hubs)
        auth_dist = np.linalg.norm(authorities - prev_authorities)

        if hub_dist < tolerance and auth_dist < tolerance:
            print(f"Convergence reached after {iteration} iterations with hubs distance = {hub_dist} | authorities distance = {auth_dist}")
            break
        print(f"iteration {iteration}: hubs distance = {hub_dist} | authorities distance = {auth_dist}")

        iteration += 1

    return authorities, hubs

In [28]:
%%time

# authority and hub vectors are indexed with users
authorities = np.ones(N)
hubs = np.ones(N)

#authorities, hubs = HITS(L, L_T, authorities, hubs)

CPU times: user 103 µs, sys: 13 µs, total: 116 µs
Wall time: 120 µs


In [29]:
#HITS_top_k_users = np.argsort(authorities)[-k:][::-1]
#print(f"HITS top {k} users")
#for user in HITS_top_k_users: print(f"user id: {user} -> Authority score: {authorities[user]}")