In [1]:
from pyspark.ml import Transformer
from pyspark.ml import Pipeline
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, FloatType, IntegerType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Word2Vec, Tokenizer, RegexTokenizer
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql.functions import udf
import math

In [2]:
from pyspark.sql import SparkSession

team = 16
nworkers = 1  # number of executors requested from YARN (was 3)
cores = 1     # cores per executor
warehouse = "project/hive/warehouse"

# Hive-enabled session on YARN. CPUs per executor are configured through
# "spark.executor.cores"; "spark.executor.cpus" is not a Spark configuration
# property (per-task CPUs would be "spark.task.cpus"), so it is not set.
spark = (
    SparkSession.builder
    .appName("{} - spark ML".format(team))
    .master("yarn")
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")
    .config("spark.sql.warehouse.dir", warehouse)
    .config("spark.sql.avro.compression.codec", "snappy")
    .config('spark.executor.instances', nworkers)
    .config("spark.executor.cores", cores)
    .config("spark.executor.memory", "1g")
    .enableHiveSupport()
    .getOrCreate()
)
spark

In [4]:
# Register Net.py with the SparkContext so that `from Net import ...` also
# resolves inside executor tasks, not only on the driver.
spark.sparkContext.addPyFile('Net.py')
sc = spark.sparkContext

In [5]:
# Read the bucketed Hive table; a source format has no effect when reading a
# catalog table by name, so none is specified.
data = spark.read.table('team16_projectdb.ecom_part_buck')

In [6]:
class CyclicTransformer(Transformer):
    """Add a ``year`` column and sine/cosine encodings of a timestamp's calendar parts.

    For month, day, hour, minute and second, a value ``v`` with period ``P``
    is encoded as ``sin(2*pi*v/P)`` / ``cos(2*pi*v/P)`` in ``<part>_sin`` /
    ``<part>_cos`` columns, so that values at both ends of a cycle
    (e.g. 23h and 0h) map to nearby points. ``year`` is kept as a plain integer
    column; the intermediate month..second columns are dropped.
    """

    # (intermediate column name, period) for every cyclic component.
    _PERIODS = (('month', 12), ('day', 31), ('hour', 24), ('minute', 60), ('second', 60))

    def __init__(self, input_col):
        """
        Args:
            input_col: name (or Column) of the timestamp column to encode.
        """
        super(CyclicTransformer, self).__init__()
        self.input_col = input_col

    def _transform(self, df):
        """Return ``df`` with ``year`` and the ``*_sin``/``*_cos`` columns added.

        Args:
            df: DataFrame containing ``self.input_col`` (assumed to be a
                timestamp column — TODO confirm against the table schema).

        Returns:
            A new DataFrame; the month..second helper columns are not kept.
        """
        ts = F.col(self.input_col) if isinstance(self.input_col, str) else self.input_col
        # Native Spark date functions instead of Python UDFs: they run in the
        # JVM, produce integer columns, and yield null for a null timestamp
        # instead of failing the task with AttributeError on `None.year`.
        calendar_parts = [
            ('year', F.year(ts)),
            ('month', F.month(ts)),
            ('day', F.dayofmonth(ts)),
            ('hour', F.hour(ts)),
            ('minute', F.minute(ts)),
            ('second', F.second(ts)),
        ]
        for name, expr in calendar_parts:
            df = df.withColumn(name, expr)
        for name, period in self._PERIODS:
            angle = F.col(name) * 2 * math.pi / period
            df = df.withColumn(name + '_sin', F.sin(angle)) \
                   .withColumn(name + '_cos', F.cos(angle))
        return df.drop(*[name for name, _ in self._PERIODS])

In [6]:
# Drop incomplete rows *before* the cyclic transform, so CyclicTransformer
# never has to handle a null `event_time`. Rows with any null column (and rows
# with an empty brand or category) are discarded anyway.
data = data.na.drop()
data = data.filter((data.brand != '') & (data.category_code != ''))

