<a href="https://colab.research.google.com/github/doinav/Algorithms-For-Massive-Data/blob/main/LINK_ANALYSIS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import sys
import os

In [2]:
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=1a59bb10b4ead5629582922cb9f81e9c7bc64abab654c591db1d34caf626db7b
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
!rm -rf spark-3.5.0-bin-hadoop3.tgz
!wget -q https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [4]:
!ls -l spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

-rw-r--r-- 1 root root 400395283 Sep  9 02:10 spark-3.5.0-bin-hadoop3.tgz


In [5]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

In [6]:
import findspark
findspark.init("/content/spark-3.5.0-bin-hadoop3") #SPARK_HOME

In [7]:
os.environ['KAGGLE_USERNAME'] = "doinav"
os.environ['KAGGLE_KEY'] = "******"
!kaggle datasets download -d cynthiarempel/amazon-us-customer-reviews-dataset

Downloading amazon-us-customer-reviews-dataset.zip to /content
100% 20.9G/21.0G [03:41<00:00, 159MB/s]
100% 21.0G/21.0G [03:41<00:00, 102MB/s]


In [8]:
!unzip -j /content/amazon-us-customer-reviews-dataset.zip amazon_reviews_us_Books_v1_02.tsv

Archive:  /content/amazon-us-customer-reviews-dataset.zip
  inflating: amazon_reviews_us_Books_v1_02.tsv  


In [9]:
import pyspark
from pyspark.sql import SparkSession

import pyspark.sql.functions as f
from pyspark.sql import Window

spark = SparkSession.builder \
                    .appName("Link Analysis") \
                    .getOrCreate()
spark

In [10]:
# Import the dataset as RDD and remove the header
books = spark.sparkContext.textFile('amazon_reviews_us_Books_v1_02.tsv', minPartitions=8) # import as rdd dataset

header = books.first()
books = books.filter(lambda line: line != header)

In [11]:
# Pre-process data by splitting the columns, retrieve column 1 (customer id) and column 2 (product id), sample 10% of it
data = books.map(lambda x: (x.split('\t')[1], x.split('\t')[3])).sample(False, 0.1, 125)

In [12]:
# Count the number of rows in the sampled df
data.count()

309998

In [13]:
# Group the data by costumer id -> map product ids into lists, and filter out customer ids having reviewed only one book
df = data.groupByKey().mapValues(list)
filtered = df.filter(lambda x: len(x[1]) > 1)

In [14]:
# Compute the total number of nodes (distinct book id)
tot = data.map(lambda x: x[1]).distinct().count()

In [15]:
# Compute the edges between books
def calculate_linkages(data):
    key, values = data
    combine = [(v1, v2) for i, v1 in enumerate(values) for v2 in values[i + 1:]]
    add = [(v2, v1) for (v1, v2) in combine]
    return (combine + add)

In [16]:
# Compute the list of edges
links = filtered.map(lambda x: calculate_linkages(x)).flatMap(lambda value: value)

# Calculate out-degree for each node
id2degree = links.countByKey()

# Sort the items in the defaultdict by key in descending order
sorted_items = sorted(id2degree.items(), key=lambda x: x[1], reverse=True)

In [17]:
# Create the transportation matrix and its transposed
P = links.map(lambda x: (x[0], x[1], 1 / id2degree[x[0]])) #(i, j, Mij)
PT = P.map(lambda x: (x[1], x[0], x[2])) #(j, i , Mij)

In [18]:
# Calculate the initial probability
def calculate_probability(degrees, total):
    prob = 1 / total
    p_i = {item: prob for item in degrees.keys()}
    return p_i

p_i = calculate_probability(id2degree, tot)

In [19]:
# Exploit the power method for PageRank by iteratively updating the probabilities
for i in range(50):
    new_p = PT.map(lambda x:(x[0], (x[2]*p_i[x[1]])))\
              .reduceByKey(lambda x,y: x+y)\
              .collect()\

    for idx,prb in new_p:
        p_i[idx] = prb

    print(f"iteration {i}")

iteration 0
iteration 1
iteration 2
iteration 3
iteration 4
iteration 5
iteration 6
iteration 7
iteration 8
iteration 9
iteration 10
iteration 11
iteration 12
iteration 13
iteration 14
iteration 15
iteration 16
iteration 17
iteration 18
iteration 19
iteration 20
iteration 21
iteration 22
iteration 23
iteration 24
iteration 25
iteration 26
iteration 27
iteration 28
iteration 29
iteration 30
iteration 31
iteration 32
iteration 33
iteration 34
iteration 35
iteration 36
iteration 37
iteration 38
iteration 39
iteration 40
iteration 41
iteration 42
iteration 43
iteration 44
iteration 45
iteration 46
iteration 47
iteration 48
iteration 49


In [20]:
# Save the results into a dictionary
d = dict(new_p)

# Sort it by product in ascending order
sorted_d = dict(sorted(d.items(), key=lambda item: item[0], reverse=False))

