<a href="https://colab.research.google.com/github/czephyr/AMD_project/blob/main/colab_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz

In [None]:
import os
# Locations of the JDK installed with apt-get and of the unpacked Spark distribution,
# read when Spark is initialised in the next cell.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.2-bin-hadoop3",
})

In [None]:
!pip install -q findspark
!pip install -q pyspark
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [None]:
os.environ['KAGGLE_USERNAME'] = "xxxxxxxxxx"
os.environ['KAGGLE_KEY'] = "xxxxxxxxxxxxxxx"
!kaggle datasets download -f amazon_reviews_us_Baby_v1_00.tsv cynthiarempel/amazon-us-customer-reviews-dataset
!kaggle datasets download -f amazon_reviews_us_Digital_Music_Purchase_v1_00.tsv cynthiarempel/amazon-us-customer-reviews-dataset
!kaggle datasets download -f amazon_reviews_us_Digital_Video_Games_v1_00.tsv cynthiarempel/amazon-us-customer-reviews-dataset

In [None]:
import zipfile

# Unpack every downloaded "<file>.tsv.zip" archive into the working directory.
for archive_path in (
    "amazon_reviews_us_Baby_v1_00.tsv.zip",
    "amazon_reviews_us_Digital_Music_Purchase_v1_00.tsv.zip",
    "amazon_reviews_us_Digital_Video_Games_v1_00.tsv.zip",
):
    with zipfile.ZipFile(archive_path, 'r') as archive:
        archive.extractall(".")

### Data to adjacency list

In [None]:
# the dataset is composed by Baby items, Digital Music and Digital Videogames
# RDD (customer,product)
def _read_reviews(path):
    """Read one tab-separated reviews file (with header) and return its
    (customer_id, product_parent) columns as an RDD of Rows."""
    frame = (spark.read
             .option("header", "true")
             .option("sep", "\t")
             .csv(path))
    return frame.select(["customer_id", "product_parent"]).rdd

# tsv0 = Baby, tsv1 = Digital Music, tsv2 = Digital Video Games
tsv0, tsv1, tsv2 = (
    _read_reviews(file_name)
    for file_name in (
        "amazon_reviews_us_Baby_v1_00.tsv",
        "amazon_reviews_us_Digital_Music_Purchase_v1_00.tsv",
        "amazon_reviews_us_Digital_Video_Games_v1_00.tsv",
    )
)
data = tsv = tsv0.union(tsv1).union(tsv2)
print(f"Rows in the dataset: {data.count()}")
data.take(1)

In [None]:
# Peek at the first two (customer_id, product_parent) rows of the combined dataset
data.take(2)

In [None]:
# Distinct product_parent ids appearing in the Digital Video Games file
video_games = tsv2.map(lambda review: review.product_parent).distinct()
video_games.take(2)

In [None]:
# Distinct product_parent ids appearing in the Baby file
baby = tsv0.map(lambda review: review.product_parent).distinct()
baby.take(2)

In [None]:
# Distinct product_parent ids appearing in the Digital Music file
music = tsv1.map(lambda review: review.product_parent).distinct()
music.take(2)

In [None]:
#### CELL 5
# Create couples of items reviewed (used here as "purchased") by the same customer.
# Joining the RDD with itself on customer_id pairs every product of a customer with
# every product of that same customer: both (p1, p2) and (p2, p1) are emitted, and
# so are self-pairs (p1, p1), which the next cell removes.
# RDD [(customer1, (product1,product2)),(customer2, (product3,product4)),(...)]
data_joined = data.join(data)
data_joined.take(2)

In [None]:
#### CELL 6
# Drop the self-pairs (product, product) produced by the self-join; pairs of
# distinct products (in both orders) are kept.
# RDD [(customer1, (product1,product2)),(customer2, (product3,product4)),(...)]
def _is_not_self_pair(record):
    """True when the two products joined for this customer differ."""
    _customer, (first, second) = record
    return first != second

data_filtered = data_joined.filter(_is_not_self_pair)
data_filtered.take(2)

In [None]:
#### CELL 7
# Discard the customer key and keep only the product pairs as graph edges.
# RDD [(product1,product2),(product3,product4),(...))]
edge_list = data_filtered.values()
edge_list.take(2)

In [None]:
#### CELL 8
# Collect the neighbours of every source product into a list.
# RDD [(node1,[node2,node3,...]),(...)]
# groupByKey builds each list once; merging one-element lists with reduceByKey(a + b)
# would copy the growing list at every merge (quadratic in the node degree).
# Repeated neighbours (several customers sharing the same pair) are kept.
adjacency_list = edge_list.groupByKey().mapValues(list)

adjacency_list.take(1)

### Calculate PageRank

In [None]:
#### CELL 9
def calculate_initial_contribs(row):
    """
    Expand one adjacency-list entry into weighted edges.

    ``row`` is ``(item, neighbours)`` as produced by ``adjacency_list``. Each entry of
    ``neighbours`` gets the same weight ``1 / len(neighbours)``, the probability of
    stepping from ``item`` to it in a uniform random walk. Yields one
    ``(item, neighbour, weight)`` triplet per entry (repeated neighbours repeat).
    """
    source, neighbours = row
    yield from ((source, target, 1 / len(neighbours)) for target in neighbours)

# transition matrix stored as triplets, one per edge:
# RDD [(product1, product2, probability), (product1, product3, probability), ...]
# where probability = 1 / (number of adjacency entries of product1)
# cached because it is scanned again to build the index map and the hashed matrix
trans_matrix = adjacency_list.flatMap(calculate_initial_contribs).cache()
trans_matrix.take(10) 

In [None]:
# Product ids of every edge source. The self-join emits both (a, b) and (b, a), so the
# graph is symmetric and these keys also cover every edge destination.
keys = trans_matrix.map(lambda x : x[0]).distinct().collect()
# map product ids to dense integer indexes 0..n-1
indexmap = dict(zip(keys, range(len(keys))))

# map items to indexes
# RDD [(index_product1,(index_product2,probability)),(index_product3,(index_product4,probability))]
# Cached: every PageRank iteration scans it again, and without caching each scan would
# redo the dictionary lookups for every edge.
hashed_matrix = trans_matrix.map(lambda x: (indexmap[x[0]],(indexmap[x[1]],x[2]))).cache()
hashed_matrix.take(2)

In [None]:
import numpy as np

# Rank vector kept on the driver as a dense numpy array, one entry per product
n = len(keys)  # number of products in the adjacency list
previus_pgrnk = np.ones(n)  # iteration t-1: sentinel differing from pgrnk so the loop starts (n > 1)
pgrnk = previus_pgrnk / n   # iteration t: uniform starting distribution
     
# Squared Euclidean (L2) distance between two consecutive rank vectors, used as the
# convergence criterion (no square root and no averaging, so it is not an MSE).
def l2distance(v, q):
    """
    Return ``sum((q - v) ** 2)`` for two equal-length vectors as a Python float.

    Vectorised with numpy, because rank vectors have one entry per product. It does
    not rely on the builtin ``sum``, which notebook cells can shadow.
    """
    diff = np.asarray(q, dtype=float) - np.asarray(v, dtype=float)
    return float(np.sum(diff ** 2))

In [None]:
#### CELL 15
# PageRank by power iteration. hashed_matrix holds one record per edge,
# (src, (dst, p)) with p = 1 / out-degree(src), so one iteration computes
#   r_new[dst] = tax * sum_{src -> dst} p * r[src] + (1 - tax) / n
tolerance = 10e-10  # == 1e-9, compared with the squared L2 change between iterations
max_iterations = 300
i = 0
tax = 0.85  # damping factor: probability of following a link instead of teleporting

while(l2distance(previus_pgrnk, pgrnk) >= tolerance and i < max_iterations):
    previus_pgrnk = np.copy(pgrnk)

    # Ship the current rank vector to the executors once per iteration.
    rank_bc = sc.broadcast(pgrnk)
    # Every edge pushes p * r[src] to its destination; summing per destination
    # gives the link-following part of the new rank.
    page_rank_values = (hashed_matrix
                        .map(lambda e: (e[1][0], e[1][1] * rank_bc.value[e[0]]))
                        .reduceByKey(lambda a, b: a + b)
                        .collect()
                        )
    rank_bc.unpersist()

    # Teleportation is added once per node, not once per edge. Every indexed node
    # has outgoing edges, so the rank vector keeps summing to 1.
    new_pgrnk = np.full(n, (1 - tax) / n)
    if page_rank_values:
        dst_idx, contribs = zip(*page_rank_values)
        new_pgrnk[np.array(dst_idx)] += tax * np.array(contribs)
    pgrnk = new_pgrnk
    i = i + 1

In [None]:
# Mean PageRank over all products (equals 1/n whenever the rank vector sums to 1)
pgrnk.mean()

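### Topic-sensitive PageRank

Same power iteration as above, except that the random surfer teleports only to Digital Video Games products, which biases the ranking toward that topic.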

In [None]:
# ids of Digital Video Games products; a set gives O(1) membership tests, so building
# vg_array is linear instead of O(n_products * n_video_games)
vgdict = set(video_games.collect())
# vg_array[idx] == 1 iff graph node idx is a video-game product
# (indexmap iterates its keys in index order 0..n-1)
vg_array = [1 if product in vgdict else 0 for product in indexmap]

In [None]:
# Reset the rank vectors before the topic-sensitive run
n = len(keys)  # number of products in the adjacency list
previus_pgrnk = np.ones(n)  # iteration t-1: sentinel so the convergence loop starts (n > 1)
pgrnk = previus_pgrnk / n   # iteration t: uniform starting distribution


In [None]:
#### CELL 18
# Topic-sensitive PageRank: the same power iteration as plain PageRank, but the
# teleport step only jumps to products in the topic set (vg_array == 1):
#   r_new[dst] = tax * sum_{src -> dst} p * r[src] + (1 - tax) * vg[dst] / len_topic
tolerance = 10e-10  # == 1e-9, compared with the squared L2 change between iterations
max_iterations = 300
i = 0
tax = 0.85  # damping factor

# number of pages in the topic set that are nodes of the graph; only they can receive
# teleport mass, so the teleport distribution sums to exactly (1 - tax)
len_topic = int(np.count_nonzero(vg_array))
if len_topic == 0:
    raise ValueError("no topic products are present in the graph")

# teleport vector: (1 - tax) / len_topic on topic pages, 0 elsewhere
teleport = (1 - tax) * np.asarray(vg_array, dtype=float) / len_topic

def calcPgRnk(row):
    """Map one edge (src, (dst, p)) to (dst, p * r[src]), its link-following
    contribution to the next rank of dst; reads the current ranks from rank_bc."""
    src, (dst, prob) = row
    return dst, prob * rank_bc.value[src]


while(l2distance(previus_pgrnk, pgrnk) >= tolerance and i < max_iterations):

    previus_pgrnk = np.copy(pgrnk)
    rank_bc = sc.broadcast(pgrnk)  # current ranks, shipped to the executors once per iteration
    page_rank_values = (hashed_matrix
                        .map(calcPgRnk)
                        .reduceByKey(lambda a, b: a+b)
                        .collect()
                       )
    rank_bc.unpersist()

    # teleport mass is added once per node; link contributions are scaled by tax
    new_pgrnk = teleport.copy()
    if page_rank_values:
        dst_idx, contribs = zip(*page_rank_values)
        new_pgrnk[np.array(dst_idx)] += tax * np.array(contribs)
    pgrnk = new_pgrnk
    i = i + 1

In [None]:
# Avg pgrnk for videogame products
# Averaged over the video-game products that are nodes of the graph (vg_array == 1).
# Don't name the accumulator `sum`: that would shadow the builtin for every later cell.
vg_mask = np.asarray(vg_array, dtype=bool)
vg_count = int(np.count_nonzero(vg_mask))
vg_total = float(np.sum(pgrnk[vg_mask]))
vg_total / vg_count if vg_count else float("nan")

In [None]:
# Avg pgrnk for products in the other two categories
# Averaged over the graph nodes that are not video-game products (vg_array == 0),
# so products absent from the graph do not inflate the denominator.
others_mask = ~np.asarray(vg_array, dtype=bool)
others_count = int(np.count_nonzero(others_mask))
sum_others = float(np.sum(pgrnk[others_mask]))
sum_others / others_count if others_count else float("nan")