cyclic_trans = CyclicTransformer('event_time')
data = cyclic_trans.transform(data)

In [7]:
# Map the interaction type to a score with a native column expression (no
# Python UDF round-trip): purchase -> 1, cart -> 0, anything else
# (including null) -> -1.
rating_expr = (
    F.when(F.col('event_types') == 'purchase', 1)
     .when(F.col('event_types') == 'cart', 0)
     .otherwise(-1)
     .cast(IntegerType())
)
data = data.withColumn('rating', rating_expr).drop('event_types')

In [8]:
# Brands with fewer than 10000 rows are folded into a single "other" brand.
brand_counts = data.groupBy("brand").count()
rare_rows = brand_counts.filter(F.col("count") < 10000).select("brand").collect()
rare_brands = [row["brand"] for row in rare_rows]

brand_or_other = F.when(F.col("brand").isin(rare_brands), "other").otherwise(F.col("brand"))
data = data.withColumn("brand", brand_or_other)

In [9]:
# Column groups fed to the model, plus the label column.
# (Min-max scaling of `year` and `price` happens later, once `data_filtered`
# and `columns_to_scale` exist.)
user_features = ['user_id']
item_features = ['product_id', 'category_code', 'brand', 'price']
session_features = ['year', 'month_sin', 'month_cos', 'day_sin', 'day_cos', 'hour_sin', 'hour_cos', 'minute_sin', 'minute_cos', 'second_sin', 'second_cos']
target = 'rating'

In [10]:
# Keep only users with more than 5 interactions. The inner join also carries
# the per-user `count` column along; it is dropped after scaling.
user_interaction_counts = data.groupBy('user_id').count()
active_users = user_interaction_counts.where(F.col('count') > 5)
data_filtered = data.join(active_users, on='user_id', how='inner')

In [11]:
# `year` must be numeric before it is fed to the VectorAssembler/MinMaxScaler.
data_filtered = data_filtered.withColumn('year', data_filtered['year'].cast(IntegerType()))

In [12]:
columns_to_scale = ['year', 'price']

# MinMaxScaler only accepts vector input: wrap each scalar column in a
# one-element vector, scale it, then unwrap it back to a float column.
assemblers = [VectorAssembler(inputCols=[name], outputCol=name + "_vec") for name in columns_to_scale]
scalers = [MinMaxScaler(inputCol=name + "_vec", outputCol=name + "_scaled") for name in columns_to_scale]
pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(data_filtered)

udf_extract_double = udf(lambda vector: vector.tolist()[0], FloatType())

scaled_data = scalerModel.transform(data_filtered).drop('year', 'price', 'year_vec', 'price_vec')
scaled_data = scaled_data.withColumn("year", udf_extract_double("year_scaled"))
scaled_data = scaled_data.withColumn("price", udf_extract_double("price_scaled"))
# Remove the scaler outputs plus columns the model does not use.
scaled_data = scaled_data.drop('year_scaled', 'price_scaled', 'count', 'category_id')

In [13]:
# Re-index raw user ids to a dense 0..N-1 range. Sorting makes the assignment
# reproducible: the row order of `distinct().collect()` is not guaranteed to be
# the same from one run to the next.
unique_user_ids = sorted(row['user_id'] for row in scaled_data.select('user_id').distinct().collect())
user_id_mapping = {_id: idx for idx, _id in enumerate(unique_user_ids)}
user_id_mapper = F.udf(lambda x: user_id_mapping[x], IntegerType())
mapped_data = scaled_data.withColumn('user_id', user_id_mapper('user_id'))

In [14]:
# Re-index raw product ids to a dense 0..N-1 range. Sorting makes the
# assignment reproducible: the row order of `distinct().collect()` is not
# guaranteed to be the same from one run to the next.
unique_product_ids = sorted(row['product_id'] for row in mapped_data.select('product_id').distinct().collect())
product_id_mapping = {_id: idx for idx, _id in enumerate(unique_product_ids)}
product_id_mapper = F.udf(lambda x: product_id_mapping[x], IntegerType())
mapped_data = mapped_data.withColumn('product_id', product_id_mapper('product_id'))