In [21]:
# Sort and print the most recurrent products
sorted_p_i = sorted(p_i.items(), key=lambda x: x[1], reverse=True)[:20]
print('Most quoted products:')
for item, prob in sorted_p_i:
    print(f'With prob: {prob}, you take product with code: {item}')

Most quoted products:
With prob: 0.00011288455081260919, you take product with code: 0373250517
With prob: 0.0001063244794487078, you take product with code: 0345458931
With prob: 0.00010014309221280358, you take product with code: 0399152180
With prob: 8.79062811490297e-05, you take product with code: 0385504209
With prob: 8.46592070356197e-05, you take product with code: 0385336675
With prob: 8.234647965291267e-05, you take product with code: 0385333927
With prob: 8.138651979442579e-05, you take product with code: 043935806X
With prob: 8.043795194239667e-05, you take product with code: 0767908171
With prob: 7.940514697075e-05, you take product with code: 0312252617
With prob: 7.871454788775515e-05, you take product with code: 0451524934
With prob: 7.812703171011327e-05, you take product with code: 0375826688
With prob: 7.753878908295939e-05, you take product with code: 0684801221
With prob: 7.664525046212533e-05, you take product with code: 0316769487
With prob: 7.596632953785859e-05

In [22]:
# Define the list of values you want to filter on in column 3
values_to_filter = []

for index, (item, prob) in enumerate(sorted_p_i):
    if index > 10:
        break
    values_to_filter.append(item)

In [23]:
# Show the first 10 products by page rank and associated title
names = books.filter(lambda x: x.split('\t')[3] in values_to_filter).map(lambda x: (x.split('\t')[3], x.split('\t')[5]))

for i in values_to_filter:
  name = names.filter(lambda x: x[0] == i)
  print(name.take(1))

[('0373250517', 'Spitting Feathers (Red Dress Ink)')]
[('0345458931', 'Body Double')]
[('0399152180', 'Melancholy Baby (A Sunny Randall Novel)')]
[('0385504209', 'The Da Vinci Code')]
[('0385336675', 'The Enemy (Jack Reacher, No. 8)')]
[('0385333927', 'Pagan Babies')]
[('043935806X', 'Harry Potter and the Order of the Phoenix (Book 5)')]
[('0767908171', 'A Short History of Nearly Everything')]
[('0312252617', 'Fast Women')]
[('0451524934', '1984 (Signet Classics)')]
[('0375826688', 'Eragon (Inheritance)')]


TOPIC SENSITIVE PAGE RANK using book rankings

In [24]:
# The same dataset as before, now also containing the ratings for each reviewed product
datats = books.map(lambda x: (x.split('\t')[1], x.split('\t')[3], x.split('\t')[7])).sample(False, 0.1, 125)

In [25]:
# Group the data by costumer id and map values into a list of tuples (product_id, rating)
dfts = datats.groupBy(lambda x: x[0]).mapValues(lambda values: [(value[1], value[2]) for value in values])

In [26]:
# Filter out dead ends
filteredts = dfts.filter(lambda x: len(x[1]) > 1)

In [27]:
# From the filtered df retrieve the list of tuples (product_id, rating)
S = filteredts.flatMap(lambda x: x[1])

In [28]:
# Compute the average rating per book
# - Convert rating to int and create a count
# - Sum ratings and count for each product
# - Divide sum of ratings by count to get average
average_ratings = S \
                  .map(lambda x: (x[0], (int(x[1]), 1))) \
                  .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
                  .mapValues(lambda x: round(float(x[0] / x[1]), 3)) \
                  .sortBy(lambda x: x[0], ascending = True)

In [29]:
# Filter out nodes having a rating greater or equal to 4
S_above3 = average_ratings.filter(lambda x: float(x[1]) >= 4)
# Cardinality of the set of topic-sensitive products
S_count_above3 = S_above3.count()

In [30]:
# Compute the topic-sensitive vector: each book is assigned value 1 if the book is topic-sensitive, 0 otherwise
e_S_above3 = average_ratings\
              .map(lambda x: int(float(x[1])>= 4))\
              .collect()

In [31]:
# Compute the Topic Sensitive PR for each item
def TopicSensitivePR(d, e, n):
    TSPR = {}
    beta = 0.8 #0.6
    for (key_pr, pagerank), element in zip(d.items(), e):
      new_value = 0
      new_value += beta * pagerank + (1 - beta) * element / n
      TSPR[key_pr] = new_value
    return TSPR

In [32]:
TSPR_above3 = TopicSensitivePR(sorted_d, e_S_above3, S_count_above3)

In [33]:
#Sort and print the most recurrent products
sorted_TSPR_above3= sorted(TSPR_above3.items(), key=lambda x:x[1], reverse = True)[:10]
print('Most quoted products:')
for item, prob in sorted_TSPR_above3:
    print(f'With prob: {prob}, you take product with code: {item}')

