# PySpark Setup

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!cp drive/MyDrive/MMDS-data/spark/spark-3.1.1-bin-hadoop3.2.tgz .
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os

# Point the Spark tooling at the Java runtime and at the unpacked Spark distribution.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [None]:
import findspark
# Make the pyspark package importable in this kernel.
# NOTE(review): presumably this locates Spark through the SPARK_HOME variable
# set in the previous cell -- confirm against the findspark documentation.
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Start the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('Task1')
    .getOrCreate()
)

# Main

## Read data

In [None]:
# Path of the ratings CSV file that is loaded in the next cell
data_path = '/content/drive/MyDrive/MMDS-data/ratings2k.csv'

In [None]:
# Load the ratings file: the first line holds the column names and the column
# types are inferred from the data.
df = spark.read.csv(
    data_path,
    header=True,
    inferSchema=True,
)

## Required Libraries

In [None]:
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg.distributed import RowMatrix, IndexedRowMatrix, IndexedRow
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.linalg import Vectors, DenseMatrix
from pyspark.sql import Row
from pyspark.sql import Window

## Main process

In [None]:
# Build the user-by-item utility matrix:
#   - keep only 'user', 'item' and 'rating' (this leaves out the 'index' column),
#   - one row per user, one column per distinct item,
#   - if a user has several ratings for the same item, F.first picks one of them,
#   - (user, item) pairs without a rating become 0.0,
#   - rows are sorted by 'user'.
um = (
    df.select('user', 'item', 'rating')
    .groupBy('user')
    .pivot('item')
    .agg(F.first('rating'))
    .na.fill(0.0)
    .sort('user')
)

# Pack every item column (all columns after 'user') into one 'features' vector column
item_columns = um.columns[1:]
um = VectorAssembler(inputCols=item_columns, outputCol='features').transform(um)

# Convert the 'features' column from the newer pyspark.ml.linalg.Vector type
# to the pyspark.mllib.linalg.Vector type
um = MLUtils.convertVectorColumnsFromML(um, 'features')

In [None]:
# Preview the first three rows of the utility matrix (one column per item, so the table is very wide)
um.show(3)

+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--

In [None]:
# Gather the 'features' vector of every row into an RDD and wrap it in a RowMatrix
rdd = (
    um.select('features')
    .rdd
    .map(lambda record: record.features)
)
row_matrix = RowMatrix(rdd)

In [None]:
# Singular value decomposition of the utility matrix, keeping 32 singular values;
# computeU=True requests the U factor in addition to s and V
svd = row_matrix.computeSVD(32, computeU=True)

In [None]:
# Unpack the three factors of the decomposition
U, s, V = svd.U, svd.s, svd.V

In [None]:
# Map the user rows into 'concept space' by multiplying with V
concept_user_rm = row_matrix.multiply(V)

# The index of the largest value in each row is the concept ID of the respective user
concept_user_indices = concept_user_rm.rows.map(lambda row: int(row.argmax()))

# Attach the 'user' column by row position.
# zipWithIndex numbers rows in RDD order (partition index first, then position
# inside the partition). Both RDDs derive from `um`, which is sorted by 'user',
# so equal keys refer to the same user.
# A row_number() over a window ordered by a constant is deliberately avoided:
# that ordering is arbitrary, so users could be paired with the wrong concept ID,
# and it forces every row into a single partition.
concept_user = concept_user_indices.zipWithIndex() \
                                   .map(lambda pair: Row(key=pair[1], concept_id=pair[0])) \
                                   .toDF()
user_keys = um.select('user').rdd \
              .zipWithIndex() \
              .map(lambda pair: Row(key=pair[1], user=pair[0]['user'])) \
              .toDF()
df_concept_user = concept_user.join(user_keys, on='key', how='inner') \
                              .orderBy('key') \
                              .select('user', 'concept_id')

In [None]:
# Preview the concept ID assigned to the first five users
df_concept_user.show(5)

+----+----------+
|user|concept_id|
+----+----------+
|   1|        19|
|   2|        16|
|   3|         3|
|   4|        17|
|   5|         4|
+----+----------+
only showing top 5 rows



In [None]:
# Tag every row of the RowMatrix U with its position so the row order is explicit,
# then wrap the tagged rows in an IndexedRowMatrix
indexed_rows = (
    U.rows
    .zipWithIndex()
    .map(lambda pair: IndexedRow(pair[1], pair[0]))
)
indexed_row_matrix = IndexedRowMatrix(indexed_rows)

# IndexedRowMatrix -> BlockMatrix
block_matrix = indexed_row_matrix.toBlockMatrix()

# BlockMatrix -> local DenseMatrix
U_dense_matrix = block_matrix.toLocalMatrix()

In [None]:
# Apply the same process as for users to infer the concept ID of each item.
# The utility matrix is built transposed: one row per item, one column per
# distinct user, missing ratings filled with 0.0, rows sorted by 'item'.
um_T = df.select('user', 'item', 'rating') \
          .groupBy('item') \
          .pivot('user') \
          .agg(F.first('rating')) \
          .na.fill(0.0) \
          .sort('item')