In [15]:
# Split dotted category paths into tokens and embed them with Word2Vec. The
# resulting vector column replaces the original `category_code` string.
# The pattern is a raw string: "\." in a plain literal is an invalid escape
# sequence (a warning today, and an error in future Python versions).
tokenizer = RegexTokenizer(inputCol='category_code', outputCol='tokenized_category', pattern=r"\.")
word2Vec = Word2Vec(vectorSize=16, seed=42, minCount=1, inputCol='tokenized_category', outputCol='category_embedding')
embedding_pipeline = Pipeline(stages=[tokenizer, word2Vec]).fit(mapped_data)
mapped_data = embedding_pipeline.transform(mapped_data)
mapped_data = mapped_data.drop('category_code').withColumnRenamed("category_embedding", "category_code")

In [16]:
# Re-index brand names (including the "other" bucket) to a dense 0..N-1
# range. Sorting makes the assignment reproducible: the row order of
# `distinct().collect()` is not guaranteed to be the same from one run to the next.
unique_brand_ids = sorted(row['brand'] for row in mapped_data.select('brand').distinct().collect())
brand_id_mapping = {_id: idx for idx, _id in enumerate(unique_brand_ids)}
brand_id_mapper = F.udf(lambda x: brand_id_mapping[x], IntegerType())
mapped_data = mapped_data.withColumn('brand', brand_id_mapper('brand'))

In [31]:
# Number of distinct users / products / brands in `mapped_data`, i.e. what
# `mapped_data.select(<col>).distinct().count()` returns for each column.
# Pinned as constants so the (expensive) counts need not be recomputed; they
# must be updated whenever the preprocessing or the source table changes.
N_users, N_products, N_brands = 97917, 39699, 34

In [32]:
(N_users, N_products, N_brands)

(97917, 39699, 34)

In [19]:
from pyspark.sql.window import Window

# Per-user chronological split: the first 80% of each user's events (by time)
# go to train, the rest to test.
train_test_ratio = 0.8

# Secondary sort keys break ties between events that share a timestamp, making
# `row_number` effectively deterministic. train_df and test_df below are
# computed by separate evaluations of this window, so with unresolved ties the
# same event could otherwise be numbered differently in each one and end up in
# both sets or in neither.
window_spec = Window.partitionBy('user_id').orderBy('event_time', 'product_id', 'rating', 'price')
df_with_row_number = mapped_data.withColumn('row_number', F.row_number().over(window_spec))

user_count_window = Window.partitionBy('user_id')
total_user_count = F.count('user_id').over(user_count_window)

# Number of each user's events that go to train (floor of 80% of the count).
split_threshold = (total_user_count * train_test_ratio).cast('int')

df_labeled = df_with_row_number.withColumn(
    'split', F.when(F.col('row_number') <= split_threshold, 'train').otherwise('test'))

train_df = df_labeled.filter(F.col('split') == 'train').drop('row_number', 'split')
test_df = df_labeled.filter(F.col('split') == 'test').drop('row_number', 'split')

In [20]:
# Keep only the model inputs and the label, in a fixed column order.
model_columns = user_features + item_features + session_features + [target]
train_df, test_df = train_df.select(model_columns), test_df.select(model_columns)

In [21]:
import os
def run(command, check=False):
    """Run ``command`` through the system shell and return its stdout as text.

    Args:
        command: Shell command line. It is interpreted by the shell, so only
            pass trusted strings.
        check: When True, raise ``RuntimeError`` if the command exits with a
            non-zero status. Defaults to False, so existing callers keep the
            best-effort behaviour where failures are not reported.

    Returns:
        str: everything the command wrote to stdout.

    Raises:
        RuntimeError: if ``check`` is True and the command failed.
    """
    pipe = os.popen(command)
    try:
        output = pipe.read()
    finally:
        # Closing explicitly waits for the child process and yields its exit
        # status (None on success) instead of leaving this to garbage collection.
        status = pipe.close()
    if check and status is not None:
        raise RuntimeError("command failed with status {}: {}".format(status, command))
    return output