Most quoted products:
With prob: 9.35242641603886e-05, you take product with code: 0373250517
With prob: 8.827620706926749e-05, you take product with code: 0345458931
With prob: 8.333109728054411e-05, you take product with code: 0399152180
With prob: 7.094398913879701e-05, you take product with code: 0385336675
With prob: 7.032502491922376e-05, you take product with code: 0385504209
With prob: 6.832583934584187e-05, you take product with code: 043935806X
With prob: 6.756698506421859e-05, you take product with code: 0767908171
With prob: 6.674074108690124e-05, you take product with code: 0312252617
With prob: 6.618826182050536e-05, you take product with code: 0451524934
With prob: 6.587718372233013e-05, you take product with code: 0385333927


In [34]:
# Define the list of values you want to filter on in column 3
values_to_filter = []
for index, (item, prob) in enumerate(sorted_TSPR_above3):
    if index > 10:
        break
    values_to_filter.append(item)

In [35]:
# Join the books titles and ratings on book code
names_TSPR_above3 = books.filter(lambda x: x.split('\t')[3] in values_to_filter).map(lambda x: (x.split('\t')[3], x.split('\t')[5], x.split('\t')[7])).distinct()
join_above3 = names_TSPR_above3.join(average_ratings)

In [36]:
# Show the resulting RDD, containing book code, title and average rating
for i in values_to_filter:
  name = join_above3.filter(lambda x: x[0] == i)
  print(name.take(1))

[('0373250517', ('Spitting Feathers (Red Dress Ink)', 5.0))]
[('0345458931', ('Body Double', 4.167))]
[('0399152180', ('Melancholy Baby (A Sunny Randall Novel)', 4.0))]
[('0385336675', ('The Enemy (Jack Reacher, No. 8)', 4.429))]
[('0385504209', ('The Da Vinci Code', 3.841))]
[('043935806X', ('Harry Potter and the Order of the Phoenix (Book 5)', 4.464))]
[('0767908171', ('A Short History of Nearly Everything', 4.706))]
[('0312252617', ('Fast Women', 4.6))]
[('0451524934', ('1984 (Signet Classics)', 4.794))]
[('0385333927', ('Pagan Babies', 3.625))]


In [37]:
# Filter out nodes having a rating larger than 4
S_less4 = average_ratings.filter(lambda x: int(x[1]) < 4)

# Cardinality of the set of topic-sensitive products
S_count_less4 = S_less4.count()

In [38]:
# Compute the topic-sensitive vector: each book is assigned value 1 if the book is topic-sensitive, 0 otherwise
e_S_less4 = average_ratings\
              .map(lambda x: int(int(x[1]) <= 3))\
              .collect()

In [39]:
TSPR_less4 = TopicSensitivePR(sorted_d, e_S_less4, S_count_less4)

In [40]:
#Sort and print the most recurrent products
sorted_TSPR_less4 = sorted(TSPR_less4.items(), key=lambda x:x[1], reverse = True)[:10]
print('Most quoted products:')
for item, prob in sorted_TSPR_less4:
    print(f'With prob: {prob}, you take product with code: {item}')

Most quoted products:
With prob: 9.030764065008736e-05, you take product with code: 0373250517
With prob: 8.505958355896625e-05, you take product with code: 0345458931
With prob: 8.188171046181015e-05, you take product with code: 0385504209
With prob: 8.011447377024287e-05, you take product with code: 0399152180
With prob: 7.743386926491652e-05, you take product with code: 0385333927
With prob: 7.4058310910677e-05, you take product with code: 0375826688
With prob: 7.232974917287325e-05, you take product with code: 039914563X
With prob: 7.122889479081463e-05, you take product with code: 0871138646
With prob: 6.772736562849577e-05, you take product with code: 0385336675
With prob: 6.686930175456411e-05, you take product with code: 0312873441


In [41]:
# Define the list of values you want to filter on in column 3
values_to_filter = []
for index, (item, prob) in enumerate(sorted_TSPR_less4):
    if index > 10:
        break
    values_to_filter.append(item)

In [42]:
# Join the code and book name with the average rating
names_TSPR_less4 = books.filter(lambda x: x.split('\t')[3] in values_to_filter).map(lambda x: (x.split('\t')[3], x.split('\t')[5])).distinct()
join_less4 = names_TSPR_less4.join(average_ratings)

In [43]:
# Show the resulting RDD, containing book code, title and average rating
for i in values_to_filter:
  name = join_less4.filter(lambda x: x[0] == i)
  print(name.take(1))

[('0373250517', ('Spitting Feathers (Red Dress Ink)', 5.0))]
[('0345458931', ('Body Double', 4.167))]
[('0385504209', ('The Da Vinci Code', 3.841))]
[('0399152180', ('Melancholy Baby (A Sunny Randall Novel)', 4.0))]
[('0385333927', ('Pagan Babies', 3.625))]
[('0375826688', ('Eragon (Inheritance)', 3.708))]
[('039914563X', ('The Bear and the Dragon', 2.2))]
[('0871138646', ('Old Flames', 3.5))]
[('0385336675', ('The Enemy (Jack Reacher, No. 8)', 4.429))]
[('0312873441', ('Running On Instinct', 3.5))]
