In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

In [2]:
spark = SparkSession.builder \
        .appName("Recommender System") \
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
        .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
        .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
        .config("spark.hadoop.fs.s3a.path.style.access", True) \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .getOrCreate()

In [3]:
data = spark.read.csv("s3a://datawarehouse/ml_data.csv", header=True, inferSchema=True)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [4]:
# Assuming 'user_id' and 'product_id' need to be integer types
train_data = train_data.withColumn("user_id", col("user_id").cast("integer"))
train_data = train_data.withColumn("product_id", col("product_id").cast("integer"))


In [5]:
from pyspark.sql.functions import when, col

# Add a 'rating' column to train_data
train_data = train_data.withColumn('rating', when(col('event_type') == 'purchase', 1).otherwise(0))

# Add a 'rating' column to test_data
test_data = test_data.withColumn('rating', when(col('event_type') == 'purchase', 1).otherwise(0))


In [6]:
als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="product_id", ratingCol="rating")
model = als.fit(train_data)


In [11]:
from pyspark.sql.functions import isnan

# Remove rows with null predictions
predictions = predictions.na.drop(subset=["prediction"])

# You may also filter out any NaN values if they exist
predictions = predictions.filter(~isnan(col("prediction")))

# Now evaluate the model
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))


Root-mean-square error = 0.13256806102134328


In [8]:
model.save("recommendation_model")


In [10]:
from pyspark.ml.recommendation import ALSModel
model = ALSModel.load("recommendation_model")