# Write each split as a single JSON part file on HDFS, then concatenate it
# into a local file under data/.
exports = [
    (train_df, "project/data/train", "hdfs dfs -cat project/data/train/*.json > data/train.json"),
    (test_df, "project/data/test", "hdfs dfs -cat project/data/test/*.json > data/test.json"),
]
for frame, hdfs_dir, copy_cmd in exports:
    frame.coalesce(1).write.mode("overwrite").format("json").save(hdfs_dir)
    run(copy_cmd)

''

# Model 1

In [10]:
# Feature groups and label, declared again so the modelling section can be
# run from the exported JSON without re-running the preprocessing cells.
target = 'rating'
user_features = ['user_id']
item_features = ['product_id', 'category_code', 'brand', 'price']
session_features = [
    'year',
    'month_sin', 'month_cos',
    'day_sin', 'day_cos',
    'hour_sin', 'hour_cos',
    'minute_sin', 'minute_cos',
    'second_sin', 'second_cos',
]

In [6]:
# Reload the train/test splits that were exported to HDFS as JSON.
train_df, test_df = (spark.read.json(path) for path in ('project/data/train', 'project/data/test'))

In [7]:
# After the JSON round-trip the embedding column is a struct; keep only its
# `values` array.
train_df = train_df.withColumn("category_code", train_df["category_code"].getField("values"))

In [15]:
from pyspark.ml.linalg import Vectors, VectorUDT

# Turn the plain `values` array back into an ML dense vector so that
# VectorAssembler accepts the column.
array_to_vector = F.udf(lambda x: Vectors.dense(x), VectorUDT())
train_df = train_df.withColumn("category_code", array_to_vector(F.col("category_code")))

# test_df was reloaded from JSON with the same struct layout, but only train_df
# had its `values` field extracted. Convert test_df as well; otherwise the
# VectorAssembler stage fails when the fitted pipeline is applied to test_df.
test_df = test_df.withColumn("category_code", array_to_vector(F.col("category_code").getField("values")))

