In [1]:
from google.cloud import aiplatform
from google.colab import auth
from google.cloud import aiplatform
PROJECT_ID = 'xxx'  # placeholder: set to your GCP project id
LOCATION = 'asia-southeast1'

auth.authenticate_user()
# Pass LOCATION explicitly so Vertex AI uses the region configured above
# instead of the SDK's default region.
aiplatform.init(project=PROJECT_ID, location=LOCATION)

In [2]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.2.0-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to cloud.r-project.org (108.157.173.52)] [                                                                                                    Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:10 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,0

In [None]:
!python --version

Python 3.10.12


In [3]:
import os

# Point findspark at the JDK and the Spark distribution installed/extracted by
# the setup cell.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.2.0-bin-hadoop3.2",
)

import findspark
findspark.init()

In [4]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import sys
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import math
import random
from pprint import pprint
from matplotlib.lines import Line2D

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType, IntegerType,BinaryType,StringType,DecimalType




from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit,CrossValidator
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics


SEED: int = 1_492  # shared seed for randomSplit and ALS so runs are reproducible


In [5]:
from google.cloud import bigquery
bq = bigquery.Client(project=PROJECT_ID)

# One row per (user, item): the "rating" is the number of purchase /
# add_to_cart / begin_checkout / select_item item rows the user generated.
# NOTE(review): grouping by item_name as well as item_id means an id whose name
# varies across events yields several rows for the same (user, item_id) pair —
# confirm names are unique per id.
sql = """
SELECT
  user_pseudo_id AS user_id,
  item.item_id,
  item.item_name,
  COUNT(*) AS rating
FROM `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_*`,
  UNNEST(items) AS item
WHERE event_name IN ('purchase', 'add_to_cart', 'begin_checkout', 'select_item')
GROUP BY
  user_pseudo_id,
  item.item_id,
  item.item_name;
"""

df = (
    bq
    .query(sql)
    .to_dataframe()
)
df.head()

Unnamed: 0,user_id,item_id,item_name,rating
0,56113571.74611874,9195831,Google Campus Bike Bottle,1
1,5565016.922759807,9195244,Google Infant Charcoal Onesie,1
2,7124838.022850932,9197015,Google Austin Campus Ladies Tee,1
3,4067712.1364874127,9196383,Google Mural Bottle,1
4,4511811.399059849,9188203,Google Clear Pen 4-Pack,9


In [6]:
# Dense, 1-based integer encoders for the raw identifiers.
# ALS needs integer id columns, so each distinct raw id receives the next unused
# integer the first time it is seen. The raw-id -> code mappings live in the
# module-level dicts below, and the next free code in the matching counter.
user_id_map = {}
item_id_map = {}
user_counter = 1
item_counter = 1


def encode_user_id(user_id):
    """Return the integer code for ``user_id``, allocating a new one on first sight."""
    global user_counter
    try:
        return user_id_map[user_id]
    except KeyError:
        pass
    user_id_map[user_id] = code = user_counter
    user_counter += 1
    return code


def encode_item_id(item_id):
    """Return the integer code for ``item_id``, allocating a new one on first sight."""
    global item_counter
    try:
        return item_id_map[item_id]
    except KeyError:
        pass
    item_id_map[item_id] = code = item_counter
    item_counter += 1
    return code

In [7]:
# Replace the raw ids with the dense integer codes ALS requires.
# Re-running this cell without re-loading df would look the integer codes up
# in maps keyed by the raw ids, treat every code as new, and silently scramble
# the ids — so a column that is already integer-typed is left alone.
# NOTE(review): assumes the raw ids are not integer-typed (GA4 exports them as
# strings) — confirm with df.dtypes.
if not pd.api.types.is_integer_dtype(df['user_id']):
    df['user_id'] = df['user_id'].map(encode_user_id)
if not pd.api.types.is_integer_dtype(df['item_id']):
    df['item_id'] = df['item_id'].map(encode_item_id)
df.head()

Unnamed: 0,user_id,item_id,item_name,rating
0,1,1,Google Campus Bike Bottle,1
1,2,2,Google Infant Charcoal Onesie,1
2,3,3,Google Austin Campus Ladies Tee,1
3,4,4,Google Mural Bottle,1
4,5,5,Google Clear Pen 4-Pack,9


In [19]:
# Local Spark session on all available cores; getOrCreate() returns the
# existing session when this cell is re-run.
spark = (
    SparkSession.builder
    .appName("ALS Recommend")
    .master("local[*]")
    .getOrCreate()
)