um_T = VectorAssembler(inputCols=um_T.columns[1:],
                       outputCol='features').transform(um_T)
um_T = MLUtils.convertVectorColumnsFromML(um_T, 'features')
rdd_T = um_T.select('features') \
            .rdd \
            .map(lambda row: row.features)
row_matrix_T = RowMatrix(rdd_T)

# Multiply the item rows with U; the index of the largest value in each
# resulting row is the concept ID of the respective item
concept_item_rm = row_matrix_T.multiply(U_dense_matrix)
concept_item_indices = concept_item_rm.rows.map(lambda row: int(row.argmax()))

# Attach the 'item' column by row position.
# zipWithIndex numbers rows in RDD order (partition index first, then position
# inside the partition). Both RDDs derive from `um_T`, which is sorted by 'item',
# so equal keys refer to the same item.
# A row_number() over a window ordered by a constant is deliberately avoided:
# that ordering is arbitrary, so items could be paired with the wrong concept ID,
# and it forces every row into a single partition.
concept_item = concept_item_indices.zipWithIndex() \
                                   .map(lambda pair: Row(key=pair[1], concept_id=pair[0])) \
                                   .toDF()
item_keys = um_T.select('item').rdd \
                .zipWithIndex() \
                .map(lambda pair: Row(key=pair[1], item=pair[0]['item'])) \
                .toDF()
df_concept_item = concept_item.join(item_keys, on='key', how='inner') \
                              .orderBy('key') \
                              .select('item', 'concept_id')

In [None]:
# Preview the concept ID assigned to the first five items
df_concept_item.show(5)

+----+----------+
|item|concept_id|
+----+----------+
|   0|         1|
|   1|         3|
|   2|         7|
|   3|         5|
|   4|         2|
+----+----------+
only showing top 5 rows



In [None]:
# Count users for each concept
user_count = df_concept_user.groupBy('concept_id').agg(F.count('user').alias('user_count'))

# Count items for each concept
item_count = df_concept_item.groupBy('concept_id').agg(F.count('item').alias('item_count'))

# Join the counts. A full outer join keeps concepts that have users but no items
# (or items but no users); the missing side is counted as 0. With an inner join
# such concepts would disappear, and their users/items would also be missing
# from the totals used as denominators below.
df_concept_counts = user_count.join(item_count, on='concept_id', how='outer') \
                              .na.fill(0, subset=['user_count', 'item_count'])

# Compute each concept's share of all users and of all items.
# Window.partitionBy() without columns makes the sum run over every row.
all_rows = Window.partitionBy()
df_concept_portion = df_concept_counts \
    .withColumn('user_portion', F.col('user_count') / F.sum('user_count').over(all_rows)) \
    .withColumn('item_portion', F.col('item_count') / F.sum('item_count').over(all_rows))

In [None]:
# Preview the per-concept user/item counts and portions
df_concept_portion.show(5)

+----------+----------+----------+------------------+--------------------+
|concept_id|user_count|item_count|      user_portion|        item_portion|
+----------+----------+----------+------------------+--------------------+
|         1|        11|        11|0.1506849315068493|0.024336283185840708|
|         2|         3|        33|0.0410958904109589| 0.07300884955752213|
|         3|        14|        26|0.1917808219178082| 0.05752212389380531|
|         4|         4|        75|0.0547945205479452| 0.16592920353982302|
|         5|         2|        61|0.0273972602739726| 0.13495575221238937|
+----------+----------+----------+------------------+--------------------+
only showing top 5 rows



In [None]:
# Turn the DenseVector s into a square DenseMatrix with s on the diagonal.
# The values are passed as a flat list; entry (d, d) sits at offset d * size + d.
num_singular = len(s)
diag_values = [0.0] * (num_singular * num_singular)
for diag_pos in range(num_singular):
    diag_values[diag_pos * num_singular + diag_pos] = s[diag_pos]
s_matrix = DenseMatrix(num_singular, num_singular, diag_values)

# User embeddings: U multiplied by the diagonal matrix of singular values
embedding_user = U.multiply(s_matrix)

# One DataFrame row per embedding; each embedding is joined into a
# comma-separated string so that it can be saved
embedding_rows = embedding_user.rows.map(lambda vec: (vec.toArray().tolist(),))
df_embedding_user = (
    spark.createDataFrame(embedding_rows, ['embedding'])
    .withColumn("embedding", F.expr("concat_ws(',', embedding)"))
)

In [None]:
# Preview the first five user embeddings without truncating the strings
df_embedding_user.show(5, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|embedding                                                                                                                                                                                                                                                                                                                                                                         

In [None]:
# Write the embeddings, coalesced into one partition, as CSV with a header
# under the 'df_embedding_user' output path.
# mode('overwrite') lets this cell be re-run: with the default save mode the
# write fails as soon as the output path already exists.
df_embedding_user.coalesce(1) \
                  .write \
                  .mode('overwrite') \
                  .csv('df_embedding_user', header=True)