In [16]:
# Peek at one fully converted training row.
train_df.show(n=1, truncate=False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+-------------------+-------------------+-------------------+------------------+-------------------+------------------+-------------------+---------+----------+------+-------------------+------------------+-------+----+
|brand|category_code                                                                                                                                                                                                                                                                                                              |day_cos           |day_sin            |hour_cos           |hour_sin           |minute_cos        |minute_sin        

In [17]:
# Sanity check: assemble every feature into one `features` vector.
feature_columns = user_features + item_features + session_features
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
df2 = vector_assembler.transform(train_df)

In [20]:
df2.select('features').show(n=1, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                                                                                                                                                                                                                                                               

In [23]:
# from pyspark.ml.evaluation import RegressionEvaluator 
# from pyspark.ml.recommendation import ALS 

# als = ALS(maxIter=5,
#           userCol="user_id",
#           itemCol="product_id",
#           ratingCol="rating",
#           coldStartStrategy="drop")

# als_model = als.fit(train_df)

# predictions = als_model.transform(test_df)

In [24]:
# evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction")
# rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
# r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
# print("Root-mean-square error = " + str(rmse) + '\n' + 'R2 = ' + str(r2))

In [25]:
# from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 
# import numpy as np


# grid = ParamGridBuilder()
# grid = grid.addGrid(als.blockSize, [1024, 2048, 4096])\
#            .addGrid(als.regParam, np.logspace(1e-3,1e-1))\
#                     .build()

# evaluator1 = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")

# cv = CrossValidator(estimator=als,
#                     estimatorParamMaps=grid,
#                     evaluator=evaluator1,
#                     parallelism=5,
#                     numFolds=3)

# cv_model_als = cv.fit(train_df)
# model1 = cv_model_als.bestModel

In [26]:
# model1.write().overwrite().save("project/models/model1")

# run("hdfs dfs -get project/models/model1 models/model1")

In [27]:
# predictions = model1.transform(test_df)

In [None]:
# predictions.select("label", "prediction")\
#     .coalesce(1)\
#     .write\
#     .mode("overwrite")\
#     .format("csv")\
#     .option("sep", ",")\
#     .option("header","true")\
#     .save("project/output/model1_predictions")

# run("hdfs dfs -cat project/output/model1_predictions/*.csv > output/model1_predictions.csv")

In [28]:
# rmse1 = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
# r21 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
# print("Root-mean-square error = " + str(rmse1) + '\n' + 'R2 = ' + str(r21))

# Model 2

In [29]:
# N_brands = 97917
# N_products = 39699
# N_users = 34

In [33]:
from Net import Content_based_filtering
from sparktorch import serialize_torch_obj, SparkTorch
import torch
import torch.nn as nn

# One training run of the content-based model with fixed sizes
# (dim=128, brand_dim=16), L1 loss and Adam at lr=1e-4.
model = Content_based_filtering(n_brands=N_brands, n_items=N_products, n_users=N_users, dim=128, brand_dim=16)
torch_obj = serialize_torch_obj(model=model, criterion=nn.L1Loss(), optimizer=torch.optim.Adam, lr=0.0001)

vector_assembler = VectorAssembler(inputCols=user_features + item_features + session_features, outputCol='features')
torch_estimator = SparkTorch(
    inputCol='features',
    labelCol=target,
    predictionCol='prediction',
    torchObj=torch_obj,
    iters=1,
    miniBatch=16,
    verbose=1,
)
spark_model = Pipeline(stages=[vector_assembler, torch_estimator]).fit(train_df)

In [34]:
from pyspark.sql.functions import monotonically_increasing_id, col

def k_fold_split(df, k=3, seed=42):
    """Yield ``(test_fold, training_fold)`` pairs for k-fold cross-validation.

    ``df`` is split once into ``k`` disjoint folds with ``randomSplit`` (equal
    weights, fixed ``seed``). For each fold, the generator yields that fold as
    the test part and the union of the other folds as the training part.
    Fold sizes are approximately, not exactly, equal.

    ``monotonically_increasing_id`` is not used for this: its ids are not
    contiguous across partitions (the partition index sits in the upper bits),
    so range filters against ``[0, row_count)`` would leave most rows out of
    every test fold.

    Args:
        df: DataFrame to split.
        k: number of folds; must be at least 2.
        seed: seed for ``randomSplit`` so the folds are reproducible.

    Yields:
        tuple: ``(test_fold, training_fold)`` DataFrames.

    Raises:
        ValueError: if ``k`` is smaller than 2.
    """
    if k < 2:
        raise ValueError("k must be at least 2, got {}".format(k))
    folds = df.randomSplit([1.0] * k, seed=seed)
    for n, test_fold in enumerate(folds):
        training_fold = None
        for m, fold in enumerate(folds):
            if m == n:
                continue
            training_fold = fold if training_fold is None else training_fold.union(fold)
        yield (test_fold, training_fold)

In [None]:
from sparktorch import PysparkPipelineWrapper
from pyspark.ml.evaluation import RegressionEvaluator 

brand_dims = [8, 16]
dims = [64, 128]

evaluator2 = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
vector_assembler = VectorAssembler(inputCols=user_features + item_features + session_features, outputCol='features')


def _make_pipeline(brand_dim, dim):
    """Return an unfitted [VectorAssembler, SparkTorch] pipeline for one configuration."""
    net = Content_based_filtering(n_brands=N_brands,
                                  n_items=N_products,
                                  n_users=N_users,
                                  brand_dim=brand_dim,
                                  dim=dim)
    torch_obj = serialize_torch_obj(
        model=net,
        criterion=nn.L1Loss(),
        optimizer=torch.optim.Adam,
        lr=0.0001,
    )
    estimator = SparkTorch(
        inputCol='features',
        labelCol=target,
        # Must equal evaluator2's predictionCol, otherwise evaluate() cannot
        # find the prediction column.
        predictionCol='prediction',
        torchObj=torch_obj,
        iters=1,
        miniBatch=16,
        verbose=1,
    )
    return Pipeline(stages=[vector_assembler, estimator])


# Build the fold pairs once so every configuration is scored on the same folds.
folds = list(k_fold_split(train_df))

best_config = None
best_rmse = float('inf')
iteration = 0

# Proper k-fold CV: score each configuration by its mean RMSE over all folds,
# not by its single luckiest fold.
for brand_dim in brand_dims:
    for dim in dims:
        fold_rmses = []
        for test_data, train_data in folds:
            fitted = _make_pipeline(brand_dim, dim).fit(train_data)
            preds = PysparkPipelineWrapper.unwrap(fitted).transform(test_data)
            fold_rmses.append(evaluator2.evaluate(preds))
            iteration += 1
            print(iteration)
        mean_rmse = sum(fold_rmses) / len(fold_rmses)
        print("brand_dim={}, dim={}: mean RMSE = {}".format(brand_dim, dim, mean_rmse))
        if mean_rmse < best_rmse:
            best_rmse = mean_rmse
            best_config = (brand_dim, dim)

if best_config is None:
    raise RuntimeError("cross-validation produced no finite RMSE for any configuration")

# Refit the winning configuration on the whole training set.
best_model = _make_pipeline(*best_config).fit(train_df)
model2 = best_model

In [None]:
# Persist the selected pipeline on HDFS, then copy it to the local models/ dir.
model2_writer = model2.write().overwrite()
model2_writer.save("project/models/model2")
run("hdfs dfs -get project/models/model2 models/model2")

In [None]:
# Unwrap the SparkTorch pipeline and score the held-out test split.
unwrapped_model2 = PysparkPipelineWrapper.unwrap(model2)
predictions = unwrapped_model2.transform(test_df)

In [None]:
# Export (true rating, prediction) pairs as one CSV file with a header row.
# The label column is `target` ('rating'); the prediction frame has no column
# named "label" (its columns come from the selection of model columns plus
# the pipeline's `features`/`prediction` outputs).
predictions.select(target, "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model2_predictions")

run("hdfs dfs -cat project/output/model2_predictions/*.csv > output/model2_predictions.csv")

In [None]:
# `evaluator` is only defined in the commented-out Model 1 cells, so reuse
# evaluator2 (labelCol="rating", predictionCol="prediction") and override the
# metric per call via a param map.
rmse2 = evaluator2.evaluate(predictions, {evaluator2.metricName: "rmse"})
r22 = evaluator2.evaluate(predictions, {evaluator2.metricName: "r2"})
print("Root-mean-square error = " + str(rmse2) + '\n' + 'R2 = ' + str(r22))

In [None]:
# Model 1 (ALS) cells are commented out, so `model1`/`rmse1`/`r21` may not
# exist. Include that row only when all three are defined; Model 2 always is.
models = [[str(model2), rmse2, r22]]
if all(name in globals() for name in ("model1", "rmse1", "r21")):
    models.insert(0, [str(model1), rmse1, r21])

df = spark.createDataFrame(models, ["model", "RMSE", "R2"])
df.show(truncate=False)

df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/evaluation.csv")

run("hdfs dfs -cat project/output/evaluation.csv/*.csv > output/evaluation.csv")

In [21]:
# Stop the SparkSession created at the top of the notebook and release its
# YARN resources.
spark.stop()