In [9]:
# Column names used when building the Spark schemas and lookups.
COL_USER, COL_ITEM, COL_RATING = "user_id", "item_id", "rating"
COL_PREDICTION = "prediction"
COL_ITEM_NAME = "item_name"

In [10]:
# Explicit Spark schema for the encoded (user, item, rating) frame; every
# column is a 32-bit integer. `fields` is passed as a list, the type
# StructType documents for it.
schema = StructType([
    StructField(COL_USER, IntegerType()),
    StructField(COL_ITEM, IntegerType()),
    StructField(COL_RATING, IntegerType()),
])

In [11]:
# Split the interaction frame in two:
#   df_map - item_id -> item_name lookup with one row per distinct pair (the
#            interaction frame repeats an item once per user who touched it).
#   df     - only the user_id / item_id / rating columns that `schema` declares.
# Reassign rather than dropping in place.
df_map = df[[COL_ITEM, COL_ITEM_NAME]].drop_duplicates().reset_index(drop=True)
df = df.drop(columns=[COL_ITEM_NAME])

In [12]:
# Hand the encoded pandas ratings to Spark using the explicit integer schema.
spark_df = spark.createDataFrame(data=df, schema=schema)

In [None]:
spark_df.show(n=5)  # peek at the first five encoded rows

+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|      1|      1|     1|
|      2|      2|     1|
|      3|      3|     1|
|      4|      4|     1|
|      4|      5|     1|
+-------+-------+------+
only showing top 5 rows



In [13]:
# Seeded 80/20 train/test split.
df_train, df_test = spark_df.randomSplit(weights=[0.8, 0.2], seed=SEED)

In [16]:
%%time
model_ALS = ALS(maxIter=20, rank=20, regParam=0.05, nonnegative=True,
              userCol='user_id', itemCol='item_id', ratingCol='rating',
              coldStartStrategy='drop', implicitPrefs=False, seed=SEED).fit(df_train)

CPU times: user 422 ms, sys: 79.4 ms, total: 502 ms
Wall time: 55.5 s


In [20]:
# RMSE of the model's predictions on the held-out test split.
pred = model_ALS.transform(df_test)
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol=COL_RATING,
    predictionCol=COL_PREDICTION,
)
rmse = evaluator.evaluate(pred)
print('RMSE (Test Set):', rmse)

RMSE (Test Set): 1.5594013973790322


In [21]:
# Spark lookup table (item_id -> item_name) used to attach names to recommended
# item ids. `fields` is passed as a list, the type StructType documents for it.
schema_item = StructType([
    StructField(COL_ITEM, IntegerType()),
    StructField(COL_ITEM_NAME, StringType()),
])
# Keep one row per distinct (item_id, item_name) pair so joins against this
# table do not multiply rows.
item_df = spark.createDataFrame(df_map, schema=schema_item).dropDuplicates()

In [62]:
def recommend_item(userId, limit):
  """Show the top-``limit`` ALS recommendations for one user, with item names.

  Args:
    userId: Encoded (integer) user id, i.e. a code produced by ``encode_user_id``.
    limit: Number of items to recommend.

  Returns:
    None. The recommendations are printed with ``DataFrame.show()``, best
    predicted score first, and ``show()``'s ``None`` is returned. A message is
    printed instead when the model has no recommendations for ``userId``.
  """
  # Score only the requested user; recommendForAllUsers() would compute top-k
  # lists for every user and then discard all but one.
  user_df = spark.createDataFrame([(int(userId),)], schema=f"{COL_USER} INT")
  rows = (
      model_ALS.recommendForUserSubset(user_df, limit)
      .select("recommendations")
      .collect()
  )
  if not rows:
    print(f"No recommendations: user {userId} was not seen during training.")
    return None

  # Record the model's ranking (1 = best) so it survives the join below.
  ranked = [(position, rec[COL_ITEM])
            for position, rec in enumerate(rows[0][0], start=1)]
  items = spark.createDataFrame(ranked, schema=f"position INT, {COL_ITEM} INT")
  final = (
      items.join(item_df, on=COL_ITEM, how='left')
      .dropDuplicates()
      .orderBy("position")
      .select(COL_ITEM, COL_ITEM_NAME)
  )
  return final.show()

In [63]:
recommend_item(userId=1000, limit=5)  # top-5 items for encoded user 1000

+-------+--------------------+
|item_id|           item_name|
+-------+--------------------+
|    376|Android Lumberjac...|
|    819|  Gift Card - $50.00|
|    863|  Gift Card - $25.00|
|   1266|Google Land & Sea...|
|   1366|Google Badge Heav...|
+-------+--------------------+

